Skip to content

Commit

Permalink
Making sure a changed number of cores (processing units count) is propagated to the executor
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Aug 1, 2023
1 parent c16ae10 commit 91fc649
Show file tree
Hide file tree
Showing 18 changed files with 664 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ namespace hpx { namespace ranges {

namespace hpx::ranges {

inline constexpr struct is_sorted_t final
inline constexpr struct is_sorted_t
: hpx::detail::tag_parallel_algorithm<is_sorted_t>
{
private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/async_base/scheduling_properties.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/execution/algorithms/detail/is_negative.hpp>
#include <hpx/execution/algorithms/detail/predicates.hpp>
Expand All @@ -16,6 +17,7 @@
#include <hpx/futures/future.hpp>
#include <hpx/iterator_support/iterator_range.hpp>
#include <hpx/parallel/util/detail/chunk_size_iterator.hpp>
#include <hpx/properties/property.hpp>

#include <algorithm>
#include <cstddef>
Expand Down Expand Up @@ -132,7 +134,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
Expand Down Expand Up @@ -166,6 +168,10 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);

Expand All @@ -175,7 +181,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename Future, typename F1, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, std::vector<Future>& workitems,
get_bulk_iteration_shape(ExPolicy& policy, std::vector<Future>& workitems,
F1&& f1, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
Expand Down Expand Up @@ -241,6 +247,10 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);

Expand All @@ -250,7 +260,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
std::vector<hpx::tuple<IterOrR, std::size_t>>
get_bulk_iteration_shape_variable(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape_variable(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<IterOrR, std::size_t>;
Expand Down Expand Up @@ -308,27 +318,31 @@ namespace hpx::parallel::util::detail {
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}

// Tag-dispatch overload selected with std::false_type: passes its arguments
// on to the get_bulk_iteration_shape overload that takes `workitems` and
// `f1`.
// NOTE: this is a diff view - both the pre-change lines (ExPolicy&& policy,
// HPX_FORWARD(ExPolicy, policy)) and the post-change lines (ExPolicy& policy,
// plain `policy`) are shown next to each other.
template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

// Tag-dispatch overload selected with std::true_type: passes its arguments
// on to get_bulk_iteration_shape_variable.
// NOTE: this is a diff view - both the pre-change lines (ExPolicy&& policy,
// HPX_FORWARD(ExPolicy, policy)) and the post-change lines (ExPolicy& policy,
// plain `policy`) are shown next to each other.
// NOTE(review): the get_bulk_iteration_shape_variable overload visible in
// this view takes (policy, it_or_r, count, s) only - confirm that an overload
// accepting `workitems` and `f1` exists elsewhere.
template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape_variable(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape_variable(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -360,7 +374,7 @@ namespace hpx::parallel::util::detail {
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy, FwdIter begin,
get_bulk_iteration_shape_idx(ExPolicy& policy, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
using iterator =
Expand Down Expand Up @@ -397,6 +411,13 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, 0);
iterator shape_end(last, chunk_size, count, count, 0);

Expand All @@ -407,7 +428,7 @@ namespace hpx::parallel::util::detail {
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy,
get_bulk_iteration_shape_idx(ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
Expand Down Expand Up @@ -475,6 +496,13 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, base_idx);
iterator shape_end(last, chunk_size, count, count, base_idx);

Expand All @@ -484,7 +512,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename FwdIter,
typename Stride = std::size_t>
std::vector<hpx::tuple<FwdIter, std::size_t, std::size_t>>
get_bulk_iteration_shape_idx_variable(ExPolicy&& policy, FwdIter first,
get_bulk_iteration_shape_idx_variable(ExPolicy& policy, FwdIter first,
std::size_t count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<FwdIter, std::size_t, std::size_t>;
Expand Down Expand Up @@ -543,6 +571,10 @@ namespace hpx::parallel::util::detail {
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}
} // namespace hpx::parallel::util::detail
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename FwdIter, typename F>
auto foreach_partition(
ExPolicy&& policy, FwdIter first, std::size_t count, F&& f)
ExPolicy policy, FwdIter first, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -53,16 +53,16 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count);
policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count);
auto&& shape =
detail::get_bulk_iteration_shape_idx(policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -72,7 +72,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count);
policy, inititems, f, first, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand Down
21 changes: 10 additions & 11 deletions libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename IterOrR, typename F>
auto partition(ExPolicy&& policy, IterOrR it_or_r, std::size_t count, F&& f)
auto partition(ExPolicy policy, IterOrR it_or_r, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -57,16 +57,16 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_variable(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
auto&& shape =
detail::get_bulk_iteration_shape(policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -76,7 +76,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), inititems, f, it_or_r, count);
policy, inititems, f, it_or_r, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -88,8 +88,8 @@ namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename FwdIter,
typename Stride, typename F>
auto partition_with_index(ExPolicy&& policy, FwdIter first,
std::size_t count, Stride stride, F&& f)
auto partition_with_index(
ExPolicy policy, FwdIter first, std::size_t count, Stride stride, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -106,7 +106,7 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -115,7 +115,7 @@ namespace hpx::parallel::util::detail {
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -125,8 +125,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count,
stride);
policy, inititems, f, first, count, stride);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand Down
1 change: 1 addition & 0 deletions libs/core/algorithms/tests/regressions/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ set(tests
for_loop_5735
for_loop_with_auto_chunk_size
minimal_findend
num_cores
reduce_3641
scan_different_inits
scan_non_commutative
Expand Down
39 changes: 39 additions & 0 deletions libs/core/algorithms/tests/regressions/num_cores.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2023 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/algorithm.hpp>
#include <hpx/chrono.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

#include <cstddef>

// Regression test body: a core count requested either through a num_cores
// parameters object or through with_processing_units_count must be the value
// that processing_units_count reports afterwards.
//
// Returns the result of hpx::local::finalize().
int hpx_main()
{
    hpx::execution::experimental::num_cores nc(2);
    auto policy = hpx::execution::par.with(nc);

    // The expected value is spelled as std::size_t so that the comparison
    // inside HPX_TEST_EQ does not mix signed and unsigned operands.
    HPX_TEST_EQ(
        hpx::parallel::execution::processing_units_count(policy.parameters(),
            policy.executor(), hpx::chrono::null_duration, 0),
        static_cast<std::size_t>(2));

    // Query with the default parameters of `par`: here only the executor of
    // policy2 is taken from the rebound policy.
    auto policy2 =
        hpx::parallel::execution::with_processing_units_count(policy, 2);
    HPX_TEST_EQ(hpx::parallel::execution::processing_units_count(
                    hpx::execution::par.parameters(), policy2.executor(),
                    hpx::chrono::null_duration, 0),
        static_cast<std::size_t>(2));

    return hpx::local::finalize();
}

// Program entry point: hands hpx_main to hpx::local::init, records a test
// failure if that call does not return 0, and returns the value produced by
// hpx::util::report_errors().
int main(int argc, char* argv[])
{
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ namespace hpx::execution::experimental {

/// \cond NOINTERNAL
// discover the number of cores to use for parallelization
// tag_invoke overload for processing_units_count_t taking a num_cores object:
// the executor, duration and std::size_t arguments are unnamed and unused;
// the result is the num_cores_ member of the `params` object (passed by
// value).
template <typename Executor>
friend std::size_t tag_invoke(
hpx::parallel::execution::processing_units_count_t,
num_cores params, Executor&&, hpx::chrono::steady_duration const&,
std::size_t) noexcept
{
return params.num_cores_;
}

template <typename Executor>
constexpr std::size_t processing_units_count(Executor&&,
hpx::chrono::steady_duration const&, std::size_t) const noexcept
Expand Down
9 changes: 9 additions & 0 deletions libs/core/execution/tests/regressions/is_executor_1691.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,18 @@
///////////////////////////////////////////////////////////////////////////////
// Executor type used by this regression test: derives from
// hpx::execution::parallel_executor and can be constructed from, or assigned,
// a parallel_executor.
struct my_executor : hpx::execution::parallel_executor
{
    my_executor() = default;

    // Copies the state of `rhs` into the base sub-object instead of silently
    // discarding it. Left non-explicit as in the original - presumably so
    // that implicit conversions from parallel_executor keep working; confirm
    // against the callers.
    my_executor(hpx::execution::parallel_executor const& rhs)
      : hpx::execution::parallel_executor(rhs)
    {
    }

    // Assigns the base sub-object from `rhs` so that the state carried by the
    // source executor is not lost.
    my_executor& operator=(hpx::execution::parallel_executor const& rhs)
    {
        hpx::execution::parallel_executor::operator=(rhs);
        return *this;
    }
};

namespace hpx::parallel::execution {

template <>
struct is_one_way_executor<my_executor> : std::true_type
{
Expand Down
Loading

0 comments on commit 91fc649

Please sign in to comment.