From f9106749d6c1ebef97ea41f6a66c36dbddc7a491 Mon Sep 17 00:00:00 2001
From: Hartmut Kaiser
Date: Fri, 27 Jan 2023 16:07:16 -0600
Subject: [PATCH] Fixing merge conflicts, revert changes to thread exit
 callbacks
---
 .../include/hpx/modules/schedulers.hpp        |   2 +
 .../local_workrequesting_scheduler.hpp        |  78 +++++------
 .../include/hpx/schedulers/thread_queue.hpp   |   3 +-
 .../hpx/thread_pools/scheduling_loop.hpp      |  11 +-
 .../include/hpx/threading/thread.hpp          |   6 +-
 libs/core/threading/src/thread.cpp            |  12 +-
 .../hpx/threading_base/register_thread.hpp    |   2 +
 .../hpx/threading_base/thread_data.hpp        |  32 +-----
 .../hpx/threading_base/thread_helpers.hpp     |   3 +
 libs/core/threading_base/src/thread_data.cpp  | 103 ++++--------------
 .../threading_base/src/thread_helpers.cpp     |  16 +++
 11 files changed, 106 insertions(+), 162 deletions(-)

diff --git a/libs/core/schedulers/include/hpx/modules/schedulers.hpp b/libs/core/schedulers/include/hpx/modules/schedulers.hpp
index d6790d352d73..2de14cc98363 100644
--- a/libs/core/schedulers/include/hpx/modules/schedulers.hpp
+++ b/libs/core/schedulers/include/hpx/modules/schedulers.hpp
@@ -6,6 +6,8 @@
 
 #pragma once
 
+#include
+
 #include
 #include
 #include
diff --git a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp
index bb025b80e987..46e0ff678d2b 100644
--- a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp
+++ b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp
@@ -32,6 +32,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -46,7 +47,7 @@
 // different working principle if compared to the classic work-stealing. Instead
 // of actively pulling work from the work queues of neighboring cores it relies
 // on a push model. Cores that run out of work post steal requests that are
-// handled by cores that have work awailable by actively sending tasks to the
+// handled by cores that have work available by actively sending tasks to the
 // requesting core.
 //
 // When a worker runs out of tasks, it becomes a thief by sending steal requests
@@ -115,18 +116,18 @@ namespace hpx::threads::policies {
         using thread_queue_type = thread_queue;
-
     public:
         struct init_parameter
         {
             init_parameter(std::size_t num_queues,
                 detail::affinity_data const& affinity_data,
-                std::size_t num_high_priority_queues = std::size_t(-1),
+                std::size_t num_high_priority_queues = static_cast<std::size_t>(
+                    -1),
                 thread_queue_init_parameters const& thread_queue_init =
                     thread_queue_init_parameters{},
                 char const* description = "local_workrequesting_scheduler")
              : num_queues_(num_queues)
              , num_high_priority_queues_(
-                   num_high_priority_queues == std::size_t(-1) ?
+                   num_high_priority_queues == static_cast<std::size_t>(-1) ?
                       num_queues : num_high_priority_queues)
             , thread_queue_init_(thread_queue_init)
@@ -159,7 +160,8 @@ namespace hpx::threads::policies {
         struct task_data
         {
             explicit HPX_HOST_DEVICE_CONSTEXPR task_data(
-                std::uint16_t num_thread = std::uint16_t(-1)) noexcept
+                std::uint16_t num_thread = static_cast<std::uint16_t>(
+                    -1)) noexcept
               : num_thread_(num_thread)
             {
             }
@@ -188,8 +190,8 @@ namespace hpx::threads::policies {
             {
             }
 
-            steal_request(std::size_t num_thread, task_channel* channel,
-                mask_cref_type victims, bool idle, bool stealhalf)
+            steal_request(std::size_t const num_thread, task_channel* channel,
+                mask_cref_type victims, bool idle, bool const stealhalf)
               : channel_(channel)
               , victims_(victims)
               , num_thread_(static_cast<std::uint16_t>(num_thread))
@@ -221,6 +223,11 @@ namespace hpx::threads::policies {
             {
             }
 
+            scheduler_data(scheduler_data const&) = delete;
+            scheduler_data(scheduler_data&&) = delete;
+            scheduler_data& operator=(scheduler_data const&) = delete;
+            scheduler_data& operator=(scheduler_data&&) = delete;
+
             ~scheduler_data()
             {
                 delete queue_;
@@ -330,9 +337,18 @@ namespace hpx::threads::policies {
             }
         }
 
+        local_workrequesting_scheduler(
+            local_workrequesting_scheduler const&) = delete;
+        local_workrequesting_scheduler(
+            local_workrequesting_scheduler&&) = delete;
+        local_workrequesting_scheduler& operator=(
+            local_workrequesting_scheduler const&) = delete;
+        local_workrequesting_scheduler& operator=(
+            local_workrequesting_scheduler&&) = delete;
+
         ~local_workrequesting_scheduler() override = default;
 
-        static std::string get_scheduler_name()
+        static std::string_view get_scheduler_name()
         {
             return "local_workrequesting_scheduler";
         }
@@ -659,9 +675,9 @@ namespace hpx::threads::policies {
             std::size_t num_thread =
                 data.schedulehint.mode == thread_schedule_hint_mode::thread ?
                data.schedulehint.hint :
-                std::size_t(-1);
+                static_cast<std::size_t>(-1);
 
-            if (std::size_t(-1) == num_thread)
+            if (static_cast<std::size_t>(-1) == num_thread)
             {
                 num_thread = curr_queue_++ % num_queues_;
             }
@@ -889,7 +905,7 @@ namespace hpx::threads::policies {
         // Return the next thread to be executed, return false if none is
         // available
         bool get_next_thread(std::size_t num_thread, bool running,
-            thread_id_ref_type& thrd, bool enable_stealing) override
+            thread_id_ref_type& thrd, bool enable_stealing)
         {
             HPX_ASSERT(num_thread < num_queues_);
@@ -968,9 +984,9 @@ namespace hpx::threads::policies {
         void schedule_thread(thread_id_ref_type thrd,
             threads::thread_schedule_hint schedulehint,
             bool allow_fallback = false,
-            thread_priority priority = thread_priority::normal) override
+            thread_priority priority = thread_priority::default_) override
         {
-            std::size_t num_thread = std::size_t(-1);
+            std::size_t num_thread = static_cast<std::size_t>(-1);
             if (schedulehint.mode == thread_schedule_hint_mode::thread)
             {
                 num_thread = schedulehint.hint;
@@ -980,7 +996,7 @@ namespace hpx::threads::policies {
                 allow_fallback = false;
             }
 
-            if (std::size_t(-1) == num_thread)
+            if (static_cast<std::size_t>(-1) == num_thread)
             {
                 num_thread = curr_queue_++ % num_queues_;
             }
@@ -1038,9 +1054,9 @@ namespace hpx::threads::policies {
         void schedule_thread_last(thread_id_ref_type thrd,
             threads::thread_schedule_hint schedulehint,
             bool allow_fallback = false,
-            thread_priority priority = thread_priority::normal) override
+            thread_priority priority = thread_priority::default_) override
         {
-            std::size_t num_thread = std::size_t(-1);
+            std::size_t num_thread = static_cast<std::size_t>(-1);
             if (schedulehint.mode == thread_schedule_hint_mode::thread)
             {
                 num_thread = schedulehint.hint;
@@ -1050,7 +1066,7 @@ namespace hpx::threads::policies {
                 allow_fallback = false;
             }
 
-            if (std::size_t(-1) == num_thread)
+            if (static_cast<std::size_t>(-1) == num_thread)
             {
                 num_thread = curr_queue_++ % num_queues_;
             }
@@ -1109,12 +1125,11 @@ namespace hpx::threads::policies {
         ///////////////////////////////////////////////////////////////////////
         // This returns the current length of the queues (work items and new
         // items)
-        std::int64_t get_queue_length(
-            std::size_t num_thread = std::size_t(-1)) const override
+        std::int64_t get_queue_length(std::size_t num_thread) const override
         {
             // Return queue length of one specific queue.
             std::int64_t count = 0;
-            if (std::size_t(-1) != num_thread)
+            if (static_cast<std::size_t>(-1) != num_thread)
             {
                 HPX_ASSERT(num_thread < num_queues_);
                 auto const& d = data_[num_thread].data_;
@@ -1150,12 +1165,12 @@ namespace hpx::threads::policies {
         std::int64_t get_thread_count(
             thread_schedule_state state = thread_schedule_state::unknown,
             thread_priority priority = thread_priority::default_,
-            std::size_t num_thread = std::size_t(-1),
+            std::size_t num_thread = static_cast<std::size_t>(-1),
             bool /* reset */ = false) const override
         {
             // Return thread count of one specific queue.
            std::int64_t count = 0;
-            if (std::size_t(-1) != num_thread)
+            if (static_cast<std::size_t>(-1) != num_thread)
            {
                HPX_ASSERT(num_thread < num_queues_);
@@ -1207,7 +1222,6 @@ namespace hpx::threads::policies {
                        "local_workrequesting_scheduler::get_thread_count",
                        "unknown thread priority value "
                        "(thread_priority::unknown)");
-                    return 0;
                }
            }
            return 0;
@@ -1275,7 +1289,6 @@ namespace hpx::threads::policies {
                    "local_workrequesting_scheduler::get_thread_count",
                    "unknown thread priority value "
                    "(thread_priority::unknown)");
-                return 0;
            }
            }
            return count;
@@ -1444,7 +1457,7 @@ namespace hpx::threads::policies {
            // generate at most 3 random numbers before resorting to more
            // expensive algorithm
            std::uniform_int_distribution<std::int16_t> uniform(
-                0, std::int16_t(num_queues_ - 1));
+                0, static_cast<std::int16_t>(num_queues_ - 1));
 
            int attempts = 0;
            do
@@ -1461,8 +1474,9 @@ namespace hpx::threads::policies {
 
            // to avoid infinite trials we randomly select one of the possible
            // victims
-            std::uniform_int_distribution<std::int16_t> uniform(
-                0, std::int16_t(num_queues_ - count(req.victims_) - 1));
+            std::uniform_int_distribution<std::int16_t> uniform(0,
+                static_cast<std::int16_t>(
+                    num_queues_ - count(req.victims_) - 1));
 
            // generate one more random number
            std::size_t selected_victim = uniform(gen_);
@@ -1489,7 +1503,7 @@ namespace hpx::threads::policies {
        std::size_t next_victim([[maybe_unused]] scheduler_data& d,
            steal_request const& req) noexcept
        {
-            std::size_t victim = std::size_t(-1);
+            std::size_t victim;
 
            // return thief if max steal attempts has been reached or no more
            // cores are available for stealing
@@ -1516,7 +1530,7 @@ namespace hpx::threads::policies {
            }
 
            // couldn't find victim, return steal request to thief
-            if (victim == std::size_t(-1))
+            if (victim == static_cast<std::size_t>(-1))
            {
                victim = req.num_thread_;
                HPX_ASSERT(victim != d.num_thread_);
@@ -1650,7 +1664,7 @@ namespace hpx::threads::policies {
        bool wait_or_add_new(std::size_t num_thread, bool running,
            [[maybe_unused]] std::int64_t& idle_loop_count,
            bool enable_stealing, std::size_t& added,
-            thread_id_ref_type* next_thrd = nullptr) override
+            thread_id_ref_type* next_thrd = nullptr)
        {
            HPX_ASSERT(num_thread < num_queues_);
@@ -1755,11 +1769,11 @@ namespace hpx::threads::policies {
                low_priority_queue_.on_start_thread(num_thread);
            }
 
-            std::size_t num_threads = num_queues_;
+            std::size_t const num_threads = num_queues_;
            //auto const& topo = create_topology();
 
            // Initially set all bits, code below resets the bits corresponding
-            // to cores that can serve as a vistim for the current core. A set
+            // to cores that can serve as a victim for the current core. A set
            // bit in this mask means 'do not steal from this core'.
            resize(d.victims_, num_threads);
            reset(d.victims_);
diff --git a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
index 6861f9a84683..7758770b9306 100644
--- a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
+++ b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
@@ -457,8 +457,7 @@ namespace hpx::threads::policies {
 
            if (delete_all)
            {
-                // do not lock mutex while deleting all threads, do it
-                // piece-wise
+                // do not lock mutex while deleting all threads, do it piece-wise
                while (true)
                {
                    std::unique_lock lk(mtx_, std::try_to_lock);
diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
index 86c18a3ba2b4..6a6dbb766d69 100644
--- a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
+++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
@@ -19,7 +19,6 @@
 #include
 #include
 #include
-#include
 
 #if defined(HPX_HAVE_APEX)
 #include
@@ -179,7 +178,6 @@ namespace hpx::threads::detail {
        idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
        [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(idle_rate);
-        HPX_UNUSED(tfunc_time_collector);
 
        // spin for some time after queues have become empty
        bool may_exit = false;
@@ -237,8 +235,6 @@ namespace hpx::threads::detail {
                        scheduler.get_next_thread(
                            num_thread, running, thrd, enable_stealing)))
                {
-                    HPX_UNUSED(tfunc_time_collector);
-
                    HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() ==
                        &scheduler);
@@ -273,7 +269,6 @@ namespace hpx::threads::detail {
                            [[maybe_unused]] tfunc_time_wrapper
                                tfunc_time_collector_inner(idle_rate);
-                            HPX_UNUSED(tfunc_time_collector);
 
                            // thread returns new required state store the
                            // returned state in the thread
@@ -294,7 +289,7 @@ namespace hpx::threads::detail {
                                // and add to aggregate execution time.
                                [[maybe_unused]] exec_time_wrapper
                                    exec_time_collector(idle_rate);
-                                HPX_UNUSED(exec_time_collector);
+
 #if defined(HPX_HAVE_APEX)
                                // get the APEX data pointer, in case we are
                                // resuming the thread and have to restore any
@@ -470,8 +465,8 @@ namespace hpx::threads::detail {
                    ++idle_loop_count;
 
                    next_thrd = nullptr;
-                    if (scheduler.wait_or_add_new(num_thread,
-                        running, idle_loop_count, enable_stealing_staged, added,
+                    if (scheduler.wait_or_add_new(num_thread, running,
+                            idle_loop_count, enable_stealing_staged, added,
                            &next_thrd))
                    {
                        // Clean up terminated threads before trying to exit
diff --git a/libs/core/threading/include/hpx/threading/thread.hpp b/libs/core/threading/include/hpx/threading/thread.hpp
index cf7c3fac0a4d..77005ea665d8 100644
--- a/libs/core/threading/include/hpx/threading/thread.hpp
+++ b/libs/core/threading/include/hpx/threading/thread.hpp
@@ -168,11 +168,9 @@ namespace hpx {
            return threads::invalid_thread_id != id_;
        }
 
-        threads::thread_id_ref_type detach_locked()
+        void detach_locked()
        {
-            threads::thread_id_ref_type id;
-            std::swap(id, id_);
-            return id;
+            id_ = threads::invalid_thread_id;
        }
 
        void start_thread(threads::thread_pool_base* pool,
diff --git a/libs/core/threading/src/thread.cpp b/libs/core/threading/src/thread.cpp
index 55a72fb0b7fa..7b135e96bb0f 100644
--- a/libs/core/threading/src/thread.cpp
+++ b/libs/core/threading/src/thread.cpp
@@ -112,6 +112,7 @@ namespace hpx {
                "run_thread_exit_callbacks", "null thread id encountered");
        }
        threads::run_thread_exit_callbacks(id);
+        threads::free_thread_exit_callbacks(id);
    }
 
    threads::thread_result_type thread::thread_function_nullary(
@@ -213,18 +214,17 @@ namespace hpx {
        }
        this_thread::interruption_point();
 
-        // invalidate this object
-        threads::thread_id_ref_type id = detach_locked();
-
        // register callback function to be called when thread exits
-        if (threads::add_thread_exit_callback(
-                id.noref(), hpx::bind_front(&resume_thread, HPX_MOVE(this_id))))
+        if (threads::add_thread_exit_callback(id_.noref(),
+                hpx::bind_front(&resume_thread, HPX_MOVE(this_id))))
        {
            // wait for thread to be terminated
-            l.unlock();
+            unlock_guard ul(l);
            this_thread::suspend(
                threads::thread_schedule_state::suspended, "thread::join");
        }
+
+        detach_locked();    // invalidate this object
    }
 
    // extensions
diff --git a/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp b/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp
index d3f715779875..5fb4df759336 100644
--- a/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp
+++ b/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp
@@ -43,6 +43,7 @@ namespace hpx::threads {
            auto* p = get_self_id_data();
 
            p->run_thread_exit_callbacks();
+            p->free_thread_exit_callbacks();
 
            return threads::thread_result_type(
                threads::thread_schedule_state::terminated,
@@ -69,6 +70,7 @@ namespace hpx::threads {
            auto* p = get_self_id_data();
 
            p->run_thread_exit_callbacks();
+            p->free_thread_exit_callbacks();
 
            return threads::thread_result_type(
                threads::thread_schedule_state::terminated,
diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp
index b4ff494c089d..cb47ff33fabc 100644
--- a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp
+++ b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp
@@ -486,6 +486,7 @@ namespace hpx::threads {
        bool add_thread_exit_callback(function<void()> const& f);
        void run_thread_exit_callbacks();
+        void free_thread_exit_callbacks();
 
        // no need to protect the variables related to scoped children as those
        // are supposed to be accessed by ourselves only
@@ -515,8 +516,7 @@ namespace hpx::threads {
        void set_last_worker_thread_num(
            std::size_t last_worker_thread_num) noexcept
        {
-            last_worker_thread_num_ =
-                static_cast<std::uint16_t>(last_worker_thread_num);
+            last_worker_thread_num_ = last_worker_thread_num;
        }
 
        constexpr std::ptrdiff_t get_stack_size() const noexcept
@@ -610,34 +610,6 @@ namespace hpx::threads {
    private:
        mutable std::atomic<thread_state> current_state_;
 
-        ///////////////////////////////////////////////////////////////////////
-        thread_priority priority_;
-        thread_stacksize stacksize_enum_;
-
-        bool requested_interrupt_;
-        bool enabled_interrupt_;
-
-        enum class exit_func_state
-        {
-            none,
-            ready,
-            processed
-        };
-
-        std::atomic<exit_func_state> ran_exit_funcs_;
-        bool const is_stackless_;
-
-        std::uint16_t last_worker_thread_num_;
-
-        // reference to scheduler which created/manages this thread
-        policies::scheduler_base* scheduler_base_;
-        void* queue_;
-
-        std::ptrdiff_t stacksize_;
-
-        // Singly linked list (heap-allocated)
-        std::forward_list<hpx::function<void()>> exit_funcs_;
-
        ///////////////////////////////////////////////////////////////////////
        // Debugging/logging information
 #ifdef HPX_HAVE_THREAD_DESCRIPTION
diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp
index 22cb01f8bd19..82db59ebe895 100644
--- a/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp
+++ b/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp
@@ -384,6 +384,9 @@ namespace hpx::threads {
    HPX_CORE_EXPORT bool add_thread_exit_callback(thread_id_type const& id,
        hpx::function<void()> const& f, error_code& ec = throws);
 
+    HPX_CORE_EXPORT void free_thread_exit_callbacks(
+        thread_id_type const& id, error_code& ec = throws);
+
    ///////////////////////////////////////////////////////////////////////////
    HPX_CORE_EXPORT std::size_t get_thread_data(
        thread_id_type const& id, error_code& ec = throws);
diff --git a/libs/core/threading_base/src/thread_data.cpp b/libs/core/threading_base/src/thread_data.cpp
index d89c7033ebc9..a884fbc33d52 100644
--- a/libs/core/threading_base/src/thread_data.cpp
+++ b/libs/core/threading_base/src/thread_data.cpp
@@ -52,16 +52,6 @@ namespace hpx::threads {
      : detail::thread_data_reference_counting(addref)
      , current_state_(thread_state(
            init_data.initial_state, thread_restart_state::signaled))
-      , priority_(init_data.priority)
-      , stacksize_enum_(init_data.stacksize)
-      , requested_interrupt_(false)
-      , enabled_interrupt_(true)
-      , ran_exit_funcs_(exit_func_state::none)
-      , is_stackless_(is_stackless)
-      , last_worker_thread_num_(std::uint16_t(-1))
-      , scheduler_base_(init_data.scheduler_base)
-      , queue_(queue)
-      , stacksize_(stacksize)
 #ifdef HPX_HAVE_THREAD_DESCRIPTION
      , description_(init_data.description)
      , lco_description_()
@@ -117,13 +107,7 @@ namespace hpx::threads {
    thread_data::~thread_data()
    {
        LTM_(debug).format("thread_data::~thread_data({})", this);
-
-        // Exit functions should have been executed.
-        HPX_ASSERT(exit_funcs_.empty() ||
-            ran_exit_funcs_.load(std::memory_order_relaxed) ==
-                exit_func_state::none ||
-            ran_exit_funcs_.load(std::memory_order_relaxed) ==
-                exit_func_state::processed);
+        free_thread_exit_callbacks();
    }
 
    void thread_data::destroy_thread()
@@ -137,55 +121,20 @@ namespace hpx::threads {
 
    void thread_data::run_thread_exit_callbacks()
    {
-        // when leaving this function the state must be 'processed'
-        while (true)
-        {
-            exit_func_state expected = exit_func_state::ready;
-            if (ran_exit_funcs_.compare_exchange_strong(
-                    expected, exit_func_state::processed))
-            {
-                // run exit functions only if there are any (state is 'ready')
-                std::unique_lock<hpx::util::detail::spinlock> l(
-                    spinlock_pool::spinlock_for(this));
-
-                while (!exit_funcs_.empty())
-                {
-                    if (!exit_funcs_.front().empty())
-                    {
-                        auto f = exit_funcs_.front();
-                        exit_funcs_.pop_front();
-
-                        hpx::unlock_guard<
-                            std::unique_lock<hpx::util::detail::spinlock>>
-                            ul(l);
-                        f();
-                    }
-                    else
-                    {
-                        exit_funcs_.pop_front();
-                    }
-                }
-
-                // clear all exit functions now as they are not needed anymore
-                exit_funcs_.clear();
-                return;
-            }
-            else if (expected == exit_func_state::none)
-            {
-                if (ran_exit_funcs_.compare_exchange_strong(
-                        expected, exit_func_state::processed))
-                {
-                    return;
-                }
+        std::unique_lock<hpx::util::detail::spinlock> l(
+            spinlock_pool::spinlock_for(this));
 
-                // try again, state was set to ready or processed by now
-            }
-            else
+        while (!exit_funcs_.empty())
+        {
            {
-                HPX_ASSERT(expected == exit_func_state::processed);
-                return;
+                hpx::unlock_guard<std::unique_lock<hpx::util::detail::spinlock>>
+                    ul(l);
+                if (!exit_funcs_.front().empty())
+                    exit_funcs_.front()();
            }
+            exit_funcs_.pop_front();
        }
+        ran_exit_funcs_ = true;
    }
 
    bool thread_data::add_thread_exit_callback(hpx::function<void()> const& f)
@@ -200,28 +149,20 @@ namespace hpx::threads {
            return false;
        }
 
-        // don't register any more exit callback if the thread has already
-        // exited
-        exit_func_state expected = exit_func_state::none;
-        if (!ran_exit_funcs_.compare_exchange_strong(
-                expected, exit_func_state::ready))
-        {
-            // the state was not none (i.e. ready or processed), bail out if it
-            // was processed
-            if (expected == exit_func_state::processed)
-            {
-                return false;
-            }
-        }
+        exit_funcs_.push_front(f);
 
-        HPX_ASSERT(ran_exit_funcs_.load(std::memory_order_relaxed) ==
-            exit_func_state::ready);
+        return true;
+    }
 
+    void thread_data::free_thread_exit_callbacks()
+    {
        std::lock_guard<hpx::util::detail::spinlock> l(
            spinlock_pool::spinlock_for(this));
 
-        exit_funcs_.push_front(f);
-        return true;
+        // Exit functions should have been executed.
+        HPX_ASSERT(exit_funcs_.empty() || ran_exit_funcs_);
+
+        exit_funcs_.clear();
    }
 
    bool thread_data::interruption_point(bool throw_on_interrupt)
@@ -255,6 +196,8 @@ namespace hpx::threads {
            "thread_data::rebind_base({}), description({}), phase({}), rebind",
            this, get_description(), get_thread_phase());
 
+        free_thread_exit_callbacks();
+
        current_state_.store(thread_state(
            init_data.initial_state, thread_restart_state::signaled));
@@ -276,7 +219,7 @@ namespace hpx::threads {
        priority_ = init_data.priority;
        requested_interrupt_ = false;
        enabled_interrupt_ = true;
-        ran_exit_funcs_.store(exit_func_state::none, std::memory_order_relaxed);
+        ran_exit_funcs_ = false;
 
        runs_as_child_.store(init_data.schedulehint.runs_as_child_mode() ==
                hpx::threads::thread_execution_hint::run_as_child,
diff --git a/libs/core/threading_base/src/thread_helpers.cpp b/libs/core/threading_base/src/thread_helpers.cpp
index 6e3830f65430..ecbd95b0c87d 100644
--- a/libs/core/threading_base/src/thread_helpers.cpp
+++ b/libs/core/threading_base/src/thread_helpers.cpp
@@ -344,6 +344,22 @@ namespace hpx::threads {
        return get_thread_id_data(id)->add_thread_exit_callback(f);
    }
 
+    void free_thread_exit_callbacks(thread_id_type const& id, error_code& ec)
+    {
+        if (HPX_UNLIKELY(!id))
+        {
+            HPX_THROWS_IF(ec, hpx::error::null_thread_id,
+                "hpx::threads::free_thread_exit_callbacks",
+                "null thread id encountered");
+            return;
+        }
+
+        if (&ec != &throws)
+            ec = make_success_code();
+
+        get_thread_id_data(id)->free_thread_exit_callbacks();
+    }
+
    ///////////////////////////////////////////////////////////////////////////
 #ifdef HPX_HAVE_THREAD_FULLBACKTRACE_ON_SUSPENSION
    char const* get_thread_backtrace(thread_id_type const& id, error_code& ec)
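
Note on the work-requesting hunks above: the header comment in
local_workrequesting_scheduler.hpp describes a push model in which an idle
core posts a steal request naming a reply channel, and a busy core answers by
sending tasks to that channel. The following stand-alone sketch illustrates
just that message flow; the channel, steal_request, and worker types here are
simplified stand-ins (plain mutex-guarded queues), not the lock-free
structures HPX uses.

    // Sketch (not HPX code): push-model work distribution as described in the
    // local_workrequesting_scheduler.hpp comment block above.
    #include <cstddef>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <optional>

    using task = std::function<void()>;

    // Stand-in for HPX's lock-free channels: a mutex-guarded queue.
    template <typename T>
    struct channel
    {
        std::mutex m;
        std::deque<T> q;

        void push(T v)
        {
            std::lock_guard<std::mutex> l(m);
            q.push_back(std::move(v));
        }

        std::optional<T> try_pop()
        {
            std::lock_guard<std::mutex> l(m);
            if (q.empty())
                return std::nullopt;
            T v = std::move(q.front());
            q.pop_front();
            return v;
        }
    };

    // A thief posts this to a victim; the victim replies through reply_to.
    struct steal_request
    {
        std::size_t thief;
        channel<task>* reply_to;
    };

    struct worker
    {
        std::size_t id;
        std::deque<task> local_queue;       // private run queue
        channel<steal_request> requests;    // steal requests posted by thieves
        channel<task> stolen;               // tasks pushed to us by victims

        // A busy core answers pending steal requests by pushing surplus work out.
        void serve_steal_requests()
        {
            while (auto req = requests.try_pop())
            {
                if (local_queue.size() > 1)    // keep at least one task
                {
                    req->reply_to->push(std::move(local_queue.back()));
                    local_queue.pop_back();
                }
                // else: HPX would forward the request to another victim or
                // return it to the thief (see next_victim() above)
            }
        }
    };

    int main()
    {
        worker busy{0}, idle{1};
        for (int i = 0; i != 4; ++i)
            busy.local_queue.push_back(
                [i] { std::cout << "task " << i << " ran on the thief\n"; });

        // The idle core becomes a thief and posts a steal request ...
        busy.requests.push(steal_request{idle.id, &idle.stolen});

        // ... which the busy core answers during its next scheduling-loop pass.
        busy.serve_steal_requests();

        // The thief executes whatever was pushed to it.
        while (auto t = idle.stolen.try_pop())
            (*t)();
    }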
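
Similarly, the uniform_int_distribution hunks pick a random victim only among
cores whose bit is not set in the request's victim mask ('set' meaning: do not
steal from this core). A hedged sketch of that selection rule, using
std::vector<bool> in place of HPX's mask type and its count()/test() helpers:

    // Sketch (not HPX code): pick a victim uniformly among unmasked cores.
    #include <algorithm>
    #include <cstddef>
    #include <random>
    #include <vector>

    std::size_t random_victim(std::vector<bool> const& victims, std::mt19937& gen)
    {
        // number of cores we must not steal from
        auto masked = static_cast<std::size_t>(
            std::count(victims.begin(), victims.end(), true));
        if (masked == victims.size())
            return static_cast<std::size_t>(-1);    // no eligible victim

        // draw an index among the eligible cores only, mirroring
        // uniform(0, num_queues_ - count(req.victims_) - 1) above
        std::uniform_int_distribution<std::size_t> uniform(
            0, victims.size() - masked - 1);
        std::size_t nth = uniform(gen);

        // map the drawn index back to a core id, skipping masked cores
        for (std::size_t i = 0; i != victims.size(); ++i)
        {
            if (!victims[i] && nth-- == 0)
                return i;
        }
        return static_cast<std::size_t>(-1);    // unreachable
    }

As the first uniform hunk shows, the scheduler additionally tries a few cheap
unconstrained random draws before falling back to this exact scheme.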
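
Finally, the threading_base hunks revert the exit-callback handling from an
atomic three-state machine back to the explicit two-step protocol:
run_thread_exit_callbacks() invokes the registered callbacks (LIFO, since they
are pushed onto a forward_list) and free_thread_exit_callbacks() then discards
the list, asserting that the callbacks have already run; rebind_base() frees
leftovers before a thread_data object is reused. A toy model of that contract,
with a plain mutex standing in for HPX's spinlock pool and thread_data_model a
hypothetical stand-in class:

    // Toy model (not the HPX class) of the reverted exit-callback protocol.
    #include <cassert>
    #include <forward_list>
    #include <functional>
    #include <iostream>
    #include <mutex>

    class thread_data_model
    {
        std::mutex mtx_;
        std::forward_list<std::function<void()>> exit_funcs_;
        bool ran_exit_funcs_ = false;

    public:
        bool add_thread_exit_callback(std::function<void()> const& f)
        {
            std::lock_guard<std::mutex> l(mtx_);
            if (ran_exit_funcs_)    // thread already exited
                return false;
            exit_funcs_.push_front(f);    // LIFO: last registered runs first
            return true;
        }

        void run_thread_exit_callbacks()
        {
            std::unique_lock<std::mutex> l(mtx_);
            while (!exit_funcs_.empty())
            {
                auto f = std::move(exit_funcs_.front());
                exit_funcs_.pop_front();
                l.unlock();    // never hold the lock while running user code
                if (f)
                    f();
                l.lock();
            }
            ran_exit_funcs_ = true;
        }

        void free_thread_exit_callbacks()
        {
            std::lock_guard<std::mutex> l(mtx_);
            // Exit functions should have been executed (same assertion as above).
            assert(exit_funcs_.empty() || ran_exit_funcs_);
            exit_funcs_.clear();
        }

        void rebind()    // what rebind_base() does before reusing the object
        {
            free_thread_exit_callbacks();
            ran_exit_funcs_ = false;
        }
    };

    int main()
    {
        thread_data_model t;
        t.add_thread_exit_callback([] { std::cout << "runs second\n"; });
        t.add_thread_exit_callback([] { std::cout << "runs first\n"; });
        t.run_thread_exit_callbacks();     // LIFO order
        t.free_thread_exit_callbacks();    // safe: everything has run
        t.rebind();                        // ready for a new use
    }

The guard in add_thread_exit_callback is a simplification: the patched HPX
code relies on the thread's schedule state to reject late registrations, which
is why its add_thread_exit_callback can simply push_front after the state
check shown in the diff.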