当前位置: 首页 > news >正文

informer中的WorkQueue机制的实现分析与源码解读(1)

背景

client-go中的workqueue包里主要有三个队列,分别是普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能。

workqueue是整个client-go源码的重点和难点。采用层层拨开分步理解有助于理解workqueue的源码。本文重点在从源码角度了解下workqueue的add(),get(),done()方法执行的过程。关于延时队列与限速队列是如何实现的后面再单独讨论。

workqueue源码分析

代码结构

源码位于:vendor/k8s.io//client-go/util/workqueue/queue.go

queue类型的定义

下面是queue类型定义。其中queue、dirty、processing 都保存 items。它们的区别是:

  • queue是有序列表用来存储 item 的处理顺序。

  • dirty集合存储的是所有需要处理的 item,是set类型,无序,用于保证items的唯一。dirty的字面意思就是需要被处理的数据。

  • processing集合存储的是当前正在处理的 item,也是set类型,无序,用于保证items的唯一。

// Type is a work queue (see the package comment).
type Type struct {// queue defines the order in which we will work on items. Every// element of queue should be in the dirty set and not in the// processing set.queue []t     // 是一个切片保证对象入队的顺序性。每个在queue队列的对象,必须同时也在dirty集合。// dirty defines all of the items that need to be processed.dirty set    // 是一个set集合,保证对象的唯一性。存放需要被处理的对象// Things that are currently being processed are in the processing set.// These things may be simultaneously in the dirty set. When we finish// processing something and remove it from this set, we'll check if// it's in the dirty set, and if so, add it to the queue.processing set     // 也是一个set集合,保证对象的唯一性cond *sync.Cond    // 安全处理队列里面的对象shuttingDown bool    // 队列是否处理关闭中metrics queueMetrics // 用于计数统计unfinishedWorkUpdatePeriod time.Durationclock                      clock.Clock
}

要理解workqueue是的工作机制,必须要了解queue队列3个重要的方法,Add,Get,Done。接下来展开分析。

Add方法

informer机制中,当一个对象从DeltaFIFO队列中pod弹出后,会转到AddEventHandler事件处理函数处理,AddEventHandler需要调用workqueue的Add方法,先把对象加入队列,等下用户任务来处理。

Add方法是将item加入队列q.queue和待处理集合q.dirty。若该item正在被处理只加入q.dirty。

// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {q.cond.L.Lock()defer q.cond.L.Unlock()// 如果队列处于shuttdingDown状态就返回if q.shuttingDown {return}// 如果dirty里面有这个item,就返回if q.dirty.has(item) {return}// 添加到metric用于计数q.metrics.add(item)// 插入到dirty队列q.dirty.insert(item)// 如果processing队列已经有这个item,就返回if q.processing.has(item) {return}// 如果processing队列没有这个item,就加入queue队列q.queue = append(q.queue, item)// 发信号让其他goroutine处理q.cond.Signal()
}

在执行Add()添加对象到workqueue时,主要有三种场景,如下图所示

场景一:当3个队列都没有这个对象时,对象插入到queue和dirty

场景二:当某个对象已经加入到了队列,但还未开始被处理时,就直接返回不再加入队列。

场景三:当某个对象处于”处理中“状态,也就是位于processing队列中时,会把元素加入到dirty队列。当处理完,执行Done()方法后,item会被重新加入queue队列。

Get方法

Get方法是从 queue队列中取出一个元素item加入正处理集合q.processing,并从queue队列中删除,从dirty中删除。

// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {q.cond.L.Lock()defer q.cond.L.Unlock()// 如果queue队列为空,并且队列不是处于shuttingDown状态就阻塞等待Add()对象到队列for len(q.queue) == 0 && !q.shuttingDown {q.cond.Wait()}// 如果queue队列为空了,而且队列处于shuttingDown状态,就返回空值if len(q.queue) == 0 {// We must be shutting down.return nil, true}// 从queue队列弹出一个元素item, q.queue = q.queue[0], q.queue[1:]// 计数q.metrics.get(item)// 插入到processing队列q.processing.insert(item)q.dirty.delete(item)return item, false
}

Get方法的图示

Done方法

Done方法是表明这个元素item被处理完了,从processing队列删除。这里加了一个判断,如果dirty中还存在,还要将其加入 queue队列。

// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {q.cond.L.Lock()defer q.cond.L.Unlock()q.metrics.done(item)// 直接从processing队列删除q.processing.delete(item)// 如果dirty队列里面还有这个对象(通常是处理元素过程中,对象再次入队了),就将元素从新加到queue队列。if q.dirty.has(item) {q.queue = append(q.queue, item)q.cond.Signal()}
}

在执行Add()添加对象到workqueue时,主要有二种场景,如下图所示

场景一:对象item完成处理,并且处理过程中该对象没有再次入队

场景二:对象item在处理过程中,还没处理完之前,这个对象又入队被加入了dirty队列(也就是此时执行了Add方法)。当执行Done()后,item会被重新添加(re-add)到queue队列

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Netty的几种IO模式的实现与切换
  • Flask基础教程(第一阶段)
  • JAVA—面向对象编程高级
  • 《死侍与金刚狼》票房飘红! 目前全球票房总票房$7亿,预计可达$12亿,全球排名跃居第二!
  • 数据集相关类代码回顾理解 | sns.distplot\%matplotlib inline\sns.scatterplot
  • 【redis 第八篇章】链表结构
  • 新增道路查询后的最短距离
  • YOLOv8添加注意力模块并测试和训练
  • 【VS Code】launch.json与tasks.json
  • Java 并发编程:volatile 关键字介绍与使用
  • 【系统架构设计师】二十四、安全架构设计理论与实践④
  • 安装ubuntu server24.04系统
  • 浅谈 Spring AOP框架 (1)
  • 数据湖之Hudi
  • Java 技巧:将整数每一位数字转换为数组
  • 11111111
  • Android Volley源码解析
  • Laravel Mix运行时关于es2015报错解决方案
  • Laravel5.4 Queues队列学习
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • miaov-React 最佳入门
  • October CMS - 快速入门 9 Images And Galleries
  • orm2 中文文档 3.1 模型属性
  • Python学习之路16-使用API
  • react-core-image-upload 一款轻量级图片上传裁剪插件
  • -- 查询加强-- 使用如何where子句进行筛选,% _ like的使用
  • 对超线程几个不同角度的解释
  • 多线程 start 和 run 方法到底有什么区别?
  • 更好理解的面向对象的Javascript 1 —— 动态类型和多态
  • 前端每日实战 2018 年 7 月份项目汇总(共 29 个项目)
  • 以太坊客户端Geth命令参数详解
  • 白色的风信子
  • ​插件化DPI在商用WIFI中的价值
  • # dbt source dbt source freshness命令详解
  • # Kafka_深入探秘者(2):kafka 生产者
  • #include到底该写在哪
  • #pragma once与条件编译
  • $emit传递多个参数_PPC和MIPS指令集下二进制代码中函数参数个数的识别方法
  • (BFS)hdoj2377-Bus Pass
  • (Charles)如何抓取手机http的报文
  • (第三期)书生大模型实战营——InternVL(冷笑话大师)部署微调实践
  • (附源码)spring boot北京冬奥会志愿者报名系统 毕业设计 150947
  • (附源码)计算机毕业设计SSM智慧停车系统
  • (强烈推荐)移动端音视频从零到上手(下)
  • (学习日记)2024.04.04:UCOSIII第三十二节:计数信号量实验
  • (转)使用VMware vSphere标准交换机设置网络连接
  • (转载)深入super,看Python如何解决钻石继承难题
  • **PHP二维数组遍历时同时赋值
  • *p++,*(p++),*++p,(*p)++区别?
  • .NET Core 发展历程和版本迭代
  • .NET Core 中的路径问题
  • .NET 编写一个可以异步等待循环中任何一个部分的 Awaiter
  • .NET序列化 serializable,反序列化
  • .Net中间语言BeforeFieldInit
  • @Autowired多个相同类型bean装配问题