当前位置: 首页 > news >正文

[k8s源码]8.deltaFIFO

deltaFIFO 

DeltaFIFO: 这是一个特殊类型的队列,它结合了FIFO(先进先出)队列的特性和增量(Delta)处理的能力。DeltaFIFO 中是按顺序存储的,但它们不必严格按照发生的顺序逐个处理。这种设计提供了处理的灵活性和优化的机会,允许控制器根据实际需求选择最有效的处理策略。这是 DeltaFIFO 设计的一个重要特性,使其能够高效地处理复杂的资源变化场景。

 Delta 实际上是一个结构体(struct),它在 Kubernetes 的 client-go 库中定义。

type Delta struct {Type   DeltaTypeObject interface{}
}

interface{} 允许 Delta 结构体存储任何类型的 Kubernetes 对象,不仅仅是 Pod。这使得 Delta 结构体可以用于所有类型的 Kubernetes 资源。 

type DeltaFIFO struct {// 用于存储对象键的队列queue []string// 存储每个键对应的 Delta 列表items map[string][]Delta// 其他字段,如锁、条件变量等
}

 

 DeltaType 的可能值:
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    // 可能还有其他类型,如 Sync
)
可以看到图中一个key对应一个delta值,当需要使用存储的对象时,通常需要进行类型断言:

if pod, ok := delta.Object.(*v1.Pod); ok {// 使用 pod 对象
}
// 存储 Pod
podDelta := Delta{Type: Added,Object: &v1.Pod{Metadata: metav1.ObjectMeta{Name: "mypod"}},
}// 存储 Service
serviceDelta := Delta{Type: Updated,Object: &v1.Service{Metadata: metav1.ObjectMeta{Name: "myservice"}},
}
controller消费 

Controller 通过调用 DeltaFIFO 的 Pop 方法来消费队列中的项目。这个方法通常在控制器的主循环中被调用。

func (c *Controller) processNextItem() bool {obj, shutdown := c.queue.Get()if shutdown {return false}defer c.queue.Done(obj)err := func(obj interface{}) error {deltas, ok := obj.(cache.Deltas)if !ok {return fmt.Errorf("expected cache.Deltas, got %v", obj)}for _, delta := range deltas {switch delta.Type {case cache.Added:// 首先更新 Indexerif err := c.indexer.Add(delta.Object); err != nil {return err}// 然后调用事件处理函数c.handleAddition(delta.Object)case cache.Updated:// 首先更新 Indexerif err := c.indexer.Update(delta.Object); err != nil {return err}// 然后调用事件处理函数c.handleUpdate(delta.Object)case cache.Deleted:// 首先更新 Indexerif err := c.indexer.Delete(delta.Object); err != nil {return err}// 然后调用事件处理函数c.handleDeletion(delta.Object)}}return nil}(obj)if err != nil {utilruntime.HandleError(err)c.queue.AddRateLimited(obj)return true}c.queue.Forget(obj)return true
}

初始化阶段:
创建 Informer(通常是 SharedInformer)
创建 Controller
将 Controller 的事件处理函数注册到 Informer
数据流动:
a. API 服务器 -> Reflector:
Reflector 通过 client-go API 监听 Kubernetes API 服务器
获取资源对象(如 Pod)的变化
b. Reflector -> DeltaFIFO:
Reflector 将这些变化(Delta)放入 DeltaFIFO 队列
c. DeltaFIFO -> Controller:
Controller 的 processLoop 方法从 DeltaFIFO 队列中取出数据
使用 Pop 方法,该方法包含一个 process 回调函数
d. Controller -> Indexer:
process 回调函数处理 Delta
更新 Indexer(本地缓存)
e. Controller -> 事件处理:
调用相应的事件处理函数(如 OnAdd, OnUpdate, OnDelete)

func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()if deltas, ok := obj.(Deltas); ok {return processDeltas(s, s.indexer, deltas, isInInitialList)}return errors.New("object given as Process argument is not Deltas")
}// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(// Object which receives event notifications from the given deltashandler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Objectswitch d.Type {case Sync, Replaced, Added, Updated:if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}handler.OnUpdate(old, obj)} else {if err := clientState.Add(obj); err != nil {return err}handler.OnAdd(obj, isInInitialList)}case Deleted:if err := clientState.Delete(obj); err != nil {return err}handler.OnDelete(obj)}}return nil
}

Resync机制会将Indexer本地存储中的资源对象同步到DeltaFIFO中,并将这些资源对象设置为Sync的操作类型。Resync函数在Reflector中定时执行,它的执行周期由NewReflector函数传入的resyncPeriod参数设定。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Vue Router基础
  • BSV区块链在人工智能时代的数字化转型中的角色
  • 【快速实践 OpenCV morphology】形态学操作:腐蚀、膨胀、开运算、闭运算
  • 无人机飞行姿态俯仰、横滚、偏航、油门详解
  • vite+react+ts+Rust来进行前后端web开发(hello world)
  • 后端返回一个图片链接,前端如何实现下载功能?
  • 零基础入门:创建一个简单的Python爬虫管理系统
  • 杰发科技AC7840——SENT数据解析及软件Sent发送的实现
  • 【Node.js基础04】包的理解与使用
  • 如何使用 API list 极狐GitLab 容器镜像仓库中的 tag?
  • SVN文件夹没有图标(绿钩子和红感叹号)
  • 【C# WInForm】将TextBox从输入框设置为文本框
  • Nginx笔记(一)
  • 在Mac M1上面使用Dockerfile打x86_64镜像
  • nng协议nni_taskq_sys_init(void) 对nni_taskq_systq 初始化
  • (十五)java多线程之并发集合ArrayBlockingQueue
  • 【vuex入门系列02】mutation接收单个参数和多个参数
  • 【从零开始安装kubernetes-1.7.3】2.flannel、docker以及Harbor的配置以及作用
  • Centos6.8 使用rpm安装mysql5.7
  • Django 博客开发教程 16 - 统计文章阅读量
  • Dubbo 整合 Pinpoint 做分布式服务请求跟踪
  • JAVA多线程机制解析-volatilesynchronized
  • Just for fun——迅速写完快速排序
  • mongo索引构建
  • MySQL的数据类型
  • Python 反序列化安全问题(二)
  • python学习笔记-类对象的信息
  • SAP云平台运行环境Cloud Foundry和Neo的区别
  • vue的全局变量和全局拦截请求器
  • 包装类对象
  • 关于Flux,Vuex,Redux的思考
  • 利用阿里云 OSS 搭建私有 Docker 仓库
  • 聊一聊前端的监控
  • 使用docker-compose进行多节点部署
  • 小程序01:wepy框架整合iview webapp UI
  • 学习ES6 变量的解构赋值
  • Java数据解析之JSON
  • ​人工智能之父图灵诞辰纪念日,一起来看最受读者欢迎的AI技术好书
  • #pragma multi_compile #pragma shader_feature
  • #中国IT界的第一本漂流日记 传递IT正能量# 【分享得“IT漂友”勋章】
  • ${factoryList }后面有空格不影响
  • ()、[]、{}、(())、[[]]命令替换
  • (1)Hilt的基本概念和使用
  • (八十八)VFL语言初步 - 实现布局
  • (附源码)spring boot校园拼车微信小程序 毕业设计 091617
  • (四)Linux Shell编程——输入输出重定向
  • (已解决)什么是vue导航守卫
  • .class文件转换.java_从一个class文件深入理解Java字节码结构
  • .MSSQLSERVER 导入导出 命令集--堪称经典,值得借鉴!
  • .net core webapi 部署iis_一键部署VS插件:让.NET开发者更幸福
  • .net core 调用c dll_用C++生成一个简单的DLL文件VS2008
  • .Net 基于MiniExcel的导入功能接口示例
  • .NET命令行(CLI)常用命令
  • .NET周刊【7月第4期 2024-07-28】
  • /var/log/cvslog 太大