定义一个线程池的结构体,先确定线程池大概的组成部分:任务队列(tasks)、工作线程(workers)、管理者(manager)。然后围绕这三个概念去补充就好了。
任务其实就是在线程中要被调用的函数,所以具体的定义如下:
1 2 3 4 struct task { void * (*func)(void * arg); void * arg; };
而线程池中的 任务队列 顾名思义就是由一系列我们要做的任务组成的队列,所以在线程池中定义任务队列时还需要定义一些队列相关的变量
1 2 3 4 5 struct task * taskQueue ;int queueCapacity; int queueSize; int queueFront; int queueRear;
在线程池中的 工作线程 应该不止一个,所以我们需要一个数组来存放这些工作线程,同时,还需要一些变量来描述这个数组,以便 管理者 线程更好地管理这个数组。
1 2 3 4 5 6 7 8 pthread_t * workers;int minimum; int maximum; int livenum; int busynum; int exitnum; pthread_t manager;
管理者 在线程池中的定义其实还算简单,它就是一个线程,我们去实现它就是去写好一个函数,线程池的核心就是这个管理者函数的运行。
具体实现 C语言版 1 2 3 4 5 6 7 8 9 #ifndef _THREADPOOL_H #define _THREADPOOL_H typedef struct task Task ;typedef struct threadpool ThreadPool ;#endif
/* Implementation, C version: threadpool.c */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* Same typedefs as the public header; repeating an identical typedef is
 * permitted in C11 and C++. */
typedef struct task Task;
typedef struct threadpool ThreadPool;

/* Maximum number of workers the manager adds per adjustment round.
 * NOTE(review): the original definition is not visible in this listing;
 * 2 is an assumed value - confirm. */
#ifndef ADDWORKER
#define ADDWORKER 2
#endif

/* Seconds between two adjustment rounds of the manager thread. */
#define MANAGER_INTERVAL_SEC 3

/* One unit of work: func(arg). */
struct task {
    void (*func)(void *arg);
    void *arg;
};

/*
 * Pool state. Every field except busynum is protected by poolMutex;
 * busynum is protected by busyMutex.
 * NOTE(review): 0 is used as the "unused slot" marker for pthread_t, which
 * assumes pthread_t is an arithmetic or pointer type on the target platform.
 */
struct threadpool {
    Task *taskQueue;            /* circular buffer of pending tasks */
    int queueCapacity;          /* number of slots in taskQueue */
    int queueSize;              /* number of pending tasks */
    int queueFront;             /* index of the next task to run */
    int queueRear;              /* index where the next task is stored */

    pthread_t manager;          /* manager thread id */
    pthread_t *workers;         /* worker ids, maximum entries, 0 = unused slot */
    int minimum;                /* lower bound on live workers */
    int maximum;                /* upper bound on live workers */
    int busynum;                /* workers currently running a task */
    int livenum;                /* workers currently alive */
    int exitnum;                /* pending exit requests issued by the manager */

    pthread_mutex_t poolMutex;  /* guards everything except busynum */
    pthread_mutex_t busyMutex;  /* guards busynum */
    pthread_cond_t emptyCond;   /* workers wait here while the queue is empty */
    pthread_cond_t fullCond;    /* producers wait here while the queue is full */
    pthread_cond_t managerCond; /* wakes the manager early on shutdown */
    int stop;                   /* non-zero once shutdown has started */
};

/* Initialises all mutexes and condition variables; on failure undoes the ones
 * already initialised and returns the pthread error code (0 on success). */
static int initSync(ThreadPool *pool)
{
    int rc = pthread_mutex_init(&pool->poolMutex, NULL);
    if (rc != 0)
        return rc;
    rc = pthread_mutex_init(&pool->busyMutex, NULL);
    if (rc != 0)
        goto undo_pool_mutex;
    rc = pthread_cond_init(&pool->emptyCond, NULL);
    if (rc != 0)
        goto undo_busy_mutex;
    rc = pthread_cond_init(&pool->fullCond, NULL);
    if (rc != 0)
        goto undo_empty_cond;
    rc = pthread_cond_init(&pool->managerCond, NULL);
    if (rc != 0)
        goto undo_full_cond;
    return 0;

undo_full_cond:
    pthread_cond_destroy(&pool->fullCond);
undo_empty_cond:
    pthread_cond_destroy(&pool->emptyCond);
undo_busy_mutex:
    pthread_mutex_destroy(&pool->busyMutex);
undo_pool_mutex:
    pthread_mutex_destroy(&pool->poolMutex);
    return rc;
}

/* Destroys every synchronisation object created by initSync(). */
static void destroySync(ThreadPool *pool)
{
    pthread_cond_destroy(&pool->managerCond);
    pthread_cond_destroy(&pool->fullCond);
    pthread_cond_destroy(&pool->emptyCond);
    pthread_mutex_destroy(&pool->busyMutex);
    pthread_mutex_destroy(&pool->poolMutex);
}

/* Frees the arrays owned by the pool and the pool itself. */
static void releaseMemory(ThreadPool *pool)
{
    free(pool->taskQueue);
    free(pool->workers);
    free(pool);
}

/*
 * Terminates the calling worker thread. Must be called WITHOUT poolMutex held.
 *
 * - While the pool is running (a shrink request), nobody will join this
 *   thread, so it releases its slot in workers[] and detaches itself.
 * - During shutdown the slot is kept so that freeThreadPool() can join it.
 */
void suicide(ThreadPool *pool)
{
    pthread_t tid = pthread_self();

    pthread_mutex_lock(&pool->poolMutex);
    if (!pool->stop) {
        for (int i = 0; i < pool->maximum; i++) {
            if (pool->workers[i] != 0 && pthread_equal(tid, pool->workers[i])) {
                pool->workers[i] = 0;
                break;
            }
        }
        pthread_detach(tid);
    }
    pthread_mutex_unlock(&pool->poolMutex);

    /* pthread_t is not an int; print it through an explicit cast. */
    printf("worker: %lu exit\n", (unsigned long)tid);
    pthread_exit(NULL);
}

/*
 * Worker thread entry point; arg is the owning ThreadPool.
 *
 * Waits for a task, runs it, and frees its argument with free(). Exits when
 * the pool is stopping, or when the manager has requested a shrink and the
 * worker is idle with more than `minimum` workers alive.
 */
void *worker(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;

    while (1) {
        pthread_mutex_lock(&pool->poolMutex);
        while (pool->queueSize == 0 && !pool->stop) {
            pthread_cond_wait(&pool->emptyCond, &pool->poolMutex);

            /* Honour an exit request only while idle, so a wakeup meant for
             * a newly queued task is never swallowed by an exiting worker. */
            if (pool->exitnum > 0 && pool->queueSize == 0 && !pool->stop) {
                pool->exitnum--;
                if (pool->livenum > pool->minimum) {
                    pool->livenum--;
                    pthread_mutex_unlock(&pool->poolMutex);
                    suicide(pool);
                    return NULL; /* not reached: suicide() ends the thread */
                }
            }
        }

        if (pool->stop) {
            pool->livenum--;
            pthread_mutex_unlock(&pool->poolMutex);
            suicide(pool);
            return NULL; /* not reached: suicide() ends the thread */
        }

        Task task = pool->taskQueue[pool->queueFront];
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        pthread_cond_signal(&pool->fullCond);
        pthread_mutex_unlock(&pool->poolMutex);

        pthread_mutex_lock(&pool->busyMutex);
        pool->busynum++;
        pthread_mutex_unlock(&pool->busyMutex);

        if (task.func != NULL)
            task.func(task.arg);
        /* The pool owns the argument once a task is queued: it must be NULL
         * or come from malloc(). */
        free(task.arg);
        task.arg = NULL;

        pthread_mutex_lock(&pool->busyMutex);
        pool->busynum--;
        pthread_mutex_unlock(&pool->busyMutex);
    }
    return NULL;
}

/*
 * Manager thread entry point; arg is the owning ThreadPool.
 *
 * Every MANAGER_INTERVAL_SEC seconds it adds up to ADDWORKER workers when more
 * tasks are pending than workers are alive, and requests one worker to exit
 * when fewer than half of the workers are busy. It sleeps on managerCond
 * between rounds so that shutdown can wake it immediately.
 */
void *manager(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;

    pthread_mutex_lock(&pool->poolMutex);
    while (!pool->stop) {
        int taskNum = pool->queueSize;
        int workerNum = pool->livenum;

        pthread_mutex_lock(&pool->busyMutex);
        int workingNum = pool->busynum;
        pthread_mutex_unlock(&pool->busyMutex);

        if (taskNum > workerNum && workerNum < pool->maximum) {
            int added = 0;
            for (int i = 0;
                 i < pool->maximum && pool->livenum < pool->maximum && added < ADDWORKER;
                 i++) {
                if (pool->workers[i] != 0)
                    continue;
                if (pthread_create(&pool->workers[i], NULL, worker, pool) == 0) {
                    added++;
                    pool->livenum++;
                } else {
                    /* The id is unspecified after a failed create. */
                    pool->workers[i] = 0;
                }
            }
            workerNum = pool->livenum;
        }

        if (workingNum * 2 < workerNum && workerNum > pool->minimum) {
            /* Keep at most one request pending so stale requests cannot pile up. */
            pool->exitnum = 1;
            pthread_cond_signal(&pool->emptyCond);
        }

        struct timespec deadline;
        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_sec += MANAGER_INTERVAL_SEC;
        /* Releases poolMutex while waiting; a timeout simply starts the next round. */
        pthread_cond_timedwait(&pool->managerCond, &pool->poolMutex, &deadline);
    }
    pthread_mutex_unlock(&pool->poolMutex);
    return NULL;
}

/*
 * Creates a thread pool.
 *
 * max          - upper bound on worker threads (> 0)
 * min          - workers started immediately (0 <= min <= max)
 * taskCapacity - number of slots in the task queue (> 0)
 *
 * Returns the new pool, or NULL when an argument is invalid, an allocation
 * fails, or the synchronisation objects or manager thread cannot be created.
 */
ThreadPool *creadThreadPool(int max, int min, int taskCapacity)
{
    if (min < 0 || max <= 0 || min > max || taskCapacity <= 0) {
        fprintf(stderr,
                "creadThreadPool: invalid arguments (max=%d, min=%d, taskCapacity=%d)\n",
                max, min, taskCapacity);
        return NULL;
    }

    ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
    if (pool == NULL) {
        perror("线程池malloc失败");
        return NULL;
    }

    pool->workers = (pthread_t *)malloc(sizeof(pthread_t) * (size_t)max);
    if (pool->workers == NULL) {
        perror("工作线程malloc失败");
        free(pool);
        return NULL;
    }
    memset(pool->workers, 0, sizeof(pthread_t) * (size_t)max);

    pool->taskQueue = (Task *)malloc(sizeof(Task) * (size_t)taskCapacity);
    if (pool->taskQueue == NULL) {
        perror("任务队列malloc失败");
        free(pool->workers);
        free(pool);
        return NULL;
    }

    pool->queueCapacity = taskCapacity;
    pool->queueSize = 0;
    pool->queueFront = 0;
    pool->queueRear = 0;
    pool->minimum = min;
    pool->maximum = max;
    pool->busynum = 0;
    pool->livenum = 0;
    pool->exitnum = 0;
    pool->stop = 0; /* must be set before any thread can read it */

    int rc = initSync(pool);
    if (rc != 0) {
        /* pthread functions report errors by return value, not errno. */
        fprintf(stderr, "互斥条件初始化失败: %s\n", strerror(rc));
        releaseMemory(pool);
        return NULL;
    }

    /* Hold the lock while starting threads so none of them observes a
     * half-populated workers[] / livenum. */
    pthread_mutex_lock(&pool->poolMutex);
    rc = pthread_create(&pool->manager, NULL, manager, pool);
    if (rc != 0) {
        pthread_mutex_unlock(&pool->poolMutex);
        fprintf(stderr, "creadThreadPool: cannot start manager: %s\n", strerror(rc));
        destroySync(pool);
        releaseMemory(pool);
        return NULL;
    }
    for (int i = 0; i < min; i++) {
        if (pthread_create(&pool->workers[i], NULL, worker, pool) == 0)
            pool->livenum++;
        else
            pool->workers[i] = 0;
    }
    pthread_mutex_unlock(&pool->poolMutex);

    return pool;
}

/*
 * Queues func(arg) for execution; blocks while the queue is full.
 *
 * On success the pool takes ownership of arg and releases it with free()
 * after the task has run, so arg must be NULL or malloc-allocated. If the pool
 * is NULL or already stopping, the task is dropped and arg is left untouched.
 */
void insertTask(ThreadPool *pool, void (*func)(void *), void *arg)
{
    if (pool == NULL)
        return;

    pthread_mutex_lock(&pool->poolMutex);
    while (pool->queueSize == pool->queueCapacity && !pool->stop) {
        pthread_cond_wait(&pool->fullCond, &pool->poolMutex);
    }
    if (pool->stop) {
        pthread_mutex_unlock(&pool->poolMutex);
        return;
    }

    pool->taskQueue[pool->queueRear].func = func;
    pool->taskQueue[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;
    pthread_cond_signal(&pool->emptyCond);
    pthread_mutex_unlock(&pool->poolMutex);
}

/*
 * Stops the pool, joins the manager and all remaining workers, and frees
 * everything the pool owns. Tasks that are running finish first; tasks still
 * queued are discarded and their arguments freed.
 *
 * The caller must ensure no other thread is inside insertTask() for this pool
 * or uses the pool afterwards.
 *
 * Returns 0 on success, -1 when pool is NULL.
 */
int freeThreadPool(ThreadPool *pool)
{
    if (pool == NULL)
        return -1;

    /* Set the flag and wake everyone under the lock, so no thread can test
     * `stop` and then go to sleep after the broadcast. */
    pthread_mutex_lock(&pool->poolMutex);
    pool->stop = 1;
    pthread_cond_broadcast(&pool->managerCond);
    pthread_cond_broadcast(&pool->emptyCond);
    pthread_cond_broadcast(&pool->fullCond);
    pthread_mutex_unlock(&pool->poolMutex);

    pthread_join(pool->manager, NULL);

    /* Once stop is set, workers no longer clear their slots, so workers[] is stable. */
    for (int i = 0; i < pool->maximum; i++) {
        if (pool->workers[i] != 0)
            pthread_join(pool->workers[i], NULL);
    }

    /* Release the arguments of tasks that never ran. */
    for (int n = 0, i = pool->queueFront; n < pool->queueSize;
         n++, i = (i + 1) % pool->queueCapacity) {
        free(pool->taskQueue[i].arg);
    }

    destroySync(pool);
    releaseMemory(pool);
    return 0;
}
C++版 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #pragma once #include "TaskQueue.h" class ThreadPool { private : TaskQueue* m_TaskQueue; pthread_t managerID; pthread_t * workers; int minimum; int maximum; int busyNum; int liveNum; int exitNum; pthread_mutex_t m_Mutex; pthread_cond_t m_EmptyCond; bool stop; private : static void * manager (void * arg) ; static void * worker (void * arg) ; void ExitThread () ; public : ThreadPool (int min, int max); ~ThreadPool (); int Stop () ; void InsertTask (Task task) ; void InsertTask (void (*func)(void *), void * arg) ; int GetBusyNum () ; int GetLiveNum () ; };
#pragma once
#include <queue>
#include <pthread.h>

/// One unit of work: a function pointer and the argument it is called with.
struct Task {
    void (*func)(void* arg);  ///< work function, nullptr for an empty task
    void* arg;                ///< argument passed to func

    /// Creates an empty task (both members are nullptr).
    Task() : func(nullptr), arg(nullptr) {}

    /// Creates a task that stores `func` and `arg`.
    Task(void (*func)(void* arg), void* arg) : func(func), arg(arg) {}
};

/// FIFO queue of tasks protected by a pthread mutex.
///
/// The queue owns a pthread mutex, so copying is disabled.
class TaskQueue {
private:
    std::queue<Task> m_TaskQueue;  ///< pending tasks, oldest first
    pthread_mutex_t m_Mutex;       ///< guards m_TaskQueue

public:
    TaskQueue();
    ~TaskQueue();

    TaskQueue(const TaskQueue&) = delete;
    TaskQueue& operator=(const TaskQueue&) = delete;

    /// Appends a task to the back of the queue.
    void AddTask(Task task);
    /// Appends the task (func, arg) to the back of the queue.
    void AddTask(void (*func)(void* arg), void* arg);
    /// Removes and returns the oldest task.
    Task GetTask();

    /// Returns the number of queued tasks.
    /// Takes the mutex, because AddTask()/GetTask() may be modifying the
    /// queue from another thread at the same time.
    inline int TaskNumber() {
        pthread_mutex_lock(&m_Mutex);
        const int count = static_cast<int>(m_TaskQueue.size());
        pthread_mutex_unlock(&m_Mutex);
        return count;
    }
};
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include "TaskQueue.h" TaskQueue::TaskQueue () { pthread_mutex_init (&m_Mutex, NULL ); } TaskQueue::~TaskQueue () { pthread_mutex_destroy (&m_Mutex); } void TaskQueue::AddTask (Task task) { pthread_mutex_lock (&m_Mutex); m_TaskQueue.push (task); pthread_mutex_unlock (&m_Mutex); } void TaskQueue::AddTask (void (*func)(void * arg), void * arg) { pthread_mutex_lock (&m_Mutex); m_TaskQueue.push (Task (func, arg)); pthread_mutex_unlock (&m_Mutex); } Task TaskQueue::GetTask () { Task task; pthread_mutex_lock (&m_Mutex); if (m_TaskQueue.empty () == false ) { task = m_TaskQueue.front (); m_TaskQueue.pop (); } pthread_mutex_unlock (&m_Mutex); return task; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 #include <iostream> #include "ThreadPool.h" #include <string.h> ThreadPool::ThreadPool (int min, int max) { do { m_TaskQueue = new TaskQueue; if (m_TaskQueue == nullptr ) break ; workers = new pthread_t [max]; if (workers == nullptr ) { std::cout << "new workers fail.." << std::endl; break ; } memset (workers, 0 , sizeof (pthread_t ) * max); minimum = min; maximum = max; busyNum = 0 ; liveNum = 0 ; exitNum = 0 ; int res = pthread_mutex_init (&m_Mutex, NULL ); res |= pthread_cond_init (&m_EmptyCond, NULL ); if (res != 0 ) { std::cout << "mutex or cond init fail..." << std::endl; break ; } stop = false ; pthread_create (&managerID, NULL , manager, this ); for (int i = 0 ; i < min; i++) { pthread_create (&workers[i], NULL , worker, this ); } } while (0 ); if (workers) delete [] workers; if (m_TaskQueue) delete m_TaskQueue; } ThreadPool::~ThreadPool () { } void * ThreadPool::worker (void * arg) { ThreadPool* pool = static_cast <ThreadPool*>(arg); while (true ) { pthread_mutex_lock (&pool->m_Mutex); while (pool->m_TaskQueue->TaskNumber () == 0 && !pool->stop) { pthread_cond_wait (&pool->m_EmptyCond, &pool->m_Mutex); if (pool->exitNum > 0 ) { pool->exitNum--; if (pool->liveNum) { pool->liveNum--; pthread_mutex_unlock (&pool->m_Mutex); pool->ExitThread (); } } } if (pool->stop) { pthread_mutex_unlock (&pool->m_Mutex); pool->ExitThread (); } pthread_mutex_unlock (&pool->m_Mutex); } }