Merge #6311
6311: Adding executor parallel invoke CPOs r=hkaiser a=hkaiser

This adds two new CPOs: `sync_invoke` and `async_invoke`. It also adds implementations for the `fork_join_executor` and the `block_fork_join_executor`, as well as fallback implementations for all executors that do not support these natively. The APIs are:
```
namespace hpx::parallel::execution {

    template <typename Executor, typename F, typename... Fs,
        HPX_CONCEPT_REQUIRES_(
            std::is_invocable_v<F> && (std::is_invocable_v<Fs> && ...)
        )>
    void sync_invoke(Executor&& exec, F&& f, Fs&&... fs);

    template <typename Executor, typename F, typename... Fs,
        HPX_CONCEPT_REQUIRES_(
            std::is_invocable_v<F> && (std::is_invocable_v<Fs> && ...)
        )>
    decltype(auto) async_invoke(Executor&& exec, F&& f, Fs&&... fs);
}
```
For executors supporting these CPOs, the given functions are launched concurrently: `sync_invoke` waits for all of them to complete, while `async_invoke` returns a completion token (e.g., an `hpx::future`) representing the completion of the function invocations.
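By way of illustration, here is a minimal usage sketch. The header choices, the executor setup, the lambdas, and the assumption that the returned token is an `hpx::future<void>` (as in the fallback implementation) are illustrative, not part of this commit:

```
#include <hpx/execution.hpp>
#include <hpx/init.hpp>

int hpx_main()
{
    hpx::execution::experimental::fork_join_executor exec;

    // launches both lambdas concurrently and blocks until both have finished
    hpx::parallel::execution::sync_invoke(
        exec, [] { /* work A */ }, [] { /* work B */ });

    // launches both lambdas concurrently, returns a completion token instead
    auto done = hpx::parallel::execution::async_invoke(
        exec, [] { /* work A */ }, [] { /* work B */ });
    done.get();    // also rethrows any exception raised by the functions

    return hpx::finalize();
}

int main(int argc, char* argv[])
{
    return hpx::init(argc, argv);
}
```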


Co-authored-by: Hartmut Kaiser <[email protected]>
StellarBot and hkaiser committed Jul 27, 2023
2 parents 6c184dd + cb8d178 commit dd17010
Showing 9 changed files with 879 additions and 74 deletions.
@@ -300,7 +300,7 @@ namespace hpx::parallel::util::detail {
// wait for all tasks to finish
if (hpx::wait_all_nothrow(items))
{
- // always rethrow workitems has at least one exceptional
+ // always rethrow if items has at least one exceptional
// future
handle_local_exceptions::call(items);
}
@@ -324,7 +324,7 @@ namespace hpx::parallel::util::detail {
// wait for all tasks to finish
if (hpx::wait_all_nothrow(items))
{
- // always rethrow workitems has at least one exceptional
+ // always rethrow if items has at least one exceptional
// future
handle_local_exceptions::call(items);
}
@@ -59,8 +59,9 @@ namespace hpx::execution::experimental {
static hpx::threads::mask_type cores_for_targets(
std::vector<compute::host::target> const& targets)
{
- auto& rp = hpx::resource::get_partitioner();
- std::size_t this_pu = rp.get_pu_num(hpx::get_worker_thread_num());
+ auto const& rp = hpx::resource::get_partitioner();
+ std::size_t const this_pu =
+     rp.get_pu_num(hpx::get_worker_thread_num());
if (targets.size() == 1)
{
// don't build a hierarchy of executors if there is only one
@@ -141,17 +142,16 @@
/// \param stacksize The stacksize of the worker threads. Must not be
/// nostack.
/// \param schedule The loop schedule of the parallel regions.
- /// \param yield_delay The time after which the executor yields to
- /// other work if it hasn't received any new work for bulk
- /// execution.
+ /// \param yield_delay The time after which the executor yields to other
+ /// work if it has not received any new work for execution.
///
/// \note This constructor will create one fork_join_executor for
/// each numa domain
explicit block_fork_join_executor(
threads::thread_priority priority = threads::thread_priority::bound,
threads::thread_stacksize stacksize =
threads::thread_stacksize::small_,
- fork_join_executor::loop_schedule schedule =
+ fork_join_executor::loop_schedule const schedule =
fork_join_executor::loop_schedule::static_,
std::chrono::nanoseconds yield_delay = std::chrono::milliseconds(1))
: block_fork_join_executor(compute::host::numa_domains(), priority,
@@ -166,9 +166,8 @@
/// \param stacksize The stacksize of the worker threads. Must not be
/// nostack.
/// \param schedule The loop schedule of the parallel regions.
- /// \param yield_delay The time after which the executor yields to
- /// other work if it hasn't received any new work for bulk
- /// execution.
+ /// \param yield_delay The time after which the executor yields to other
+ /// work if it has not received any new work for execution.
///
/// \note This constructor will create one fork_join_executor for
/// each given target
@@ -177,7 +176,7 @@
threads::thread_priority priority = threads::thread_priority::bound,
threads::thread_stacksize stacksize =
threads::thread_stacksize::small_,
- fork_join_executor::loop_schedule schedule =
+ fork_join_executor::loop_schedule const schedule =
fork_join_executor::loop_schedule::static_,
std::chrono::nanoseconds yield_delay = std::chrono::milliseconds(1))
: exec_(cores_for_targets(targets), priority, stacksize,
@@ -212,7 +211,7 @@
template <typename F, typename S, typename... Ts>
void bulk_sync_execute_helper(F&& f, S const& shape, Ts&&... ts)
{
- std::size_t num_targets = block_execs_.size();
+ std::size_t const num_targets = block_execs_.size();
if (num_targets == 0)
{
// simply forward call if there is no executor hierarchy
@@ -221,21 +220,21 @@
return;
}

- std::size_t size = std::size(shape);
- auto outer_func = [&](std::size_t index, auto&& f,
-     auto const& shape, auto&&... ts) {
-     // calculate the inner shape dimensions
+ std::size_t const size = std::size(shape);
+ auto outer_func = [&](std::size_t index, auto&& func,
+     auto const& full_shape, auto&&... args) {
+     // calculate the inner shape dimensions
auto const part_begin = (index * size) / num_targets;
auto const part_end = ((index + 1) * size) / num_targets;

- auto begin = std::next(std::begin(shape), part_begin);
+ auto begin = std::next(std::begin(full_shape), part_begin);
auto inner_shape = hpx::util::iterator_range(
begin, std::next(begin, part_end - part_begin));

// invoke bulk_sync_execute on one of the inner executors
hpx::parallel::execution::bulk_sync_execute(block_execs_[index],
-     HPX_FORWARD(decltype(f), f), inner_shape,
-     HPX_FORWARD(decltype(ts), ts)...);
+     HPX_FORWARD(decltype(func), func), inner_shape,
+     HPX_FORWARD(decltype(args), args)...);
};

auto outer_shape = hpx::util::counting_shape(num_targets);
@@ -244,26 +243,107 @@
outer_shape, HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
}

- template <typename F, typename S, typename... Ts>
+ // clang-format off
+ template <typename F, typename S, typename... Ts,
+     HPX_CONCEPT_REQUIRES_(
+         !std::is_integral_v<S>
+     )>
+ // clang-format on
friend void tag_invoke(hpx::parallel::execution::bulk_sync_execute_t,
block_fork_join_executor& exec, F&& f, S const& shape, Ts&&... ts)
{
exec.bulk_sync_execute_helper(
HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
}

- template <typename F, typename S, typename... Ts>
+ // clang-format off
+ template <typename F, typename S, typename... Ts,
+     HPX_CONCEPT_REQUIRES_(
+         !std::is_integral_v<S>
+     )>
+ // clang-format on
friend decltype(auto) tag_invoke(
hpx::parallel::execution::bulk_async_execute_t,
block_fork_join_executor& exec, F&& f, S const& shape, Ts&&... ts)
{
// Forward to the synchronous version as we can't create futures to
// the completion of the parallel region (this HPX thread
// participates in computation).
return hpx::detail::try_catch_exception_ptr(
[&]() {
exec.bulk_sync_execute_helper(
HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
return hpx::make_ready_future();
},
[&](std::exception_ptr&& ep) {
return hpx::make_exceptional_future<void>(HPX_MOVE(ep));
});
}

template <typename... Fs>
void sync_invoke_helper(Fs&&... fs) const
{
std::size_t const num_targets = block_execs_.size();
if (num_targets == 0 || sizeof...(Fs) == 1)
{
// simply forward call if there is no executor hierarchy
hpx::parallel::execution::sync_invoke(
exec_, HPX_FORWARD(Fs, fs)...);
return;
}

if constexpr (sizeof...(Fs) > 1)
{
auto function_pack =
hpx::forward_as_tuple(HPX_FORWARD(Fs, fs)...);

constexpr std::size_t Size = sizeof...(Fs);
auto outer_func = [&](std::size_t index) {
auto const part_begin = (index * Size) / num_targets;
auto const part_end = ((index + 1) * Size) / num_targets;

// invoke sync_invoke on one of the inner executors
block_execs_[index].sync_invoke_helper(
function_pack, part_begin, part_end - part_begin);
};

auto outer_shape = hpx::util::counting_shape(num_targets);

hpx::parallel::execution::bulk_sync_execute(
exec_, outer_func, outer_shape);
}
}

// clang-format off
template <typename F, typename... Fs,
HPX_CONCEPT_REQUIRES_(
std::is_invocable_v<F> && (std::is_invocable_v<Fs> && ...)
)>
// clang-format on
friend decltype(auto) tag_invoke(
hpx::parallel::execution::sync_invoke_t,
block_fork_join_executor const& exec, F&& f, Fs&&... fs)
{
exec.sync_invoke_helper(HPX_FORWARD(F, f), HPX_FORWARD(Fs, fs)...);
}

// clang-format off
template <typename F, typename... Fs,
HPX_CONCEPT_REQUIRES_(
std::is_invocable_v<F> && (std::is_invocable_v<Fs> && ...)
)>
// clang-format on
friend decltype(auto) tag_invoke(
hpx::parallel::execution::async_invoke_t,
block_fork_join_executor const& exec, F&& f, Fs&&... fs)
{
// Forward to the synchronous version as we can't create
// futures to the completion of the parallel region (this HPX
// thread participates in computation).
return hpx::detail::try_catch_exception_ptr(
[&]() {
- exec.bulk_sync_execute_helper(
-     HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
+ exec.sync_invoke_helper(
+     HPX_FORWARD(F, f), HPX_FORWARD(Fs, fs)...);
return hpx::make_ready_future();
},
[&](std::exception_ptr&& ep) {
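Both `bulk_sync_execute_helper` and `sync_invoke_helper` above distribute work across the inner executors with the same arithmetic: inner executor `index` receives the half-open index range `[(index * size) / num_targets, ((index + 1) * size) / num_targets)`, which splits `size` items into `num_targets` contiguous chunks whose sizes differ by at most one. A standalone sketch of that arithmetic (the function name and example values are illustrative only, not from the commit):

```
#include <cstddef>
#include <iostream>
#include <utility>

// chunk of a 'size'-element shape assigned to inner executor 'index'
// out of 'num_targets' (the same arithmetic as in outer_func above)
std::pair<std::size_t, std::size_t> chunk(
    std::size_t index, std::size_t size, std::size_t num_targets)
{
    return {(index * size) / num_targets, ((index + 1) * size) / num_targets};
}

int main()
{
    // e.g., 10 work items over 4 NUMA domains: [0, 2) [2, 5) [5, 7) [7, 10)
    for (std::size_t i = 0; i != 4; ++i)
    {
        auto [begin, end] = chunk(i, 10, 4);
        std::cout << "executor " << i << ": [" << begin << ", " << end
                  << ")\n";
    }
    return 0;
}
```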
136 changes: 136 additions & 0 deletions libs/core/compute_local/tests/unit/block_fork_join_executor.cpp
@@ -153,6 +153,137 @@ void test_bulk_async_exception(ExecutorArgs&&... args)
HPX_TEST(caught_exception);
}

static std::atomic<std::size_t> count1{0};
static std::atomic<std::size_t> count2{0};
static std::atomic<std::size_t> count3{0};
static std::atomic<std::size_t> count4{0};

template <typename... ExecutorArgs>
void test_invoke_sync_homogeneous(ExecutorArgs&&... args)
{
std::cerr << "test_invoke_sync_homogeneous\n";

auto f1 = [] { ++count1; };

block_fork_join_executor exec{std::forward<ExecutorArgs>(args)...};

count1 = 0;
hpx::parallel::execution::sync_invoke(exec, f1);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(1));

count1 = 0;
hpx::parallel::execution::sync_invoke(exec, f1, f1);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(2));

count1 = 0;
hpx::parallel::execution::sync_invoke(exec, f1, f1, f1, f1, f1);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(5));

count1 = 0;
hpx::parallel::execution::sync_invoke(
exec, f1, f1, f1, f1, f1, f1, f1, f1, f1, f1, f1);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(11));
}

template <typename... ExecutorArgs>
void test_invoke_sync(ExecutorArgs&&... args)
{
std::cerr << "test_invoke_sync\n";

auto f1 = [] { ++count1; };
auto f2 = [] { ++count2; };
auto f3 = [] { ++count3; };
auto f4 = [] { ++count4; };

block_fork_join_executor exec{std::forward<ExecutorArgs>(args)...};

count1 = 0;
hpx::parallel::execution::sync_invoke(exec, f1);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(1));

count1 = 0;
count2 = 0;
hpx::parallel::execution::sync_invoke(exec, f1, f2);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(1));
HPX_TEST_EQ(count2.load(), static_cast<std::size_t>(1));

count1 = 0;
count2 = 0;
count3 = 0;
count4 = 0;
hpx::parallel::execution::sync_invoke(exec, f1, f2, f3, f4, f1);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(2));
HPX_TEST_EQ(count2.load(), static_cast<std::size_t>(1));
HPX_TEST_EQ(count3.load(), static_cast<std::size_t>(1));
HPX_TEST_EQ(count4.load(), static_cast<std::size_t>(1));

count1 = 0;
count2 = 0;
count3 = 0;
count4 = 0;
hpx::parallel::execution::sync_invoke(
exec, f1, f2, f3, f4, f1, f4, f1, f2, f3, f4, f1);
HPX_TEST_EQ(count1.load(), static_cast<std::size_t>(4));
HPX_TEST_EQ(count2.load(), static_cast<std::size_t>(2));
HPX_TEST_EQ(count3.load(), static_cast<std::size_t>(2));
HPX_TEST_EQ(count4.load(), static_cast<std::size_t>(3));
}

template <typename... ExecutorArgs>
void test_invoke_sync_homogeneous_exception(ExecutorArgs&&... args)
{
std::cerr << "test_invoke_sync_homogeneous_exception\n";

auto f1 = [] { throw std::runtime_error("test"); };

block_fork_join_executor exec{std::forward<ExecutorArgs>(args)...};

bool caught_exception = false;
try
{
hpx::parallel::execution::sync_invoke(exec, f1, f1, f1);
HPX_TEST(false);
}
catch (std::runtime_error const& /*e*/)
{
caught_exception = true;
}
catch (...)
{
HPX_TEST(false);
}

HPX_TEST(caught_exception);
}

template <typename... ExecutorArgs>
void test_invoke_sync_exception(ExecutorArgs&&... args)
{
std::cerr << "test_invoke_sync_exception\n";

auto f1 = [] {};
auto f2 = [] { throw std::runtime_error("test"); };

block_fork_join_executor exec{std::forward<ExecutorArgs>(args)...};

bool caught_exception = false;
try
{
hpx::parallel::execution::sync_invoke(exec, f1, f2);
HPX_TEST(false);
}
catch (std::runtime_error const& /*e*/)
{
caught_exception = true;
}
catch (...)
{
HPX_TEST(false);
}

HPX_TEST(caught_exception);
}

template <typename... ExecutorArgs>
void test_executor(hpx::threads::thread_priority priority,
hpx::threads::thread_stacksize stacksize,
@@ -165,6 +296,11 @@ void test_executor(hpx::threads::thread_priority priority,
test_bulk_async(priority, stacksize, schedule);
test_bulk_sync_exception(priority, stacksize, schedule);
test_bulk_async_exception(priority, stacksize, schedule);

test_invoke_sync_homogeneous(priority, stacksize, schedule);
test_invoke_sync(priority, stacksize, schedule);
test_invoke_sync_homogeneous_exception(priority, stacksize, schedule);
test_invoke_sync_exception(priority, stacksize, schedule);
}

///////////////////////////////////////////////////////////////////////////////