Client-go Informer之 DeltaFIFO队列

在这里插入图片描述
欢迎关注微信公众号“云原生手记”

背景

上一篇讲到reflector监控指定的k8s资源,当监控的资源发生变化时,将资源对象的变化存放到DeltaFIFO队列中。本篇的内容就是剖析DeltaFIFO队列,顺便再看下goland如何实现FIFO队列。
在这里插入图片描述

队列

client-go中有两个队列,一个是FIFO队列,另一个是DeltaFIFO队列。我们通过学习其中的FIFO队列来了解Golang语言中设计FIFO队列的基本技巧,而学习DeltaFIFO队列是深入理解Inform机制所需,为后面的文章打下基础。
看下client-go中是如何设计FIFO队列的,首先有2个Interface接口叫Store和Queue,Queue中包含了Store,而DeltaFIFO和FIFO都是queue接口的实现,Store接口的定义如下:

type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	Replace([]interface{}, string) error // 用于在List-watch机制中,从api-server那边list完后,要将对象数据放入DeltaFIFO队列
	Resync() error // 用于定时同步,以免数据不一致
}

Queue接口如下:

type Queue interface {
	Store
	Pop(PopProcessFunc) (interface{}, error)
	AddIfNotPresent(interface{}) error
	HasSynced() bool
	Close()
}

FIFO

在这里插入图片描述

FIFO,大家学过计算机课程的都应该知道,是 First In, First Out的缩写,意为先入先出,这个通常是队列的主要特性。
看下client-go中FIFO的结构体实现:

源码摘自:client-go/tools/cache/fifo.go

type FIFO struct { // store 接口的实现
	lock sync.RWMutex // 读写锁 针对整个对象
	cond sync.Cond // 条件变量
	items map[string]interface{} // 存储key到元素对象的Map 
	queue []string // 队列索引,是个数组 保证有序
	// 如果已经填充了Replace()插入的第一批项目,或者首先调用了Delete / Add / Update,则populated为true。
	populated bool
	// 是第一次调用Replace()插入的项目数
	initialPopulationCount int
	keyFunc KeyFunc //keyFunc像是个对对象的hash函数,获取对象Id
	closed bool // 队列是否关闭
}

该队列该如何初始化:

func NewFIFO(keyFunc KeyFunc) *FIFO { // 新建FIFO队列时,只需传入keyFunc就行了,keyFunc就是对象的Hash函数,计算对象唯一的对象键用的
	f := &FIFO{
		items:   map[string]interface{}{},
		queue:   []string{},
		keyFunc: keyFunc,
	}
	f.cond.L = &f.lock
	return f
}

下面我们看下队列的增删改以及pop数据等核心操作的源码。
1、插入元素

func (f *FIFO) Add(obj interface{}) error {
	id, err := f.keyFunc(obj) // 拿到对象ID
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock() // 加锁
	defer f.lock.Unlock()
	f.populated = true // 设置标志位
	if _, exists := f.items[id]; !exists { // 判断是否已存在
		f.queue = append(f.queue, id) // 不存在,就放入queue数组的最后
	}
	f.items[id] = obj // 放入Map.,万一是重复的就是直接替换了
	f.cond.Broadcast() // 广播元素入队了,等在在pop操作的协程可以去元素了
	return nil
}

2、更新操作
就是使用了上面的Add方法

func (f *FIFO) Update(obj interface{}) error {
	return f.Add(obj)
}

3、删除操作

func (f *FIFO) Delete(obj interface{}) error {
	id, err := f.keyFunc(obj)// 获取对象的Key
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock() // 加锁
	defer f.lock.Unlock()
	f.populated = true
	delete(f.items, id) // 直接从map中删除元素,那数组中的索引怎么办,pop取元素的时候有额外处理
	return err
}

4、获取对象

获取的是该对象的最新更改

func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
	key, err := f.keyFunc(obj) // 获取对象Key
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
	return f.GetByKey(key)// 通过Key检查对象存不存在队列
}
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
	f.lock.RLock()
	defer f.lock.RUnlock()
	item, exists = f.items[key] // 从items中拿数据
	return item, exists, nil
}

5、判断队列是否关闭

func (f *FIFO) IsClosed() bool {
	f.lock.Lock()
	defer f.lock.Unlock()
	if f.closed { // 检查这个标志位
		return true
	}
	return false
}

6、Pop函数,队列中取元素专有的函数

这边取元素的同时传入处理元素的函数process。

取出的对象是最新的。

func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for { // 一个循环,只在取到元素或者队列关闭时退出
		for len(f.queue) == 0 {// 队列为空时,就一直等待
			if f.closed { // 队列关闭,就退出循环
				return nil, ErrFIFOClosed
			}

			f.cond.Wait() // 否则一直等待,直到广播通知队列有元素了;阻塞
		}
		id := f.queue[0] // 拿出队列首位
		f.queue = f.queue[1:] // 队首元素出队后修正有序数组
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount-- // 队列元素总数计数
		}
		item, ok := f.items[id]
		if !ok {// 有可能已经被删除了,请见delete 函数,之前被删除的,就不管了
			continue
		}
		delete(f.items, id) // 从Map中删除
		err := process(item) // 用传进来处理函数process来处理出队的元素,要是处理失败,就再塞回队列
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}

7、替换队列元素
传入参数是list和资源版本

func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
	items := make(map[string]interface{}, len(list)) // 初始化一个map充当之后的队列
	for _, item := range list { // 遍历list
		key, err := f.keyFunc(item) // 获取对象的Key
		if err != nil {
			return KeyError{item, err}
		}
		items[key] = item // 放入items中
	}

	f.lock.Lock() // 获取锁
	defer f.lock.Unlock()

	if !f.populated { // 未进行replace/add/update等操作
		f.populated = true
		f.initialPopulationCount = len(items)
	}

	f.items = items // 替换队列的所有元素
	f.queue = f.queue[:0] // 删除队列的之前的排序
	for id := range items {
		f.queue = append(f.queue, id) // 重新录入排序
	}
	if len(f.queue) > 0 {// 排序数组有数据
		f.cond.Broadcast()// 广播
	}
	return nil
}

8、重新同步
从代码上看,f.items中的Key可能和f.queue中所包含的Key不一致,所以需要重新同步,让两者在key上保持一致。网上的说法是保证不丢事件、数据同步并能及时响应事件。个人看法,觉得这种同步机制是必要的,但是同步频率需要把控好,不然会影响队列的效率吧。

func (f *FIFO) Resync() error {
	f.lock.Lock() // 获取锁
	defer f.lock.Unlock()

	inQueue := sets.NewString() // 初始化是个Map map[string]Empty
	for _, id := range f.queue { // 遍历索引数组
		inQueue.Insert(id) // inQueue复制f.queue
	}
	for id := range f.items { // 遍历队列元素
		if !inQueue.Has(id) { // items map中的可以在queue数组中不存在,就添加进去。
			f.queue = append(f.queue, id) // 补足f.queue缺失的Id
		}
	}
	if len(f.queue) > 0 {
		f.cond.Broadcast() // 广播
	}
	return nil
}

上面就基本讲完了FIFO的实现。其实如果你项目中要自己实现FIFO,可以把这段抄进去直接使用,client-go都帮你验证过了,实际使用问题不大的,但是注意一点,就是client-go中的FIFO队列是针对对象的,重复对象添加是会覆盖的。要是你的应用不需要这个特性,就需要改改了。

DeltaFIFO

什么是 DeltaFIFO

FIFO的意思是先入先出,而Delta的意思是增量。合起来,DeltaFIFO可意为增量先入先出队列,就是该队列存储的数据是增量数据。这边补充下维基百科增量计算的概念:增量计算是一种软件功能 。当一部分的数据产生了变化,就仅对该产生变化的部分进行计算和更新,以节省计算时间。相比于简单地重复计算完整的输出内容,增量计算能够显著地节省计算时间。 比如,电子表格会在实现重计算功能时使用增量计算,只重新计算并更新那些含有公式且被直接或间接地改变了的单元格。我想这边的增量队列也是考虑到节省计算时间吧。那在client-go中什么是增量数据,看下源码中对于Delta的定义:
源码均摘自:client-go/tools/cache/delta_fifo.go

type Delta struct { // 记录对于对象的增量操作
	Type   DeltaType // 增量类型
	Object interface{} // 对象
}
type DeltaType string // 增量类型是个String
// 有哪些增量类型呢,增删改,替换,和同步
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync DeltaType = "Sync"
)

所以所谓的DeltaFIFO就是一个装有Delta类型和对象数据的先入先出队列。
看下DeltaFIFO结构体的属性有哪些:

type DeltaFIFO struct {
	lock sync.RWMutex // 读写锁,方便读操作的数据读取,锁粒度更细
	cond sync.Cond // 条件变量,用于通知和阻塞
	items map[string]Deltas //objectkey映射对象的增量数组
	queue []string // 保证有序,里面会放入ObjectKey.从队列取数据时先从这个数组中拿key,再去items中拿对象
	populated bool // 标记队列是否add/update/delete/replace过了。用处不明
	initialPopulationCount int // 第一次replace的元素数量,用处不明
	keyFunc KeyFunc // 相当于Hash函数,从一个object中计算出唯一的key
	knownObjects KeyListerGetter // knownObjects是新建队列时传进来的,并在delete, replace,resync中被使用。是Indexer,是本地存储,就是list-watch后的对象数据要放入DeltaFIFO队列中,reflector会将数据从队列中取出并放入本地存储Indexer中。之后要是用户想获取哪个对象,就直接从本地存储Indexer中获取就行了,不用专门去请求api-server了
	closed bool // 标记该队列是否关闭
	emitDeltaTypeReplaced bool // Replace() 是否调用过的标记
}

在看完属性后,看下是如何创建DeltaFIFO队列的, 这边提供了两种方式,核心只用了NewDeltaFIFOWithOptions方法:

// 需要传入类似哈希函数的KeyFunc和KeyListerGetter,KeyListerGetter是个Indexer本地存储。后面的文章会讲
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ // 调用了下面这个函数
		KeyFunction:  keyFunc,
		KnownObjects: knownObjects,
	})
}

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc
	}
    // 开始封装DeltaFIFO
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      opts.KeyFunction,
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
	f.cond.L = &f.lock // 设置条件变量
	return f
}

整个DeltaFIFO队列的方法有很多,我主要讲几个核心方法:
1、添加操作

func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock() // 获取写锁
	defer f.lock.Unlock() // 释放写锁
	f.populated = true // 设置标记位
	return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj) // 获取对象的唯一Key
	if err != nil {
		return KeyError{obj, err}
	}

	newDeltas := append(f.items[id], Delta{actionType, obj}) // 将新的对象增量操作放入Items中对象的增量数组中 
	newDeltas = dedupDeltas(newDeltas) // 返回修正后的增量数组,数组中的最后两个增量操作可能时一样的,这边需要删除重复的一个,一般重复的操作都是删除操作

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id) // 入队
		}
		f.items[id] = newDeltas // 放好map
		f.cond.Broadcast() // 广播通知,可能有协程在等待队列的元素,所以这边需要广播通知
	} else { // 一般不会发生这种情况
		delete(f.items, id) // 删除该Key
	}
	return nil
}

2、更新操作

func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock() // 上写锁
	defer f.lock.Unlock() // 解锁
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}

这边的流程和添加操作是一样的,唯一的不同就是传入的操作类型是Updated
3、删除操作
基本逻辑:查看本地存储和队列中是否存在该对象,不存在就不继续删除操作了,存在,那就添加删除的增量操作。

func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj) // 获取object的唯一key
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock() // 上写锁
	defer f.lock.Unlock() // 释放写锁
	f.populated = true // 这是标记位
	if f.knownObjects == nil { // 本地存储为空
		if _, exists := f.items[id]; !exists {
			return nil
		}
	} else { // 本地存储不为空
		_, exists, err := f.knownObjects.GetByKey(id) // 从本地存储中查看对象是否存在
		_, itemsExist := f.items[id] // 队列中对象是否存在
		if err == nil && !exists && !itemsExist { // 本地存储不存在和队列中也不存在
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	}

	// exist in items and/or KnownObjects
	return f.queueActionLocked(Deleted, obj) // 这个之前讲过了,加入删除的增量操作
}

4、只添加不存在的添加操作

func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
	deltas, ok := obj.(Deltas) // 转换成delta
	if !ok {
		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
	}
	id, err := f.KeyOf(deltas.Newest().Object) // 获取Key
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.addIfNotPresent(id, deltas)
	return nil
}
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
	f.populated = true
	if _, exists := f.items[id]; exists { // 对象id只要存在就不添加
		return
	}

	f.queue = append(f.queue, id)
	f.items[id] = deltas // 只添加之前未添加过的Key的对象
	f.cond.Broadcast()
}

5、 从队列中取出元素 Pop
输入参数是PopProcessFunc函数,这个设计挺棒的,就是把处理元素的逻辑带进来,其他代码可以完全复用。这个Pop函数的使用是在SharedInformer ,后面的文章会讲到。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock() // 这边尝试获取锁
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			if f.closed { // 队列关闭的话,就退出
				return nil, ErrFIFOClosed
			}

			f.cond.Wait() // 没有数据就一直等待
		}
		id := f.queue[0] // 获取队首元素的key
		f.queue = f.queue[1:] // 修正有序队列
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount-- // 减1
		}
		item, ok := f.items[id] // 获取对象的增量数组,该对象所有的改变都在这了
		if !ok { // 如果queue数组中的不存在于items中,说明该对象已经被删除了
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id) // 删除该id
		err := process(item) // 这个设计技巧挺赞的,处理函数是动态传入的,方便解耦。
		if e, ok := err.(ErrRequeue); ok {// 队列错误,就把元素再塞回去?!
			f.addIfNotPresent(id, item) // 再塞回队列
			err = e.Err
		}
		return item, err
	}
}

6、替换操作
一般是在List-watch中的list后被使用的,将获取的数据存入队列。

func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
	f.lock.Lock() // 上锁
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))

	// keep backwards compat for old clients
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// Add Sync/Replaced action for each new item.
	for _, item := range list {
		key, err := f.KeyOf(item)// 获取Key
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}


	if f.knownObjects == nil { // 这层逻辑和下面逻辑的区别是什么?
		// Do deletion detection against our own list.
		queuedDeletions := 0
		for k, oldItem := range f.items { // 遍历队列里的元素
			if keys.Has(k) {
				continue
			}
			// Delete pre-existing items not in the new list.
			// This could happen if watch deletion event was missed while
			// disconnected from apiserver.
			// 相当于一层补充机制,万一有对象已删除,但是没监控到,要做好同步
			var deletedObj interface{}
			if n := oldItem.Newest(); n != nil {
				deletedObj = n.Object
			}
			queuedDeletions++
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}

		if !f.populated {
			f.populated = true
			// While there shouldn't be any queued deletions in the initial
			// population of the queue, it's better to be on the safe side.
			f.initialPopulationCount = len(list) + queuedDeletions
		}

		return nil
	}

	//  检查,要是还有对象未删除,就删除她
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}

		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}

	if !f.populated {
		f.populated = true
		f.initialPopulationCount = len(list) + queuedDeletions
	}

	return nil
}

7、Resync操作
会被周期性调用,检查本地存储和队列中的数据是否一致

func (f *DeltaFIFO) Resync() error {
	f.lock.Lock() // 获取写锁了
	defer f.lock.Unlock()

	if f.knownObjects == nil { // 本地存储为空,就退出
		return nil
	}
	// 重新同步一次 Indexer 缓存数据到 Delta FIFO 队列中
	keys := f.knownObjects.ListKeys() // 获取本地存储的key
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key) // 从本地存储拿对象
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists { // 本地存储没有该对象
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil // 退出
	}

	// 如果我们正在执行Resync(),并且已经有一个事件在排队等待该对象,那么我们将忽略该对象的Resync。
	// 这是为了避免竞争,即重新同步带有object的先前值(因为将对象的事件排队不会触发更改底层store的
	//  竞争)。
	// key存在才进行的逻辑
	id, err := f.KeyOf(obj) // 又获取key是什么意思,上面不是有Key了嘛,不理解,难道是因为本地存储的key值和这边的Key值计算方式一样? 不过这样做只是为了确保使用了正确的key。
	if err != nil {
		return KeyError{obj, err}
	}
	// key存在
	if len(f.items[id]) > 0 { // 该key的增量记录不为0,就不需要更新了
		return nil // 退出
	}
	// 该key的增量记录为空才做同步操作
	if err := f.queueActionLocked(Sync, obj); err != nil { // 放入的增量类型是Sync
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

总结

本篇主要讲了client-go中FIFO队列的实现和DeltaFIFO队列的实现,相信大家对于如何实现FIFO已经有了了解,对于DeltaFIFO队列,在这看到的只是队列相关的操作,跟其他模块的互动比较少,可以看到的是DeltaFIFO队列用到了本地存储Indexer(对应代码中的knownObjects),可以从本地存储indexer中查数据,但是未涉及knownObjects的写入。我这里说下我自己对于deltafifo队列的理解,DeltaFIFO队列的作用到底是什么,直接使用FIFO队列有什么不好的地方吗?我们可以看到对于FIFO队列items中只存储对象的最新信息,而过程信息是没有的。反而DeltaFIFO队列会完美的保存对象变化的全过程信息,对于需要时刻感知变化过程和变化操作的应用场景,这种DeltaFIFO更合适。后面的文章将介绍本地存储Indexer,敬请期待。
在这里插入图片描述

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐