当前位置: 首页 > news >正文

『 Linux 』线程池与 POSIX 线程的封装编码实现

文章目录

    • 线程池概念
      • 线程池的编码实现
      • 线程池的测试
      • 参考代码
    • 线程的封装
      • 使用测试封装后的线程
      • 参考代码


线程池概念

请添加图片描述

池化技术是一种资源管理方法,通过预先创建和管理一组资源以便在需要使用时快速分配这些资源;

线程池是池化技术的一种典型应用;

  • 资源分配

    在线程池中预先创建一定数量的线程,并使其运行;

  • 资源复用

    在线程池中完成的线程不会被销毁,而是返回池中等待下一个任务;

  • 资源管理

    在线程池中统一管理线程的创建分配和回收,以集中管理资源的生命周期,包括创建,分配,回收和销毁;

  • 性能优化

    线程池能够避免频繁创建和销毁线程的开销,从而提高整体效率;

  • 资源限制

    在线程池中一般需要限制最大线程数以防止系统资源耗尽;

  • 可配置性

    在线程池中可配置核心线程数,最大线程数,空闲线程存活时间等;

线程池本质上是一个生产者消费者模型的典型应用;

其中用户即为生产者,线程池中的线程即为消费者,通过调用线程池的代码并向线程传递任务信息使得线程池中的线程能够获取对应的任务并进行消费处理;


线程池的编码实现

请添加图片描述

#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP
#include <pthread.h>
#include <unistd.h>#include <iostream>
#include <queue>
#include <string>
#include <vector>// 定义线程信息结构体,包含线程ID和名称
struct ThreadInfo {pthread_t tid;        // 线程IDstd::string name;     // 线程名称
};// 线程池类模板,T 为任务类型
template <class T>
class ThreadPool {const static int defaultnum = 7;  // 默认线程数量public:// 线程同步相关方法void Lock() { pthread_mutex_lock(&mutex_); }    // 加锁void Unlock() { pthread_mutex_unlock(&mutex_); } // 解锁void Wake() { pthread_cond_signal(&cond_); }    // 唤醒等待的线程void Wait() { pthread_cond_wait(&cond_, &mutex_); } // 等待条件变量bool Isempty() { return tasks_.empty(); }       // 检查任务队列是否为空// 根据线程ID获取线程名称std::string Getname(pthread_t tid) {for (const auto &ti : threads_) {if (ti.tid == tid) {return ti.name;}}return "null";}public:// 构造函数,初始化线程池ThreadPool(int volume = defaultnum) : threads_(volume) {pthread_mutex_init(&mutex_, nullptr);  // 初始化互斥锁pthread_cond_init(&cond_, nullptr);    // 初始化条件变量}// 启动线程池void Start() {int num = threads_.size();for (int i = 0; i < num; ++i) {threads_[i].name = "Thread-" + std::to_string(i);  // 设置线程名称// 创建线程,并传入处理任务的静态函数pthread_create(&(threads_[i].tid), nullptr, HanderTask, this);}}// 向任务队列添加任务void Push(const T &t) {Lock();        // 加锁保护共享资源tasks_.push(t); // 添加任务到队列Wake();        // 唤醒等待的线程Unlock();      // 解锁}// 从任务队列取出任务T Pop() {T t = tasks_.front(); // 获取队首任务tasks_.pop();         // 移除队首任务return t;}// 静态任务处理函数static void *HanderTask(void *args) {// 将参数转换为线程池指针ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);std::string name = tp->Getname(pthread_self());  // 获取当前线程名称while (1) {tp->Lock();while (tp->Isempty()) {tp->Wait();  // 如果任务队列为空,则等待}T t = tp->Pop();  // 取出任务tp->Unlock();// 执行任务t();// 打印任务执行结果(这里假设任务类型 T 有特定的方法)printf("The %s handler a task ,the result is %2d %c %2d = %3d ,the exit code "":%d\n",name.c_str(), t.getnum1(), t.getoper(), t.getnum2(), t.getresult(),t.getexitcode());}return nullptr;}// 析构函数,清理资源~ThreadPool() {pthread_mutex_destroy(&mutex_);  // 销毁互斥锁pthread_cond_destroy(&cond_);    // 销毁条件变量}private:std::vector<ThreadInfo> threads_;  // 存储线程信息的向量std::queue<T> tasks_;              // 任务队列pthread_mutex_t mutex_;            // 互斥锁,用于线程同步pthread_cond_t cond_;              // 条件变量,用于线程同步
};#endif

以该段代码为例设计了一个建议的线程池,主要思路为使用POSIX线程库通过预先创建一组线程和使用任务队列管理并发任务;

封装了一个ThreadInfo结构体用于存储每个线程的基本信息包括线程的tid与线程名;

// 定义线程信息结构体,包含线程ID和名称
struct ThreadInfo {pthread_t tid;        // 线程IDstd::string name;     // 线程名称
};
  • ThreadPool类模板

    使用了模板参数T定义任务的类型,使得线程可以处理不同类型的任务;

    并将线程池中常用的操作pthread_mutex_lock(),pthread_mutex_unlock(),pthread_cond_wait()pthread_cond_signal()分装为了Lock(),Unlock(),Wait()Wake()以方便后续在线程池中的编码;

    其中该线程池中设计了几个私有成员:

    • std::vector<ThreadInfo> threads_

      用于存储线程基本信息的容器;

    • std::queue<T> tasks_

      用于存储任务的任务队列;

    • pthread_mutex_t mutex_

      声明了一个互斥锁,用于处理线程池中线程间的互斥;

    • pthread_cond_t cond_

      声明了一个条件变量,用于处理线程池中线程间的同步关系;

    • const static int defaultnum = 7

      设置了一个静态成员变量并初始化作为线程池默认情况下的线程数量;

    同时定义了两个类内函数Isempty()Getname()分别用来检查任务队列状态与线程名称的获取;

  • 构造函数

    在构造函数中初始化了线程向量和同步语句(互斥锁和条件变量);

  • Start()函数

    该函数用于创建指定数量的线程,每个线程执行对应的HanderTask()函数;

    其中因为HanderTask()函数被static声明为静态,无法直接调用类内成员函数,故需要在使用pthread_create()函数时传参this指针;

  • Push()函数与Pop()函数

    分别用来向任务队列添加任务和从队列中取出任务,其中Push()的实现依靠互斥锁与条件变量使线程在进行该动作时保证其同步与互斥关系;

  • static void* HanderTask(void* args)

    该函数为线程主要的执行函数,为线程的执行入口;

    使用静态static声明函数的主要原因是该函数用于设计为必须为void* (*) (void*)的函数;

    而类内成员函数的第一个参数必定存在一个隐含的this指针,从而导致参数数量不匹配,故需要使用static将其修饰为静态成员函数;

    对应的静态成员函数的特点为如下:

    • 不依赖于类的实例
    • 不能直接访问非静态成员变量或调用非静态成员函数
    • 不隐含this指针
  • 析构函数

    析构函数用于负责清理同步语句;

在该代码仅供参考,在HanderTask()函数中所执行的任务必须是定义的,即该处的T应有特定的方法,线程将执行该T类型特定的方法并打印对应信息;


线程池的测试

请添加图片描述

假设线程池的任务为一个Task类所封装的任务:

/* Task.hpp */#ifndef TASK_HPP
#define TASK_HPP
#include <iostream>// 定义错误代码枚举
enum { DIV_ERR = 1, MOD_ERR, NONE };class Task {public:Task() {}  // 默认构造// 便于环形生产者消费者模型能够进行默认构造初始化并进行默认拷贝构造// 构造函数:初始化所有成员变量Task(int num1, int num2, char oper): num1_(num1), num2_(num2), exit_code_(0), result_(0), oper_(oper) {}// 析构函数(当前为空)~Task() {}// 执行任务的主要函数void run() {switch (oper_) {case '+':result_ = num1_ + num2_;break;case '-':result_ = num1_ - num2_;break;case '*':result_ = num1_ * num2_;break;case '/': {if (num2_ == 0) {exit_code_ = DIV_ERR;  // 设置除零错误result_ = -1;          // 除零时结果设为-1} elseresult_ = num1_ / num2_;break;}case '%': {if (num2_ == 0) {exit_code_ = MOD_ERR;  // 设置模零错误result_ = -1;          // 模零时结果设为-1} elseresult_ = num1_ % num2_;break;}default:exit_code_ = NONE;  // 未知操作符break;}}// 重载()运算符,使对象可以像函数一样被调用void operator()() { run(); }// 获取计算结果int getresult() { return result_; }// 获取退出代码int getexitcode() { return exit_code_; }// 获取第一个操作数int getnum1() { return num1_; }// 获取第二个操作数int getnum2() { return num2_; }// 获取操作符char getoper() { return oper_; }private:int num1_;       // 第一个操作数int num2_;       // 第二个操作数int exit_code_;  // 退出代码,用于表示操作是否成功int result_;     // 计算结果char oper_;      // 操作符
};#endif

该段代码封装了一个简单的加减乘除取模任务用于测试线程池;

对应的测试代码如下:

#include "Task.hpp"
#include "ThreadPool.hpp"
using namespace std;int main() {srand(time(nullptr));ThreadPool<Task> *tp =new ThreadPool<Task>();  // 若参数传5 表示需要创建的线程池线程数量为5tp->Start();                 // 运行线程池string opers = "+-*/%";int len = opers.size();while (true) {// 1. 构建任务int num1 = rand() % 15;usleep(10);int num2 = rand() % 15;char op = opers[rand() % len];Task task(num1, num2, op);// 2. 将任务发送给线程池使其进行处理tp->Push(task);printf("The main thread send a task: %d %c %d = ?\n", num1, op, num2);sleep(1);}return 0;
}

种下一个随机数种子;

使用new ThreadPool<Task>实例化出一个线程池对象(默认为7),并调用其Start()函数将其运行;

定义了一个string对象并初始化为"+-*/%"便于模拟随机生成任务发送给线程池;

while()循环中不断创建新的任务并调用线程池中的Push()接口发送给线程池让线程池进行处理任务并打印对应信息;

运行结果为:

$ ./threadpool 
The main thread send a task: 5 % 6 = ?
The Thread-0 handler a task ,the result is  5 %  6 =   5 ,the exit code :0
The main thread send a task: 14 - 5 = ?
The Thread-1 handler a task ,the result is 14 -  5 =   9 ,the exit code :0
The main thread send a task: 3 % 6 = ?
The Thread-2 handler a task ,the result is  3 %  6 =   3 ,the exit code :0
The main thread send a task: 4 / 9 = ?
The Thread-3 handler a task ,the result is  4 /  9 =   0 ,the exit code :0
The main thread send a task: 12 + 7 = ?
The Thread-4 handler a task ,the result is 12 +  7 =  19 ,the exit code :0
The main thread send a task: 3 / 9 = ?
The Thread-5 handler a task ,the result is  3 /  9 =   0 ,the exit code :0
The main thread send a task: 14 * 1 = ?
The Thread-6 handler a task ,the result is 14 *  1 =  14 ,the exit code :0
The main thread send a task: 5 % 2 = ?
The Thread-0 handler a task ,the result is  5 %  2 =   1 ,the exit code :0
...
...

参考代码

请添加图片描述

(供参考) CSDN - Dio夹心小面包 / Gitee - 半介莽夫


线程的封装

请添加图片描述

一切皆为对象,对应的POSIX线程pthread_t也可封装为一个类,同时可通过拓展这个类使线程的使用更加便捷;

#ifndef THREAD_HPP
#define THREAD_HPP#include <pthread.h>
#include <ctime>
#include <iostream>
#include <string>// 定义回调函数类型
typedef void (*callback_t)();class Thread {private:static int num_;  // 静态成员,用于给线程命名private:// 静态线程入口函数,符合 pthread_create 的要求static void *Routine(void *args) {Thread *thread = static_cast<Thread *>(args);thread->Entery();  // 调用实际的线程入口函数return nullptr;}public:// 构造函数,初始化线程对象Thread(callback_t cb): tid_(0), name_(""), start_timestamp_(0), isrunning_(false), cb_(cb) {}// 启动线程void Run() {name_ = "thread-" + std::to_string(num_++);  // 设置线程名start_timestamp_ = time(nullptr);            // 记录启动时间戳isrunning_ = true;pthread_create(&tid_, nullptr, Routine, this);  // 创建线程}// 等待线程结束void Join() {pthread_join(tid_, nullptr);isrunning_ = false;}// 实际的线程入口函数,调用回调函数void Entery() { cb_(); }// 分离线程void Detach() {if (isrunning_) {pthread_detach(tid_);isrunning_ = false;}}// 析构函数,确保线程正确结束~Thread() {if (isrunning_) {Join();}}// 获取线程名std::string getName() { return name_; }// 获取线程启动时间戳uint64_t getStartTimeStamp() { return start_timestamp_; }// 检查线程是否正在运行bool isRunning() { return isrunning_; }private:pthread_t tid_;             // 线程IDstd::string name_;          // 线程名称uint64_t start_timestamp_;  // 线程启动时间戳bool isrunning_;            // 线程运行状态标志callback_t cb_;             // 回调函数
};// 初始化静态成员变量
int Thread::num_ = 1;#endif

该代码为一个Thread类的实现,封装了POSIX线程的基本操作;

定义了几个成员函数分别为tid_,name_,start_timestamp_,isrunning_cb_,分别用来存储线程的tid,线程名,线程启动时的时间戳,线程的运行状态以及用户传入的函数;

  • callback_t

    定义了一个函数指针类型,用于存储线程要执行的回调函数;

  • 静态成员num_

    该参数用于为每个线程生成唯一的名字,这里可能涉及到num_被所有线程共享,导致产生临界资源的竞争的问题;

  • Routine()

    该函数是线程的一个入口点,即pthread_create()函数的第三个参数;

    该函数被声明为一个静态函数,本质原因是因为POSIX标准定义传入的函数必须为void *(*)(void*)类型的函数,而类内函数存在隐含的this指针,故声明为静态函数;

    为了避免这个问题也可将将该函数防止与类外;

    该函数将传入的参数转换回Thread对象指针,并调用该对象的Entery()函数;

  • 构造函数

    构造函数用于初始化线程对象,并设置回调函数;

  • Run()

    该函数用于设置线程名称和启动时间戳,并调用pthread_create()创建实际的POSIX线程;

    由于该函数需配合Routine()函数故在调用pthread_create()时需传入自身的this指针;

  • Join()

    该函数用于等待线程结束,并更新运行状态;

  • Entery()

    该函数为函数的入口点,本质上在Run()中调用pthread_create()传入对应的Routine()函数时Routine()函数将调用类内封装的Entery()函数,而Entery()函数将直接通过调用cb_()来运行;

    其中cb_本质上赋的就是用户所传入的函数指针;

  • Detach()

    该方法用于用户选择是否将该线程进行分离;

    当分离过后即不可再对其进行Join()操作;

  • 析构函数

    析构函数确保线程在对象销毁时正常结束;

  • 其他方法

    提供了获取线程名称,启动时间戳和运行状态等接口;


使用测试封装后的线程

请添加图片描述

#include <unistd.h>
#include <iostream>
#include <vector>
#include "Thread.hpp"using namespace std;// 线程执行的函数
void run() {while (1) {cout << "thread running" << endl;sleep(1);  // 每秒打印一次}
}int main() {// 创建一个存储Thread对象的vectorvector<Thread> threads;// 创建5个Thread对象,每个对象都使用run函数作为回调for (int i = 0; i < 5; ++i) {threads.push_back(Thread(run));}// 启动所有线程for (auto& th : threads) {th.Run();}// 等待所有线程结束// 注意:由于run函数中是无限循环,这些线程实际上不会结束for (auto& th : threads) {th.Join();}return 0;
}

以该段代码为例,定义了一个run()函数作为线程的执行函数;

这个函数将会无限循环打印一次thread running;

main()函数中创建了一个vector<Thread>来存储Thread对象,并创建5个对象都调用run()作为回调;

启动所有的创建的线程;

可用shell脚本进行观察:

$ while : ; do ps -aL | head -1 && ps -aL |  grep mythread ; echo "------------------------------------" ; sleep 1 ;done

最终运行结果为:

# 程序所在会话$ ./mythread 
thread runningthread running
thread running
thread running
thread runningthread runningthread running
thread runningthread running
thread running
thread running
thread running
thread running
thread running
thread running
...
...# shell 脚本会话PID   LWP TTY          TIME CMD8639  8639 pts/0    00:00:00 mythread8639  8640 pts/0    00:00:00 mythread8639  8641 pts/0    00:00:00 mythread8639  8642 pts/0    00:00:00 mythread8639  8643 pts/0    00:00:00 mythread8639  8644 pts/0    00:00:00 mythread
------------------------------------PID   LWP TTY          TIME CMD8639  8639 pts/0    00:00:00 mythread8639  8640 pts/0    00:00:00 mythread8639  8641 pts/0    00:00:00 mythread8639  8642 pts/0    00:00:00 mythread8639  8643 pts/0    00:00:00 mythread8639  8644 pts/0    00:00:00 mythread
------------------------------------
...
...

结果如预期(打印错乱是因为不同线程对显示器资源进行打印所导致);

除主线程外创建了5个线程;


参考代码

请添加图片描述

(供参考) CSDN - Dio夹心小面包 / Gitee - 半介莽夫

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 无人机PX4飞控 | 电源系统详解与相关代码
  • Flask+LayUI开发手记(一):LayUI表格的前端数据分页展现
  • 高级java每日一道面试题-2024年8月06日-web篇-cookie,session,token有什么区别?
  • 【Material-UI】Autocomplete中的禁用选项:Disabled options
  • 基于Python的脑电图(EEG)信号分析(5)
  • Golang | Leetcode Golang题解之第312题戳气球
  • python 实现粒子群算法
  • 日志和守护进程
  • 人大金仓(Kingbase)数据库高阶函数详解
  • Java中的网络协议实现:HTTP/2与gRPC
  • 计算机的错误计算(五十四)
  • SpringBoot依赖之Quartz Scheduler定时调度器
  • Vue 3+Vite+Eectron从入门到实战系列之(三)一Electron热身运动(一)
  • 智慧公厕系统解决方案实现更人性化的服务
  • 基于深度学习的数据并行与模型并行
  • IE9 : DOM Exception: INVALID_CHARACTER_ERR (5)
  • 收藏网友的 源程序下载网
  • Android组件 - 收藏集 - 掘金
  • create-react-app项目添加less配置
  • css系列之关于字体的事
  • docker-consul
  • Flex布局到底解决了什么问题
  • Javascript 原型链
  • JavaScript设计模式系列一:工厂模式
  • k8s如何管理Pod
  • mac修复ab及siege安装
  • Magento 1.x 中文订单打印乱码
  • Meteor的表单提交:Form
  • Terraform入门 - 1. 安装Terraform
  • TiDB 源码阅读系列文章(十)Chunk 和执行框架简介
  • 不发不行!Netty集成文字图片聊天室外加TCP/IP软硬件通信
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 多线程事务回滚
  • 分享一份非常强势的Android面试题
  • 搞机器学习要哪些技能
  • 坑!为什么View.startAnimation不起作用?
  • 前端存储 - localStorage
  • 浅谈Golang中select的用法
  • 入门到放弃node系列之Hello Word篇
  • 收藏好这篇,别再只说“数据劫持”了
  • 微信公众号开发小记——5.python微信红包
  • 一道闭包题引发的思考
  • 与 ConTeXt MkIV 官方文档的接驳
  • 云大使推广中的常见热门问题
  • 再谈express与koa的对比
  • mysql面试题分组并合并列
  • 从如何停掉 Promise 链说起
  • ​LeetCode解法汇总518. 零钱兑换 II
  • ​渐进式Web应用PWA的未来
  • #etcd#安装时出错
  • (11)MSP430F5529 定时器B
  • (13):Silverlight 2 数据与通信之WebRequest
  • (附源码)ssm高校升本考试管理系统 毕业设计 201631
  • (黑马点评)二、短信登录功能实现
  • (六)库存超卖案例实战——使用mysql分布式锁解决“超卖”问题