Client-go Informer之 DeltaFIFO队列
Client-go Informer之 DeltaFIFO队列欢迎关注微信公众号“云原生手记”背景上一篇讲到reflector监控指定的k8s资源,当监控的资源发生变化时,将资源对象的变化存放到DeltaFIFO队列中。本篇的内容就是剖析DeltaFIFO队列,顺便再看下goland如何实现FIFO队列。队列client-go中有两个队列,一个是FIFO队列,另一个是DeltaFIFO队列。我们通
Client-go Informer之 DeltaFIFO队列
欢迎关注微信公众号“云原生手记”
背景
上一篇讲到reflector监控指定的k8s资源,当监控的资源发生变化时,将资源对象的变化存放到DeltaFIFO队列中。本篇的内容就是剖析DeltaFIFO队列,顺便再看下goland如何实现FIFO队列。
队列
client-go中有两个队列,一个是FIFO队列,另一个是DeltaFIFO队列。我们通过学习其中的FIFO队列来了解Golang语言中设计FIFO队列的基本技巧,而学习DeltaFIFO队列是深入理解Inform机制所需,为后面的文章打下基础。
看下client-go中是如何设计FIFO队列的,首先有2个Interface接口叫Store和Queue,Queue中包含了Store,而DeltaFIFO和FIFO都是queue接口的实现,Store接口的定义如下:
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error // 用于在List-watch机制中,从api-server那边list完后,要将对象数据放入DeltaFIFO队列
Resync() error // 用于定时同步,以免数据不一致
}
Queue接口如下:
type Queue interface {
Store
Pop(PopProcessFunc) (interface{}, error)
AddIfNotPresent(interface{}) error
HasSynced() bool
Close()
}
FIFO
FIFO,大家学过计算机课程的都应该知道,是 First In, First Out的缩写,意为先入先出,这个通常是队列的主要特性。
看下client-go中FIFO的结构体实现:
源码摘自:client-go/tools/cache/fifo.go
type FIFO struct { // store 接口的实现
lock sync.RWMutex // 读写锁 针对整个对象
cond sync.Cond // 条件变量
items map[string]interface{} // 存储key到元素对象的Map
queue []string // 队列索引,是个数组 保证有序
// 如果已经填充了Replace()插入的第一批项目,或者首先调用了Delete / Add / Update,则populated为true。
populated bool
// 是第一次调用Replace()插入的项目数
initialPopulationCount int
keyFunc KeyFunc //keyFunc像是个对对象的hash函数,获取对象Id
closed bool // 队列是否关闭
}
该队列该如何初始化:
func NewFIFO(keyFunc KeyFunc) *FIFO { // 新建FIFO队列时,只需传入keyFunc就行了,keyFunc就是对象的Hash函数,计算对象唯一的对象键用的
f := &FIFO{
items: map[string]interface{}{},
queue: []string{},
keyFunc: keyFunc,
}
f.cond.L = &f.lock
return f
}
下面我们看下队列的增删改以及pop数据等核心操作的源码。
1、插入元素
func (f *FIFO) Add(obj interface{}) error {
id, err := f.keyFunc(obj) // 拿到对象ID
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock() // 加锁
defer f.lock.Unlock()
f.populated = true // 设置标志位
if _, exists := f.items[id]; !exists { // 判断是否已存在
f.queue = append(f.queue, id) // 不存在,就放入queue数组的最后
}
f.items[id] = obj // 放入Map.,万一是重复的就是直接替换了
f.cond.Broadcast() // 广播元素入队了,等在在pop操作的协程可以去元素了
return nil
}
2、更新操作
就是使用了上面的Add方法
func (f *FIFO) Update(obj interface{}) error {
return f.Add(obj)
}
3、删除操作
func (f *FIFO) Delete(obj interface{}) error {
id, err := f.keyFunc(obj)// 获取对象的Key
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock() // 加锁
defer f.lock.Unlock()
f.populated = true
delete(f.items, id) // 直接从map中删除元素,那数组中的索引怎么办,pop取元素的时候有额外处理
return err
}
4、获取对象
获取的是该对象的最新更改
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.keyFunc(obj) // 获取对象Key
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)// 通过Key检查对象存不存在队列
}
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[key] // 从items中拿数据
return item, exists, nil
}
5、判断队列是否关闭
func (f *FIFO) IsClosed() bool {
f.lock.Lock()
defer f.lock.Unlock()
if f.closed { // 检查这个标志位
return true
}
return false
}
6、Pop函数,队列中取元素专有的函数
这边取元素的同时传入处理元素的函数process。
取出的对象是最新的。
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for { // 一个循环,只在取到元素或者队列关闭时退出
for len(f.queue) == 0 {// 队列为空时,就一直等待
if f.closed { // 队列关闭,就退出循环
return nil, ErrFIFOClosed
}
f.cond.Wait() // 否则一直等待,直到广播通知队列有元素了;阻塞
}
id := f.queue[0] // 拿出队列首位
f.queue = f.queue[1:] // 队首元素出队后修正有序数组
if f.initialPopulationCount > 0 {
f.initialPopulationCount-- // 队列元素总数计数
}
item, ok := f.items[id]
if !ok {// 有可能已经被删除了,请见delete 函数,之前被删除的,就不管了
continue
}
delete(f.items, id) // 从Map中删除
err := process(item) // 用传进来处理函数process来处理出队的元素,要是处理失败,就再塞回队列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
7、替换队列元素
传入参数是list和资源版本
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
items := make(map[string]interface{}, len(list)) // 初始化一个map充当之后的队列
for _, item := range list { // 遍历list
key, err := f.keyFunc(item) // 获取对象的Key
if err != nil {
return KeyError{item, err}
}
items[key] = item // 放入items中
}
f.lock.Lock() // 获取锁
defer f.lock.Unlock()
if !f.populated { // 未进行replace/add/update等操作
f.populated = true
f.initialPopulationCount = len(items)
}
f.items = items // 替换队列的所有元素
f.queue = f.queue[:0] // 删除队列的之前的排序
for id := range items {
f.queue = append(f.queue, id) // 重新录入排序
}
if len(f.queue) > 0 {// 排序数组有数据
f.cond.Broadcast()// 广播
}
return nil
}
8、重新同步
从代码上看,f.items中的Key可能和f.queue中所包含的Key不一致,所以需要重新同步,让两者在key上保持一致。网上的说法是保证不丢事件、数据同步并能及时响应事件。个人看法,觉得这种同步机制是必要的,但是同步频率需要把控好,不然会影响队列的效率吧。
func (f *FIFO) Resync() error {
f.lock.Lock() // 获取锁
defer f.lock.Unlock()
inQueue := sets.NewString() // 初始化是个Map map[string]Empty
for _, id := range f.queue { // 遍历索引数组
inQueue.Insert(id) // inQueue复制f.queue
}
for id := range f.items { // 遍历队列元素
if !inQueue.Has(id) { // items map中的可以在queue数组中不存在,就添加进去。
f.queue = append(f.queue, id) // 补足f.queue缺失的Id
}
}
if len(f.queue) > 0 {
f.cond.Broadcast() // 广播
}
return nil
}
上面就基本讲完了FIFO的实现。其实如果你项目中要自己实现FIFO,可以把这段抄进去直接使用,client-go都帮你验证过了,实际使用问题不大的,但是注意一点,就是client-go中的FIFO队列是针对对象的,重复对象添加是会覆盖的。要是你的应用不需要这个特性,就需要改改了。
DeltaFIFO
什么是 DeltaFIFO
FIFO的意思是先入先出,而Delta的意思是增量。合起来,DeltaFIFO可意为增量先入先出队列,就是该队列存储的数据是增量数据。这边补充下维基百科增量计算的概念:增量计算是一种软件功能 。当一部分的数据产生了变化,就仅对该产生变化的部分进行计算和更新,以节省计算时间。相比于简单地重复计算完整的输出内容,增量计算能够显著地节省计算时间。 比如,电子表格会在实现重计算功能时使用增量计算,只重新计算并更新那些含有公式且被直接或间接地改变了的单元格。我想这边的增量队列也是考虑到节省计算时间吧。那在client-go中什么是增量数据,看下源码中对于Delta的定义:
源码均摘自:client-go/tools/cache/delta_fifo.go
type Delta struct { // 记录对于对象的增量操作
Type DeltaType // 增量类型
Object interface{} // 对象
}
type DeltaType string // 增量类型是个String
// 有哪些增量类型呢,增删改,替换,和同步
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)
所以所谓的DeltaFIFO就是一个装有Delta类型和对象数据的先入先出队列。
看下DeltaFIFO结构体的属性有哪些:
type DeltaFIFO struct {
lock sync.RWMutex // 读写锁,方便读操作的数据读取,锁粒度更细
cond sync.Cond // 条件变量,用于通知和阻塞
items map[string]Deltas //objectkey映射对象的增量数组
queue []string // 保证有序,里面会放入ObjectKey.从队列取数据时先从这个数组中拿key,再去items中拿对象
populated bool // 标记队列是否add/update/delete/replace过了。用处不明
initialPopulationCount int // 第一次replace的元素数量,用处不明
keyFunc KeyFunc // 相当于Hash函数,从一个object中计算出唯一的key
knownObjects KeyListerGetter // knownObjects是新建队列时传进来的,并在delete, replace,resync中被使用。是Indexer,是本地存储,就是list-watch后的对象数据要放入DeltaFIFO队列中,reflector会将数据从队列中取出并放入本地存储Indexer中。之后要是用户想获取哪个对象,就直接从本地存储Indexer中获取就行了,不用专门去请求api-server了
closed bool // 标记该队列是否关闭
emitDeltaTypeReplaced bool // Replace() 是否调用过的标记
}
在看完属性后,看下是如何创建DeltaFIFO队列的, 这边提供了两种方式,核心只用了NewDeltaFIFOWithOptions方法:
// 需要传入类似哈希函数的KeyFunc和KeyListerGetter,KeyListerGetter是个Indexer本地存储。后面的文章会讲
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ // 调用了下面这个函数
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
// 开始封装DeltaFIFO
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock // 设置条件变量
return f
}
整个DeltaFIFO队列的方法有很多,我主要讲几个核心方法:
1、添加操作
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock() // 获取写锁
defer f.lock.Unlock() // 释放写锁
f.populated = true // 设置标记位
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj) // 获取对象的唯一Key
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj}) // 将新的对象增量操作放入Items中对象的增量数组中
newDeltas = dedupDeltas(newDeltas) // 返回修正后的增量数组,数组中的最后两个增量操作可能时一样的,这边需要删除重复的一个,一般重复的操作都是删除操作
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id) // 入队
}
f.items[id] = newDeltas // 放好map
f.cond.Broadcast() // 广播通知,可能有协程在等待队列的元素,所以这边需要广播通知
} else { // 一般不会发生这种情况
delete(f.items, id) // 删除该Key
}
return nil
}
2、更新操作
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock() // 上写锁
defer f.lock.Unlock() // 解锁
f.populated = true
return f.queueActionLocked(Updated, obj)
}
这边的流程和添加操作是一样的,唯一的不同就是传入的操作类型是Updated
3、删除操作
基本逻辑:查看本地存储和队列中是否存在该对象,不存在就不继续删除操作了,存在,那就添加删除的增量操作。
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj) // 获取object的唯一key
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock() // 上写锁
defer f.lock.Unlock() // 释放写锁
f.populated = true // 这是标记位
if f.knownObjects == nil { // 本地存储为空
if _, exists := f.items[id]; !exists {
return nil
}
} else { // 本地存储不为空
_, exists, err := f.knownObjects.GetByKey(id) // 从本地存储中查看对象是否存在
_, itemsExist := f.items[id] // 队列中对象是否存在
if err == nil && !exists && !itemsExist { // 本地存储不存在和队列中也不存在
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
}
// exist in items and/or KnownObjects
return f.queueActionLocked(Deleted, obj) // 这个之前讲过了,加入删除的增量操作
}
4、只添加不存在的添加操作
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
deltas, ok := obj.(Deltas) // 转换成delta
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
id, err := f.KeyOf(deltas.Newest().Object) // 获取Key
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, deltas)
return nil
}
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
if _, exists := f.items[id]; exists { // 对象id只要存在就不添加
return
}
f.queue = append(f.queue, id)
f.items[id] = deltas // 只添加之前未添加过的Key的对象
f.cond.Broadcast()
}
5、 从队列中取出元素 Pop
输入参数是PopProcessFunc函数,这个设计挺棒的,就是把处理元素的逻辑带进来,其他代码可以完全复用。这个Pop函数的使用是在SharedInformer ,后面的文章会讲到。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock() // 这边尝试获取锁
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.closed { // 队列关闭的话,就退出
return nil, ErrFIFOClosed
}
f.cond.Wait() // 没有数据就一直等待
}
id := f.queue[0] // 获取队首元素的key
f.queue = f.queue[1:] // 修正有序队列
if f.initialPopulationCount > 0 {
f.initialPopulationCount-- // 减1
}
item, ok := f.items[id] // 获取对象的增量数组,该对象所有的改变都在这了
if !ok { // 如果queue数组中的不存在于items中,说明该对象已经被删除了
// Item may have been deleted subsequently.
continue
}
delete(f.items, id) // 删除该id
err := process(item) // 这个设计技巧挺赞的,处理函数是动态传入的,方便解耦。
if e, ok := err.(ErrRequeue); ok {// 队列错误,就把元素再塞回去?!
f.addIfNotPresent(id, item) // 再塞回队列
err = e.Err
}
return item, err
}
}
6、替换操作
一般是在List-watch中的list后被使用的,将获取的数据存入队列。
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock() // 上锁
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// keep backwards compat for old clients
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
// Add Sync/Replaced action for each new item.
for _, item := range list {
key, err := f.KeyOf(item)// 获取Key
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
if f.knownObjects == nil { // 这层逻辑和下面逻辑的区别是什么?
// Do deletion detection against our own list.
queuedDeletions := 0
for k, oldItem := range f.items { // 遍历队列里的元素
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
// 相当于一层补充机制,万一有对象已删除,但是没监控到,要做好同步
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
// 检查,要是还有对象未删除,就删除她
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
7、Resync操作
会被周期性调用,检查本地存储和队列中的数据是否一致
func (f *DeltaFIFO) Resync() error {
f.lock.Lock() // 获取写锁了
defer f.lock.Unlock()
if f.knownObjects == nil { // 本地存储为空,就退出
return nil
}
// 重新同步一次 Indexer 缓存数据到 Delta FIFO 队列中
keys := f.knownObjects.ListKeys() // 获取本地存储的key
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key) // 从本地存储拿对象
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists { // 本地存储没有该对象
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil // 退出
}
// 如果我们正在执行Resync(),并且已经有一个事件在排队等待该对象,那么我们将忽略该对象的Resync。
// 这是为了避免竞争,即重新同步带有object的先前值(因为将对象的事件排队不会触发更改底层store的
// 竞争)。
// key存在才进行的逻辑
id, err := f.KeyOf(obj) // 又获取key是什么意思,上面不是有Key了嘛,不理解,难道是因为本地存储的key值和这边的Key值计算方式一样? 不过这样做只是为了确保使用了正确的key。
if err != nil {
return KeyError{obj, err}
}
// key存在
if len(f.items[id]) > 0 { // 该key的增量记录不为0,就不需要更新了
return nil // 退出
}
// 该key的增量记录为空才做同步操作
if err := f.queueActionLocked(Sync, obj); err != nil { // 放入的增量类型是Sync
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
总结
本篇主要讲了client-go中FIFO队列的实现和DeltaFIFO队列的实现,相信大家对于如何实现FIFO已经有了了解,对于DeltaFIFO队列,在这看到的只是队列相关的操作,跟其他模块的互动比较少,可以看到的是DeltaFIFO队列用到了本地存储Indexer(对应代码中的knownObjects),可以从本地存储indexer中查数据,但是未涉及knownObjects的写入。我这里说下我自己对于deltafifo队列的理解,DeltaFIFO队列的作用到底是什么,直接使用FIFO队列有什么不好的地方吗?我们可以看到对于FIFO队列items中只存储对象的最新信息,而过程信息是没有的。反而DeltaFIFO队列会完美的保存对象变化的全过程信息,对于需要时刻感知变化过程和变化操作的应用场景,这种DeltaFIFO更合适。后面的文章将介绍本地存储Indexer,敬请期待。
更多推荐
所有评论(0)