当前位置: 首页 > news >正文

[k8s源码]6.reflector

Reflector 和 Informer 是 Kubernetes 客户端库中两个密切相关但职责不同的组件。Reflector 是一个较低级别的组件,主要负责与 Kubernetes API 服务器进行交互,执行资源的初始列表操作和持续的监视操作,将获取到的数据放入队列中。而 Informer 是一个更高级别的抽象,它内部使用了 Reflector,但提供了更全面的功能。Informer 不仅负责数据同步,还维护了资源对象的本地缓存,并提供了事件处理机制,允许开发者注册自定义的事件处理函数。

Informer 实际上通过一个称为 Controller 的内部组件来管理 Reflector。在这个架构中,Reflector 负责从 Kubernetes API 服务器获取数据并将其放入一个 DeltaFIFO queue。Controller 则从这个 queue 中取出数据,更新 Informer 的 LocalStore(这是一个持久化的缓存),并触发相应的事件处理器。这种设计允许 Informer 提供一个高级抽象,隐藏了底层的复杂性。虽然 Informer 不直接调用 Reflector 的方法,但它们通过 Controller 紧密集成。这种架构确保了各个组件职责单一:Reflector 专注于 API 交互,DeltaFIFO 管理变更队列,Controller 协调数据流,而 Informer 则提供高级 API 和缓存管理。这种设计使得开发者可以方便地使用 Informer,而无需直接处理 Reflector 或了解内部 queue 和 store 的细节。

以kubernetes源码中的reflector的一个测试为例:
执行这个test-reflector.go文件,这种test文件可以封装一些测试的代码,更好的帮助我们理解reflector的工作原理。


func TestReflectorListAndWatch(t *testing.T) {createdFakes := make(chan *watch.FakeWatcher)// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc// to get called at the beginning of the watch with 1, and again with 3 when we// inject an error.expectedRVs := []string{"1", "3"}lw := &testLW{WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {rv := options.ResourceVersionfw := watch.NewFake()if e, a := expectedRVs[0], rv; e != a {t.Errorf("Expected rv %v, but got %v", e, a)}expectedRVs = expectedRVs[1:]// channel is not buffered because the for loop below needs to block. But// we don't want to block here, so report the new fake via a go routine.go func() { createdFakes <- fw }()return fw, nil},ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil},}s := NewFIFO(MetaNamespaceKeyFunc)r := NewReflector(lw, &v1.Pod{}, s, 0)go r.ListAndWatch(wait.NeverStop)ids := []string{"foo", "bar", "baz", "qux", "zoo"}var fw *watch.FakeWatcherfor i, id := range ids {if fw == nil {fw = <-createdFakes}sendingRV := strconv.FormatUint(uint64(i+2), 10)fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})if sendingRV == "3" {// Inject a failure.fw.Stop()fw = nil}}// Verify we received the right ids with the right resource versions.for i, id := range ids {pod := Pop(s).(*v1.Pod)if e, a := id, pod.Name; e != a {t.Errorf("%v: Expected %v, got %v", i, e, a)}if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {t.Errorf("%v: Expected %v, got %v", i, e, a)}}if len(expectedRVs) != 0 {t.Error("called watchStarter an unexpected number of times")}
}

第一部分初始化一个通道createdFakes,初始化一个watcher,有两个方法,为watch和list。watch方法会创建watcher,并传递到createdFakes通道里面。这里的watchFunc是一个闭包:

闭包(Closure)是指一个函数可以捕获并“记住”其作用域外部的变量,即使这个变量在闭包函数的作用域之外。这种特性使得闭包能够访问和操作其外部函数中的变量。捕获外部变量:闭包可以捕获外部函数的局部变量,并且在闭包函数内使用这些变量,即使外部函数已经执行完毕。持久性:闭包会“记住”它捕获的变量,即使外部函数已经返回。状态共享:通过闭包,可以在不同的函数调用之间共享状态。

这里的fw是闭包里面创建的watcher,并不和外面的fw冲突,如果不理解,这里的fw也可以改为别的名字。

fw := watch.NewFake()
go func() { createdFakes <- fw }()

 然后初始化reflector

s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
go r.ListAndWatch(wait.NeverStop)

然后下面是测试部分,首先为fw赋值为watcher,一开始为nil,而在上面r=NewReflector初始化的时候,就已经自动调用了watchFunc,创建了一个watcher。随后这个watcher被拿出来给了fw。sendingRV随机生成一个资源的版本,随着从ids拿出来的值,作为pod的属性用来创建pod,在测试中,fw.Add(...) 模拟了 Kubernetes API 服务器添加新 Pod 的行为。在实际环境中,当新 Pod 被创建时,Watch 操作会接收到这个事件。测试使用 fw.Add(...) 来模拟这个过程,向 FakeWatcher 发送一个新 Pod 被添加的事件。

 ids := []string{"foo", "bar", "baz", "qux", "zoo"}
    var fw *watch.FakeWatcher
    for i, id := range ids {
        if fw == nil {
            fw = <-createdFakes
        }
        sendingRV := strconv.FormatUint(uint64(i+2), 10)
        fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
        if sendingRV == "3" {
            // Inject a failure.
            fw.Stop()
            fw = nil
        }
    }

如果sendingRV=3,那么就会将fw停止并设为nil。这模拟了如果resourceVersion错误会发生什么,但是reflector已经设为neverStop,那么此时ListAndWatch会重新调用watchFunc,然后创建新的watcher,放入createdFakes中,从而fw可以重新拿取。 

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 开发面试算法题求教
  • Mojo模型魔法:动态定制特征转换的艺术
  • C#中栈和堆以及修饰符
  • 系统架构设计师教程 第3章 信息系统基础知识-3.8 典型信息系统架构模型-解读
  • Kafka Producer之事务性
  • VSCode STM32嵌入式开发插件记录
  • 计算机毕业设计hadoop+spark+hive物流大数据分析平台 仓储数据分析 物流预测系统 物流信息爬虫 物流大数据 机器学习 深度学习 知识图谱 大数据
  • 软件物料清单科普 | SBOM对开源管理的意义
  • Apache POI-Excel入门与实战
  • 数据仓库中的数据治理流程
  • 什么是离线语音识别芯片?与在线语音识别的区别
  • nfs和samba
  • 服务器上使用Docker部署sonarQube,并集成到Jenkins实现自动化。
  • 网站验证:确保网络安全与信任的重要步骤
  • C2W3.Assignment.Language Models: Auto-Complete.Part1
  • JavaScript 如何正确处理 Unicode 编码问题!
  • 【5+】跨webview多页面 触发事件(二)
  • 【跃迁之路】【519天】程序员高效学习方法论探索系列(实验阶段276-2018.07.09)...
  • angular2开源库收集
  • create-react-app做的留言板
  • LintCode 31. partitionArray 数组划分
  • maven工程打包jar以及java jar命令的classpath使用
  • PAT A1120
  • UEditor初始化失败(实例已存在,但视图未渲染出来,单页化)
  • windows下如何用phpstorm同步测试服务器
  • XML已死 ?
  • 不上全站https的网站你们就等着被恶心死吧
  • 分布式事物理论与实践
  • 看图轻松理解数据结构与算法系列(基于数组的栈)
  • 网页视频流m3u8/ts视频下载
  • 一个普通的 5 年iOS开发者的自我总结,以及5年开发经历和感想!
  • 硬币翻转问题,区间操作
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • 微龛半导体获数千万Pre-A轮融资,投资方为国中创投 ...
  • ​ 无限可能性的探索:Amazon Lightsail轻量应用服务器引领数字化时代创新发展
  • ​比特币大跌的 2 个原因
  • ###51单片机学习(1)-----单片机烧录软件的使用,以及如何建立一个工程项目
  • #{} 和 ${}区别
  • $.ajax()方法详解
  • (pytorch进阶之路)扩散概率模型
  • (二)WCF的Binding模型
  • (附源码)计算机毕业设计ssm-Java网名推荐系统
  • (汇总)os模块以及shutil模块对文件的操作
  • (计算机网络)物理层
  • (十二)Flink Table API
  • (一)Kafka 安全之使用 SASL 进行身份验证 —— JAAS 配置、SASL 配置
  • (转)ABI是什么
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在
  • .net Application的目录
  • .NET C# 配置 Options
  • .Net mvc总结
  • .net 使用$.ajax实现从前台调用后台方法(包含静态方法和非静态方法调用)
  • .net 怎么循环得到数组里的值_关于js数组
  • .NET/C# 使用反射注册事件
  • .Net+SQL Server企业应用性能优化笔记4——精确查找瓶颈