From a10ebfda3c27b78d7f82629367618e006c318e0e Mon Sep 17 00:00:00 2001 From: Hossein Moein Date: Tue, 5 Dec 2023 11:22:11 -0500 Subject: [PATCH] Converted all std::async to thread pool dispatch --- .../DataFrame/DataFrameFinancialVisitors.h | 74 +++++----- include/DataFrame/DataFrameStatsVisitors.h | 35 +---- include/DataFrame/Internals/DataFrame.tcc | 129 ++++++++---------- .../DataFrame/Internals/DataFrame_shift.tcc | 55 +------- .../DataFrame/Utils/Threads/SharedQueue.tcc | 6 +- .../Utils/Threads/ThreadGranularity.h | 8 +- .../DataFrame/Utils/Threads/ThreadPool.tcc | 7 +- 7 files changed, 121 insertions(+), 193 deletions(-) diff --git a/include/DataFrame/DataFrameFinancialVisitors.h b/include/DataFrame/DataFrameFinancialVisitors.h index 4e348135..d8722424 100644 --- a/include/DataFrame/DataFrameFinancialVisitors.h +++ b/include/DataFrame/DataFrameFinancialVisitors.h @@ -187,28 +187,29 @@ struct DoubleCrossOver { operator() (const K &idx_begin, const K &idx_end, const H &prices_begin, const H &prices_end) { - const size_type thread_level = - ThreadGranularity::get_thread_level(); - size_type re_count1 = 0; - size_type re_count2 = 0; + const auto thread_level = ThreadGranularity::get_thread_level(); + size_type re_count1 = 0; + size_type re_count2 = 0; - if (thread_level >= 2) { + if (thread_level > 0) { std::future fut1 = - std::async(std::launch::async, - &DoubleCrossOver::run_short_roller_, - this, - std::cref(idx_begin), - std::cref(idx_end), - std::cref(prices_begin), - std::cref(prices_end)); + ThreadGranularity::thr_pool_.dispatch( + false, + &DoubleCrossOver::run_short_roller_, + this, + std::cref(idx_begin), + std::cref(idx_end), + std::cref(prices_begin), + std::cref(prices_end)); std::future fut2 = - std::async(std::launch::async, - &DoubleCrossOver::run_long_roller_, - this, - std::cref(idx_begin), - std::cref(idx_end), - std::cref(prices_begin), - std::cref(prices_end)); + ThreadGranularity::thr_pool_.dispatch( + false, + &DoubleCrossOver::run_long_roller_, + this, + std::cref(idx_begin), + std::cref(idx_end), + std::cref(prices_begin), + std::cref(prices_end)); re_count1 = fut1.get(); re_count2 = fut2.get(); @@ -318,26 +319,27 @@ struct BollingerBand { operator() (const K &idx_begin, const K &idx_end, const H &prices_begin, const H &prices_end) { - const size_type thread_level = - ThreadGranularity::get_thread_level(); + const auto thread_level = ThreadGranularity::get_thread_level(); - if (thread_level >= 2) { + if (thread_level > 0) { std::future fut1 = - std::async(std::launch::async, - &BollingerBand::run_mean_roller_, - this, - std::cref(idx_begin), - std::cref(idx_end), - std::cref(prices_begin), - std::cref(prices_end)); + ThreadGranularity::thr_pool_.dispatch( + false, + &BollingerBand::run_mean_roller_, + this, + std::cref(idx_begin), + std::cref(idx_end), + std::cref(prices_begin), + std::cref(prices_end)); std::future fut2 = - std::async(std::launch::async, - &BollingerBand::run_std_roller_, - this, - std::cref(idx_begin), - std::cref(idx_end), - std::cref(prices_begin), - std::cref(prices_end)); + ThreadGranularity::thr_pool_.dispatch( + false, + &BollingerBand::run_std_roller_, + this, + std::cref(idx_begin), + std::cref(idx_end), + std::cref(prices_begin), + std::cref(prices_end)); fut1.get(); fut2.get(); diff --git a/include/DataFrame/DataFrameStatsVisitors.h b/include/DataFrame/DataFrameStatsVisitors.h index cda393d1..a2479dfd 100644 --- a/include/DataFrame/DataFrameStatsVisitors.h +++ b/include/DataFrame/DataFrameStatsVisitors.h @@ -1794,37 +1794,16 @@ struct AutoCorrVisitor { vec_type tmp_result(col_s - 4); size_type lag = 1; - const size_type thread_level = - ThreadGranularity::get_thread_level(); - vec_type> futures(thread_level); - size_type thread_count = 0; tmp_result[0] = 1.0; - while (lag < col_s - 4) { - if (thread_count >= thread_level) { - const auto result = get_auto_corr_(col_s, lag, column_begin); - tmp_result[result.first] = result.second; - } - else { - futures[thread_count] = - std::async(std::launch::async, - &AutoCorrVisitor::get_auto_corr_, - this, - col_s, - lag, - std::cref(column_begin)); - thread_count += 1; - } - lag += 1; - } - - for (size_type i = 0; i < thread_count; ++i) { - const auto &result = futures[i].get(); + while (lag < col_s - 4) { + const auto result = get_auto_corr_(col_s, lag, column_begin); tmp_result[result.first] = result.second; + lag += 1; } - tmp_result.swap(result_); + result_.swap(tmp_result); } DEFINE_PRE_POST @@ -1839,10 +1818,8 @@ struct AutoCorrVisitor { using CorrResult = std::pair; template - inline CorrResult - get_auto_corr_(size_type col_s, - size_type lag, - const H &column_begin) const { + inline static CorrResult + get_auto_corr_(size_type col_s, size_type lag, const H &column_begin) { CorrVisitor corr { }; constexpr I dummy = I(); diff --git a/include/DataFrame/Internals/DataFrame.tcc b/include/DataFrame/Internals/DataFrame.tcc index 94399f28..b8c2bc9b 100644 --- a/include/DataFrame/Internals/DataFrame.tcc +++ b/include/DataFrame/Internals/DataFrame.tcc @@ -377,91 +377,76 @@ fill_missing(const StlVecType &col_names, const StlVecType &values, int limit) { - const size_type count = col_names.size(); - StlVecType> futures(get_thread_level()); - ThreadGranularity::size_type thread_count = 0; + if (fp == fill_policy::linear_extrapolate) { + char buffer [512]; + snprintf(buffer, sizeof(buffer) - 1, + "DataFrame::fill_missing(): fill_policy %d not implemented", + static_cast(fp)); + throw NotImplemented(buffer); + } + + const size_type count = col_names.size(); + const ThreadGranularity::size_type thread_count = get_thread_level(); + StlVecType> futures; + + if (thread_count > 0) + futures.reserve(count); for (size_type i = 0; i < count; ++i) { ColumnVecType &vec = get_column(col_names[i]); - if (fp == fill_policy::value) { - if (thread_count >= get_thread_level()) + if (thread_count == 0) { + if (fp == fill_policy::value) fill_missing_value_(vec, values[i], limit, indices_.size()); - else { - futures[thread_count] = - std::async(std::launch::async, - &DataFrame::fill_missing_value_, - std::ref(vec), - std::cref(values[i]), - limit, - indices_.size()); - thread_count += 1; - } - } - else if (fp == fill_policy::fill_forward) { - if (thread_count >= get_thread_level()) + else if (fp == fill_policy::fill_forward) fill_missing_ffill_(vec, limit, indices_.size()); - else { - futures[thread_count] = - std::async(std::launch::async, - &DataFrame::fill_missing_ffill_, - std::ref(vec), - limit, - indices_.size()); - thread_count += 1; - } - } - else if (fp == fill_policy::fill_backward) { - if (thread_count >= get_thread_level()) + else if (fp == fill_policy::fill_backward) fill_missing_bfill_(vec, limit); - else { - futures[thread_count] = - std::async(std::launch::async, - &DataFrame::fill_missing_bfill_, - std::ref(vec), - limit); - thread_count += 1; - } - } - else if (fp == fill_policy::linear_interpolate) { - if (thread_count >= get_thread_level()) + else if (fp == fill_policy::linear_interpolate) fill_missing_linter_(vec, indices_, limit); - else { - futures[thread_count] = - std::async(std::launch::async, - &DataFrame::fill_missing_linter_, - std::ref(vec), - std::cref(indices_), - limit); - thread_count += 1; - } - } - else if (fp == fill_policy::mid_point) { - if (thread_count >= get_thread_level()) + else if (fp == fill_policy::mid_point) fill_missing_midpoint_(vec, limit, indices_.size()); - else { - futures[thread_count] = - std::async(std::launch::async, - &DataFrame::fill_missing_midpoint_, - std::ref(vec), - limit, - indices_.size()); - thread_count += 1; - } } - else if (fp == fill_policy::linear_extrapolate) { - char buffer [512]; - - snprintf ( - buffer, sizeof(buffer) - 1, - "DataFrame::fill_missing(): fill_policy %d is not implemented", - static_cast(fp)); - throw NotImplemented(buffer); + else { + if (fp == fill_policy::value) + futures.emplace_back( + thr_pool_.dispatch(false, + &DataFrame::fill_missing_value_, + std::ref(vec), + std::cref(values[i]), + limit, + indices_.size())); + else if (fp == fill_policy::fill_forward) + futures.emplace_back( + thr_pool_.dispatch(false, + &DataFrame::fill_missing_ffill_, + std::ref(vec), + limit, + indices_.size())); + else if (fp == fill_policy::fill_backward) + futures.emplace_back( + thr_pool_.dispatch(false, + &DataFrame::fill_missing_bfill_, + std::ref(vec), + limit)); + else if (fp == fill_policy::linear_interpolate) + futures.emplace_back( + thr_pool_.dispatch(false, + &DataFrame::fill_missing_linter_, + std::ref(vec), + std::cref(indices_), + limit)); + else if (fp == fill_policy::mid_point) + futures.emplace_back( + thr_pool_.dispatch(false, + &DataFrame::fill_missing_midpoint_, + std::ref(vec), + limit, + indices_.size())); } } + for (auto &fut : futures) fut.get(); - for (ThreadGranularity::size_type idx = 0; idx < thread_count; ++idx) - futures[idx].get(); return; } diff --git a/include/DataFrame/Internals/DataFrame_shift.tcc b/include/DataFrame/Internals/DataFrame_shift.tcc index bac0c613..610e8d2e 100644 --- a/include/DataFrame/Internals/DataFrame_shift.tcc +++ b/include/DataFrame/Internals/DataFrame_shift.tcc @@ -47,35 +47,14 @@ void DataFrame::self_shift(size_type periods, shift_policy sp) { if (periods > 0) [[likely]] { if (sp == shift_policy::down || sp == shift_policy::up) [[likely]] { vertical_shift_functor_ functor(periods, sp); - StlVecType> futures(get_thread_level()); - ThreadGranularity::size_type thread_count = 0; - const size_type data_size = data_.size(); + const size_type num_cols = data_.size(); { const SpinGuard guard(lock_); - for (size_type idx = 0; idx < data_size; ++idx) [[likely]] { - if (thread_count >= get_thread_level()) - data_[idx].change(functor); - else { - auto to_be_called = - static_cast - &&)> - (&DataVec::template - change>); - - futures[thread_count] = - std::async(std::launch::async, - to_be_called, - &(data_[idx]), - std::move(functor)); - thread_count += 1; - } - } + for (size_type idx = 0; idx < num_cols; ++idx) [[likely]] + data_[idx].change(functor); } - for (ThreadGranularity::size_type idx = 0; - idx < thread_count; ++idx) - futures[idx].get(); } else if (sp == shift_policy::left) { while (periods-- > 0) @@ -132,35 +111,15 @@ void DataFrame::self_rotate(size_type periods, shift_policy sp) { if (periods > 0) { if (sp == shift_policy::down || sp == shift_policy::up) [[likely]] { - rotate_functor_ functor(periods, sp); - StlVecType> futures(get_thread_level()); - ThreadGranularity::size_type thread_count = 0; - const size_type data_size = data_.size(); + rotate_functor_ functor(periods, sp); + const size_type num_cols = data_.size(); { const SpinGuard guard(lock_); - for (size_type idx = 0; idx < data_size; ++idx) [[likely]] { - if (thread_count >= get_thread_level()) - data_[idx].change(functor); - else { - auto to_be_called = - static_cast - &&)> - (&H::template change>); - - futures[thread_count] = - std::async(std::launch::async, - to_be_called, - &(data_[idx]), - std::move(functor)); - thread_count += 1; - } - } + for (size_type idx = 0; idx < num_cols; ++idx) [[likely]] + data_[idx].change(functor); } - for (ThreadGranularity::size_type idx = 0; - idx < thread_count; ++idx) - futures[idx].get(); } else if (sp == shift_policy::left) { std::rotate(column_list_.begin(), diff --git a/include/DataFrame/Utils/Threads/SharedQueue.tcc b/include/DataFrame/Utils/Threads/SharedQueue.tcc index e5f468a9..55e392b7 100644 --- a/include/DataFrame/Utils/Threads/SharedQueue.tcc +++ b/include/DataFrame/Utils/Threads/SharedQueue.tcc @@ -29,6 +29,10 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include +#include + +using namespace std::chrono_literals; + // ---------------------------------------------------------------------------- namespace hmdf @@ -73,7 +77,7 @@ SharedQueue::pop_front(bool wait_on_front) noexcept { std::unique_lock ul { mutex_ }; if (queue_.empty() && wait_on_front) { - while (queue_.empty()) cvx_.wait(ul); + while (queue_.empty()) cvx_.wait_for(ul, 2s); } if (! queue_.empty()) { diff --git a/include/DataFrame/Utils/Threads/ThreadGranularity.h b/include/DataFrame/Utils/Threads/ThreadGranularity.h index de3cc0ef..b0e7466b 100644 --- a/include/DataFrame/Utils/Threads/ThreadGranularity.h +++ b/include/DataFrame/Utils/Threads/ThreadGranularity.h @@ -57,13 +57,13 @@ struct ThreadGranularity { return (thr_pool_.capacity_threads()); } -protected: - - ThreadGranularity() = default; - // By defaut, there are no threads // inline static ThreadPool thr_pool_ { 0 }; + +protected: + + ThreadGranularity() = default; }; // ---------------------------------------------------------------------------- diff --git a/include/DataFrame/Utils/Threads/ThreadPool.tcc b/include/DataFrame/Utils/Threads/ThreadPool.tcc index 67d6e8aa..e9f0b450 100644 --- a/include/DataFrame/Utils/Threads/ThreadPool.tcc +++ b/include/DataFrame/Utils/Threads/ThreadPool.tcc @@ -121,7 +121,8 @@ ThreadPool::dispatch(bool immediately, F &&routine, As && ... args) { auto callable { std::make_shared> - (std::bind(std::forward(routine), std::forward(args) ...)) + (std::bind(std::forward(routine), + std::forward(args) ...)) }; future_t return_fut { callable->get_future() }; const WorkUnit work_unit { @@ -402,13 +403,13 @@ ThreadPool::get_one_local_task_() noexcept { WorkUnit work_unit; const guard_type guard { state_ }; - if (local_queue_ && local_queue_->empty() == false) { + if (local_queue_ && (! local_queue_->empty())) { work_unit = local_queue_->front(); local_queue_->pop(); } else { // Try to steal tasks from other queues for (auto &q : local_queues_) - if (&q != local_queue_ && q.empty() == false) { + if (! q.empty()) { work_unit = q.front(); q.pop(); break;