-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy paththreadpool.c
340 lines (282 loc) · 11.7 KB
/
threadpool.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#include "threadpool.h"
#include "log.h"
//做一些初始化操作
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
int i;
threadpool_t *pool = NULL;
do {
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
LogWrite(ERROR, "%d %s:%s", __LINE__, "malloc threadpool failed", strerror(errno));
break;
}
pool->min_thr_num = min_thr_num; //线程池最小线程数
pool->max_thr_num = max_thr_num; //线程池最大线程数
pool->busy_thr_num = 0; //忙状态的线程数初始值为0
pool->live_thr_num = min_thr_num; //活着的线程数 初值=最小线程数
pool->wait_exit_thr_num = 0; //要销毁的线程个数初始值也初始为0.
pool->queue_size = 0; //任务队列实际元素个数初始值为0
pool->queue_max_size = queue_max_size; //任务队列最大元素个数。
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = false; //不关闭线程池
/* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); //线程池中线程最大个数
if (pool->threads == NULL) {
LogWrite(ERROR, "%d %s:%s", __LINE__, "malloc threads failed", strerror(errno));
break;
}
memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
/* 队列开辟空间 */
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size); //每个元素都是一个结构体,结构体中有一个函数指针和一个void*的指针。
if (pool->task_queue == NULL) {
LogWrite(ERROR, "%d %s:%s", __LINE__, "malloc task_queue failed", strerror(errno));
break;
}
/* 初始化互斥琐、条件变量 */
if (pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0 ||
pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
LogWrite(ERROR, "%d %s:%s", __LINE__, "init the lock or cond failed", strerror(errno));
break;
}
for (i = 0; i < min_thr_num; i++) {
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
LogWrite(DEBUG, "%d %s%x", __LINE__, "start thread 0x", (unsigned int)pool->threads[i]);
}
pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); /* 启动管理者线程 */
pthread_detach(pool->adjust_tid); //把管理线程设置为分离的,系统帮助回收资源。
LogWrite(DEBUG, "%d %s", __LINE__, "adjust_thread start");
//等待线程创建完成再回到主函数中。
for (i = 0; i < min_thr_num; i++) {
pthread_join(pool->threads[i], NULL);
}
//sleep(1); //等待线程创建完成再回到主函数中。
return pool;
} while (0);
threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */
return NULL;
}
/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t*)threadpool;
threadpool_task_t task;
while(true)
{
/*刚创建出线程,等待任务队列里 有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
pthread_mutex_lock( &(pool->lock) );
/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
while( (pool->queue_size ==0) && (!pool->shutdown) ) //线程池没有任务且不关闭线程池。
{
LogWrite(DEBUG, "%d %s%x %s", __LINE__, "thread 0x", (unsigned int)pthread_self(), "is waitting");
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//线程阻塞在这个条件变量上
/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
if( pool->wait_exit_thr_num > 0) /* 要销毁的线程个数大于0 */
{
pool->wait_exit_thr_num--;
/*如果线程池里线程个数大于最小值时可以结束当前线程*/
if (pool->live_thr_num > pool->min_thr_num) {
LogWrite(DEBUG, "%d %s%x %s", __LINE__, "thread 0x", (unsigned int)pthread_self(), "is exiting");
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}
/*如果指定了true,要关闭线程池里的每个线程,自行退出处理*/
if(pool->shutdown)
{
pthread_mutex_unlock ( &(pool->lock) );
LogWrite(DEBUG, "%d %s%x %s", __LINE__, "thread 0x", (unsigned int)pthread_self(), "is exiting");
pthread_exit(NULL); /* 线程自行结束 */
}
//从任务队列里获取任务,是一个出队操作
task.function=pool->task_queue[pool->queue_front ].function;
task.arg = pool->task_queue[pool->queue_front ].arg;
pool->queue_front = ((pool->queue_front +1) % pool->queue_max_size);//队头指针向后移动一位。
pool->queue_size--;
/*任务队列中出了一个元素,还有位置 ,唤醒阻塞在这个条件变量上的线程,现在通知可以有新的任务添加进来*/
pthread_cond_broadcast(&(pool->queue_not_full)); //queue_not_full另一个条件变量。
/*任务取出后,立即将 线程池琐 释放*/
pthread_mutex_unlock(&(pool->lock));
/*执行任务*/
LogWrite(DEBUG, "%d %s%x %s", __LINE__, "thread 0x", (unsigned int)pthread_self(), "start working");
pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/
pool->busy_thr_num++; /*忙状态线程数+1*/
pthread_mutex_unlock(&(pool->thread_counter));
(task.function)(task.arg); /*执行回调函数任务,相当于process(arg) */
/*任务结束处理*/
LogWrite(DEBUG, "%d %s%x %s", __LINE__, "thread 0x", (unsigned int)pthread_self(), "end working");
usleep(10000);
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
int is_thread_alive(pthread_t tid)
{
int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活
if (kill_rc == ESRCH) {
return false;
}
return true;
}
/* 管理线程 */
void *adjust_thread(void *threadpool)
{
int i;
threadpool_t *pool = (threadpool_t *)threadpool ;
while( !(pool->shutdown) ) //线程池没有关闭
{
sleep(DEFAULT_TIME); /*定时 对线程池管理*/
LogWrite(DEBUG, "%d %s", __LINE__, "10s is finish,start test thread pool");
pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size; /* 关注 任务数 */
int live_thr_num = pool->live_thr_num; /* 存活 线程数 */
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num;
pthread_mutex_unlock(&(pool->thread_counter)); /* 忙着的线程数 */
LogWrite(DEBUG, "%d %s %d", __LINE__, "busy thread number is", busy_thr_num);
/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num)
{
LogWrite(DEBUG, "%d %s", __LINE__, "create new thread");
pthread_mutex_lock(&(pool->lock));
int add = 0;
/*一次增加 DEFAULT_THREAD 个线程*/
for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
&& pool->live_thr_num < pool->max_thr_num; i++)
{
if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
{
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
add++;
pool->live_thr_num++;
}
}
pthread_mutex_unlock(&(pool->lock));
}
/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
{
LogWrite(DEBUG, "%d %s", __LINE__, "delete pthread");
/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;
pthread_mutex_unlock(&(pool->lock));
for (i = 0; i < DEFAULT_THREAD_VARY; i++)
{
/* 通知处在空闲状态的线程, 他们会自行终止*/
pthread_cond_signal(&(pool->queue_not_empty));
}
}
}
return NULL;
}
/* 向任务队列中, 添加一个任务 */
int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
pthread_mutex_lock( &(pool->lock) );
/* ==为真,队列已经满, 调wait阻塞 */
while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
{
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
if (pool->shutdown)
{
pthread_mutex_unlock(&(pool->lock));
}
/* 清空 工作线程 调用的回调函数 的参数arg */
if (pool->task_queue[pool->queue_rear].arg != NULL)
{
free(pool->task_queue[pool->queue_rear].arg);//这里调用free不安全,因为有可能线程执行函数中, 忙符合时低概率
pool->task_queue[pool->queue_rear].arg = NULL;
}
/*添加任务到任务队列里*/
pool->task_queue[pool->queue_rear].function = function; //在队列的尾部添加元素
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */
pool->queue_size++;
/*添加完任务后,队列不为空,唤醒阻塞在为空的那个条件变量上中的线程*/
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return EXIT_SUCCESS_CODE;
}
int threadpool_free(threadpool_t *pool)
{
if (pool == NULL) {
return EXIT_FAIL_CODE;
}
if (pool->task_queue) { //释放任务队列
free(pool->task_queue);
}
if (pool->threads) { //释放线程池数组以及各种锁。
free(pool->threads);
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_unlock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return EXIT_SUCCESS_CODE;
}
int threadpool_destroy(threadpool_t *pool)
{
int i;
if (pool == NULL) {
return EXIT_FAIL_CODE;
}
pool->shutdown = true;
/*通知所有的空闲线程, 不需要循环广播*/
pthread_cond_broadcast(&(pool->queue_not_empty));
for (i = 0; i < pool->live_thr_num; i++) {
pthread_join(pool->threads[i], NULL);
}
threadpool_free(pool);
return EXIT_SUCCESS_CODE;
}
/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{
LogWrite(DEBUG, "%d %s%x %s %d", __LINE__, "thread 0x", (unsigned int)pthread_self(), "working on task", *(int *)arg);
sleep(5);
LogWrite(ERROR, "%d %s %d %s", __LINE__, "task", *(int *)arg, "is end");
return NULL;
}
/*
int main()
{
threadpool_t *thp = threadpool_create(3,30,30);//创建线程池,池里最小3个线程,最大100,队列最大100
if (thp == NULL) {
LogWrite(ERROR, "%d %s", __LINE__, "thread pool create error");
return EXIT_FAIL_CODE;
}
LogWrite(INFO, "%d %s", __LINE__, "thread pool inited");
int num[30], i;
for (i = 0; i < 30; i++) {
num[i] = i;
LogWrite(INFO, "%d %s %d", __LINE__, "add task", i);
threadpool_add(thp, process, (void*)&num[i]); //向任务队列中添加任务
}
while(true) {
pthread_mutex_lock(&(thp->thread_counter));
int busyNum = thp->busy_thr_num;
int taskNum = thp->queue_size;
pthread_mutex_unlock(&(thp->thread_counter));
// 释放线程池
if (busyNum == 0 && taskNum == 0) {
LogWrite(DEBUG, "%d %s", __LINE__, "busyNum == 0 && taskNum == 0, clean up thread pool");
threadpool_destroy(thp);
return EXIT_SUCCESS_CODE;
}
}
printf("never go here\n");
}
*/
/* gcc threadpool.c -o threadpool -lpthread */