第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习
第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习tags:k8s源码学习categories:源码学习二次开发文章目录第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习第一节 DeltaFIFO学习1.1 Delta介绍1.2 FIFO介绍1.3 FIFO简单方法实现1.4 DeltaFIFO的实现1.5 DeltaFIFO和Reflector
第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习
tags:
- k8s
- 源码学习
categories:
- 源码学习
- 二次开发
文章目录
第一节 DeltaFIFO学习
1.1 Delta介绍
- Reflector 中通过 ListAndWatch 获取到数据后传入到了本地的存储中,也就是 DeltaFIFO 中
- 从 DeltaFIFO 的名字可以看出它是一个 FIFO,也就是一个先进先出的队列,而Delta 表示的是变化的资源对象存储,包含操作资源对象的类型和数据,Reflector 就是这个队列的生产者。
- 了解Delta在client-go 中是如何定义的,Delta 的数据结构定义位于
staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
文件中。
// k8s.io/client-go/tools/cache/delta_fifo.go
// DeltaType 是变化的类型(添加、删除等)
type DeltaType string
// 变化的类型定义
const (
Added DeltaType = "Added" // 增加
Updated DeltaType = "Updated" // 更新
Deleted DeltaType = "Deleted" // 删除
// 当遇到 watch 错误,不得不进行重新list时,就会触发 Replaced。
// 我们不知道被替换的对象是否发生了变化。
//
// 注意:以前版本的 DeltaFIFO 也会对 Replace 事件使用 Sync。
// 所以只有当选项 EmitDeltaTypeReplaced 为真时才会触发 Replaced。
Replaced DeltaType = "Replaced"
// Sync 是针对周期性重新同步期间的合成事件
Sync DeltaType = "Sync" // 同步
)
// Delta 是 DeltaFIFO 存储的类型。
// 它告诉你发生了什么变化,以及变化后对象的状态。
// [*] 除非变化是删除操作,否则你将得到对象被删除前的最终状态。
type Delta struct {
Type DeltaType
Object interface{}
}
- Delta 其实就是 Kubernetes 系统中带有变化类型的资源对象,如下图所示:
- 比如我们现在添加了一个 Pod,那么这个 Delta 就是带有 Added 这个类型的 Pod,如果是删除了一个 Deployment,那么这个 Delta 就是带有 Deleted 类型的 Deployment,为什么要带上类型object, 因为我们需要根据不同的类型去执行不同的操作,增加、更新、删除的动作显然是不一样的。
1.2 FIFO介绍
- FIFO 很好理解,就是一个先进先出的队列,Reflector 是其生产者,其数据结构定义位于
staging/src/k8s.io/client-go/tools/cache/fifo.go
文件中。 - FIFO 数据结构中定义了** items 和 queue 两个属性**来保存队列中的数据。
- 其中 queue 中存的是资源对象的 key 列表,
- 而 items 是一个 map 类型,其 key 就是 queue 中保存的 key,value 值是真正的资源对象数据。
// k8s.io/client-go/tools/cache/fifo.go
type FIFO struct {
lock sync.RWMutex
cond sync.Cond
// items 中的每一个 key 也在 queue 中
items map[string]interface{}
queue []string
// 如果第一批 items 被 Replace() 插入或者先调用了 Deleta/Add/Update
// 则 populated 为 true。
populated bool
// 第一次调用 Replace() 时插入的 items 数
initialPopulationCount int
// keyFunc 用于生成排队的 item 插入和检索的 key。
keyFunc KeyFunc
// 标识队列已关闭,以便在队列清空时控制循环可以退出。
closed bool
closedLock sync.Mutex
}
var (
_ = Queue(&FIFO{}) // FIFO 是一个 Queue
)
- 既然是先进先出的队列,那么就要具有队列的基本功能,结构体下面其实就有一个类型断言,表示当前的 FIFO 实现了 Queue 这个接口,所以 FIFO 要实现的功能都是在 Queue 中定义的,Queue 接口和 FIFO 位于同一文件中:
// k8s.io/client-go/tools/cache/fifo.go
// Queue 扩展了 Store // with a collection of Store keys to "process".
// 每一次添加、更新或删除都可以将对象的key放入到该集合中。
// Queue 具有使用给定的 accumulator 来推导出相应的 key 的方法
// Queue 可以从多个 goroutine 中并发访问
// Queue 可以被关闭,之后 Pop 操作会返回一个错误
type Queue interface {
Store
// Pop 一直阻塞,直到至少有一个key要处理或队列被关闭,队列被关闭会返回一个错误。
// 在前面的情况下 Pop 原子性地选择一个 key 进行处理,从 Store 中删除关联(key、accumulator)的数据,
// 并处理 accumulator。Pop 会返回被处理的 accumulator 和处理的结果。
// PopProcessFunc 函数可以返回一个 ErrRequeue{inner},在这种情况下,Pop 将
//(a)把那个(key,accumulator)关联作为原子处理的一部分返回到 Queue 中
// (b) 从 Pop 返回内部错误。
Pop(PopProcessFunc) (interface{}, error)
// 仅当该 key 尚未与一个非空的 accumulator 相关联的时候,AddIfNotPresent 将给定的 accumulator 放入 Queue(与 accumulator 的 key 相关联的)
AddIfNotPresent(interface{}) error
// 如果第一批 keys 都已经 Popped,则 HasSynced 返回 true。
// 如果在添加、更新、删除之前发生了第一次 Replace 操作,则第一批 keys 为 true
// 否则为空。
HasSynced() bool
// 关闭该队列
Close()
}
- 从上面的定义中可以看出 Queue 这个接口扩展了 Store 这个接口,这个就是前面我们说的本地存储,队列实际上也是一种存储,然后在 Store 的基础上增加 Pop、AddIfNotPresent、HasSynced、Close 4个函数就变成了 Queue 队列了,所以我们优先来看下 Store 这个接口的定义,该数据结构定义位于文件
k8s.io/client-go/tools/cache/store.go
中:
// k8s.io/client-go/tools/cache/store.go
// Store 是一个通用的对象存储和处理的接口。
// Store 包含一个从字符串 keys 到 accumulators 的映射,并具有 to/from 当前
// 给定 key 关联的 accumulators 添加、更新和删除给定对象的操作。
// 一个 Store 还知道如何从给定的对象中获取 key,所以很多操作只提供对象。
//
// 在最简单的 Store 实现中,每个 accumulator 只是最后指定的对象,或者删除后为空,
// 所以 Store 只是简单的存储。
//
// Reflector 反射器知道如何 watch 一个服务并更新一个 Store 存储,这个包提供了 Store 的各种实现。
type Store interface {
// Add 将指定对象添加到与指定对象的 key 相关的 accumulator(累加器)中。
Add(obj interface{}) error
// Update 与指定对象的 key 相关的 accumulator 中更新指定的对象
Update(obj interface{}) error
// Delete 根据指定的对象 key 删除指定的对象
Delete(obj interface{}) error
// List 返回当前所有非空的 accumulators 的列表
List() []interface{}
// ListKeys 返回当前与非空 accumulators 关联的所有 key 的列表
ListKeys() []string
// Get 根据指定的对象获取关联的 accumulator
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey 根据指定的对象 key 获取关联的 accumulator
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace 会删除原来Store中的内容,并将新增的list的内容存入Store中,即完全替换数据
// Store 拥有 list 列表的所有权,在调用此函数后,不应该引用它了。
Replace([]interface{}, string) error
// Resync 在 Store 中没有意义,但是在 DeltaFIFO 中有意义。
Resync() error
}
// KeyFunc 就是从一个对象中生成一个唯一的 Key 的函数,上面的 FIFO 中就有用到
type KeyFunc func(obj interface{}) (string, error)
// MetaNamespaceKeyFunc 是默认的 KeyFunc,生成的 key 格式为:
// <namespace>/<name>
// 如果是全局的,则namespace为空,那么生成的 key 就是 <name>
// 当然要从 key 拆分出 namespace 和 name 也非常简单
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
- **Store 就是一个通用的对象存储和处理的接口,可以用来写入对象和获取对象。**其中 cache 数据结构就实现了上面的 Store 接口,但是这个属于后面的 Indexer 部分的知识点,这里我们就不展开说明了。
- 我们说 Queue 扩展了 Store 接口,所以 Queue 本身也是一个存储,只是在存储的基础上增加了 Pop 这样的函数来实现弹出对象,是不是就变成了一个队列了。
- FIFO 就是一个具体的 Queue 实现,按照顺序弹出对象是不是就是一个先进先出的队列了?如下图所示:
1.3 FIFO简单方法实现
- FIFO 是如何实现存储和 Pop 的功能的。首先是实现 Store 存储中最基本的方法,第一个就是添加对象:
// k8s.io/client-go/tools/cache/fifo.go
// Add 插入一个对象,将其放入队列中,只有当元素不在集合中时才会插入队列。
func (f *FIFO) Add(obj interface{}) error {
// 获取对象的 key
id, err := f.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// 元素不在队列中的时候才插入队列
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
// items 是一个 map,所以直接赋值给这个 key,这样对更新元素也同样适用
f.items[id] = obj
f.cond.Broadcast()
return nil
}
- 更新对象,实现非常简单,因为上面的 Add 方法就包含了 Update 的实现,因为 items 属性是一个 Map,对象有更新直接将对应 key 的 value 值替换成新的对象即可:
// k8s.io/client-go/tools/cache/fifo.go
// Update 和 Add 相同的实现
func (f *FIFO) Update(obj interface{}) error {
return f.Add(obj)
}
- 接着就是删除 Delete 方法的实现,这里可能大家会有一个疑问,下面的删除实现只删除了 items 中的元素,那这样岂不是 queue 和 items 中的 key 会不一致。的确会这样,但是这是一个队列,下面的 Pop() 函数会根据 queue 里面的元素一个一个的弹出 key,没有对象就不处理了,相当于下面的 Pop() 函数中实现了 queue 的 key 的删除 :
// k8s.io/client-go/tools/cache/fifo.go
// Delete 从队列中移除一个对象。
// 不会添加到 queue 中去,这个实现是假设消费者只关心对象
// 不关心它们被创建或添加的顺序。
func (f *FIFO) Delete(obj interface{}) error {
// 获取对象的 key
id, err := f.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// 删除 items 中 key 为 id 的元素,就是删除队列中的对象
delete(f.items, id)
//?为什么不直接处理 queue 这个 slice 呢?
return err
}
- 然后是获取队列中所有对象的 List 方法的实现:
// k8s.io/client-go/tools/cache/fifo.go
// List 获取队列中的所有对象
func (f *FIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]interface{}, 0, len(f.items))
// 获取所有的items的values值(items是一个Map)
for _, item := range f.items {
list = append(list, item)
}
return list
}
- 然后是一个 Replace 替换函数的实现:
// k8s.io/client-go/tools/cache/fifo.go
// Replace 将删除队列中的内容,'f' 拥有 map 的所有权,调用该函数过后,不应该再引用 map。
// 'f' 的队列也会被重置,返回时,队列将包含 map 中的元素,没有特定的顺序。
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
// 从 list 中提取出 key 然后和里面的元素重新进行映射
items := make(map[string]interface{}, len(list))
for _, item := range list {
key, err := f.keyFunc(item)
if err != nil {
return KeyError{item, err}
}
items[key] = item
}
f.lock.Lock()
defer f.lock.Unlock()
if !f.populated {
f.populated = true
f.initialPopulationCount = len(items)
}
// 重新设置 items 和 queue 的值
f.items = items
f.queue = f.queue[:0]
for id := range items {
f.queue = append(f.queue, id)
}
if len(f.queue) > 0 {
f.cond.Broadcast()
}
return nil
}
1.4 DeltaFIFO的实现
- 上面了解 了FIFO,下面看下 DeltaFIFO 是如何实现的,DeltaFIFO 和 FIFO 一样也是一个队列,但是也有不同的地方,里面的元素是一个 Delta的数组,Delta 上面我们已经提到表示的是带有变化类型的资源对象。
- DeltaFIFO 的数据结构定义位于
staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
文件中:
// k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
// lock/cond 保护访问的 items 和 queue
lock sync.RWMutex
cond sync.Cond
// 用来存储 Delta 数据 -> 对象key: Delta数组
items map[string]Deltas
// 用来存储资源对象的key
queue []string
// 通过 Replace() 接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
populated bool
// 通过 Replace() 接口(全量)将第一批对象放入队列的对象数量
initialPopulationCount int
// 对象键的计算函数
keyFunc KeyFunc
// knownObjects 列出 "known" 的键 -- 影响到 Delete(),Replace() 和 Resync()
// knownObjects 其实就是 Indexer,里面存有已知全部的对象
knownObjects KeyListerGetter
// 标记 queue 被关闭了
closed bool
closedLock sync.Mutex
// emitDeltaTypeReplaced 当 Replace() 被调用的时候,是否要 emit Replaced 或者 Sync
// DeltaType(保留向后兼容)。
emitDeltaTypeReplaced bool
}
// KeyListerGetter 任何知道如何列出键和按键获取对象的东西
type KeyListerGetter interface {
KeyLister
KeyGetter
}
// 获取所有的键
type KeyLister interface {
ListKeys() []string
}
// 根据键获取对象
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
}
- DeltaFIFO 与 FIFO 一样都是一个 Queue,所以他们都实现了 Queue,所以我们这里来看下 DeltaFIFO 是如何实现 Queue 功能的,当然和 FIFO 一样都是实现 Queue 接口里面的所有方法。
- 虽然实现流程和 FIFO 是一样的,但是具体的实现是不一样的,比如 DeltaFIFO 的对象键计算函数就不同:
// k8s.io/client-go/tools/cache/delta_fifo.go
// DeltaFIFO 的对象键计算函数
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
// 用 Deltas 做一次转换,判断是否是 Delta 切片
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
// 使用最新版本的对象进行计算
obj = d.Newest().Object
}
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
// 具体计算还是要看初始化 DeltaFIFO 传入的 KeyFunc 函数
return f.keyFunc(obj)
}
// Newest 返回最新的 Delta,如果没有则返回 nil。
func (d Deltas) Newest() *Delta {
if n := len(d); n > 0 {
return &d[n-1]
}
return nil
}
- DeltaFIFO 的计算对象键的函数为什么要先做一次 Deltas 的类型转换呢?那是因为 Pop() 出去的对象很可能还要再添加进来(比如处理失败需要再放进来),此时添加的对象就是已经封装好的 Deltas 对象了。
- 然后同样按照上面的方式来分析 DeltaFIFO 的实现,首先查看 Store 存储部分的实现,也就是增、删、改、查功能。
- 同样的 Add、Update 和 Delete 的实现方法基本上是一致的:
// k8s.io/client-go/tools/cache/delta_fifo.go
// Add 插入一个元素放入到队列中
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true // 队列第一次写入操作都要设置标记
return f.queueActionLocked(Added, obj)
}
// Update 和 Add 一样,只是是 Updated 一个 Delta
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true // 队列第一次写入操作都要设置标记
return f.queueActionLocked(Updated, obj)
}
// 删除和添加一样,但会产生一个删除的 Delta。如果给定的对象还不存在,它将被忽略。
// 例如,它可能已经被替换(重新list)删除了。
// 在这个方法中,`f.knownObjects` 如果不为nil,则提供(通过GetByKey)被认为已经存在的 _additional_ 对象。
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
// 队列第一次写入操作都要设置这个标记
f.populated = true
// 相当于没有 Indexer 的时候,就通过自己的存储对象检查下
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// 自己的存储里面都没有,那也就不用处理了
return nil
}
} else
// 相当于 Indexer 里面和自己的存储里面都没有这个对象,那么也就相当于不存在了,就不处理了。
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
return nil
}
}
// 同样调用 queueActionLocked 将数据放入队列
return f.queueActionLocked(Deleted, obj)
}
- 可以看出 Add 、Update、Delete 方法最终都是调用的
queueActionLocked
函数来实现:
// k8s.io/client-go/tools/cache/delta_fifo.go
// queueActionLocked 追加到对象的 delta 列表中。
// 调用者必须先 lock。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj) // 获取对象键
if err != nil {
return KeyError{obj, err}
}
// 将 actionType 和资源对象 obj 构造成 Delta,添加到 items 中
newDeltas := append(f.items[id], Delta{actionType, obj})
// 去重
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
// 新对象的 key 不在队列中则插入 queue 队列
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
// 重新更新 items
f.items[id] = newDeltas
// 通知所有的消费者解除阻塞
f.cond.Broadcast()
} else {
// 这种情况不会发生,因为给定一个非空列表时,dedupDeltas 永远不会返回一个空列表。
// 但如果真的返回了一个空列表,那么我们就需要从 map 中删除这个元素。
delete(f.items, id)
}
return nil
}
// ==============排重==============
// 重新list和watch可以以任何顺序多次提供相同的更新。
// 如果最近的两个 Delta 相同,则将它们合并。
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 { // 小于两个 delta 没必要合并了
return deltas
}
// Deltas是[]Delta,新的对象是追加到Slice后面
// 所以取最后两个元素来判断是否相同
a := &deltas[n-1]
b := &deltas[n-2]
// 执行去重操作
if out := isDup(a, b); out != nil {
// 将去重保留下来的delta追加到前面n-2个delta中去
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
// 判断两个 Delta 是否是重复的
func isDup(a, b *Delta) *Delta {
// 这个函数应该应该可以判断多种类型的重复,目前只有删除这一种能够合并
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
// 判断是否为删除类型的重复
func isDeletionDup(a, b *Delta) *Delta {
// 二者类型都是删除那肯定有一个是重复的,则返回一个即可
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// 更复杂的检查还是这样就够了?
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
- 因为系统对于删除的对象有
DeletedFinalStateUnknown
这个状态,所以会存在两次删除的情况,但是两次添加同一个对象由于 APIServer 可以保证对象的唯一性,所以这里没有考虑合并两次添加操作的情况。然后看看其他几个主要方法的实现:
// k8s.io/client-go/tools/cache/delta_fifo.go
// 列举接口实现
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
// 真正的列举实现
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item.Newest().Object)
}
return list
}
// 返回现在 FIFO 中所有的对象键。
func (f *DeltaFIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.items))
for key := range f.items {
list = append(list, key)
}
return list
}
// 根据对象获取FIFO中对应的元素
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// 通过对象键获取FIFO中的元素(获取到的是 Delta 数组)
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
if exists {
// 复制元素的slice,这样对这个切片的操作就不会影响返回的对象了。
d = copyDeltas(d)
}
return d, exists, nil
}
// copyDeltas 返回 d 的浅拷贝,也就是说它拷贝的是切片,而不是切片中的对象。
// Get/List 可以返回一个不会被后续修改影响的对象。
func copyDeltas(d Deltas) Deltas {
d2 := make(Deltas, len(d))
copy(d2, d)
return d2
}
// 判断队列是否关闭了
func (f *DeltaFIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
return f.closed
}
- 接下来我们来看看 Replace 函数的时候,这个也是 Store 里面的定义的接口:
// k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// keep 对老客户端的向后兼容
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
// 遍历 list
for _, item := range list {
// 计算对象键
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
// 记录处理过的对象键,使用 set 集合存储
keys.Insert(key)
// 重新同步一次对象
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 如果没有 Indexer 存储的话,自己存储的就是所有的老对象
// 目的要看看那些老对象不在全量集合中,那么就是删除的对象了
if f.knownObjects == nil {
// 针对自己的列表进行删除检测。
queuedDeletions := 0
// 遍历所有元素
for k, oldItem := range f.items {
// 如果元素在输入的对象中存在就忽略了。
if keys.Has(k) {
continue
}
// 到这里证明当前的 oldItem 元素不在输入的列表中,证明对象已经被删除了
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
// 因为可能队列中已经存在 Deleted 类型的元素了,避免重复,所以采用 DeletedFinalStateUnknown 来包装下对象
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 如果 populated 没有设置,说明是第一次并且还没有任何修改操作执行过
if !f.populated {
// 这个时候需要标记下
f.populated = true
// 记录第一次设置的对象数量
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
// 检测已经删除但是没有在队列中的元素。
// 从 Indexer 中获取所有的对象键
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
// 对象存在就忽略
if keys.Has(k) {
continue
}
// 到这里同样证明当前的对象键对应的对象被删除了
// 获取被删除的对象键对应的对象
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
// 累加删除的对象数量
queuedDeletions++
// 把对象删除的 Delta 放入队列,和上面一样避免重复,使用 DeletedFinalStateUnknown 包装下对象
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 和上面一致
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
- Replace() 主要用于实现对象的全量更新,由于 DeltaFIFO 对外输出的就是所有目标的增量变化,所以每次全量更新都要判断对象是否已经删除,因为在全量更新前可能没有收到目标删除的请求。这一点与 cache 不同,cache 的Replace() 相当于重建,因为 cache 就是对象全量的一种内存映射,所以Replace() 就等于重建。接下来就是实现 DeltaFIFO 特性的 Pop 函数的实现了:
// k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 队列中是否有数据
for len(f.queue) == 0 {
// 如果队列关闭了这直接返回错误
if f.IsClosed() {
return nil, ErrFIFOClosed
}
// 没有数据就一直等待
f.cond.Wait()
}
// 取出第一个对象键
id := f.queue[0]
// 更新下queue,相当于把第一个元素弹出去了
f.queue = f.queue[1:]
// 对象计数减一,当减到0就说明外部已经全部同步完毕了
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// 取出真正的对象,queue里面是对象键
item, ok := f.items[id]
if !ok {
// Item 可能后来被删除了。
continue
}
// 删除对象
delete(f.items, id)
// 调用处理对象的函数
err := process(item)
// 如果处理出错,那就重新入队列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// 这里不需要 copyDeltas,因为我们要把所有权转移给调用者。
return item, err
}
}
- 然后再简单看下其他几个函数的实现:
// k8s.io/client-go/tools/cache/delta_fifo.go
// AddIfNotPresent 插入不存在的对象到队列中
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
// 放入的必须是 Delta 数组,就是通过 Pop 弹出的对象
deltas, ok := obj.(Deltas)
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
// 多个 Delta 都是同一个对象,所以用最新的来获取对象键即可
id, err := f.KeyOf(deltas.Newest().Object)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
// 调用真正的插入实现
f.addIfNotPresent(id, deltas)
return nil
}
// 插入对象的真正实现
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
// 如果对象已经存在了,则忽略
if _, exists := f.items[id]; exists {
return
}
// 不在队列中,则插入队列
f.queue = append(f.queue, id)
f.items[id] = deltas
// 通知消费者解除阻塞
f.cond.Broadcast()
}
// Resync 重新同步,带有 Sync 类型的 Delta 对象。
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
// Indexer 为空,重新同步无意义
if f.knownObjects == nil {
return nil
}
// 获取 Indexer 中所有的对象键
keys := f.knownObjects.ListKeys()
// 循环对象键,为每个对象产生一个同步的 Delta
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
// 对象同步接口的真正实现
func (f *DeltaFIFO) syncKeyLocked(key string) error {
// 获取 Indexer 中的对象
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
// 计算对象的键值,对象键不是已经传入了么?
// 其实传入的是存在 Indexer 里面的对象键,可能与这里的计算方式不同
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 对象已经在存在,说明后续会通知对象的新变化,所以再加更新也没意义
if len(f.items[id]) > 0 {
return nil
}
// 添加对象同步的这个 Delta
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
// HasSynced 如果 Add/Update/Delete/AddIfNotPresent 第一次被调用则会返回 true。
// 或者通过 Replace 插入的元素都已经 Pop 完成了,则也会返回 true。
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
// 同步就是全量内容已经进入 Indexer,Indexer 已经是系统中对象的全量快照了
// 相当于就是全量对象从队列中全部弹出进入 Indexer,证明已经同步完成了
return f.populated && f.initialPopulationCount == 0
}
// 关闭队列
func (f *DeltaFIFO) Close() {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.closed = true
f.cond.Broadcast()
}
- 这里是否已同步是根据
populated
和initialPopulationCount
这两个变量来判断的,是否同步指的是第一次从 APIServer 中获取全量的对象是否全部 Pop 完成,全局同步到了缓存中,也就是 Indexer 中去了,因为 Pop 一次initialPopulationCount
就会减1,当为0的时候就表示 Pop 完成了。
1.5 DeltaFIFO和Reflector过程
- 上面说了DeltaFIFO的实现,然后加上前面的 Reflector 反射器,就可以结合起来了:
- Reflector 通过ListAndWatch首先获取全量的资源对象数据,然后调用DeltaFIFO 的 Replace() 方法全量插入队列,然后后续通过 Watch 操作根据资源对象的操作类型调用 DeltaFIFO 的 Add、Update、Delete 方法,将数据更新到队列中。我们可以用下图来总结这两个组件之间的关系:
- 至于 Pop 出来的元素如何处理,就要看 Pop 的回调函数
PopProcessFunc
了。我们可以回到最初的 SharedInformer 中,在sharedIndexInformer
的 Run 函数中就初始化了 DeltaFIFO,也配置了用于 Pop 回调处理的函数:
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 初始化 DeltaFIFO,这里就可以看出来 KnownObjects 就是一个 Indexer
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas, // 指定 Pop 函数的回调处理函数
}
......
}
// 真正的 Pop 回调处理函数
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
......
} else {
// 将对象添加到 Indexer 中
if err := s.indexer.Add(d.Object); err != nil {
return err
}
......
}
case Deleted:
// 删除 Indexer 中的对象
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
......
}
}
return nil
}
- 从上面可以看出DeltaFIFO中的元素被弹出来后被同步到了 Indexer 存储中,而在 DeltaFIFO 中的
KnownObjects
也就是这个指定的 Indexer,所以接下来我们就需要重点分析 Indexer 组件的实现了。
第二节 Indexer学习
2.1 Indexer概念说明
- 我们知道 DeltaFIFO 中的元素通过 Pop 函数弹出后,在指定的回调函数中将元素添加到了 Indexer 中。
- Indexer 是什么?字面意思是索引器,它就是 Informer 中的 LocalStore 部分,我们可以和数据库进行类比,数据库是建立在存储之上的,索引也是构建在存储之上,只是和数据做了一个映射,使得按照某些条件查询速度会非常快,所以说Indexer 本身也是一个存储,只是它在存储的基础上扩展了索引功能。从 Indexer 接口的定义可以证明这一点:
// k8s.io/client-go/tools/cache/indexer.go
// Indexer 使用多个索引扩展了 Store,并限制了每个累加器只能容纳当前对象
// 这里有3种字符串需要说明:
// 1. 一个存储键,在 Store 接口中定义(其实就是对象键)
// 2. 一个索引的名称(相当于索引分类名称)
// 3. 索引键,由 IndexFunc 生成,可以是一个字段值或从对象中计算出来的任何字符串
type Indexer interface {
Store // 继承了 Store 存储接口,所以说 Indexer 也是存储
// indexName 是索引类名称,obj 是对象,计算 obj 在 indexName 索引类中的索引键,然后通过索引键把所有的对象取出来
// 获取 obj 对象在索引类中的索引键相匹配的对象
Index(indexName string, obj interface{}) ([]interface{}, error)
// indexKey 是 indexName 索引分类中的一个索引键
// 函数返回 indexKey 指定的所有对象键 IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// 添加更多的索引在存储中
AddIndexers(newIndexers Indexers) error
}
- 在去查看 Indexer 的接口具体实现之前,我们需要了解 Indexer 中几个非常重要的概念:Indices、Index、Indexers 及 IndexFunc。
// k8s.io/client-go/tools/cache/indexer.go
// 用于计算一个对象的索引键集合
type IndexFunc func(obj interface{}) ([]string, error)
// 索引键与对象键集合的映射
type Index map[string]sets.String
// 索引器名称与 IndexFunc 的映射,相当于存储索引的各种分类
type Indexers map[string]IndexFunc
// 索引器名称与 Index 索引的映射
type Indices map[string]Index
4. 下面示例的索引数据如下所示:
// Indexers 就是包含的所有索引器(分类)以及对应实现
Indexers: {
"namespace": NamespaceIndexFunc,
"nodeName": NodeNameIndexFunc,
}
// Indices 就是包含的所有索引分类中所有的索引数据
Indices: {
"namespace": { //namespace 这个索引分类下的所有索引数据
"default": ["pod-1", "pod-2"], // Index 就是一个索引键下所有的对象键列表
"kube-system": ["pod-3"] // Index
},
"nodeName": { //nodeName 这个索引分类下的所有索引数据(对象键列表)
"node1": ["pod-1"], // Index
"node2": ["pod-2", "pod-3"] // Index
}
}
2.2 Indexer示例解释概念
- 这4个数据结构的命名非常容易让大家混淆,直接查看源码也不是那么容易的。这里我们来仔细解释下。首先什么叫索引,索引就是为了快速查找的,比如我们需要查找某个节点上的所有 Pod,那就让 Pod 按照节点名称排序列举出来,对应的就是 Index 这个类型,具体的就是
map[node]sets.pod
,但是如何去查找可以有多种方式,就是上面的 Indexers 这个类型的作用。我们可以用一个比较具体的示例来解释他们的关系和含义,如下所示:
package main
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
const (
NamespaceIndexName = "namespace"
NodeNameIndexName = "nodeName"
)
func NamespaceIndexFunc(obj interface{}) ([]string, error) {
m, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{m.GetNamespace()}, nil
}
func NodeNameIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
}
func main() {
index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
NamespaceIndexName: NamespaceIndexFunc,
NodeNameIndexName: NodeNameIndexFunc,
})
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "index-pod-1",
Namespace: "default",
},
Spec: v1.PodSpec{NodeName: "node1"},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "index-pod-2",
Namespace: "default",
},
Spec: v1.PodSpec{NodeName: "node2"},
}
pod3 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "index-pod-3",
Namespace: "kube-system",
},
Spec: v1.PodSpec{NodeName: "node2"},
}
_ = index.Add(pod1)
_ = index.Add(pod2)
_ = index.Add(pod3)
// ByIndex 两个参数:IndexName(索引器名称)和 indexKey(需要检索的key)
pods, err := index.ByIndex(NamespaceIndexName, "default")
if err != nil {
panic(err)
}
for _, pod := range pods {
fmt.Println(pod.(*v1.Pod).Name)
}
fmt.Println("==========================")
pods, err = index.ByIndex(NodeNameIndexName, "node2")
if err != nil {
panic(err)
}
for _, pod := range pods {
fmt.Println(pod.(*v1.Pod).Name)
}
}
// 输出结果为:
index-pod-1
index-pod-2
==========================
index-pod-2
index-pod-3
-
在上面的示例中首先通过 NewIndexer 函数实例化 Indexer 对象,
-
第一个参数就是用于计算资源对象键的函数,这里我们使用的是
MetaNamespaceKeyFunc
这个默认的对象键函数; -
第二个参数是 Indexers,也就是存储索引器,上面我们知道
Indexers
的定义为map[string]IndexFunc
,为什么要定义成一个 map 呢?我们可以类比数据库中,我们要查询某项数据,索引的方式是不是多种多样啊?为了扩展,Kubernetes 中就使用一个 map 来存储各种各样的存储索引器,至于存储索引器如何生成,就使用一个IndexFunc
暴露出去,给使用者自己实现即可。 -
我们定义的了两个索引键生成函数:
NamespaceIndexFunc
与NodeNameIndexFunc
,一个根据资源对象的命名空间来进行索引,一个根据资源对象所在的节点进行索引。然后定义了3个 Pod,前两个在 default 命名空间下面,另外一个在 kube-system 命名空间下面,然后通过index.Add
函数添加这3个 Pod 资源对象。然后通过index.ByIndex
函数查询在名为namespace
的索引器下面匹配索引键为default
的 Pod 列表。也就是查询 default 这个命名空间下面的所有 Pod,这里就是前两个定义的 Pod。 -
对上面的示例如果我们理解了,那么就很容易理解上面定义的4个数据结构了:
- IndexFunc:索引器函数,用于计算一个资源对象的索引值列表,上面示例是指定命名空间为索引值结果,当然我们也可以根据需求定义其他的,比如根据 Label 标签、Annotation 等属性来生成索引值列表。
- Index:存储数据,对于上面的示例,我们要查找某个命名空间下面的 Pod,那就要让 Pod 按照其命名空间进行索引,对应的 Index 类型就是
map[namespace]sets.pod
。 - Indexers:存储索引器,key 为索引器名称,value 为索引器的实现函数,上面的示例就是
map["namespace"]MetaNamespaceIndexFunc
。 - Indices:存储缓存器,key 为索引器名称,value 为缓存的数据,对于上面的示例就是
map["namespace"]map[namespace]sets.pod
。
-
可能最容易混淆的是 Indexers 和 Indices 这两个概念,因为平时很多时候我们没有怎么区分二者的关系,这里我们可以这样理解:dexers 是存储索引的,Indices 里面是存储的真正的数据(对象键,这样可能更好理解。
2.3 Indexer的原理-ThreadSafeMap
- 上面我们理解了 Indexer 中的几个重要的数据类型,下面我们来看下 Indexer 接口的具体实现 cache,位于文件
k8s.io/client-go/tools/cache/store.go
中:
// [k8s.io/client-go/tools/cache/store.go](http://k8s.io/client-go/tools/cache/store.go)
// cache 用一个 ThreadSafeStore 和一个关联的 KeyFunc 来实现 Indexer
type cache struct {
// cacheStorage 是一个线程安全的存储
cacheStorage ThreadSafeStore
// keyFunc 用于计算对象键
keyFunc KeyFunc
}
- 我们可以看到这个 cache 包含一个
ThreadSafeStore
的属性,这是一个并发安全的存储,因为是存储,所以自然就有存储相关的增、删、改、查等操作,Indexer 就是在 ThreadSafeMap 基础上进行封装的,实现了索引相关的功能。接下来我们先来看看 ThreadSafeStore 的定义,位于k8s.io/client-go/tools/cache/thread_safe_store.go
文件中:
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
- 从接口的定义可以看出 ThreadSafeStore 和 Index 基本上差不多,但还是有一些区别的,这个接口是需要通过对象键来进行索引的。接下来我们来看看这个接口的具体实现 threadSafeMap 的定义:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// threadSafeMap 实现了 ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
// 存储资源对象数据,key(对象键) 通过 keyFunc 得到
// 这就是真正存储的底层数据(对象键 -> 对象)
items map[string]interface{}
// indexers 索引分类与索引键函数的映射
indexers Indexers
// indices 通过索引可以快速找到对象键
indices Indices
}
- 不要把索引键和对象键搞混了,索引键是用于对象快速查找的;对象键是对象在存储中的唯一命名,对象是通过名字+对象的方式存储的。接下来我们来仔细看下接口的具体实现,首先还是比较简单的 Add、Delete、Update 几个函数的实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// 添加对象
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
// 获取老的对象
oldObject := c.items[key]
// 写入新的对象,items 中存的是 objKey -> obj 的映射
c.items[key] = obj
// 添加了新的对象,所以要更新索引
c.updateIndices(oldObject, obj, key)
}
// 更新对象,可以看到实现和 Add 是一样的
func (c *threadSafeMap) Update(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
// 删除对象
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
// 判断对象是否存在,存在才执行删除操作
if obj, exists := c.items[key]; exists {
// 删除对象索引
c.deleteFromIndices(obj, key)
// 删除对象本身
delete(c.items, key)
}
}
- 可以看到基本的实现比较简单,就是添加、更新、删除对象数据后,然后更新或删除对应的索引,所以我们需要查看下更新或删除索引的具体实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// updateIndices 更新索引
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// 如果有旧的对象,需要先从索引中删除这个对象
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 循环所有的索引器
for name, indexFunc := range c.indexers {
// 获取对象的索引键
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 得到当前索引器的索引
index := c.indices[name]
if index == nil {
// 没有对应的索引,则初始化一个索引
index = Index{}
c.indices[name] = index
}
// 循环所有的索引键
for _, indexValue := range indexValues {
// 得到索引键对应的对象键列表
set := index[indexValue]
if set == nil {
// 没有对象键列表则初始化一个空列表
set = sets.String{}
index[indexValue] = set
}
// 将对象键插入到集合中,方便索引
set.Insert(key)
}
}
}
// deleteFromIndices 删除对象索引
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
// 循环所有的索引器
for name, indexFunc := range c.indexers {
// 获取删除对象的索引键列表
indexValues, err := indexFunc(obj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 获取当前索引器的索引
index := c.indices[name]
if index == nil {
continue
}
// 循环所有索引键
for _, indexValue := range indexValues {
// 获取索引键对应的对象键列表
set := index[indexValue]
if set != nil {
// 从对象键列表中删除当前要删除的对象键
set.Delete(key)
// 如果当集合为空的时候不删除set,那么具有高基数的短生命资源的 indices 会导致未使用的空集合随时间增加内存。
// `kubernetes/kubernetes/issues/84959`.
if len(set) == 0 {
delete(index, indexValue)
}
}
}
}
}
- 添加索引和删除索引的实现都挺简单的,其实主要还是要对 indices、indexs 这些数据结构非常了解,这样就非常容易了,我们可以将 indexFunc 当成当前对象的命名空间来看待,这样对于上面的索引更新和删除的理解就肯定没问题了。然后接下来就是几个查询相关的接口实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// 获取对象
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock() // 只需要读锁
defer c.lock.RUnlock()
// 直接从 map 中读取值
item, exists = c.items[key]
return item, exists
}
// 对象列举
func (c *threadSafeMap) List() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
}
// 返回 threadSafeMap 中所有的对象键列表
func (c *threadSafeMap) ListKeys() []string {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]string, 0, len(c.items))
for key := range c.items {
list = append(list, key)
}
return list
}
// 替换所有对象,相当于重新构建索引
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
// 直接覆盖之前的对象
c.items = items
// 重新构建索引
c.indices = Indices{}
for key, item := range c.items {
// 更新元素的索引
c.updateIndices(nil, item, key)
}
}
- 然后接下来就是和索引相关的几个接口实现,第一个就是 Index 函数:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// 通过指定的索引器和对象获取符合这个对象特征的所有对象
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 获得索引器 indexName 的索引键计算函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 获取指定 obj 对象的索引键
indexedValues, err := indexFunc(obj)
if err != nil {
return nil, err
}
// 获得索引器 indexName 的所有索引
index := c.indices[indexName]
// 用来存储对象键的集合
var storeKeySet sets.String
if len(indexedValues) == 1 {
// 大多数情况下只有一个值匹配(默认获取的索引键就是对象的 namespace)
// 直接拿到这个索引键的对象键集合
storeKeySet = index[indexedValues[0]]
} else {
// 由于有多个索引键,则可能有重复的对象键出现,索引需要去重
storeKeySet = sets.String{}
// 循环索引键
for _, indexedValue := range indexedValues {
// 循环索引键下面的对象键,因为要去重
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
// 拿到了所有的对象键集合过后,循环拿到所有的对象集合
list := make([]interface{}, 0, storeKeySet.Len())
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
}
return list, nil
}
- 这个 Index 函数就是获取一个指定对象的索引键,然后把这个索引键下面的所有的对象全部获取到,比如我们要获取一个 Pod 所在命名空间下面的所有 Pod,如果更抽象一点,就是符合对象某些特征的所有对象,而这个特征就是我们指定的索引键函数计算出来的。然后接下来就是一个比较重要的 ByIndex 函数的实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// 和上面的 Index 函数类似基本一样,只是是直接指定的索引键
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 获得索引器 indexName 的索引键计算函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 获得索引器 indexName 的所有索引
index := c.indices[indexName]
// 获取指定索引键的所有所有对象键
set := index[indexedValue]
// 然后根据对象键遍历获取对象
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
- 可以很清楚地看到 ByIndex 函数和 Index 函数比较类似,但是更简单了,直接获取一个指定的索引键的全部资源对象。然后是其他几个索引相关的函数:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// IndexKeys 和上面的 ByIndex 几乎是一样的,只是这里是直接返回对象键列表
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 获取索引器 indexName 的索引键计算函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 获取索引器 indexName 的所有索引
index := c.indices[indexName]
// 直接获取指定索引键的对象键集合
set := index[indexedValue]
return set.List(), nil
}
// 获取索引器下面的所有索引键
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
c.lock.RLock()
defer c.lock.RUnlock()
// 获取索引器 indexName 的所有索引
index := c.indices[indexName]
names := make([]string, 0, len(index))
// 遍历索引得到索引键
for key := range index {
names = append(names, key)
}
return names
}
// 直接返回 indexers
func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
}
// 添加一个新的 Indexers
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
c.lock.Lock()
defer c.lock.Unlock()
if len(c.items) > 0 {
return fmt.Errorf("cannot add indexers to running index")
}
// 获取旧的索引器和新的索引器keys
oldKeys := sets.StringKeySet(c.indexers)
newKeys := sets.StringKeySet(newIndexers)
// 如果包含新的索引器,则提示冲突
if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}
// 将新的索引器添加到 Indexers 中
for k, v := range newIndexers {
c.indexers[k] = v
}
return nil
}
// 没有真正实现 Resync 操作
func (c *threadSafeMap) Resync() error {
return nil
}
- 这里我们就将 ThreadSafeMap 的实现进行了分析说明。整体来说比较方便,一个就是将对象数据存入到一个 map 中,然后就是维护索引,方便根据索引来查找到对应的对象。
2.4 回看cache的实现原理
- 接下来再回过头去看cache 的实现就非常简单了,因为 cache 就是对 ThreadSafeStore 的一个再次封装,很多操作都是直接调用的
ThreadSafeStore
的操作实现的,如下所示:
// k8s.io/client-go/tools/cache/store.go
// Add 插入一个元素到 cache 中
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj) // 生成对象键
if err != nil {
return KeyError{obj, err}
}
// 将对象添加到底层的 ThreadSafeStore 中
c.cacheStorage.Add(key, obj)
return nil
}
// 更新cache中的对象
func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Update(key, obj)
return nil
}
// 删除cache中的对象
func (c *cache) Delete(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Delete(key)
return nil
}
// 得到cache中所有的对象
func (c *cache) List() []interface{} {
return c.cacheStorage.List()
}
// 得到cache中所有的对象键
func (c *cache) ListKeys() []string {
return c.cacheStorage.ListKeys()
}
// 得到cache中的Indexers
func (c *cache) GetIndexers() Indexers {
return c.cacheStorage.GetIndexers()
}
// 得到对象obj与indexName索引器关联的所有对象
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
return c.cacheStorage.Index(indexName, obj)
}
func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {
return c.cacheStorage.IndexKeys(indexName, indexKey)
}
func (c *cache) ListIndexFuncValues(indexName string) []string {
return c.cacheStorage.ListIndexFuncValues(indexName)
}
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
return c.cacheStorage.ByIndex(indexName, indexKey)
}
func (c *cache) AddIndexers(newIndexers Indexers) error {
return c.cacheStorage.AddIndexers(newIndexers)
}
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := c.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return c.GetByKey(key)
}
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
item, exists = c.cacheStorage.Get(key)
return item, exists, nil
}
// 替换cache中所有的对象
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
items := make(map[string]interface{}, len(list))
for _, item := range list {
key, err := c.keyFunc(item)
if err != nil {
return KeyError{item, err}
}
items[key] = item
}
c.cacheStorage.Replace(items, resourceVersion)
return nil
}
func (c *cache) Resync() error {
return nil
}
- 可以看到 cache 没有自己独特的实现方式,都是调用的包含的
ThreadSafeStore
操作接口。
2.5 Index的原理总结
- 前面我们已经知道了 Reflector 通过 ListAndWatch 把数据传入 DeltaFIFO 后,经过 DeltaFIFO 的 Pop 函数将资源对象存入到了本地的一个存储 Indexer 中,而这个底层真正的存储其实就是上面的 ThreadSafeStore。
- 要理解 Indexer 组件,最主要就是要把索引、索引器(索引分类)、索引键、对象键这几个概念弄清楚,有时候确实容易混乱,我们将上面的示例理解了应该就很好理解了,我们可以简单的理解为 Indexer 就是简单的把相同命名空间的对象放在一个集合中,然后基于命名空间来查找对象。
更多推荐
所有评论(0)