参考https://github.com/cloudnativeto/sig-k8s-source-code/issues/11

一 DeltaFIFO队列为什么需要Resync

为什么需要 Resync 机制呢?因为在处理 SharedInformer 事件回调时,可能存在处理失败的情况,定时的 Resync 让这些处理失败的事件有了重新 onUpdate 处理的机会。
主要的目的是为了不丢数据,处理 resync 机制还有边缘触发与水平获取的设计,一起来保证不丢事件、数据同步并能及时响应事件。
在这里插入图片描述
图来自于《Programming Kubernetes》

二 代码实现

1. Resync

// 重新同步将为每个项目发送一个同步事件
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}
// ListKeys返回当前在FIFO中的对象的所有键的列表。
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

2. ListKeys

// ListKeys返回当前在FIFO中的对象的所有键的列表。
func (f *DeltaFIFO) ListKeys() []string {
	f.lock.RLock()
	defer f.lock.RUnlock()
	list := make([]string, 0, len(f.items))
	for key := range f.items {
		list = append(list, key)
	}
	return list
}

3. syncKeyLocked

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	//如果我们正在执行Resync(),并且已经为该对象排队了一个事件,我们将忽略该对象的Resync。 
	//这是为了避免竞争,其中重新同步带有对象的先前值(因为对对象的事件进行排队不会触发更改基础存储<knownObjects>。
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	//如果队列中已存在 则不进行resync
	if len(f.items[id]) > 0 {
		return nil
	}

	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

4. queueActionLocked

// queueActionLocked追加到对象的增量列表中。 呼叫者必须先锁定。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	//初始化一个fifo 并且给我们对应的obj 添加进items
	//items map[string]Deltas
	newDeltas := append(f.items[id], Delta{actionType, obj})
	//去重
	newDeltas = dedupDeltas(newDeltas)
	//如果对应obj 的deltas 长度大于0
	if len(newDeltas) > 0 {
		//判断是否存在
		if _, exists := f.items[id]; !exists {
		//如果不存在先在队列中添加
			f.queue = append(f.queue, id)
		}
		//然后重新赋给items对应key 的value
		f.items[id] = newDeltas
		//唤醒监听在队列的goroutine
		f.cond.Broadcast()
	} else {
//我们需要将其从地图中删除(如果队列中没有其他项目,则它们将被忽略)。
		delete(f.items, id)
	}
	return nil
}

三 总结

resync => listkeys
   	   => synckeylocked => queueActionLocked

listkeys拿到当前在FIFO中的对象的所有键的列表
synckeylocked 判断需要做同步的obj 是否有事件在队列中 如果有丢弃,没有的话就调用queueactionlocked
queueactionlocked 主要是做队列的插入以及分发

Resync 机制的引入,定时将 Indexer 缓存事件重新同步到 Delta FIFO 队列中,在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理。并且通过入队前判断 FIFO 队列中是否已经有了更新版本的 event,来决定是否丢弃 Indexer 缓存不进行 Resync 入队。在处理 Delta FIFO 队列中的 Resync 的事件数据时,触发 onUpdate 回调来让事件重新处理。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐