当前位置: 首页 > news >正文

简单剖析tRPC-Go中使用的第三方协程池ants

tRPC-Go中的tRPC.Go()方法使用了ants协程池,做个简单剖析

panjf2000/ants协程池

在tRPC.Go方法(异步启动goroutine)中看到里面使用了ants协程池去实现(具体位置:g.pool.Invoke(p)

前置知识:

我们想异步完成一个任务,首先创建一个任务,然后需要从协程池(PoolWithFunc)中获取worker(goWorkerWithFunc),假设目前队列为空,这时一个worker和一个goroutine会一起创建出来,可以认为他俩就是绑一起的,然后处理完这个任务后,处于当前goroutine中的worker会放入全局的队列中,等待被其他协程去获取这个worker。

于是深入探索了下pool.Invoke等方法:

// github.com/panjf2000/ants/v2@v2.4.6/pool_func.go:162
// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {// 这个func的caller:p就是一个协程池对象if p.IsClosed() {return ErrPoolClosed}var w *goWorkerWithFuncif w = p.retrieveWorker(); w == nil {// <----return ErrPoolOverload}w.args <- args // 这里展现了channel的异步通信能力:告知,具体可往下看**callback**return nil
}

首先执行p.retrieveWorker()获取一个worker,这个方法里面会根据当前worker队列的数量去做不同的逻辑:

  1. 若队列为空,表示无可用的worker,则需要新建(其中包含使用原生go去启动新协程)
// 会执行下面这个函数
spawnWorker := func() {w = p.workerCache.Get().(*goWorkerWithFunc)// workerCache是个sync.pool,从中获取goWorkerWithFunc对象w.run()// 启动一个 Goroutine 来重复执行传入的函数}
// worker 的结构如下:
// goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and performs function calls.
type goWorkerWithFunc struct {// pool who owns this worker.pool *PoolWithFunc // 该 worker 所在协程池对象的指针// args is a job should be done.args chan interface{} // 待执行的任务// recycleTime will be update when putting a worker back into queue.recycleTime time.Time
}

为什么说这个Goroutine可以重复执行传入的函数?

答案:使用for循环不断获取无缓冲channel中的对象,获取不到则阻塞,直到管道关闭为止。

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *goWorkerWithFunc) run() {w.pool.incRunning()go func() {defer func() {// ...}()for args := range w.args {// w.args类型是chan any,用for循环不断获取无缓冲channel中的对象,获取不到则阻塞,直到管道关闭为止if args == nil {return}w.pool.poolFunc(args)// args对象就是每个传入的任务,poolFunc是这个协程池对象的执行任务的接口funcif ok := w.pool.revertWorker(w); !ok {// 将该worker放入队列中,供其他人后续获取这个worker并利用当前这个协程去执行任务return}}}()
}

所以说:新开的这个协程会一直处于for循环中不断等待并执行新的任务。

callbackg.pool.Invoke(p)方法中的w.args <- args就是用来通知某个任务协程中的for args := range w.args去执行新的任务。

这里的p和args对象就是一个任务,结构如下:

p := &goerParam{ctx:     newCtx,// context.Contextcancel:  cancel,// context.CancelFunchandler: handler,// func(context.Context) 调用方传入的闭包函数
}

sync.pool

上面可以看到 p.workerCache.Get().(*goWorkerWithFunc)中,任务p包含了一个workerCache属性,它是sync.pool类型,一个并发安全的对象池。说明worker都保存在一个对象池中,目的是减少内存分配和垃圾回收的开销。

有待深入…

相关文章:

  • 精读文献|《CATENA》新文:全球植被绿化对生态系统水分利用效率的响应
  • Python sorted()方法
  • 极具吸引力的小程序 UI 风格
  • 1. ELK日志分析
  • delmia中机器人末端固定工具
  • spring01
  • 用android如何实现计算机计算功能
  • 【机器学习】 第1章 概述
  • 从视频创意到传播策略 | 医药产品TVC新媒体传播方案
  • 运动想象 (MI) 分类学习系列 (14) :基于时空光谱特征的分类方法
  • WebRTC AudioProcessing 移植调试
  • 设计模式-迭代器模式
  • 使用SQLite
  • 从面试角度了解前端基础知识体系
  • JavaFX HTMLEditor
  • @angular/forms 源码解析之双向绑定
  • 【笔记】你不知道的JS读书笔记——Promise
  • CSS盒模型深入
  • css属性的继承、初识值、计算值、当前值、应用值
  • EOS是什么
  • java B2B2C 源码多租户电子商城系统-Kafka基本使用介绍
  • Java基本数据类型之Number
  • PHP那些事儿
  • SpiderData 2019年2月23日 DApp数据排行榜
  • SpriteKit 技巧之添加背景图片
  • vue2.0一起在懵逼的海洋里越陷越深(四)
  • 发布国内首个无服务器容器服务,运维效率从未如此高效
  • 分布式熔断降级平台aegis
  • 精彩代码 vue.js
  • 每天一个设计模式之命令模式
  • 排序(1):冒泡排序
  • 使用API自动生成工具优化前端工作流
  • -- 数据结构 顺序表 --Java
  • 微信小程序上拉加载:onReachBottom详解+设置触发距离
  • 我从编程教室毕业
  • 小试R空间处理新库sf
  • 译自由幺半群
  • 用 Swift 编写面向协议的视图
  • No resource identifier found for attribute,RxJava之zip操作符
  • Java性能优化之JVM GC(垃圾回收机制)
  • #{} 和 ${}区别
  • (~_~)
  • (1)(1.13) SiK无线电高级配置(六)
  • (2022 CVPR) Unbiased Teacher v2
  • (27)4.8 习题课
  • (带教程)商业版SEO关键词按天计费系统:关键词排名优化、代理服务、手机自适应及搭建教程
  • (二)换源+apt-get基础配置+搜狗拼音
  • (附源码)springboot工单管理系统 毕业设计 964158
  • (篇九)MySQL常用内置函数
  • (一)UDP基本编程步骤
  • (原创)可支持最大高度的NestedScrollView
  • (转)大型网站架构演变和知识体系
  • (转载)(官方)UE4--图像编程----着色器开发
  • .NET 分布式技术比较
  • .NET6 命令行启动及发布单个Exe文件