Skip to content

Commit

Permalink
Adding local work stealing scheduler that is based on message passing…
Browse files Browse the repository at this point in the history
… internally

- Using uniform_int_distribution with proper bounds
- Removing queue index from thread_queues as it was unused
  - flyby: remove commented out options from .clang-format
- Renaming workstealing --> workrequesting
- Adding adaptive work stealing (steal half/steal one)
  - this makes this scheduler consistently (albeit only slightly) faster than
    the (default) local-priority scheduler
- Adding LIFO and FIFO variations of local work-stealing scheduler
  - flyby: fixing HPX_WITH_SWAP_CONTEXT_EMULATION
  - flyby: minor changes to fibonacci_local example
- Adding high- and low- priority queues
  - flyby: cache_line_data now does not generate warnings errors if padding is not needed
  • Loading branch information
hkaiser committed Apr 13, 2022
1 parent 85fdb87 commit 26bbc03
Show file tree
Hide file tree
Showing 43 changed files with 2,114 additions and 281 deletions.
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ jobs:
-DHPX_WITH_CHECK_MODULE_DEPENDENCIES=On \
-DCMAKE_EXPORT_COMPILE_COMMANDS=On \
-DHPX_WITH_DOCUMENTATION=On \
-DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}"
-DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" \
-DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo
- persist_to_workspace:
root: /hpx
paths:
Expand Down
4 changes: 2 additions & 2 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2016 Thomas Heller
# Copyright (c) 2016-2018 Hartmut Kaiser
# Copyright (c) 2016-2019 Hartmut Kaiser
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -105,7 +105,7 @@ PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 200
PointerAlignment: Left
ReflowComments: false
SortIncludes: true
SortIncludes: true
SpaceAfterCStyleCast: true
SpaceAfterTemplateKeyword: true
SpaceBeforeAssignmentOperators: true
Expand Down
3 changes: 2 additions & 1 deletion .jenkins/lsu/env-clang-13.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON"
configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON"
configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON"
configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON"
configure_extra_options+=" -DLCI_SERVER=ibv"
configure_extra_options+=" -DHPX_WITH_LOGGING=OFF"
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo"
configure_extra_options+=" -DLCI_SERVER=ibv"

# Make sure HWLOC does not report 'cores'. This is purely an option to enable
# testing the topology code under conditions close to those on FreeBSD.
Expand Down
2 changes: 2 additions & 0 deletions .jenkins/lsu/env-gcc-11.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module load openmpi
export HPXRUN_RUNWRAPPER=srun
export CXX_STD="20"

configure_extra_options+=" -DCMAKE_BUILD_TYPE=${build_type}"
configure_extra_options+=" -DHPX_WITH_CXX_STANDARD=${CXX_STD}"
configure_extra_options+=" -DHPX_WITH_MALLOC=system"
configure_extra_options+=" -DHPX_WITH_FETCH_ASIO=ON"
Expand All @@ -22,4 +23,5 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON"
configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON"
configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON"
configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON"
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo"
configure_extra_options+=" -DLCI_SERVER=ibv"
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,13 @@ hpx_option(
ADVANCED
)

hpx_option(
HPX_WITH_TESTS_COMMAND_LINE STRING
"Add given command line options to all tests run" ""
CATEGORY "Debugging"
ADVANCED
)

hpx_option(
HPX_WITH_TESTS_MAX_THREADS_PER_LOCALITY
STRING
Expand Down
5 changes: 5 additions & 0 deletions cmake/HPX_AddTest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ function(add_hpx_test category name)
)
endif()

# add additional command line arguments to all test executions
if(NOT x${HPX_WITH_TESTS_COMMAND_LINE} STREQUAL x"")
set(args ${args} "${HPX_WITH_TESTS_COMMAND_LINE}")
endif()

if(${HPX_WITH_PARALLEL_TESTS_BIND_NONE}
AND NOT run_serial
AND NOT "${name}_RUNWRAPPER"
Expand Down
36 changes: 21 additions & 15 deletions docs/sphinx/manual/hpx_runtime_and_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
|hpx| thread scheduling policies
================================

The HPX runtime has five thread scheduling policies: local-priority,
static-priority, local, static and abp-priority. These policies can be specified
from the command line using the command line option :option:`--hpx:queuing`. In
order to use a particular scheduling policy, the runtime system must be built
with the appropriate scheduler flag turned on (e.g. ``cmake
-DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for more
information).
The HPX runtime has several thread scheduling policies: local-priority-fifo,
local-priority-lifo, static-priority, local, static, local-workrequesting-fifo,
local-workrequesting-lifo, and abp-priority. These policies can be specified
from the command line using the command line option :option:`--hpx:queuing`.

Priority local scheduling policy (default policy)
-------------------------------------------------
Expand Down Expand Up @@ -54,8 +51,6 @@ Static priority scheduling policy
---------------------------------

* invoke using: :option:`--hpx:queuing`\ ``=static-priority`` (or ``-qs``)
* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or
``HPX_THREAD_SCHEDULERS=static-priority``

The static scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads). Threads are distributed in a round
Expand All @@ -65,8 +60,6 @@ Local scheduling policy
-----------------------

* invoke using: :option:`--hpx:queuing`\ ``=local`` (or ``-ql``)
* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or
``HPX_THREAD_SCHEDULERS=local``

The local scheduling policy maintains one queue per OS thread from which each OS
thread pulls its tasks (user threads).
Expand All @@ -75,8 +68,6 @@ Static scheduling policy
------------------------

* invoke using: :option:`--hpx:queuing`\ ``=static``
* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or
``HPX_THREAD_SCHEDULERS=static``

The static scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads). Threads are distributed in a round
Expand All @@ -86,8 +77,6 @@ Priority ABP scheduling policy
------------------------------

* invoke using: :option:`--hpx:queuing`\ ``=abp-priority-fifo``
* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or
``HPX_THREAD_SCHEDULERS=abp-priority``

Priority ABP policy maintains a double ended lock free queue for each OS thread.
By default the number of high priority queues is equal to the number of OS
Expand Down Expand Up @@ -153,6 +142,23 @@ policy use the command line option :option:`--hpx:queuing`\

I see both FIFO and double ended queues in ABP policies?

Work requesting scheduling policies
-----------------------------------

* invoke using: :option:`--hpx:queuing`\ ``=local-workrequesting-fifo``
or using :option:`--hpx:queuing`\ ``=local-workrequesting-lifo``

The work-requesting policies rely on a different mechanism of balancing work
between cores (compared to the other policies listed above). Instead of actively
trying to steal work from other cores, requesting work relies on a less
disruptive mechanism. If a core runs out of work, instead of actively looking at
the queues of neighboring cores, in this case a request is posted to another
core. This core now (whenever it is not busy with other work) either responds to
the original core by sending back work or passes the request on to the next
possible core in the system. In general, this scheme avoids contention on the
work queues as those are always accessed by their own cores only.


The |hpx| resource partitioner
==============================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,9 @@ The predefined command line options for any application using

the queue scheduling policy to use, options are ``local``,
``local-priority-fifo``, ``local-priority-lifo``, ``static``,
``static-priority``, ``abp-priority-fifo`` and ``abp-priority-lifo``
``static-priority``, ``abp-priority-fifo``,
``local-workrequesting-fifo``, ``local-workrequesting-lifo``
and ``abp-priority-lifo``
(default: ``local-priority-fifo``)

.. option:: --hpx:high-priority-threads arg
Expand Down
25 changes: 15 additions & 10 deletions examples/quickstart/fibonacci_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
//[fibonacci
Expand All @@ -26,22 +29,20 @@ std::uint64_t fibonacci(std::uint64_t n)
if (n < 2)
return n;

// Invoking the Fibonacci algorithm twice is inefficient.
// However, we intentionally demonstrate it this way to create some
// heavy workload.

hpx::future<std::uint64_t> n1 = hpx::async(fibonacci, n - 1);
hpx::future<std::uint64_t> n2 = hpx::async(fibonacci, n - 2);
std::uint64_t n2 = fibonacci(n - 2);

return n1.get() +
n2.get(); // wait for the Futures to return their values
return n1.get() + n2; // wait for the Future to return their values
}
//fibonacci]

///////////////////////////////////////////////////////////////////////////////
//[hpx_main
int hpx_main(hpx::program_options::variables_map& vm)
{
hpx::threads::add_scheduler_mode(
hpx::threads::policies::scheduler_mode::fast_idle_mode);

// extract command line argument, i.e. fib(N)
std::uint64_t n = vm["n-value"].as<std::uint64_t>();

Expand All @@ -67,9 +68,13 @@ int main(int argc, char* argv[])
hpx::program_options::options_description desc_commandline(
"Usage: " HPX_APPLICATION_STRING " [options]");

desc_commandline.add_options()("n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function");
// clang-format off
desc_commandline.add_options()
("n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function")
;
// clang-format on

// Initialize and run HPX
hpx::local::init_params init_args;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,18 +327,24 @@ namespace hpx { namespace local { namespace detail {
"run on (default: 0), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
"and --hpx:queuing=local-priority only")
("hpx:pu-step", value<std::size_t>(),
"the step between used processing unit numbers for this "
"instance of HPX (default: 1), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority "
"--hpx:queuing=static, --hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
"and --hpx:queuing=local-priority only")
("hpx:affinity", value<std::string>(),
"the affinity domain the OS threads will be confined to, "
"possible values: pu, core, numa, machine (default: pu), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority "
"--hpx:queuing=static, --hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
" and --hpx:queuing=local-priority only")
("hpx:bind", value<std::vector<std::string> >()->composing(),
"the detailed affinity description for the OS threads, see "
Expand All @@ -363,13 +369,16 @@ namespace hpx { namespace local { namespace detail {
("hpx:queuing", value<std::string>(),
"the queue scheduling policy to use, options are "
"'local', 'local-priority-fifo','local-priority-lifo', "
"'local-workrequesting-fifo', 'local-workrequesting-lifo' "
"'abp-priority-fifo', 'abp-priority-lifo', 'static', and "
"'static-priority' (default: 'local-priority'; "
"'static-priority' (default: 'local-priority-fifo'; "
"all option values can be abbreviated)")
("hpx:high-priority-threads", value<std::size_t>(),
"the number of operating system threads maintaining a high "
"priority queue (default: number of OS threads), valid for "
"--hpx:queuing=local-priority,--hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
" and --hpx:queuing=abp-priority only)")
("hpx:numa-sensitive", value<std::size_t>()->implicit_value(0),
"makes the local-priority scheduler NUMA sensitive ("
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ namespace hpx { namespace concurrency { namespace detail {
{
if (expected_range.empty())
{
return hpx::nullopt;
return hpx::optional<T>(hpx::nullopt);
}

index = expected_range.first;
Expand All @@ -166,7 +166,7 @@ namespace hpx { namespace concurrency { namespace detail {
{
if (expected_range.empty())
{
return hpx::nullopt;
return hpx::optional<T>(hpx::nullopt);
}

desired_range = expected_range.decrement_last();
Expand Down
23 changes: 16 additions & 7 deletions libs/core/concurrency/include/hpx/concurrency/spinlock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
#include <hpx/modules/itt_notify.hpp>
#include <hpx/thread_support/spinlock.hpp>

#include <utility>

namespace hpx { namespace util {

/// Lockable spinlock class
// Lockable spinlock class
struct spinlock
{
public:
Expand All @@ -25,25 +27,32 @@ namespace hpx { namespace util {
hpx::util::detail::spinlock m;

public:
spinlock(char const* /*desc*/ = nullptr)
spinlock() noexcept
{
HPX_ITT_SYNC_CREATE(this, "util::spinlock", "");
HPX_ITT_SYNC_CREATE(this, "util::spinlock", nullptr);
}

explicit spinlock(char const* desc) noexcept
{
HPX_ITT_SYNC_CREATE(this, "util::spinlock", desc);
}

~spinlock()
{
HPX_ITT_SYNC_DESTROY(this);
}

void lock() noexcept
void lock() noexcept(
noexcept(util::register_lock(std::declval<spinlock*>())))
{
HPX_ITT_SYNC_PREPARE(this);
m.lock();
HPX_ITT_SYNC_ACQUIRED(this);
util::register_lock(this);
}

bool try_lock() noexcept
bool try_lock() noexcept(
noexcept(util::register_lock(std::declval<spinlock*>())))
{
HPX_ITT_SYNC_PREPARE(this);
if (m.try_lock())
Expand All @@ -56,13 +65,13 @@ namespace hpx { namespace util {
return false;
}

void unlock() noexcept
void unlock() noexcept(
noexcept(util::unregister_lock(std::declval<spinlock*>())))
{
HPX_ITT_SYNC_RELEASING(this);
m.unlock();
HPX_ITT_SYNC_RELEASED(this);
util::unregister_lock(this);
}
};

}} // namespace hpx::util
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,7 @@ namespace hpx { namespace resource {
abp_priority_fifo = 5,
abp_priority_lifo = 6,
shared_priority = 7,
local_workrequesting_fifo = 8,
local_workrequesting_lifo = 9,
};
}} // namespace hpx::resource
18 changes: 18 additions & 0 deletions libs/core/resource_partitioner/src/detail_partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ namespace hpx { namespace resource { namespace detail {
case resource::local_priority_lifo:
sched = "local_priority_lifo";
break;
case resource::local_workrequesting_fifo:
sched = "local_workrequesting_fifo";
break;
case resource::local_workrequesting_lifo:
sched = "local_workrequesting_lifo";
break;
case resource::static_:
sched = "static";
break;
Expand Down Expand Up @@ -451,6 +457,18 @@ namespace hpx { namespace resource { namespace detail {
{
default_scheduler = scheduling_policy::local_priority_lifo;
}
else if (0 ==
std::string("local-workrequesting-fifo")
.find(default_scheduler_str))
{
default_scheduler = scheduling_policy::local_workrequesting_fifo;
}
else if (0 ==
std::string("local-workrequesting-lifo")
.find(default_scheduler_str))
{
default_scheduler = scheduling_policy::local_workrequesting_lifo;
}
else if (0 == std::string("static").find(default_scheduler_str))
{
default_scheduler = scheduling_policy::static_;
Expand Down
Loading

0 comments on commit 26bbc03

Please sign in to comment.