client-go之tools/cache包源码分析
tools/cache包源码分析tools/cache包controller.godelta_fifo.goexpiration_cache.gofifo.goheap.goindex.golister.golistwatch.gomutation_cache.gomutation_detector.goreflector.goshared_informer.gostore.gothread_sa
·
tools/cache包源码分析
tools/cache包
k8s认证的工具类(目前只有关于exec方式的工具类)
controller.go
- 接口
// Controller 由 Config 参数化生成并在 sharedIndexInformer 中使用的低级控制器. type Controller interface { // 运行做两件事。一件是构造并运行一个反射器将对象/通知从 Config 的 ListerWatcher转化到 Config 的DeltaFIFO队列,并可能在该队列上调用偶尔的重新同步。 // 另一件事是重复从队列中pop并使用 Config 的 ProcessFunc 进行处理。这两个一直持续到 `stopCh` 关闭. Run(stopCh <-chan struct{}) // HasSynced 委托给其中config的DeltaFIFO队列 HasSynced() bool // LastSyncResourceVersion 当有其存在时委托给反射器,否则返回空字符串 LastSyncResourceVersion() string } // ResourceEventHandler 可以处理发生在资源上的事件的通知. 事件仅供参考,因此您不能返回错误. // 注意: 处理程序不得修改接收到的对象;这不仅涉及顶级结构,还涉及从中可以访问的所有数据结构。 // * 在添加对象时调用 OnAdd。 // * OnUpdate 在对象被修改时被调用。请注意,oldObj 是对象的最后一个已知状态 -- 有可能将多个更改组合在一起,因此您无法使用它来查看每个更改。 // OnUpdate 也会在重新列表发生时被调用,即使没有发生任何变化,它也会被调用。这对于定期评估或同步某些内容很有用。 // * 如果项目已知,OnDelete 将获得该项目的最终状态,否则它将获得 DeletedFinalStateUnknown 类型的对象。 // 如果watch关闭并错过删除事件并且我们没有注意到删除,直到随后的重新列出,就会发生这种情况。 type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) }
- 函数
// New 从给定的config创建一个新的controller. func New(c *Config) Controller { ctlr := &controller{ config: *c, clock: &clock.RealClock{}, } return ctlr } // DeletionHandlingMetaNamespaceKeyFunc 在调用 MetaNamespaceKeyFunc 之前检查 DeletedFinalStateUnknown 对象. func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { // 检查obj是否是DeletedFinalStateUnknown(本文下面分析delta_fifo.go会做分析)对象 if d, ok := obj.(DeletedFinalStateUnknown); ok { // 返回 return d.Key, nil } // 详情见本文中store.go中的MetaNamespaceKeyFunc函数,用来生成DeltaFIFO中的map中的key return MetaNamespaceKeyFunc(obj) } // NewInformer 返回一个 Store (indexer) 和一个用于填充 store的控制器controller同时还提供事件通知。 // 注意: 您应该只将返回的Store 用于 Get/List 操作;添加/修改/删除将导致事件通知出错。 // // 参数: // * lw 是您想要通知的资源来源的列表和监视功能。 // * objType 是您希望接收的类型的对象。 // * resyncPeriod:如果非零,将经常重新列出(即使没有任何更改,您也会收到 OnUpdate调用)。 // 否则,重新列表将延迟尽可能长的时间(直到上游源关闭监视或超时,或您停止控制器)。 // * h 用来处理DeltaFIFO pop出来的item. // func NewInformer( lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, ) (Store, Controller) { // 正如我们所知,这将保持客户端状态(client state 即时index/cache等的对象). // 在本文的store.go中会详细分析,主要是构建一个cache对象,使用索引缓存obj clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) // newInformer用来构建一个Informer中需要的controller return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) } // NewIndexerInformer 返回一个indexer和一个controller,用于填充索引同时还提供事件通知。 // 您应该只将返回的索引用于 Get/List 操作;添加/修改/删除将导致事件通知出错。 func NewIndexerInformer( lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, indexers Indexers, ) (Indexer, Controller) { // 正如我们所知,这将保持客户端状态(client state 即时index/cache等的对象). // 在本文的store.go中会详细分析,主要是构建一个cache对象,使用索引缓存obj clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) // newInformer用来构建一个Informer中需要的controller return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) } // newInformer 返回一个用于填充store(DeltaFIFO)的控制器,同时还提供事件通知。 func newInformer( lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, clientState Store, ) Controller { // 用来存储resource 的event,供processor来改变index和分发notification。 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: clientState, EmitDeltaTypeReplaced: true, }) // 构建controller的配置,本文前面做了介绍 cfg := &Config{ Queue: fifo, ListerWatcher: lw, ObjectType: objType, FullResyncPeriod: resyncPeriod, RetryOnError: false, Process: func(obj interface{}) error { // 遍历DeltaFIFO pop出来的Deltas for _, d := range obj.(Deltas) { switch d.Type {// 判断是那种类型 case Sync, Replaced, Added, Updated:// 除了删除操作 // 获取obj对应在缓存中的oldobj和是否存在 if old, exists, err := clientState.Get(d.Object); err == nil && exists { // 更新缓存中的obj if err := clientState.Update(d.Object); err != nil { return err } // 处理event事件,更新操作 h.OnUpdate(old, d.Object) } else { // 添加缓存中的obj if err := clientState.Add(d.Object); err != nil { return err } // 处理event事件,添加操作 h.OnAdd(d.Object) } case Deleted:// 删除事件 // 删除缓存中的obj if err := clientState.Delete(d.Object); err != nil { return err } // 处理删除事件 h.OnDelete(d.Object) } } return nil }, } // 新建一个controller,本文前面已做分析 return New(cfg) }
- 结构体
// Config 包含这些低级控制器之一的所有设置。 type Config struct { // 存储对象的队列 -必须是 DeltaFIFO. Process() 函数是处理这个 Queue 的 Pop() 方法的输出. Queue // 可以列出和观察您的对象的对象 ListerWatcher // 处理pop Delta 的obj。 Process ProcessFunc // ObjectType 是此controller预期处理的类型的示例对象。 ObjectType runtime.Object // FullResyncPeriod 是ShouldResync用来判断的时间段。 FullResyncPeriod time.Duration // refactor定期使用 shouldResync 来确定是否重新同步队列。如果 ShouldResync 为 `nil` 或返回 true,则表示反射器应该继续进行重新同步。 ShouldResync ShouldResyncFunc // 如果为 true,则当 Process() 返回错误时,将对象重新入队。 RetryOnError bool // 每当 ListAndWatch 因错误断开连接时调用。 WatchErrorHandler WatchErrorHandler //WatchListPageSize 是初始和重新列出监视列表的请求块大小。 WatchListPageSize int64 } // ShouldResyncFunc 指示反射器是否应该执行DeltaFIFO重新同步的函数. // 它可以由shared informer使用以支持具有自定义重新同步周期的多个事件处理程序。 type ShouldResyncFunc func() boolC // ProcessFunc 处理DeltaFIFO pop出来的单个对象. type ProcessFunc func(obj interface{}) error // controller 实现了 Controller type controller struct { // controller的配置,用与生成Reflector config Config // controller生成的Reflector,用来执行ListAndWatch,入DeltaFIFO,给Processor处理(reconcile最后处理) reflector *Reflector reflectorMutex sync.RWMutex // 用来ShouldResync判断使用 clock clock.Clock } // Run 开始处理项目,并将继续直到一个值被发送到 stopCh 或它被关闭. // 只调用一次,多次调用 Run 是错误的. func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() // 阻塞直到stopCh有输入 go func() { <-stopCh c.config.Queue.Close() }() // 构建一个Reflector r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) // 配置Reflector的WatchListPageSize,判断是否需要同步 r.ShouldResync = c.config.ShouldResync // 配置Reflector的WatchListPageSize,用于listAndWatch r.WatchListPageSize = c.config.WatchListPageSize // 配置clock,用于ShouldResync中判断是否该sync r.clock = c.clock // 用于处理listAndWatch的watch异常 if c.config.WatchErrorHandler != nil { r.watchErrorHandler = c.config.WatchErrorHandler } c.reflectorMutex.Lock() // 设置controller的reflector c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group // 执行reflector wg.StartWithChannel(stopCh, r.Run) // 执行processLoop,下面做详细说明 wait.Until(c.processLoop, time.Second, stopCh) wg.Wait() } // DeltaFIFO同步的条件是执行了一次Add/Update/Delete/AddIfNotPresent(设置populated为true)且通过replcace设置的item全部被pop(也就是DeltaFIFO中的queue为空,即initialPopulationCount==0) func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() } // 获取最后更新缓存的资源版本号 func (c *controller) LastSyncResourceVersion() string { c.reflectorMutex.RLock() defer c.reflectorMutex.RUnlock() if c.reflector == nil { return "" } return c.reflector.LastSyncResourceVersion() } // processLoop 排空DeltaFIFO的queue工作队列。 // TODO:考虑并行处理。这需要稍微考虑一下以确保我们不会同时多次处理同一个对象。 func (c *controller) processLoop() { for { // pop DeltaFIFO并调用c.config.Process来处理每个item obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // 如果有err if err != nil { // 如果err为队列关闭异常,则return if err == ErrFIFOClosed { return } // 重新入队 if c.config.RetryOnError { // 入队的安全方式。 c.config.Queue.AddIfNotPresent(obj) } } } } // ResourceEventHandlerFuncs 是一个适配器,可让您轻松指定任意数量的通知函数,同时仍然实现ResourceEventHandler. type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{}) } // 实现了ResourceEventHandler的OnAdd方法,OnUpdate/OnDelete类似 func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { if r.AddFunc != nil { r.AddFunc(obj) } } // FilteringResourceEventHandler 将提供的过滤器应用于传入的所有事件,确保调用适当的嵌套处理程序方法。 // 和处理程序一样,过滤器不能修改它给定的对象。 type FilteringResourceEventHandler struct { FilterFunc func(obj interface{}) bool Handler ResourceEventHandler } // OnAdd 仅在过滤器函数成功时调用嵌套处理程序 func (r FilteringResourceEventHandler) OnAdd(obj interface{}) { if !r.FilterFunc(obj) { return } r.Handler.OnAdd(obj) } // OnUpdate 确保根据过滤器是否匹配调用正确的处理程序 func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) { // newObj和oldObj分别调用过滤函数 newer := r.FilterFunc(newObj) older := r.FilterFunc(oldObj) // 判断符合switch的哪个分支 switch { case newer && older:// 如果newer和older都为true // 执行update r.Handler.OnUpdate(oldObj, newObj) case newer && !older:// 如果newer为true,older为false // 执行add操作 r.Handler.OnAdd(newObj) case !newer && older:// 如果older为true,newer为false // 执行delete操作 r.Handler.OnDelete(oldObj) default: // 什么都不做 } } // OnDelete 仅在过滤器成功时调用嵌套处理程序 func (r FilteringResourceEventHandler) OnDelete(obj interface{}) { if !r.FilterFunc(obj) { return } r.Handler.OnDelete(obj) }
delta_fifo.go
- 变量
// 更改类型定义 const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" // 当我们遇到watch错误并且必须执行relist 时,会发出 Replaced。我们不知道被替换的对象是否发生了变化。 // // 注意:以前版本的 DeltaFIFO 也会使用同步替换事件。因此,只有在Option EmitdeltatePereplate=true时才会替换。 Replaced DeltaType = "Replaced" // 同步用于定期重新同步期间的合成事件。 Sync DeltaType = "Sync" )
- 接口
// KeyListerGetter 是知道如何列出其键并按键查找的东西。 type KeyListerGetter interface { KeyLister KeyGetter } // KeyLister 是知道如何列出键。 type KeyLister interface { ListKeys() []string } // KeyGetter 是任何知道如何获取存储在给定键下的值的东西. type KeyGetter interface { // GetByKey 返回与键关联的值,或设置exists=false。 GetByKey(key string) (value interface{}, exists bool, err error) }
- 结构体
// DeltaFIFOOptions 是 DeltaFIFO 的配置参数。都是可选的。 type DeltaFIFOOptions struct { // KeyFunction 用于确定对象应该具有什么键。 (它的返回值在 DeltaFIFO 的 KeyOf() 方法中使用,并附加处理已删除的对象和队列状态)。 可选,默认为MetaNamespaceKeyFunc. KeyFunction KeyFunc // KnownObjects预计会返回此队列的消费者“知道”的键列表。它用于决定当 Replace() 被调用时哪些项目丢失;为丢失的项目生成“已删除”增量。 // 如果您可以容忍 Replace() 中的缺失删除,KnownObjects 可为空。 KnownObjects KeyListerGetter // EmitDeltaTypeReplaced 指示队列consumer理解已替换的 DeltaType。 // 在添加`Replaced` 事件类型之前,对Replace() 的调用与Sync() 的处理方式相同。 // 出于向后兼容性的目的,默认情况下这是 false。 // 当为 true 时,将为传递给 Replace() 调用的项目发送 `Replaced` 事件。 // 如果为 false,则将发送 `Sync` 事件。 EmitDeltaTypeReplaced bool } // DeltaFIFO 类似于 FIFO,但有两个不同之处. // 一个是与给定对象的键相关联的累加器(items)不是该对象而是 Delta,它是该对象的 Delta 值的一部分。 // // 另一个区别是 DeltaFIFO 有两种额外的方式 可以将对象应用于累加器:替换和同步。 // 如果 EmitDeltaTypeReplaced 未设置为 true,则 Sync 将用于替换事件以实现向后兼容。同步用于周期性重新同步事件。 // // DeltaFIFO 是一个生产者-消费者队列,其中反射器旨在成为生产者,而消费者则是调用 Pop() 方法的任何东西。 // // DeltaFIFO 解决了这个用例: // * 您希望最多处理每个对象更改 (delta) 一次。 // *当您处理一个对象时,您希望看到自上次处理它以来发生的所有事情。(所有之前的事情都在map中的value) // * 您要处理某些对象的删除。 // * 您可能希望定期重新处理对象。 // // DeltaFIFO 的 Pop()、Get() 和 GetByKey() 方法返回interface{} 以满足存储/队列接口,但它们将始终返回类型为 Deltas 的对象。 List() 返回 FIFO 中每个累加器(items)的最新对象。 // // DeltaFIFO 的 knownObjects KeyListerGetter 提供了列出 Store 键和通过 Store 键获取对象的能力。 // 问题中的对象被称为“已知对象”,这组对象修改了删除、替换和重新同步方法的行为(每个都以不同的方式)。 // // 关于线程的注意事项:如果您从多个线程并行调用 Pop(),则最终可能会有多个线程处理同一对象的不同版本。 type DeltaFIFO struct { // lock/cond 保护对“项目”和“队列”的访问。 lock sync.RWMutex cond sync.Cond // `items` 将键映射到 Deltas。 // 每个这样的 Deltas 至少有一个 Delta。 items map[string]Deltas // `queue` 维护用于在 Pop() 中使用的键的 FIFO 顺序。 // `queue` 中没有重复项。 // 一个键在 `queue` 中当且仅当它在 `items` 中。 queue []string // 如果已填充了由 Replace() 插入的第一批项目,则填充为 true 或首先调用 Delete/Add/Update/AddIfNotPresent。 populated bool // initialPopulationCount 是第一次调用 Replace() 插入的项目数,在pop时会减少 initialPopulationCount int // keyFunc 用于制作用于排队项插入和检索的键,并且应该是确定性的. keyFunc KeyFunc // knownObjects 列出“已知”(缓存中存在)的键 --- 影响 Delete()、Replace() 和 Resync() knownObjects KeyListerGetter // 用于指示队列已关闭,以便在队列为空时可以退出控制循环。目前,不用于门控任何 CRED 操作。 closed bool // emitDeltaTypeReplaced 是在调用 Replace() 时是否发出 Replaced 或 Sync DeltaType(以保持向后兼容)。 emitDeltaTypeReplaced bool } // DeltaType 是更改的类型(添加、删除等) type DeltaType string // Delta 是 Deltas(Delta 对象列表)的成员,它反过来是 DeltaFIFO 存储的类型。它告诉你发生了什么改变,以及改变之后的对象状态。 // // [*] 除非更改是删除,然后您将在删除之前获得对象的最终状态。 type Delta struct { Type DeltaType Object interface{} } // Deltas 是单个对象的一个或多个“Delta”列表. // 最旧的 delta 在索引 0 处,最新的 delta 是最后一个. type Deltas []Delta // 关闭队列。 func (f *DeltaFIFO) Close() { f.lock.Lock() defer f.lock.Unlock() // closed设置为true,不影响close之前在queue中存在的items执行 f.closed = true f.cond.Broadcast() } // KeyOf 使用f 的 keyFunc生成key,但也检测 Deltas 对象或 DeletedFinalStateUnknown 对象的键。 func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { // 判断obj是否是Deltas类型 if d, ok := obj.(Deltas); ok { // 判断长度是否为0 if len(d) == 0 { // 返回空字符串,长度为0的Deltas没有key错误 return "", KeyError{obj, ErrZeroLengthDeltasObject} } // 获取最新的Delta中的Object obj = d.Newest().Object } // 判断obj是否是DeletedFinalStateUnknown对象 if d, ok := obj.(DeletedFinalStateUnknown); ok { // 如果是直接返回其中的Key值 return d.Key, nil } // 调用keyFunc生成key return f.keyFunc(obj) } // 如果首先调用 Add/Update/Delete/AddIfNotPresent,HasSynced 返回 true,或者 Replace() 插入的第一批项目已被pop. func (f *DeltaFIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() // f中的populated表示f已被填充,f.initialPopulationCount(只有在方法replace和pop中用到了)表示初始的填充数目 return f.populated && f.initialPopulationCount == 0 } // 添加插入一个项目,并将其放入队列中。该项目仅在集合中不存在(其实也就是判断delete操作下)时才入队. func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() // 设置populated为true f.populated = true // 入对操作,方法中没有使用lock,但是在调用方法之前必须先lock return f.queueActionLocked(Added, obj) } // Update 就像 Add 一样,但会生成一个 Updated Delta。 func (f *DeltaFIFO) Update(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true return f.queueActionLocked(Updated, obj) } // Delete 就像 Add 一样,但会生成一个 Deleted Delta. // 如果给定的对象不存在,它将被忽略。 (例如,它可能已被替换(重新列出)删除。) // 在此方法中,`f.knownObjects`,如果不是 nil,则提供(通过 GetByKey)获取提供额外的验证是否存在表示. func (f *DeltaFIFO) Delete(obj interface{}) error { // 获取obj在f中的key id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() // 设置populated = true f.populated = true // 判断knownObjects是否为空 if f.knownObjects == nil { // 判断在f的items中是否存在 if _, exists := f.items[id]; !exists { // 据推测,这是在 relist 发生时删除的. return nil } } else { // 如果对象不存在于 knownObjects 并且它在 items 中没有相应的项目,我们跳过“删除”操作。 // 注意,如果items中有“删除”动作,我们也可以忽略它,因为它会在“queueActionLocked”中自动去重 // 获取在缓存中是否存在 _, exists, err := f.knownObjects.GetByKey(id) // 获取在f中是否存在 _, itemsExist := f.items[id] // 如果err为空且不存在f和缓存中 if err == nil && !exists && !itemsExist { // 据推测,这是在重新列出时删除的。 return nil } } // 存在于项目或已知对象中,则添加Deleted Delta return f.queueActionLocked(Deleted, obj) } // AddIfNotPresent 插入一个项目,并将其放入队列中。如果该项目已经存在于集合中,则它既不会入队也不会添加到集合中。 // // 这在单个生产者/消费者场景中很有用,因此消费者可以安全地重试项目,而不会与生产者竞争并可能将过时的项目排入队列。 // // 重要提示:obj 必须是 Deltas(Pop() 函数的输出)。这与添加/更新/删除功能不同。 func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { // 这里做了限制,必须是Deltas类型 deltas, ok := obj.(Deltas) if !ok { return fmt.Errorf("object must be of type deltas, but got: %#v", obj) } // 获取deltas对应的key id, err := f.KeyOf(deltas) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() // 添加deltas到f中 f.addIfNotPresent(id, deltas) return nil } // 如果 id 不存在,addIfNotPresent 在 id 下插入增量,并假设调用者已经持有 fifo 锁. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { // 设置populated = true f.populated = true // 判断f中items中对应的key是否存在 if _, exists := f.items[id]; exists { // 存在则return,说明是对key对应的deltas的整体追加,而不是单一的append return } // 追加key到queue f.queue = append(f.queue, id) // 设置key为id value为deltas f.items[id] = deltas // 唤醒所有等待cond的goruntime f.cond.Broadcast() } // queueActionLocked 追加对应到增量列表。 // 注意: 调用者必须先锁定。 func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { // 获取obj对应的key id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // 获取旧的deltas oldDeltas := f.items[id] // 追加新的delta到旧的deltas,并形成一个新的deltas newDeltas := append(oldDeltas, Delta{actionType, obj}) // 去重新的deltas newDeltas = dedupDeltas(newDeltas) // 判断新的deltas长度是否大于0 if len(newDeltas) > 0 { // 如果获取的key在f的items中不存在 if _, exists := f.items[id]; !exists { // 追加key到f的queue中 f.queue = append(f.queue, id) } // 覆盖f中items对应key的value f.items[id] = newDeltas // 通知等待cond的所有goruntime执行 f.cond.Broadcast() } else { // 这永远不会发生,因为 dedupDeltas 在给定非空列表时永远不会返回空列表(就像这里一样). // 如果无论如何它发生了,处理它。 if oldDeltas == nil { klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) return nil } klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } return nil } // List 返回项目列表的所有obj;它items中每个Deltas 中返回最近的对象. func (f *DeltaFIFO) List() []interface{} { f.lock.RLock() defer f.lock.RUnlock() return f.listLocked() } func (f *DeltaFIFO) listLocked() []interface{} { // 初始化list list := make([]interface{}, 0, len(f.items)) // 遍历f中的items for _, item := range f.items { // 追加item中最新的delta中的obj到list list = append(list, item.Newest().Object) } return list } // ListKeys 返回当前在 FIFO 中的对象的所有键的列表. func (f *DeltaFIFO) ListKeys() []string { f.lock.RLock() defer f.lock.RUnlock() list := make([]string, 0, len(f.items)) for key := range f.items { list = append(list, key) } return list } // Get 返回请求项的完整增量列表,或设置exists=false。 func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { key, err := f.KeyOf(obj) if err != nil { return nil, false, KeyError{obj, err} } return f.GetByKey(key) } // GetByKey 返回请求项的完整增量列表,如果该列表为空,则设置 exists=false。 func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { f.lock.RLock() defer f.lock.RUnlock() d, exists := f.items[key] if exists { // 复制项目的切片,因此对该切片的操作不会干扰我们返回的对象。c d = copyDeltas(d) } return d, exists, nil } // IsClosed 检查队列是否关闭 func (f *DeltaFIFO) IsClosed() bool { f.lock.Lock() defer f.lock.Unlock() return f.closed } // Pop 阻塞直到队列有一些项目,然后返回一个item. // 如果多个item准备就绪,它们将按照添加/更新的顺序返回. // 该item在返回之前从queue(和store)中删除,因此如果您没有成功处理它,则需要使用 AddIfNotPresent() 将其添加回来. // process 函数在锁定下调用,因此更新数据结构(需要与队列同步的数据结构(例如 knownKeys))是安全的 . // PopProcessFunc 可能会返回一个带有嵌套错误的 ErrRequeue 实例,以指示当前项应该重新排队(相当于在锁下调用 AddIfNotPresent)。 // process应该避免昂贵的 I/O 操作,以便其他队列操作,即 Add() 和 Get(),不因该被阻塞太久。 // // Pop 返回一个 'Deltas',它有一个完整的列表(发生在对象 (deltas) 上的所有事情)。 func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { // 如果f中queue的长度为0 for len(f.queue) == 0 { // 当队列为空时,Pop() 的调用将被阻塞,直到新项目入队. // 调用 Close() 时,设置 f.closed 并广播条件.这导致此循环继续并从 Pop() 返回。 if f.closed { return nil, ErrFIFOClosed } // 阻塞 等待被唤醒 f.cond.Wait() } // 获取queue中的第一个对象 id := f.queue[0] // 舍弃第一个形成新的queue赋值给f的queue f.queue = f.queue[1:] // 判断f.initialPopulationCount 是否大于0 if f.initialPopulationCount > 0 { // 自减1 f.initialPopulationCount-- } // 获取items中对应key的value item, ok := f.items[id] // 如果获取失败 if !ok { // 这应该从不发生 klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) continue } // 删除 f.items中对应为id的item delete(f.items, id) // process处理对应的item err := process(item) // 如果有异常,则重新入队 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // 这里不需要 copyDeltas,因为我们正在将所有权转移给调用者(调用者可以决定是否需要修改原始数据). return item, err } } // Replace atomically 做了两件事: (1) 它使用 Sync 或 Replace DeltaType 添加给定的对象 // (2) 它做了一些删除. // 特别是:对于每个不是 `list` 中的对象的键的预先存在的键 K 都有 `Delete(DeletedFinalStateUnknown{K, O})` 的效果,其中 O 是 K的当前对象. // 如果 `f.knownObjects == nil` 那么预先存在的键是 `f.items` 中的键,而 K 的当前对象是与 K 关联的增量的 `.Newest()`。 // 否则,预先存在的键是由 `f.knownObjects` 列出的键,而 K 的当前对象是 `f.knownObjects.GetByKey(K)` 返回的。 func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() keys := make(sets.String, len(list)) // 为老client保持向后兼容 action := Sync // 如果r中的emitDeltaTypeReplaced为true,则设置action为Replaced if f.emitDeltaTypeReplaced { action = Replaced } // 为每个新项目添加同步/替换操作。 for _, item := range list { // 获取item对应的key key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } // 插入到keys keys.Insert(key) // 执行入队操作 if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } // 如果f.knownObjects == nil if f.knownObjects == nil { // 对我们自己的列表进行删除检测。 queuedDeletions := 0 // 遍历f中的items for k, oldItem := range f.items { // 验证item对应的key是否存在于list形成的keys中 if keys.Has(k) { continue } // 删除不在新列表中的存在的项目. // 如果在与 apiserver 断开连接时错过了watch删除事件,则可能会发生这种情况. var deletedObj interface{} // 获取items中最新的一个obj if n := oldItem.Newest(); n != nil { deletedObj = n.Object } // 队列删除 Delta加一 queuedDeletions++ // 删除delta入队 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } // 判断populated(表示是否有填充队列)是否为false if !f.populated { f.populated = true // 虽然在队列的初始填充中不应该有任何排队删除,但最好是安全的方式. f.initialPopulationCount = keys.Len() + queuedDeletions } return nil } // 检测不在队列中的删除。 knownKeys := f.knownObjects.ListKeys() queuedDeletions := 0 // 遍历knownObjects(一般是缓存)中的所有keys for _, k := range knownKeys { if keys.Has(k) { continue } // 到这里,表明k不存在与list形成的keys中 // 获取k在knownObjects(一般是缓存)对应的obj deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) } else if !exists { deletedObj = nil klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } // 队列删除 Delta加一 queuedDeletions++ // 删除入队 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } // 判断populated(表示是否有填充队列)是否为false if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions } return nil } // Resync 使用 Delta 的 Sync 类型添加由 `f.knownObjects` (其键尚未排队等待处理)列出的每个对象 // 如果 `f.knownObjects` 是 `nil`,那么 Resync 什么都不做。 func (f *DeltaFIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() // 如果 `f.knownObjects` 是 `nil`,那么 Resync 什么都不做。 if f.knownObjects == nil { return nil } // 获取knownObjects中的所有keys keys := f.knownObjects.ListKeys() // 遍历keys for _, k := range keys { // 同步k if err := f.syncKeyLocked(k); err != nil { return err } } return nil } func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) return nil } else if !exists { klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) return nil } // 如果我们正在执行 Resync() 并且已经有一个事件(不论什么事件)排队等待该对象,我们将忽略它的 Resync. // 这是为了避免竞争,因为重新同步带有对象前一个值(以为更改缓存或者apiserver中的值是事件驱动的,所以这里的前一个是指队列最前的delta之前的一个obj值)(对象排队事件不会触发更改底层存储 <knownObjects>,而在pop的HandleDeltas方法才会改变底层存储 <knownObjects>)。 id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // 对应key是否在f中存在item if len(f.items[id]) > 0 { return nil } // 入队 Sync if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf("couldn't queue object: %v", err) } return nil } // Oldest 是一个方便的函数,它返回最旧的 delta,如果没有 delta 则返回 nil. func (d Deltas) Oldest() *Delta { if len(d) > 0 { return &d[0] } return nil } // Newest 是一个方便的函数,它返回最新的 delta,如果没有 delta,则返回 nil. func (d Deltas) Newest() *Delta { if n := len(d); n > 0 { return &d[n-1] } return nil } // copyDeltas 返回 d 的浅拷贝;也就是说,它复制切片但不复制切片中的对象. // 这允许 Get/List 返回一个我们知道不会被后续修改破坏的对象。 func copyDeltas(d Deltas) Deltas { d2 := make(Deltas, len(d)) copy(d2, d) return d2 } // DeletedFinalStateUnknown 在对象被删除但在与 apiserver 断开连接时错过监视删除事件的情况下被放入 DeltaFIFO。 // 在这种情况下,我们不知道对象的最终“静止”状态,因此包含的 `Obj` 有可能是陈旧的. type DeletedFinalStateUnknown struct { Key string Obj interface{} }
- 函数
// NewDeltaFIFO 返回一个队列,可用于处理对items的更改。 // // keyFunc 用于确定对象应该具有什么键。 (它的返回值在 DeltaFIFO 的 KeyOf() 方法中使用,并附加处理 围绕已删除的对象和队列状态)。 // // 可以提供“knownObjects”来修改删除、替换和重新同步的行为。如果您不需要这些 修改,它可能为空。 // // TODO:考虑将 keyLister 与此对象合并,在调用 Pop() 时跟踪“已知”键的列表。必须考虑如何影响错误重试。 // // NOTE: 在使用外部known object source时,可能会误用它并导致竞争。 // 是否存在潜在的竞争取决于消费者修改 knownObjects 的方式。在 Pop() 中,进程函数在锁(注意有锁)下调用,因此更新其中需要与队列同步的数据结构是安全的(例如knownObjects)。 // // Example: // 如果 sharedIndexInformer 是消费者(https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192), 没有竞争,因为 knownObjects (s.indexer) 被安全修改在 DeltaFIFO 的锁定下。 // 唯一的例外是 GetStore() 和GetIndexer() 方法,它们公开了修改基础存储(cache/indexer)的方法。目前这两种方法用于创建 Lister和内部测试。 // // Warning: 这构造了一个 DeltaFIFO,它不区分由调用 Replace 引起的事件(例如,来自可能包含对象更新的重新列表)和由周期性重新同步引起的合成事件(只发出现有对象)。 // 有关详细信息,请参阅 https://issue.k8s.io/86015。 // // 使用 `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` 代替根据类型接收 `Replaced` 事件。 // // 弃用:等同于 NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc,KnownObjects: knownObjects}) func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: keyFunc, KnownObjects: knownObjects, }) } // NewDeltaFIFOWithOptions 返回一个队列,可用于处理对items的更改。 func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { // 判断opts中的keyFunction是否为空,如果为空则设置默认是的MetaNamespaceKeyFunc if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc } // 构造一个DeltaFIFO f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f } // re-listing and watching 可以以任何顺序多次提供相同的更新。如果它们相同,这将组合最近的两个增量。 func dedupDeltas(deltas Deltas) Deltas { // 获取长度 n := len(deltas) // 如果长度小于2,则直接返回,不做任何操作 if n < 2 { return deltas } // 获取最后的两个delta a := &deltas[n-1] b := &deltas[n-2] // 对比是否需要去重 if out := isDup(a, b); out != nil { // 如果需要,则最后两个合并成一个并返回 deltas[n-2] = *out return deltas[:n-1] } return deltas } // 如果 a & b 表示相同的事件,则返回应该保留的增量。否则,返回 nil。 // 目前只有delete的需要去重 func isDup(a, b *Delta) *Delta { if out := isDeletionDup(a, b); out != nil { return out } // TODO: Detect other duplicate situations? Are there any? return nil } // 如果两者都是删除,则保留信息最多的那个。 func isDeletionDup(a, b *Delta) *Delta { if b.Type != Deleted || a.Type != Deleted { return nil } // TODO 做更复杂的检查,还是这样就足够了? if _, ok := b.Object.(DeletedFinalStateUnknown); ok { return a } return b }
expiration_cache.go
- 接口
// ExpirationPolicy 规定对象何时到期。目前只抽象出来,所以单元测试不依赖于系统时钟. type ExpirationPolicy interface { IsExpired(obj *TimestampedEntry) bool }
- 函数
// NewTTLStore 使用 TTL 策略创建并返回ExpirationStore func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store { return NewExpirationStore(keyFunc, &TTLPolicy{ttl, clock.RealClock{}}) } // NewExpirationStore 为给定的策略创建并返回一个过期缓存 func NewExpirationStore(keyFunc KeyFunc, expirationPolicy ExpirationPolicy) Store { return &ExpirationCache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, clock: clock.RealClock{}, expirationPolicy: expirationPolicy, } }
- 结构体
// ExpirationCache 实现 store 接口,其实cache结构体类似,只是多了过期检查 // 1. 所有条目都会在插入时自动打上时间戳 // a. key是根据原始 item/keyFunc 计算的 // b. 在该键下插入的值是带时间戳的item // 2. 根据过期策略在读取时延迟发生过期 // a. 当cache中的*任何*项目到期时,不能将任何项目插入store. // 3. 在返回之前从未过期的条目中剥离时间戳 // 请注意,ExpirationCache 本质上比普通的threadSafeStore 慢,因为它每次检查项目是否已过期时都会获取写锁。 type ExpirationCache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc clock clock.Clock expirationPolicy ExpirationPolicy // expireLock 是一个写锁,用于保证我们不会因为过期的过期时间戳比较而破坏新插入的对象 expirationLock sync.Mutex } // TTLPolicy 实现了基于 ttl 的 ExpirationPolicy。 type TTLPolicy struct { // >0: 表示age > ttl 的条目过期 // <=0: 没有任何条目过期 TTL time.Duration // 用于计算 ttl 到期时间的时钟 Clock clock.Clock } // IsExpired 如果给定的对象比 ttl 旧,或者它不能确定它的年龄,则返回 true. func (p *TTLPolicy) IsExpired(obj *TimestampedEntry) bool { return p.TTL > 0 && p.Clock.Since(obj.Timestamp) > p.TTL } // TimestampedEntry 是 ExpirationCache 中唯一允许的类型. // 请记住,在计算机之间共享时间戳是不安全的。 // 如果您从 API server获取时间戳并将其作为 ExpirationCache 的一部分在客户端计算机上使用,则行为可能会不一致。 type TimestampedEntry struct { Obj interface{} Timestamp time.Time key string } // getTimestampedEntry 返回存储在给定键下的 TimestampedEntry。 func (c *ExpirationCache) getTimestampedEntry(key string) (*TimestampedEntry, bool) { // 从cacheStorage获取 item, _ := c.cacheStorage.Get(key) // 如果是TimestampedEntry类型就返回 if tsEntry, ok := item.(*TimestampedEntry); ok { return tsEntry, true } return nil, false } // 当且仅当对象尚未过期时,getOrExpire 从 TimestampedEntry 检索对象。它持有跨删除的写锁。 func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) { // 从我们认为一个项目“过期”到我们删除它时,防止所有插入,因此在删除之前未过期的项目不会在同一键下潜入。 c.expirationLock.Lock() defer c.expirationLock.Unlock() // 获取对应key的timestampedItem timestampedItem, exists := c.getTimestampedEntry(key) if !exists { return nil, false } // 判断是否过期 if c.expirationPolicy.IsExpired(timestampedItem) { klog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.Obj) // 从缓存中删除 c.cacheStorage.Delete(key) return nil, false } return timestampedItem.Obj, true } // GetByKey 返回存储在键下的项目,或设置exists=false。其实和getOrExpire一样,多了个乌龟壳 func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) { obj, exists := c.getOrExpire(key) return obj, exists, nil } // 获取未过期的项目。它清除进程中过期项目的缓存. func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) { // 获取obj对应的key key, err := c.keyFunc(obj) if err != nil { return nil, false, KeyError{obj, err} } // 调用getOrExpire获取对象 obj, exists := c.getOrExpire(key) return obj, exists, nil } // List 检索未过期项目的列表。它清除进程中过期项目的缓存. func (c *ExpirationCache) List() []interface{} { // 获取所有的items items := c.cacheStorage.List() // 初始化返回列表 list := make([]interface{}, 0, len(items)) // 遍历items for _, item := range items { // 获取对应item的key key := item.(*TimestampedEntry).key // 获取key对应的obj if obj, exists := c.getOrExpire(key); exists { list = append(list, obj) } } return list } // ListKeys 返回过期缓存中所有键的列表。 func (c *ExpirationCache) ListKeys() []string { return c.cacheStorage.ListKeys() } // 为项目添加时间戳并将其插入缓存,覆盖可能存在于同一键下的条目。 func (c *ExpirationCache) Add(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } c.expirationLock.Lock() defer c.expirationLock.Unlock() c.cacheStorage.Add(key, &TimestampedEntry{obj, c.clock.Now(), key}) return nil } // 类似与add方法 func (c *ExpirationCache) Update(obj interface{}) error { return c.Add(obj) } // 从缓存中删除一个项目。 func (c *ExpirationCache) Delete(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } c.expirationLock.Lock() defer c.expirationLock.Unlock() c.cacheStorage.Delete(key) return nil } // 替换将在尝试替换操作之前将给定列表中的所有项目转换为 TimestampedEntries. func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error { items := make(map[string]interface{}, len(list)) ts := c.clock.Now() for _, item := range list { key, err := c.keyFunc(item) if err != nil { return KeyError{item, err} } items[key] = &TimestampedEntry{item, ts, key} } c.expirationLock.Lock() defer c.expirationLock.Unlock() c.cacheStorage.Replace(items, resourceVersion) return nil }
fifo.go
- 接口
// Queue 使用Store的键扩展 Store 以“process”。 // 每个添加、更新或删除都可以将对象的键放入该集合中。 // Queue 有一种方法可以在给定累加器(items)的情况下派生相应的键。 // 一个 Queue 可以从多个 goroutines 并发访问。 // 队列可以“关闭”,之后弹出操作返回错误。 type Queue interface { // Store接口 用来做缓存 Store // Pop 阻塞直到至少有一个键要处理或队列关闭。 // 在后一种情况下(队列关闭),Pop 返回错误。 // 在前一种情况下,Pop 原子地选择一个键进行处理,从 Store 中删除该(键,累加器)关联,并处理累加器(items)。 // Pop 返回已处理的累加器(items)和处理结果。 // PopProcessFunc 可能会返回一个 ErrRequeue{inner},在这种情况下,Pop 将 // (a) 返回到队列的 (key, accumulator) 关联作为原子处理的一部分 // (b) 从 Pop 返回内部错误。 Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent 将给定的累加器(item)放入队列(在与累加器(item)的键关联)当且仅当该键尚未与非空累加器关联。 AddIfNotPresent(interface{}) error // 判断是否已同步 HasSynced() bool // 关闭队列 Close() }
- 结构体
// PopProcessFunc 传递给 Queue 接口的 Pop() 方法。 // 它应该处理从队列中弹出的累加器(item)。 type PopProcessFunc func(interface{}) error // ErrRequeue 可以由 PopProcessFunc 返回以安全地重新排队当前项目。 Err 的值将从 Pop 返回。 type ErrRequeue struct { // Err 由 Pop 函数返回 Err error } func (e ErrRequeue) Error() string { if e.Err == nil { return "the popped item should be requeued without returning an error" } return e.Err.Error() } // FIFO 是一个队列,其中每个累加器只是最近最近操作的对象,并且要处理的键集合是一个 FIFO。 // 累加器(items)一开始都是空的,从累加器(items)中删除一个对象会清空累加器(items中对应key)。 // 重新同步操作是空操作。 // // 因此:如果单个对象的多次添加/更新发生,而该对象的键在被处理之前就在队列中,那么它只会被处理一次,只处理最新版本。这不能通过channel完成 // // FIFO 解决了这个用例: // * 您希望(恰好)处理每个对象一次。 // * 您希望在处理对象时处理该对象的最新版本。 // * 您不想处理已删除的对象,应将它们从队列中删除。 // * 您不想定期重新处理对象。 // 可以类比DeltaFIFO type FIFO struct { lock sync.RWMutex cond sync.Cond // 我们依赖于`items`中的每个键也是`queue`中的属性 items map[string]interface{} queue []string // 如果已填充了由 Replace() 插入的第一批项目或首先调用了删除/添加/更新,则填充为真。 populated bool // initialPopulationCount 是第一次调用 Replace() 插入的项目数 initialPopulationCount int // keyFunc 用于制作用于排队项插入和检索的键,并且应该是确定性的。 keyFunc KeyFunc // 指示队列已关闭。 // 用于指示队列已关闭,以便在队列为空时可以退出控制循环。 closed bool } // 关闭队列。和DeltaFIFO一样 func (f *FIFO) Close() // 和DeltaFIFO一样 func (f *FIFO) HasSynced() bool // 添加插入一个项目,并将其放入队列中。该项目仅在集合中尚不存在时才入队(这个入队是指在obj对应的key在queue中不存在才添加key到f中的queue中)。 func (f *FIFO) Add(obj interface{}) error { // 构建obj对应的key id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() // 设置populated = true f.populated = true // 获取items中是否存在key if _, exists := f.items[id]; !exists { // 存在就入队,添加到queue f.queue = append(f.queue, id) } // 覆盖key对应的items f.items[id] = obj f.cond.Broadcast() return nil } // AddIfNotPresent 插入一个项目,并将其放入队列中。如果该项目已经 存在于集合中,则它既不会入队也不会添加到集合中。 // 注意: 这在单个生产者/消费者场景中很有用,因此消费者可以安全地重试项目,而不会与生产者竞争并可能将过时的项目排入队列。 func (f *FIFO) AddIfNotPresent(obj interface{}) error { // 构建obj对应的key id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() // 添加到f中 f.addIfNotPresent(id, obj) return nil } // addIfNotPresent 假定 fifo 锁已经被持有,如果它不存在,则将提供的项添加到 id 下的队列中。 func (f *FIFO) addIfNotPresent(id string, obj interface{}) { // 设置populated = true f.populated = true // 如果items中存在key if _, exists := f.items[id]; exists { // 什么都不做 return return } // 入队 f.queue = append(f.queue, id) // 添加map key :id value: obj f.items[id] = obj f.cond.Broadcast() } // Update is the same as Add in this implementation. func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) } // Update 与此实现中的 Add 相同。 func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) } // 删除一个项目。它不会将它添加到队列中,因为这个实现假设消费者只关心对象, 而不是它们被创建/添加的顺序。 func (f *FIFO) Delete(obj interface{}) error { // 获取key id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() // 设置populated = true f.populated = true // 删除f.items对应id的项 delete(f.items, id) return err } // List 返回所有项目的列表。 func (f *FIFO) List() []interface{} { f.lock.RLock() defer f.lock.RUnlock() list := make([]interface{}, 0, len(f.items)) for _, item := range f.items { list = append(list, item) } return list } // ListKeys 返回在 FIFO 中当前对象的所有键的列表 。 func (f *FIFO) ListKeys() []string { f.lock.RLock() defer f.lock.RUnlock() list := make([]string, 0, len(f.items)) for key := range f.items { list = append(list, key) } return list } // Get 返回请求的项目,或设置exists=false。 func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { key, err := f.keyFunc(obj) if err != nil { return nil, false, KeyError{obj, err} } // 通过key获取obj return f.GetByKey(key) } // GetByKey 返回请求的项目,或设置exists=false。 func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { f.lock.RLock() defer f.lock.RUnlock() // 返回items对应key的obj item, exists = f.items[key] return item, exists, nil } // Pop 等待一个项目准备好并处理它。如果多个项目准备就绪,它们将按照添加/更新的顺序返回。 // 该项目在处理之前从queue(FIFO中的queue)(和store(FIFO中的items))中删除, 所以如果你没有成功处理它,它应该用AddIfNotPresent() 添加回来。 // process 函数在锁定下调用,因此它是安全的更新其中需要与队列同步的数据结构。 func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // 当队列为空时,Pop() 的调用将被阻止,直到新项目入队。 // 当调用 Close() 时,设置 f.closed 并广播条件。 // 这会导致这个循环继续并从 Pop() 返回。 if f.closed { return nil, ErrFIFOClosed } f.cond.Wait() } // 获取queue中最先入队的key id := f.queue[0] // 去掉第一个形成新的queue f.queue = f.queue[1:] // 如果initialPopulationCount > 0 则initialPopulationCount减一 if f.initialPopulationCount > 0 { f.initialPopulationCount-- } // 获取对应key的item item, ok := f.items[id] if !ok { // 项目可能随后已被删除。 continue } // 从store(items)中删除 delete(f.items, id) // 处理item err := process(item) if e, ok := err.(ErrRequeue); ok { // 有异常则重新入队 f.addIfNotPresent(id, item) err = e.Err } return item, err } } // Replace将删除 'f' 的内容,而是使用给定的映射。 // 'f' 获得items的所有权,您不应在调用此函数后再次引用该items。 f 的队列也被重置;返回时,它将包含items中的项目,没有特定的顺序。 func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { // 初始化返回items items := make(map[string]interface{}, len(list)) // 遍历参数list for _, item := range list { // 获取item对应的key key, err := f.keyFunc(item) if err != nil { return KeyError{item, err} } // items添加 对应键值对 items[key] = item } f.lock.Lock() defer f.lock.Unlock() // 重置populated并设置initialPopulationCount if !f.populated { f.populated = true f.initialPopulationCount = len(items) } // 设置f中的items为items f.items = items // 重置f的queue f.queue = f.queue[:0] // 遍历items for id := range items { // 填充queue f.queue = append(f.queue, id) } if len(f.queue) > 0 { f.cond.Broadcast() } return nil } // Resync 将确保 Store(items) 中的每个对象在队列中都有其键。 func (f *FIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() // 初始化一个slice inQueue := sets.NewString() // 遍历queue for _, id := range f.queue { // 添加原有的key到新的inQueue inQueue.Insert(id) } // 遍历store(items) for id := range f.items { // 如果对应的key在新的inQueue中不存在,则添加 if !inQueue.Has(id) { f.queue = append(f.queue, id) } } if len(f.queue) > 0 { f.cond.Broadcast() } return nil }
- 函数
// Pop 是从队列中弹出item的辅助函数。 // 警告:不要在非测试代码中使用这个函数来避免竞争 , 除非你真的真的很清楚你在做什么。 func Pop(queue Queue) interface{} { var result interface{} queue.Pop(func(obj interface{}) error { result = obj return nil }) return result } // NewFIFO 返回一个 Store,可用于将项目排队处理。 func NewFIFO(keyFunc KeyFunc) *FIFO { f := &FIFO{ items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, } f.cond.L = &f.lock return f }
heap.go
- 接口
- 函数
// NewHeap 返回一个堆,可用于将要处理的item排队。 func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { h := &Heap{ data: &heapData{ items: map[string]*heapItem{}, queue: []string{}, keyFunc: keyFn, lessFunc: lessFn, }, } h.cond.L = &h.lock return h }
- 结构体
// LessFunc 用于比较堆中的两个对象。 type LessFunc func(interface{}, interface{}) bool type heapItem struct { obj interface{} // 存储在堆中的对象。 index int // Heap.queue 中对象键的索引(就是slice的索引)。 } type itemKeyValue struct { key string // 存储在堆中的对象的key。 obj interface{} // 存储在堆中的对象。 } // heapData 是一个内部结构,它实现了标准堆接口并将数据存储在堆中。 type heapData struct { // items 是从对象的键到对象及其索引的映射。 items map[string]*heapItem // queue 实现了一个堆数据结构,并根据堆不变性保持元素的顺序。队列将键的对象存储在“items”中。 queue []string // keyFunc 用于制作用于排队项插入和检索的键,并且应该是确定性的。 keyFunc KeyFunc // lessFunc 用于比较堆中的两个对象。 lessFunc LessFunc } // Less 比较两个对象,如果第一个对象应该放在堆中第二个对象的前面,则返回 true。 func (h *heapData) Less(i, j int) bool { // 如果两个索引超过了heap中queue的长度,则返回false if i > len(h.queue) || j > len(h.queue) { return false } // 先获取对应索引下的key,在根据key获取items的heapItem itemi, ok := h.items[h.queue[i]] // 如果未找到返回false if !ok { return false } // 先获取对应索引下的key,在根据key获取items的heapItem itemj, ok := h.items[h.queue[j]] // 如果未找到返回false if !ok { return false } // 调用lessFunc方法比较两个对象 return h.lessFunc(itemi.obj, itemj.obj) } // Len 返回堆中的项目数。 func (h *heapData) Len() int { return len(h.queue) } // Swap 实现了堆中两个元素的交换。这是标准堆接口的一部分,一般不应直接调用。 func (h *heapData) Swap(i, j int) { // 对调queue中两个索引下的key h.queue[i], h.queue[j] = h.queue[j], h.queue[i] // 获取对调后i对应的key的item item := h.items[h.queue[i]] // 修改对应item的索引为i item.index = i // 获取对调后j对应的key的item item = h.items[h.queue[j]] // 修改对应item的索引为j item.index = j } // 注意: Push 应该只被 heap.Push 调用。 func (h *heapData) Push(kv interface{}) { // key 和value形成的对象 keyValue := kv.(*itemKeyValue) // 获取队列的长度 n := len(h.queue) // 构建item,并添加到kv对应的key中 h.items[keyValue.key] = &heapItem{keyValue.obj, n} // 添加kv对应的key到queue h.queue = append(h.queue, keyValue.key) } // 注意: Pop 应该只被 heap.Pop 调用。堆是先进后出fieo func (h *heapData) Pop() interface{} { // 获取最后进入queue的key key := h.queue[len(h.queue)-1] // 截取除了最后一个元素形成新的queue,可以对比FIFO,做区别 h.queue = h.queue[0 : len(h.queue)-1] // 获取items对应key下的item item, ok := h.items[key] if !ok { // 这是一个错误 return nil } // 删除items下key delete(h.items, key) return item.obj } // 堆是一个线程安全的生产者/消费者队列,它实现了堆数据结构。 // 它可用于实现优先级队列和类似的数据结构(其内部有索引和排序方法(lessFunc))。 type Heap struct { lock sync.RWMutex cond sync.Cond // 数据存储对象并有一个队列,根据堆不变特性保持它们的顺序。 data *heapData // closed 表示队列已关闭。 closed bool } // 添加插入一个item,并将其放入队列中。如果该项目已存在,则更新该项目。 func (h *Heap) Add(obj interface{}) error { // 根据h中heapData的keyFunc获取key key, err := h.data.keyFunc(obj) if err != nil { return KeyError{obj, err} } h.lock.Lock() defer h.lock.Unlock() // 如果h已经close if h.closed { // 返回err return fmt.Errorf(closedMsg) } // 如果heapData中items中存在key if _, exists := h.data.items[key]; exists { // 覆盖key对应的item h.data.items[key].obj = obj // 这是go自带的函数,修正items中item的顺序 heap.Fix(h.data, h.data.items[key].index) } else { // 添加到heap h.addIfNotPresentLocked(key, obj) } h.cond.Broadcast() return nil } // BulkAdd 将列表中的所有项目添加到队列中,然后通知条件变量(cond.Broadcast())。 // 它很有用当调用者希望在消费者开始处理它们之前将所有项目添加到队列时。 func (h *Heap) BulkAdd(list []interface{}) error { h.lock.Lock() defer h.lock.Unlock() // 如果h已经关闭,则返回err if h.closed { return fmt.Errorf(closedMsg) } // 遍历list for _, obj := range list { // 获取obj对应的key key, err := h.data.keyFunc(obj) if err != nil { return KeyError{obj, err} } // 判断key是否在heapData中的items中 if _, exists := h.data.items[key]; exists { // 覆盖dataHeap的items对应key的值 h.data.items[key].obj = obj // 这是go自带的函数,修正items中item的顺序 heap.Fix(h.data, h.data.items[key].index) } else { // 添加到heap h.addIfNotPresentLocked(key, obj) } } h.cond.Broadcast() return nil } // AddIfNotPresent 插入一个项目,并将其放入队列中。如果items中存在带有键的项目,则不会对该项目进行任何更改。 func (h *Heap) AddIfNotPresent(obj interface{}) error { id, err := h.data.keyFunc(obj) if err != nil { return KeyError{obj, err} } h.lock.Lock() defer h.lock.Unlock() if h.closed { return fmt.Errorf(closedMsg) } h.addIfNotPresentLocked(id, obj) h.cond.Broadcast() return nil } // addIfNotPresentLocked 假定锁已经被持有,如果它不存在,则将提供的项添加到队列中。 func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) { if _, exists := h.data.items[key]; exists { return } heap.Push(h.data, &itemKeyValue{key, obj}) } // Update 与此实现中的 Add 相同。当项目不存在时,它被添加。 func (h *Heap) Update(obj interface{}) error { return h.Add(obj) } // 删除一个项目。 func (h *Heap) Delete(obj interface{}) error { key, err := h.data.keyFunc(obj) if err != nil { return KeyError{obj, err} } h.lock.Lock() defer h.lock.Unlock() if item, ok := h.data.items[key]; ok { heap.Remove(h.data, item.index) return nil } return fmt.Errorf("object not found") } // Pop 会等待一个项目准备好。如果多个项目准备就绪,它们将按照 Heap.data.lessFunc 给出的顺序返回。 func (h *Heap) Pop() (interface{}, error) { h.lock.Lock() defer h.lock.Unlock() for len(h.data.queue) == 0 { if h.closed { return nil, fmt.Errorf("heap is closed") } h.cond.Wait() } obj := heap.Pop(h.data) if obj == nil { return nil, fmt.Errorf("object was removed from heap data") } return obj, nil }
index.go
- 接口
// Indexer使用多个indices扩展 Store 并限制每个累加器(items)只保存当前对象(并在删除后为空)。 // // 这里有三种字符串: // 1. 在 Store 接口中定义的存储key, // 2. 索引的名称,以及 // 3. 一个“索引值”,它由 IndexFunc 生成, 可以是字段值或从对象计算的任何其他字符串。 type Indexer interface { Store // Index返回其索引值集与给定对象的索引值集相交的存储对象 Index(indexName string, obj interface{}) ([]interface{}, error) // IndexKeys 返回存储对象的存储键(Index中的value),其命名索引的索引值集(就是Index,一个map)包括给定的索引值(就是Index中的一个key) IndexKeys(indexName, indexedValue string) ([]string, error) // ListIndexFuncValues 返回给定索引的所有索引值 ListIndexFuncValues(indexName string) []string // ByIndex 返回存储对象,其命名索引的索引值集(就是Index,一个map)包括给定的索引值(就是Index中的一个key) ByIndex(indexName, indexedValue string) ([]interface{}, error) // GetIndexer 返回索引器 GetIndexers() Indexers // AddIndexers 向此store添加indexers。如果在store中已有数据后调用此方法,则结果是undefined。 AddIndexers(newIndexers Indexers) error }
- 函数
// IndexFuncToKeyFuncAdapter 使 indexFunc 适应 keyFunc。 这仅在您的索引函数为每个对象返回唯一值时才有用. // 当找到多个键时,此转换可能会产生错误。您应该更喜欢制作适当的键和索引函数。 func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc { return func(obj interface{}) (string, error) { indexKeys, err := indexFunc(obj) if err != nil { return "", err } if len(indexKeys) > 1 { return "", fmt.Errorf("too many keys: %v", indexKeys) } if len(indexKeys) == 0 { return "", fmt.Errorf("unexpected empty indexKeys") } return indexKeys[0], nil } } // MetaNamespaceIndexFunc 是一个默认的索引函数,它基于对象的命名空间进行索引 func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { // 获取obj的meta信息 meta, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } // 返回obj对应namespace组成的slice return []string{meta.GetNamespace()}, nil }
- 结构体
// IndexFunc 知道如何计算对象的索引值集。 type IndexFunc func(obj interface{}) ([]string, error) // Index 将索引值映射到存储中匹配该值的一组键 type Index map[string]sets.String // Indexers将索引名称映射到IndexFunc type Indexers map[string]IndexFunc // Indices 将索引名称映射到Index type Indices map[string]Index
lister.go
- 接口
// GenericLister 是通用索引器的lister接口 type GenericLister interface { // List 将返回跨命名空间的所有对象 List(selector labels.Selector) (ret []runtime.Object, err error) // Get 将尝试检索假设 name==key Get(name string) (runtime.Object, error) // ByNamespace 将为您提供一个命名空间的 GenericNamespaceLister ByNamespace(namespace string) GenericNamespaceLister } // GenericNamespaceLister 是通用索引器的lister接口 type GenericNamespaceLister interface { // List 将返回此命名空间中的所有对象 List(selector labels.Selector) (ret []runtime.Object, err error) // Get 将尝试按命名空间和名称检索 Get(name string) (runtime.Object, error) }
- 函数
// ListAll 使用从store中检索到的与选择器匹配的每个值调用 appendFn。 func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error { // 判断标签选择器是否为空 selectAll := selector.Empty() // 遍历从store中获取的所有item对应的obj for _, m := range store.List() { // 如果便签选择器为空 if selectAll { // 避免计算对象的标签以加快列出所有对象的常见流程。 appendFn(m) continue } // 获取metadata metadata, err := meta.Accessor(m) if err != nil { return err } // 判断便签选择器是否匹配obj的metadata对应的labels if selector.Matches(labels.Set(metadata.GetLabels())) { // 追加m appendFn(m) } } return nil } // ListAllByNamespace 用于列出属于命名空间并且在Indexer存在且匹配标签选择器的项目。 func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error { // 判断标签选择器是否为空 selectAll := selector.Empty() // 如果参数nameapace等于"" if namespace == metav1.NamespaceAll { // 遍历indexer中的所有item for _, m := range indexer.List() { // 如果便签选择器为空 if selectAll { appendFn(m) continue } // 获取metadata metadata, err := meta.Accessor(m) if err != nil { return err } // 判断便签选择器是否匹配obj的metadata对应的labels if selector.Matches(labels.Set(metadata.GetLabels())) { appendFn(m) } } return nil } // 获取在cache中以"namespace"为索引名称,以namespace形成的索引值获取的所有keys下的所有obj items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace}) if err != nil { // 忽略错误;在没有索引的情况下进行慢速搜索。 klog.Warningf("can not retrieve list of objects using index : %v", err) // 获取缓存中的所有item for _, m := range indexer.List() { metadata, err := meta.Accessor(m) if err != nil { return err } // 判断每个item的namespace和namespace是否相等并且标签选择器是否匹配item的标签 if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) { appendFn(m) } } return nil } // 遍历cache中获取到的所有items for _, m := range items { if selectAll { appendFn(m) continue } // 获取每个item的metadata metadata, err := meta.Accessor(m) if err != nil { return err } // 比较标签 if selector.Matches(labels.Set(metadata.GetLabels())) { appendFn(m) } } return nil } // NewGenericLister 为 genericLister 创建一个新实例。 func NewGenericLister(indexer Indexer, resource schema.GroupResource) GenericLister { return &genericLister{indexer: indexer, resource: resource} }
- 结构体
// AppendFunc 用于将匹配项添加到调用者正在使用的任何列表中 type AppendFunc func(interface{}) type genericLister struct { indexer Indexer // 用于缓存 resource schema.GroupResource // 表示缓存的资源 } // 获取s中所有符合标签选择器的obj func (s *genericLister) List(selector labels.Selector) (ret []runtime.Object, err error) { // 函数ListAll本文分析过,用来获取indexer下所有符合标签选择器的obj err = ListAll(s.indexer, selector, func(m interface{}) { ret = append(ret, m.(runtime.Object)) }) return ret, err } // 获取限定了namespace的Lister func (s *genericLister) ByNamespace(namespace string) GenericNamespaceLister { return &genericNamespaceLister{indexer: s.indexer, namespace: namespace, resource: s.resource} } // 以name = key方式获取indexer中的obj func (s *genericLister) Get(name string) (runtime.Object, error) { obj, exists, err := s.indexer.GetByKey(name) if err != nil { return nil, err } if !exists { return nil, errors.NewNotFound(s.resource, name) } return obj.(runtime.Object), nil } // 其方法类似于genericLister,不做分析 type genericNamespaceLister struct { indexer Indexer // 缓存 namespace string // 限定的namespace resource schema.GroupResource // 限定缓存的gr }
listwatch.go
- 接口
// Lister 是知道如何执行初始列表的对象。 type Lister interface { // List 应该返回一个列表类型的对象(什么是列表类型的对象?比如PodList); Items 字段将被提取,ResourceVersion 字段将用于在正确的位置启动监视。 List(options metav1.ListOptions) (runtime.Object, error) } // Watcher 是任何知道如何开始监视资源的对象。 type Watcher interface { // Watch 应该在指定的版本开始一个 watch。 Watch(options metav1.ListOptions) (watch.Interface, error) } // ListerWatcher 是任何知道如何执行初始列表并开始监视资源的对象。 type ListerWatcher interface { Lister Watcher } // ListFunc 知道如何列出资源 type ListFunc func(options metav1.ListOptions) (runtime.Object, error) // WatchFunc 知道如何watch资源 type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) // Getter 接口知道如何从 RESTClient 访问 Get 方法。 type Getter interface { Get() *restclient.Request }
- 函数
// NewListWatchFromClient 从指定的客户端、资源、命名空间和字段选择器创建一个新的 ListWatch。 func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch { optionsModifier := func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector.String() } return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier) } // NewFilteredListWatchFromClient 从指定的客户端、资源、命名空间和选项修饰符创建一个新的 ListWatch。 // Option 修饰符是一个函数,它接受一个 ListOptions 并修改ListOptions。 // 提供自定义修饰符函数以使用字段选择器、标签选择器或任何其他所需选项对 ListOptions 进行修改。 func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch { // 构建listFunc,用来执行list listFunc := func(options metav1.ListOptions) (runtime.Object, error) { // 先修改listOptions optionsModifier(&options) // 执行get,获取list类型的obj return c.Get(). Namespace(namespace). Resource(resource). VersionedParams(&options, metav1.ParameterCodec). Do(context.TODO()). Get() } // 构建watchFunc,用来执行watch watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { // 修改options.Watch = true,表示是watch操作 options.Watch = true // 修改listOptions optionsModifier(&options) // 执行get,获取list类型的obj return c.Get(). Namespace(namespace). Resource(resource). VersionedParams(&options, metav1.ParameterCodec). Watch(context.TODO()) } // 构建一个ListWatch对象 return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} }
- 结构体
// ListWatch 知道如何列出和监视一组 apiserver 资源。它满足 ListerWatcher 接口。 // 它是为 NewReflector(就是在controller中产生Reflector使用) 等提供的便利功能。 // ListFunc 和 WatchFunc 不能为 nil type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc // DisableChunking 此ListWatch请求apiserver不分块(就是不分页)。 DisableChunking bool } // List是列出一组 apiserver 资源 func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { // ListWatch 用在已经支持分页的 Reflector 中。 // 不要在此处分页以避免重复。也就是Reflector 中会分页循环获取 return lw.ListFunc(options) } // watch一组 apiserver 资源 func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { return lw.WatchFunc(options) }
mutation_cache.go
- 接口
// MutationCache 能够获取更新操作的结果并将它们存储在 LRU 中,该 LRU 可用于提供请求对象的当前视图。 // 它需要解析resourceVersions 以进行比较。 // 实现必须是线程安全的。 type MutationCache interface { GetByKey(key string) (interface{}, bool, error) ByIndex(indexName, indexKey string) ([]interface{}, error) Mutation(interface{}) } // ResourceVersionComparator 能够比较对象版本。 type ResourceVersionComparator interface { CompareResourceVersion(lhs, rhs runtime.Object) int }
- 函数
// NewIntegerResourceVersionMutationCache 返回一个 MutationCache,它了解如何处理具有以下资源版本的对象: // // - 是一个整数 // - 更新时增加 // - 在命名空间中的同一资源之间具有可比性 // // 大多数backends将具有这些语义。索引器可能为零。 ttl 控制一个项目在被移除之前保留在突变缓存中的时间。 // // 如果 includeAdds 为 true,则即使在底层存储中不存在也将返回突变缓存中的对象. // 只有当您使用缓存可以处理突变条目时才是安全的, 当突变和删除非常接近地发生时,保留在缓存中最多 ttl。 func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache { return &mutationCache{ backingCache: backingCache, indexer: indexer, mutationCache: utilcache.NewLRUExpireCache(100), comparator: etcdObjectVersioner{}, ttl: ttl, includeAdds: includeAdds, } }
- 结构体
// mutationCache 不保证它返回通过 Mutation 添加的值,因为它们可以换页并且因为您无法区分“没有观察到创建”和“在创建后被删除”,如果缺少键从后备缓存中,我们总是将其作为缺失返回 type mutationCache struct { lock sync.Mutex // 用于GetByKey方法获取obj backingCache Store // 用于ByIndex获取obj indexer Indexer // 使用lru(最近最少使用算法)用来记录一个obj自上次被访问以来所经历的时间 t,当须淘汰一个obj时,选择现有obj中其 t 值最大的,即最近最少使用的obj予以淘汰。 // LRUExpireCache中有一个evictionList(链表)用来存放最近使用的obj,entries(一个map)用来存放key value mutationCache *utilcache.LRUExpireCache includeAdds bool // 初始的过期时间间隔 ttl time.Duration comparator ResourceVersionComparator } // GetByKey 不保证返回 Mutation 中设置的值。 // 它可能被调出,可能比另一个副本更旧,backingCache 可能更新,或者,您可能已将两次写入同一个键。 // 您将获得一个在某个时间快照有效的值,并且将始终返回 backingCache 和 mutationCache 中较新的一个(比较资源版本号)。 func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) { c.lock.Lock() defer c.lock.Unlock() // 通过key从backingCache中获取 obj, exists, err := c.backingCache.GetByKey(key) if err != nil { return nil, false, err } // 如果不存在 if !exists { // 如果includeAdds为false if !c.includeAdds { // 我们无法区分“没有观察到创建”和“在创建后被删除”,所以如果键丢失,我们总是将其返回为丢失 return nil, false, nil } // 从突变cache(mutationCache)中获取 obj, exists = c.mutationCache.Get(key) if !exists { return nil, false, nil } } // 类型转化为runtime.Object objRuntime, ok := obj.(runtime.Object) // 如果失败,则返回 if !ok { return obj, true, nil } // 和mutationCache中的obj作比较获取最新的 return c.newerObject(key, objRuntime), true, nil } // ByIndex 返回与提供的索引和索引器键匹配的较新对象。 // 如果未提供索引器,将返回错误。 func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) { c.lock.Lock() defer c.lock.Unlock() // 如果c中indexer为空,则return err if c.indexer == nil { return nil, fmt.Errorf("no indexer has been provided to the mutation cache") } // 调用indexer的IndexKeys获取缓存keys keys, err := c.indexer.IndexKeys(name, indexKey) if err != nil { return nil, err } var items []interface{} keySet := sets.NewString() // 遍历keys for _, key := range keys { keySet.Insert(key) // 从indexer中以key为键获取 obj, exists, err := c.indexer.GetByKey(key) if err != nil { return nil, err } // 不存在则下次循环 if !exists { continue } // 类型转化 if objRuntime, ok := obj.(runtime.Object); ok { // 和mutationcache对比获取最新 items = append(items, c.newerObject(key, objRuntime)) } else { items = append(items, obj) } } // 如果includeAdds=true if c.includeAdds { // 从indexer中获取name(索引名称)对应的fn fn := c.indexer.GetIndexers()[name] // Keys() 从最旧到最新返回,因此完全遍历不会改变 LRU 行为 // 遍历mutationCache中所有keys for _, key := range c.mutationCache.Keys() { // 根据key获取mutationCache的obj updated, ok := c.mutationCache.Get(key) if !ok { continue } // 如果在backingCache中已近存在则下次循环 if keySet.Has(key.(string)) { continue } // 获取该对象的indexValue elements, err := fn(updated) if err != nil { klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err) continue } // 遍历indexValue 列表 for _, inIndex := range elements { // 如果其中有一个和参数inIndex相等,则会追加到items中 if inIndex != indexKey { continue } items = append(items, updated) break } } } return items, nil } // newerObject 从mutationCache中获取最新的对象。 // 如果 mutated 对象比支持对象更旧,则在持有锁时必须从mutationCache删除对应key func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object { // 根据key从mutationCache中获取 mutatedObj, exists := c.mutationCache.Get(key) // 如果不存在,则返回参数backing if !exists { return backing } // 类型转化 mutatedObjRuntime, ok := mutatedObj.(runtime.Object) if !ok { return backing } // 比较两者的resoureversion if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 { //从mutationCache中移除对应key c.mutationCache.Remove(key) return backing } return mutatedObjRuntime } // Mutation 如果它比 backingCache中对应key的obj的resourceverison更新,则向mutationCache添加更改并且则可以在mutationCache使用 GetByKey 中返回该更改。 // 如果你在不同线程上用同一个对象调用 Mutation 两次,只有一个可以成功,另一个则会not defined。 // 这不会影响正确性,因为保证“这两个缓存”的 GetByKey 被保留,但您可能无法获得所需对象的版本。 // 你得到的对象只保证是“在某个时间点有效的对象”,而不是“我想要的对象”. func (c *mutationCache) Mutation(obj interface{}) { c.lock.Lock() defer c.lock.Unlock() // 获取obj对应的key key, err := DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) return } // 判断obj是否是runtime.Object类型 if objRuntime, ok := obj.(runtime.Object); ok { // 从mutationCache获取key对应的obj if mutatedObj, exists := c.mutationCache.Get(key); exists { // 判断mutatedObj是否是runtime.Object类型 if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok { // 判断objRuntime的resourceversion是否小于mutatedObjRuntime的resourceversion if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 { // 什么都不做 return return } } } } // 添加key为key value为obj ttl为c.ttl到mutationCache中 c.mutationCache.Add(key, obj, c.ttl) } // etcdObjectVersioner 为具有嵌入 ObjectMeta 或 ListMeta 字段的对象实现版本控制和提取 etcd 节点信息。 type etcdObjectVersioner struct{} // ObjectResourceVersion 实现 Versioner ,获取obj的resourceversion func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { // 获取obj的metadata accessor, err := meta.Accessor(obj) if err != nil { return 0, err } // 通过metadata获取resourceversion version := accessor.GetResourceVersion() if len(version) == 0 { return 0, nil } return strconv.ParseUint(version, 10, 64) } // CompareResourceVersion 比较 etcd 资源版本。 // 在这个 API 之外,它们都是字符串,但是 etcd 资源版本很特殊,它们实际上是整数,因此我们可以轻松地比较它们。 func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int { // 获取obj的resourceversion lhsVersion, err := a.ObjectResourceVersion(lhs) if err != nil { panic(err) } rhsVersion, err := a.ObjectResourceVersion(rhs) if err != nil { panic(err) } // 相等返回0,前者小于后者返回-1,前者大于后者返回1 if lhsVersion == rhsVersion { return 0 } if lhsVersion < rhsVersion { return -1 } return 1 }
mutation_detector.go
- 变量
// 表示监测是否可用 var mutationDetectionEnabled = false func init() { // 从环境变量中获取KUBE_CACHE_MUTATION_DETECTOR对应的值 mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR")) }
- 接口
// MutationDetector 能够在有限的时间窗口内监控对象的突变 type MutationDetector interface { // AddObject 将给定的对象添加到集合中(该集合只是用来缓存一段时间的突变obj) AddObject(obj interface{}) // Run 开始监控,直到监控停止才返回。 Run(stopCh <-chan struct{}) }
- 函数
// NewCacheMutationDetector 为 defaultCacheMutationDetector 创建一个新实例。 func NewCacheMutationDetector(name string) MutationDetector { // 如果mutationDetectionEnabled = false if !mutationDetectionEnabled { // 返回一个假的MutationDetector(就是里面什么也没做) return dummyMutationDetector{} } klog.Warningln("Mutation detector is enabled, this will result in memory leakage.") // 构建一个defaultCacheMutationDetector(监测周期是1秒钟,保留时间是2分钟) return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute} }
- 结构体
// 假的MutationDetector type dummyMutationDetector struct{} func (dummyMutationDetector) Run(stopCh <-chan struct{}) { } func (dummyMutationDetector) AddObject(obj interface{}) { } // defaultCacheMutationDetector 提供了一种检测缓存对象是否发生变异的方法,它有一个缓存对象列表及对象的所有副本。 // 该对象还没有一种方法可以看到变异它的过程,只是得到变异的结果。 type defaultCacheMutationDetector struct { name string period time.Duration // compareLock 确保一次只运行一次对 CompareObjects 的调用 compareObjectsLock sync.Mutex // addLock 保护 AddObject 和 CompareObjects 之间的 addedObjs addedObjsLock sync.Mutex addedObjs []cacheObj cachedObjs []cacheObj retainDuration time.Duration lastRotated time.Time retainedCachedObjs []cacheObj // failureFunc 可用于单元测试。如果你没有它,这个过程就会panic。 // 这种panic是故意的,因为打开这个检测表明你想要一个强烈的失败信号。 failureFunc func(message string) } // 主要做了三件事: // 1.追加addedObjs到cachedObjs // 2.遍历cachedObjs和retainedCachedObjs,看是否有报警信息 // 3.有报警信息,则查看failureFunc不为nil,则执行failureFunc,否则panic func (d *defaultCacheMutationDetector) CompareObjects() { d.compareObjectsLock.Lock() defer d.compareObjectsLock.Unlock() // 在锁定状态下将 addedObjs 移动到 cachedObjs 中,这可以保持临界区较小以避免在我们比较 cachedObjs 时阻塞 AddObject d.addedObjsLock.Lock() // 追加addedObjs到cachedObjs d.cachedObjs = append(d.cachedObjs, d.addedObjs...) // 清空addedObjs d.addedObjs = nil d.addedObjsLock.Unlock() // 初始化altered=false,表示是否报警 altered := false // 遍历cachedObjs for i, obj := range d.cachedObjs { // 比较cachedObj中的obj和copyobj是否相等 if !reflect.DeepEqual(obj.cached, obj.copied) { fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied)) // 设置altered = true altered = true } } // 遍历retainedCachedObjs for i, obj := range d.retainedCachedObjs { // 比较cachedObj中的obj和copyobj是否相等 if !reflect.DeepEqual(obj.cached, obj.copied) { // 设置altered = true fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied)) altered = true } } // 如果altered = true if altered { // 构建报警信息 msg := fmt.Sprintf("cache %s modified", d.name) // 如果failureFunc不为nil if d.failureFunc != nil { // 执行失败函数 d.failureFunc(msg) return } // 如果failureFunc为nil,则panic panic(msg) } } // cacheObj 保存实际对象和副本 type cacheObj struct { cached interface{} copied interface{} } // 实现MutationDetector接口的Run方法 func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) { // 无限for循环 for { // 判断最后一次旋转日期(表示cachedObjs赋值给retainedCachedObjs并清空cachedObjs时的时间)是否为o if d.lastRotated.IsZero() { // 表示第一次for循环 // 设置最后一次旋转日期为当前时间 d.lastRotated = time.Now() } else if time.Since(d.lastRotated) > d.retainDuration { // 如果目前日期和最后一次旋转日期之间时长大于要进入保留的时长 // 缓存objs赋值给保留缓存objs d.retainedCachedObjs = d.cachedObjs // 清空缓存objs,设置最后一次旋转日期为目前时间 d.cachedObjs = nil d.lastRotated = time.Now() } // 遍历cachedObjs和retainedCachedObjs,比较其中的obj和copyobj是否相等形成alert信息 d.CompareObjects() // 退出或停留d.period时间 select { case <-stopCh: return case <-time.After(d.period): } } } // AddObject 制作对象的深层副本以供以后比较(在CompareObjects方法中)。 // 它只适用于 runtime.Object 但它涵盖了我们的绝大多数缓存对象 // 该方法在shared_informer.go的HandleDeltas中用到 func (d *defaultCacheMutationDetector) AddObject(obj interface{}) { // 判断是否是DeletedFinalStateUnknown类型对象,如果是则直接return if _, ok := obj.(DeletedFinalStateUnknown); ok { return } // // 判断是否是runtime.Object类型对象 if obj, ok := obj.(runtime.Object); ok { // 深拷贝 copiedObj := obj.DeepCopyObject() d.addedObjsLock.Lock() defer d.addedObjsLock.Unlock() // 形成新的cacheObj并追加到addedObjs d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj}) } }
reflector.go
- 接口
// ResourceVersionUpdater 是一个接口,允许商店(store)实现跟踪(也就是用map存储每次更新放射器资源版本都会存储在slice中)反射器的当前资源版本。 type ResourceVersionUpdater interface { // 每次更新反射器的当前资源版本时都会调用 UpdateResourceVersion。 UpdateResourceVersion(resourceVersion string) }
- 结构体
// Reflector 监视指定的资源并使所有变更映射到给定的存储(store)中。 type Reflector struct { // 名称标识此反射器。 name string // 我们希望放置在商店中的类型的名称。 如果提供expectedGVK,名称将是 expectedGVK 的字符串化,否则为 expectedType 的字符串化。 // 它仅用于显示,不应用于解析或比较。 expectedTypeName string // 我们希望放置在商店中的类型的示例对象。只有类型需要是正确的。除非是 `unstructured.Unstructured` 对象的 `"apiVersion"` 和`"kind"` 必须是正确的(比较的是expectedGVK)。 expectedType reflect.Type // 如果是非结构化,表示我们希望放置在商店中的对象的 GVK。 expectedGVK *schema.GroupVersionKind // 同步watch源,出入的目的地(其实是一个queue) store Store // listerWatcher 用于执行列表和监视。 listerWatcher ListerWatcher // 退避管理 ListWatch 的退避(用于重试) backoffManager wait.BackoffManager // initConnBackoffManager 通过 ListAndWatch 的 Watch 调用管理回退初始连接(就是如果ListAndWatch.Watch返回结果有err,并且如果err是连接性错误,那么会backoff一段时间重新执行)。 initConnBackoffManager wait.BackoffManager resyncPeriod time.Duration // ShouldResync 被定期调用,每当它返回 `true` 时,都会调用 Store 的 Resync 操作 ShouldResync func() bool clock clock.Clock // paginatedResult 定义是否应该为列表调用强制分页。 // 它是根据初始列表调用的结果设置的。 paginatedResult bool // lastSyncResourceVersion 是最后的资源版本,在底层存储进行同步时观察到,它是线程安全的,但不与底层存储同步 lastSyncResourceVersion string // isLastSyncResourceVersionUnavailable 如果listAndWatch的list方法或带有lastSyncResourceVersion 的观察请求失败并出现“过期”或“资源版本太大”错误,则为真。 // 出现这种情况refactor会根据relistResourceVersion的版本从新获取,避免refactor结束执行 isLastSyncResourceVersionUnavailable bool // lastSyncResourceVersionMutex 保护对 lastSyncResourceVersion 的读/写访问 lastSyncResourceVersionMutex sync.RWMutex // WatchListPageSize 是请求的初始和重新同步监视列表的块大小。 // 注意:应该谨慎使用它,因为分页列表总是直接从 etcd 获取,这显着降低效率并可能导致严重的性能和可伸缩性问题。 WatchListPageSize int64 // 每当 ListAndWatch 因错误断开连接时调用。 watchErrorHandler WatchErrorHandler } // 每当 ListAndWatch 断开连接并出现错误时,就会调用 WatchErrorHandler。调用此处理程序后,通知者将退避并重试。 type WatchErrorHandler func(r *Reflector, err error) // 设置reflector的预期类型(也就是watch的type) func (r *Reflector) setExpectedType(expectedType interface{}) { r.expectedType = reflect.TypeOf(expectedType) if r.expectedType == nil { r.expectedTypeName = defaultExpectedTypeName return } // 注意这里设置的是type的转化为str的值 r.expectedTypeName = r.expectedType.String() // 判断是否是非机构化类型 if obj, ok := expectedType.(*unstructured.Unstructured); ok { // 如果为true,则应该还需要设置expectedGVK和expectedTypeName gvk := obj.GroupVersionKind() if gvk.Empty() { klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name) return } // 如果转化为Unstructured成功后,设置expectedGVK r.expectedGVK = &gvk // 注意这里会覆盖之前设置的type.String()值,替换为gvk.String r.expectedTypeName = gvk.String() } } // Run 重复使用反射器的 ListAndWatch 来获取所有对象和后续增量。 // 当 stopCh 关闭时,Run 将退出。 func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) // 注意这里使用了backoffManager,来控制下次何时再次调用 wait.BackoffUntil(func() { if err := r.ListAndWatch(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) } var ( // 任何东西都不会被发送到这个channel neverExitWatch <-chan time.Time = make(chan time.Time) // 用于指示由于来自反射器客户端的停止通道的信号而停止watch。 errorStopRequested = errors.New("Stop requested") ) // resyncChan 返回一个通道,该通道将在需要重新同步时接收一些东西,以及一个清理函数stop。 func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { if r.resyncPeriod == 0 { return neverExitWatch, func() bool { return false } } t := r.clock.NewTimer(r.resyncPeriod) return t.C(), t.Stop } // LastSyncResourceVersion 是上次与底层存储(值得是etcd)同步时观察到的资源版本 // 注意: 返回的值与对底层存储的访问不同步并且不是线程安全的 func (r *Reflector) LastSyncResourceVersion() string { r.lastSyncResourceVersionMutex.RLock() defer r.lastSyncResourceVersionMutex.RUnlock() return r.lastSyncResourceVersion } func (r *Reflector) setLastSyncResourceVersion(v string) { r.lastSyncResourceVersionMutex.Lock() defer r.lastSyncResourceVersionMutex.Unlock() r.lastSyncResourceVersion = v } // relistResourceVersion 确定反射器应该列出或重新列出的资源版本。 // 返回 lastSyncResourceVersion 以便此反射器将重新列出不早于已在重新列出结果或观察事件中观察到的资源版本, // 或者,如果最后一个 relist 导致 HTTP 410 (Gone) 状态代码,则返回 "" 以便 relist 将通过法定读取使用 etcd 中可用的最新资源版本。 func (r *Reflector) relistResourceVersion() string { r.lastSyncResourceVersionMutex.RLock() defer r.lastSyncResourceVersionMutex.RUnlock() // listAndWatch执行list或者watch时,出现expired或者too large resource version 错误时,会设置isLastSyncResourceVersionUnavailable为true if r.isLastSyncResourceVersionUnavailable { // 则会返回"",表示会再次读取etcd的resource version return "" } // 如果r.lastSyncResourceVersion为"" if r.lastSyncResourceVersion == "" { // 表示初始工作,默认版本为0 return "0" } return r.lastSyncResourceVersion } // syncWith 用给定的列表替换store(缓存)的项目。 func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { found := make([]interface{}, 0, len(items)) for _, item := range items { found = append(found, item) } return r.store.Replace(found, resourceVersion) } // watchHandler 监视 w 并保持 *resourceVersion 最新。 func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { eventCount := 0 // 停止观察者应该是幂等的,如果我们从这个函数返回,我们就不可能回到相同的观察界面。 defer w.Stop() loop: // for循环 for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): if !ok { break loop } if event.Type == watch.Error { return apierrors.FromObject(event.Object) } // 如果expectedType不为空 if r.expectedType != nil { // 判断watch到的event的obj是否是该类型 if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) continue } } // 这个是针对unstructure类型 if r.expectedGVK != nil { // 判断watch到的event的obj的gvk if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a)) continue } } // 获取event的obj的metadata meta, err := meta.Accessor(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) continue } // 获取event中obj的resource version // 根据不同的event type 做不同的操作 switch event.Type { case watch.Added: // add操作 // deltaFifo add event.obj err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Modified:// modify 操作 // 更新deltaFifo 中event.obj err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Deleted:// delete 操作 // TODO:是否有任何消费者需要访问在 event.Object 中传递的“最后已知状态”?如果是这样,可能需要更改此设置。 err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } case watch.Bookmark: // `Bookmark` 表示watch已经同步到这里,只需更新资源版本 default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) // 更新resourceVersion *resourceVersion = newResourceVersion // 更新最新同步的资源版本 r.setLastSyncResourceVersion(newResourceVersion) // 如果r.store是ResourceVersionUpdater类型,则更新ResourceVersion if rvu, ok := r.store.(ResourceVersionUpdater); ok { rvu.UpdateResourceVersion(newResourceVersion) } eventCount++ } } watchDuration := r.clock.Since(start) if watchDuration < 1*time.Second && eventCount == 0 { return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) } klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount) return nil } // reflector的核心 // ListAndWatch首先列出所有item并获取调用时刻的资源版本,然后使用资源版本进行观看。 func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) var resourceVersion string // 获取reflector listAndWatch的最后资源版本 options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} if err := func() error { initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) defer initTrace.LogIfLong(10 * time.Second) var list runtime.Object var paginatedResult bool var err error listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() // 尝试分块收集列表,如果listerWatcher支持,如果不支持,首次列表请求将返回resource的完整响应。 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { return r.listerWatcher.List(opts) })) // switch来设置pager的pageSize(分页大小) switch { case r.WatchListPageSize != 0: pager.PageSize = r.WatchListPageSize case r.paginatedResult: case options.ResourceVersion != "" && options.ResourceVersion != "0": // 用户没有明确请求分页。 pager.PageSize = 0 } // 通过分页方式获取全部数据 list, paginatedResult, err = pager.List(context.Background(), options) // 判断err是否是过期(资源版本已过期,或者缓存可能尚未同步到etcd的资源版本)或者资源版本太大 if isExpiredError(err) || isTooLargeResourceVersionError(err) { //设置LastSyncResourceVersionUnavailable为true,listAndWatch时表示不会通过rv来过滤resource r.setIsLastSyncResourceVersionUnavailable(true) // 如果用于列出的资源版本不可用,请立即重试。所以这时我们需要回退到 resourceVersion="" 来恢复并确保reflector向前推进。 list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) } close(listCh) }() select { case <-stopCh: return nil case r := <-panicCh: panic(r) case <-listCh: } if err != nil { return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err) } // 我们检查列表是否已分页,如果是,则基于此设置 reflector的paginatedResult。 // 但是,我们只想对初始列表执行此操作(这是我们设置 ResourceVersion="0" 时的唯一情况)。 // 其背后的原因是,在某些情况下,我们可能会强制直接从 etcd 列出(通过设置 ResourceVersion="",也就是说其实apiserver中存在watch cache) ,这将返回分页结果,即使启用了监视缓存。 // 但是,在这种情况下,如果可能,我们仍然希望发送请求以查看watch cache。 // // 为 ResourceVersion="0" 的请求返回的分页结果意味着watch cache(apiserver中)被禁用并且有很多给定类型的对象,详情可以看apiserver中pkg/registry/generic/registry/store.go中的ListPredicate方法。 // 在这种情况下,不需要限制从watch cache中列出。 if options.ResourceVersion == "0" && paginatedResult { r.paginatedResult = true } // 到这里表明list操作成功,设置IsLastSyncResourceVersionUnavailable为false r.setIsLastSyncResourceVersionUnavailable(false) initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) if err != nil { return fmt.Errorf("unable to understand list result %#v: %v", list, err) } // 获取资源版本 resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") // 返回list中item组成一个新的slice items, err := meta.ExtractList(list) if err != nil { return fmt.Errorf("unable to understand list result %#v (%v)", list, err) } initTrace.Step("Objects extracted") // 同步,也就是用items(从etcd中获取的),替换到store(缓存中) if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("unable to sync list result: %v", err) } initTrace.Step("SyncWith done") // 设置最新的资源版本 r.setLastSyncResourceVersion(resourceVersion) initTrace.Step("Resource version updated") return nil }(); err != nil { return err } resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go func() { // 获取同步channel和清除函数cleanup resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } // 判断ShouldResync,如果应该resync(这里判断的标准 参考分析store) if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) // 执行store 的resync方法 if err := r.store.Resync(); err != nil { resyncerrc <- err return } } // 执行清除方法 cleanup() // 重新获取新的 resyncCh, cleanup = r.resyncChan() } }() for { select { case <-stopCh: return nil default: } // 随机一个timeout(在minWatchTimeout的1-2倍之间) timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) // 构建执行watch的选项 options = metav1.ListOptions{ ResourceVersion: resourceVersion, // 我们希望避免挂起观察者的情况。停止在超时窗口内未收到任何事件的所有 wacher。 TimeoutSeconds: &timeoutSeconds, // 为了减少watch重启时 kube-apiserver 的负载,您可以启用 watch bookmarks。 // Reflector 根本不假设书签被返回(如果服务器不支持watch bookmarks,它将忽略此字段)。 AllowWatchBookmarks: true, } // 在发送请求之前启动时钟,因为某些代理在发送第一个观察事件之后才会刷新标头 start := r.clock.Now() // 获取listAndWatch的watch方法(具体可以参考controller-runtime的informers_map.go的createxxxListWatch函数) w, err := r.listerWatcher.Watch(options) if err != nil { // 如果时连接拒绝或者时请求太多异常,那么可以backoff if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) { <-r.initConnBackoffManager.Backoff().C() continue } return err } // 执行watchHandler方法 if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { case isExpiredError(err): // 通常由于指定了resource version查询时产生(resource version 太久) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) case apierrors.IsTooManyRequests(err): // 通常是api request太多 klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) <-r.initConnBackoffManager.Backoff().C() continue default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) } } return nil } } }
- 函数
// DefaultWatchErrorHandler 是 WatchErrorHandler 的默认实现 func DefaultWatchErrorHandler(r *Reflector, err error) { switch { // 如果是过期err case isExpiredError(err): klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) case err == io.EOF: case err == io.ErrUnexpectedEOF: klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) } } // NewNamespaceKeyedIndexerAndReflector 创建一个 Indexer 和一个 Reflector, 索引器被配置为 key on namespace(就是keyFunc中产生的数据包含namespace) func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc}) reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) return indexer, reflector } // internalPackages 是在创建默认反射器名称时忽略的包。也就是在NewReflector中,创建name(会忽略该数组中调用包,来组合成name) var internalPackages = []string{"client-go/tools/cache/"} // NewReflector 创建一个新的 Reflector 对象,它将使给定的存储与给定资源的服务器内容保持同步。 // Reflector 承诺只将类型为 expectedType 的东西放入 store,除非 expectedType 为 nil(也会比较expectedGVK)。 // 如果 resyncPeriod 非零,则反射器会定期查询它的 ShouldResync 函数来决定是否调用 Store 的 Resync 操作; `ShouldResync==nil` 表示总是“是”。 // 这使您能够使用反射器来定期处理所有(ShouldResync -- store.Resync)内容以及增量(pager.List -- watchHandler)处理更改的内容。 func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) } // NewNamedReflector 与 NewReflector 相同,但具有指定的reflector(也可以理解为日志记录)名称 func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { realClock := &clock.RealClock{} r := &Reflector{ name: name, listerWatcher: lw, store: store, // 我们过去每 1 秒(1 QPS)调用一次,这里的目标是API 服务器不健康时实现约 98% 的流量减少。使用这些参数,退避将在 [30,60) 秒间隔停止,即0.22 QPS。如果我们没有退避 2 分钟,假设 API 服务器是健康的,我们重置退避。 backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), resyncPeriod: resyncPeriod, clock: realClock, watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), } r.setExpectedType(expectedType) return r }
shared_informer.go
- 接口
// SharedInformer 提供其客户端与给定对象集合状态的最终一致。 // 对象由其 API 组、种类/资源、命名空间(如果有)和名称标识;就本合约而言,`ObjectMeta.UID` 不是对象 ID 的一部分。 // 一个 SharedInformer 提供与特定 API 组和种类/资源的对象的链接。 // SharedInformer 的链接对象集合可以进一步限制为一个命名空间(如果适用)或标签选择器或字段选择器。 // // 对象的权威状态是 apiserver 提供的访问,并且对象经过严格的状态序列。 // 对象状态要么 (1) 存在 ResourceVersion 和其他适当的内容,要么 (2) “不存在”。 // // SharedInformer 维护本地缓存 --- 由 GetStore()、GetIndexer() 在索引通知者的情况下以及可能由参与创建和或访问通知者的机器公开 --- 每个相关对象的状态. // 这个缓存最终与权威状态(etcd中的)一致。(通过resync) // 这意味着,除非被持久通信问题阻止,否则如果某个特定对象 ID X 与状态 S 权威相关,那么对于其集合包括 (X, S) 的每个 SharedInformer I, // 最终要么I 的缓存将 X 与S 或 X 的后续状态,要么 I 停止,或 X 的权威状态服务终止。 // 为了正式完整,我们说不存在状态满足标签选择器或字段选择器的任何限制。 // // 对于给定的通知者和相关对象 ID X,出现在通知者缓存中的状态序列是与 X 权威关联的状态的子序列。 // 也就是说,某些状态可能永远不会出现在缓存中,但出现状态之间的排序是正确的。 // 但是请注意,对于不同对象所见的状态之间的排序并没有保证。 // // 本地缓存开始时为空,并在“Run()”期间填充和更新。 // // 举一个简单的例子,如果一个对象的集合此后不变,则创建一个链接到该集合的 SharedInformer,并且该 SharedInformer 是 `Run()`, // 那么 SharedInformer 的缓存最终保存了该集合的精确副本(除非它过早停止,权威状态服务结束,或两者之间的通信问题持续阻碍实现)。 // // 作为另一个简单的例子,如果本地缓存曾经为某个对象 ID 保持非缺失状态,并且该对象最终从权威状态中移除, // 那么最终该对象将从本地缓存中移除(除非 SharedInformer 过早停止 ,权威状态服务结束,或者通信问题持续阻碍预期结果)。 // // 客户端通知发生在相应的缓存更新之后,如果是 SharedIndexInformer,则发生在相应的索引更新之后 // 在这种规定的通知之前,可能会发生额外的缓存和索引更新。 // 因为`ObjectMeta.UID` 没有识别对象的作用, 当 SharedInformer 的本地缓存中的(1)对象 O1 具有 ID(例如命名空间和名称)X 和 `ObjectMeta.UID` U1 时, // 可能会被删除之后 (2) 另一个具有 ID X 和 ObjectMeta.UID U2 的对象 O2 被创建, 通知者的客户端不会收到 (1) 和 (2) 的通知,而是只收到从 O1 到 O2 的更新通知. // 需要检测这种情况的客户端可以通过比较处理更新通知的代码中的新旧对象的 `ObjectMeta.UID` 字段来实现通知(即 ResourceEventHandler 的 `OnUpdate` 方法) type SharedInformer interface { // AddEventHandler 使用共享通知程序的重新同步周期向共享通知程序添加事件处理程序。 // 单个处理程序的事件按顺序传递,但不同处理程序之间没有协调。 AddEventHandler(handler ResourceEventHandler) // AddEventHandlerWithResyncPeriod 向共享通知者添加事件处理程序,并具有请求的重新同步周期(在reflector的ShouldResync方法中使用,用来判断是否该同步etcd); // 零表示此处理程序不关心重新同步。重新同步操作包括向通知者本地缓存中的每个对象的处理程序发送更新通知;它不会添加与权威存储的任何交互。 // 一些通知者根本不进行重新同步,甚至对于添加了非零 resyncPeriod 的处理程序也不进行。 // 对于进行重新同步的通知者,以及对于每个请求重新同步的处理程序,该通知者会制定一个标称的重新同步周期,该周期不短于请求的周期,但可能更长。 // 任何两次重新同步之间的实际时间可能比标称周期长,因为实现需要时间来完成工作,并且可能存在竞争负载和调度干扰。 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) // GetStore 将通知者的本地缓存作为 Store 返回。 GetStore() Store //GetController 已弃用,它没有任何用处 GetController() Controller // Run 启动并运行共享的 Informer,停止后返回。 Run(stopCh <-chan struct{}) // 如果共享通知者的存储已被通知,至少一个通知者对象集合的权威状态的完整列表通知,则 HasSynced 返回 true。这与“重新同步”无关。 HasSynced() bool // LastSyncResourceVersion 是上次与底层存储同步时观察到的资源版本。返回的值与对底层存储的访问不同步,并且不是线程安全的。 LastSyncResourceVersion() string // 每当 ListAndWatch 断开连接并出现错误时,就会调用 WatchErrorHandler。调用此处理程序后,通知者将退避并重试。 SetWatchErrorHandler(handler WatchErrorHandler) error } // SharedIndexInformer 提供了基于 SharedInformer 并增加了添加和获取索引器的能力。 type SharedIndexInformer interface { SharedInformer // AddIndexers 在 Informer 开始之前将索引器添加到 Informer。 AddIndexers(indexers Indexers) error GetIndexer() Indexer }
- 结构体
// `*sharedIndexInformer` 实现了 SharedIndexInformer 并具有三个主要组件。 // 一个是索引本地缓存,`indexer Indexer`。 // 第二个主要组件是一个controller,它使用 ListerWatcher 拉取对象/通知并将它们推入DeltaFIFO ---其 knownObjects 是通知者的本地缓存Indexer --- 同时从该先进先出法中弹出 Deltas 值并使用`sharedIndexInformer::HandleDeltas`处理。 // 每次对 HandleDeltas 的调用(在持有 fifo 的锁的情况下完成)依次处理每个 Delta。 // 对于每个 Delta,这都会更新本地缓存并将相关通知填充到 sharedProcessor 中。 // 第三个主要组件是 sharedProcessor,它负责将这些通知转发给每个 Informer 的客户端(执行reconcile)。 type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector MutationDetector listerWatcher ListerWatcher // objectType 是此通知者应处理的类型的示例对象。只有类型需要是正确的,除了当它是 `unstructured.Unstructured` 时,但对象的 `"apiVersion"` 和 `"kind"` 也必须是正确的。 objectType runtime.Object // resyncCheckPeriod 是我们希望reflector的重新同步计时器触发的频率,以便它可以调用 shouldResync 来检查我们的任何侦听器是否需要重新同步。 resyncCheckPeriod time.Duration // defaultEventHandlerResyncPeriod 是通过 AddEventHandler 添加的任何处理程序的默认重新同步周期(即它们不指定一个,只想使用共享通知者的默认值)。 defaultEventHandlerResyncPeriod time.Duration clock clock.Clock // 标识informer是否已经启动或者停止 started, stopped bool startedLock sync.Mutex // blockDeltas 提供了一种停止所有事件分发的方法,以便迟到的事件处理程序可以安全地加入共享通知者。 blockDeltas sync.Mutex // 每当 ListAndWatch 因错误断开连接时调用。 watchErrorHandler WatchErrorHandler } // add/update/delete Notification机构提 type updateNotification struct { oldObj interface{} newObj interface{} } type addNotification struct { newObj interface{} } type deleteNotification struct { oldObj interface{} } // 设置listwatcher处理出现err时的处理函数 func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { s.startedLock.Lock() defer s.startedLock.Unlock() if s.started { return fmt.Errorf("informer has already started") } s.watchErrorHandler = handler return nil } // 启动Informer func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() // 创建DeltaFIFO,indexer用于识别obj是否在集群中 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, // 兼容以前代码做的字段,设置为true,标识在resync时使用type为replace,而不是sync EmitDeltaTypeReplaced: true, }) // 构建controller.Config, 供controller和reflector使用 cfg := &Config{ // deltaFifo,存储watch event Queue: fifo, // 用来监听和获取Obj event ListerWatcher: s.listerWatcher, // 要监听的obj type ObjectType: s.objectType, // 用来判断是否需要resync(resync的同步周期) FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, // 注意这里设置的是从sharedProcessor中的shouldResync方法(方法中用到了上面的FullResyncPeriod,用来判断是否需要resync) ShouldResync: s.processor.shouldResync, // Process表示用来处理上面fifo中item的方法 Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() // 构造controller并赋值给sharedInformer s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() // 单独的停止通道,因为processor应该在controller之后严格停止 processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // 等待处理器停止 defer close(processorStopCh) // 告诉处理器停止 // 用来处理deltaFifo中的even.obj是否有突变(通过reflect.DeepEqual对比obj和copyobj)的obj wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) // 用来处理go runtime其中包含的所有processorListener的run(处理nextCh中的item)和pop(流转addCh到nextCh) wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true // 用来组织新创建的processorListener的加入 }() // 开启controller s.controller.Run(stopCh) } // 判断是否已同步完成(其实还是包装了controller.HasSynced()) func (s *sharedIndexInformer) HasSynced() bool { s.startedLock.Lock() defer s.startedLock.Unlock() if s.controller == nil { return false } return s.controller.HasSynced() } func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } // 处理deltaFIFO的方法 func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // 从最旧到最新 for _, d := range obj.(Deltas) { switch d.Type { // 对于Sync, Replaced, Added, Updated都是走的同一个case case Sync, Replaced, Added, Updated: // 添加even.obj到MutationDetector s.cacheMutationDetector.AddObject(d.Object) // 根据索引器判断是否存在对应的obj if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { // 更新对应的obj if err := s.indexer.Update(d.Object); err != nil { return err } // 标识是否是类型同步 isSync := false switch { // Sync case d.Type == Sync: // 同步事件仅传播到请求重新同步的侦听器 isSync = true case d.Type == Replaced: // 获取even.obj的metadata数据 if accessor, err := meta.Accessor(d.Object); err == nil { // 获取从indexer中获取的obj的metadata数据 if oldAccessor, err := meta.Accessor(old); err == nil { // 未更改 resourceVersion 的替换事件被视为重新同步事件,并且仅传播到请求重新同步的侦听器 isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } // 分发notification到listeners s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { // 添加obj到indexer if err := s.indexer.Add(d.Object); err != nil { return err } // 分发notification到listeners s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: // 从indexer删除obj if err := s.indexer.Delete(d.Object); err != nil { return err } // 分发notification到listeners s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } // sharedProcessor 有一个processorListener 的集合,可以将一个通知对象分发给它的侦听器。 // 有两种分发操作。 // 同步分发进入侦听器的子集,(a) 在周期调用 shouldResync 时重新计算添加的listener和 (b) 最初放入每个侦听器。 // 非同步分发到每个侦听器。 type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener clock clock.Clock wg wait.Group } // 在locked后调用的addListener func (p *sharedProcessor) addListenerLocked(listener *processorListener) { p.listeners = append(p.listeners, listener) p.syncingListeners = append(p.syncingListeners, listener) } // 添加Listener func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() defer p.listenersLock.Unlock() p.addListenerLocked(listener) // 如果sharedProcessor已经开启 if p.listenersStarted { // go runtime 参数listener run/ pop p.wg.Start(listener.run) p.wg.Start(listener.pop) } } // shouldResync 根据每个侦听器的 resyncPeriod 查询每个侦听器以确定它们中的任何一个是否需要重新同步。只要有一个shouldResync 则返回true func (p *sharedProcessor) shouldResync() bool { p.listenersLock.Lock() defer p.listenersLock.Unlock() // 每次判定shouldResync,都会清空p.syncingListeners p.syncingListeners = []*processorListener{} resyncNeeded := false now := p.clock.Now() for _, listener := range p.listeners { // 需要遍历所有侦听器以查看它们是否需要重新同步,以便我们可以准备将要重新同步的任何侦听器。 if listener.shouldResync(now) { resyncNeeded = true p.syncingListeners = append(p.syncingListeners, listener) // 计算下次同步时间 listener.determineNextResync(now) } } return resyncNeeded } // 改变sharedProcessor中所有processorListener的resyncPeriod func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) listener.setResyncPeriod(resyncPeriod) } } // processorListener 将通知从 sharedProcessor 中继到一个 ResourceEventHandler --- 使用两个 goroutine、两个无缓冲通道和一个无界环形缓冲区。 // `add(notification)` 函数将给定的通知发送到 `addCh`。 // 一个 goroutine 运行 `pop()`,它使用环形缓冲区中的存储将通知从 `addCh` 泵送到 `nextCh`。 // 另一个 goroutine 运行 `run()`,它接收来自 `nextCh` 的通知并同步调用适当的处理程序方法。 // // processorListener 还动态调整侦听器的请求重新同步周期。 type processorListener struct { // 两个无缓冲通道 nextCh chan interface{} addCh chan interface{} // 处理程序方法--一般用户自定义 handler ResourceEventHandler // pendingNotifications 是一个无界环形缓冲区,用于保存所有尚未分发的通知。 // 每个侦听器有一个,但是失败/停止的侦听器将添加无限的挂起通知,直到我们 OOM。 // TODO:这并不比以前更糟,因为reflector由无界 DeltaFIFO 支持,但我们应该尝试做一些更好的事情。 pendingNotifications buffer.RingGrowing // requestResyncPeriod 是侦听器希望从共享通知者完全重新同步的频率,但通过两次调整(详情见sharedInformer的AddEventHandlerWithResyncPeriod)进行了修改。 // 一种是强加一个下限,`minimumResyncPeriod`。 // 另一个是另一个下限,sharedProcessor 的 `resyncCheckPeriod`,它被强加于 (a) 仅在sharedProcessor 启动之后进行的 AddEventHandlerWithResyncPeriod 调用中,并且 (b) 仅当通知者完全重新同步时。 requestedResyncPeriod time.Duration // resyncPeriod 是将在此侦听器的逻辑中使用的阈值。只有当 sharedIndexInformer 不进行重新同步时,此值才与 requestedResyncPeriod 不同,在这种情况下,此处的值为零。 // 重新同步之间的实际时间取决于 sharedProcessor 的 `shouldResync` 函数何时被调用以及何时 sharedIndexInformer 处理 `Sync` 类型的 Delta 对象。 resyncPeriod time.Duration // nextResync 是侦听器应该获得完全重新同步的最早时间 nextResync time.Time // resyncLock 保护对 resyncPeriod 和 nextResync 的访问 resyncLock sync.Mutex } // 添加方法。用于分发event时调用 func (p *processorListener) add(notification interface{}) { p.addCh <- notification } // 用于将addCh中的item 转发到nextCh中 func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) var nextCh chan<- interface{} var notification interface{} // for循环 for { select { // 非空变量notification 入nextCh case nextCh <- notification: var ok bool // 从pendingNotifications中读取一个item notification, ok = p.pendingNotifications.ReadOne() if !ok { // 没有read到任何item nextCh = nil // 禁用此选择案例 } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // 没有要弹出的通知(且 pendingNotifications 为空) // 从addCh读取的item设置notification notification = notificationToAdd // 有可能nextCh之前设置为nil了,故这里从新设置为p.nextCh nextCh = p.nextCh } else { // 已经有通知等待发送 // 写入pendingNotifications p.pendingNotifications.WriteOne(notificationToAdd) } } } } // processorListener处理nextCh中item的启动方法 func (p *processorListener) run() { // 此调用会阻塞,直到通道关闭。当通知期间发生恐慌时,我们将捕获它:**将跳过违规项目!**,并在短暂延迟(一秒)后尝试下一个通知。 stopCh := make(chan struct{}) // 如果发生err,则延迟一秒重新执行 wait.Until(func() { // 遍历p.nextCh for next := range p.nextCh { // 根据不同的notification type,做不同handler switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // 唯一到达这里的是 p.nextCh 为空且已关闭 close(stopCh) }, 1*time.Second, stopCh) }
- 函数
// NewSharedInformer 为 listwatcher 创建一个新实例。 func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer { return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{}) } // NewSharedIndexInformer 为listwatcher创建一个新实例。 // 如果给定的 defaultEventHandlerResyncPeriod 为零,则创建的通知程序将不会重新同步。 // 否则:对于每个具有非零请求重新同步周期的处理程序,无论是在通知程序启动之前还是之后添加,标称重新同步周期是请求的重新同步周期四舍五入为通知程序重新同步检查周期的倍数。 func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer } // WaitForNamedCacheSync 是 WaitForCacheSync 的包装器,它生成日志消息指示由名称标识的调用者正在等待同步,然后是成功或失败的同步。 func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { klog.Infof("Waiting for caches to sync for %s", controllerName) if !WaitForCacheSync(stopCh, cacheSyncs...) { utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName)) return false } klog.Infof("Caches are synced for %s ", controllerName) return true } // WaitForCacheSync 等待缓存填充。如果都成功则返回真 func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { err := wait.PollImmediateUntil(syncedPollPeriod, func() (bool, error) { for _, syncFunc := range cacheSyncs { if !syncFunc() { return false, nil } } return true, nil }, stopCh) if err != nil { klog.V(2).Infof("stop requested") return false } klog.V(4).Infof("caches populated") return true } // 重新计算resyncPeriod func determineResyncPeriod(desired, check time.Duration) time.Duration { // 如果desired(表示最少需要的duration)为0,那么返回desired if desired == 0 { return desired } // 如果check(设定的一个固定判定值)为0,返回0 if check == 0 { klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired) return 0 } // 如果desired小于check,则返回check if desired < check { klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check) return check } return desired } // 常量minimumResyncPeriod 表示最少resync 周期 const minimumResyncPeriod = 1 * time.Second // 添加EventHandler、ResyncPeriod func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { s.startedLock.Lock() defer s.startedLock.Unlock() // 如果s.stopped,sharedInformer已停止,则return if s.stopped { klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) return } // 如果resyncPeriod大于0。需要重新计算ProcessListener的resyncPeriod参数 if resyncPeriod > 0 { // 判断参数resyncPeriod是否小于minimumResyncPeriod if resyncPeriod < minimumResyncPeriod { klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod) // 覆盖resyncPeriod为minimumResyncPeriod resyncPeriod = minimumResyncPeriod } // 如果resyncPeriod小于s.resyncCheckPeriod if resyncPeriod < s.resyncCheckPeriod { // 如果s.started,sharedInformer已开始 if s.started { klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) // 覆盖resyncPeriod为s.resyncCheckPeriod resyncPeriod = s.resyncCheckPeriod } else { // 如果事件处理程序的 resyncPeriod 小于当前的 resyncCheckPeriod,则更新 resyncCheckPeriod为resyncPeriod 并相应地调整所有侦听器的重新同步周期 s.resyncCheckPeriod = resyncPeriod s.processor.resyncCheckPeriodChanged(resyncPeriod) } } } // 构造ProcessorListener listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) // 如果s.started为false,sharedInformer为开始 if !s.started { // 增加listener到sharedProcessor s.processor.addListener(listener) return } // 为了安全加入,我们必须 // 1.停止发送添加/更新/删除通知 // 2. 对store做一次list操作 // 3. 将合成的“添加”事件发送到新的处理程序 // 4. unblock s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.addListener(listener) for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } } // 构造ProcessListener func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { ret := &processorListener{ nextCh: make(chan interface{}), addCh: make(chan interface{}), handler: handler, pendingNotifications: *buffer.NewRingGrowing(bufferSize), requestedResyncPeriod: requestedResyncPeriod, resyncPeriod: resyncPeriod, } ret.determineNextResync(now) return ret }
store.go
- 接口
// Store 是一个通用的对象存储和处理接口。 // 一个 Store 保存着一个从字符串键到累加器的映射, 并且具有向当前与给定键关联的累加器添加、更新和删除给定对象的操作。 // Store 也知道如何从给定的对象中提取键,因此很多操作都只给了对象。 // // Reflector 知道如何监视服务器并更新 Store。这个包提供了多种 Store 实现。 type Store interface { // Add 将给定对象添加到与给定对象的键关联的累加器 Add(obj interface{}) error // Update 更新与给定对象的键关联的累加器中的给定对象 Update(obj interface{}) error // Delete 从与给定对象的键关联的累加器中删除给定对象 Delete(obj interface{}) error // List 返回所有当前非空累加器的列表 List() []interface{} // ListKeys 返回当前与非空累加器关联的所有键的列表 ListKeys() []string // Get 返回与给定对象的键关联的累加器 Get(obj interface{}) (item interface{}, exists bool, err error) // GetByKey 返回与给定键关联的累加器 GetByKey(key string) (item interface{}, exists bool, err error) // 替换将删除store的items,然后使用给定的列表重新赋值items. // 注意:Store 拥有列表的所有权,您不应在调用此函数后引用它。 Replace([]interface{}, string) error // 重新同步在此处出现的术语中没有意义,但在某些具有重要附加行为的实现(例如,DeltaFIFO)中有意义。在cache中没有意义 Resync() error }
- 结构体
// `*cache` 根据 ThreadSafeStore 和关联的 KeyFunc 实现 Indexer(实现了store并有几个特定的方法)。 type cache struct { // cacheStorage 线程安全的缓存 cacheStorage ThreadSafeStore // keyFunc 用于为存储在项目中和从项目中检索的对象制作key,并且应该是确定性的。 keyFunc KeyFunc } // 这里值分析add,其他类似 // 添加将一个项目插入到缓存中。 func (c *cache) Add(obj interface{}) error { // 调用keyFunc生成obj对应在cacheStorage中的key key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } // 添加到cacheStorage中(key:key value:obj),详情见分析thread_safe_store.go c.cacheStorage.Add(key, obj) return nil }
- 函数
// MetaNamespaceKeyFunc 是一个的默认 KeyFunc,它知道如何为实现 meta.Interface 的 API 对象制作key。 // T密钥使用格式 <namespace>/<name>,除非 <namespace> 为空,否则它只是 <name>。 func MetaNamespaceKeyFunc(obj interface{}) (string, error) { // 这里的ExplicitKey是声明的string类型 if key, ok := obj.(ExplicitKey); ok { return string(key), nil } // 获取metadata数据 meta, err := meta.Accessor(obj) if err != nil { return "", fmt.Errorf("object has no meta: %v", err) } // 判断namespace是否为空 if len(meta.GetNamespace()) > 0 { // namespace/name return meta.GetNamespace() + "/" + meta.GetName(), nil } return meta.GetName(), nil }
thread_safe_store.go
- 接口
// ThreadSafeStore 是一个接口,它允许对存储后端进行并发索引访问。 // 它类似于 Indexer,但不知道如何从给定对象中提取 Store 键。 // // 警告:不得修改 Get 或 List 返回的任何内容,因为除了不是线程安全之外,它还会破坏索引功能。 // // List/Get 提供的线程安全保证只有在调用者将返回的项视为只读时才有效。 // 例如,通过`Add` 插入存储中的指针将通过`Get` 原样返回。 // 多个客户端可能会在同一个键上调用 `Get` 并以非线程安全的方式修改指针。 // 另请注意,修改索引器存储的对象(如果有)将*不会*自动导致重新索引。 // 所以一般来说直接修改Get/List返回的对象并不是正确的操作。 type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers // AddIndexers 向此存储添加更多索引器。如果在存储中已有数据后调用此方法,则返回结果 undefined. AddIndexers(newIndexers Indexers) error // 重新同步是空操作,已弃用 Resync() error }
- 结构体
// threadSafeMap 实现了 ThreadSafeStore type threadSafeMap struct { lock sync.RWMutex // 用来存储obj items map[string]interface{} // 索引器将名称映射到 IndexFunc indexers Indexers // 索引将名称映射到Index indices Indices } // 这里只分析add,其他方法类似 // 添加obj到threadSafeMap func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() // 获取旧obj oldObject := c.items[key] // 覆盖key对应的obj c.items[key] = obj // 更新indices c.updateIndices(oldObject, obj, key) } // updateIndices 修改托管索引中的对象位置, 如果这是一个更新,你必须提供一个 oldObj updateIndices 必须从一个已经锁定缓存的函数中调用 func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { // 如果我们有一个旧对象,我们需要在再次添加之前将其删除 if oldObj != nil { c.deleteFromIndices(oldObj, key) } // 遍历indexers(key为indices的key value是生成Index的key的方法) for name, indexFunc := range c.indexers { // 生成Index的key indexValues, err := indexFunc(newObj) if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } // 获取name对应indices的value index := c.indices[name] if index == nil { index = Index{} c.indices[name] = index } // 遍历生成的indexValues for _, indexValue := range indexValues { // 获取index对应key的value set := index[indexValue] // 为空则创建 if set == nil { set = sets.String{} index[indexValue] = set } set.Insert(key) } } }
- 函数
// NewThreadSafeStore 创建一个新的 ThreadSafeStore 实例。 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ items: map[string]interface{}{}, indexers: indexers, indices: indices, } }
undelta_store.go
- 结构体
// UndeltaStore 侦听增量更新并在每次更改时发送store中所有的items。也就是提供了一个方法来自定义操作 // 它实现了 Store 接口,以便它可以从 Reflector 接收镜像对象流。 // 每当它收到任何完整(Store.Replace)或增量更改(Store.Add、Store.Update、Store.Delete)时,它都会通过调用 PushFunc 发送完整状态。 // 它是线程安全的。它保证每次更改(添加、更新、替换、删除)都会导致对 PushFunc 的一次调用,但有时可能会使用相同的值调用两次 PushFunc。 // 例如可能会使用相同的值调用两次 PushFunc。 // time thread 1 thread 2 // 0 UndeltaStore.Add(a) // 1 UndeltaStore.Add(b) // 2 Store.Add(a) // 3 Store.Add(b) // 4 Store.List() -> [a,b] // 5 Store.List() -> [a,b] // PushFunc 应该是线程安全的。 type UndeltaStore struct { Store PushFunc func([]interface{}) }
- 函数
// NewUndeltaStore 返回一个用 Store 实现的 UndeltaStore。 func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore { return &UndeltaStore{ Store: NewStore(keyFunc), PushFunc: pushFunc, } }
更多推荐
已为社区贡献15条内容
所有评论(0)