From 91fc649cad8c73da948556b14b5787ab2590e4b8 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Tue, 25 Jul 2023 15:15:44 -0500 Subject: [PATCH] Making sure changed number of counts is propagated to executor --- .../container_algorithms/is_sorted.hpp | 2 +- .../hpx/parallel/util/detail/chunk_size.hpp | 56 +++- .../hpx/parallel/util/foreach_partitioner.hpp | 10 +- .../include/hpx/parallel/util/partitioner.hpp | 21 +- .../tests/regressions/CMakeLists.txt | 1 + .../tests/regressions/num_cores.cpp | 39 +++ .../hpx/execution/executors/num_cores.hpp | 9 + .../tests/regressions/is_executor_1691.cpp | 9 + .../disable_thread_stealing_executor.cpp | 32 +++ .../examples/executor_with_thread_hooks.cpp | 6 + .../executors/datapar/execution_policy.hpp | 114 ++++++++- .../datapar/execution_policy_fwd.hpp | 8 +- .../hpx/executors/execution_policy.hpp | 241 +++++++++++++++++- .../hpx/executors/execution_policy_fwd.hpp | 16 +- .../restricted_thread_pool_executor.hpp | 2 +- .../hpx/resiliency/replay_executor.hpp | 56 +++- .../hpx/resiliency/replicate_executor.hpp | 63 ++++- .../examples/1d_stencil_4_checkpoint.cpp | 88 +++---- 18 files changed, 664 insertions(+), 109 deletions(-) create mode 100644 libs/core/algorithms/tests/regressions/num_cores.cpp diff --git a/libs/core/algorithms/include/hpx/parallel/container_algorithms/is_sorted.hpp b/libs/core/algorithms/include/hpx/parallel/container_algorithms/is_sorted.hpp index beed8d6b620f..f72353ebd092 100644 --- a/libs/core/algorithms/include/hpx/parallel/container_algorithms/is_sorted.hpp +++ b/libs/core/algorithms/include/hpx/parallel/container_algorithms/is_sorted.hpp @@ -508,7 +508,7 @@ namespace hpx { namespace ranges { namespace hpx::ranges { - inline constexpr struct is_sorted_t final + inline constexpr struct is_sorted_t : hpx::detail::tag_parallel_algorithm { private: diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp index 22836ce2de03..691c399c5744 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include #include +#include #include #include @@ -132,7 +134,7 @@ namespace hpx::parallel::util::detail { template hpx::util::iterator_range> - get_bulk_iteration_shape(ExPolicy&& policy, IterOrR& it_or_r, + get_bulk_iteration_shape(ExPolicy& policy, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1)) { if (count == 0) @@ -166,6 +168,10 @@ namespace hpx::parallel::util::detail { // clang-format on } + // update executor with new values + policy = hpx::experimental::prefer( + execution::with_processing_units_count, policy, cores); + auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count); auto shape_end = chunk_size_iterator(last, chunk_size, count, count); @@ -175,7 +181,7 @@ namespace hpx::parallel::util::detail { template hpx::util::iterator_range> - get_bulk_iteration_shape(ExPolicy&& policy, std::vector& workitems, + get_bulk_iteration_shape(ExPolicy& policy, std::vector& workitems, F1&& f1, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1)) { if (count == 0) @@ -241,6 +247,10 @@ namespace hpx::parallel::util::detail { // clang-format on } + // update executor with new values + policy = hpx::experimental::prefer( + execution::with_processing_units_count, policy, cores); + auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count); auto shape_end = chunk_size_iterator(last, chunk_size, count, count); @@ -250,7 +260,7 @@ namespace hpx::parallel::util::detail { template std::vector> - get_bulk_iteration_shape_variable(ExPolicy&& policy, IterOrR& it_or_r, + get_bulk_iteration_shape_variable(ExPolicy& policy, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1)) { using tuple_type = hpx::tuple; @@ -308,27 +318,31 @@ namespace hpx::parallel::util::detail { } // clang-format on + // update executor with new values + policy = hpx::experimental::prefer( + execution::with_processing_units_count, policy, cores); + return shape; } template - decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy&& policy, + decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy& policy, std::vector& workitems, F1&& f1, FwdIter& begin, std::size_t& count, Stride s = Stride(1)) { - return get_bulk_iteration_shape(HPX_FORWARD(ExPolicy, policy), - workitems, HPX_FORWARD(F1, f1), begin, count, s); + return get_bulk_iteration_shape( + policy, workitems, HPX_FORWARD(F1, f1), begin, count, s); } template - decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy&& policy, + decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy& policy, std::vector& workitems, F1&& f1, FwdIter& begin, std::size_t& count, Stride s = Stride(1)) { - return get_bulk_iteration_shape_variable(HPX_FORWARD(ExPolicy, policy), - workitems, HPX_FORWARD(F1, f1), begin, count, s); + return get_bulk_iteration_shape_variable( + policy, workitems, HPX_FORWARD(F1, f1), begin, count, s); } /////////////////////////////////////////////////////////////////////////// @@ -360,7 +374,7 @@ namespace hpx::parallel::util::detail { typename Stride = std::size_t> hpx::util::iterator_range< parallel::util::detail::chunk_size_idx_iterator> - get_bulk_iteration_shape_idx(ExPolicy&& policy, FwdIter begin, + get_bulk_iteration_shape_idx(ExPolicy& policy, FwdIter begin, std::size_t count, Stride s = Stride(1)) { using iterator = @@ -397,6 +411,13 @@ namespace hpx::parallel::util::detail { // clang-format on } + // update executor with new values + policy = hpx::experimental::prefer( + execution::with_processing_units_count, policy, cores); + + using iterator = + parallel::util::detail::chunk_size_idx_iterator; + iterator shape_begin(begin, chunk_size, count, 0, 0); iterator shape_end(last, chunk_size, count, count, 0); @@ -407,7 +428,7 @@ namespace hpx::parallel::util::detail { typename Stride = std::size_t> hpx::util::iterator_range< parallel::util::detail::chunk_size_idx_iterator> - get_bulk_iteration_shape_idx(ExPolicy&& policy, + get_bulk_iteration_shape_idx(ExPolicy& policy, std::vector& workitems, F1&& f1, FwdIter begin, std::size_t count, Stride s = Stride(1)) { @@ -475,6 +496,13 @@ namespace hpx::parallel::util::detail { // clang-format on } + // update executor with new values + policy = hpx::experimental::prefer( + execution::with_processing_units_count, policy, cores); + + using iterator = + parallel::util::detail::chunk_size_idx_iterator; + iterator shape_begin(begin, chunk_size, count, 0, base_idx); iterator shape_end(last, chunk_size, count, count, base_idx); @@ -484,7 +512,7 @@ namespace hpx::parallel::util::detail { template std::vector> - get_bulk_iteration_shape_idx_variable(ExPolicy&& policy, FwdIter first, + get_bulk_iteration_shape_idx_variable(ExPolicy& policy, FwdIter first, std::size_t count, Stride s = Stride(1)) { using tuple_type = hpx::tuple; @@ -543,6 +571,10 @@ namespace hpx::parallel::util::detail { } // clang-format on + // update executor with new values + policy = hpx::experimental::prefer( + execution::with_processing_units_count, policy, cores); + return shape; } } // namespace hpx::parallel::util::detail diff --git a/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp b/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp index fccac47d871c..a47090890fc9 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp @@ -36,7 +36,7 @@ namespace hpx::parallel::util::detail { template auto foreach_partition( - ExPolicy&& policy, FwdIter first, std::size_t count, F&& f) + ExPolicy policy, FwdIter first, std::size_t count, F&& f) { // estimate a chunk size based on number of cores used using parameters_type = @@ -53,7 +53,7 @@ namespace hpx::parallel::util::detail { "has_variable_chunk_size and invokes_testing_function"); auto&& shape = detail::get_bulk_iteration_shape_idx_variable( - HPX_FORWARD(ExPolicy, policy), first, count); + policy, first, count); return execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, @@ -61,8 +61,8 @@ namespace hpx::parallel::util::detail { } else if constexpr (!invokes_testing_function) { - auto&& shape = detail::get_bulk_iteration_shape_idx( - HPX_FORWARD(ExPolicy, policy), first, count); + auto&& shape = + detail::get_bulk_iteration_shape_idx(policy, first, count); return execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, @@ -72,7 +72,7 @@ namespace hpx::parallel::util::detail { { std::vector> inititems; auto&& shape = detail::get_bulk_iteration_shape_idx( - HPX_FORWARD(ExPolicy, policy), inititems, f, first, count); + policy, inititems, f, first, count); auto&& workitems = execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, diff --git a/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp b/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp index 2decc47e4f19..d458e422a6e5 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp @@ -40,7 +40,7 @@ namespace hpx::parallel::util::detail { template - auto partition(ExPolicy&& policy, IterOrR it_or_r, std::size_t count, F&& f) + auto partition(ExPolicy policy, IterOrR it_or_r, std::size_t count, F&& f) { // estimate a chunk size based on number of cores used using parameters_type = @@ -57,7 +57,7 @@ namespace hpx::parallel::util::detail { "has_variable_chunk_size and invokes_testing_function"); auto&& shape = detail::get_bulk_iteration_shape_variable( - HPX_FORWARD(ExPolicy, policy), it_or_r, count); + policy, it_or_r, count); return execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, @@ -65,8 +65,8 @@ namespace hpx::parallel::util::detail { } else if constexpr (!invokes_testing_function) { - auto&& shape = detail::get_bulk_iteration_shape( - HPX_FORWARD(ExPolicy, policy), it_or_r, count); + auto&& shape = + detail::get_bulk_iteration_shape(policy, it_or_r, count); return execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, @@ -76,7 +76,7 @@ namespace hpx::parallel::util::detail { { std::vector> inititems; auto&& shape = detail::get_bulk_iteration_shape( - HPX_FORWARD(ExPolicy, policy), inititems, f, it_or_r, count); + policy, inititems, f, it_or_r, count); auto&& workitems = execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, @@ -88,8 +88,8 @@ namespace hpx::parallel::util::detail { template - auto partition_with_index(ExPolicy&& policy, FwdIter first, - std::size_t count, Stride stride, F&& f) + auto partition_with_index( + ExPolicy policy, FwdIter first, std::size_t count, Stride stride, F&& f) { // estimate a chunk size based on number of cores used using parameters_type = @@ -106,7 +106,7 @@ namespace hpx::parallel::util::detail { "has_variable_chunk_size and invokes_testing_function"); auto&& shape = detail::get_bulk_iteration_shape_idx_variable( - HPX_FORWARD(ExPolicy, policy), first, count, stride); + policy, first, count, stride); return execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, @@ -115,7 +115,7 @@ namespace hpx::parallel::util::detail { else if constexpr (!invokes_testing_function) { auto&& shape = detail::get_bulk_iteration_shape_idx( - HPX_FORWARD(ExPolicy, policy), first, count, stride); + policy, first, count, stride); return execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, @@ -125,8 +125,7 @@ namespace hpx::parallel::util::detail { { std::vector> inititems; auto&& shape = detail::get_bulk_iteration_shape_idx( - HPX_FORWARD(ExPolicy, policy), inititems, f, first, count, - stride); + policy, inititems, f, first, count, stride); auto&& workitems = execution::bulk_async_execute(policy.executor(), partitioner_iteration{HPX_FORWARD(F, f)}, diff --git a/libs/core/algorithms/tests/regressions/CMakeLists.txt b/libs/core/algorithms/tests/regressions/CMakeLists.txt index e4a023dad590..9c132a490c22 100644 --- a/libs/core/algorithms/tests/regressions/CMakeLists.txt +++ b/libs/core/algorithms/tests/regressions/CMakeLists.txt @@ -12,6 +12,7 @@ set(tests for_loop_5735 for_loop_with_auto_chunk_size minimal_findend + num_cores reduce_3641 scan_different_inits scan_non_commutative diff --git a/libs/core/algorithms/tests/regressions/num_cores.cpp b/libs/core/algorithms/tests/regressions/num_cores.cpp new file mode 100644 index 000000000000..b24095b09203 --- /dev/null +++ b/libs/core/algorithms/tests/regressions/num_cores.cpp @@ -0,0 +1,39 @@ +// Copyright (c) 2023 Hartmut Kaiser +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include + +int hpx_main() +{ + hpx::execution::experimental::num_cores nc(2); + auto policy = hpx::execution::par.with(nc); + + HPX_TEST_EQ( + hpx::parallel::execution::processing_units_count(policy.parameters(), + policy.executor(), hpx::chrono::null_duration, 0), + 2); + + auto policy2 = + hpx::parallel::execution::with_processing_units_count(policy, 2); + HPX_TEST_EQ(hpx::parallel::execution::processing_units_count( + hpx::execution::par.parameters(), policy2.executor(), + hpx::chrono::null_duration, 0), + 2); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +} diff --git a/libs/core/execution/include/hpx/execution/executors/num_cores.hpp b/libs/core/execution/include/hpx/execution/executors/num_cores.hpp index eb3a0e128d73..ab02e9b4f2c9 100644 --- a/libs/core/execution/include/hpx/execution/executors/num_cores.hpp +++ b/libs/core/execution/include/hpx/execution/executors/num_cores.hpp @@ -35,6 +35,15 @@ namespace hpx::execution::experimental { /// \cond NOINTERNAL // discover the number of cores to use for parallelization + template + friend std::size_t tag_invoke( + hpx::parallel::execution::processing_units_count_t, + num_cores params, Executor&&, hpx::chrono::steady_duration const&, + std::size_t) noexcept + { + return params.num_cores_; + } + template constexpr std::size_t processing_units_count(Executor&&, hpx::chrono::steady_duration const&, std::size_t) const noexcept diff --git a/libs/core/execution/tests/regressions/is_executor_1691.cpp b/libs/core/execution/tests/regressions/is_executor_1691.cpp index 3184f83b7721..562b82762128 100644 --- a/libs/core/execution/tests/regressions/is_executor_1691.cpp +++ b/libs/core/execution/tests/regressions/is_executor_1691.cpp @@ -14,9 +14,18 @@ /////////////////////////////////////////////////////////////////////////////// struct my_executor : hpx::execution::parallel_executor { + my_executor() = default; + + my_executor(hpx::execution::parallel_executor const& rhs) {} + + my_executor& operator=(hpx::execution::parallel_executor const& rhs) + { + return *this; + } }; namespace hpx::parallel::execution { + template <> struct is_one_way_executor : std::true_type { diff --git a/libs/core/executors/examples/disable_thread_stealing_executor.cpp b/libs/core/executors/examples/disable_thread_stealing_executor.cpp index 4a3e952a40df..721b1064ec64 100644 --- a/libs/core/executors/examples/disable_thread_stealing_executor.cpp +++ b/libs/core/executors/examples/disable_thread_stealing_executor.cpp @@ -66,6 +66,38 @@ namespace executor_example { } }; + // support all properties exposed by the wrapped executor + // clang-format off + template + )> + auto tag_invoke(Tag tag, + disable_thread_stealing_executor const& exec, + Property&& prop) + -> decltype(disable_thread_stealing_executor( + std::declval()( + std::declval(), std::declval()))) + // clang-format on + { + return disable_thread_stealing_executor( + tag(static_cast(exec), + HPX_FORWARD(Property, prop))); + } + + // clang-format off + template + )> + // clang-format on + auto tag_invoke( + Tag tag, disable_thread_stealing_executor const& exec) + -> decltype(std::declval()(std::declval())) + { + return tag(static_cast(exec)); + } + template auto make_disable_thread_stealing_executor(BaseExecutor&& exec) { diff --git a/libs/core/executors/examples/executor_with_thread_hooks.cpp b/libs/core/executors/examples/executor_with_thread_hooks.cpp index 19a5db048b82..5dc5bd7b113e 100644 --- a/libs/core/executors/examples/executor_with_thread_hooks.cpp +++ b/libs/core/executors/examples/executor_with_thread_hooks.cpp @@ -168,6 +168,12 @@ namespace executor_example { std::forward(predecessor), std::forward(ts)...); } + [[nodiscard]] constexpr std::decay_t const& get_executor() + const noexcept + { + return exec_; + } + private: using thread_hook = hpx::function; diff --git a/libs/core/executors/include/hpx/executors/datapar/execution_policy.hpp b/libs/core/executors/include/hpx/executors/datapar/execution_policy.hpp index dd46113d68ed..6ade940bab80 100644 --- a/libs/core/executors/include/hpx/executors/datapar/execution_policy.hpp +++ b/libs/core/executors/include/hpx/executors/datapar/execution_policy.hpp @@ -58,6 +58,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + simd_task_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr simd_task_policy_shim( + simd_task_policy_shim const& rhs) + : base_type( + simd_task_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + simd_task_policy_shim& operator=( + simd_task_policy_shim const& rhs) + { + base_type::operator=( + simd_task_policy_shim(rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -70,7 +96,8 @@ namespace hpx::execution { /// /// The algorithm returns a future representing the result of the /// corresponding algorithm when invoked with the sequenced_policy. - using simd_task_policy = detail::simd_task_policy_shim; + using simd_task_policy = detail::simd_task_policy_shim>; namespace detail { @@ -96,6 +123,30 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + simd_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr simd_policy_shim( + simd_policy_shim const& rhs) + : base_type(simd_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + simd_policy_shim& operator=( + simd_policy_shim const& rhs) + { + base_type::operator=( + simd_policy_shim(rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -104,7 +155,8 @@ namespace hpx::execution { /// The class simd_policy is an execution policy type used as a unique type /// to disambiguate parallel algorithm overloading and require that a /// parallel algorithm's execution may not be parallelized. - using simd_policy = detail::simd_policy_shim; + using simd_policy = detail::simd_policy_shim>; /// Default sequential execution policy object. inline constexpr simd_policy simd{}; @@ -135,6 +187,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + par_simd_task_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr par_simd_task_policy_shim( + par_simd_task_policy_shim const& rhs) + : base_type( + par_simd_task_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + par_simd_task_policy_shim& operator=( + par_simd_task_policy_shim const& rhs) + { + base_type::operator=(par_simd_task_policy_shim( + rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -147,7 +225,8 @@ namespace hpx::execution { /// The algorithm returns a future representing the result of the /// corresponding algorithm when invoked with the parallel_policy. using par_simd_task_policy = - detail::par_simd_task_policy_shim; + detail::par_simd_task_policy_shim>; namespace detail { @@ -174,6 +253,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + par_simd_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr par_simd_policy_shim( + par_simd_policy_shim const& rhs) + : base_type( + par_simd_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + par_simd_policy_shim& operator=( + par_simd_policy_shim const& rhs) + { + base_type::operator=( + par_simd_policy_shim(rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -185,7 +290,8 @@ namespace hpx::execution { /// /// The algorithm returns a future representing the result of the /// corresponding algorithm when invoked with the parallel_policy. - using par_simd_policy = detail::par_simd_policy_shim; + using par_simd_policy = detail::par_simd_policy_shim>; /// Default data-parallel execution policy object. inline constexpr par_simd_policy par_simd{}; diff --git a/libs/core/executors/include/hpx/executors/datapar/execution_policy_fwd.hpp b/libs/core/executors/include/hpx/executors/datapar/execution_policy_fwd.hpp index e34c4c8a9289..e54f6731e76e 100644 --- a/libs/core/executors/include/hpx/executors/datapar/execution_policy_fwd.hpp +++ b/libs/core/executors/include/hpx/executors/datapar/execution_policy_fwd.hpp @@ -15,16 +15,16 @@ namespace hpx::execution::detail { /////////////////////////////////////////////////////////////////////////// - template + template struct simd_policy_shim; - template + template struct simd_task_policy_shim; - template + template struct par_simd_policy_shim; - template + template struct par_simd_task_policy_shim; } // namespace hpx::execution::detail diff --git a/libs/core/executors/include/hpx/executors/execution_policy.hpp b/libs/core/executors/include/hpx/executors/execution_policy.hpp index 1336d99d6241..e6ce801008fb 100644 --- a/libs/core/executors/include/hpx/executors/execution_policy.hpp +++ b/libs/core/executors/include/hpx/executors/execution_policy.hpp @@ -124,10 +124,7 @@ namespace hpx::execution { // The type of the associated executor parameters object which is // associated with this execution policy - using executor_parameters_type = - std::conditional_t, - hpx::traits::executor_parameters_type_t, - decayed_parameters_type>; + using executor_parameters_type = decayed_parameters_type; // The category of the execution agents created by this execution // policy. @@ -284,6 +281,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + sequenced_task_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr sequenced_task_policy_shim( + sequenced_task_policy_shim const& rhs) + : base_type(sequenced_task_policy_shim( + rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + sequenced_task_policy_shim& operator=( + sequenced_task_policy_shim const& rhs) + { + base_type::operator=(sequenced_task_policy_shim( + rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -297,7 +320,8 @@ namespace hpx::execution { /// The algorithm returns a future representing the result of the /// corresponding algorithm when invoked with the sequenced_policy. using sequenced_task_policy = - detail::sequenced_task_policy_shim; + detail::sequenced_task_policy_shim>; namespace detail { @@ -324,6 +348,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + sequenced_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr sequenced_policy_shim( + sequenced_policy_shim const& rhs) + : base_type( + sequenced_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + sequenced_policy_shim& operator=( + sequenced_policy_shim const& rhs) + { + base_type::operator=( + sequenced_policy_shim(rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -332,7 +382,8 @@ namespace hpx::execution { /// The class sequenced_policy is an execution policy type used as a unique /// type to disambiguate parallel algorithm overloading and require that a /// parallel algorithm's execution may not be parallelized. - using sequenced_policy = detail::sequenced_policy_shim; + using sequenced_policy = detail::sequenced_policy_shim>; /// Default sequential execution policy object. inline constexpr sequenced_policy seq{}; @@ -364,6 +415,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + parallel_task_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr parallel_task_policy_shim( + parallel_task_policy_shim const& rhs) + : base_type( + parallel_task_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + parallel_task_policy_shim& operator=( + parallel_task_policy_shim const& rhs) + { + base_type::operator=(parallel_task_policy_shim( + rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -376,7 +453,8 @@ namespace hpx::execution { /// The algorithm returns a future representing the result of the /// corresponding algorithm when invoked with the parallel_policy. using parallel_task_policy = - detail::parallel_task_policy_shim; + detail::parallel_task_policy_shim>; namespace detail { @@ -402,6 +480,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + parallel_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr parallel_policy_shim( + parallel_policy_shim const& rhs) + : base_type( + parallel_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + parallel_policy_shim& operator=( + parallel_policy_shim const& rhs) + { + base_type::operator=( + parallel_policy_shim(rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -410,7 +514,8 @@ namespace hpx::execution { /// The class parallel_policy is an execution policy type used as a unique /// type to disambiguate parallel algorithm overloading and indicate that a /// parallel algorithm's execution may be parallelized. - using parallel_policy = detail::parallel_policy_shim; + using parallel_policy = detail::parallel_policy_shim>; /// Default parallel execution policy object. inline constexpr parallel_policy par{}; @@ -443,6 +548,34 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + parallel_unsequenced_task_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr parallel_unsequenced_task_policy_shim( + parallel_unsequenced_task_policy_shim const& rhs) + : base_type(parallel_unsequenced_task_policy_shim( + rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + parallel_unsequenced_task_policy_shim& operator=( + parallel_unsequenced_task_policy_shim const& rhs) + { + base_type::operator=(parallel_unsequenced_task_policy_shim( + rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -453,7 +586,8 @@ namespace hpx::execution { /// and indicate that a parallel algorithm's execution may be parallelized /// and vectorized. using parallel_unsequenced_task_policy = - detail::parallel_unsequenced_task_policy_shim; + detail::parallel_unsequenced_task_policy_shim>; namespace detail { @@ -480,6 +614,34 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + parallel_unsequenced_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr parallel_unsequenced_policy_shim( + parallel_unsequenced_policy_shim const& + rhs) + : base_type(parallel_unsequenced_policy_shim( + rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + parallel_unsequenced_policy_shim& operator=( + parallel_unsequenced_policy_shim const& + rhs) + { + base_type::operator=( + parallel_policy_shim(rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -490,7 +652,8 @@ namespace hpx::execution { /// indicate that a parallel algorithm's execution may be parallelized and /// vectorized. using parallel_unsequenced_policy = - detail::parallel_unsequenced_policy_shim; + detail::parallel_unsequenced_policy_shim>; /// Default vector execution policy object. inline constexpr parallel_unsequenced_policy par_unseq{}; @@ -524,6 +687,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + unsequenced_task_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr unsequenced_task_policy_shim( + unsequenced_task_policy_shim const& rhs) + : base_type(unsequenced_task_policy_shim( + rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + unsequenced_task_policy_shim& operator=( + unsequenced_task_policy_shim const& rhs) + { + base_type::operator=(unsequenced_task_policy_shim( + rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -533,7 +722,8 @@ namespace hpx::execution { /// unique type to disambiguate parallel algorithm overloading and indicate /// that a parallel algorithm's execution may be vectorized. using unsequenced_task_policy = - detail::unsequenced_task_policy_shim; + detail::unsequenced_task_policy_shim>; namespace detail { @@ -559,6 +749,32 @@ namespace hpx::execution { HPX_FORWARD(Parameters_, params)) { } + + template , + unsequenced_policy_shim> && + std::is_convertible_v && + std::is_convertible_v>> + explicit constexpr unsequenced_policy_shim( + unsequenced_policy_shim const& rhs) + : base_type( + unsequenced_policy_shim(rhs.executor(), rhs.parameters())) + { + } + + template && + std::is_convertible_v>> + unsequenced_policy_shim& operator=( + unsequenced_policy_shim const& rhs) + { + base_type::operator=( + unsequenced_policy_shim(rhs.executor(), rhs.parameters())); + return *this; + } /// \endcond }; } // namespace detail @@ -568,7 +784,8 @@ namespace hpx::execution { /// unique type to disambiguate parallel algorithm overloading and indicate /// that a parallel algorithm's execution may be vectorized. using unsequenced_policy = - detail::unsequenced_policy_shim; + detail::unsequenced_policy_shim>; /// Default vector execution policy object. inline constexpr unsequenced_policy unseq{}; diff --git a/libs/core/executors/include/hpx/executors/execution_policy_fwd.hpp b/libs/core/executors/include/hpx/executors/execution_policy_fwd.hpp index 557b0f6f5350..a07f8707c3ee 100644 --- a/libs/core/executors/include/hpx/executors/execution_policy_fwd.hpp +++ b/libs/core/executors/include/hpx/executors/execution_policy_fwd.hpp @@ -11,27 +11,27 @@ namespace hpx::execution::detail { // forward declarations, see execution_policy.hpp - template + template struct sequenced_policy_shim; - template + template struct sequenced_task_policy_shim; - template + template struct parallel_policy_shim; - template + template struct parallel_task_policy_shim; - template + template struct unsequenced_task_policy_shim; - template + template struct unsequenced_policy_shim; - template + template struct parallel_unsequenced_task_policy_shim; - template + template struct parallel_unsequenced_policy_shim; } // namespace hpx::execution::detail diff --git a/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp b/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp index 6404f60243df..1073292d0f45 100644 --- a/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp +++ b/libs/core/executors/include/hpx/executors/restricted_thread_pool_executor.hpp @@ -242,7 +242,7 @@ namespace hpx::parallel::execution { /// \endcond private: - std::uint16_t const first_thread_; + std::uint16_t first_thread_; mutable std::atomic os_thread_; embedded_executor exec_; diff --git a/libs/core/resiliency/include/hpx/resiliency/replay_executor.hpp b/libs/core/resiliency/include/hpx/resiliency/replay_executor.hpp index 23cecc84c582..c0afdedb3b22 100644 --- a/libs/core/resiliency/include/hpx/resiliency/replay_executor.hpp +++ b/libs/core/resiliency/include/hpx/resiliency/replay_executor.hpp @@ -43,9 +43,9 @@ namespace hpx::resiliency::experimental { using future_type = hpx::traits::executor_future_t; - template - explicit replay_executor(BaseExecutor& exec, std::size_t n, F&& f) - : exec_(exec) + template + explicit replay_executor(BaseExecutor_&& exec, std::size_t n, F&& f) + : exec_(HPX_FORWARD(BaseExecutor_, exec)) , replay_count_(n) , validator_(HPX_FORWARD(F, f)) { @@ -154,12 +154,60 @@ namespace hpx::resiliency::experimental { } /// \endcond + public: + BaseExecutor const& get_executor() const + { + return exec_; + } + std::size_t get_replay_count() const + { + return replay_count_; + } + Validate const& get_validator() const + { + return validator_; + } + private: - BaseExecutor& exec_; + BaseExecutor exec_; std::size_t replay_count_; Validate validator_; }; + /////////////////////////////////////////////////////////////////////////// + // support all properties exposed by the wrapped executor + // clang-format off + template + )> + // clang-format on + auto tag_invoke(Tag tag, + replay_executor const& exec, Property&& prop) + -> decltype(replicate_executor( + std::declval()( + std::declval(), std::declval()), + std::declval(), std::declval())) + { + return replay_executor( + tag(exec.get_executor(), HPX_FORWARD(Property, prop)), + exec.get_replay_count(), exec.get_validator()); + } + + // clang-format off + template + )> + // clang-format on + auto tag_invoke( + Tag tag, replay_executor const& exec) + -> decltype(std::declval()(std::declval())) + { + return tag(exec.get_executor()); + } + /////////////////////////////////////////////////////////////////////////// template replay_executor> make_replay_executor( diff --git a/libs/core/resiliency/include/hpx/resiliency/replicate_executor.hpp b/libs/core/resiliency/include/hpx/resiliency/replicate_executor.hpp index 81f499fd4b28..2fee3fe6eb2e 100644 --- a/libs/core/resiliency/include/hpx/resiliency/replicate_executor.hpp +++ b/libs/core/resiliency/include/hpx/resiliency/replicate_executor.hpp @@ -43,10 +43,10 @@ namespace hpx::resiliency::experimental { using future_type = hpx::traits::executor_future_t; - template + template explicit replicate_executor( - BaseExecutor& exec, std::size_t n, V&& v, F&& f) - : exec_(exec) + BaseExecutor_&& exec, std::size_t n, V&& v, F&& f) + : exec_(HPX_FORWARD(BaseExecutor_, exec)) , replicate_count_(n) , voter_(HPX_FORWARD(V, v)) , validator_(HPX_FORWARD(F, f)) @@ -157,13 +157,68 @@ namespace hpx::resiliency::experimental { } /// \endcond + public: + BaseExecutor const& get_executor() const + { + return exec_; + } + std::size_t get_replicate_count() const + { + return replicate_count_; + } + Vote const& get_voter() const + { + return voter_; + } + Validate const& get_validator() const + { + return validator_; + } + private: - BaseExecutor& exec_; + BaseExecutor exec_; std::size_t replicate_count_; Vote voter_; Validate validator_; }; + /////////////////////////////////////////////////////////////////////////// + // support all properties exposed by the wrapped executor + // clang-format off + template + )> + // clang-format on + auto tag_invoke(Tag tag, + replicate_executor const& exec, + Property&& prop) + -> decltype(replicate_executor( + std::declval()( + std::declval(), std::declval()), + std::declval(), std::declval(), + std::declval())) + { + return replicate_executor( + tag(exec.get_executor(), HPX_FORWARD(Property, prop)), + exec.get_replicate_count(), exec.get_voter(), exec.get_validator()); + } + + // clang-format off + template + )> + // clang-format on + auto tag_invoke( + Tag tag, replicate_executor const& exec) + -> decltype(std::declval()(std::declval())) + { + return tag(exec.get_executor()); + } + /////////////////////////////////////////////////////////////////////////// template replicate_executor, diff --git a/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp b/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp index 7e52a55f9632..9a8fcf5f933d 100644 --- a/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp +++ b/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp @@ -6,25 +6,26 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// This is the fourth in a series of examples demonstrating the development of -// a fully distributed solver for a simple 1D heat distribution problem. +// This is the fourth in a series of examples demonstrating the development of a +// fully distributed solver for a simple 1D heat distribution problem. // // This example builds on example three. It futurizes the code from that // example. Compared to example two this code runs much more efficiently. It // allows for changing the amount of work executed in one HPX thread which -// enables tuning the performance for the optimal grain size of the -// computation. This example is still fully local but demonstrates nice -// scalability on SMP machines. +// enables tuning the performance for the optimal grain size of the computation. +// This example is still fully local but demonstrates nice scalability on SMP +// machines. // // In this variation of stencil we use the save_checkpoint and -// revive_checkpint functions to back up the state of the application -// every n time steps. +// restore_checkpoint functions to back up the state of the application every n +// time steps. // #include #include #include +#include #include #include #include @@ -63,16 +64,12 @@ inline std::size_t idx(std::size_t i, int dir, std::size_t size) struct partition_data { private: - typedef hpx::serialization::serialize_buffer buffer_type; + using buffer_type = hpx::serialization::serialize_buffer; public: - partition_data() - : data_() - , size_(0) - { - } + partition_data() = default; - partition_data(std::size_t size) + explicit partition_data(std::size_t size) : data_(std::allocator().allocate(size), size, buffer_type::take) , size_(size) { @@ -82,12 +79,12 @@ struct partition_data : data_(std::allocator().allocate(size), size, buffer_type::take) , size_(size) { - double base_value = double(initial_value * size); + double const base_value = static_cast(initial_value * size); for (std::size_t i = 0; i != size; ++i) - data_[i] = base_value + double(i); + data_[i] = base_value + static_cast(i); } - partition_data(const partition_data& old_part) + partition_data(partition_data const& old_part) : data_(std::allocator().allocate(old_part.size()), old_part.size(), buffer_type::take) , size_(old_part.size()) @@ -114,12 +111,12 @@ struct partition_data private: buffer_type data_; - std::size_t size_; + std::size_t size_ = 0; // Serialization Definitions friend class hpx::serialization::access; template - void serialize(Volume& vol, const unsigned int) + void serialize(Volume& vol, unsigned int const) { // clang-format off vol & data_ & size_; @@ -153,12 +150,12 @@ struct backup , file_name_(file_name) { } - backup(backup&& old) + backup(backup&& old) noexcept : bin(std::move(old.bin)) , file_name_(std::move(old.file_name_)) { } - ~backup() {} + ~backup() = default; void save(partition_data const& status, std::size_t index) { @@ -167,9 +164,10 @@ struct backup void write() { - hpx::util::checkpoint archive_data = + hpx::util::checkpoint const archive_data = hpx::util::save_checkpoint(hpx::launch::sync, bin); - // Make sure file stream is bianary for Windows/Mac machines + + // Make sure file stream is binary for Windows/Mac machines std::ofstream file_archive( file_name_, std::ios::binary | std::ios::out); if (file_archive.is_open()) @@ -180,20 +178,19 @@ struct backup { std::cout << "Error opening file!" << std::endl; } - file_archive.close(); } void revive(std::vector>>& U, std::size_t nx) { hpx::util::checkpoint temp_archive; - // Make sure file stream is bianary for Windows/Mac machines + // Make sure file stream is binary for Windows/Mac machines std::ifstream ist(file_name_, std::ios::binary | std::ios::in); ist >> temp_archive; hpx::util::restore_checkpoint(temp_archive, bin); for (std::size_t i = 0; i < U[0].size(); i++) { - partition_data temp(nx, double(i)); + partition_data temp(nx, static_cast(i)); hpx::util::restore_checkpoint(bin[i], temp); //Check for (std::size_t e = 0; e < temp.size(); e++) @@ -206,7 +203,8 @@ struct backup } }; -void print(std::vector>> U) +void print( + std::vector>> const& U) { for (std::size_t out = 0; out < U[0].size(); out++) { @@ -220,7 +218,7 @@ void print(std::vector>> U) } std::cout << std::endl; } -void print_space(std::vector> next) +void print_space(std::vector> const& next) { for (std::size_t out = 0; out < next.size(); out++) { @@ -239,8 +237,8 @@ void print_space(std::vector> next) struct stepper { // Our data for one time step - typedef hpx::shared_future partition; - typedef std::vector space; + using partition = hpx::shared_future; + using space = std::vector; // Our operator static double heat(double left, double middle, double right) @@ -253,7 +251,7 @@ struct stepper static partition_data heat_part(partition_data const& left, partition_data const& middle, partition_data const& right) { - std::size_t size = middle.size(); + std::size_t const size = middle.size(); partition_data next(size); next[0] = heat(left[size - 1], middle[0], middle[1]); @@ -270,8 +268,9 @@ struct stepper // do all the work on 'np' partitions, 'nx' data points each, for 'nt' // time steps, limit depth of dependency tree to 'nd' - hpx::future do_work(std::size_t np, std::size_t nx, std::size_t nt, - std::uint64_t nd, std::uint64_t cp, std::string rsf, std::string fn) + static hpx::future do_work(std::size_t np, std::size_t nx, + std::size_t nt, std::uint64_t nd, std::uint64_t cp, std::string rsf, + std::string fn) { using hpx::dataflow; using hpx::unwrapping; @@ -302,7 +301,8 @@ struct stepper auto range = hpx::util::counting_shape(np); using hpx::execution::par; hpx::ranges::for_each(par, range, [&U, nx](std::size_t i) { - U[0][i] = hpx::make_ready_future(partition_data(nx, double(i))); + U[0][i] = hpx::make_ready_future( + partition_data(nx, static_cast(i))); }); //Initialize from backup @@ -404,16 +404,18 @@ struct stepper /////////////////////////////////////////////////////////////////////////////// int hpx_main(hpx::program_options::variables_map& vm) { - std::uint64_t np = vm["np"].as(); // Number of partitions. - std::uint64_t nx = + std::uint64_t const np = + vm["np"].as(); // Number of partitions. + std::uint64_t const nx = vm["nx"].as(); // Number of grid points. - std::uint64_t nt = vm["nt"].as(); // Number of steps. - std::uint64_t nd = + std::uint64_t const nt = + vm["nt"].as(); // Number of steps. + std::uint64_t const nd = vm["nd"].as(); // Max depth of dep tree. - std::uint64_t cp = + std::uint64_t const cp = vm["cp"].as(); // Num. steps to checkpoint - std::string rsf = vm["restart-file"].as(); - std::string fn = vm["output-file"].as(); + std::string const rsf = vm["restart-file"].as(); + std::string const fn = vm["output-file"].as(); if (vm.count("no-header")) header = false; @@ -422,7 +424,7 @@ int hpx_main(hpx::program_options::variables_map& vm) stepper step; // Measure execution time. - std::uint64_t t = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const t = hpx::chrono::high_resolution_clock::now(); // Execute nt time steps on nx grid points and print the final solution. hpx::future result = @@ -431,7 +433,7 @@ int hpx_main(hpx::program_options::variables_map& vm) stepper::space solution = result.get(); hpx::wait_all(solution); - std::uint64_t elapsed = hpx::chrono::high_resolution_clock::now() - t; + std::uint64_t const elapsed = hpx::chrono::high_resolution_clock::now() - t; // Print the final solution if (vm.count("results"))