浅谈【多线程与并发】之线程池
目录
1、多线程/并发处理形式
1.1线程池的工作原理
2、项目-线程池
2.1数据说明
2.2PTP线程池的API
2.2.1 初始化线程池
2.2.2为线程池增加任务
2.2.3销毁线程池
谢谢帅气美丽且优秀的你看完我的文章还要点赞、收藏加关注
没错,说的就是你,不用再怀疑!!!
希望我的文章内容能对你有帮助,一起努力吧!!!
该篇文章主要内容为设计实现一个线程池库,适用于线程并发执行任务
1、多线程/并发处理形式
线程池:是一种多线程/并发处理形式,他运行将任务添加队列里面,并在创建线程之后自动启动这些任 务。
线程池的作用:
- 复用线程:减少线程的创建和销毁的开销。
- 通过预先创建一定数量的线程并且存储在线程池中,可以避免频繁的创建和销毁操作。
- 有效的控制线程的数量:方便的进行线程管理,可以控制同时运行的线程数量,防止线程过多导致 资源耗尽的问题。
- 提高系统响应速度:当有新任务的时候,不需要创建线程,立马执行。
1.1线程池的工作原理
- 在应用程序启动时候,创建一定数量的线程存储在线程池中。
- 这些线程是待命状态(休眠状态)
- 当需要执行任务的时候,从线程池中抓取一个空闲的线程,将任务分配给该线程去执行
- 当有任务存在,就随机唤醒一个线程去执行
- 任务执行完毕之后,线程返回到线程池中,继续待命(休眠)。
- 任务结束,就休眠
2、项目-线程池
需求:设计一个线程池库,适用于线程并发执行任务,可以动态规划线程池内线程数目。基于 POSIX Thread 进行开发
项目名: POSIX-Thread-Pool简称PTP
2.1数据说明
任务类型: task_t
- task_point :任务函数指针,即用户自定义的任务函数(线程需要去执行的任务函数指针)
- args :线程在执行 task_point 指向的任务的时候,需要使用到的参数
- next :指向下一个任务结点
线程池类型: ptp_t
- thread_count :线程池中最大线程数量,即能够支持的最高并发数目
- thread_status :线程池当前状态,启动或停止状态
- thread_id :线程池中服役线程集合
- thread_mutex :线程池中线程的共享互斥锁
- thread_cond :线程池中线程共享的条件变量
- thread_tasks :线程池中线程所需要执行的任务链表
- loop_task :线程池中轮询线程池任务链表的线程
2.2PTP线程池的API
2.2.1 初始化线程池
2.2.2为线程池增加任务
2.2.3销毁线程池
main.cpp
#include <iostream>
#include "POSIX_Thread_Pool.h"/*** @brief 任务函数* @param 参数* */void task(void *args)
{std::cout << "嘿嘿<" << pthread_self() << ">我开始执行任务了!!!" << std::endl;// 模拟执行任务的过程sleep(5);std::cout << "嘿嘿<" << pthread_self() << ">我执行完毕!!!" << std::endl;
}int main()
{// 创建并启动一个线程池ptp_t * thread_pool = ptp_init(10);// 为其添加任务for(int i = 0;i < 20;i++){ptp_add_task(thread_pool,task,NULL);}while(ptp_task_is_null(thread_pool));ptp_destroy(thread_pool);return 0;
}
POSIX_Thread_Pool.h
#ifndef __POSIX_THREAD_POOL_H__
#define __POSIX_THREAD_POOL_H__#include <pthread.h>
#include <unistd.h>typedef void (*task_point_t)(void *);/*- `task_point`:任务函数指针,即用户自定义的任务函数(线程需要去执行的任务函数指针)- `args`:线程在执行`task_point`指向的任务的时候,需要使用到的参数- `next`:指向下一个任务结点
*/
// 任务结构类型
typedef struct tasks
{// 任务指针task_point_t task_point;/*// 任务函数需要符合这个规则void task(void *data){// 需要执行的任务}*/// 任务执行所需要参数void *args;// 下一个任务struct tasks *next;
}task_t;/*- `thread_count`:线程池中最大线程数量,即能够支持的最高并发数目- `thread_status`:线程池当前状态,启动或停止状态- `thread_id`:线程池中服役线程集合- `thread_mutex`:线程池中线程的共享互斥锁- `thread_cond`:线程池中线程共享的条件变量- `thread_tasks`:线程池中线程所需要执行的任务链表
*/
typedef struct posix_thread_pool
{// 线程的个数int thread_count; // 线程池状态bool thread_status;// 线程集合pthread_t *thread_id;// 线程池中线程共享的互斥锁pthread_mutex_t thread_mutex;// 线程池中线程共享的条件变量,即通知pthread_cond_t thread_cond;// 线程任务链表task_t *thread_tasks;// 轮询任务的线程pthread_t loop_task;/*最大线程的数目:表示可以支持线程并发的最大线程数当前服役的线程数目:表示当前能够并发的线程数量当前休眠的线程数目:表示当前正在待命且可以执行任务的线程数量。...*/
}ptp_t;/*** @brief 创建并初始化一个POSIX线程池* @param count 设置线程池的最大并发线程数量* @return 成功返回创建并初始化完毕的线程池指针,失败返回NULL
*/
ptp_t *ptp_init(int count);/*** @brief 往指定的线程池中增加任务* @param thread_pool 需要增加任务的线程池* @param task 增加的任务指针* @param args 任务执行所需要的参数* @return NULL*/
void ptp_add_task(ptp_t *thread_pool,task_point_t task,void *args);/*** @brief 销毁一个已经存在的线程池* @param thread_pool 需要销毁的线程池指针引用*/
void ptp_destroy(ptp_t *&thread_pool);/*** @brief 判断任务是否为空* @param thread_pool 需要判断任务的线程池指针* @return 成功返回true,失败返回false*/
bool ptp_task_is_null(ptp_t *thread_pool);#endif //__POSIX_THREAD_POOL_H__
POSIX_Thread_Pool.cpp
#include "POSIX_Thread_Pool.h"/*** @brief 任务线程函数* @param args 表示线程执行任务所需要参数* @return NULL*/
void *ptp_start_routine(void *args);/*** @brief 轮询任务链表,如果任务链表不为空,就持续线程* @param thread_pool 需要轮询任务链表的线程池 * */
void *loop_task(void * args)
{// 先获取线程池指针ptp_t *thread_pool = (ptp_t*)args;while(thread_pool->thread_status){pthread_mutex_lock(&thread_pool->thread_mutex);if(thread_pool->thread_tasks)pthread_cond_signal(&thread_pool->thread_cond);pthread_mutex_unlock(&thread_pool->thread_mutex);usleep(1000);}return NULL;
}/*** @brief 创建并初始化一个POSIX线程池* @param count 设置线程池的最大并发线程数量* @return 成功返回创建并初始化完毕的线程池指针,失败返回NULL
*/
ptp_t *ptp_init(int count)
{// 申请一个线程池空间ptp_t *thread_pool = new ptp_t;/*初始化成员变量1、线程池的线程个数2、通过线程池中线程个数,为线程集合申请线程ID集合的空间3、设置线程池状态4、任务列表初始化5、初始化线程互斥锁6、初始化条件变量7、创建线程池中的线程8、创建任务轮询线程*/thread_pool->thread_count = count;thread_pool->thread_id = new pthread_t[count];thread_pool->thread_status = true;thread_pool->thread_tasks = NULL;pthread_mutex_init(&thread_pool->thread_mutex,NULL);pthread_cond_init(&thread_pool->thread_cond,NULL);/*创建线程*/for(int i = 0;i < count;i++){// 判断线程是否创建失败if(pthread_create((thread_pool->thread_id+i),NULL,ptp_start_routine,thread_pool) != 0){i--;continue; // 重新创建线程}}while(pthread_create(&thread_pool->loop_task,NULL,loop_task,thread_pool)!=0);// 返回创建好的线程池指针return thread_pool;
}/*** @brief 任务线程函数* @param args 表示线程执行任务所需要参数* @return NULL*/
void *ptp_start_routine(void *args)
{// 先获取线程池指针ptp_t *thread_pool = (ptp_t*)args;/*循环:循环执行任务和等待阻塞:休眠等待条件变化唤醒线程去执行任务,执行完之后重新休眠*/do{/*上锁:创建临界区*/pthread_mutex_lock(&thread_pool->thread_mutex);/*进入休眠:等待被条件唤醒*/pthread_cond_wait(&thread_pool->thread_cond,&thread_pool->thread_mutex);/*被唤醒之后要做的事情1、从线程池的任务链表摘取一个任务2、去执行这个任务3、执行完毕之后重新休眠*/task_t *task = NULL;// 没有任务的情况if(thread_pool->thread_tasks == NULL){pthread_mutex_unlock(&thread_pool->thread_mutex);goto cont; // 继续休眠}// 摘取任务task = thread_pool->thread_tasks;thread_pool->thread_tasks = thread_pool->thread_tasks->next;pthread_mutex_unlock(&thread_pool->thread_mutex);/*执行任务*/(task->task_point)(task->args); // 等价 task_point(task->args);/*执行完毕之后:销毁任务,释放空间*/task->next = NULL;delete task;
cont:task = NULL;} while (thread_pool->thread_status);return NULL;
}/*** @brief 往指定的线程池中增加任务* @param thread_pool 需要增加任务的线程池* @param task 增加的任务指针* @param args 任务执行所需要的参数* @return NULL*/
void ptp_add_task(ptp_t *thread_pool,task_point_t task,void *args)
{/*任务是存储在一个任务结点中的1、申请一个任务结点空间*/task_t *task_node = new task_t;/*2、初始化任务结点*/task_node->args = args;task_node->task_point = task;task_node->next = NULL;/*添加到线程池中线程池中的任务链表是线程共享的(共享资源)需要用到互斥锁*/pthread_mutex_lock(&thread_pool->thread_mutex);// 判断任务链表中是否为空if(thread_pool->thread_tasks == NULL)thread_pool->thread_tasks = task_node;else{/* 找任务链表的尾结点:进行尾插 */task_t *task_temp = thread_pool->thread_tasks;while(task_temp->next)task_temp = task_temp->next;task_temp->next = task_node;}/* 添加完成任务之后:任务区有任务了,就需要唤醒线程去执行任务*/pthread_cond_signal(&thread_pool->thread_cond);pthread_mutex_unlock(&thread_pool->thread_mutex);
}/*** @brief 销毁一个已经存在的线程池* @param thread_pool 需要销毁的线程池指针引用*/
void ptp_destroy(ptp_t *&thread_pool)
{/*关闭线程池*/pthread_mutex_lock(&thread_pool->thread_mutex);thread_pool->thread_status = false;pthread_mutex_unlock(&thread_pool->thread_mutex);/*任务轮询线程回收*/pthread_join(thread_pool->loop_task,NULL);/*唤醒所有的线程*/pthread_cond_broadcast(&thread_pool->thread_cond);/*等待线程结束*/for(int i = 0;i < thread_pool->thread_count;i++)pthread_join(thread_pool->thread_id[i],NULL);/*销毁任务链表*/task_t *task = thread_pool->thread_tasks;while(task){thread_pool->thread_tasks = thread_pool->thread_tasks->next;task->next = NULL;delete task;task = thread_pool->thread_tasks;}thread_pool->thread_tasks = NULL;/*销毁线程ID数组空间*/delete []thread_pool->thread_id;thread_pool->thread_id = NULL;/*销毁条件变量和互斥锁*/pthread_mutex_destroy(&thread_pool->thread_mutex);pthread_cond_destroy(&thread_pool->thread_cond);/*将线程池释放,并置空*/delete thread_pool;thread_pool = NULL;
}/*** @brief 判断任务是否为空* @param thread_pool 需要判断任务的线程池指针* @return 成功返回true,失败返回false*/
bool ptp_task_is_null(ptp_t *thread_pool)
{if(thread_pool == NULL)return false;else if(thread_pool->thread_tasks == NULL)return false;elsereturn true;
}