From e2139d3e114a7f8097d5df51fa5a7447df49b401 Mon Sep 17 00:00:00 2001 From: Noah Goldstein Date: Tue, 28 May 2024 11:59:41 -0500 Subject: [PATCH 1/3] ThreadPool: Spend less time busy waiting. The purpose of the patch is primarily to save power, but it also has nice perf benefits (mostly from allowing the system to better distribute power to cores doing meaningful work). Changes are twofold: 1) Decrease WorkerLoop spin count dramatically ~10^6 -> ~10^4. The reality is after ~10^4 spins, if there hasn't been any new work added its unlikely any new work is imminent so sleep to preserve power. 2) Use exponential backoff for waiting on memory. This saves a bit more power, and important increases the time between iterations in WorkerLoop to help accomidate the dramatically lowering spin counts. Since the tuning for both the iteration counts / backoff counts are dramatically different for hybrid/non-hybrid systems, this patch templates the affected functions and dynamically choses based on `CPUIDInfo::IsHybrid()`. This seemed like the "lightest weight" way of getting the change in, although its likely we could incur less dynamic overhead if we added the template argument to the entirety of `ThreadPoolTempl`. Measured performance on an [Intel Raptor Lake CPU](https://www.intel.com/content/www/us/en/products/sku/230496/intel-core-i913900k-processor-36m-cache-up-to-5-80-ghz/specifications.html) across a range of models. Below are the result of 3 runs with each metric being the value-before-patch / value-after-patch (so for something like inference time, lower is better). Session creation time cost|First inference time cost |Total inference time cost |Total inference requests |Average inference time cost |Total inference run time |Number of inferences per second |Avg CPU usage |Peak working set size |Runs |Min Latency |Max Latency |P50 Latency |P90 Latency |P95 Latency |P99 Latency :-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----:|:-----: 0.9151 |0.8564 |0.9995 |0.9450 |0.9396 |0.9995 |0.9449 |0.9018 |0.9876 |1.0650 |0.9706 |0.8538 |0.9453 |0.9051 |0.8683 |0.8547 --- .../platform/EigenNonBlockingThreadPool.h | 65 ++++++++++++++++--- .../onnxruntime/core/platform/threadpool.h | 5 +- onnxruntime/core/common/threadpool.cc | 38 +++++++---- 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h index a7c63c507d1ba..1fd1de7575829 100644 --- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h +++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h @@ -694,7 +694,7 @@ class RunQueue { static std::atomic next_tag{1}; -template +template class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInterface { private: struct PerThread; @@ -766,6 +766,29 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter typedef std::function Task; typedef RunQueue Queue; + // Class for waiting w/ exponential backoff. + // Template argument is maximum number of spins in backoff loop. + template + class ThreadPoolWaiter { + // Current number if spins in backoff loop + unsigned pause_time_; + + public: + void wait() { + // If kMaxBackoff is zero don't do any pausing. + if constexpr (kMaxBackoff == 1) { + onnxruntime::concurrency::SpinPause(); + } else if constexpr (kMaxBackoff > 1) { + // Exponential backoff + unsigned pause_time = pause_time_ + 1U; + for (unsigned i = 0; i < pause_time; ++i) { + onnxruntime::concurrency::SpinPause(); + } + pause_time_ = (pause_time * 2U) % kMaxBackoff; + } + } + }; + ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning, Environment& env, const ThreadOptions& thread_options) : profiler_(num_threads, name), @@ -907,8 +930,9 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // finish dispatch work. This avoids new tasks being started // concurrently with us attempting to end the parallel section. if (ps.dispatch_q_idx != -1) { + ThreadPoolWaiter<4> waiter{}; while (!ps.dispatch_done.load(std::memory_order_acquire)) { - onnxruntime::concurrency::SpinPause(); + waiter.wait(); } } @@ -930,15 +954,17 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // Wait for the dispatch task's own work... if (ps.dispatch_q_idx > -1) { + ThreadPoolWaiter waiter{}; while (!ps.work_done.load(std::memory_order_acquire)) { - onnxruntime::concurrency::SpinPause(); + waiter.wait(); } } // ...and wait for any other tasks not revoked to finish their work auto tasks_to_wait_for = tasks_started - ps.tasks_revoked; + ThreadPoolWaiter waiter{}; while (ps.tasks_finished < tasks_to_wait_for) { - onnxruntime::concurrency::SpinPause(); + waiter.wait(); } // Clear status to allow the ThreadPoolParallelSection to be @@ -1256,9 +1282,10 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // Increase the worker count if needed. Each worker will pick up // loops to execute from the current parallel section. std::function worker_fn = [&ps](unsigned par_idx) { + ThreadPoolWaiter waiter{}; while (ps.active) { if (ps.current_loop.load() == nullptr) { - onnxruntime::concurrency::SpinPause(); + waiter.wait(); } else { ps.workers_in_loop++; ThreadPoolLoop* work_item = ps.current_loop; @@ -1279,8 +1306,9 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // Wait for workers to exit the loop ps.current_loop = 0; + ThreadPoolWaiter waiter{}; while (ps.workers_in_loop) { - onnxruntime::concurrency::SpinPause(); + waiter.wait(); } profiler_.LogEnd(ThreadPoolProfiler::WAIT); } @@ -1535,13 +1563,30 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning); - constexpr int log2_spin = 20; - const int spin_count = allow_spinning_ ? (1ull << log2_spin) : 0; - const int steal_count = spin_count / 100; + // The exact value of spin_count and steal_count are arbitrary and + // were experimentally determined. These numbers yielded the best + // performance across a range of workloads and + // machines. Generally, the goal of tuning spin_count is to make + // the number as small as possible while ensuring there is enough + // slack so that if each core is doing the same amount of work it + // won't sleep before they have all finished. The idea here is + // that in pipelined workloads, it won't sleep during each stage + // if it's done a bit faster than its neighbors, but that if there + // are non-equal sizes of work distributed, it won't take too long + // to reach sleep giving power (and thus frequency/performance) to + // its neighbors. Since hybrid has P/E cores, a lower value is + // chosen. On hybrid systems, even with equal sized workloads + // distributed the compute time won't stay synced. Typically in + // the hybrid case the P cores finish first (and are thus waiting) + // which is essentially a priority inversion. + constexpr int pref_spin_count = kIsHybrid ? 5000 : 10000; + const int spin_count = allow_spinning_ ? pref_spin_count : 0; + constexpr int steal_count = pref_spin_count / (kIsHybrid ? 25 : 100); SetDenormalAsZero(set_denormal_as_zero_); profiler_.LogThreadId(thread_id); + ThreadPoolWaiter waiter{}; while (!should_exit) { Task t = q.PopFront(); if (!t) { @@ -1557,7 +1602,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter if (spin_loop_status_.load(std::memory_order_relaxed) == SpinLoopStatus::kIdle) { break; } - onnxruntime::concurrency::SpinPause(); + waiter.wait(); } // Attempt to block diff --git a/include/onnxruntime/core/platform/threadpool.h b/include/onnxruntime/core/platform/threadpool.h index 04df6dc982c6a..8b0f8044b2351 100644 --- a/include/onnxruntime/core/platform/threadpool.h +++ b/include/onnxruntime/core/platform/threadpool.h @@ -129,7 +129,7 @@ struct TensorOpCost { namespace concurrency { -template +template class ThreadPoolTempl; class ExtendedThreadPoolInterface; @@ -424,7 +424,8 @@ class ThreadPool { ExtendedThreadPoolInterface* underlying_threadpool_ = nullptr; // If used, underlying_threadpool_ is instantiated and owned by the ThreadPool. - std::unique_ptr > extended_eigen_threadpool_; + std::unique_ptr> extended_eigen_hybrid_threadpool_; + std::unique_ptr> extended_eigen_normal_threadpool_; // Force the thread pool to run in hybrid mode on a normal cpu. bool force_hybrid_ = false; diff --git a/onnxruntime/core/common/threadpool.cc b/onnxruntime/core/common/threadpool.cc index b192688373851..3c3a9ad292544 100644 --- a/onnxruntime/core/common/threadpool.cc +++ b/onnxruntime/core/common/threadpool.cc @@ -390,13 +390,23 @@ ThreadPool::ThreadPool(Env* env, assert(thread_options_.affinities.size() >= size_t(threads_to_create)); } - extended_eigen_threadpool_ = - std::make_unique >(name, - threads_to_create, - low_latency_hint, - *env, - thread_options_); - underlying_threadpool_ = extended_eigen_threadpool_.get(); + if (force_hybrid_) { + extended_eigen_hybrid_threadpool_ = + std::make_unique >(name, + threads_to_create, + low_latency_hint, + *env, + thread_options_); + underlying_threadpool_ = extended_eigen_hybrid_threadpool_.get(); + } else { + extended_eigen_normal_threadpool_ = + std::make_unique >(name, + threads_to_create, + low_latency_hint, + *env, + thread_options_); + underlying_threadpool_ = extended_eigen_normal_threadpool_.get(); + } } } @@ -665,15 +675,17 @@ std::string ThreadPool::StopProfiling(concurrency::ThreadPool* tp) { } void ThreadPool::EnableSpinning() { - if (extended_eigen_threadpool_) { - extended_eigen_threadpool_->EnableSpinning(); - } + if (extended_eigen_hybrid_threadpool_) + extended_eigen_hybrid_threadpool_->EnableSpinning(); + else if (extended_eigen_normal_threadpool_) + extended_eigen_normal_threadpool_->EnableSpinning(); } void ThreadPool::DisableSpinning() { - if (extended_eigen_threadpool_) { - extended_eigen_threadpool_->DisableSpinning(); - } + if (extended_eigen_hybrid_threadpool_) + extended_eigen_hybrid_threadpool_->DisableSpinning(); + else if (extended_eigen_normal_threadpool_) + extended_eigen_normal_threadpool_->DisableSpinning(); } // Return the number of threads created by the pool. From 6245d758eb88ef856313cbef673d18490a5bbd0d Mon Sep 17 00:00:00 2001 From: Scott McKay Date: Fri, 4 Oct 2024 17:33:39 +1000 Subject: [PATCH 2/3] Refactor recent change to the thread pool to minimize binary size impact. --- .../platform/EigenNonBlockingThreadPool.h | 61 +++++++++++-------- .../onnxruntime/core/platform/threadpool.h | 5 +- onnxruntime/core/common/threadpool.cc | 34 +++-------- 3 files changed, 47 insertions(+), 53 deletions(-) diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h index 1fd1de7575829..3bfef8a9de970 100644 --- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h +++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h @@ -226,8 +226,8 @@ class ThreadPoolProfiler { void LogStart() {}; void LogEnd(ThreadPoolEvent){}; void LogEndAndStart(ThreadPoolEvent){}; - void LogStartAndCoreAndBlock(std::ptrdiff_t){}; - void LogCoreAndBlock(std::ptrdiff_t){}; + void LogStartAndCoreAndBlock(std::ptrdiff_t) {}; + void LogCoreAndBlock(std::ptrdiff_t) {}; void LogThreadId(int) {}; void LogRun(int) {}; std::string DumpChildThreadStat() { return {}; } @@ -694,7 +694,7 @@ class RunQueue { static std::atomic next_tag{1}; -template +template class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInterface { private: struct PerThread; @@ -768,34 +768,43 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // Class for waiting w/ exponential backoff. // Template argument is maximum number of spins in backoff loop. - template class ThreadPoolWaiter { // Current number if spins in backoff loop - unsigned pause_time_; + unsigned pause_time_{0}; + const unsigned max_backoff_; public: + explicit ThreadPoolWaiter(unsigned max_backoff) + : max_backoff_(max_backoff) { + } + void wait() { - // If kMaxBackoff is zero don't do any pausing. - if constexpr (kMaxBackoff == 1) { - onnxruntime::concurrency::SpinPause(); - } else if constexpr (kMaxBackoff > 1) { - // Exponential backoff - unsigned pause_time = pause_time_ + 1U; - for (unsigned i = 0; i < pause_time; ++i) { + switch (max_backoff_) { + case 1: onnxruntime::concurrency::SpinPause(); - } - pause_time_ = (pause_time * 2U) % kMaxBackoff; - } + [[fallthrough]]; + case 0: + // If kMaxBackoff is zero don't do any pausing. + return; + default: + // Exponential backoff + unsigned pause_time = pause_time_ + 1U; + for (unsigned i = 0; i < pause_time; ++i) { + onnxruntime::concurrency::SpinPause(); + } + pause_time_ = (pause_time * 2U) % max_backoff_; + }; } }; ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning, Environment& env, - const ThreadOptions& thread_options) + const ThreadOptions& thread_options, bool is_hybrid) : profiler_(num_threads, name), env_(env), num_threads_(num_threads), allow_spinning_(allow_spinning), set_denormal_as_zero_(thread_options.set_denormal_as_zero), + is_hybrid_{is_hybrid}, worker_data_(num_threads), all_coprimes_(num_threads), blocked_(0), @@ -930,7 +939,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // finish dispatch work. This avoids new tasks being started // concurrently with us attempting to end the parallel section. if (ps.dispatch_q_idx != -1) { - ThreadPoolWaiter<4> waiter{}; + ThreadPoolWaiter waiter{4}; while (!ps.dispatch_done.load(std::memory_order_acquire)) { waiter.wait(); } @@ -952,9 +961,10 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter } profiler_.LogEnd(ThreadPoolProfiler::WAIT_REVOKE); + ThreadPoolWaiter waiter{is_hybrid_ ? 0U : 1U}; + // Wait for the dispatch task's own work... if (ps.dispatch_q_idx > -1) { - ThreadPoolWaiter waiter{}; while (!ps.work_done.load(std::memory_order_acquire)) { waiter.wait(); } @@ -962,7 +972,6 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // ...and wait for any other tasks not revoked to finish their work auto tasks_to_wait_for = tasks_started - ps.tasks_revoked; - ThreadPoolWaiter waiter{}; while (ps.tasks_finished < tasks_to_wait_for) { waiter.wait(); } @@ -1281,8 +1290,9 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // Increase the worker count if needed. Each worker will pick up // loops to execute from the current parallel section. - std::function worker_fn = [&ps](unsigned par_idx) { - ThreadPoolWaiter waiter{}; + const auto is_hybrid = is_hybrid_; + std::function worker_fn = [&ps, is_hybrid](unsigned par_idx) { + ThreadPoolWaiter waiter{is_hybrid ? 4U : 0U}; while (ps.active) { if (ps.current_loop.load() == nullptr) { waiter.wait(); @@ -1306,7 +1316,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // Wait for workers to exit the loop ps.current_loop = 0; - ThreadPoolWaiter waiter{}; + ThreadPoolWaiter waiter{is_hybrid_ ? 1U : 4U}; while (ps.workers_in_loop) { waiter.wait(); } @@ -1524,6 +1534,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter const unsigned num_threads_; const bool allow_spinning_; const bool set_denormal_as_zero_; + const bool is_hybrid_; Eigen::MaxSizeVector worker_data_; Eigen::MaxSizeVector> all_coprimes_; std::atomic blocked_; // Count of blocked workers, used as a termination condition @@ -1579,14 +1590,14 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // distributed the compute time won't stay synced. Typically in // the hybrid case the P cores finish first (and are thus waiting) // which is essentially a priority inversion. - constexpr int pref_spin_count = kIsHybrid ? 5000 : 10000; + const int pref_spin_count = is_hybrid_ ? 5000 : 10000; const int spin_count = allow_spinning_ ? pref_spin_count : 0; - constexpr int steal_count = pref_spin_count / (kIsHybrid ? 25 : 100); + const int steal_count = pref_spin_count / (is_hybrid_ ? 25 : 100); SetDenormalAsZero(set_denormal_as_zero_); profiler_.LogThreadId(thread_id); - ThreadPoolWaiter waiter{}; + ThreadPoolWaiter waiter{is_hybrid_ ? 1U : 8U}; while (!should_exit) { Task t = q.PopFront(); if (!t) { diff --git a/include/onnxruntime/core/platform/threadpool.h b/include/onnxruntime/core/platform/threadpool.h index 8b0f8044b2351..06532bfc94f96 100644 --- a/include/onnxruntime/core/platform/threadpool.h +++ b/include/onnxruntime/core/platform/threadpool.h @@ -129,7 +129,7 @@ struct TensorOpCost { namespace concurrency { -template +template class ThreadPoolTempl; class ExtendedThreadPoolInterface; @@ -424,8 +424,7 @@ class ThreadPool { ExtendedThreadPoolInterface* underlying_threadpool_ = nullptr; // If used, underlying_threadpool_ is instantiated and owned by the ThreadPool. - std::unique_ptr> extended_eigen_hybrid_threadpool_; - std::unique_ptr> extended_eigen_normal_threadpool_; + std::unique_ptr> extended_eigen_threadpool_; // Force the thread pool to run in hybrid mode on a normal cpu. bool force_hybrid_ = false; diff --git a/onnxruntime/core/common/threadpool.cc b/onnxruntime/core/common/threadpool.cc index 3c3a9ad292544..fca02a7669701 100644 --- a/onnxruntime/core/common/threadpool.cc +++ b/onnxruntime/core/common/threadpool.cc @@ -390,23 +390,13 @@ ThreadPool::ThreadPool(Env* env, assert(thread_options_.affinities.size() >= size_t(threads_to_create)); } - if (force_hybrid_) { - extended_eigen_hybrid_threadpool_ = - std::make_unique >(name, - threads_to_create, - low_latency_hint, - *env, - thread_options_); - underlying_threadpool_ = extended_eigen_hybrid_threadpool_.get(); - } else { - extended_eigen_normal_threadpool_ = - std::make_unique >(name, - threads_to_create, - low_latency_hint, - *env, - thread_options_); - underlying_threadpool_ = extended_eigen_normal_threadpool_.get(); - } + extended_eigen_threadpool_ = std::make_unique>(name, + threads_to_create, + low_latency_hint, + *env, + thread_options_, + force_hybrid_); + underlying_threadpool_ = extended_eigen_threadpool_.get(); } } @@ -675,17 +665,11 @@ std::string ThreadPool::StopProfiling(concurrency::ThreadPool* tp) { } void ThreadPool::EnableSpinning() { - if (extended_eigen_hybrid_threadpool_) - extended_eigen_hybrid_threadpool_->EnableSpinning(); - else if (extended_eigen_normal_threadpool_) - extended_eigen_normal_threadpool_->EnableSpinning(); + extended_eigen_threadpool_->EnableSpinning(); } void ThreadPool::DisableSpinning() { - if (extended_eigen_hybrid_threadpool_) - extended_eigen_hybrid_threadpool_->DisableSpinning(); - else if (extended_eigen_normal_threadpool_) - extended_eigen_normal_threadpool_->DisableSpinning(); + extended_eigen_threadpool_->DisableSpinning(); } // Return the number of threads created by the pool. From ed8683fb001d581b1a87ca8ca2b9c3baa049cdcc Mon Sep 17 00:00:00 2001 From: Noah Goldstein Date: Tue, 7 Jan 2025 17:21:58 -0800 Subject: [PATCH 3/3] Fixup Lints --- .../onnxruntime/core/platform/EigenNonBlockingThreadPool.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h index 3bfef8a9de970..d487ca5f58760 100644 --- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h +++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h @@ -226,8 +226,8 @@ class ThreadPoolProfiler { void LogStart() {}; void LogEnd(ThreadPoolEvent){}; void LogEndAndStart(ThreadPoolEvent){}; - void LogStartAndCoreAndBlock(std::ptrdiff_t) {}; - void LogCoreAndBlock(std::ptrdiff_t) {}; + void LogStartAndCoreAndBlock(std::ptrdiff_t) {} + void LogCoreAndBlock(std::ptrdiff_t) {} void LogThreadId(int) {}; void LogRun(int) {}; std::string DumpChildThreadStat() { return {}; } @@ -793,7 +793,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter onnxruntime::concurrency::SpinPause(); } pause_time_ = (pause_time * 2U) % max_backoff_; - }; + } } };