-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadpool.cpp
116 lines (114 loc) · 2.79 KB
/
threadpool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <pthread.h>
#include <signal.h>
#include <iostream>
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
typedef struct Node
{
void *(*mission)(void* stat);//任务队列中任务项
void *value; //任务队列对应参数
Node *nextnode; //下一个任务
}thread_mission;
//C++create第三个参数必须是静态函数,如果要内传参数,必须用静态内容。
class threadpool
{
int thread_limit; //线程大小
static pthread_t *thread; //线程id
static pthread_mutex_t *pmutex; //线程锁,主线程激励工作
static void *work(void *arg); //工作线程定义
static thread_mission *list; //任务队列 头
static thread_mission *bottom; //任务队列尾
public:threadpool(int num) //初始化创建线程池
{
thread_limit=num;
thread=new pthread_t[num];
pmutex=new pthread_mutex_t;
pthread_mutex_init(pmutex,NULL);
pthread_mutex_lock(&mutex);
for(int i=0;i<num;i++)
{
if((pthread_create(thread+i,NULL,work,this))!=0)
throw std::exception();
}
}
public:void thread_input(void *(*mission)(void *stat),void *stat); //线程堆入
public:void join()
{
while(1)
{
pthread_mutex_lock(pmutex);
if(list==NULL)
return;
pthread_mutex_unlock(pmutex);
}
}
public:~threadpool() //析构,关闭线程
{
pthread_mutex_destroy(pmutex);
for(int i=0;i<thread_limit;i++)
kill(thread[i],SIGKILL);
delete []thread;
delete pmutex;
}
};
pthread_t *threadpool::thread=NULL;
pthread_mutex_t *threadpool::pmutex=NULL;
thread_mission *threadpool::list=NULL;
thread_mission *threadpool::bottom=NULL;
void threadpool::thread_input(void *(mission)(void *stat),void *stat)
{
pthread_mutex_lock(pmutex);//任务堆入前先确保其他线程没有修改
thread_mission *point=new thread_mission;
point->mission=mission;
point->value=stat;
point->nextnode=NULL;
if(list==NULL)
bottom=list=point;
else
{
bottom->nextnode=point;
bottom=point;
}
pthread_mutex_unlock(pmutex);
pthread_mutex_unlock(&mutex);//这里其实是通知空闲进程开始工作
}
void *threadpool::work(void *arg)//工作线程
{
while(1)
{
//std::cout<<"test"<<std::endl;
if(list==NULL)
{
pthread_mutex_lock(&mutex);//如果所有任务处理完,等待主线程派发新任务
}
pthread_mutex_lock(pmutex);//确认其他线程没有抢占资源
if(list==NULL)//防止抢占
{
pthread_mutex_unlock(pmutex);
continue;
}
thread_mission *point=list;
list=list->nextnode;
pthread_mutex_unlock(pmutex);
(point->mission)(point->value);
delete point;
}
}
void *demo(void *arg)
{
int *point=(int *)arg;
std::cout<<*point<<std::endl;
delete point;
return NULL;
}
int main()
{
threadpool t(4);
for(int i=0;i<100;i++)
{
int *point=new int;
*point=i;
t.thread_input(demo,(void *)point);
}
t.join();
return 0;
}