4 线程池
约 2828 字大约 9 分钟
2025-06-15
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
使得线程可以复用,执行完一个任务,并不被销毁,可以继续执行其他的任务
多线程的 " 惊群效应 " ,当很多线程因为条件变量而阻塞时,倘若生产者生产出一个资源可以被消费者消费,这时发出信号解除多线程的阻塞,但其中只有一个线程可以拿到资源,其他线程被唤醒后还要接着阻塞. 其中线程被唤醒而又被阻塞就会带来不必要的cpu性能的消耗,因此大多时需要配合多反应堆模型减少惊群效应的性能消耗
原理
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:
- 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
- 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
- 已处理的任务会被从任务队列中删除
- 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
- 工作的线程(任务队列任务的消费者) ,N个
- 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
- 工作的线程相当于是任务队列的消费者角色,
- 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
- 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
- 管理者线程(不处理任务队列中的任务),1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候, 可以适当的创建一些新的工作线程
- 当任务过少的时候, 可以适当的销毁一些工作的线程
Glib库
data任务,而user_data是在创建线程池时传入的共享数据对于每个任务都是一样的
typedef void(*GFunc)(gpointer data, gpointer user_data);
- 创建线程池
- func:线程执行的函数
- user_data:传递给func的数据,可以为NULL,最终会存储在GThreadPool结构体的user_data属性中
- max_threads:线程池中最大线程数
- exclusive:独占标记位。决定当前的线程池独占所有的线程还是与其它线程池共享这些线程。取值可以是TRUE或FALSE TRUE:立即启动数量为max_threads的线程,且启动的线程只能被当前线程池使用 FALSE:只有在需要时,即需要执行任务时才创建线程,且线程可以被多个非独享资源的线程池共用
- error:用于报告错误信息,可以是NULL,表示忽略错误
- return:无论是否发生错误,都会返回有效的线程池对象
GThreadPool* g_thread_pool_new(GFunc func, gpointer user_data, gint max_threads, gboolean exclusive, GError** error);
- 添加任务 向pool指向的线程池实例添加任务。当存在可用线程时任务立即执行,否则任务数据会一直待在队列中,直至腾出可用线程执行任务
- pool指向线程池实例的指针
- data传递给每个任务的独享数据
- error:错误信息
- return:成功返回ture,失败返回false
gboolean g_thread_pool_push(GThreadPool* pool,gpointer data,GError** error);
- 释放线程池 释放pool指向的线程池分配的所有资源
- pool:线程池指针
- immediate:是否立即释放线程池 TRUE:立即释放所有资源,未处理的任务不被处理 FALSE:在最后一个任务执行完毕之前,线程池不会被释放 在释放线程池时,线程池的任何一个线程都不会被打断。无论这个参数是何取值,都可以保证线程池释放前正在运行的线程可以完成它们的任务。
- wait_:当前函数是否阻塞等待所有任务完成 TRUE:所有需要处理的任务执行完毕当前函数才会返回 FALSE:当前函数立即返回
void g_thread_pool_free(GThreadPool* pool,gboolean immediate,gboolean wait_)
示例:
#include "threadpool.h"
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
typedef struct Task {
void* (*func)(void*);
void* arg;
} Task;
typedef struct TaskQueue {
Task* queue; // 任务队列
int capacity; // 队列容量
int size; // 队列大小
int front; // 队头
int rear; // 队尾
} TaskQueue;
typedef struct ThreadPool {
TaskQueue* taskqueue; // 任务队列
pthread_t manager; // 管理者线程
pthread_t* threads; // 工作线程
int min; // 最小线程数
int max; // 最大线程数
int busy; // 当前工作的线程数
int live; // 当前存活的线程数
int exit; // 要销毁的线程数
pthread_mutex_t mutexPool; // 锁线程池
pthread_mutex_t mutexBusy; // 锁busy
pthread_cond_t notEmpty; // 判定任务队列是否满
pthread_cond_t notFull; // 判定任务队列是否空
bool shutdown; // 线程池是否销毁
} ThreadPool;
/**
* @brief 创建任务队列
* @param queueSize 任务队列的容量
* @return 任务队列
*/
TaskQueue* taskQueueCreate(int queueSize) {
TaskQueue* taskQueue = (TaskQueue*)malloc(sizeof(TaskQueue));
if (taskQueue == NULL) {
return NULL;
}
taskQueue->queue = (Task*)malloc(sizeof(Task) * queueSize);
if (taskQueue->queue == NULL) {
return NULL;
}
taskQueue->capacity = queueSize;
taskQueue->size = 0;
taskQueue->front = 0;
taskQueue->rear = 0;
return taskQueue;
}
/**
* @brief 销毁任务队列
* @param queue 任务队列
* @return 0 成功,-1 失败
*/
int taskQueueDestroy(TaskQueue* queue) {
if (queue == NULL) {
return -1;
}
free(queue->queue);
queue->queue = NULL;
return 0;
}
/**
* @brief 创建线程池
* @param min 最小线程数
* @param max 最大线程数
* @param queueSize 任务队列的容量
* @return 线程池
*/
ThreadPool* threadPoolCreate(int min, int max, int queueSize) {
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do {
if (pool == NULL) {
perror("malloc threadpool fail");
break;
}
pool->taskqueue = taskQueueCreate(queueSize);
if (pool->taskqueue == NULL) {
perror("malloc taskqueue fail");
break;
}
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threads == NULL) {
perror("malloc threads fail");
break;
}
pool->min = min;
pool->max = max;
pool->busy = 0;
pool->live = min;
pool->exit = 0;
memset(pool->threads, 0, sizeof(pthread_t) * max);
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0) {
perror("init mutex or cond fail");
break;
}
pool->shutdown = false;
pthread_create(&pool->manager, NULL, manager, pool);
for (int i = 0; i < min; i++) {
pthread_create(&pool->threads[i], NULL, worker, pool);
}
return pool;
} while (0);
if (pool) {
free(pool);
if (pool->taskqueue)
free(pool->taskqueue);
if (pool->threads)
free(pool->threads);
}
return NULL;
}
/**
* @brief 销毁线程池
* @param pool 要销毁的线程池对象
* @return 0 成功,-1 失败
*/
int threadPoolDestroy(ThreadPool* pool) {
if (pool == NULL) {
return -1;
}
pool->shutdown = true;
// 阻塞回收管理者线程
pthread_join(pool->manager, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < pool->live; ++i) {
pthread_cond_signal(&pool->notEmpty);
}
if (pool->taskqueue) {
taskQueueDestroy(pool->taskqueue);
free(pool->taskqueue);
pool->taskqueue = NULL;
}
if (pool->threads) {
free(pool->threads);
pool->threads = NULL;
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
/**
* @brief 添加任务
* @param pool 线程池对象
* @param func 任务函数
* @param arg 任务函数的参数
* @return 0 成功,-1 失败
* @note 线程池销毁时,会等待任务队列中的任务全部执行完毕再销毁
*/
void threadPoolAdd(ThreadPool* pool, void* (*func)(void*), void* arg) {
pthread_mutex_lock(&pool->mutexPool);
while (pool->shutdown != false && pool->taskqueue->size == pool->taskqueue->capacity) {
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown == false) {
pthread_mutex_unlock(&pool->mutexPool);
return;
} else {
pool->taskqueue->queue[pool->taskqueue->rear] = (Task){func, arg};
// 做成循环队列
pool->taskqueue->rear = (pool->taskqueue->rear + 1) % (pool->taskqueue->capacity);
pool->taskqueue->size++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
}
/**
* @brief 获取当前工作的线程数
* @param pool 线程池对象
* @return 当前工作的线程数
* @note 线程池销毁时,会等待任务队列中的任务全部执行完毕再销毁
*/
int threadPoolBusyNum(ThreadPool* pool) {
pthread_mutex_lock(&pool->mutexBusy);
int busy = pool->busy;
pthread_mutex_unlock(&pool->mutexBusy);
return busy;
}
/**
* @brief 获取当前存活的线程数
* @param pool 线程池对象
* @return 当前存活的线程数
* @note 线程池销毁时,会等待任务队列中的任务全部执行完毕再销毁
*/
int threadPoolAliveNum(ThreadPool* pool) {
pthread_mutex_lock(&pool->mutexPool);
int alive = pool->live;
pthread_mutex_unlock(&pool->mutexPool);
return alive;
}
/**
* @brief 工作线程
* @param arg 线程池对象
* @return NULL
* @note 线程池销毁时,会等待任务队列中的任务全部执行完毕再销毁
*/
void* worker(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (1) {
pthread_mutex_lock(&pool->mutexPool);
while (pool->shutdown != false && pool->taskqueue->size == 0) {
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 判断线程是否过多需要销毁线程
if (pool->exit > 0) {
pool->exit--;
if (pool->exit > pool->min) {
pool->live--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
} else {
Task task = pool->taskqueue->queue[pool->taskqueue->front];
pool->taskqueue->front = (pool->taskqueue->front + 1) % (pool->taskqueue->capacity);
pool->taskqueue->size--;
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busy++;
pthread_mutex_unlock(&pool->mutexBusy);
task.func(task.arg);
// 工作完成后正在工作的线程数-1
printf("thread %ld end working\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busy--;
pthread_mutex_unlock(&pool->mutexBusy);
}
}
}
/**
* @brief 管理线程
* @param arg 线程池对象
* @return NULL
* @note 线程池销毁时,会等待任务队列中的任务全部执行完毕再销毁
*/
void* manager(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown) {
sleep(3);
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->taskqueue->size;
int liveNum = pool->live;
pthread_mutex_unlock(&pool->mutexPool);
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busy;
pthread_mutex_unlock(&pool->mutexBusy);
// 线程数较小需要添加线程时
// 任务个数>存活的线程个数 && 存活的线程数<最大线程数
if (queueSize > liveNum && liveNum < pool->max) {
pthread_mutex_lock(&pool->mutexPool);
int count = 0; // 每次加2个线程
for (int i = 0; i < pool->max && count < 2 && pool->live < pool->max; i++) {
if (pool->threads[i] == 0) {
pthread_create(&pool->threads[i], NULL, worker, (void*)pool);
count++;
pool->live++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 存活的线程太多需要销毁线程
// 忙的线程数*2<存活的线程个数 && 存活的线程数>最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->min) {
pthread_mutex_lock(&pool->mutexPool);
pool->exit = 2; // 每次销毁两个
pthread_mutex_unlock(&pool->mutexPool);
for (int i = 0; i < 2; ++i) {
// 把阻塞的线程唤醒,唤醒后它们判断是否需要销毁线程首先进入的被销毁,共销毁两个
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
/**
* @brief 销毁线程池
* @param pool 线程池对象
* @return 0 成功,-1 失败
* @note 线程池销毁时,会等待任务队列中的任务全部执行完毕再销毁
*/
void threadExit(ThreadPool* pool) {
pthread_t tid = pthread_self();
for (int i = 0; i < pool->max; i++) {
if (pool->threads[i] == tid) {
pool->threads[i] = 0;
printf("threadExit():tid=%ld\n", tid);
break;
}
}
pthread_exit(NULL);
}
贡献者
版权所有
版权归属:PinkDopeyBug