Skip to content

Commit

Permalink
Fixing race during destruction of hpx::thread
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Aug 5, 2023
1 parent 243de79 commit e8124b3
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 33 deletions.
6 changes: 4 additions & 2 deletions libs/core/threading/include/hpx/threading/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,11 @@ namespace hpx {
return threads::invalid_thread_id != id_;
}

void detach_locked()
threads::thread_id_ref_type detach_locked()
{
id_ = threads::invalid_thread_id;
threads::thread_id_ref_type id;
std::swap(id, id_);
return id;
}

void start_thread(threads::thread_pool_base* pool,
Expand Down
12 changes: 6 additions & 6 deletions libs/core/threading/src/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ namespace hpx {
"run_thread_exit_callbacks", "null thread id encountered");
}
threads::run_thread_exit_callbacks(id);
threads::free_thread_exit_callbacks(id);
}

threads::thread_result_type thread::thread_function_nullary(
Expand Down Expand Up @@ -214,17 +213,18 @@ namespace hpx {
}
this_thread::interruption_point();

// invalidate this object
threads::thread_id_ref_type id = detach_locked();

// register callback function to be called when thread exits
if (threads::add_thread_exit_callback(id_.noref(),
hpx::bind_front(&resume_thread, HPX_MOVE(this_id))))
if (threads::add_thread_exit_callback(
id.noref(), hpx::bind_front(&resume_thread, HPX_MOVE(this_id))))
{
// wait for thread to be terminated
unlock_guard ul(l);
l.unlock();
this_thread::suspend(
threads::thread_schedule_state::suspended, "thread::join");
}

detach_locked(); // invalidate this object
}

// extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ namespace hpx::threads {

bool add_thread_exit_callback(function<void()> const& f);
void run_thread_exit_callbacks();
void free_thread_exit_callbacks();

// no need to protect the variables related to scoped children as those
// are supposed to be accessed by ourselves only
Expand Down Expand Up @@ -516,7 +515,8 @@ namespace hpx::threads {
void set_last_worker_thread_num(
std::size_t last_worker_thread_num) noexcept
{
last_worker_thread_num_ = last_worker_thread_num;
last_worker_thread_num_ =
static_cast<std::uint16_t>(last_worker_thread_num);
}

constexpr std::ptrdiff_t get_stack_size() const noexcept
Expand Down Expand Up @@ -610,6 +610,34 @@ namespace hpx::threads {
private:
mutable std::atomic<thread_state> current_state_;

///////////////////////////////////////////////////////////////////////
thread_priority priority_;
thread_stacksize stacksize_enum_;

bool requested_interrupt_;
bool enabled_interrupt_;

enum class exit_func_state
{
none,
ready,
processed
};

std::atomic<exit_func_state> ran_exit_funcs_;
bool const is_stackless_;

std::uint16_t last_worker_thread_num_;

// reference to scheduler which created/manages this thread
policies::scheduler_base* scheduler_base_;
void* queue_;

std::ptrdiff_t stacksize_;

// Singly linked list (heap-allocated)
std::forward_list<hpx::function<void()>> exit_funcs_;

///////////////////////////////////////////////////////////////////////
// Debugging/logging information
#ifdef HPX_HAVE_THREAD_DESCRIPTION
Expand Down
103 changes: 80 additions & 23 deletions libs/core/threading_base/src/thread_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ namespace hpx::threads {
: detail::thread_data_reference_counting(addref)
, current_state_(thread_state(
init_data.initial_state, thread_restart_state::signaled))
, priority_(init_data.priority)
, stacksize_enum_(init_data.stacksize)
, requested_interrupt_(false)
, enabled_interrupt_(true)
, ran_exit_funcs_(exit_func_state::none)
, is_stackless_(is_stackless)
, last_worker_thread_num_(std::uint16_t(-1))
, scheduler_base_(init_data.scheduler_base)
, queue_(queue)
, stacksize_(stacksize)
#ifdef HPX_HAVE_THREAD_DESCRIPTION
, description_(init_data.description)
, lco_description_()
Expand Down Expand Up @@ -107,7 +117,13 @@ namespace hpx::threads {
thread_data::~thread_data()
{
LTM_(debug).format("thread_data::~thread_data({})", this);
free_thread_exit_callbacks();

// Exit functions should have been executed.
HPX_ASSERT(exit_funcs_.empty() ||
ran_exit_funcs_.load(std::memory_order_relaxed) ==
exit_func_state::none ||
ran_exit_funcs_.load(std::memory_order_relaxed) ==
exit_func_state::processed);
}

void thread_data::destroy_thread()
Expand All @@ -121,20 +137,55 @@ namespace hpx::threads {

void thread_data::run_thread_exit_callbacks()
{
std::unique_lock<hpx::util::detail::spinlock> l(
spinlock_pool::spinlock_for(this));

while (!exit_funcs_.empty())
// when leaving this function the state must be 'processed'
while (true)
{
exit_func_state expected = exit_func_state::ready;
if (ran_exit_funcs_.compare_exchange_strong(
expected, exit_func_state::processed))
{
hpx::unlock_guard<std::unique_lock<hpx::util::detail::spinlock>>
ul(l);
if (!exit_funcs_.front().empty())
exit_funcs_.front()();
// run exit functions only if there are any (state is 'ready')
std::unique_lock<hpx::util::detail::spinlock> l(
spinlock_pool::spinlock_for(this));

while (!exit_funcs_.empty())
{
if (!exit_funcs_.front().empty())
{
auto f = exit_funcs_.front();
exit_funcs_.pop_front();

hpx::unlock_guard<
std::unique_lock<hpx::util::detail::spinlock>>
ul(l);
f();
}
else
{
exit_funcs_.pop_front();
}
}

// clear all exit functions now as they are not needed anymore
exit_funcs_.clear();
return;
}
else if (expected == exit_func_state::none)
{
if (ran_exit_funcs_.compare_exchange_strong(
expected, exit_func_state::processed))
{
return;
}

// try again, state was set to ready or processed by now
}
else
{
HPX_ASSERT(expected == exit_func_state::processed);
return;
}
exit_funcs_.pop_front();
}
ran_exit_funcs_ = true;
}

bool thread_data::add_thread_exit_callback(hpx::function<void()> const& f)
Expand All @@ -149,20 +200,28 @@ namespace hpx::threads {
return false;
}

exit_funcs_.push_front(f);
// don't register any more exit callback if the thread has already
// exited
exit_func_state expected = exit_func_state::none;
if (!ran_exit_funcs_.compare_exchange_strong(
expected, exit_func_state::ready))
{
// the state was not none (i.e. ready or processed), bail out if it
// was processed
if (expected == exit_func_state::processed)
{
return false;
}
}

return true;
}
HPX_ASSERT(ran_exit_funcs_.load(std::memory_order_relaxed) ==
exit_func_state::ready);

void thread_data::free_thread_exit_callbacks()
{
std::lock_guard<hpx::util::detail::spinlock> l(
spinlock_pool::spinlock_for(this));

// Exit functions should have been executed.
HPX_ASSERT(exit_funcs_.empty() || ran_exit_funcs_);

exit_funcs_.clear();
exit_funcs_.push_front(f);
return true;
}

bool thread_data::interruption_point(bool throw_on_interrupt)
Expand Down Expand Up @@ -196,8 +255,6 @@ namespace hpx::threads {
"thread_data::rebind_base({}), description({}), phase({}), rebind",
this, get_description(), get_thread_phase());

free_thread_exit_callbacks();

current_state_.store(thread_state(
init_data.initial_state, thread_restart_state::signaled));

Expand All @@ -219,7 +276,7 @@ namespace hpx::threads {
priority_ = init_data.priority;
requested_interrupt_ = false;
enabled_interrupt_ = true;
ran_exit_funcs_ = false;
ran_exit_funcs_.store(exit_func_state::none, std::memory_order_relaxed);

runs_as_child_.store(init_data.schedulehint.runs_as_child_mode() ==
hpx::threads::thread_execution_hint::run_as_child,
Expand Down

0 comments on commit e8124b3

Please sign in to comment.