当前位置: 首页 > news >正文

[k8s源码]7.indexer

FIFO

在reflector中定义了一个FIFO的存储。

  • items: 一个 map,用于存储实际的对象,键是对象的唯一标识符。
  • queue: 一个字符串切片,保存了对象键的有序列表,维护了插入顺序。
  • keyFunc: 一个函数,用于从对象生成唯一键。
  • lock: 用于保护并发访问的读写锁。
  • cond: 条件变量,用于在队列状态改变时发出通知。
func NewFIFO(keyFunc KeyFunc) *FIFO {f := &FIFO{items:   map[string]interface{}{},queue:   []string{},keyFunc: keyFunc,}f.cond.L = &f.lockreturn f
}

FIFO 和 Indexer 都是 Kubernetes 客户端库中的重要组件,但它们服务于不同的目的。FIFO 主要用于保持对象的处理顺序,而 Indexer 提供了更强大的查询和索引能力。在实际应用中,它们常常一起使用:FIFO 用于初始数据接收和排序,而 Indexer 用于后续的高效数据检索和查询。

// 创建 Indexer
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc,
})// 创建 DeltaFIFO
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, indexer)// 创建 Reflector
reflector := cache.NewReflector(listerWatcher,&api.Pod{},fifo,0,
)// 启动 Reflector
go reflector.Run(stopCh)// 处理 FIFO 中的项目
for {item, shutdown := fifo.Pop(cache.PopProcessFunc(func(obj interface{}) error {// 处理逻辑indexer.Update(obj)return nil}))if shutdown {break}
}

Reflector 持续监听 API 服务器的变更。
变更被推送到 DeltaFIFO。
DeltaFIFO 保持这些变更的顺序。
处理函数从 DeltaFIFO 取出变更,更新 Indexer。
Indexer 维护最新的对象状态,并提供查询能力。

indexer demo

indexer提供存储和索引的能力。Indexer 使用 MetaNamespaceKeyFunc 为每个对象生成一个唯一的键。同时,Indexer 使用索引函数(如 "namespace" 和 "nodeName")来创建额外的索引。

package mainimport ("fmt"v1 "k8s.io/api/core/v1"metav1 "k8s.io/apimachinery/pkg/apis/meta/v1""k8s.io/client-go/tools/cache"
)func main() {// 创建 Indexerindexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,cache.Indexers{"namespace": func(obj interface{}) ([]string, error) {pod, ok := obj.(*v1.Pod)if !ok {return []string{}, nil}return []string{pod.Namespace}, nil},},)// 添加 nodeName 索引err := indexer.AddIndexers(cache.Indexers{"nodeName": func(obj interface{}) ([]string, error) {pod, ok := obj.(*v1.Pod)if !ok {return []string{}, nil}return []string{pod.Spec.NodeName}, nil},})if err != nil {panic(err)}// 添加一些 Podpods := []*v1.Pod{{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"},Spec:       v1.PodSpec{NodeName: "node1"},},{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "kube-system"},Spec:       v1.PodSpec{NodeName: "node1"},},{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default"},Spec:       v1.PodSpec{NodeName: "node2"},},}for _, pod := range pods {err := indexer.Add(pod)if err != nil {fmt.Printf("Error adding pod: %v\n", err)}}// 使用 namespace 索引defaultPods, err := indexer.ByIndex("namespace", "default")if err != nil {panic(err)}fmt.Printf("Pods in default namespace: %d\n", len(defaultPods))// 使用 nodeName 索引node1Pods, err := indexer.ByIndex("nodeName", "node1")if err != nil {panic(err)}fmt.Printf("Pods on node1: %d\n", len(node1Pods))// 获取所有索引器allIndexers := indexer.GetIndexers()for indexName := range allIndexers {fmt.Printf("Index: %s\n", indexName)}
}

 这里创建的就是indexer(索引器),加入的“namespace”和“NodeName”就是索引index。

Store {// 主存储MainStorage: {"default/pod-1": Pod-1对象,"kube-system/pod-2": Pod-2对象,"default/pod-3": Pod-3对象},// Indices(索引集)Indices: {"namespace": {// Index(命名空间索引)"default": ["default/pod-1", "default/pod-3"],"kube-system": ["kube-system/pod-2"]},"nodeName": {// Index(节点名索引)"node-A": ["default/pod-1", "kube-system/pod-2"],"node-B": ["default/pod-3"]}}
}
 library indexer demo
package mainimport ("fmt"
)type Book struct {ID     stringTitle  stringAuthor stringGenre  string
}
type IndexFunc func(Book) []string
type Index map[string][]string
type Indexers map[string]IndexFunc
type Indices map[string]Indextype Library struct {books    map[string]Bookindexers Indexersindices  Indices
}func authorIndexFunc(book Book) []string {return []string{book.Author}
}
func genreIndexFunc(book Book) []string {return []string{book.Genre}
}
func NewLibrary() *Library {return &Library{books: make(map[string]Book),indexers: Indexers{"author": authorIndexFunc,"genre":  genreIndexFunc,},indices: make(Indices),}
}
func (l *Library) AddBook(book Book) {l.books[book.ID] = bookfor indexName, indexFunc := range l.indexers {indexvalues := indexFunc(book)for _, value := range indexvalues {if l.indices[indexName] == nil {l.indices[indexName] = make(Index)}l.indices[indexName][value] = append(l.indices[indexName][value], book.ID)}}
}
func (l *Library) FindBooksByAuthor(author string) []Book {bookIDs := l.indices["author"][author]var books []Bookfor _, id := range bookIDs {books = append(books, l.books[id])}return books
}
// AddIndexer 方法用于添加新的索引器
func (l *Library) AddIndexer(name string, indexFunc IndexFunc) {l.indexers[name] = indexFuncl.indices[name] = make(Index)// 为现有的书籍创建新的索引for _, book := range l.books {l.indexBook(name, book)}
}
func (l *Library) indexBook(indexName string, book Book) {indexFunc := l.indexers[indexName]indexValues := indexFunc(book)for _, value := range indexValues {l.indices[indexName][value] = append(l.indices[indexName][value], book.ID)}
}
func main() {// 创建一个新的 Librarylibrary := NewLibrary()// 添加一些书籍library.AddBook(Book{ID: "1", Title: "Harry Potter and the Philosopher's Stone", Author: "J.K. Rowling", Genre: "Fantasy"})library.AddBook(Book{ID: "2", Title: "Harry Potter and the Chamber of Secrets", Author: "J.K. Rowling", Genre: "Fantasy"})library.AddBook(Book{ID: "3", Title: "1984", Author: "George Orwell", Genre: "Dystopian"})// 测试 FindBooksByAuthor 方法rowlingBooks := library.FindBooksByAuthor("J.K. Rowling")fmt.Println("Books by J.K. Rowling:")for _, book := range rowlingBooks {fmt.Printf("- %s\n", book.Title)}orwellBooks := library.FindBooksByAuthor("George Orwell")fmt.Println("\nBooks by George Orwell:")for _, book := range orwellBooks {fmt.Printf("- %s\n", book.Title)}
}

通过这个例子,我们可以更好的理解indexer的代码。比如,下面代码是创建一个名叫library的结构体的代码,可以看到这里indexer也是type Indexers map[string]IndexFunc,然后这里也定义了type IndexFunc func(Book) []string,那么就可以将indexer定义为“author”:authorIndexFunc,查看index.go的源码,可以看到类似的type定义。

func NewLibrary() *Library {
    return &Library{
        books: make(map[string]Book),
        indexers: Indexers{
            "author": authorIndexFunc,
            "genre":  genreIndexFunc,
        },
        indices: make(Indices),
    }

type Indexer interface {Store// Index returns the stored objects whose set of indexed values// intersects the set of indexed values of the given object, for// the named indexIndex(indexName string, obj interface{}) ([]interface{}, error)// IndexKeys returns the storage keys of the stored objects whose// set of indexed values for the named index includes the given// indexed valueIndexKeys(indexName, indexedValue string) ([]string, error)// ListIndexFuncValues returns all the indexed values of the given indexListIndexFuncValues(indexName string) []string// ByIndex returns the stored objects whose set of indexed values// for the named index includes the given indexed valueByIndex(indexName, indexedValue string) ([]interface{}, error)// GetIndexers return the indexersGetIndexers() Indexers// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.AddIndexers(newIndexers Indexers) error
}// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc.  This is only useful if your index function returns
// unique values for every object.  This conversion can create errors when more than one key is found.  You
// should prefer to make proper key and index functions.
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {return func(obj interface{}) (string, error) {indexKeys, err := indexFunc(obj)if err != nil {return "", err}if len(indexKeys) > 1 {return "", fmt.Errorf("too many keys: %v", indexKeys)}if len(indexKeys) == 0 {return "", fmt.Errorf("unexpected empty indexKeys")}return indexKeys[0], nil}
}const (// NamespaceIndex is the lookup name for the most common index function, which is to index by the namespace field.NamespaceIndex string = "namespace"
)// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {meta, err := meta.Accessor(obj)if err != nil {return []string{""}, fmt.Errorf("object has no meta: %v", err)}return []string{meta.GetNamespace()}, nil
}// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc// Indices maps a name to an Index
type Indices map[string]Index

同理,indexer demo中,“nodeName”:func()和上面的“author”:authorIndexFunc是一个道理。

func main() {
    // 创建 Indexer
    indexer := cache.NewIndexer(
        cache.MetaNamespaceKeyFunc,
        cache.Indexers{
            "namespace": func(obj interface{}) ([]string, error) {
                pod, ok := obj.(*v1.Pod)
                if !ok {
                    return []string{}, nil
                }
                return []string{pod.Namespace}, nil
            },
        },
    )

    // 添加 nodeName 索引
    err := indexer.AddIndexers(cache.Indexers{
        "nodeName": func(obj interface{}) ([]string, error) {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                return []string{}, nil
            }
            return []string{pod.Spec.NodeName}, nil
        },
    })
    if err != nil {
        panic(err)
    }

相关文章:

  • 设计模式13-单件模式
  • OS Copilot初体验的感受与心得
  • Boost搜索引擎:如何建立 用户搜索内容 与 网页文件内容 之间的关系
  • 某某物联rabbitmqhttp二轮充电桩协议充电协议对接
  • 【.NET】asp.net core 程序重启容器后redis无法连接,连接超时
  • mariadb安装centos再次踩坑
  • 数学建模学习(1)遗传算法
  • 数据库结构之b树
  • canvas:矢量点转栅格
  • Google Cloud Platform数据工程简介
  • 网页隐藏版之一行小说阅读器
  • Pycharm软件Win 64位安装包+详细安装步骤 百度云
  • Window下安装Zookeeper
  • MYSQL存储引擎InnoDB, MyISAM简介
  • 高精度-----乘法
  • [ JavaScript ] 数据结构与算法 —— 链表
  • 【Linux系统编程】快速查找errno错误码信息
  • ➹使用webpack配置多页面应用(MPA)
  • 2018以太坊智能合约编程语言solidity的最佳IDEs
  • Android组件 - 收藏集 - 掘金
  • AWS实战 - 利用IAM对S3做访问控制
  • - C#编程大幅提高OUTLOOK的邮件搜索能力!
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • java2019面试题北京
  • JS函数式编程 数组部分风格 ES6版
  • linux安装openssl、swoole等扩展的具体步骤
  • php面试题 汇集2
  • Python学习之路16-使用API
  • Quartz初级教程
  • Spring Cloud中负载均衡器概览
  • Web设计流程优化:网页效果图设计新思路
  • 技术:超级实用的电脑小技巧
  • 如何使用 OAuth 2.0 将 LinkedIn 集成入 iOS 应用
  • 如何学习JavaEE,项目又该如何做?
  • 一起来学SpringBoot | 第三篇:SpringBoot日志配置
  •  一套莫尔斯电报听写、翻译系统
  • 用 vue 组件自定义 v-model, 实现一个 Tab 组件。
  • MyCAT水平分库
  • 大数据全解:定义、价值及挑战
  • ​​​​​​​​​​​​​​汽车网络信息安全分析方法论
  • # Python csv、xlsx、json、二进制(MP3) 文件读写基本使用
  • #java学习笔记(面向对象)----(未完结)
  • #中国IT界的第一本漂流日记 传递IT正能量# 【分享得“IT漂友”勋章】
  • (1)(1.11) SiK Radio v2(一)
  • (iPhone/iPad开发)在UIWebView中自定义菜单栏
  • (附源码)spring boot儿童教育管理系统 毕业设计 281442
  • (附源码)ssm基于jsp的在线点餐系统 毕业设计 111016
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统
  • (附源码)计算机毕业设计SSM教师教学质量评价系统
  • (含笔试题)深度解析数据在内存中的存储
  • (全注解开发)学习Spring-MVC的第三天
  • (未解决)macOS matplotlib 中文是方框
  • (一)VirtualBox安装增强功能
  • (转)chrome浏览器收藏夹(书签)的导出与导入
  • (转)项目管理杂谈-我所期望的新人