-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy paththreadPool.h
172 lines (132 loc) · 3.58 KB
/
threadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
Written by Antoine Savine in 2018
This code is the strict IP of Antoine Savine
License to use and alter this code for personal and commercial applications
is freely granted to any person or company who purchased a copy of the book
Modern Computational Finance: AAD and Parallel Simulations
Antoine Savine
Wiley, 2018
As long as this comment is preserved at the top of the file
*/
#pragma once
// Thread pool of chapter 3
#include <future>
#include <thread>
#include "ConcurrentQueue.h"
using namespace std;
typedef packaged_task<bool(void)> Task;
typedef future<bool> TaskHandle;
class ThreadPool
{
// The one and only instance
static ThreadPool myInstance;
// The task queue
ConcurrentQueue<Task> myQueue;
// The threads
vector<thread> myThreads;
// Active indicator
bool myActive;
// Interruption indicator
bool myInterrupt;
// Thread number
static thread_local size_t myTLSNum;
// The function that is executed on every thread
void threadFunc(const size_t num)
{
myTLSNum = num;
Task t;
// "Infinite" loop, only broken on destruction
while (!myInterrupt)
{
// Pop and executes tasks
myQueue.pop(t);
if (!myInterrupt) t();
}
}
// The constructor stays private, ensuring single instance
ThreadPool() : myActive(false), myInterrupt(false) {}
public:
// Access the instance
static ThreadPool* getInstance() { return &myInstance; }
// Number of threads
size_t numThreads() const { return myThreads.size(); }
// The number of the caller thread
static size_t threadNum() { return myTLSNum; }
// Starter
void start(const size_t nThread = thread::hardware_concurrency() - 1)
{
if (!myActive) // Only start once
{
myThreads.reserve(nThread);
// Launch threads on threadFunc and keep handles in a vector
for (size_t i = 0; i < nThread; i++)
myThreads.push_back(thread(&ThreadPool::threadFunc, this, i + 1));
myActive = true;
}
}
// Destructor
~ThreadPool()
{
stop();
}
void stop()
{
if (myActive)
{
// Interrupt mode
myInterrupt = true;
// Interrupt all waiting threads
myQueue.interrupt();
// Wait for them all to join
for_each(myThreads.begin(), myThreads.end(), mem_fn(&thread::join));
// Clear all threads
myThreads.clear();
// Clear the queue and reset interrupt
myQueue.clear();
myQueue.resetInterrupt();
// Mark as inactive
myActive = false;
// Reset interrupt
myInterrupt = false;
}
}
// Forbid copies etc
ThreadPool(const ThreadPool& rhs) = delete;
ThreadPool& operator=(const ThreadPool& rhs) = delete;
ThreadPool(ThreadPool&& rhs) = delete;
ThreadPool& operator=(ThreadPool&& rhs) = delete;
// Spawn task
template<typename Callable>
TaskHandle spawnTask(Callable c)
{
Task t(move(c));
TaskHandle f = t.get_future();
myQueue.push(move(t));
return f;
}
// Run queued tasks synchronously
// while waiting on a future,
// return true if at least one task was run
bool activeWait(const TaskHandle& f)
{
Task t;
bool b = false;
// Check if the future is ready without blocking
// The only syntax C++11 provides for that is
// wait 0 seconds and return status
while (f.wait_for(0s) != future_status::ready)
{
// Non blocking
if (myQueue.tryPop(t))
{
t();
b = true;
}
else // Nothing in the queue: go to sleep
{
f.wait();
}
}
return b;
}
};