当前位置: 首页 > news >正文

线程池666666

1. 作用

线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。

2. 实现原理

线程池内部由任务队列、工作线程和管理者线程组成。

任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。

工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。

管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。

注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。

3. 手撕线程池

参考来源:爱编程的大丙。

【1】threadpool.c:

#include "threadpool.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>#define NUMBER	2	//管理者线程增加或减少的工作线程数量//任务结构体
typedef struct Task {void (*func)(void* arg);void* arg;
} Task;//线程池结构体
struct ThreadPool {//任务队列,视为环形队列Task* taskQ;int queueCapacity;	//队列容量int queueSize;		//当前任务个数int queueFront;		//队头 -> 取任务int queueRear;		//队尾 -> 加任务//线程相关pthread_t managerID;	//管理者线程IDpthread_t* threadIDs;	//工作线程IDint minNum;				//工作线程最小数量int maxNum;				//工作线程最大数量int busyNum;			//工作线程忙的数量int liveNum;			//工作线程存活数量int exitNum;			//要销毁的工作线程数量pthread_mutex_t mutexPool;	//锁整个线程池pthread_mutex_t mutexBusy;	//锁busyNumpthread_cond_t notFull;		//任务队列是否满pthread_cond_t notEmpty;	//任务队列是否空//线程池是否销毁int shutdown;		//释放为1,否则为0
};/**************************************************************** 函  数: threadPoolCreate* 功  能: 创建线程池并初始化* 参  数: min---工作线程的最小数量*         max---工作线程的最大数量*		   capacity---任务队列的最大容量* 返回值: 创建的线程池的地址**************************************************************/
ThreadPool* threadPoolCreate(int min, int max, int capacity)
{//申请线程池空间ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {//此处循环只是为了便于失败释放空间,只会执行一次if (pool == NULL) {printf("pool create error!\n");break;}//申请任务队列空间,并初始化pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);if (pool->taskQ == NULL) {printf("Task create error!\n");break;}pool->queueCapacity = capacity;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;//初始化互斥锁和条件变量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0){printf("mutex or cond create error!\n");break;}//初始化shutdownpool->shutdown = 0;//初始化线程相关参数pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threadIDs == NULL) {printf("threadIDs create error!\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;pool->exitNum = 0;//创建管理者线程和工作线程pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程for (int i = 0; i < min; ++i) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程}return pool;} while (0);//申请资源失败,释放已分配的资源if (pool && pool->taskQ) free(pool->taskQ);if (pool && pool->threadIDs) free(pool->threadIDs);if (pool) free(pool);return NULL;
}/**************************************************************** 函  数: threadPoolDestroy* 功  能: 销毁线程池* 参  数: pool---要销毁的线程池* 返回值: 0表示销毁成功,-1表示销毁失败**************************************************************/
int threadPoolDestroy(ThreadPool* pool)
{if (!pool) return -1;//关闭线程池pool->shutdown = 1;//阻塞回收管理者线程pthread_join(pool->managerID, NULL);//唤醒所有工作线程,让其自杀for (int i = 0; i < pool->liveNum; ++i) {pthread_cond_signal(&pool->notEmpty);}//释放所有互斥锁和条件变量pthread_mutex_destroy(&pool->mutexBusy);pthread_mutex_destroy(&pool->mutexPool);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);//释放堆空间if (pool->taskQ) {free(pool->taskQ);pool->taskQ = NULL;}if (pool->threadIDs) {free(pool->threadIDs);pool->threadIDs = NULL;}free(pool);pool = NULL;return 0;
}/**************************************************************** 函  数: threadPoolAdd* 功  能: 生产者往线程池的任务队列中添加任务* 参  数: pool---线程池*		   func---函数指针,要执行的任务地址*		   arg---func指向的函数的实参* 返回值: 无**************************************************************/
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);//任务队列满,阻塞生产者while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {pthread_cond_wait(&pool->notFull, &pool->mutexPool);}//判断线程池是否关闭if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);return;}//添加任务进pool->taskQpool->taskQ[pool->queueRear].func = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueSize++;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pthread_cond_signal(&pool->notEmpty);//唤醒工作线程pthread_mutex_unlock(&pool->mutexPool);
}/**************************************************************** 函  数: getThreadPoolBusyNum* 功  能: 获取线程池忙的工作线程数量* 参  数: pool---线程池* 返回值: 忙的工作线程数量**************************************************************/
int getThreadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}/**************************************************************** 函  数: getThreadPoolAliveNum* 功  能: 获取线程池存活的工作线程数量* 参  数: pool---线程池* 返回值: 存活的工作线程数量**************************************************************/
int getThreadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return liveNum;
}/**************************************************************** 函  数: worker* 功  能: 工作线程的执行函数* 参  数: arg---实参传入,这里传入的是线程池* 返回值: 空指针**************************************************************/
void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1) {/* 1.取出任务队列中的队头任务 */pthread_mutex_lock(&pool->mutexPool);//无任务就阻塞线程while (pool->queueSize == 0 && !pool->shutdown) {pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);//唤醒后,判断是不是要销毁线程if (pool->exitNum > 0) {//线程自杀pool->exitNum--;//销毁指标-1if (pool->liveNum > pool->minNum) {pool->liveNum--;//活着的工作线程-1pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}//线程池关闭了就退出线程if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}//取出pool中taskQ的任务Task task;task.func = pool->taskQ[pool->queueFront].func;task.arg = pool->taskQ[pool->queueFront].arg;pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头pool->queueSize--;//通知生产者添加任务pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);/* 2.设置pool的busyNum+1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);/* 3.执行取出的任务 */printf("thread %ld start working ...\n", pthread_self());task.func(task.arg);free(task.arg);task.arg = NULL;printf("thread %ld end working ...\n", pthread_self());/* 4.设置pool的busyNum-1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}/**************************************************************** 函  数: manager* 功  能: 管理者线程的执行函数* 参  数: arg---实参传入,这里传入的是线程池* 返回值: 空指针**************************************************************/
void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (!pool->shutdown) {/* 每隔3秒检测一次 */sleep(3);/* 获取pool中相关变量 */pthread_mutex_lock(&pool->mutexPool);int taskNum = pool->queueSize;	//任务队列中的任务数量int liveNum = pool->liveNum;	//存活的工作线程数量int busyNum = pool->busyNum;	//忙碌的工作线程数量pthread_mutex_unlock(&pool->mutexPool);/* 功能一:增加工作线程,每次增加NUMBER个 *///当任务个数大于存活工作线程数,且存活工作线程数小于最大值if (taskNum > liveNum && liveNum < pool->maxNum) {pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER&& pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}/* 功能二:销毁工作线程,每次销毁NUMBER个 *///当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数if (busyNum * 2 < liveNum && liveNum > pool->minNum) {pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;//唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀for (int i = 0; i < NUMBER; ++i) {pthread_cond_signal(&pool->notEmpty);}pthread_mutex_unlock(&pool->mutexPool);}}return NULL;
}/**************************************************************** 函  数: threadExit* 功  能: 工作线程退出函数,将工作线程的ID置为0,然后退出* 参  数: pool---线程池* 返回值: 无**************************************************************/
void threadExit(ThreadPool* pool)
{//将pool->threadIDs中的ID改为0pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; i++) {if (pool->threadIDs[i] == tid) {pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);//退出
}

【2】threadpool.h:

#ifndef _THREADPOOL_H
#define _THREADPOOL_Htypedef struct ThreadPool ThreadPool;//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int capacity);//销毁线程池
int threadPoolDestroy(ThreadPool* pool);//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);//获取当前忙碌的工作线程的数量
int getThreadPoolBusyNum(ThreadPool* pool);//获取当前存活的工作线程的数量
int getThreadPoolAliveNum(ThreadPool* pool);/*********************其它函数**********************/
void* worker(void* arg);//工作线程的执行函数
void* manager(void* arg);//管理者线程的执行函数
void threadExit(ThreadPool* pool);//线程退出函数#endif

【3】main.c:

#include <stdio.h>
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>//任务函数,所有线程都执行此任务
void testFunc(void* arg)
{int* num = (int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), *num);sleep(1);
}int main()
{//创建线程池: 最少3个工作线程,最多10个,任务队列容量为100ThreadPool* pool = threadPoolCreate(3, 10, 100);//加入100个任务于任务队列for (int i = 0; i < 100; ++i) {int* num = (int*)malloc(sizeof(int));*num = i + 100;threadPoolAdd(pool, testFunc, num);}//销毁线程池sleep(30);//保证任务全部运行完毕threadPoolDestroy(pool);return 0;
}

【4】运行结果:

......

相关文章:

  • Linux 生产消费者模型
  • 力扣61. 旋转链表(java)
  • panda处理数据
  • 苹果电脑虚拟机运行Windows Mac环境安装Win PD19虚拟机 parallels desktop19虚拟机安装教程免费密钥激活
  • Linux rpm与yum
  • UE5基本操作(二)
  • CAN学习笔记
  • flink的窗口
  • 实现Linux C++进程意外退出时信号处理与堆栈打印
  • 视频监控汇聚和融合平台的特点、功能、接入方式、应用场景
  • 1.4 ROS2集成开发环境搭建
  • 容器部署rabbitmq集群迁移
  • 李白的酒量之谜
  • 【Spring Boot】JPA 的查询方式
  • 【Linux】多线程(一万六千字)
  • 【Leetcode】101. 对称二叉树
  • 【399天】跃迁之路——程序员高效学习方法论探索系列(实验阶段156-2018.03.11)...
  • 08.Android之View事件问题
  • Fastjson的基本使用方法大全
  • iOS仿今日头条、壁纸应用、筛选分类、三方微博、颜色填充等源码
  • Java精华积累:初学者都应该搞懂的问题
  • learning koa2.x
  • Quartz初级教程
  • react-core-image-upload 一款轻量级图片上传裁剪插件
  • Work@Alibaba 阿里巴巴的企业应用构建之路
  • 彻底搞懂浏览器Event-loop
  • 搞机器学习要哪些技能
  • 回顾2016
  • 聚类分析——Kmeans
  • 深入体验bash on windows,在windows上搭建原生的linux开发环境,酷!
  • 一个6年java程序员的工作感悟,写给还在迷茫的你
  • 一个JAVA程序员成长之路分享
  • linux 淘宝开源监控工具tsar
  • PostgreSQL之连接数修改
  • 昨天1024程序员节,我故意写了个死循环~
  • ​软考-高级-系统架构设计师教程(清华第2版)【第9章 软件可靠性基础知识(P320~344)-思维导图】​
  • #define与typedef区别
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • $().each和$.each的区别
  • (01)ORB-SLAM2源码无死角解析-(56) 闭环线程→计算Sim3:理论推导(1)求解s,t
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (Redis使用系列) Springboot 使用redis的List数据结构实现简单的排队功能场景 九
  • (创新)基于VMD-CNN-BiLSTM的电力负荷预测—代码+数据
  • (附源码)小程序 交通违法举报系统 毕业设计 242045
  • (七)glDrawArry绘制
  • (一)pytest自动化测试框架之生成测试报告(mac系统)
  • (转)大型网站架构演变和知识体系
  • (转载)从 Java 代码到 Java 堆
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • . NET自动找可写目录
  • ..thread“main“ com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.1
  • .CSS-hover 的解释
  • .java 指数平滑_转载:二次指数平滑法求预测值的Java代码
  • .NET CORE 第一节 创建基本的 asp.net core
  • .net 流——流的类型体系简单介绍