Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threadpool Refactor #1168

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
230 changes: 125 additions & 105 deletions cpr/threadpool.cpp
Original file line number Diff line number Diff line change
@@ -1,158 +1,178 @@
#include "cpr/threadpool.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <ctime>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

namespace cpr {
// NOLINTNEXTLINE(cert-err58-cpp) Not relevant since trivial function.
size_t ThreadPool::DEFAULT_MAX_THREAD_COUNT = std::max<size_t>(std::thread::hardware_concurrency(), static_cast<size_t>(1));

ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms) {}
ThreadPool::ThreadPool(size_t minThreadCount, size_t maxThreadCount) : minThreadCount(minThreadCount), maxThreadCount(maxThreadCount) {
assert(minThreadCount <= maxThreadCount);
Start();
}

ThreadPool::~ThreadPool() {
Stop();
}

int ThreadPool::Start(size_t start_threads) {
if (status != STOP) {
return -1;
}
status = RUNNING;
start_threads = std::clamp(start_threads, min_thread_num, max_thread_num);
for (size_t i = 0; i < start_threads; ++i) {
CreateThread();
}
return 0;
ThreadPool::State ThreadPool::GetState() const {
return state.load();
}

int ThreadPool::Stop() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == STOP) {
return -1;
}
size_t ThreadPool::GetMaxThreadCount() const {
return maxThreadCount.load();
}

status = STOP;
status_wait_cond.notify_all();
task_cond.notify_all();
size_t ThreadPool::GetCurThreadCount() const {
return curThreadCount.load();
}

for (auto& i : threads) {
if (i.thread->joinable()) {
i.thread->join();
}
}
size_t ThreadPool::GetIdleThreadCount() const {
return idleThreadCount.load();
}

size_t ThreadPool::GetMinThreadCount() const {
return minThreadCount.load();
}

void ThreadPool::SetMinThreadCount(size_t minThreadCount) {
assert(minThreadCount <= maxThreadCount);
this->minThreadCount = minThreadCount;
}

threads.clear();
cur_thread_num = 0;
idle_thread_num = 0;
return 0;
void ThreadPool::SetMaxThreadCount(size_t maxThreadCount) {
assert(minThreadCount <= maxThreadCount);
this->maxThreadCount = maxThreadCount;
}

int ThreadPool::Pause() {
if (status == RUNNING) {
status = PAUSE;
void ThreadPool::Start() {
const std::unique_lock lock(controlMutex);
if (setState(State::RUNNING)) {
for (size_t i = 0; i < std::max(minThreadCount.load(), tasks.size()); i++) {
addThread();
}
}
return 0;
}

int ThreadPool::Resume() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == PAUSE) {
status = RUNNING;
status_wait_cond.notify_all();
void ThreadPool::Stop() {
const std::unique_lock controlLock(controlMutex);
setState(State::STOP);
taskQueueCondVar.notify_all();

// Join all workers
const std::unique_lock workersLock{workerMutex};
auto iter = workers.begin();
while (iter != workers.end()) {
if (iter->thread->joinable()) {
iter->thread->join();
}
iter = workers.erase(iter);
}
return 0;
}

int ThreadPool::Wait() {
void ThreadPool::Wait() {
while (true) {
if (status == STOP || (tasks.empty() && idle_thread_num == cur_thread_num)) {
if ((state != State::RUNNING && curThreadCount <= 0) || (tasks.empty() && curThreadCount <= idleThreadCount)) {
break;
}
std::this_thread::yield();
}
return 0;
}

bool ThreadPool::CreateThread() {
if (cur_thread_num >= max_thread_num) {
bool ThreadPool::setState(State state) {
const std::unique_lock lock(controlMutex);
if (this->state == state) {
return false;
}
std::thread* thread = new std::thread([this] {
bool initialRun = true;
while (status != STOP) {
{
std::unique_lock status_lock(status_wait_mutex);
status_wait_cond.wait(status_lock, [this]() { return status != Status::PAUSE; });
this->state = state;
return true;
}

void ThreadPool::addThread() {
assert(state != State::STOP);

const std::unique_lock lock{workerMutex};
workers.emplace_back();
workers.back().thread = std::make_unique<std::thread>(&ThreadPool::threadFunc, this, std::ref(workers.back()));
curThreadCount++;
idleThreadCount++;
}

void ThreadPool::threadFunc(WorkerThread& workerThread) {
while (true) {
std::cv_status result{std::cv_status::no_timeout};
{
std::unique_lock lock(taskQueueMutex);
if (tasks.empty()) {
result = taskQueueCondVar.wait_for(lock, std::chrono::milliseconds(250));
}
}

if (state == State::STOP) {
curThreadCount--;
break;
}

// A timeout has been reached check if we should cleanup the thread
if (result == std::cv_status::timeout) {
const std::unique_lock lock(controlMutex);
if (curThreadCount > minThreadCount) {
curThreadCount--;
break;
}
}

Task task;
{
std::unique_lock<std::mutex> locker(task_mutex);
task_cond.wait_for(locker, std::chrono::milliseconds(max_idle_time), [this]() { return status == STOP || !tasks.empty(); });
if (status == STOP) {
return;
}
if (tasks.empty()) {
if (cur_thread_num > min_thread_num) {
DelThread(std::this_thread::get_id());
return;
}
continue;
}
if (!initialRun) {
--idle_thread_num;
}
// Check for tasks and execute one
std::function<void()> task;
{
const std::unique_lock lock(taskQueueMutex);
if (!tasks.empty()) {
idleThreadCount--;
task = std::move(tasks.front());
tasks.pop();
}
if (task) {
task();
++idle_thread_num;
initialRun = false;
}
}
});
AddThread(thread);
return true;

// Execute the task
if (task) {
task();
idleThreadCount++;
}
}

// Make sure we clean up other stopped threads
if (state != State::STOP) {
joinStoppedThreads();
}

workerThread.state = State::STOP;

// Mark worker thread to be removed
workerJoinReadyCount++;
idleThreadCount--;
}

void ThreadPool::AddThread(std::thread* thread) {
thread_mutex.lock();
++cur_thread_num;
ThreadData data;
data.thread = std::shared_ptr<std::thread>(thread);
data.id = thread->get_id();
data.status = RUNNING;
data.start_time = std::chrono::steady_clock::now();
data.stop_time = std::chrono::steady_clock::time_point::max();
threads.emplace_back(data);
thread_mutex.unlock();
}

void ThreadPool::DelThread(std::thread::id id) {
const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();

thread_mutex.lock();
--cur_thread_num;
--idle_thread_num;
auto iter = threads.begin();
while (iter != threads.end()) {
if (iter->status == STOP && now > iter->stop_time) {
void ThreadPool::joinStoppedThreads() {
const std::unique_lock lock{workerMutex};
auto iter = workers.begin();
while (iter != workers.end()) {
if (iter->state == State::STOP) {
if (iter->thread->joinable()) {
iter->thread->join();
iter = threads.erase(iter);
continue;
}
} else if (iter->id == id) {
iter->status = STOP;
iter->stop_time = std::chrono::steady_clock::now();
iter = workers.erase(iter);
workerJoinReadyCount--;
} else {
iter++;
}
++iter;
}
thread_mutex.unlock();
}

} // namespace cpr
11 changes: 3 additions & 8 deletions include/cpr/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ auto async(Fn&& fn, Args&&... args) {

class async {
public:
static void startup(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME) {
static void startup(size_t minThreads = ThreadPool::DEFAULT_MIN_THREAD_COUNT, size_t maxThreads = ThreadPool::DEFAULT_MAX_THREAD_COUNT) {
GlobalThreadPool* gtp = GlobalThreadPool::GetInstance();
if (gtp->IsStarted()) {
return;
}
gtp->SetMinThreadNum(min_threads);
gtp->SetMaxThreadNum(max_threads);
gtp->SetMaxIdleTime(max_idle_ms);
gtp->Start();
gtp->SetMinThreadCount(minThreads);
gtp->SetMaxThreadCount(maxThreads);
}

static void cleanup() {
Expand Down
Loading
Loading