Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding local work requesting scheduler that is based on message passing internally #5845

Merged
merged 6 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ jobs:
-DHPX_WITH_FETCH_JSON=On \
-DCMAKE_EXPORT_COMPILE_COMMANDS=On \
-DHPX_WITH_DOCUMENTATION=On \
-DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}"
-DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" \
-DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo
- persist_to_workspace:
root: /hpx
paths:
Expand Down
2 changes: 2 additions & 0 deletions .jenkins/lsu/env-clang-13.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON"
configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON"
configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON"
configure_extra_options+=" -DHPX_WITH_LOGGING=OFF"
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo"


# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"
Expand Down
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-11.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module load pwrapi/1.1.1
export HPXRUN_RUNWRAPPER=srun
export CXX_STD="20"

configure_extra_options+=" -DCMAKE_BUILD_TYPE=${build_type}"
configure_extra_options+=" -DHPX_WITH_CXX_STANDARD=${CXX_STD}"
configure_extra_options+=" -DHPX_WITH_MALLOC=system"
configure_extra_options+=" -DHPX_WITH_FETCH_ASIO=ON"
Expand All @@ -25,3 +26,5 @@ configure_extra_options+=" -DHPX_WITH_DATAPAR_BACKEND=STD_EXPERIMENTAL_SIMD"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-lifo"
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,13 @@ hpx_option(
ADVANCED
)

hpx_option(
HPX_WITH_TESTS_COMMAND_LINE STRING
"Add given command line options to all tests run" ""
CATEGORY "Debugging"
ADVANCED
)

hpx_option(
HPX_WITH_TESTS_MAX_THREADS_PER_LOCALITY
STRING
Expand Down
5 changes: 5 additions & 0 deletions cmake/HPX_AddTest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ function(add_hpx_test category name)
)
endif()

# add additional command line arguments to all test executions
if(NOT x${HPX_WITH_TESTS_COMMAND_LINE} STREQUAL x"")
set(args ${args} "${HPX_WITH_TESTS_COMMAND_LINE}")
endif()

if(${HPX_WITH_PARALLEL_TESTS_BIND_NONE}
AND NOT run_serial
AND NOT "${name}_RUNWRAPPER"
Expand Down
37 changes: 26 additions & 11 deletions docs/sphinx/manual/hpx_runtime_and_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
|hpx| thread scheduling policies
================================

The |hpx| runtime has five thread scheduling policies: local-priority,
static-priority, local, static and abp-priority. These policies can be specified
from the command line using the command line option :option:`--hpx:queuing`. In
order to use a particular scheduling policy, the runtime system must be built
with the appropriate scheduler flag turned on (e.g. ``cmake
-DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for more
information).
The |hpx| runtime has six thread scheduling policies: local-priority,
static-priority, local, static, local-workrequesting-fifo, and abp-priority.
These policies can be specified from the command line using the command line
option :option:`--hpx:queuing`. In order to use a particular scheduling policy,
the runtime system must be built with the appropriate scheduler flag turned on
(e.g. ``cmake -DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for
more information).

Priority local scheduling policy (default policy)
-------------------------------------------------
Expand Down Expand Up @@ -51,9 +51,7 @@ policy and must be invoked using the command line option
Static priority scheduling policy
---------------------------------

* invoke using: :option:`--hpx:queuing`\ ``=static-priority`` (or ``-qs``)
* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or
``HPX_THREAD_SCHEDULERS=static-priority``
* invoke using: :option:`--hpx:queuing`\ ``static-priority`` (or ``-qs``)

The static scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads). Threads are distributed in a round
Expand Down Expand Up @@ -102,7 +100,7 @@ domain first, only after that work is stolen from other NUMA domains.
This scheduler can be used with two underlying queuing policies (FIFO:
first-in-first-out, and LIFO: last-in-first-out). In order to use the LIFO
policy use the command line option
:option:`--hpx:queuing`\ ``=abp-priority-lifo``.
:option:`--hpx:queuing`\ ``abp-priority-lifo``.

..
Questions, concerns and notes:
Expand Down Expand Up @@ -151,6 +149,23 @@ policy use the command line option

I see both FIFO and double ended queues in ABP policies?

Work requesting scheduling policies
-----------------------------------

* invoke using: :option:`--hpx:queuing`\ ``local-workrequesting-fifo``
or using :option:`--hpx:queuing`\ ``local-workrequesting-lifo``

The work-requesting policies rely on a different mechanism of balancing work
between cores (compared to the other policies listed above). Instead of actively
trying to steal work from other cores, requesting work relies on a less
disruptive mechanism. If a core runs out of work, instead of actively looking at
the queues of neighboring cores, in this case a request is posted to another
core. This core now (whenever it is not busy with other work) either responds to
the original core by sending back work or passes the request on to the next
possible core in the system. In general, this scheme avoids contention on the
work queues as those are always accessed by their own cores only.


The |hpx| resource partitioner
==============================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,9 @@ The predefined command line options for any application using

The queue scheduling policy to use. Options are ``local``,
``local-priority-fifo``, ``local-priority-lifo``, ``static``,
``static-priority``, ``abp-priority-fifo`` and ``abp-priority-lifo``
``static-priority``, ``abp-priority-fifo``,
``local-workrequesting-fifo``, ``local-workrequesting-lifo``
and ``abp-priority-lifo``
(default: ``local-priority-fifo``).

.. option:: --hpx:high-priority-threads arg
Expand Down
8 changes: 4 additions & 4 deletions examples/1d_stencil/1d_stencil_4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ struct stepper
});

// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

auto Op = unwrapping(&stepper::heat_part);

Expand All @@ -184,15 +184,15 @@ struct stepper
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'nt'.
Expand Down
8 changes: 4 additions & 4 deletions examples/1d_stencil/1d_stencil_4_throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ struct stepper
});

// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

auto Op = unwrapping(&stepper::heat_part);

Expand All @@ -261,15 +261,15 @@ struct stepper
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'nt'.
Expand Down
9 changes: 5 additions & 4 deletions examples/1d_stencil/1d_stencil_7.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -359,7 +360,7 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt)

// limit depth of dependency tree
std::size_t nd = 3;
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

heat_part_action act;
for (std::size_t t = 0; t != nt; ++t)
Expand All @@ -377,15 +378,15 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt)

if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'nt'.
Expand Down
9 changes: 5 additions & 4 deletions examples/1d_stencil/1d_stencil_8.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <stack>
#include <string>
Expand Down Expand Up @@ -575,7 +576,7 @@ stepper_server::space stepper_server::do_work(
}

// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

for (std::size_t t = 0; t != nt; ++t)
{
Expand Down Expand Up @@ -626,15 +627,15 @@ stepper_server::space stepper_server::do_work(
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

return U_[nt % 2];
Expand Down
25 changes: 15 additions & 10 deletions examples/quickstart/fibonacci_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
//[fibonacci
Expand All @@ -26,22 +29,20 @@ std::uint64_t fibonacci(std::uint64_t n)
if (n < 2)
return n;

// Invoking the Fibonacci algorithm twice is inefficient.
// However, we intentionally demonstrate it this way to create some
// heavy workload.

hpx::future<std::uint64_t> n1 = hpx::async(fibonacci, n - 1);
hpx::future<std::uint64_t> n2 = hpx::async(fibonacci, n - 2);
std::uint64_t n2 = fibonacci(n - 2);

return n1.get() +
n2.get(); // wait for the Futures to return their values
return n1.get() + n2; // wait for the Future to return their values
}
//fibonacci]

///////////////////////////////////////////////////////////////////////////////
//[hpx_main
int hpx_main(hpx::program_options::variables_map& vm)
{
hpx::threads::add_scheduler_mode(
hpx::threads::policies::scheduler_mode::fast_idle_mode);

// extract command line argument, i.e. fib(N)
std::uint64_t n = vm["n-value"].as<std::uint64_t>();

Expand All @@ -67,9 +68,13 @@ int main(int argc, char* argv[])
hpx::program_options::options_description desc_commandline(
"Usage: " HPX_APPLICATION_STRING " [options]");

desc_commandline.add_options()("n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function");
// clang-format off
desc_commandline.add_options()
("n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function")
;
// clang-format on

// Initialize and run HPX
hpx::local::init_params init_args;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,18 +478,24 @@ namespace hpx::local::detail {
"run on (default: 0), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
"and --hpx:queuing=local-priority only")
("hpx:pu-step", value<std::size_t>(),
"the step between used processing unit numbers for this "
"instance of HPX (default: 1), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
"and --hpx:queuing=local-priority only")
("hpx:affinity", value<std::string>(),
"the affinity domain the OS threads will be confined to, "
"possible values: pu, core, numa, machine (default: pu), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
" and --hpx:queuing=local-priority only")
("hpx:bind", value<std::vector<std::string> >()->composing(),
"the detailed affinity description for the OS threads, see "
Expand All @@ -514,13 +520,16 @@ namespace hpx::local::detail {
("hpx:queuing", value<std::string>(),
"the queue scheduling policy to use, options are "
"'local', 'local-priority-fifo','local-priority-lifo', "
"'abp-priority-fifo', 'abp-priority-lifo', 'static', and "
"'static-priority' (default: 'local-priority'; "
"'abp-priority-fifo', 'abp-priority-lifo', 'static', "
"'static-priority', 'local-workrequesting-fifo', and "
"'local-workrequesting-lifo' (default: 'local-priority'; "
"all option values can be abbreviated)")
("hpx:high-priority-threads", value<std::size_t>(),
"the number of operating system threads maintaining a high "
"priority queue (default: number of OS threads), valid for "
"--hpx:queuing=local-priority,--hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
" and --hpx:queuing=abp-priority only)")
("hpx:numa-sensitive", value<std::size_t>()->implicit_value(0),
"makes the local-priority scheduler NUMA sensitive ("
Expand Down
10 changes: 5 additions & 5 deletions libs/core/resiliency/tests/performance/replay/1d_stencil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ struct stepper

hpx::future<space> do_work(std::size_t subdomains,
std::size_t subdomain_width, std::size_t iterations, std::size_t sti,
std::uint64_t nd, hpx::sliding_semaphore& sem)
std::uint64_t nd, std::shared_ptr<hpx::sliding_semaphore> sem)
{
using hpx::dataflow;
using hpx::unwrapping;
Expand Down Expand Up @@ -186,15 +186,15 @@ struct stepper
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'iterations'.
Expand Down Expand Up @@ -227,7 +227,7 @@ int hpx_main(hpx::program_options::variables_map& vm)

{
// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

hpx::future<stepper::space> result =
step.do_work(subdomains, subdomain_width, iterations, sti, nd, sem);
Expand Down
Loading