线程池创建后,通过append函数向线程池提交task,这些task由一个任务队列来维护;线程池内部使用互斥锁来保护任务队列,并用一个信号量(sem)来表示当前队列中有多少task等待执行。
在threadPool内有一个static void* worker(void*)静态成员函数,它就是创建的threadNum个线程所共用的线程入口函数。传入的arg是this,即该threadPool实例的内存地址,将其强制类型转换回threadPool*之后,通过pool->run()调用run函数来执行真正的逻辑。
using namespace std;
/**
 * Pool of POSIX threads that execute tasks of type T taken from a FIFO queue.
 *
 * Tasks are handed in as raw T* via append(); the pool stores the pointers
 * only and calls `work()` on each one from a pool thread, so T must provide a
 * `work()` member function.
 *
 * The pool is non-copyable: it owns the `threads` array (released with
 * delete[] in the destructor) and passes its own address to every thread it
 * starts, so a copy would double-free the array.
 */
template<class T>
class threadPool{
private:
    int threadNum;                  // number of threads started by the constructor
    int maxRequestNum;              // request limit passed to the constructor
    pthread_t* threads = nullptr;   // array of thread ids, allocated by the constructor
    std::queue<T*> workQueue;       // pending tasks, FIFO order
    locker poolLocker;              // mutex guarding workQueue
    sem queueState;                 // whether has tasks to do ? P/V operation
    static void *worker(void *arg); // thread entry point: pull task from workQueue
    void run();                     // task-processing loop executed by each thread
public:
    threadPool(int thread_number = 8, int max_request = 10000);
    ~threadPool();
    // Copying would share `threads` between two owners (double delete[]).
    threadPool(const threadPool&) = delete;
    threadPool& operator=(const threadPool&) = delete;
    bool append(T *request, int state);
    // NOTE(review): no definition of append_p is visible in this file chunk.
    bool append_p(T *request);
};
/**
 * Starts `_threadNum` detached threads, each running worker() with this pool
 * as its argument.
 *
 * @param _threadNum      number of threads to create; must be positive
 * @param _maxRequestNum  request limit stored in maxRequestNum; must be positive
 * @throws std::exception if either argument is not positive, or if
 *         pthread_create / pthread_detach reports an error
 */
template<class T>
threadPool<T>::threadPool(int _threadNum, int _maxRequestNum){
    // Reject zero AND negative values with an exception: an assert would be
    // compiled out under NDEBUG, and a negative count must never reach new[].
    if(_threadNum <= 0 || _maxRequestNum <= 0)
        throw std::exception();
    threadNum = _threadNum;
    maxRequestNum = _maxRequestNum;
    // new[] reports failure by throwing, so no null check is needed afterwards.
    threads = new pthread_t[_threadNum];
    for(int i = 0; i < _threadNum; ++i){
        // Short-circuit: pthread_detach is attempted only if the thread was created.
        if(pthread_create(threads + i, nullptr, worker, this) != 0
           || pthread_detach(threads[i]) != 0){
            // The destructor does not run for a constructor that throws,
            // so the array has to be released here.
            // NOTE(review): threads started in earlier iterations are detached
            // and still hold `this`; nothing here stops them.
            delete[] threads;
            threads = nullptr;
            throw std::exception();
        }
    }
}
/**
 * Frees the thread-id array allocated by the constructor.
 * The threads themselves were detached at creation and are not joined here.
 */
template<class T>
threadPool<T>::~threadPool()
{
    delete[] this->threads;
}
/**
 * Queues a task for execution by one of the pool threads.
 *
 * @param request  task pointer to enqueue; the pointer is stored as-is
 * @param state    not used by this function
 * @return true if the task was queued; false if the queue already holds
 *         maxRequestNum tasks (the caller may retry)
 */
template<class T>
bool threadPool<T>::append(T *request, int state){
    (void)state; // accepted for interface compatibility only
    poolLocker.lock();
    // The capacity of the queue is the configured request limit, not the
    // number of threads. Cast to the queue's size type so the comparison is
    // not a signed/unsigned mix.
    const bool hasRoom =
        workQueue.size() < static_cast<decltype(workQueue.size())>(maxRequestNum);
    if(hasRoom){
        workQueue.push(request);
        queueState.post(); // V: +1, lets one thread blocked in run() proceed
    }
    poolLocker.unlock();
    return hasRoom;
}
/**
 * Thread entry point passed to pthread_create.
 *
 * @param arg  the threadPool instance that created the thread (its `this`)
 * @return the same pool pointer; only reached if run() returns
 */
template<class T>
void* threadPool<T>::worker(void *arg){
    threadPool* const self = static_cast<threadPool*>(arg);
    self->run();
    return self;
}
/**
 * Task-processing loop executed by every pool thread; it has no exit path.
 *
 * Each iteration waits on the semaphore, removes the front task from the
 * queue while holding the pool lock, and then calls work() on it with the
 * lock released.
 */
template<class T>
void threadPool<T>::run(){
    for(;;){
        // P operation: returns after decrementing a positive count, otherwise
        // blocks. With no pending task every thread is parked on this line.
        queueState.wait();

        T* task = nullptr;
        bool dequeued = false;
        poolLocker.lock();
        if(!workQueue.empty()){
            task = workQueue.front();
            workQueue.pop();
            dequeued = true;
        }
        poolLocker.unlock();

        // Nothing in the queue: go back and wait on the semaphore again.
        if(!dequeued){
            continue;
        }
        // A null pointer may have been queued; report it and move on.
        if(task == nullptr){
            std::cout << "request is null" << std::endl;
            continue;
        }
        task->work(); // run the task outside the lock
    }
}
这里对互斥锁(locker)、信号量(sem)和条件变量(cond)进行了封装,简化了它们的调用,也简化了threadPool的成员变量结构。
/**
 * Thin wrapper around an unnamed, process-private POSIX semaphore.
 *
 * The semaphore is initialised in the constructor and destroyed in the
 * destructor. The wrapper is non-copyable: a copy would duplicate the raw
 * sem_t and destroy it twice.
 */
class sem{
public:
    /// Creates a semaphore with an initial count of 0.
    /// @throws std::exception if sem_init fails
    sem() : sem(0) {}

    /// Creates a semaphore with an initial count of `num`.
    /// @throws std::exception if sem_init fails
    sem(int num){
        if (sem_init(&m_sem, 0, num) != 0){
            throw std::exception();
        }
    }

    sem(const sem&) = delete;
    sem& operator=(const sem&) = delete;

    ~sem(){
        sem_destroy(&m_sem);
    }

    /// P operation (sem_wait). @return true if sem_wait reported success.
    bool wait(){
        return sem_wait(&m_sem) == 0;
    }

    /// V operation (sem_post). @return true if sem_post reported success.
    bool post(){
        return sem_post(&m_sem) == 0;
    }

private:
    sem_t m_sem;
};
/**
 * Thin wrapper around a pthread mutex with default attributes.
 *
 * The mutex is initialised in the constructor and destroyed in the
 * destructor. The wrapper is non-copyable: a copy would duplicate the raw
 * pthread_mutex_t and destroy it twice.
 */
class locker{
public:
    /// @throws std::exception if pthread_mutex_init fails
    locker(){
        if (pthread_mutex_init(&m_mutex, nullptr) != 0){
            throw std::exception();
        }
    }

    locker(const locker&) = delete;
    locker& operator=(const locker&) = delete;

    ~locker(){
        pthread_mutex_destroy(&m_mutex);
    }

    /// Locks the mutex. @return true if pthread_mutex_lock reported success.
    bool lock(){
        return pthread_mutex_lock(&m_mutex) == 0;
    }

    /// Unlocks the mutex. @return true if pthread_mutex_unlock reported success.
    bool unlock(){
        return pthread_mutex_unlock(&m_mutex) == 0;
    }

    /// @return pointer to the underlying mutex, e.g. to pass to cond::wait.
    pthread_mutex_t *get(){
        return &m_mutex;
    }

private:
    pthread_mutex_t m_mutex;
};
/**
 * Thin wrapper around a pthread condition variable with default attributes.
 *
 * The wrapper never locks or unlocks a mutex itself: wait() and timewait()
 * pass the caller's mutex straight to the pthread call, which requires the
 * caller to hold that mutex. The wrapper is non-copyable: a copy would
 * duplicate the raw pthread_cond_t and destroy it twice.
 */
class cond{
public:
    /// @throws std::exception if pthread_cond_init fails
    cond(){
        if (pthread_cond_init(&m_cond, nullptr) != 0){
            throw std::exception();
        }
    }

    cond(const cond&) = delete;
    cond& operator=(const cond&) = delete;

    ~cond(){
        pthread_cond_destroy(&m_cond);
    }

    /// Waits on the condition variable.
    /// @param m_mutex  mutex to pair with the wait
    /// @return true if pthread_cond_wait reported success
    bool wait(pthread_mutex_t *m_mutex){
        return pthread_cond_wait(&m_cond, m_mutex) == 0;
    }

    /// Waits on the condition variable with a deadline.
    /// @param m_mutex  mutex to pair with the wait
    /// @param t        deadline handed to pthread_cond_timedwait
    /// @return true if pthread_cond_timedwait reported success; false on any
    ///         non-zero result, which includes a timeout
    bool timewait(pthread_mutex_t *m_mutex, struct timespec t){
        return pthread_cond_timedwait(&m_cond, m_mutex, &t) == 0;
    }

    /// Wakes one waiter. @return true if pthread_cond_signal reported success.
    bool signal(){
        return pthread_cond_signal(&m_cond) == 0;
    }

    /// Wakes all waiters. @return true if pthread_cond_broadcast reported success.
    bool broadcast(){
        return pthread_cond_broadcast(&m_cond) == 0;
    }

private:
    pthread_cond_t m_cond;
};
worker类表示一个task:通过threadPool的append函数提交worker实例,线程池随后在run函数里调用它的work()来执行相应的功能。
/**
 * Minimal demo task: carries an integer id and prints it when executed.
 */
class worker{
private:
    int id;

public:
    /// Stores `i` as this task's id.
    worker(int i)
        : id{i}
    {
    }

    /// Writes "worker: <id>" followed by a newline (flushed) to std::cout.
    void work(){
        std::cout << "worker: " << this->id << std::endl;
    }

    /// @return the id given at construction.
    int getId(){
        return this->id;
    }
};
初始化线程池没什么好说的。但是在通过append提交task时,可能会因为任务队列已满而提交失败,此时append返回false,所以需要通过while(!pool->append(...))循环重试,直到提交成功为止。
using namespace std;
int main(){
threadPool<worker>* pool = new threadPool<worker>(8, 10000);
vector<worker*> workers;
for(int i = 0; i < 20; ++i){
workers.push_back(new worker(i));
}
for(worker* w: workers){
while(!pool->append(w, 0)){
}
}
pause();
return 0;
}
最后给出makefile,记住链接时需要加上-lpthread选项:
# Build the thread-pool demo. Only main.cpp ($<, the first prerequisite) is
# handed to g++; the headers are listed as prerequisites so that editing any
# of them triggers a rebuild.
app: main.cpp threadPool.h worker.h lock.h
	g++ -o app $< -g -lpthread

# clean is a command, not a file to build.
.PHONY: clean
clean:
	rm -f app
40个task的运行结果如下(注意:上面的main示例只提交了20个task,需要把循环上限改为40才能得到对应的输出):