Fixing merge conflicts, revert changes to thread exit callbacks
hkaiser committed Aug 5, 2023
1 parent e8124b3 commit f910674
Showing 11 changed files with 106 additions and 162 deletions.
2 changes: 2 additions & 0 deletions libs/core/schedulers/include/hpx/modules/schedulers.hpp
@@ -6,6 +6,8 @@

#pragma once

+#include <hpx/config.hpp>
+
#include <hpx/schedulers/background_scheduler.hpp>
#include <hpx/schedulers/local_priority_queue_scheduler.hpp>
#include <hpx/schedulers/local_queue_scheduler.hpp>
@@ -32,6 +32,7 @@
#include <mutex>
#include <random>
#include <string>
+#include <string_view>
#include <type_traits>
#include <utility>
#include <vector>
@@ -46,7 +47,7 @@
// different working principle if compared to the classic work-stealing. Instead
// of actively pulling work from the work queues of neighboring cores it relies
// on a push model. Cores that run out of work post steal requests that are
-// handled by cores that have work awailable by actively sending tasks to the
+// handled by cores that have work available by actively sending tasks to the
// requesting core.
//
// When a worker runs out of tasks, it becomes a thief by sending steal requests
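The push model described in this comment can be sketched in a few lines. The following is a minimal illustration only — task, channel, steal_request, and worker are hypothetical stand-ins, not HPX's actual (lockfree) channel and scheduler types:

#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

using task = std::function<void()>;

// Toy locked channel; HPX uses lockfree channels instead.
template <typename T>
struct channel
{
    std::mutex mtx;
    std::queue<T> q;

    void send(T v)
    {
        std::lock_guard<std::mutex> l(mtx);
        q.push(std::move(v));
    }

    std::optional<T> try_receive()
    {
        std::lock_guard<std::mutex> l(mtx);
        if (q.empty())
            return std::nullopt;
        T v = std::move(q.front());
        q.pop();
        return v;
    }
};

struct steal_request
{
    channel<task>* reply;    // the thief's channel to push stolen tasks into
};

struct worker
{
    std::deque<task> queue;             // local queue, touched only by its owner
    channel<steal_request> requests;    // steal requests posted by idle cores
    channel<task> stolen;               // tasks other cores pushed to us

    // A core that has work answers pending steal requests by actively
    // sending one of its tasks to the requesting core. (A real scheduler
    // would forward or decline a request it cannot serve, not drop it.)
    void handle_steal_requests()
    {
        while (auto req = requests.try_receive())
        {
            if (queue.empty())
                break;
            req->reply->send(std::move(queue.back()));
            queue.pop_back();
        }
    }

    // A core that ran out of tasks becomes a thief: it posts a steal
    // request and later polls its own 'stolen' channel.
    void post_steal_request(worker& victim)
    {
        victim.requests.send(steal_request{&stolen});
    }
};

The design choice this buys: only the owning core ever touches its local queue, so the queue itself needs no synchronization; all cross-core communication is confined to the channels.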
@@ -115,18 +116,18 @@ namespace hpx::threads::policies {
using thread_queue_type = thread_queue<Mutex, PendingQueuing,
StagedQueuing, TerminatedQueuing>;

public:
struct init_parameter
{
init_parameter(std::size_t num_queues,
detail::affinity_data const& affinity_data,
-std::size_t num_high_priority_queues = std::size_t(-1),
+std::size_t num_high_priority_queues = static_cast<std::size_t>(
+    -1),
thread_queue_init_parameters const& thread_queue_init =
thread_queue_init_parameters{},
char const* description = "local_workrequesting_scheduler")
: num_queues_(num_queues)
, num_high_priority_queues_(
-num_high_priority_queues == std::size_t(-1) ?
+num_high_priority_queues == static_cast<std::size_t>(-1) ?
num_queues :
num_high_priority_queues)
, thread_queue_init_(thread_queue_init)
@@ -159,7 +160,8 @@ namespace hpx::threads::policies {
struct task_data
{
explicit HPX_HOST_DEVICE_CONSTEXPR task_data(
-std::uint16_t num_thread = std::uint16_t(-1)) noexcept
+std::uint16_t num_thread = static_cast<std::uint16_t>(
+    -1)) noexcept
: num_thread_(num_thread)
{
}
@@ -188,8 +190,8 @@
{
}

-steal_request(std::size_t num_thread, task_channel* channel,
-    mask_cref_type victims, bool idle, bool stealhalf)
+steal_request(std::size_t const num_thread, task_channel* channel,
+    mask_cref_type victims, bool idle, bool const stealhalf)
: channel_(channel)
, victims_(victims)
, num_thread_(static_cast<std::uint16_t>(num_thread))
@@ -221,6 +223,11 @@
{
}

+scheduler_data(scheduler_data const&) = delete;
+scheduler_data(scheduler_data&&) = delete;
+scheduler_data& operator=(scheduler_data const&) = delete;
+scheduler_data& operator=(scheduler_data&&) = delete;
+
~scheduler_data()
{
delete queue_;
@@ -330,9 +337,18 @@
}
}

+local_workrequesting_scheduler(
+    local_workrequesting_scheduler const&) = delete;
+local_workrequesting_scheduler(
+    local_workrequesting_scheduler&&) = delete;
+local_workrequesting_scheduler& operator=(
+    local_workrequesting_scheduler const&) = delete;
+local_workrequesting_scheduler& operator=(
+    local_workrequesting_scheduler&&) = delete;
+
~local_workrequesting_scheduler() override = default;

-static std::string get_scheduler_name()
+static std::string_view get_scheduler_name()
{
return "local_workrequesting_scheduler";
}
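get_scheduler_name() now returns std::string_view instead of std::string. Returning a view of a string literal is safe and allocation-free, because string literals have static storage duration. A small standalone illustration of the same idea:

#include <string_view>

// Safe: "local_workrequesting_scheduler" lives for the whole program, so
// the returned view never dangles, and no std::string is allocated.
static std::string_view scheduler_name() noexcept
{
    return "local_workrequesting_scheduler";
}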
@@ -659,9 +675,9 @@
std::size_t num_thread =
data.schedulehint.mode == thread_schedule_hint_mode::thread ?
data.schedulehint.hint :
-std::size_t(-1);
+static_cast<std::size_t>(-1);

-if (std::size_t(-1) == num_thread)
+if (static_cast<std::size_t>(-1) == num_thread)
{
num_thread = curr_queue_++ % num_queues_;
}
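A recurring change in this file replaces the functional-style cast std::size_t(-1) with static_cast<std::size_t>(-1). Both yield the same all-bits-set sentinel that the scheduler reads as "no specific thread requested"; the explicit cast only states the (well-defined) unsigned wrap-around intent. A standalone illustration:

#include <cstddef>
#include <limits>

int main()
{
    // -1 converted to an unsigned type wraps to that type's maximum value.
    std::size_t const a = std::size_t(-1);                  // functional-style cast
    std::size_t const b = static_cast<std::size_t>(-1);     // explicit cast
    static_assert(static_cast<std::size_t>(-1) ==
        std::numeric_limits<std::size_t>::max());
    return a == b ? 0 : 1;    // always returns 0
}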
@@ -889,7 +905,7 @@
// Return the next thread to be executed, return false if none is
// available
bool get_next_thread(std::size_t num_thread, bool running,
-thread_id_ref_type& thrd, bool enable_stealing) override
+thread_id_ref_type& thrd, bool enable_stealing)
{
HPX_ASSERT(num_thread < num_queues_);

@@ -968,9 +984,9 @@
void schedule_thread(thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool allow_fallback = false,
-thread_priority priority = thread_priority::normal) override
+thread_priority priority = thread_priority::default_) override
{
-std::size_t num_thread = std::size_t(-1);
+std::size_t num_thread = static_cast<std::size_t>(-1);
if (schedulehint.mode == thread_schedule_hint_mode::thread)
{
num_thread = schedulehint.hint;
@@ -980,7 +996,7 @@
allow_fallback = false;
}

-if (std::size_t(-1) == num_thread)
+if (static_cast<std::size_t>(-1) == num_thread)
{
num_thread = curr_queue_++ % num_queues_;
}
@@ -1038,9 +1054,9 @@
void schedule_thread_last(thread_id_ref_type thrd,
threads::thread_schedule_hint schedulehint,
bool allow_fallback = false,
-thread_priority priority = thread_priority::normal) override
+thread_priority priority = thread_priority::default_) override
{
-std::size_t num_thread = std::size_t(-1);
+std::size_t num_thread = static_cast<std::size_t>(-1);
if (schedulehint.mode == thread_schedule_hint_mode::thread)
{
num_thread = schedulehint.hint;
@@ -1050,7 +1066,7 @@
allow_fallback = false;
}

-if (std::size_t(-1) == num_thread)
+if (static_cast<std::size_t>(-1) == num_thread)
{
num_thread = curr_queue_++ % num_queues_;
}
@@ -1109,12 +1125,11 @@
///////////////////////////////////////////////////////////////////////
// This returns the current length of the queues (work items and new
// items)
-std::int64_t get_queue_length(
-    std::size_t num_thread = std::size_t(-1)) const override
+std::int64_t get_queue_length(std::size_t num_thread) const override
{
// Return queue length of one specific queue.
std::int64_t count = 0;
-if (std::size_t(-1) != num_thread)
+if (static_cast<std::size_t>(-1) != num_thread)
{
HPX_ASSERT(num_thread < num_queues_);
auto const& d = data_[num_thread].data_;
@@ -1150,12 +1165,12 @@
std::int64_t get_thread_count(
thread_schedule_state state = thread_schedule_state::unknown,
thread_priority priority = thread_priority::default_,
-std::size_t num_thread = std::size_t(-1),
+std::size_t num_thread = static_cast<std::size_t>(-1),
bool /* reset */ = false) const override
{
// Return thread count of one specific queue.
std::int64_t count = 0;
-if (std::size_t(-1) != num_thread)
+if (static_cast<std::size_t>(-1) != num_thread)
{
HPX_ASSERT(num_thread < num_queues_);

@@ -1207,7 +1222,6 @@
"local_workrequesting_scheduler::get_thread_count",
"unknown thread priority value "
"(thread_priority::unknown)");
-return 0;
}
}
return 0;
@@ -1275,7 +1289,6 @@
"local_workrequesting_scheduler::get_thread_count",
"unknown thread priority value "
"(thread_priority::unknown)");
-return 0;
}
}
return count;
@@ -1444,7 +1457,7 @@
// generate at most 3 random numbers before resorting to more
// expensive algorithm
std::uniform_int_distribution<std::int16_t> uniform(
-0, std::int16_t(num_queues_ - 1));
+0, static_cast<std::int16_t>(num_queues_ - 1));

int attempts = 0;
do
@@ -1461,8 +1474,9 @@

// to avoid infinite trials we randomly select one of the possible
// victims
-std::uniform_int_distribution<std::int16_t> uniform(
-    0, std::int16_t(num_queues_ - count(req.victims_) - 1));
+std::uniform_int_distribution<std::int16_t> uniform(0,
+    static_cast<std::int16_t>(
+        num_queues_ - count(req.victims_) - 1));

// generate one more random number
std::size_t selected_victim = uniform(gen_);
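The two hunks above select a steal victim in two phases: a bounded number of cheap random draws first, then an exact draw over only the still-eligible cores. A sketch of that logic under the assumption of a plain bitmask where a set bit means "do not steal from this core" — pick_victim and its parameters are illustrative, not the scheduler's real interface:

#include <bitset>
#include <cstddef>
#include <random>

constexpr std::size_t max_cores = 64;

std::size_t pick_victim(std::bitset<max_cores> const& excluded,
    std::size_t num_queues, std::mt19937& gen)
{
    constexpr std::size_t no_victim = static_cast<std::size_t>(-1);

    // phase 1: generate at most 3 cheap random numbers and hope one of
    // them names an eligible core
    std::uniform_int_distribution<std::size_t> uniform(0, num_queues - 1);
    for (int attempts = 0; attempts != 3; ++attempts)
    {
        std::size_t const candidate = uniform(gen);
        if (!excluded[candidate])
            return candidate;
    }

    // phase 2: to avoid unbounded retries, draw once over the eligible
    // cores only and map the draw back to a core index
    std::size_t const eligible = num_queues - excluded.count();
    if (eligible == 0)
        return no_victim;

    std::uniform_int_distribution<std::size_t> exact(0, eligible - 1);
    std::size_t selected = exact(gen);
    for (std::size_t core = 0; core != num_queues; ++core)
    {
        if (excluded[core])
            continue;
        if (selected-- == 0)
            return core;
    }
    return no_victim;    // not reached when the mask is consistent
}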
@@ -1489,7 +1503,7 @@
std::size_t next_victim([[maybe_unused]] scheduler_data& d,
steal_request const& req) noexcept
{
-std::size_t victim = std::size_t(-1);
+std::size_t victim;

// return thief if max steal attempts has been reached or no more
// cores are available for stealing
@@ -1516,7 +1530,7 @@
}

// couldn't find victim, return steal request to thief
-if (victim == std::size_t(-1))
+if (victim == static_cast<std::size_t>(-1))
{
victim = req.num_thread_;
HPX_ASSERT(victim != d.num_thread_);
@@ -1650,7 +1664,7 @@
bool wait_or_add_new(std::size_t num_thread, bool running,
[[maybe_unused]] std::int64_t& idle_loop_count,
bool enable_stealing, std::size_t& added,
-thread_id_ref_type* next_thrd = nullptr) override
+thread_id_ref_type* next_thrd = nullptr)
{
HPX_ASSERT(num_thread < num_queues_);

@@ -1755,11 +1769,11 @@
low_priority_queue_.on_start_thread(num_thread);
}

-std::size_t num_threads = num_queues_;
+std::size_t const num_threads = num_queues_;
//auto const& topo = create_topology();

// Initially set all bits, code below resets the bits corresponding
-// to cores that can serve as a vistim for the current core. A set
+// to cores that can serve as a victim for the current core. A set
// bit in this mask means 'do not steal from this core'.
resize(d.victims_, num_threads);
reset(d.victims_);
3 changes: 1 addition & 2 deletions libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
@@ -457,8 +457,7 @@ namespace hpx::threads::policies {

if (delete_all)
{
-// do not lock mutex while deleting all threads, do it
-// piece-wise
+// do not lock mutex while deleting all threads, do it piece-wise
while (true)
{
std::unique_lock<mutex_type> lk(mtx_, std::try_to_lock);
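The reflowed comment documents a deliberate pattern: terminated threads are deleted piece-wise, re-acquiring the mutex with std::try_to_lock on each iteration so that a contended lock aborts the cleanup instead of blocking scheduling. A minimal sketch of the pattern, using hypothetical names (item, terminated_items_) rather than the real thread_queue members:

#include <deque>
#include <mutex>

struct item
{
};

struct queue_cleanup
{
    std::mutex mtx_;
    std::deque<item*> terminated_items_;

    // Delete terminated items one at a time, re-acquiring the lock for
    // each; a contended mutex ends the cleanup instead of blocking.
    void delete_all_piecewise()
    {
        while (true)
        {
            std::unique_lock<std::mutex> lk(mtx_, std::try_to_lock);
            if (!lk.owns_lock() || terminated_items_.empty())
                break;

            item* i = terminated_items_.front();
            terminated_items_.pop_front();

            lk.unlock();    // never run the destructor under the lock
            delete i;
        }
    }
};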
@@ -19,7 +19,6 @@
#include <hpx/threading_base/scheduler_base.hpp>
#include <hpx/threading_base/scheduler_state.hpp>
#include <hpx/threading_base/thread_data.hpp>
-#include <hpx/type_support/unused.hpp>

#if defined(HPX_HAVE_APEX)
#include <hpx/threading_base/external_timer.hpp>
@@ -179,7 +178,6 @@ namespace hpx::threads::detail {

idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
[[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(idle_rate);
-HPX_UNUSED(tfunc_time_collector);

// spin for some time after queues have become empty
bool may_exit = false;
@@ -237,8 +235,6 @@
scheduler.get_next_thread(
num_thread, running, thrd, enable_stealing)))
{
-HPX_UNUSED(tfunc_time_collector);
-
HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() ==
&scheduler);

@@ -273,7 +269,6 @@

[[maybe_unused]] tfunc_time_wrapper
tfunc_time_collector_inner(idle_rate);
-HPX_UNUSED(tfunc_time_collector);

// thread returns new required state store the
// returned state in the thread
@@ -294,7 +289,7 @@
// and add to aggregate execution time.
[[maybe_unused]] exec_time_wrapper
exec_time_collector(idle_rate);
-HPX_UNUSED(exec_time_collector);
+
#if defined(HPX_HAVE_APEX)
// get the APEX data pointer, in case we are
// resuming the thread and have to restore any
@@ -470,8 +465,8 @@
++idle_loop_count;

next_thrd = nullptr;
-if (scheduler.wait_or_add_new(num_thread,
-    running, idle_loop_count, enable_stealing_staged, added,
+if (scheduler.wait_or_add_new(num_thread, running,
+    idle_loop_count, enable_stealing_staged, added,
&next_thrd))
{
// Clean up terminated threads before trying to exit
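The HPX_UNUSED(...) calls removed in this file were redundant: the collectors are already declared [[maybe_unused]], and as RAII objects they are not unused at all — their constructor and destructor perform the measurement. A sketch of such a scoped collector, assuming a simple atomic accumulator rather than the real idle_collect_rate machinery:

#include <atomic>
#include <chrono>
#include <cstdint>

// RAII wrapper that adds the lifetime of a scope to an accumulated
// duration counter; a stand-in for tfunc_time_wrapper/exec_time_wrapper.
class scoped_time_collector
{
    std::atomic<std::int64_t>& accumulated_ns_;
    std::chrono::steady_clock::time_point start_;

public:
    explicit scoped_time_collector(std::atomic<std::int64_t>& acc) noexcept
      : accumulated_ns_(acc)
      , start_(std::chrono::steady_clock::now())
    {
    }

    ~scoped_time_collector()
    {
        auto const elapsed = std::chrono::steady_clock::now() - start_;
        accumulated_ns_ +=
            std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed)
                .count();
    }
};

void scheduler_iteration(std::atomic<std::int64_t>& exec_time)
{
    // [[maybe_unused]] silences warnings even though the object is live:
    // its constructor and destructor do the actual measurement.
    [[maybe_unused]] scoped_time_collector collect(exec_time);
    // ... run one thread ...
}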
6 changes: 2 additions & 4 deletions libs/core/threading/include/hpx/threading/thread.hpp
@@ -168,11 +168,9 @@ namespace hpx {
return threads::invalid_thread_id != id_;
}

-threads::thread_id_ref_type detach_locked()
+void detach_locked()
{
-threads::thread_id_ref_type id;
-std::swap(id, id_);
-return id;
+id_ = threads::invalid_thread_id;
}

void start_thread(threads::thread_pool_base* pool,
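With the revert, join() (see the thread.cpp hunks below) registers its exit callback against id_ directly and only invalidates the handle once the join has completed, so detach_locked() no longer needs to hand ownership out — it simply resets the member. A simplified before/after illustration, where thread_id stands in for threads::thread_id_ref_type:

// Simplified illustration; not the actual HPX types.
struct thread_id
{
    void* ptr = nullptr;
};

inline constexpr thread_id invalid_thread_id{};

struct thread_handle
{
    thread_id id_;

    // before: detach_locked() swapped the id out and returned it, so the
    // caller kept the referenced thread alive through the returned handle
    //
    // after: the handle is simply invalidated; join() keeps using id_
    // directly and calls detach_locked() as its last step
    void detach_locked()
    {
        id_ = invalid_thread_id;
    }
};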
12 changes: 6 additions & 6 deletions libs/core/threading/src/thread.cpp
@@ -112,6 +112,7 @@ namespace hpx {
"run_thread_exit_callbacks", "null thread id encountered");
}
threads::run_thread_exit_callbacks(id);
+threads::free_thread_exit_callbacks(id);
}

threads::thread_result_type thread::thread_function_nullary(
@@ -213,18 +214,17 @@
}
this_thread::interruption_point();

-// invalidate this object
-threads::thread_id_ref_type id = detach_locked();
-
// register callback function to be called when thread exits
-if (threads::add_thread_exit_callback(
-    id.noref(), hpx::bind_front(&resume_thread, HPX_MOVE(this_id))))
+if (threads::add_thread_exit_callback(id_.noref(),
+    hpx::bind_front(&resume_thread, HPX_MOVE(this_id))))
{
// wait for thread to be terminated
-l.unlock();
+unlock_guard ul(l);
this_thread::suspend(
threads::thread_schedule_state::suspended, "thread::join");
}
+
+detach_locked();    // invalidate this object
}

// extensions
@@ -43,6 +43,7 @@ namespace hpx::threads {
auto* p = get_self_id_data();

p->run_thread_exit_callbacks();
+p->free_thread_exit_callbacks();

return threads::thread_result_type(
threads::thread_schedule_state::terminated,
@@ -69,6 +70,7 @@ namespace hpx::threads {
auto* p = get_self_id_data();

p->run_thread_exit_callbacks();
+p->free_thread_exit_callbacks();

return threads::thread_result_type(
threads::thread_schedule_state::terminated,
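The reverted call sites restore the original two-step exit protocol: every termination path first runs the registered exit callbacks and then explicitly frees them, rather than leaving the free to an implicit path. A sketch of that protocol under the assumption of a simple locked callback list — exit_callbacks here is illustrative, not HPX's thread_data:

#include <functional>
#include <mutex>
#include <vector>

class exit_callbacks
{
    std::mutex mtx_;
    std::vector<std::function<void()>> callbacks_;

public:
    bool add(std::function<void()> f)
    {
        std::lock_guard<std::mutex> l(mtx_);
        callbacks_.push_back(std::move(f));
        return true;
    }

    // step 1: run the registered callbacks (newest first)
    void run()
    {
        std::lock_guard<std::mutex> l(mtx_);
        for (auto it = callbacks_.rbegin(); it != callbacks_.rend(); ++it)
            (*it)();
    }

    // step 2: release the storage; after this, the callbacks are gone
    void free()
    {
        std::lock_guard<std::mutex> l(mtx_);
        callbacks_.clear();
        callbacks_.shrink_to_fit();
    }
};

// usage mirroring the diff: run, then free, on every termination path
void on_thread_exit(exit_callbacks& cbs)
{
    cbs.run();
    cbs.free();
}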