定义一个线程池的结构体,先确定线程池大概的组成部分:任务队列(tasks) 、工作线程(workers) 、管理者(manager) 。然后围绕这三个概念去补充就好了。
任务其实就是在线程中要被调用的函数,所以具体的定义如下:
1 2 3 4 struct task { void * (*func)(void * arg); void * arg; };
而线程池中的 任务队列 顾名思义就是由一系列我们要做的任务组成的队列,所以在线程池中定义任务队列时还需要定义一些队列相关的变量
1 2 3 4 5 struct task * taskQueue ;int queueCapacity; int queueSize; int queueFront; int queueRear;
在线程池中的 工作线程 应该不止一个,所以我们需要一个数组来存放这些工作线程,同时,还需要一些变量来描述这个数组,以便 管理者 线程 更好的管理这个数组。
1 2 3 4 5 6 7 8 pthread_t * workers;int minimum; int maximum; int livenum; int busynum; int exitnum; pthread_t manager;
管理者 在线程池中的定义其实还算简单,它就是一个线程,我们去实现它就是去写好一个函数,线程池的核心就是这个管理者函数的运行。
具体实现 C语言版 1 2 3 4 5 6 7 8 9 #ifndef _THREADPOOL_H #define _THREADPOOL_H typedef struct task Task ;typedef struct threadpool ThreadPool ;#endif
int maximum; int busynum; int livenum; int exitnum; pthread_mutex_t poolMutex; pthread_mutex_t busyMutex; pthread_cond_t emptyCond; pthread_cond_t fullCond; int stop; }; void suicide (ThreadPool* pool) { pthread_t tid = pthread_self(); for (int i = 0 ; i < pool->maximum; i++) { if (tid == pool->workers[i]) { pool->workers[i] = 0 ; printf ("worker: %d exit\n" , tid); break ; } } pthread_exit(NULL ); } void * worker (void * arg) { ThreadPool* pool = (ThreadPool*)arg; while (1 ) { pthread_mutex_lock(&pool->poolMutex); while (pool->queueSize == 0 && !pool->stop) { pthread_cond_wait(&pool->emptyCond, &pool->poolMutex); if (pool->exitnum > 0 ) { pool->exitnum--; pool->livenum--; pthread_mutex_unlock(&pool->poolMutex); suicide(pool); } } if (pool->stop) { pthread_mutex_unlock(&pool->poolMutex); suicide(pool); } Task task; task.func = pool->taskQueue[pool->queueFront].func; task.arg = pool->taskQueue[pool->queueFront].arg; pool->queueFront = (pool->queueFront + 1 ) % pool->queueCapacity; pool->queueSize--; pthread_cond_signal(&pool->fullCond); pthread_mutex_unlock(&pool->poolMutex); pthread_mutex_lock(&pool->busyMutex); pool->busynum++; pthread_mutex_unlock(&pool->busyMutex); task.func(task.arg); free (task.arg); task.arg = NULL ; pthread_mutex_lock(&pool->busyMutex); pool->busynum--; pthread_mutex_unlock(&pool->busyMutex); sleep(3 ); } return NULL ; } void * manager (void * arg) { ThreadPool* pool = (ThreadPool*)arg; while (!pool->stop) { pthread_mutex_lock(&pool->poolMutex); int taskNum = pool->queueSize; int workerNum = pool->livenum; int workingNum = pool->busynum; if (taskNum > workerNum && workerNum < pool->maximum) { int cnt = 0 ; for (int i = 0 ; workerNum < pool->maximum && i < pool->maximum && cnt < ADDWORKER; i++) { if (pool->workers[i] == 0 ) { pthread_create(&pool->workers[i], NULL , worker, pool); cnt++; workerNum++, pool->livenum++; } } } if (workingNum * 2 < workerNum && workerNum > pool->minimum) { pool->exitnum++; pthread_cond_signal(&pool->emptyCond); } pthread_mutex_unlock(&pool->poolMutex); } } ThreadPool* creadThreadPool (int max, int min, int taskCapacity) { ThreadPool* pool = (ThreadPool*)malloc (sizeof (ThreadPool)); if (pool == NULL ) { perror("线程池malloc失败" ); return NULL ; } pool->workers = (pthread_t *)malloc (sizeof (pthread_t ) * max); if (pool->workers == NULL ) { free (pool); perror("工作线程malloc失败" ); return NULL ; } memset (pool->workers, 0 , sizeof (pthread_t ) * max); pool->minimum = min; pool->maximum = max; pool->busynum = 0 ; pool->livenum = min; pool->exitnum = 0 ; int res = 0 ; res |= pthread_mutex_init(&pool->poolMutex, NULL ); res |= pthread_mutex_init(&pool->busyMutex, NULL ); res |= pthread_cond_init(&pool->emptyCond, NULL ); res |= pthread_cond_init(&pool->fullCond, NULL ); if (res != 0 ) { free (pool->workers); free (pool); perrof("互斥条件初始化失败" ); return NULL ; } pool->taskQueue = (Task*)malloc (sizeof (Task) * taskCapacity); pool->queueCapacity = taskCapacity; pool->queueSize = 0 ; pool->queueFront = 0 ; pool->queueRear = 0 ; pthread_create(&pool->manager, NULL , manager, NULL ); for (int i = 0 ; i < min; i++) { pthread_create(&pool->workers[i], NULL , worker, pool); } pool->stop = 0 ; return pool; } void insertTask (ThreadPool* pool, void (*func)(void *), void * arg) { pthread_mutex_lock(&pool->poolMutex); while (pool->queueSize == pool->queueCapacity && !pool->stop) { pthread_cond_wait(&pool->fullCond, &pool->poolMutex); } if (pool->stop) { pthread_mutex_unlock(&pool->poolMutex); return ; } pool->taskQueue[pool->queueRear].func = func; pool->taskQueue[pool->queueRear].arg = arg; pool->queueRear = (pool->queueRear + 1 ) % pool->queueCapacity; pool->queueSize++; pthread_cond_signal(&pool->emptyCond); pthread_mutex_unlock(&pool->poolMutex); } int freeThreadPool (ThreadPool* pool) { if (pool == NULL ) return -1 ; pool->stop = 1 ; pthread_join(pool->manager, NULL ); pthread_cond_broadcast(&pool->emptyCond); for (int i = 0 ; i < pool->maximum; i++) { if (pool->workers[i] != 0 ) pthread_join(pool->workers[i], NULL ); } pthread_cond_broadcast(&pool->fullCond); if (pool->taskQueue) free (pool->taskQueue); if (pool->workers) free (pool->workers); pthread_mutex_destroy(&pool->poolMutex); pthread_mutex_destroy(&pool->busyMutex); pthread_cond_destroy(&pool->emptyCond); pthread_cond_destroy(&pool->fullCond); free (pool); pool = NULL ; return 0 ; }
C++版 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #pragma once #include "TaskQueue.h" class ThreadPool { private : TaskQueue* m_TaskQueue; pthread_t managerID; pthread_t * workers; int minimum; int maximum; int busyNum; int liveNum; int exitNum; pthread_mutex_t m_Mutex; pthread_cond_t m_EmptyCond; bool stop; private : static void * manager (void * arg) ; static void * worker (void * arg) ; void ExitThread () ; public : ThreadPool (int min, int max); ~ThreadPool (); int Stop () ; void InsertTask (Task task) ; void InsertTask (void (*func)(void *), void * arg) ; int GetBusyNum () ; int GetLiveNum () ; };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #pragma once #include <queue> #include <pthread.h> struct Task { void (*func)(void * arg); void * arg; Task () { func = nullptr ; arg = nullptr ; } Task (void (*func)(void * arg), void * arg) { this ->func = func; this ->arg = arg; } }; class TaskQueue { private : std::queue<Task> m_TaskQueue; pthread_mutex_t m_Mutex; public : TaskQueue (); ~TaskQueue (); void AddTask (Task task) ; void AddTask (void (*func)(void * arg), void * arg) ; Task GetTask () ; inline int TaskNumber () { return m_TaskQueue.size (); } };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include "TaskQueue.h" TaskQueue::TaskQueue () { pthread_mutex_init (&m_Mutex, NULL ); } TaskQueue::~TaskQueue () { pthread_mutex_destroy (&m_Mutex); } void TaskQueue::AddTask (Task task) { pthread_mutex_lock (&m_Mutex); m_TaskQueue.push (task); pthread_mutex_unlock (&m_Mutex); } void TaskQueue::AddTask (void (*func)(void * arg), void * arg) { pthread_mutex_lock (&m_Mutex); m_TaskQueue.push (Task (func, arg)); pthread_mutex_unlock (&m_Mutex); } Task TaskQueue::GetTask () { Task task; pthread_mutex_lock (&m_Mutex); if (m_TaskQueue.empty () == false ) { task = m_TaskQueue.front (); m_TaskQueue.pop (); } pthread_mutex_unlock (&m_Mutex); return task; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 #include <iostream> #include "ThreadPool.h" #include <string.h> ThreadPool::ThreadPool (int min, int max) { do { m_TaskQueue = new TaskQueue; if (m_TaskQueue == nullptr ) break ; workers = new pthread_t [max]; if (workers == nullptr ) { std::cout << "new workers fail.." << std::endl; break ; } memset (workers, 0 , sizeof (pthread_t ) * max); minimum = min; maximum = max; busyNum = 0 ; liveNum = 0 ; exitNum = 0 ; int res = pthread_mutex_init (&m_Mutex, NULL ); res |= pthread_cond_init (&m_EmptyCond, NULL ); if (res != 0 ) { std::cout << "mutex or cond init fail..." << std::endl; break ; } stop = false ; pthread_create (&managerID, NULL , manager, this ); for (int i = 0 ; i < min; i++) { pthread_create (&workers[i], NULL , worker, this ); } } while (0 ); if (workers) delete [] workers; if (m_TaskQueue) delete m_TaskQueue; } ThreadPool::~ThreadPool () { } void * ThreadPool::worker (void * arg) { ThreadPool* pool = static_cast <ThreadPool*>(arg); while (true ) { pthread_mutex_lock (&pool->m_Mutex); while (pool->m_TaskQueue->TaskNumber () == 0 && !pool->stop) { pthread_cond_wait (&pool->m_EmptyCond, &pool->m_Mutex); if (pool->exitNum > 0 ) { pool->exitNum--; if (pool->liveNum) { pool->liveNum--; pthread_mutex_unlock (&pool->m_Mutex); pool->ExitThread (); } } } if (pool->stop) { pthread_mutex_unlock (&pool->m_Mutex); pool->ExitThread (); } pthread_mutex_unlock (&pool->m_Mutex); } }