当前位置: 首页 > news >正文

线程池--简单版本和复杂版本

目录

一、引言

二、线程池头文件介绍

三、简单版本线程池

1.创建线程池

2.添加任务到线程池

3.子线程执行回调函数

4.摧毁线程池

5.简单版线程池流程分析

四、复杂版本线程池

1.结构体介绍

2.主线程

3.子线程

4.管理线程


一、引言

    多线程版服务器一个客户端就需要创建一个线程! 若客户端太多, 显然不太
合适.
    什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销
毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容
忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行
时间可以忽略不计,则没有必要使用线程池了。
    实现的时候类似于生产者和消费

线程池和任务池

线程池任务池
定义线程池是一组可重复使用的线程的集合任务池是一组待执行的任务的集合
任务管理线程池负责管理线程的生命周期,包括线程的创建、调度、执行和销毁等。务池负责管理任务的生命周期,包括任务的创建、调度、执行和销毁等
并发控制线程池可以根据需要动态调整线程的数量,可以根据系统负载、任务数量等进行调度。任务池可以根据需要动态调整任务的执行顺序和并发度,可以根据任务的优先级、依赖关系等进行调度
资源利用线程池可以重复利用线程,避免了频繁创建和销毁线程的开销,提高了系统的性能。任务池可以将多个任务分配给少量的线程执行,可以更有效地利用系统资源
使用场景线程池适用于需要管理大量并发任务的场景,例如并发计算、IO操作等任务池适用于需要管理大量独立任务的场景,例如并发请求处理、消息队列等

主线程负责向任务池添加任务,而子线程负责从任务池中取出任务并执行。任务池的作用是存放待执行的任务,每个任务都是一个结构体,其中包含了一个回调函数。尽管子线程执行的代码可以是相同的,但是它们从任务池中获取的任务元素是不同的,因此执行的回调函数也会有所不同。通过这种方式,可以实现子线程执行不同的任务,提高程序的并发性和执行效率。

线程相关函数

1. pthread_create:创建线程的函数。

2. pthread_detach:用于将线程分离。分离线程在终止时会自动释放资源,无需其他线程与之进行连接。

3. pthread_attr_t:线程属性的数据类型,用于存储线程的属性信息。

4. pthread_attr_init:用于初始化pthread_attr_t对象,将其设置为默认属性。初始化后,可以使用其他pthread_attr_*函数修改属性。

5. pthread_attr_setdetachstate:用于设置线程的分离状态属性。分离状态可以是PTHREAD_CREATE_JOINABLE(可连接状态)或PTHREAD_CREATE_DETACHED(分离状态)。可连接状态的线程可以通过pthread_join与其他线程连接,而分离状态的线程在终止时会自动释放资源。

6. pthread_exit:用于终止调用线程。可以在线程执行的任何位置调用pthread_exit。当线程调用pthread_exit时,它的资源会自动释放,并将控制返回给父线程。

互斥锁相关函数

  1. pthread_mutex_init:初始化互斥锁。使用该函数可以初始化一个互斥锁对象,设置互斥锁的属性。

  2. pthread_mutex_destroy:销毁互斥锁。使用该函数可以销毁一个互斥锁对象,释放相关资源。

  3. pthread_mutex_lock:加锁。使用该函数可以将互斥锁加锁,如果互斥锁已经被其他线程锁定,则当前线程会被阻塞,直到互斥锁被解锁。

  4. pthread_mutex_trylock:尝试加锁。使用该函数可以尝试将互斥锁加锁,如果互斥锁已经被其他线程锁定,则该函数会立即返回一个错误码。

  5. pthread_mutex_unlock:解锁。使用该函数可以将互斥锁解锁,允许其他线程对互斥锁进行加锁操作。

  6. pthread_mutexattr_init:初始化互斥锁属性对象。使用该函数可以初始化一个互斥锁属性对象,设置互斥锁的属性。

  7. pthread_mutexattr_destroy:销毁互斥锁属性对象。使用该函数可以销毁一个互斥锁属性对象,释放相关资源。

  8. pthread_mutexattr_settype:设置互斥锁类型。使用该函数可以设置互斥锁的类型,包括普通锁、递归锁等。

  9. pthread_mutexattr_gettype:获取互斥锁类型。使用该函数可以获取互斥锁的类型。

若任务池已满,主线程应该阻塞等待子线程处理任务,此时主线程需要阻塞等待

若任务池空了,子线程应该阻塞等待,等待主线程往任务池里面添加任务

pthread_cond_wait函数用于等待条件变量的信号。当一个线程调用pthread_cond_wait时,它会释放当前持有的互斥锁,并进入等待状态,直到收到与该条件变量相关的信号(通常是由其他线程调用pthread_cond_signal发送的)。一旦收到信号,该线程会重新获得互斥锁,并继续执行。

pthread_cond_signal函数用于发送条件变量的信号。当一个线程调用pthread_cond_signal时,它会唤醒等待该条件变量的一个线程(如果有多个线程在等待,则唤醒其中一个)。被唤醒的线程会重新获得互斥锁,并继续执行。

二、线程池头文件介绍

#ifndef _THREADPOOL_H
#define _THREADPOOL_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>typedef struct _PoolTask
{int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;typedef struct _ThreadPool
{int max_job_num;//最大任务个数int job_num;//实际任务个数,小于等于max_job_numPoolTask *tasks;//任务队列数组  int job_push;//入队位置,在这个地方添加任务int job_pop;// 出队位置int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件pthread_cond_t not_empty_task;//任务队列不为空的条件}ThreadPool;   //线程池void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数#endif

三、简单版本线程池

1.创建线程池

//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{printf("begin call %s-----\n",__FUNCTION__);thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));thrPool->thr_num = thrnum;thrPool->max_job_num = maxtasknum;thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁thrPool->job_push = 0;//任务队列添加的位置thrPool->job_pop = 0;//任务队列出队的位置thrPool->job_num = 0;//初始化的任务个数为0thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列//初始化锁和条件变量pthread_mutex_init(&thrPool->pool_lock,NULL);pthread_cond_init(&thrPool->empty_task,NULL);pthread_cond_init(&thrPool->not_empty_task,NULL);int i = 0;thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for(i = 0;i < thrnum;i++){pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程}//printf("end call %s-----\n",__FUNCTION__);
}

 

2.添加任务到线程池

void addtask(ThreadPool *pool)
{pthread_mutex_lock(&pool->pool_lock); // 加锁,确保线程池的任务队列安全访问while(pool->max_job_num <= pool->job_num) // 当任务队列已满时,等待任务队列有空闲位置{pthread_cond_wait(&pool->empty_task,&pool->pool_lock); // 等待任务队列有空闲位置}int taskpos = (pool->job_push++)%pool->max_job_num; // 计算任务在任务队列中的位置pool->tasks[taskpos].tasknum = beginnum++; // 设置任务的编号pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos]; // 设置任务的参数pool->tasks[taskpos].task_func = taskRun; // 设置任务的函数指针pool->job_num++; // 增加任务数量pthread_mutex_unlock(&pool->pool_lock); // 解锁pthread_cond_signal(&pool->not_empty_task); // 通知线程池有新的任务可执行
}

3.子线程执行回调函数

void *thrRun(void *arg)
{//printf("begin call %s-----\n",__FUNCTION__);ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&thrPool->pool_lock);//无任务并且线程池不是要摧毁while(thrPool->job_num <= 0 && !thrPool->shutdown ){//如果没有任务,线程会阻塞pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);}if(thrPool->job_num){//有任务需要处理taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));task->arg = task;thrPool->job_num--;//task = &thrPool->tasks[taskpos];pthread_cond_signal(&thrPool->empty_task);//通知生产者}if(thrPool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&thrPool->pool_lock);free(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(&thrPool->pool_lock);task->task_func(task->arg);//执行回调函数}//printf("end call %s-----\n",__FUNCTION__);
}

4.摧毁线程池

//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{pool->shutdown = 1;//开始自爆pthread_cond_broadcast(&pool->not_empty_task);//诱杀 int i = 0;for(i = 0; i < pool->thr_num ; i++){pthread_join(pool->threads[i],NULL);}pthread_cond_destroy(&pool->not_empty_task);pthread_cond_destroy(&pool->empty_task);pthread_mutex_destroy(&pool->pool_lock);free(pool->tasks);free(pool->threads);free(pool);
}

5.简单版线程池流程分析

四、复杂版本线程池

1.结构体介绍

typedef struct

{

    void *(*function)(void *);          /* 函数指针,回调函数 */

    void *arg;                          /* 上面函数的参数 */

} threadpool_task_t;                    /* 各子线程任务结构体 */

/* 描述线程池相关信息 */

struct threadpool_t

{

    pthread_mutex_t lock;               /* 用于锁住本结构体 */   

    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */

    pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */

    pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */

    pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */

    pthread_t adjust_tid;               /* 存管理线程tid */

    threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */

    int min_thr_num;                    /* 线程池最小线程数 */

    int max_thr_num;                    /* 线程池最大线程数 */

    int live_thr_num;                   /* 当前存活线程个数 */

    int busy_thr_num;                   /* 忙状态线程个数 */

    int wait_exit_thr_num;              /* 要销毁的线程个数 */

    int queue_front;                    /* task_queue队头下标 */

    int queue_rear;                     /* task_queue队尾下标 */

    int queue_size;                     /* task_queue队中实际任务数 */

    int queue_max_size;                 /* task_queue队列可容纳任务数上限 */

    int shutdown;                       /* 标志位,线程池使用状态,true或false */

};

 

2.主线程

//threadpool_create(3,100,100);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL;do{if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  printf("malloc threadpool fail");break;                                      /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0;                           /* 有0个产品 */pool->queue_max_size = queue_max_size;pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;                         /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 队列开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail\n");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail\n");break;}//启动工作线程pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), &attr, threadpool_thread, (void *)pool);/*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}//创建管理者线程pthread_create(&(pool->adjust_tid), &attr, adjust_thread, (void *)pool);return pool;} while (0);/* 前面代码调用失败时,释放poll存储空间 */threadpool_free(pool);return NULL;
}
/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}

3.子线程

/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) {  printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//暂停到这/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));//pthread_detach(pthread_self());pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());//pthread_detach(pthread_self());pthread_exit(NULL);     /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/pool->busy_thr_num++;                                                   /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg);                                           /*执行回调函数任务*///task.function(task.arg);                                              /*执行回调函数任务*//*任务结束处理*/printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}

4.管理线程

/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;                      /* 关注 任务数 */int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock));  int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}

相关文章:

  • 【网络协议】聊聊TCP如何做到可靠传输的
  • 【UE】从UI中拖拽生成物体
  • YOLOv5独家首发改进新主干:改进版目标检测新范式骨干PPHGNetv2,百度PaddlePaddle出品,有效提升YOLOv5检测器检测精度
  • echarts的图表立体感——实现立体柱状图和立体饼图的详细教程
  • 利用移动互联、物联网、智能算法、地理信息系统、大数据分析等信息技术开发的智慧工地云平台源码
  • C/C++ 实现UDP发送或接收组播消息,并可指定接收发送网卡
  • ant Java任务的jvmargs属性和<jvmarg>内嵌元素
  • uniapp-自定义表格,右边操作栏固定
  • 探索网络攻防技术:自学之道
  • 大模型时代目标检测任务会走向何方?
  • 力扣每日一题91:解码方法
  • zigbee协议栈组播通讯 From zigbee菜鸟笔记(十 三)
  • Delphi 11.3中从一个日期时间中算出当月(当年、当季)的第一天与最后一天
  • Java17-20新特性
  • 数据库-索引
  • express + mock 让前后台并行开发
  • JavaScript创建对象的四种方式
  • Java应用性能调优
  • JDK9: 集成 Jshell 和 Maven 项目.
  • JS变量作用域
  • JS字符串转数字方法总结
  • Nodejs和JavaWeb协助开发
  • Quartz初级教程
  • Redis提升并发能力 | 从0开始构建SpringCloud微服务(2)
  • 初探 Vue 生命周期和钩子函数
  • 好的网址,关于.net 4.0 ,vs 2010
  • 漫谈开发设计中的一些“原则”及“设计哲学”
  • 写给高年级小学生看的《Bash 指南》
  • 优化 Vue 项目编译文件大小
  • #每日一题合集#牛客JZ23-JZ33
  • #我与Java虚拟机的故事#连载11: JVM学习之路
  • $con= MySQL有关填空题_2015年计算机二级考试《MySQL》提高练习题(10)
  • (2015)JS ES6 必知的十个 特性
  • (带教程)商业版SEO关键词按天计费系统:关键词排名优化、代理服务、手机自适应及搭建教程
  • (附源码)基于SpringBoot和Vue的厨到家服务平台的设计与实现 毕业设计 063133
  • (欧拉)openEuler系统添加网卡文件配置流程、(欧拉)openEuler系统手动配置ipv6地址流程、(欧拉)openEuler系统网络管理说明
  • (四)Tiki-taka算法(TTA)求解无人机三维路径规划研究(MATLAB)
  • (一)Spring Cloud 直击微服务作用、架构应用、hystrix降级
  • (中等) HDU 4370 0 or 1,建模+Dijkstra。
  • **PHP二维数组遍历时同时赋值
  • **python多态
  • .net wcf memory gates checking failed
  • .NET开源项目介绍及资源推荐:数据持久层
  • .NET开源项目介绍及资源推荐:数据持久层 (微软MVP写作)
  • :中兴通讯为何成功
  • ;号自动换行
  • [ JavaScript ] JSON方法
  • [ vulhub漏洞复现篇 ] JBOSS AS 5.x/6.x反序列化远程代码执行漏洞CVE-2017-12149
  • [android] 看博客学习hashCode()和equals()
  • [android] 天气app布局练习
  • [ASP.NET MVC]如何定制Numeric属性/字段验证消息
  • [BZOJ 1032][JSOI2007]祖码Zuma(区间Dp)
  • [BZOJ 3282] Tree 【LCT】
  • [codevs 1296] 营业额统计
  • [codevs1288] 埃及分数