From 76733db34ee8875b539898609e30fb145eebb485 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Fri, 24 Jan 2025 13:40:38 +0100 Subject: [PATCH 01/13] Initial threadpool2 work --- cpr/CMakeLists.txt | 1 + cpr/threadpool2.cpp | 133 +++++++++++++++++++++++++++++++++++++ include/CMakeLists.txt | 1 + include/cpr/threadpool2.h | 66 ++++++++++++++++++ test/CMakeLists.txt | 1 + test/threadpool2_tests.cpp | 14 ++++ 6 files changed, 216 insertions(+) create mode 100644 cpr/threadpool2.cpp create mode 100644 include/cpr/threadpool2.h create mode 100644 test/threadpool2_tests.cpp diff --git a/cpr/CMakeLists.txt b/cpr/CMakeLists.txt index db48358e4..4e18159af 100644 --- a/cpr/CMakeLists.txt +++ b/cpr/CMakeLists.txt @@ -19,6 +19,7 @@ add_library(cpr proxyauth.cpp session.cpp threadpool.cpp + threadpool2.cpp timeout.cpp unix_socket.cpp util.cpp diff --git a/cpr/threadpool2.cpp b/cpr/threadpool2.cpp new file mode 100644 index 000000000..bf03c6cc3 --- /dev/null +++ b/cpr/threadpool2.cpp @@ -0,0 +1,133 @@ +#include "cpr/threadpool2.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cpr { +size_t ThreadPool2::DEFAULT_MAX_THREAD_COUNT = std::thread::hardware_concurrency(); + +ThreadPool2::ThreadPool2(size_t minThreadCount, size_t maxThreadCount) : minThreadCount(minThreadCount), maxThreadCount(maxThreadCount) { + assert(minThreadCount <= maxThreadCount); + Start(); +} + +ThreadPool2::~ThreadPool2() { + Stop(); +} + +ThreadPool2::State ThreadPool2::GetState() const { + return state.load(); +} + +size_t ThreadPool2::GetMaxThreadCount() const { + return maxThreadCount.load(); +} + +size_t ThreadPool2::GetCurThreadCount() const { + return curThreadCount.load(); +} + +size_t ThreadPool2::GetMinThreadCount() const { + return minThreadCount.load(); +} + +void ThreadPool2::SetMinThreadCount(size_t minThreadCount) { + assert(minThreadCount <= maxThreadCount); + this->minThreadCount = minThreadCount; +} + +void ThreadPool2::SetMaxThreadCount(size_t maxThreadCount) { + assert(minThreadCount <= maxThreadCount); + this->maxThreadCount = maxThreadCount; +} + +void ThreadPool2::Start() { + const std::unique_lock lock(controlMutex); + setState(State::RUNNING); + + for (size_t i = 0; i < minThreadCount; i++) { + addThread(); + } +} + +void ThreadPool2::Stop() { + const std::unique_lock controlLock(controlMutex); + setState(State::STOP); + + // Join all workers + const std::unique_lock workersLock{workerMutex}; + auto iter = workers.begin(); + while (iter != workers.end()) { + if (iter->thread->joinable()) { + iter->thread->join(); + } + iter = workers.erase(iter); + } +} + +void ThreadPool2::setState(State state) { + const std::unique_lock lock(controlMutex); + if (this->state == state) { + return; + } +} + +void ThreadPool2::addThread() { + assert(state != State::STOP); + + const std::unique_lock lock{workerMutex}; + workers.emplace_back(); + workers.back().thread = std::make_unique(&ThreadPool2::threadFunc, this, std::ref(workers.back())); + curThreadCount++; +} + +void ThreadPool2::threadFunc(WorkerThread& workerThread) { + while (true) { + std::cv_status result{std::cv_status::timeout}; + { + std::unique_lock lock(taskQueueMutex); + result = taskQueueCondVar.wait_for(lock, std::chrono::milliseconds(250)); + } + + if (state == State::STOP) { + curThreadCount--; + break; + } + + // A timeout has been reached check if we should cleanup the thread + if (result == std::cv_status::timeout) { + const std::unique_lock lock(controlMutex); + if (curThreadCount > minThreadCount) { + curThreadCount--; + break; + } + } + + // Check for tasks and execute one + } + + workerThread.state = State::STOP; + + // Mark worker thread to be removed + workerJoinReadyCount++; +} + +void ThreadPool2::joinStoppedThreads() { + const std::unique_lock lock{workerMutex}; + auto iter = workers.begin(); + while (iter != workers.end()) { + if (iter->state == State::STOP) { + if (iter->thread->joinable()) { + iter->thread->join(); + } + iter = workers.erase(iter); + workerJoinReadyCount--; + } + } +} +} // namespace cpr diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index c0f73df8d..756718bc5 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -38,6 +38,7 @@ target_sources(cpr PRIVATE cpr/ssl_ctx.h cpr/ssl_options.h cpr/threadpool.h + cpr/threadpool2.h cpr/timeout.h cpr/unix_socket.h cpr/util.h diff --git a/include/cpr/threadpool2.h b/include/cpr/threadpool2.h new file mode 100644 index 000000000..a3d2831e0 --- /dev/null +++ b/include/cpr/threadpool2.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cpr { +class ThreadPool2 { + public: + static constexpr size_t DEFAULT_MIN_THREAD_COUNT = 0; + static size_t DEFAULT_MAX_THREAD_COUNT; + + private: + enum class State : uint8_t { STOP, RUNNING, PAUSE }; + struct WorkerThread { + std::unique_ptr thread{nullptr}; + State state{State::RUNNING}; + }; + + std::mutex workerMutex; + std::list workers; + std::atomic_size_t workerJoinReadyCount{0}; + + std::mutex taskQueueMutex; + std::condition_variable taskQueueCondVar; + + std::atomic state = State::STOP; + std::atomic_size_t minThreadCount; + std::atomic_size_t curThreadCount{0}; + std::atomic_size_t maxThreadCount; + + std::recursive_mutex controlMutex; + + public: + explicit ThreadPool2(size_t minThreadCount = DEFAULT_MIN_THREAD_COUNT, size_t maxThreadCount = DEFAULT_MAX_THREAD_COUNT); + ThreadPool2(const ThreadPool2& other) = delete; + ThreadPool2(ThreadPool2&& old) = delete; + ~ThreadPool2(); + + ThreadPool2& operator=(const ThreadPool2& other) = delete; + ThreadPool2& operator=(ThreadPool2&& old) = delete; + + [[nodiscard]] State GetState() const; + [[nodiscard]] size_t GetMaxThreadCount() const; + [[nodiscard]] size_t GetCurThreadCount() const; + [[nodiscard]] size_t GetMinThreadCount() const; + + void SetMinThreadCount(size_t minThreadCount); + void SetMaxThreadCount(size_t maxThreadCount); + + void Start(); + void Stop(); + + private: + void setState(State newState); + void addThread(); + void joinStoppedThreads(); + + void threadFunc(WorkerThread& workerThread); +}; +} // namespace cpr diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d78ff8229..969763d11 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -67,6 +67,7 @@ add_cpr_test(multiasync) add_cpr_test(file_upload) add_cpr_test(singleton) add_cpr_test(threadpool) +add_cpr_test(threadpool2) if (ENABLE_SSL_TESTS) add_cpr_test(ssl) diff --git a/test/threadpool2_tests.cpp b/test/threadpool2_tests.cpp new file mode 100644 index 000000000..35810b1fd --- /dev/null +++ b/test/threadpool2_tests.cpp @@ -0,0 +1,14 @@ +#include +#include +#include + +#include "cpr/threadpool2.h" + +TEST(ThreadPool2Tests, StartStop) { + cpr::ThreadPool2 tp(1, 1); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 58812e1f544b37221397a28d43b7faedd28a2af5 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sun, 26 Jan 2025 15:15:12 +0100 Subject: [PATCH 02/13] Thread pool2 tests and implementation finishing touches --- cpr/threadpool2.cpp | 37 +++++++++++++++++++++++++++++-------- include/cpr/threadpool2.h | 37 ++++++++++++++++++++++++++++++++++++- test/threadpool2_tests.cpp | 36 ++++++++++++++++++++++++++++++++++-- 3 files changed, 99 insertions(+), 11 deletions(-) diff --git a/cpr/threadpool2.cpp b/cpr/threadpool2.cpp index bf03c6cc3..18430f274 100644 --- a/cpr/threadpool2.cpp +++ b/cpr/threadpool2.cpp @@ -1,12 +1,13 @@ #include "cpr/threadpool2.h" #include -#include +#include #include -#include +#include #include +#include #include -#include #include +#include namespace cpr { size_t ThreadPool2::DEFAULT_MAX_THREAD_COUNT = std::thread::hardware_concurrency(); @@ -49,15 +50,12 @@ void ThreadPool2::SetMaxThreadCount(size_t maxThreadCount) { void ThreadPool2::Start() { const std::unique_lock lock(controlMutex); setState(State::RUNNING); - - for (size_t i = 0; i < minThreadCount; i++) { - addThread(); - } } void ThreadPool2::Stop() { const std::unique_lock controlLock(controlMutex); setState(State::STOP); + taskQueueCondVar.notify_all(); // Join all workers const std::unique_lock workersLock{workerMutex}; @@ -70,11 +68,21 @@ void ThreadPool2::Stop() { } } +void ThreadPool2::Wait() { + while (true) { + if ((state != State::RUNNING && curThreadCount <= 0) || (tasks.empty() && curThreadCount <= idleThreadCount)) { + break; + } + std::this_thread::yield(); + } +} + void ThreadPool2::setState(State state) { const std::unique_lock lock(controlMutex); if (this->state == state) { return; } + this->state = state; } void ThreadPool2::addThread() { @@ -84,6 +92,7 @@ void ThreadPool2::addThread() { workers.emplace_back(); workers.back().thread = std::make_unique(&ThreadPool2::threadFunc, this, std::ref(workers.back())); curThreadCount++; + idleThreadCount++; } void ThreadPool2::threadFunc(WorkerThread& workerThread) { @@ -91,7 +100,9 @@ void ThreadPool2::threadFunc(WorkerThread& workerThread) { std::cv_status result{std::cv_status::timeout}; { std::unique_lock lock(taskQueueMutex); - result = taskQueueCondVar.wait_for(lock, std::chrono::milliseconds(250)); + if (tasks.empty()) { + result = taskQueueCondVar.wait_for(lock, std::chrono::milliseconds(250)); + } } if (state == State::STOP) { @@ -109,6 +120,16 @@ void ThreadPool2::threadFunc(WorkerThread& workerThread) { } // Check for tasks and execute one + const std::unique_lock lock(taskQueueMutex); + if (!tasks.empty()) { + idleThreadCount--; + const std::function task = std::move(tasks.front()); + tasks.pop(); + + // Execute the task + task(); + } + idleThreadCount++; } workerThread.state = State::STOP; diff --git a/include/cpr/threadpool2.h b/include/cpr/threadpool2.h index a3d2831e0..b42735a74 100644 --- a/include/cpr/threadpool2.h +++ b/include/cpr/threadpool2.h @@ -4,9 +4,12 @@ #include #include #include +#include +#include #include #include #include +#include #include namespace cpr { @@ -16,7 +19,7 @@ class ThreadPool2 { static size_t DEFAULT_MAX_THREAD_COUNT; private: - enum class State : uint8_t { STOP, RUNNING, PAUSE }; + enum class State : uint8_t { STOP, RUNNING }; struct WorkerThread { std::unique_ptr thread{nullptr}; State state{State::RUNNING}; @@ -28,11 +31,13 @@ class ThreadPool2 { std::mutex taskQueueMutex; std::condition_variable taskQueueCondVar; + std::queue> tasks; std::atomic state = State::STOP; std::atomic_size_t minThreadCount; std::atomic_size_t curThreadCount{0}; std::atomic_size_t maxThreadCount; + std::atomic_size_t idleThreadCount{0}; std::recursive_mutex controlMutex; @@ -55,6 +60,36 @@ class ThreadPool2 { void Start(); void Stop(); + void Wait(); + + /** + * Return a future, calling future.get() will wait task done and return RetType. + * Submit(fn, args...) + * Submit(std::bind(&Class::mem_fn, &obj)) + * Submit(std::mem_fn(&Class::mem_fn, &obj)) + **/ + template + auto Submit(Fn&& fn, Args&&... args) { + // Add a new worker thread in case the tasks queue is not empty and we still can add a thread + { + std::unique_lock lock(taskQueueMutex); + if (idleThreadCount < tasks.size() && curThreadCount < maxThreadCount) { + addThread(); + } + } + + // Add task to queue + using RetType = decltype(fn(args...)); + const std::shared_ptr> task = std::make_shared>([fn = std::forward(fn), args...]() mutable { return std::invoke(fn, args...); }); + std::future future = task->get_future(); + { + std::unique_lock lock(taskQueueMutex); + tasks.emplace([task] { (*task)(); }); + } + + taskQueueCondVar.notify_one(); + return future; + } private: void setState(State newState); diff --git a/test/threadpool2_tests.cpp b/test/threadpool2_tests.cpp index 35810b1fd..be57200ed 100644 --- a/test/threadpool2_tests.cpp +++ b/test/threadpool2_tests.cpp @@ -4,8 +4,40 @@ #include "cpr/threadpool2.h" -TEST(ThreadPool2Tests, StartStop) { - cpr::ThreadPool2 tp(1, 1); +TEST(ThreadPool2Tests, BasicWorkOneThread) { + std::atomic_uint32_t invCount{0}; + uint32_t invCountExpected{100}; + + { + cpr::ThreadPool2 tp(1, 1); + + for (size_t i = 0; i < invCountExpected; ++i) { + tp.Submit([&invCount]() -> void { invCount++; }); + } + + // Wait for the thread pool to finish its work + tp.Wait(); + } + + EXPECT_EQ(invCount, invCountExpected); +} + +TEST(ThreadPool2Tests, BasicWorkMultipleThreads) { + std::atomic_uint32_t invCount{0}; + uint32_t invCountExpected{100}; + + { + cpr::ThreadPool2 tp(1, 10); + + for (size_t i = 0; i < invCountExpected; ++i) { + tp.Submit([&invCount]() -> void { invCount++; }); + } + + // Wait for the thread pool to finish its work + tp.Wait(); + } + + EXPECT_EQ(invCount, invCountExpected); } int main(int argc, char** argv) { From 8e9dd5978a472440e03ebac690f73d505373f5d8 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sun, 26 Jan 2025 15:34:35 +0100 Subject: [PATCH 03/13] Fixed wor canceled before done --- cpr/threadpool2.cpp | 19 ++++++++++++------- test/threadpool2_tests.cpp | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/cpr/threadpool2.cpp b/cpr/threadpool2.cpp index 18430f274..2e54fbbdd 100644 --- a/cpr/threadpool2.cpp +++ b/cpr/threadpool2.cpp @@ -97,7 +97,7 @@ void ThreadPool2::addThread() { void ThreadPool2::threadFunc(WorkerThread& workerThread) { while (true) { - std::cv_status result{std::cv_status::timeout}; + std::cv_status result{std::cv_status::no_timeout}; { std::unique_lock lock(taskQueueMutex); if (tasks.empty()) { @@ -120,13 +120,18 @@ void ThreadPool2::threadFunc(WorkerThread& workerThread) { } // Check for tasks and execute one - const std::unique_lock lock(taskQueueMutex); - if (!tasks.empty()) { - idleThreadCount--; - const std::function task = std::move(tasks.front()); - tasks.pop(); + std::function task; + { + const std::unique_lock lock(taskQueueMutex); + if (!tasks.empty()) { + idleThreadCount--; + task = std::move(tasks.front()); + tasks.pop(); + } + } - // Execute the task + // Execute the task + if (task) { task(); } idleThreadCount++; diff --git a/test/threadpool2_tests.cpp b/test/threadpool2_tests.cpp index be57200ed..3394b65c5 100644 --- a/test/threadpool2_tests.cpp +++ b/test/threadpool2_tests.cpp @@ -40,6 +40,23 @@ TEST(ThreadPool2Tests, BasicWorkMultipleThreads) { EXPECT_EQ(invCount, invCountExpected); } +// Ensure only the current task gets finished when stopping worker +TEST(ThreadPool2Tests, CanceledBeforeDone) { + std::atomic_uint32_t invCount{0}; + { + cpr::ThreadPool2 tp(1, 1); + + for (size_t i = 0; i < 100; ++i) { + tp.Submit([&invCount]() -> void { + std::this_thread::sleep_for(std::chrono::seconds(1)); + invCount++; + }); + } + } + + EXPECT_EQ(invCount, 1); +} + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); From 92068e508e9c4c7afcbef5383bca3546d9f2cb1c Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sun, 26 Jan 2025 18:04:24 +0100 Subject: [PATCH 04/13] Using threadpool2 as real thread pool --- cpr/CMakeLists.txt | 1 - cpr/threadpool.cpp | 222 +++++++++++++++++++------------------ cpr/threadpool2.cpp | 159 -------------------------- include/CMakeLists.txt | 1 - include/cpr/async.h | 11 +- include/cpr/threadpool.h | 149 ++++++++++--------------- include/cpr/threadpool2.h | 101 ----------------- test/CMakeLists.txt | 1 - test/multiasync_tests.cpp | 4 +- test/threadpool2_tests.cpp | 63 ----------- test/threadpool_tests.cpp | 155 +++++++++++++++++++------- 11 files changed, 292 insertions(+), 575 deletions(-) delete mode 100644 cpr/threadpool2.cpp delete mode 100644 include/cpr/threadpool2.h delete mode 100644 test/threadpool2_tests.cpp diff --git a/cpr/CMakeLists.txt b/cpr/CMakeLists.txt index 4e18159af..db48358e4 100644 --- a/cpr/CMakeLists.txt +++ b/cpr/CMakeLists.txt @@ -19,7 +19,6 @@ add_library(cpr proxyauth.cpp session.cpp threadpool.cpp - threadpool2.cpp timeout.cpp unix_socket.cpp util.cpp diff --git a/cpr/threadpool.cpp b/cpr/threadpool.cpp index 36b6400da..16e4fee81 100644 --- a/cpr/threadpool.cpp +++ b/cpr/threadpool.cpp @@ -1,158 +1,170 @@ #include "cpr/threadpool.h" #include +#include #include +#include #include -#include +#include #include #include #include #include namespace cpr { +size_t ThreadPool::DEFAULT_MAX_THREAD_COUNT = std::thread::hardware_concurrency(); -ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms) {} +ThreadPool::ThreadPool(size_t minThreadCount, size_t maxThreadCount) : minThreadCount(minThreadCount), maxThreadCount(maxThreadCount) { + assert(minThreadCount <= maxThreadCount); + Start(); +} ThreadPool::~ThreadPool() { Stop(); } -int ThreadPool::Start(size_t start_threads) { - if (status != STOP) { - return -1; - } - status = RUNNING; - start_threads = std::clamp(start_threads, min_thread_num, max_thread_num); - for (size_t i = 0; i < start_threads; ++i) { - CreateThread(); - } - return 0; +ThreadPool::State ThreadPool::GetState() const { + return state.load(); } -int ThreadPool::Stop() { - const std::unique_lock status_lock(status_wait_mutex); - if (status == STOP) { - return -1; - } +size_t ThreadPool::GetMaxThreadCount() const { + return maxThreadCount.load(); +} - status = STOP; - status_wait_cond.notify_all(); - task_cond.notify_all(); +size_t ThreadPool::GetCurThreadCount() const { + return curThreadCount.load(); +} - for (auto& i : threads) { - if (i.thread->joinable()) { - i.thread->join(); - } - } +size_t ThreadPool::GetIdleThreadCount() const { + return idleThreadCount.load(); +} - threads.clear(); - cur_thread_num = 0; - idle_thread_num = 0; - return 0; +size_t ThreadPool::GetMinThreadCount() const { + return minThreadCount.load(); } -int ThreadPool::Pause() { - if (status == RUNNING) { - status = PAUSE; +void ThreadPool::SetMinThreadCount(size_t minThreadCount) { + assert(minThreadCount <= maxThreadCount); + this->minThreadCount = minThreadCount; +} + +void ThreadPool::SetMaxThreadCount(size_t maxThreadCount) { + assert(minThreadCount <= maxThreadCount); + this->maxThreadCount = maxThreadCount; +} + +void ThreadPool::Start() { + const std::unique_lock lock(controlMutex); + if (setState(State::RUNNING)) { + for (size_t i = 0; i < std::max(minThreadCount.load(), tasks.size()); i++) { + addThread(); + } } - return 0; } -int ThreadPool::Resume() { - const std::unique_lock status_lock(status_wait_mutex); - if (status == PAUSE) { - status = RUNNING; - status_wait_cond.notify_all(); +void ThreadPool::Stop() { + const std::unique_lock controlLock(controlMutex); + setState(State::STOP); + taskQueueCondVar.notify_all(); + + // Join all workers + const std::unique_lock workersLock{workerMutex}; + auto iter = workers.begin(); + while (iter != workers.end()) { + if (iter->thread->joinable()) { + iter->thread->join(); + } + iter = workers.erase(iter); } - return 0; } -int ThreadPool::Wait() { +void ThreadPool::Wait() { while (true) { - if (status == STOP || (tasks.empty() && idle_thread_num == cur_thread_num)) { + if ((state != State::RUNNING && curThreadCount <= 0) || (tasks.empty() && curThreadCount <= idleThreadCount)) { break; } std::this_thread::yield(); } - return 0; } -bool ThreadPool::CreateThread() { - if (cur_thread_num >= max_thread_num) { +bool ThreadPool::setState(State state) { + const std::unique_lock lock(controlMutex); + if (this->state == state) { return false; } - std::thread* thread = new std::thread([this] { - bool initialRun = true; - while (status != STOP) { - { - std::unique_lock status_lock(status_wait_mutex); - status_wait_cond.wait(status_lock, [this]() { return status != Status::PAUSE; }); + this->state = state; + return true; +} + +void ThreadPool::addThread() { + assert(state != State::STOP); + + const std::unique_lock lock{workerMutex}; + workers.emplace_back(); + workers.back().thread = std::make_unique(&ThreadPool::threadFunc, this, std::ref(workers.back())); + curThreadCount++; + idleThreadCount++; +} + +void ThreadPool::threadFunc(WorkerThread& workerThread) { + while (true) { + std::cv_status result{std::cv_status::no_timeout}; + { + std::unique_lock lock(taskQueueMutex); + if (tasks.empty()) { + result = taskQueueCondVar.wait_for(lock, std::chrono::milliseconds(250)); + } + } + + if (state == State::STOP) { + curThreadCount--; + break; + } + + // A timeout has been reached check if we should cleanup the thread + if (result == std::cv_status::timeout) { + const std::unique_lock lock(controlMutex); + if (curThreadCount > minThreadCount) { + curThreadCount--; + break; } + } - Task task; - { - std::unique_lock locker(task_mutex); - task_cond.wait_for(locker, std::chrono::milliseconds(max_idle_time), [this]() { return status == STOP || !tasks.empty(); }); - if (status == STOP) { - return; - } - if (tasks.empty()) { - if (cur_thread_num > min_thread_num) { - DelThread(std::this_thread::get_id()); - return; - } - continue; - } - if (!initialRun) { - --idle_thread_num; - } + // Check for tasks and execute one + std::function task; + { + const std::unique_lock lock(taskQueueMutex); + if (!tasks.empty()) { + idleThreadCount--; task = std::move(tasks.front()); tasks.pop(); } - if (task) { - task(); - ++idle_thread_num; - initialRun = false; - } } - }); - AddThread(thread); - return true; + + // Execute the task + if (task) { + task(); + idleThreadCount++; + } + } + + workerThread.state = State::STOP; + + // Mark worker thread to be removed + workerJoinReadyCount++; + idleThreadCount--; } -void ThreadPool::AddThread(std::thread* thread) { - thread_mutex.lock(); - ++cur_thread_num; - ThreadData data; - data.thread = std::shared_ptr(thread); - data.id = thread->get_id(); - data.status = RUNNING; - data.start_time = std::chrono::steady_clock::now(); - data.stop_time = std::chrono::steady_clock::time_point::max(); - threads.emplace_back(data); - thread_mutex.unlock(); -} - -void ThreadPool::DelThread(std::thread::id id) { - const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); - - thread_mutex.lock(); - --cur_thread_num; - --idle_thread_num; - auto iter = threads.begin(); - while (iter != threads.end()) { - if (iter->status == STOP && now > iter->stop_time) { +void ThreadPool::joinStoppedThreads() { + const std::unique_lock lock{workerMutex}; + auto iter = workers.begin(); + while (iter != workers.end()) { + if (iter->state == State::STOP) { if (iter->thread->joinable()) { iter->thread->join(); - iter = threads.erase(iter); - continue; } - } else if (iter->id == id) { - iter->status = STOP; - iter->stop_time = std::chrono::steady_clock::now(); + iter = workers.erase(iter); + workerJoinReadyCount--; } - ++iter; } - thread_mutex.unlock(); } - } // namespace cpr diff --git a/cpr/threadpool2.cpp b/cpr/threadpool2.cpp deleted file mode 100644 index 2e54fbbdd..000000000 --- a/cpr/threadpool2.cpp +++ /dev/null @@ -1,159 +0,0 @@ -#include "cpr/threadpool2.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace cpr { -size_t ThreadPool2::DEFAULT_MAX_THREAD_COUNT = std::thread::hardware_concurrency(); - -ThreadPool2::ThreadPool2(size_t minThreadCount, size_t maxThreadCount) : minThreadCount(minThreadCount), maxThreadCount(maxThreadCount) { - assert(minThreadCount <= maxThreadCount); - Start(); -} - -ThreadPool2::~ThreadPool2() { - Stop(); -} - -ThreadPool2::State ThreadPool2::GetState() const { - return state.load(); -} - -size_t ThreadPool2::GetMaxThreadCount() const { - return maxThreadCount.load(); -} - -size_t ThreadPool2::GetCurThreadCount() const { - return curThreadCount.load(); -} - -size_t ThreadPool2::GetMinThreadCount() const { - return minThreadCount.load(); -} - -void ThreadPool2::SetMinThreadCount(size_t minThreadCount) { - assert(minThreadCount <= maxThreadCount); - this->minThreadCount = minThreadCount; -} - -void ThreadPool2::SetMaxThreadCount(size_t maxThreadCount) { - assert(minThreadCount <= maxThreadCount); - this->maxThreadCount = maxThreadCount; -} - -void ThreadPool2::Start() { - const std::unique_lock lock(controlMutex); - setState(State::RUNNING); -} - -void ThreadPool2::Stop() { - const std::unique_lock controlLock(controlMutex); - setState(State::STOP); - taskQueueCondVar.notify_all(); - - // Join all workers - const std::unique_lock workersLock{workerMutex}; - auto iter = workers.begin(); - while (iter != workers.end()) { - if (iter->thread->joinable()) { - iter->thread->join(); - } - iter = workers.erase(iter); - } -} - -void ThreadPool2::Wait() { - while (true) { - if ((state != State::RUNNING && curThreadCount <= 0) || (tasks.empty() && curThreadCount <= idleThreadCount)) { - break; - } - std::this_thread::yield(); - } -} - -void ThreadPool2::setState(State state) { - const std::unique_lock lock(controlMutex); - if (this->state == state) { - return; - } - this->state = state; -} - -void ThreadPool2::addThread() { - assert(state != State::STOP); - - const std::unique_lock lock{workerMutex}; - workers.emplace_back(); - workers.back().thread = std::make_unique(&ThreadPool2::threadFunc, this, std::ref(workers.back())); - curThreadCount++; - idleThreadCount++; -} - -void ThreadPool2::threadFunc(WorkerThread& workerThread) { - while (true) { - std::cv_status result{std::cv_status::no_timeout}; - { - std::unique_lock lock(taskQueueMutex); - if (tasks.empty()) { - result = taskQueueCondVar.wait_for(lock, std::chrono::milliseconds(250)); - } - } - - if (state == State::STOP) { - curThreadCount--; - break; - } - - // A timeout has been reached check if we should cleanup the thread - if (result == std::cv_status::timeout) { - const std::unique_lock lock(controlMutex); - if (curThreadCount > minThreadCount) { - curThreadCount--; - break; - } - } - - // Check for tasks and execute one - std::function task; - { - const std::unique_lock lock(taskQueueMutex); - if (!tasks.empty()) { - idleThreadCount--; - task = std::move(tasks.front()); - tasks.pop(); - } - } - - // Execute the task - if (task) { - task(); - } - idleThreadCount++; - } - - workerThread.state = State::STOP; - - // Mark worker thread to be removed - workerJoinReadyCount++; -} - -void ThreadPool2::joinStoppedThreads() { - const std::unique_lock lock{workerMutex}; - auto iter = workers.begin(); - while (iter != workers.end()) { - if (iter->state == State::STOP) { - if (iter->thread->joinable()) { - iter->thread->join(); - } - iter = workers.erase(iter); - workerJoinReadyCount--; - } - } -} -} // namespace cpr diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 756718bc5..c0f73df8d 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -38,7 +38,6 @@ target_sources(cpr PRIVATE cpr/ssl_ctx.h cpr/ssl_options.h cpr/threadpool.h - cpr/threadpool2.h cpr/timeout.h cpr/unix_socket.h cpr/util.h diff --git a/include/cpr/async.h b/include/cpr/async.h index 1305834fb..a7339dd9c 100644 --- a/include/cpr/async.h +++ b/include/cpr/async.h @@ -29,15 +29,10 @@ auto async(Fn&& fn, Args&&... args) { class async { public: - static void startup(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME) { + static void startup(size_t minThreads = ThreadPool::DEFAULT_MIN_THREAD_COUNT, size_t maxThreads = ThreadPool::DEFAULT_MAX_THREAD_COUNT) { GlobalThreadPool* gtp = GlobalThreadPool::GetInstance(); - if (gtp->IsStarted()) { - return; - } - gtp->SetMinThreadNum(min_threads); - gtp->SetMaxThreadNum(max_threads); - gtp->SetMaxIdleTime(max_idle_ms); - gtp->Start(); + gtp->SetMinThreadCount(minThreads); + gtp->SetMaxThreadCount(maxThreads); } static void cleanup() { diff --git a/include/cpr/threadpool.h b/include/cpr/threadpool.h index ec402a422..ed41f4193 100644 --- a/include/cpr/threadpool.h +++ b/include/cpr/threadpool.h @@ -1,9 +1,9 @@ -#ifndef CPR_THREAD_POOL_H -#define CPR_THREAD_POOL_H +#pragma once #include -#include #include +#include +#include #include #include #include @@ -11,61 +11,57 @@ #include #include #include -#include - -#define CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM std::thread::hardware_concurrency() - -constexpr size_t CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM = 1; -constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{250}; namespace cpr { - class ThreadPool { public: - using Task = std::function; - - explicit ThreadPool(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME); - ThreadPool(const ThreadPool& other) = delete; - ThreadPool(ThreadPool&& old) = delete; + static constexpr size_t DEFAULT_MIN_THREAD_COUNT = 0; + static size_t DEFAULT_MAX_THREAD_COUNT; - virtual ~ThreadPool(); + private: + enum class State : uint8_t { STOP, RUNNING }; + struct WorkerThread { + std::unique_ptr thread{nullptr}; + State state{State::RUNNING}; + }; - ThreadPool& operator=(const ThreadPool& other) = delete; - ThreadPool& operator=(ThreadPool&& old) = delete; + std::mutex workerMutex; + std::list workers; + std::atomic_size_t workerJoinReadyCount{0}; - void SetMinThreadNum(size_t min_threads) { - min_thread_num = min_threads; - } + std::mutex taskQueueMutex; + std::condition_variable taskQueueCondVar; + std::queue> tasks; - void SetMaxThreadNum(size_t max_threads) { - max_thread_num = max_threads; - } + std::atomic state = State::STOP; + std::atomic_size_t minThreadCount; + std::atomic_size_t curThreadCount{0}; + std::atomic_size_t maxThreadCount; + std::atomic_size_t idleThreadCount{0}; - void SetMaxIdleTime(std::chrono::milliseconds ms) { - max_idle_time = ms; - } + std::recursive_mutex controlMutex; - size_t GetCurrentThreadNum() { - return cur_thread_num; - } + public: + explicit ThreadPool(size_t minThreadCount = DEFAULT_MIN_THREAD_COUNT, size_t maxThreadCount = DEFAULT_MAX_THREAD_COUNT); + ThreadPool(const ThreadPool& other) = delete; + ThreadPool(ThreadPool&& old) = delete; + virtual ~ThreadPool(); - size_t GetIdleThreadNum() { - return idle_thread_num; - } + ThreadPool& operator=(const ThreadPool& other) = delete; + ThreadPool& operator=(ThreadPool&& old) = delete; - bool IsStarted() { - return status != STOP; - } + [[nodiscard]] State GetState() const; + [[nodiscard]] size_t GetMaxThreadCount() const; + [[nodiscard]] size_t GetCurThreadCount() const; + [[nodiscard]] size_t GetIdleThreadCount() const; + [[nodiscard]] size_t GetMinThreadCount() const; - bool IsStopped() { - return status == STOP; - } + void SetMinThreadCount(size_t minThreadCount); + void SetMaxThreadCount(size_t maxThreadCount); - int Start(size_t start_threads = 0); - int Stop(); - int Pause(); - int Resume(); - int Wait(); + void Start(); + void Stop(); + void Wait(); /** * Return a future, calling future.get() will wait task done and return RetType. @@ -75,64 +71,35 @@ class ThreadPool { **/ template auto Submit(Fn&& fn, Args&&... args) { - if (status == STOP) { - Start(); - } - if (idle_thread_num <= 0 && cur_thread_num < max_thread_num) { - CreateThread(); + // Add a new worker thread in case the tasks queue is not empty and we still can add a thread + { + std::unique_lock lock(taskQueueMutex); + if (idleThreadCount <= tasks.size() && curThreadCount < maxThreadCount) { + const std::unique_lock lock(controlMutex); + if (state == State::RUNNING) { + addThread(); + } + } } + + // Add task to queue using RetType = decltype(fn(args...)); - auto task = std::make_shared>([fn = std::forward(fn), args...]() mutable { return std::invoke(fn, args...); }); + const std::shared_ptr> task = std::make_shared>([fn = std::forward(fn), args...]() mutable { return std::invoke(fn, args...); }); std::future future = task->get_future(); { - std::lock_guard locker(task_mutex); + std::unique_lock lock(taskQueueMutex); tasks.emplace([task] { (*task)(); }); } - task_cond.notify_one(); + taskQueueCondVar.notify_one(); return future; } private: - bool CreateThread(); - void AddThread(std::thread* thread); - void DelThread(std::thread::id id); + bool setState(State newState); + void addThread(); + void joinStoppedThreads(); - public: - size_t min_thread_num; - size_t max_thread_num; - std::chrono::milliseconds max_idle_time; - - private: - enum Status { - STOP, - RUNNING, - PAUSE, - }; - - struct ThreadData { - std::shared_ptr thread; - std::thread::id id; - Status status; - std::chrono::steady_clock::time_point start_time; - std::chrono::steady_clock::time_point stop_time; - }; - - std::atomic status{Status::STOP}; - std::condition_variable status_wait_cond{}; - std::mutex status_wait_mutex{}; - - std::atomic cur_thread_num{0}; - std::atomic idle_thread_num{0}; - - std::list threads{}; - std::mutex thread_mutex{}; - - std::queue tasks{}; - std::mutex task_mutex{}; - std::condition_variable task_cond{}; + void threadFunc(WorkerThread& workerThread); }; - } // namespace cpr - -#endif diff --git a/include/cpr/threadpool2.h b/include/cpr/threadpool2.h deleted file mode 100644 index b42735a74..000000000 --- a/include/cpr/threadpool2.h +++ /dev/null @@ -1,101 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace cpr { -class ThreadPool2 { - public: - static constexpr size_t DEFAULT_MIN_THREAD_COUNT = 0; - static size_t DEFAULT_MAX_THREAD_COUNT; - - private: - enum class State : uint8_t { STOP, RUNNING }; - struct WorkerThread { - std::unique_ptr thread{nullptr}; - State state{State::RUNNING}; - }; - - std::mutex workerMutex; - std::list workers; - std::atomic_size_t workerJoinReadyCount{0}; - - std::mutex taskQueueMutex; - std::condition_variable taskQueueCondVar; - std::queue> tasks; - - std::atomic state = State::STOP; - std::atomic_size_t minThreadCount; - std::atomic_size_t curThreadCount{0}; - std::atomic_size_t maxThreadCount; - std::atomic_size_t idleThreadCount{0}; - - std::recursive_mutex controlMutex; - - public: - explicit ThreadPool2(size_t minThreadCount = DEFAULT_MIN_THREAD_COUNT, size_t maxThreadCount = DEFAULT_MAX_THREAD_COUNT); - ThreadPool2(const ThreadPool2& other) = delete; - ThreadPool2(ThreadPool2&& old) = delete; - ~ThreadPool2(); - - ThreadPool2& operator=(const ThreadPool2& other) = delete; - ThreadPool2& operator=(ThreadPool2&& old) = delete; - - [[nodiscard]] State GetState() const; - [[nodiscard]] size_t GetMaxThreadCount() const; - [[nodiscard]] size_t GetCurThreadCount() const; - [[nodiscard]] size_t GetMinThreadCount() const; - - void SetMinThreadCount(size_t minThreadCount); - void SetMaxThreadCount(size_t maxThreadCount); - - void Start(); - void Stop(); - void Wait(); - - /** - * Return a future, calling future.get() will wait task done and return RetType. - * Submit(fn, args...) - * Submit(std::bind(&Class::mem_fn, &obj)) - * Submit(std::mem_fn(&Class::mem_fn, &obj)) - **/ - template - auto Submit(Fn&& fn, Args&&... args) { - // Add a new worker thread in case the tasks queue is not empty and we still can add a thread - { - std::unique_lock lock(taskQueueMutex); - if (idleThreadCount < tasks.size() && curThreadCount < maxThreadCount) { - addThread(); - } - } - - // Add task to queue - using RetType = decltype(fn(args...)); - const std::shared_ptr> task = std::make_shared>([fn = std::forward(fn), args...]() mutable { return std::invoke(fn, args...); }); - std::future future = task->get_future(); - { - std::unique_lock lock(taskQueueMutex); - tasks.emplace([task] { (*task)(); }); - } - - taskQueueCondVar.notify_one(); - return future; - } - - private: - void setState(State newState); - void addThread(); - void joinStoppedThreads(); - - void threadFunc(WorkerThread& workerThread); -}; -} // namespace cpr diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 969763d11..d78ff8229 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -67,7 +67,6 @@ add_cpr_test(multiasync) add_cpr_test(file_upload) add_cpr_test(singleton) add_cpr_test(threadpool) -add_cpr_test(threadpool2) if (ENABLE_SSL_TESTS) add_cpr_test(ssl) diff --git a/test/multiasync_tests.cpp b/test/multiasync_tests.cpp index 0454d2dbe..7d265751d 100644 --- a/test/multiasync_tests.cpp +++ b/test/multiasync_tests.cpp @@ -319,10 +319,10 @@ TEST(MultiAsyncCancelTests, CancellationOnQueue) { return true; }}; - GlobalThreadPool::GetInstance()->Pause(); + GlobalThreadPool::GetInstance()->Stop(); std::vector resps{MultiGetAsync(std::tuple{hello_url, ProgressCallback{observer_fn}})}; EXPECT_EQ(CancellationResult::success, resps.at(0).Cancel()); - GlobalThreadPool::GetInstance()->Resume(); + GlobalThreadPool::GetInstance()->Start(); const bool was_called{synchro_env->fn_called}; EXPECT_EQ(false, was_called); } diff --git a/test/threadpool2_tests.cpp b/test/threadpool2_tests.cpp deleted file mode 100644 index 3394b65c5..000000000 --- a/test/threadpool2_tests.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include -#include -#include - -#include "cpr/threadpool2.h" - -TEST(ThreadPool2Tests, BasicWorkOneThread) { - std::atomic_uint32_t invCount{0}; - uint32_t invCountExpected{100}; - - { - cpr::ThreadPool2 tp(1, 1); - - for (size_t i = 0; i < invCountExpected; ++i) { - tp.Submit([&invCount]() -> void { invCount++; }); - } - - // Wait for the thread pool to finish its work - tp.Wait(); - } - - EXPECT_EQ(invCount, invCountExpected); -} - -TEST(ThreadPool2Tests, BasicWorkMultipleThreads) { - std::atomic_uint32_t invCount{0}; - uint32_t invCountExpected{100}; - - { - cpr::ThreadPool2 tp(1, 10); - - for (size_t i = 0; i < invCountExpected; ++i) { - tp.Submit([&invCount]() -> void { invCount++; }); - } - - // Wait for the thread pool to finish its work - tp.Wait(); - } - - EXPECT_EQ(invCount, invCountExpected); -} - -// Ensure only the current task gets finished when stopping worker -TEST(ThreadPool2Tests, CanceledBeforeDone) { - std::atomic_uint32_t invCount{0}; - { - cpr::ThreadPool2 tp(1, 1); - - for (size_t i = 0; i < 100; ++i) { - tp.Submit([&invCount]() -> void { - std::this_thread::sleep_for(std::chrono::seconds(1)); - invCount++; - }); - } - } - - EXPECT_EQ(invCount, 1); -} - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/test/threadpool_tests.cpp b/test/threadpool_tests.cpp index 66b42c1d5..bb2484f75 100644 --- a/test/threadpool_tests.cpp +++ b/test/threadpool_tests.cpp @@ -1,19 +1,16 @@ #include #include #include - +#include #include "cpr/threadpool.h" -TEST(ThreadPoolTests, DISABLED_BasicWorkOneThread) { +TEST(ThreadPoolTests, BasicWorkOneThread) { std::atomic_uint32_t invCount{0}; uint32_t invCountExpected{100}; { - cpr::ThreadPool tp; - tp.SetMinThreadNum(1); - tp.SetMaxThreadNum(1); - tp.Start(0); + cpr::ThreadPool tp(0, 1); for (size_t i = 0; i < invCountExpected; ++i) { tp.Submit([&invCount]() -> void { invCount++; }); @@ -21,20 +18,21 @@ TEST(ThreadPoolTests, DISABLED_BasicWorkOneThread) { // Wait for the thread pool to finish its work tp.Wait(); + EXPECT_EQ(tp.GetCurThreadCount(), 1); + EXPECT_EQ(tp.GetIdleThreadCount(), 1); + EXPECT_EQ(tp.GetMaxThreadCount(), 1); + EXPECT_EQ(tp.GetMinThreadCount(), 0); } EXPECT_EQ(invCount, invCountExpected); } -TEST(ThreadPoolTests, DISABLED_BasicWorkMultipleThreads) { +TEST(ThreadPoolTests, BasicWorkOneMinThread) { std::atomic_uint32_t invCount{0}; uint32_t invCountExpected{100}; { - cpr::ThreadPool tp; - tp.SetMinThreadNum(1); - tp.SetMaxThreadNum(10); - tp.Start(0); + cpr::ThreadPool tp(1, 1); for (size_t i = 0; i < invCountExpected; ++i) { tp.Submit([&invCount]() -> void { invCount++; }); @@ -42,63 +40,134 @@ TEST(ThreadPoolTests, DISABLED_BasicWorkMultipleThreads) { // Wait for the thread pool to finish its work tp.Wait(); + EXPECT_EQ(tp.GetCurThreadCount(), 1); + EXPECT_EQ(tp.GetIdleThreadCount(), 1); + EXPECT_EQ(tp.GetMaxThreadCount(), 1); + EXPECT_EQ(tp.GetMinThreadCount(), 1); } EXPECT_EQ(invCount, invCountExpected); } -TEST(ThreadPoolTests, DISABLED_PauseResumeSingleThread) { +TEST(ThreadPoolTests, BasicWorkMultipleThreads) { std::atomic_uint32_t invCount{0}; + uint32_t invCountExpected{100}; - uint32_t repCount{100}; - uint32_t invBunchSize{20}; - - cpr::ThreadPool tp; - tp.SetMinThreadNum(1); - tp.SetMaxThreadNum(10); - tp.Start(0); - - for (size_t i = 0; i < repCount; ++i) { - tp.Pause(); - EXPECT_EQ(invCount, i * invBunchSize); + { + cpr::ThreadPool tp(1, 10); - for (size_t e = 0; e < invBunchSize; ++e) { + for (size_t i = 0; i < invCountExpected; ++i) { tp.Submit([&invCount]() -> void { invCount++; }); } - tp.Resume(); + // Wait for the thread pool to finish its work tp.Wait(); + EXPECT_GE(tp.GetCurThreadCount(), 1); + EXPECT_LE(tp.GetCurThreadCount(), 10); + + EXPECT_GE(tp.GetIdleThreadCount(), 1); + EXPECT_LE(tp.GetIdleThreadCount(), 10); - EXPECT_EQ(invCount, (i + 1) * invBunchSize); + EXPECT_EQ(tp.GetMaxThreadCount(), 10); + EXPECT_EQ(tp.GetMinThreadCount(), 1); } + + EXPECT_EQ(invCount, invCountExpected); } -TEST(ThreadPoolTests, DISABLED_PauseResumeMultipleThreads) { - std::atomic_uint32_t invCount{0}; +TEST(ThreadPoolTests, StartStopBasicWorkMultipleThreads) { + uint32_t invCountExpected{100}; - uint32_t repCount{100}; - uint32_t invBunchSize{20}; + cpr::ThreadPool tp(1, 10); - cpr::ThreadPool tp; - tp.SetMinThreadNum(1); - tp.SetMaxThreadNum(10); - tp.Start(0); + for (size_t i = 0; i < 100; i++) { + std::atomic_uint32_t invCount{0}; + tp.Start(); + EXPECT_EQ(tp.GetCurThreadCount(), 1); + EXPECT_EQ(tp.GetIdleThreadCount(), 1); + EXPECT_EQ(tp.GetMaxThreadCount(), 10); + EXPECT_EQ(tp.GetMinThreadCount(), 1); - for (size_t i = 0; i < repCount; ++i) { - tp.Pause(); - EXPECT_EQ(invCount, i * invBunchSize); + { + for (size_t i = 0; i < invCountExpected; ++i) { + tp.Submit([&invCount]() -> void { invCount++; }); + } - for (size_t e = 0; e < invBunchSize; ++e) { - tp.Submit([&invCount]() -> void { invCount++; }); + // Wait for the thread pool to finish its work + tp.Wait(); + EXPECT_GE(tp.GetCurThreadCount(), 1); + EXPECT_LE(tp.GetCurThreadCount(), 10); + + EXPECT_GE(tp.GetIdleThreadCount(), 1); + EXPECT_LE(tp.GetIdleThreadCount(), 10); + + EXPECT_EQ(tp.GetMaxThreadCount(), 10); + EXPECT_EQ(tp.GetMinThreadCount(), 1); } - tp.Resume(); - // Wait for the thread pool to finish its work - tp.Wait(); - EXPECT_EQ(invCount, (i + 1) * invBunchSize); + EXPECT_EQ(invCount, invCountExpected); + tp.Stop(); + + EXPECT_EQ(tp.GetCurThreadCount(), 0); + EXPECT_EQ(tp.GetIdleThreadCount(), 0); + EXPECT_EQ(tp.GetMaxThreadCount(), 10); + EXPECT_EQ(tp.GetMinThreadCount(), 1); } } +// Ensure only the current task gets finished when stopping worker +TEST(ThreadPoolTests, CanceledBeforeDoneSingleThread) { + std::atomic_uint32_t invCount{0}; + std::mutex lock; + lock.lock(); + + { + cpr::ThreadPool tp(1, 1); + + for (size_t i = 0; i < 100; ++i) { + tp.Submit([&invCount, &lock]() -> void { + const std::unique_lock guard(lock); + invCount++; + }); + } + + EXPECT_EQ(tp.GetCurThreadCount(), 1); + EXPECT_EQ(tp.GetIdleThreadCount(), 0); + EXPECT_EQ(tp.GetMaxThreadCount(), 1); + EXPECT_EQ(tp.GetMinThreadCount(), 1); + + lock.unlock(); + } + + EXPECT_EQ(invCount, 1); +} + +// Ensure only the current task gets finished when stopping worker +TEST(ThreadPoolTests, CanceledBeforeDoneMultipleThreads) { + std::atomic_uint32_t invCount{0}; + std::mutex lock; + lock.lock(); + + { + cpr::ThreadPool tp(1, 10); + + for (size_t i = 0; i < 100; ++i) { + tp.Submit([&invCount, &lock]() -> void { + const std::unique_lock guard(lock); + invCount++; + }); + } + + EXPECT_EQ(tp.GetCurThreadCount(), 10); + EXPECT_EQ(tp.GetIdleThreadCount(), 0); + EXPECT_EQ(tp.GetMaxThreadCount(), 10); + EXPECT_EQ(tp.GetMinThreadCount(), 1); + + lock.unlock(); + } + + EXPECT_EQ(invCount, 10); +} int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); From 036405d4ade67dcbe4aac682529bb35435ad050b Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sun, 2 Feb 2025 13:21:17 +0100 Subject: [PATCH 05/13] Invoking joinStoppedThreads() before stopping a thread --- cpr/threadpool.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cpr/threadpool.cpp b/cpr/threadpool.cpp index 16e4fee81..3d3cc07c1 100644 --- a/cpr/threadpool.cpp +++ b/cpr/threadpool.cpp @@ -147,6 +147,11 @@ void ThreadPool::threadFunc(WorkerThread& workerThread) { } } + // Make sure we clean up other stopped threads + if (state != State::STOP) { + joinStoppedThreads(); + } + workerThread.state = State::STOP; // Mark worker thread to be removed From 8f776d48ec7656d28597dbab741017c03d47e776 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sun, 2 Feb 2025 13:21:32 +0100 Subject: [PATCH 06/13] Added class and function docs for thread pool --- include/cpr/threadpool.h | 127 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 126 insertions(+), 1 deletion(-) diff --git a/include/cpr/threadpool.h b/include/cpr/threadpool.h index ed41f4193..b37ca3cdb 100644 --- a/include/cpr/threadpool.h +++ b/include/cpr/threadpool.h @@ -13,35 +13,111 @@ #include namespace cpr { +/** + * cpr thread pool implementation used by async requests. + * + * Example: + * // Create a new thread pool object + * cpr::ThreadPool tp; + * // Start the thread pool and spawn initial set of worker threads. + * tp.Start() + * // Add work + * tp.Submit(..) + * ... + * // Stop/join workers and flush the task queue + * tp.Stop() + * // Start the thread pool again spawning the initial set of worker threads. + * tp.Start() + * ... + **/ class ThreadPool { public: + /** + * The default minimum thread count for the thread pool. + * Even if there is no work this number of threads should be in standby for once new work arrives. + **/ static constexpr size_t DEFAULT_MIN_THREAD_COUNT = 0; + /** + * The default maximum thread count for the thread pool. + * Even if there is a lot of work, the thread pool is not allowed to create more threads than this number. + **/ static size_t DEFAULT_MAX_THREAD_COUNT; private: + /** + * The thread pool or worker thread state. + **/ enum class State : uint8_t { STOP, RUNNING }; + /** + * Collection of properties identifying a worker thread for the thread pool. + **/ struct WorkerThread { std::unique_ptr thread{nullptr}; + /** + * RUNNING: The thread is still active and working on or awaiting new work. + * STOP: The thread is shutting down or has already been shut down and is ready to be joined. + **/ State state{State::RUNNING}; }; + /** + * Mutex for synchronizing access to the worker thread list. + **/ std::mutex workerMutex; + /** + * A list of all worker threads + **/ std::list workers; + /** + * Number of threads ready to be joined where their state is 'STOP'. + **/ std::atomic_size_t workerJoinReadyCount{0}; + /** + * Mutex for synchronizing access to the task queue. + **/ std::mutex taskQueueMutex; + /** + * Conditional variable to let threads wait for new work to arrive. + **/ std::condition_variable taskQueueCondVar; + /** + * A queue of tasks synchronized by 'taskQueueMutex'. + **/ std::queue> tasks; - std::atomic state = State::STOP; + /** + * The current state for the thread pool. + **/ + std::atomic state = State::RUNNING; + /** + * The number of threads that should always be in standby or working. + **/ std::atomic_size_t minThreadCount; + /** + * The current number of threads available to the thread pool (working or idle). + **/ std::atomic_size_t curThreadCount{0}; + /** + * The maximum number of threads allowed to be used by this thread pool. + **/ std::atomic_size_t maxThreadCount; + /** + * The number of idle threads without any work awaiting new tasks. + **/ std::atomic_size_t idleThreadCount{0}; + /** + * General control mutex synchronizing access to internal thread pool resources. + **/ std::recursive_mutex controlMutex; public: + /** + * Creates a new thread pool object with a minimum and maximum thread count. + * minThreadCount: Number of threads that should always be in standby or working. + * maxThreadCount: The maximum number of threads allowed to be used by this thread pool. + **/ explicit ThreadPool(size_t minThreadCount = DEFAULT_MIN_THREAD_COUNT, size_t maxThreadCount = DEFAULT_MAX_THREAD_COUNT); ThreadPool(const ThreadPool& other) = delete; ThreadPool(ThreadPool&& old) = delete; @@ -50,24 +126,60 @@ class ThreadPool { ThreadPool& operator=(const ThreadPool& other) = delete; ThreadPool& operator=(ThreadPool&& old) = delete; + /** + * Returns the current thread pool state. + * The thread pool is in STOP state when initially created and will move over to RUNNING once Start() is invoked for the first time. + **/ [[nodiscard]] State GetState() const; + /** + * Returns the maximum number of threads allowed to be used by this thread pool. + **/ [[nodiscard]] size_t GetMaxThreadCount() const; + /** + * Returns the current number of threads available to the thread pool (working or idle). + **/ [[nodiscard]] size_t GetCurThreadCount() const; + /** + * Returns the number of idle threads without any work awaiting new tasks. + **/ [[nodiscard]] size_t GetIdleThreadCount() const; + /** + * Returns the number of threads that should always be in standby or working. + **/ [[nodiscard]] size_t GetMinThreadCount() const; + /** + * Sets the number of threads that should always be in standby or working. + **/ void SetMinThreadCount(size_t minThreadCount); + /** + * Sets the current number of threads available to the thread pool (working or idle). + **/ void SetMaxThreadCount(size_t maxThreadCount); + /** + * Starts the thread pool by spawning GetMinThreadCount() threads. + * Does nothing in case the thread pool state is already RUNNING. + **/ void Start(); + /** + * Sets the state to STOP, clears the task queue and joins all running threads. + * This means waiting for all threads that currently work on something letting them finish their task. + **/ void Stop(); + /** + * Returns as soon as the task queue is empty and all threads are either stopped/joined or in idel state. + **/ void Wait(); /** + * Enqueues a new task to the thread pool. * Return a future, calling future.get() will wait task done and return RetType. * Submit(fn, args...) * Submit(std::bind(&Class::mem_fn, &obj)) * Submit(std::mem_fn(&Class::mem_fn, &obj)) + * + * Will start a new thread in case all other threads are currently working and GetCurThreadCount() < GetMaxThreadCount(). **/ template auto Submit(Fn&& fn, Args&&... args) { @@ -96,10 +208,23 @@ class ThreadPool { } private: + /** + * Sets the new thread pool state. + * Returns true in case the current state was different to the newState. + **/ bool setState(State newState); + /** + * Adds a new worker thread. + **/ void addThread(); + /** + * Goes through the worker threads list and joins all threads where their state is STOP. + **/ void joinStoppedThreads(); + /** + * The thread entry point where the heavy lifting happens. + **/ void threadFunc(WorkerThread& workerThread); }; } // namespace cpr From d1ae009b9cbfc2bf48d7e353c4f72bb796339441 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sun, 2 Feb 2025 13:49:12 +0100 Subject: [PATCH 07/13] Refined thread pool docs --- include/cpr/threadpool.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/include/cpr/threadpool.h b/include/cpr/threadpool.h index b37ca3cdb..a07f76fea 100644 --- a/include/cpr/threadpool.h +++ b/include/cpr/threadpool.h @@ -19,8 +19,6 @@ namespace cpr { * Example: * // Create a new thread pool object * cpr::ThreadPool tp; - * // Start the thread pool and spawn initial set of worker threads. - * tp.Start() * // Add work * tp.Submit(..) * ... @@ -115,6 +113,7 @@ class ThreadPool { public: /** * Creates a new thread pool object with a minimum and maximum thread count. + * Starts the thread pool via spawning 'minThreadCount' threads. * minThreadCount: Number of threads that should always be in standby or working. * maxThreadCount: The maximum number of threads allowed to be used by this thread pool. **/ @@ -128,7 +127,7 @@ class ThreadPool { /** * Returns the current thread pool state. - * The thread pool is in STOP state when initially created and will move over to RUNNING once Start() is invoked for the first time. + * The thread pool is in RUNNING state when initially created and will move over to STOP once Stop() is invoked. **/ [[nodiscard]] State GetState() const; /** @@ -153,7 +152,7 @@ class ThreadPool { **/ void SetMinThreadCount(size_t minThreadCount); /** - * Sets the current number of threads available to the thread pool (working or idle). + * Sets the maximum number of threads allowed to be used by this thread pool. **/ void SetMaxThreadCount(size_t maxThreadCount); From 350cf34e1e4abdb2caec2f222204839e1a7fa7bc Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sat, 8 Feb 2025 09:23:29 +0100 Subject: [PATCH 08/13] Fixed joinStoppedThreads() getting stuck --- cpr/threadpool.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpr/threadpool.cpp b/cpr/threadpool.cpp index 3d3cc07c1..3d290d81b 100644 --- a/cpr/threadpool.cpp +++ b/cpr/threadpool.cpp @@ -169,6 +169,8 @@ void ThreadPool::joinStoppedThreads() { } iter = workers.erase(iter); workerJoinReadyCount--; + } else { + iter++; } } } From 33592c9e7ce89c364b2d46bd6c2049a4bce603ee Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sat, 8 Feb 2025 10:18:43 +0100 Subject: [PATCH 09/13] Ensuring thread pool Start() works in the ctr --- include/cpr/threadpool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/cpr/threadpool.h b/include/cpr/threadpool.h index a07f76fea..061e6a149 100644 --- a/include/cpr/threadpool.h +++ b/include/cpr/threadpool.h @@ -87,7 +87,7 @@ class ThreadPool { /** * The current state for the thread pool. **/ - std::atomic state = State::RUNNING; + std::atomic state = State::STOP; /** * The number of threads that should always be in standby or working. **/ From fded2b47bdddaa8006c26aaa0de70ca9b605e1e2 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Sat, 8 Feb 2025 10:40:03 +0100 Subject: [PATCH 10/13] More reliable CanceledBeforeDone tests --- test/threadpool_tests.cpp | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/test/threadpool_tests.cpp b/test/threadpool_tests.cpp index bb2484f75..985486c76 100644 --- a/test/threadpool_tests.cpp +++ b/test/threadpool_tests.cpp @@ -117,7 +117,8 @@ TEST(ThreadPoolTests, StartStopBasicWorkMultipleThreads) { // Ensure only the current task gets finished when stopping worker TEST(ThreadPoolTests, CanceledBeforeDoneSingleThread) { - std::atomic_uint32_t invCount{0}; + std::atomic_uint32_t threadsDone{0}; + std::atomic_uint32_t threadsWaiting{0}; std::mutex lock; lock.lock(); @@ -125,12 +126,18 @@ TEST(ThreadPoolTests, CanceledBeforeDoneSingleThread) { cpr::ThreadPool tp(1, 1); for (size_t i = 0; i < 100; ++i) { - tp.Submit([&invCount, &lock]() -> void { + tp.Submit([&threadsDone, &lock, &threadsWaiting]() -> void { + threadsWaiting++; const std::unique_lock guard(lock); - invCount++; + threadsDone++; }); } + // Wait until all threads started. Can be replaced by std::barrier in C++20. + while (threadsWaiting < 1) { + std::this_thread::yield(); + } + EXPECT_EQ(tp.GetCurThreadCount(), 1); EXPECT_EQ(tp.GetIdleThreadCount(), 0); EXPECT_EQ(tp.GetMaxThreadCount(), 1); @@ -139,12 +146,13 @@ TEST(ThreadPoolTests, CanceledBeforeDoneSingleThread) { lock.unlock(); } - EXPECT_EQ(invCount, 1); + EXPECT_EQ(threadsDone, 1); } // Ensure only the current task gets finished when stopping worker TEST(ThreadPoolTests, CanceledBeforeDoneMultipleThreads) { - std::atomic_uint32_t invCount{0}; + std::atomic_uint32_t threadsDone{0}; + std::atomic_uint32_t threadsWaiting{0}; std::mutex lock; lock.lock(); @@ -152,12 +160,20 @@ TEST(ThreadPoolTests, CanceledBeforeDoneMultipleThreads) { cpr::ThreadPool tp(1, 10); for (size_t i = 0; i < 100; ++i) { - tp.Submit([&invCount, &lock]() -> void { + tp.Submit([&threadsDone, &lock, &threadsWaiting]() -> void { + threadsWaiting++; const std::unique_lock guard(lock); - invCount++; + threadsDone++; }); } + // Wait until all threads started. Can be replaced by std::barrier in C++20. + while (threadsWaiting < 10) { + std::this_thread::yield(); + } + + EXPECT_EQ(threadsDone, 0); + EXPECT_EQ(tp.GetCurThreadCount(), 10); EXPECT_EQ(tp.GetIdleThreadCount(), 0); EXPECT_EQ(tp.GetMaxThreadCount(), 10); @@ -166,7 +182,7 @@ TEST(ThreadPoolTests, CanceledBeforeDoneMultipleThreads) { lock.unlock(); } - EXPECT_EQ(invCount, 10); + EXPECT_EQ(threadsDone, 10); } int main(int argc, char** argv) { From ecf4854c3553f9bb6e3748b0b0c8364413ed0930 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Fri, 14 Feb 2025 14:54:14 +0100 Subject: [PATCH 11/13] Fixed cppcheck --- include/cpr/threadpool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/cpr/threadpool.h b/include/cpr/threadpool.h index 061e6a149..dcb586ed2 100644 --- a/include/cpr/threadpool.h +++ b/include/cpr/threadpool.h @@ -186,7 +186,7 @@ class ThreadPool { { std::unique_lock lock(taskQueueMutex); if (idleThreadCount <= tasks.size() && curThreadCount < maxThreadCount) { - const std::unique_lock lock(controlMutex); + const std::unique_lock lockControl(controlMutex); if (state == State::RUNNING) { addThread(); } From 0e91979490ff218ecf12b722e97188a008449936 Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Fri, 14 Feb 2025 15:38:20 +0100 Subject: [PATCH 12/13] Making sure the hardware_concurrency is at least 1 --- cpr/threadpool.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpr/threadpool.cpp b/cpr/threadpool.cpp index 3d290d81b..e8131ee3e 100644 --- a/cpr/threadpool.cpp +++ b/cpr/threadpool.cpp @@ -11,7 +11,8 @@ #include namespace cpr { -size_t ThreadPool::DEFAULT_MAX_THREAD_COUNT = std::thread::hardware_concurrency(); +// NOLINTNEXTLINE(cert-err58-cpp) Not relevant since trivial function. +size_t ThreadPool::DEFAULT_MAX_THREAD_COUNT = std::max(std::thread::hardware_concurrency(), static_cast(1)); ThreadPool::ThreadPool(size_t minThreadCount, size_t maxThreadCount) : minThreadCount(minThreadCount), maxThreadCount(maxThreadCount) { assert(minThreadCount <= maxThreadCount); From a6c8fcab4f9a1618b848d5f8449f5af29bb78bda Mon Sep 17 00:00:00 2001 From: Fabian Sauter Date: Fri, 14 Feb 2025 16:01:00 +0100 Subject: [PATCH 13/13] Avoiding a deadlock scenario when submitting a task --- include/cpr/threadpool.h | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/include/cpr/threadpool.h b/include/cpr/threadpool.h index dcb586ed2..044ecb34d 100644 --- a/include/cpr/threadpool.h +++ b/include/cpr/threadpool.h @@ -182,15 +182,23 @@ class ThreadPool { **/ template auto Submit(Fn&& fn, Args&&... args) { - // Add a new worker thread in case the tasks queue is not empty and we still can add a thread { - std::unique_lock lock(taskQueueMutex); - if (idleThreadCount <= tasks.size() && curThreadCount < maxThreadCount) { - const std::unique_lock lockControl(controlMutex); - if (state == State::RUNNING) { - addThread(); + const std::unique_lock lock(controlMutex); + // Add a new worker thread in case the tasks queue is not empty and we still can add a thread + bool shouldAddThread{false}; + { + std::unique_lock lock(taskQueueMutex); + if (idleThreadCount <= tasks.size() && curThreadCount < maxThreadCount) { + if (state == State::RUNNING) { + shouldAddThread = true; + } } } + + // We add a thread outside the 'taskQueueMutex' mutex block to avoid a potential deadlock caused within the 'addThread()' function. + if (shouldAddThread) { + addThread(); + } } // Add task to queue