
C++: Implementing a threadPool

After the thread pool is created, tasks are added to it through the append function and maintained in a task queue. Inside the pool, a mutex protects the queue, and a semaphore (the sem member) indicates whether there are currently tasks waiting to run.
threadPool has a static member function, static void* worker(void*), which is the thread routine shared by all threadNum created threads. The arg passed to it is this, i.e. the address of the threadPool instance; after casting it back, the thread calls pool->run() to execute the real work loop.
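The essence of that trampoline pattern, shown in isolation (a minimal sketch for illustration, separate from the pool code below):

#include <pthread.h>
#include <cstdio>

class demo{
public:
    // pthread_create needs a plain void* (*)(void*) function; a non-static
    // member function has a hidden `this` parameter and cannot be passed,
    // so a static member serves as the trampoline.
    static void* worker(void* arg){
        demo* self = (demo*) arg; // recover `this`
        self->run();
        return nullptr;
    }
    void run(){ std::printf("running inside the pool thread\n"); }
    void start(){
        pthread_t tid;
        pthread_create(&tid, nullptr, worker, this); // pass `this` as arg
        pthread_join(tid, nullptr);
    }
};

int main(){ demo d; d.start(); return 0; }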

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <iostream>
#include <pthread.h>
#include <assert.h>
#include <vector>
#include <queue>

#include "lock.h"

using namespace std;

template<class T>
class threadPool{
private:
    int threadNum;
    int maxRequestNum;
    pthread_t* threads = nullptr;
    queue<T*> workQueue;
    locker poolLocker;  // protects workQueue
    sem queueState;     // counts pending tasks; P/V operations

    static void *worker(void *arg); // thread routine: pulls tasks from workQueue
    void run();
public:
    threadPool(int thread_number = 8, int max_request = 10000);
    ~threadPool();
    bool append(T *request, int state); // state is unused in this simplified pool
};

template<class T>
threadPool<T>::threadPool(int _threadNum, int _maxRequestNum){
    if(_threadNum <= 0 || _maxRequestNum <= 0)
        assert(0);
    threadNum = _threadNum;
    maxRequestNum = _maxRequestNum;

    threads = new pthread_t[_threadNum];

    for(int i = 0; i < _threadNum; ++i){
        // worker must be static so its type matches void* (*)(void*)
        int ret = pthread_create(threads + i, NULL, worker, this);
        if(ret != 0){
            delete[] threads;
            throw std::exception();
        }
        ret = pthread_detach(threads[i]); // detached: no pthread_join required
        if(ret != 0){
            delete[] threads;
            throw std::exception();
        }
    }
}

template<class T>
threadPool<T>::~threadPool(){
    // The worker threads are detached and keep looping in run();
    // only the handle array is released here.
    delete[] threads;
}

template<class T>
bool threadPool<T>::append(T *request, int state){
    poolLocker.lock();
    if((int)workQueue.size() >= maxRequestNum){ // the queue is full
        poolLocker.unlock();
        return false;
    }
    workQueue.push(request);
    queueState.post(); // V: +1, wakes one waiting worker
    poolLocker.unlock();

    return true;
}

template<class T>
void* threadPool<T>::worker(void *arg){
    threadPool* pool = (threadPool*) arg; // arg is the `this` passed to pthread_create
    pool->run();
    return pool;
}

template<class T>
void threadPool<T>::run(){
    while(true){
        /*
         * P operation: if the count is > 0, decrease it by 1 and return;
         * otherwise block until it becomes > 0.
         * While there is no task to do, all threads block here;
         * when tasks arrive, threads decrease the count and continue.
         */
        queueState.wait();

        poolLocker.lock();
        if(workQueue.empty()){ // no task in workQueue; go back and wait again
            poolLocker.unlock();
            continue;
        }
        T* request = workQueue.front();
        workQueue.pop();
        poolLocker.unlock();
        if (!request){
            std::cout << "request is null" << std::endl;
            continue;
        }

        request->work(); // the request is working!
    }
}
#endif
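For comparison, the same wait-for-work pattern can be sketched with C++11 standard primitives, where std::condition_variable plays the role of the semaphore and std::mutex the role of locker. This SimplePool class is a made-up illustration, not part of the original code:

#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class SimplePool {
public:
    explicit SimplePool(int n) {
        for (int i = 0; i < n; ++i)
            threads.emplace_back([this] { run(); });
    }
    ~SimplePool() {
        { std::lock_guard<std::mutex> g(m); stop = true; }
        cv.notify_all();
        for (auto& t : threads) t.join(); // joinable, unlike pthread_detach
    }
    void append(std::function<void()> task) {
        { std::lock_guard<std::mutex> g(m); q.push(std::move(task)); }
        cv.notify_one(); // equivalent of the V operation
    }
private:
    void run() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(m);
                cv.wait(lk, [this] { return stop || !q.empty(); }); // P
                if (stop && q.empty()) return;
                task = std::move(q.front());
                q.pop();
            }
            task();
        }
    }
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> q;
    std::mutex m;
    std::condition_variable cv;
    bool stop = false;
};

int main(){
    SimplePool pool(4);
    for (int i = 0; i < 8; ++i)
        pool.append([i]{ std::printf("task %d\n", i); });
    return 0; // the destructor drains queued tasks, then joins the threads
}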

lock.h below wraps the pthread synchronization primitives, which simplifies the lock and sem calls and keeps threadPool's member variables simple.

#ifndef LOCK_H
#define LOCK_H

#include <exception>
#include <pthread.h>
#include <semaphore.h>

// Semaphore wrapper: wait() is the P operation, post() is the V operation.
class sem{
public:
    sem(){
        if (sem_init(&m_sem, 0, 0) != 0){
            throw std::exception();
        }
    }
    sem(int num){
        if (sem_init(&m_sem, 0, num) != 0){
            throw std::exception();
        }
    }
    ~sem(){
        sem_destroy(&m_sem);
    }
    bool wait(){
        return sem_wait(&m_sem) == 0;
    }
    bool post(){
        return sem_post(&m_sem) == 0;
    }
private:
    sem_t m_sem;
};

// Mutex wrapper.
class locker{
public:
    locker(){
        if (pthread_mutex_init(&m_mutex, NULL) != 0){
            throw std::exception();
        }
    }
    ~locker(){
        pthread_mutex_destroy(&m_mutex);
    }
    bool lock(){
        return pthread_mutex_lock(&m_mutex) == 0;
    }
    bool unlock(){
        return pthread_mutex_unlock(&m_mutex) == 0;
    }
    pthread_mutex_t *get(){
        return &m_mutex;
    }
private:
    pthread_mutex_t m_mutex;
};

// Condition variable wrapper; the caller must hold the mutex passed to wait().
class cond{
public:
    cond(){
        if (pthread_cond_init(&m_cond, NULL) != 0){
            throw std::exception();
        }
    }
    ~cond(){
        pthread_cond_destroy(&m_cond);
    }
    bool wait(pthread_mutex_t *m_mutex){
        return pthread_cond_wait(&m_cond, m_mutex) == 0;
    }
    bool timewait(pthread_mutex_t *m_mutex, struct timespec t){
        return pthread_cond_timedwait(&m_cond, m_mutex, &t) == 0;
    }
    bool signal(){
        return pthread_cond_signal(&m_cond) == 0;
    }
    bool broadcast(){
        return pthread_cond_broadcast(&m_cond) == 0;
    }
private:
    pthread_cond_t m_cond;
};
#endif
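To see the wrappers in isolation, here is a small standalone demo of sem and locker (the consumer setup is invented for this example):

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "lock.h"

sem taskReady;      // default-constructed at 0: nothing to consume yet
locker coutLocker;  // serializes access to std::cout

void* consumer(void*){
    taskReady.wait();                 // P: blocks until the producer posts
    coutLocker.lock();
    std::cout << "consumer woke up" << std::endl;
    coutLocker.unlock();
    return nullptr;
}

int main(){
    pthread_t tid;
    pthread_create(&tid, nullptr, consumer, nullptr);
    sleep(1);            // let the consumer block on wait() first
    taskReady.post();    // V: wakes the consumer
    pthread_join(tid, nullptr);
    return 0;
}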

worker is the task type here (note that this worker class is unrelated to the static worker function inside threadPool; they merely share a name). Tasks are submitted through threadPool's append function, and threadPool's run function then invokes the corresponding work.

#ifndef WORKER_H_
#define WORKER_H_

#include <iostream>

class worker{
public:
    worker(int i): id(i){}
    void work(){
        std::cout << "worker: " << id << std::endl;
    }
    int getId(){ return id; }
private:
    int id;
};

#endif
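Since threadPool<T> only ever calls request->work(), any class with a public work() method can serve as the task type. For instance, a hypothetical task wrapping an arbitrary callable (not in the original code):

#include <functional>
#include <utility>

// Hypothetical alternative task type: wraps any callable, so the pool
// is not limited to the worker class above.
class funcTask{
public:
    explicit funcTask(std::function<void()> f): fn(std::move(f)){}
    void work(){ fn(); } // the only method threadPool<T> requires
private:
    std::function<void()> fn;
};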

Initializing the thread pool needs no further comment. However, an append can fail when the work queue is full, in which case it returns false, so the caller keeps retrying with a while(!append(...)) loop until the task is accepted.

#include <iostream>
#include <vector>

#include "threadPool.h"
#include "worker.h"
#include <unistd.h>

using namespace std;

int main(){
    threadPool<worker>* pool = new threadPool<worker>(8, 10000);
    vector<worker*> workers;
    for(int i = 0; i < 20; ++i){
        workers.push_back(new worker(i));
    }

    for(worker* w: workers){
        while(!pool->append(w, 0)){
            // queue full: retry until the task is accepted
        }
    }
    pause(); // block main forever so the detached pool threads keep running
    return 0;
}
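The empty while loop above busy-spins while the queue is full. A gentler variant (an illustrative helper, not in the original code) backs off briefly between attempts:

#include <unistd.h>
#include "threadPool.h"

// Hypothetical helper: retry with a short sleep so a full queue
// does not burn a CPU core while waiting.
template<class T>
void appendBlocking(threadPool<T>* pool, T* task){
    while(!pool->append(task, 0)){
        usleep(1000); // back off for 1 ms before retrying
    }
}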

Finally, here is the Makefile; remember to add the -lpthread flag.

# Headers are listed as prerequisites so changes trigger a rebuild;
# only main.cpp ($<) is actually passed to the compiler.
app: main.cpp threadPool.h worker.h lock.h
	g++ -o app $< -g -lpthread

clean:
	rm -f app

The output of a run with 40 tasks is shown below.
[figure: console output showing lines of the form "worker: <id>"]
