当前位置: 首页 > news >正文

线程池详解并使用Go语言实现 Pool

写在前面

在线程池中存在几个概念:核心线程数最大线程数任务队列

  • 核心线程数指的是线程池的基本大小;也就是指worker的数量
  • 最大线程数指的是,同一时刻线程池中线程的数量最大不能超过该值;实际上就是指task任务的数量。
  • 任务队列是当任务较多时,线程池中线程的数量已经达到了核心线程数,这时候就是用任务队列来存储我们提交的任务。相当于缓冲作用。

与其他池化技术不同的是,线程池是基于生产者-消费者模式来实现的,任务的提交方是生产者,线程池是消费者 。当我们需要执行某个任务时,只需要把任务扔到线程池中即可。

池化技术:这里的池化和卷积的池化不一样,这里的池化技术简单点来说,就是提前保存大量的资源,以备不时之需

线程池中执行任务的流程如下图如下。
在这里插入图片描述

那么使用线程池可以带来一系列好处:

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。

任务调度

首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。

  1. 如果 taskCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。

在这里插入图片描述

  1. 如果 taskCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。

在这里插入图片描述

  1. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据 拒绝策略 来处理该任务, 默认的处理方式是直接抛异常。

在这里插入图片描述

常见的拒绝策略有以下几种

  • AbortPolicy 中止策略:丢弃任务并抛出异常
  • DiscardPolicy 丢弃策略:丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。
  • DiscardOldestPolicy 弃老策略:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

简单实现

定义任务Task 并 定义NewTask来新建Task对象

type Task struct {f func() error
}func NewTask(f func() error) *Task {return &Task{f: f}
}

定义 WorkPool 线程池

type WorkPool struct {TaskQueue chan *Task // Task队列workNum   int        // 协程池中最大的worker数量shop      chan struct{} // 停止工作标识
}

创建 WorkPool 的函数

func NewWorkPool(cap int) *WorkPool {if cap <= 0 {cap = 10}return &WorkPool{TaskQueue: make(chan *Task),workNum:   cap,shop:      make(chan struct{}),}
}

具体的协程池中的工作节点

func (p *WorkPool) worker(workId int) {for task := range p.TaskQueue {err := task.Execute()if err != nil {fmt.Println(err)continue}fmt.Printf(" work id %d finished \n", workId) // 打印出具体是哪个节点进行工作}
}

协程池启动函数

func (p *WorkPool) run() {// 根据work num 去创建worker工作for i := 0; i < p.workNum; i++ {go p.worker(i)}<-p.shop
}

协程池关闭函数

func (p *WorkPool) close() {p.shop <- struct{}{}
}

测试一下,使用定时器,每2秒进行一次投放,并且投放超过5个之后开始停止。

func TestWorkPool(t *testing.T) {task := NewTask(func() error {fmt.Print(time.Now())return nil})taskCount := 0ticker := time.NewTicker(2 * time.Second)p := NewWorkPool(3)go func(c *time.Ticker) {for {p.TaskQueue <- task<-c.CtaskCount++if taskCount == 5 {p.close()break}}return}(ticker)p.run()
}

结果:

可以看到结果是每两秒进行一次打印,并且worker对象都不一样。

完整代码

package gorountine_poolimport ("fmt""testing""time"
)func TestWorkPool(t *testing.T) {task := NewTask(func() error {fmt.Print(time.Now())return nil})taskCount := 0ticker := time.NewTicker(2 * time.Second)p := NewWorkPool(3)go func(c *time.Ticker) {for {p.TaskQueue <- task<-c.CtaskCount++if taskCount == 5 {p.close()break}}return}(ticker)p.run()
}type Task struct {f func() error
}func NewTask(f func() error) *Task {return &Task{f: f}
}// Execute 执行业务方法
func (t *Task) Execute() error {return t.f()
}type WorkPool struct {TaskQueue chan *Task // task队列workNum   int        // 携程池中最大的worker数量shop      chan struct{} // 停止标识
}// 创建Pool的函数
func NewWorkPool(cap int) *WorkPool {if cap <= 0 {cap = 10}return &WorkPool{TaskQueue: make(chan *Task),workNum:   cap,shop:      make(chan struct{}),}
}func (p *WorkPool) worker(workId int) {// 具体的工作for task := range p.TaskQueue {err := task.Execute()if err != nil {fmt.Println(err)continue}fmt.Printf(" work id %d finished \n", workId)}
}// 携程池开始工作
func (p *WorkPool) run() {// 根据work num 去创建worker工作for i := 0; i < p.workNum; i++ {go p.worker(i)}<-p.shop
}func (p *WorkPool) close() {p.shop <- struct{}{}
}

参考链接

[1] https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
[2] https://blog.csdn.net/weixin_44688301/article/details/123292211
[3] https://www.bilibili.com/video/BV1Nf4y137na

相关文章:

  • python docx 添加动态表格
  • 从汇编看函数调用
  • 008 CSS盒子模型
  • 如何成为一名嵌入式C语言高手?
  • 突破编程_前端_SVG(概述)
  • 通俗易懂的理解 ADC(2)
  • zabbix绑定钉钉进行通知,网页端添加JavaScript,无脑式操作
  • sharo反序列化漏洞
  • 算法| ss 双指针
  • CentOS7安装Tomcat
  • 如何在plesk面板安装域名付费SSL证书
  • 云原生架构(微服务、容器云、DevOps、不可变基础设施、声明式API、Serverless、Service Mesh)
  • 大语言模型中常见小模型LLM垂直领域应用微调数据集
  • C++20 semaphore(信号量) 详解
  • 摄影杂记一
  • (ckeditor+ckfinder用法)Jquery,js获取ckeditor值
  • DOM的那些事
  • iOS仿今日头条、壁纸应用、筛选分类、三方微博、颜色填充等源码
  • LeetCode18.四数之和 JavaScript
  • Lucene解析 - 基本概念
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • Redis的resp协议
  • V4L2视频输入框架概述
  • Vim 折腾记
  • Zsh 开发指南(第十四篇 文件读写)
  • 不上全站https的网站你们就等着被恶心死吧
  • 利用DataURL技术在网页上显示图片
  • 目录与文件属性:编写ls
  • 浅谈Golang中select的用法
  • 区块链技术特点之去中心化特性
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 使用SAX解析XML
  • 数据仓库的几种建模方法
  • 算法-图和图算法
  • 我的zsh配置, 2019最新方案
  • 【云吞铺子】性能抖动剖析(二)
  • 阿里云重庆大学大数据训练营落地分享
  • ​sqlite3 --- SQLite 数据库 DB-API 2.0 接口模块​
  • (超详细)2-YOLOV5改进-添加SimAM注意力机制
  • (二)springcloud实战之config配置中心
  • (十)c52学习之旅-定时器实验
  • (十六)一篇文章学会Java的常用API
  • (四)linux文件内容查看
  • (转)linux自定义开机启动服务和chkconfig使用方法
  • (转)使用VMware vSphere标准交换机设置网络连接
  • ../depcomp: line 571: exec: g++: not found
  • .net core 6 集成和使用 mongodb
  • .NET Core 控制台程序读 appsettings.json 、注依赖、配日志、设 IOptions
  • .NET delegate 委托 、 Event 事件,接口回调
  • .NET 编写一个可以异步等待循环中任何一个部分的 Awaiter
  • .net 程序 换成 java,NET程序员如何转行为J2EE之java基础上(9)
  • .NET大文件上传知识整理
  • .NET的数据绑定
  • /deep/和 >>>以及 ::v-deep 三者的区别
  • [ vulhub漏洞复现篇 ] ECShop 2.x / 3.x SQL注入/远程执行代码漏洞 xianzhi-2017-02-82239600