diff --git a/libs/core/threading/include/hpx/threading/thread.hpp b/libs/core/threading/include/hpx/threading/thread.hpp index 77005ea665d8..cf7c3fac0a4d 100644 --- a/libs/core/threading/include/hpx/threading/thread.hpp +++ b/libs/core/threading/include/hpx/threading/thread.hpp @@ -168,9 +168,11 @@ namespace hpx { return threads::invalid_thread_id != id_; } - void detach_locked() + threads::thread_id_ref_type detach_locked() { - id_ = threads::invalid_thread_id; + threads::thread_id_ref_type id; + std::swap(id, id_); + return id; } void start_thread(threads::thread_pool_base* pool, diff --git a/libs/core/threading/src/thread.cpp b/libs/core/threading/src/thread.cpp index 7b135e96bb0f..55a72fb0b7fa 100644 --- a/libs/core/threading/src/thread.cpp +++ b/libs/core/threading/src/thread.cpp @@ -112,7 +112,6 @@ namespace hpx { "run_thread_exit_callbacks", "null thread id encountered"); } threads::run_thread_exit_callbacks(id); - threads::free_thread_exit_callbacks(id); } threads::thread_result_type thread::thread_function_nullary( @@ -214,17 +213,18 @@ namespace hpx { } this_thread::interruption_point(); + // invalidate this object + threads::thread_id_ref_type id = detach_locked(); + // register callback function to be called when thread exits - if (threads::add_thread_exit_callback(id_.noref(), - hpx::bind_front(&resume_thread, HPX_MOVE(this_id)))) + if (threads::add_thread_exit_callback( + id.noref(), hpx::bind_front(&resume_thread, HPX_MOVE(this_id)))) { // wait for thread to be terminated - unlock_guard ul(l); + l.unlock(); this_thread::suspend( threads::thread_schedule_state::suspended, "thread::join"); } - - detach_locked(); // invalidate this object } // extensions diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp index cb47ff33fabc..b4ff494c089d 100644 --- a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp @@ -486,7 +486,6 @@ namespace hpx::threads { bool add_thread_exit_callback(function const& f); void run_thread_exit_callbacks(); - void free_thread_exit_callbacks(); // no need to protect the variables related to scoped children as those // are supposed to be accessed by ourselves only @@ -516,7 +515,8 @@ namespace hpx::threads { void set_last_worker_thread_num( std::size_t last_worker_thread_num) noexcept { - last_worker_thread_num_ = last_worker_thread_num; + last_worker_thread_num_ = + static_cast(last_worker_thread_num); } constexpr std::ptrdiff_t get_stack_size() const noexcept @@ -610,6 +610,34 @@ namespace hpx::threads { private: mutable std::atomic current_state_; + /////////////////////////////////////////////////////////////////////// + thread_priority priority_; + thread_stacksize stacksize_enum_; + + bool requested_interrupt_; + bool enabled_interrupt_; + + enum class exit_func_state + { + none, + ready, + processed + }; + + std::atomic ran_exit_funcs_; + bool const is_stackless_; + + std::uint16_t last_worker_thread_num_; + + // reference to scheduler which created/manages this thread + policies::scheduler_base* scheduler_base_; + void* queue_; + + std::ptrdiff_t stacksize_; + + // Singly linked list (heap-allocated) + std::forward_list> exit_funcs_; + /////////////////////////////////////////////////////////////////////// // Debugging/logging information #ifdef HPX_HAVE_THREAD_DESCRIPTION diff --git a/libs/core/threading_base/src/thread_data.cpp b/libs/core/threading_base/src/thread_data.cpp index a884fbc33d52..d89c7033ebc9 100644 --- a/libs/core/threading_base/src/thread_data.cpp +++ b/libs/core/threading_base/src/thread_data.cpp @@ -52,6 +52,16 @@ namespace hpx::threads { : detail::thread_data_reference_counting(addref) , current_state_(thread_state( init_data.initial_state, thread_restart_state::signaled)) + , priority_(init_data.priority) + , stacksize_enum_(init_data.stacksize) + , requested_interrupt_(false) + , enabled_interrupt_(true) + , ran_exit_funcs_(exit_func_state::none) + , is_stackless_(is_stackless) + , last_worker_thread_num_(std::uint16_t(-1)) + , scheduler_base_(init_data.scheduler_base) + , queue_(queue) + , stacksize_(stacksize) #ifdef HPX_HAVE_THREAD_DESCRIPTION , description_(init_data.description) , lco_description_() @@ -107,7 +117,13 @@ namespace hpx::threads { thread_data::~thread_data() { LTM_(debug).format("thread_data::~thread_data({})", this); - free_thread_exit_callbacks(); + + // Exit functions should have been executed. + HPX_ASSERT(exit_funcs_.empty() || + ran_exit_funcs_.load(std::memory_order_relaxed) == + exit_func_state::none || + ran_exit_funcs_.load(std::memory_order_relaxed) == + exit_func_state::processed); } void thread_data::destroy_thread() @@ -121,20 +137,55 @@ namespace hpx::threads { void thread_data::run_thread_exit_callbacks() { - std::unique_lock l( - spinlock_pool::spinlock_for(this)); - - while (!exit_funcs_.empty()) + // when leaving this function the state must be 'processed' + while (true) { + exit_func_state expected = exit_func_state::ready; + if (ran_exit_funcs_.compare_exchange_strong( + expected, exit_func_state::processed)) { - hpx::unlock_guard> - ul(l); - if (!exit_funcs_.front().empty()) - exit_funcs_.front()(); + // run exit functions only if there are any (state is 'ready') + std::unique_lock l( + spinlock_pool::spinlock_for(this)); + + while (!exit_funcs_.empty()) + { + if (!exit_funcs_.front().empty()) + { + auto f = exit_funcs_.front(); + exit_funcs_.pop_front(); + + hpx::unlock_guard< + std::unique_lock> + ul(l); + f(); + } + else + { + exit_funcs_.pop_front(); + } + } + + // clear all exit functions now as they are not needed anymore + exit_funcs_.clear(); + return; + } + else if (expected == exit_func_state::none) + { + if (ran_exit_funcs_.compare_exchange_strong( + expected, exit_func_state::processed)) + { + return; + } + + // try again, state was set to ready or processed by now + } + else + { + HPX_ASSERT(expected == exit_func_state::processed); + return; } - exit_funcs_.pop_front(); } - ran_exit_funcs_ = true; } bool thread_data::add_thread_exit_callback(hpx::function const& f) @@ -149,20 +200,28 @@ namespace hpx::threads { return false; } - exit_funcs_.push_front(f); + // don't register any more exit callback if the thread has already + // exited + exit_func_state expected = exit_func_state::none; + if (!ran_exit_funcs_.compare_exchange_strong( + expected, exit_func_state::ready)) + { + // the state was not none (i.e. ready or processed), bail out if it + // was processed + if (expected == exit_func_state::processed) + { + return false; + } + } - return true; - } + HPX_ASSERT(ran_exit_funcs_.load(std::memory_order_relaxed) == + exit_func_state::ready); - void thread_data::free_thread_exit_callbacks() - { std::lock_guard l( spinlock_pool::spinlock_for(this)); - // Exit functions should have been executed. - HPX_ASSERT(exit_funcs_.empty() || ran_exit_funcs_); - - exit_funcs_.clear(); + exit_funcs_.push_front(f); + return true; } bool thread_data::interruption_point(bool throw_on_interrupt) @@ -196,8 +255,6 @@ namespace hpx::threads { "thread_data::rebind_base({}), description({}), phase({}), rebind", this, get_description(), get_thread_phase()); - free_thread_exit_callbacks(); - current_state_.store(thread_state( init_data.initial_state, thread_restart_state::signaled)); @@ -219,7 +276,7 @@ namespace hpx::threads { priority_ = init_data.priority; requested_interrupt_ = false; enabled_interrupt_ = true; - ran_exit_funcs_ = false; + ran_exit_funcs_.store(exit_func_state::none, std::memory_order_relaxed); runs_as_child_.store(init_data.schedulehint.runs_as_child_mode() == hpx::threads::thread_execution_hint::run_as_child,