diff --git a/libs/core/init_runtime_local/src/init_runtime_local.cpp b/libs/core/init_runtime_local/src/init_runtime_local.cpp
index 3ddcd74bf3b1..14d816e7e5b0 100644
--- a/libs/core/init_runtime_local/src/init_runtime_local.cpp
+++ b/libs/core/init_runtime_local/src/init_runtime_local.cpp
@@ -357,14 +357,14 @@ namespace hpx {
         }
 
         // non-blocking version
-        start(*rt, cfg.hpx_main_f_, cfg.vm_, HPX_MOVE(startup),
-            HPX_MOVE(shutdown));
+        int const result = start(*rt, cfg.hpx_main_f_, cfg.vm_,
+            HPX_MOVE(startup), HPX_MOVE(shutdown));
 
         // pointer to runtime is stored in TLS
         hpx::runtime* p = rt.release();
         (void) p;
 
-        return 0;
+        return result;
     }
 
     ////////////////////////////////////////////////////////////////////////
diff --git a/libs/core/resource_partitioner/tests/unit/suspend_pool_external.cpp b/libs/core/resource_partitioner/tests/unit/suspend_pool_external.cpp
index 4eea2224c9e9..39151f23ec72 100644
--- a/libs/core/resource_partitioner/tests/unit/suspend_pool_external.cpp
+++ b/libs/core/resource_partitioner/tests/unit/suspend_pool_external.cpp
@@ -30,27 +30,29 @@ void test_scheduler(
 
     hpx::local::init_params init_args;
     init_args.cfg = {"hpx.os_threads=" +
-        std::to_string(((std::min)(std::size_t(4),
-            std::size_t(hpx::threads::hardware_concurrency()))))};
+        std::to_string(((std::min)(static_cast<std::size_t>(4),
+            static_cast<std::size_t>(hpx::threads::hardware_concurrency()))))};
     init_args.rp_callback = [scheduler](auto& rp,
                                 hpx::program_options::variables_map const&) {
         rp.create_thread_pool("default", scheduler);
     };
 
-    hpx::local::start(nullptr, argc, argv, init_args);
+    HPX_TEST(hpx::local::start(nullptr, argc, argv, init_args));
 
     hpx::threads::thread_pool_base& default_pool =
         hpx::resource::get_thread_pool("default");
     std::size_t const default_pool_threads =
         hpx::resource::get_num_threads("default");
 
-    hpx::chrono::high_resolution_timer t;
+    hpx::chrono::high_resolution_timer const t;
 
     while (t.elapsed() < 2)
     {
+        std::atomic<std::size_t> count_tasks = default_pool_threads * 10000;
+
         for (std::size_t i = 0; i < default_pool_threads * 10000; ++i)
         {
-            hpx::post([]() {});
+            hpx::post([&]() { --count_tasks; });
         }
 
         bool suspended = false;
@@ -72,6 +74,12 @@ void test_scheduler(
         {
             std::this_thread::yield();
         }
+
+        // wait for tasks to finish running
+        while (count_tasks.load() != 0)
+        {
+            std::this_thread::yield();
+        }
     }
 
     hpx::post([]() { hpx::local::finalize(); });
@@ -81,7 +89,7 @@
 
 int main(int argc, char* argv[])
 {
-    std::vector<hpx::resource::scheduling_policy> schedulers = {
+    std::vector<hpx::resource::scheduling_policy> const schedulers = {
         hpx::resource::scheduling_policy::local,
         hpx::resource::scheduling_policy::local_priority_fifo,
 #if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
diff --git a/libs/core/runtime_local/src/runtime_local.cpp b/libs/core/runtime_local/src/runtime_local.cpp
index 94626ecfb40c..2aa33b83e009 100644
--- a/libs/core/runtime_local/src/runtime_local.cpp
+++ b/libs/core/runtime_local/src/runtime_local.cpp
@@ -1424,7 +1424,6 @@ namespace hpx {
        // see: http://connect.microsoft.com/VisualStudio/feedback/ViewFeedback.aspx?FeedbackID=100319
        _isatty(0);
 #endif
-        // {{{ early startup code - local
 
         // initialize instrumentation system
 #ifdef HPX_HAVE_APEX
@@ -1448,9 +1447,13 @@ namespace hpx {
             "I/O service pool";
 #endif
         // start the thread manager
-        thread_manager_->run();
+        if (!thread_manager_->run())
+        {
+            std::cerr << "runtime::start: failed to start threadmanager\n";
+            return -1;
+        }
+
         lbt_ << "(1st stage) runtime::start: started threadmanager";
-        // }}}
 
         // {{{ launch main
         // register the given main function with the thread manager
@@ -1473,13 +1476,11 @@ namespace hpx {
         {
             return wait();    // wait for the shutdown_action to be executed
         }
-        else
-        {
-            // wait for at least hpx::state::running
-            util::yield_while(
-                [this]() { return get_state() < hpx::state::running; },
-                "runtime::start");
-        }
+
+        // wait for at least hpx::state::running
+        util::yield_while(
+            [this]() { return get_state() < hpx::state::running; },
+            "runtime::start");
 
         return 0;    // return zero as we don't know the outcome of hpx_main yet
     }
diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp
index 2678f684bc04..72c30824fe08 100644
--- a/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp
+++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp
@@ -229,10 +229,24 @@ namespace hpx::threads::detail {
         if (!threads_.empty())
         {
             // wait for all work to be done before requesting threads to shut
-            // down
+            // down, but only if all threads were successfully initialized
             if (blocking)
             {
-                wait();
+                bool must_wait = true;
+                for (const auto& thread : threads_)
+                {
+                    // skip this if already stopped
+                    if (!thread.joinable())
+                    {
+                        must_wait = false;
+                        break;
+                    }
+                }
+
+                if (must_wait)
+                {
+                    wait();
+                }
             }
 
             // wake up if suspended
@@ -329,9 +343,9 @@ namespace hpx::threads::detail {
                 // get_pu_mask expects index according to ordering of masks
                 // in affinity_data::affinity_masks_
                 // which is in order of occupied PU
-                LTM_(info).format(
-                    "run: {} create OS thread {}: will run on processing units "
-                    "within this mask: {}",
+                LTM_(info).format("run: {} create OS thread {}: will run "
+                                  "on processing units "
+                                  "within this mask: {}",
                     id_.name(), global_thread_num,
                     hpx::threads::to_string(mask));
 
@@ -352,8 +366,7 @@ namespace hpx::threads::detail {
                     "run: {} failed with: {}", id_.name(), e.what());
 
                 // trigger the barrier
-                pool_threads -= (thread_num + 1);
-                while (pool_threads-- != 0)
+                while (thread_num-- != 0)
                     startup->wait();
 
                 stop_locked(l);
@@ -1908,10 +1921,10 @@ namespace hpx::threads::detail {
 
         std::atomic<hpx::state>& state =
             sched_->Scheduler::get_state(virt_core);
-        hpx::state oldstate = state.exchange(hpx::state::initialized);
+        [[maybe_unused]] hpx::state const oldstate =
+            state.exchange(hpx::state::initialized);
         HPX_ASSERT(oldstate == hpx::state::stopped ||
             oldstate == hpx::state::initialized);
-        HPX_UNUSED(oldstate);
 
         threads_[virt_core] = std::thread(&scheduled_thread_pool::thread_func,
             this, virt_core, thread_num, HPX_MOVE(startup));
@@ -1941,7 +1954,7 @@ namespace hpx::threads::detail {
             sched_->Scheduler::get_state(virt_core);
 
         // inform the scheduler to stop the virtual core
-        hpx::state oldstate = state.exchange(hpx::state::stopping);
+        hpx::state const oldstate = state.exchange(hpx::state::stopping);
 
         if (oldstate > hpx::state::stopping)
         {
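
For context, a minimal caller sketch (not part of the patch): it shows how a startup failure, which these changes now propagate instead of silently returning 0, can be observed through the public hpx::local API already used by the test above. The header path and the start overload taking an int(int, char**) entry point are assumptions about the HPX version in use.

```cpp
// Sketch only: assumes <hpx/local/init.hpp> and the hpx::local::start
// overload taking an int(int, char**) entry point; adjust to your HPX version.
#include <hpx/local/init.hpp>

#include <cstdlib>
#include <iostream>

int hpx_main(int, char**)
{
    // runs on an HPX worker thread once the runtime is up
    std::cout << "runtime started\n";
    return hpx::local::finalize();    // request shutdown
}

int main(int argc, char* argv[])
{
    hpx::local::init_params params;

    // with this patch a failed thread-manager startup is no longer swallowed,
    // so the result of the non-blocking start() is worth checking
    if (!hpx::local::start(hpx_main, argc, argv, params))
    {
        std::cerr << "failed to start the HPX runtime\n";
        return EXIT_FAILURE;
    }

    // wait for hpx_main/finalize and propagate the runtime's exit status
    return hpx::local::stop();
}
```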