Skip to content

Commit

Permalink
Improve error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Jul 25, 2023
1 parent 70242db commit f6334d3
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 29 deletions.
6 changes: 3 additions & 3 deletions libs/core/init_runtime_local/src/init_runtime_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,14 @@ namespace hpx {
}

// non-blocking version
start(*rt, cfg.hpx_main_f_, cfg.vm_, HPX_MOVE(startup),
HPX_MOVE(shutdown));
int const result = start(*rt, cfg.hpx_main_f_, cfg.vm_,
HPX_MOVE(startup), HPX_MOVE(shutdown));

// pointer to runtime is stored in TLS
hpx::runtime* p = rt.release();
(void) p;

return 0;
return result;
}

////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,29 @@ void test_scheduler(
hpx::local::init_params init_args;

init_args.cfg = {"hpx.os_threads=" +
std::to_string(((std::min)(std::size_t(4),
std::size_t(hpx::threads::hardware_concurrency()))))};
std::to_string(((std::min)(static_cast<std::size_t>(4),
static_cast<std::size_t>(hpx::threads::hardware_concurrency()))))};
init_args.rp_callback = [scheduler](auto& rp,
hpx::program_options::variables_map const&) {
rp.create_thread_pool("default", scheduler);
};

hpx::local::start(nullptr, argc, argv, init_args);
HPX_TEST(hpx::local::start(nullptr, argc, argv, init_args));

hpx::threads::thread_pool_base& default_pool =
hpx::resource::get_thread_pool("default");
std::size_t const default_pool_threads =
hpx::resource::get_num_threads("default");

hpx::chrono::high_resolution_timer t;
hpx::chrono::high_resolution_timer const t;

while (t.elapsed() < 2)
{
std::atomic count_tasks = default_pool_threads * 10000;

for (std::size_t i = 0; i < default_pool_threads * 10000; ++i)
{
hpx::post([]() {});
hpx::post([&]() { --count_tasks; });
}

bool suspended = false;
Expand All @@ -72,6 +74,12 @@ void test_scheduler(
{
std::this_thread::yield();
}

// wait for tasks finish running
while (count_tasks.load() != 0)
{
std::this_thread::yield();
}
}

hpx::post([]() { hpx::local::finalize(); });
Expand All @@ -81,7 +89,7 @@ void test_scheduler(

int main(int argc, char* argv[])
{
std::vector<hpx::resource::scheduling_policy> schedulers = {
std::vector<hpx::resource::scheduling_policy> const schedulers = {
hpx::resource::scheduling_policy::local,
hpx::resource::scheduling_policy::local_priority_fifo,
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
Expand Down
21 changes: 11 additions & 10 deletions libs/core/runtime_local/src/runtime_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,6 @@ namespace hpx {
// see: http://connect.microsoft.com/VisualStudio/feedback/ViewFeedback.aspx?FeedbackID=100319
_isatty(0);
#endif
// {{{ early startup code - local

// initialize instrumentation system
#ifdef HPX_HAVE_APEX
Expand All @@ -1448,9 +1447,13 @@ namespace hpx {
"I/O service pool";
#endif
// start the thread manager
thread_manager_->run();
if (!thread_manager_->run())
{
std::cerr << "runtime::start: failed to start threadmanager\n";
return -1;
}

lbt_ << "(1st stage) runtime::start: started threadmanager";
// }}}

// {{{ launch main
// register the given main function with the thread manager
Expand All @@ -1473,13 +1476,11 @@ namespace hpx {
{
return wait(); // wait for the shutdown_action to be executed
}
else
{
// wait for at least hpx::state::running
util::yield_while(
[this]() { return get_state() < hpx::state::running; },
"runtime::start");
}

// wait for at least hpx::state::running
util::yield_while(
[this]() { return get_state() < hpx::state::running; },
"runtime::start");

return 0; // return zero as we don't know the outcome of hpx_main yet
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,24 @@ namespace hpx::threads::detail {
if (!threads_.empty())
{
// wait for all work to be done before requesting threads to shut
// down
// down, but only if all threads were successfully initialized
if (blocking)
{
wait();
bool must_wait = true;
for (const auto& thread : threads_)
{
// skip this if already stopped
if (!thread.joinable())
{
must_wait = false;
break;
}
}

if (must_wait)
{
wait();
}
}

// wake up if suspended
Expand Down Expand Up @@ -329,9 +343,9 @@ namespace hpx::threads::detail {
// get_pu_mask expects index according to ordering of masks
// in affinity_data::affinity_masks_
// which is in order of occupied PU
LTM_(info).format(
"run: {} create OS thread {}: will run on processing units "
"within this mask: {}",
LTM_(info).format("run: {} create OS thread {}: will run "
"on processing units "
"within this mask: {}",
id_.name(), global_thread_num,
hpx::threads::to_string(mask));

Expand All @@ -352,8 +366,7 @@ namespace hpx::threads::detail {
"run: {} failed with: {}", id_.name(), e.what());

// trigger the barrier
pool_threads -= (thread_num + 1);
while (pool_threads-- != 0)
while (thread_num-- != 0)
startup->wait();

stop_locked(l);
Expand Down Expand Up @@ -1908,10 +1921,10 @@ namespace hpx::threads::detail {

std::atomic<hpx::state>& state =
sched_->Scheduler::get_state(virt_core);
hpx::state oldstate = state.exchange(hpx::state::initialized);
[[maybe_unused]] hpx::state const oldstate =
state.exchange(hpx::state::initialized);
HPX_ASSERT(oldstate == hpx::state::stopped ||
oldstate == hpx::state::initialized);
HPX_UNUSED(oldstate);

threads_[virt_core] = std::thread(&scheduled_thread_pool::thread_func,
this, virt_core, thread_num, HPX_MOVE(startup));
Expand Down Expand Up @@ -1941,7 +1954,7 @@ namespace hpx::threads::detail {
sched_->Scheduler::get_state(virt_core);

// inform the scheduler to stop the virtual core
hpx::state oldstate = state.exchange(hpx::state::stopping);
hpx::state const oldstate = state.exchange(hpx::state::stopping);

if (oldstate > hpx::state::stopping)
{
Expand Down

0 comments on commit f6334d3

Please sign in to comment.