定义一个线程池的结构体,先确定线程池大概的组成部分:任务队列(tasks)工作线程(workers)管理者(manager)。然后围绕这三个概念去补充就好了。

任务其实就是在线程中要被调用的函数,所以具体的定义如下:

1
2
3
4
struct task {
void* (*func)(void* arg);
void* arg;
};

而线程池中的 任务队列 顾名思义就是由一系列我们要做的任务组成的队列,所以在线程池中定义任务队列时还需要定义一些队列相关的变量

1
2
3
4
5
struct task* taskQueue;
int queueCapacity; /* 队列容量 */
int queueSize; /* 队列大小 */
int queueFront; /* 队头 */
int queueRear; /* 队尾 */

在线程池中的 工作线程 应该不止一个,所以我们需要一个数组来存放这些工作线程,同时,还需要一些变量来描述这个数组,以便 管理者线程 更好的管理这个数组。

1
2
3
4
5
6
7
8
pthread_t* workers;
int minimum; /* 最小工作线程数 */
int maximum; /* 最大工作线程数 */
int livenum; /* 存活的线程数 */
int busynum; /* 正在工作的线程数 */
int exitnum; /* 要销毁的线程数 */

pthread_t manager;

管理者 在线程池中的定义其实还算简单,它就是一个线程,我们去实现它就是去写好一个函数,线程池的核心就是这个管理者函数的运行。


具体实现

C语言版

1
2
3
4
5
6
7
8
9
/* threadpool.h */

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct task Task;
typedef struct threadpool ThreadPool;

#endif // _THREADPOOL_H
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/* threadpool.c/

#include "threadpool.h"
#include <stdio.h>
#include <pthread.h>

#define ADDWORKER 2

struct task {
void* (*func)(void* arg);
void* arg;
};

struct threadpool {
struct task* taskQueue;
int queueCapacity;
int queueSize;
int queueFront;
int queueRear;

pthread_t manager;
pthread_t* workers;
int minimum; /* 最小工作线程数 */
int maximum; /* 最大工作线程数 */
int busynum; /* 正在工作的线程数 */
int livenum; /* 存活的线程数 */
int exitnum; /* 要销毁的线程数 */

pthread_mutex_t poolMutex; /* 锁整个线程池 */
pthread_mutex_t busyMutex; /* 锁busynum */

pthread_cond_t emptyCond; /* 任务队列为空的条件 */
pthread_cond_t fullCond; /* 任务队列为满的条件 */

int stop;
};

/* 不是让管理者主动销毁线程
而是发送信号唤醒被阻塞的工作线程
让工作线程去检测是否要自杀 */
void suicide(ThreadPool* pool) {
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maximum; i++) {
if (tid == pool->workers[i]) {
pool->workers[i] = 0;
printf("worker: %d exit\n", tid);
break;
}
}
pthread_exit(NULL);
}

/* 工作线程被创建出来,要做的事就是不断读取任务队列,执行任务 */
void* worker(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (1) {
/* 很多worker都要从线程池中拿任务,涉及到线程同步问题 */
pthread_mutex_lock(&pool->poolMutex);
while (pool->queueSize == 0 && !pool->stop) {
pthread_cond_wait(&pool->emptyCond, &pool->poolMutex);
if (pool->exitnum > 0) {
pool->exitnum--;
pool->livenum--;
pthread_mutex_unlock(&pool->poolMutex);
suicide(pool);
}
}
/* 考虑到在取到任务时用户可能已经下达关闭线程池的指令 */
if (pool->stop) {
pthread_mutex_unlock(&pool->poolMutex);
suicide(pool);
}
Task task;
task.func = pool->taskQueue[pool->queueFront].func;
task.arg = pool->taskQueue[pool->queueFront].arg;
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
pthread_cond_signal(&pool->fullCond);
/* 对任务队列操作完了就可解锁了,下面的操作不再涉及对任务队列的操作 */
pthread_mutex_unlock(&pool->poolMutex);

/* 获取到任务后,开始执行任务,其实就是调用下函数指针*/
pthread_mutex_lock(&pool->busyMutex);
pool->busynum++;
pthread_mutex_unlock(&pool->busyMutex);
task.func(task.arg);
free(task.arg);
task.arg = NULL;
pthread_mutex_lock(&pool->busyMutex);
pool->busynum--;
pthread_mutex_unlock(&pool->busyMutex);

sleep(3);
}
return NULL;
}

/* 管理的工作就是依据
线程池中的某些变量
根据自己指定的规则
不断地 动态地
创建或销毁工作线程 */
void* manager(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->stop) {
pthread_mutex_lock(&pool->poolMutex);
int taskNum = pool->queueSize;
int workerNum = pool->livenum;
int workingNum = pool->busynum;

/* 任务太多了,忙不过来了 */
if (taskNum > workerNum && workerNum < pool->maximum) {
int cnt = 0;
for (int i = 0; workerNum < pool->maximum && i < pool->maximum && cnt < ADDWORKER; i++) {
if (pool->workers[i] == 0) {
pthread_create(&pool->workers[i], NULL, worker, pool);
cnt++;
workerNum++, pool->livenum++;
}
}
}

/* 摸鱼的工人太多了,需要唤醒一些让它们去自杀 */
if (workingNum * 2 < workerNum && workerNum > pool->minimum) {
pool->exitnum++;
pthread_cond_signal(&pool->emptyCond);
}
pthread_mutex_unlock(&pool->poolMutex);

}
}

ThreadPool* creadThreadPool(int max, int min, int taskCapacity) {
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
if (pool == NULL) {
perror("线程池malloc失败");
return NULL;
}
/* 工作线程还没创建,但是得先有个坑位 */
pool->workers = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->workers == NULL) {
free(pool);
perror("工作线程malloc失败");
return NULL;
}
memset(pool->workers, 0, sizeof(pthread_t) * max);
pool->minimum = min;
pool->maximum = max;
pool->busynum = 0;
pool->livenum = min;
pool->exitnum = 0;

int res = 0;
res |= pthread_mutex_init(&pool->poolMutex, NULL);
res |= pthread_mutex_init(&pool->busyMutex, NULL);
res |= pthread_cond_init(&pool->emptyCond, NULL);
res |= pthread_cond_init(&pool->fullCond, NULL);
if (res != 0) {
free(pool->workers);
free(pool);
perrof("互斥条件初始化失败");
return NULL;
}

pool->taskQueue = (Task*)malloc(sizeof(Task) * taskCapacity);
pool->queueCapacity = taskCapacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;

pthread_create(&pool->manager, NULL, manager, NULL);
/* 先按最小线程数创建工作线程 */
for (int i = 0; i < min; i++) {
pthread_create(&pool->workers[i], NULL, worker, pool);
}

pool->stop = 0;
return pool;
}

void insertTask(ThreadPool* pool, void(*func)(void*), void* arg) {
pthread_mutex_lock(&pool->poolMutex);
while (pool->queueSize == pool->queueCapacity && !pool->stop) {
pthread_cond_wait(&pool->fullCond, &pool->poolMutex);
}
if (pool->stop) {
pthread_mutex_unlock(&pool->poolMutex);
return;
}
pool->taskQueue[pool->queueRear].func = func;
pool->taskQueue[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->emptyCond);
pthread_mutex_unlock(&pool->poolMutex);
}

int freeThreadPool(ThreadPool* pool) {
if (pool == NULL) return -1;
pool->stop = 1;
pthread_join(pool->manager, NULL);
pthread_cond_broadcast(&pool->emptyCond);
for (int i = 0; i < pool->maximum; i++) {
if (pool->workers[i] != 0)
pthread_join(pool->workers[i], NULL);
}
pthread_cond_broadcast(&pool->fullCond);
if (pool->taskQueue) free(pool->taskQueue);
if (pool->workers) free(pool->workers);
pthread_mutex_destroy(&pool->poolMutex);
pthread_mutex_destroy(&pool->busyMutex);
pthread_cond_destroy(&pool->emptyCond);
pthread_cond_destroy(&pool->fullCond);
free(pool);
pool = NULL;
return 0;
}

C++版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/* ThreadPool.h */
#pragma once
#include "TaskQueue.h"

class ThreadPool
{
private:
TaskQueue* m_TaskQueue;

pthread_t managerID;
pthread_t* workers;
int minimum;
int maximum;
int busyNum;
int liveNum;
int exitNum;
pthread_mutex_t m_Mutex;
pthread_cond_t m_EmptyCond;

bool stop;

private:
static void* manager(void* arg);
static void* worker(void* arg);
void ExitThread();
public:
ThreadPool(int min, int max);
~ThreadPool();
int Stop();
void InsertTask(Task task);
void InsertTask(void(*func)(void*), void* arg);
int GetBusyNum();
int GetLiveNum();
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* TaskQueue.h */
#pragma once
#include <queue>
#include <pthread.h>

struct Task {
void (*func)(void* arg);
void* arg;
Task() {
func = nullptr;
arg = nullptr;
}
Task(void (*func)(void* arg), void* arg) {
this->func = func;
this->arg = arg;
}
};

class TaskQueue
{
private:
std::queue<Task> m_TaskQueue;
pthread_mutex_t m_Mutex;
public:
TaskQueue();
~TaskQueue();
void AddTask(Task task);
void AddTask(void (*func)(void* arg), void* arg);
Task GetTask();
inline int TaskNumber() {
return m_TaskQueue.size();
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* TaskQueue.cpp */
#include "TaskQueue.h"

TaskQueue::TaskQueue()
{
pthread_mutex_init(&m_Mutex, NULL);
}

TaskQueue::~TaskQueue()
{
pthread_mutex_destroy(&m_Mutex);
}

void TaskQueue::AddTask(Task task)
{
pthread_mutex_lock(&m_Mutex);
m_TaskQueue.push(task);
pthread_mutex_unlock(&m_Mutex);
}

void TaskQueue::AddTask(void (*func)(void* arg), void* arg)
{
pthread_mutex_lock(&m_Mutex);
m_TaskQueue.push(Task(func, arg));
pthread_mutex_unlock(&m_Mutex);
}

Task TaskQueue::GetTask()
{
Task task;
pthread_mutex_lock(&m_Mutex);
if (m_TaskQueue.empty() == false) {
task = m_TaskQueue.front();
m_TaskQueue.pop();
}
pthread_mutex_unlock(&m_Mutex);
return task;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/* TaskPool.cpp */
#include <iostream>
#include "ThreadPool.h"
#include <string.h>

ThreadPool::ThreadPool(int min, int max)
{
do {
m_TaskQueue = new TaskQueue;
if (m_TaskQueue == nullptr) break;
workers = new pthread_t[max];
if (workers == nullptr) {
std::cout << "new workers fail.." << std::endl;
break;
}
memset(workers, 0, sizeof(pthread_t) * max);
minimum = min;
maximum = max;
busyNum = 0;
liveNum = 0;
exitNum = 0;

int res = pthread_mutex_init(&m_Mutex, NULL);
res |= pthread_cond_init(&m_EmptyCond, NULL);
if (res != 0) {
std::cout << "mutex or cond init fail..." << std::endl;
break;
}

stop = false;

pthread_create(&managerID, NULL, manager, this);
for (int i = 0; i < min; i++) {
pthread_create(&workers[i], NULL, worker, this);
}
} while (0);

if (workers) delete[] workers;
if (m_TaskQueue) delete m_TaskQueue;
}

ThreadPool::~ThreadPool()
{
}

void* ThreadPool::worker(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while (true) {
pthread_mutex_lock(&pool->m_Mutex);
while (pool->m_TaskQueue->TaskNumber() == 0 && !pool->stop) {
pthread_cond_wait(&pool->m_EmptyCond, &pool->m_Mutex);
if (pool->exitNum > 0) {
pool->exitNum--;
if (pool->liveNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->m_Mutex);
pool->ExitThread();
}
}
}
if (pool->stop) {
pthread_mutex_unlock(&pool->m_Mutex);
pool->ExitThread();
}
pthread_mutex_unlock(&pool->m_Mutex);
}
}