【k8s源码篇之Informer篇3】理解Informer中的Reflector组件
接着,启动 processLoop 不断从 DeltaFIFO Pop 进行消费。通过上面的 Reflector 分析可以知道,DeltaFIFO 的职责是通过队列加锁处理(queueActionLocked)、去重(dedupDeltas)、存储在由 DeltaFIFO 实现的本地缓存(local Store) 中,包括 queue(仅存 objKeys) 和 items(存 objKeys 和
参考
- (三)Kubernetes 源码剖析之学习Informer机制
- 如何高效掌控K8s资源变化?K8s Informer实现机制浅析
- 25 | 深入解析声明式API(二):编写自定义控制器
- k8s client-go informer中的processorlistener数据消费,缓存的分析
架构
Informer 和 Controller
- 便于理解的架构图
- 这里 Indexer「索引」 和 Local Store 「缓存」 是分开表示的
- 在源码级别,基本上是一起实现的,一个结构体内涵盖
Informer 简要架构
- 源码级简要理解
Informer 详细架构
- 源码级详细理解
Reflector
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
// 索引和 缓存 store
indexer Indexer
// Informer 内部的 controller,不是我们自定义的 Controller
controller Controller
// 处理函数,将是重点
processor *sharedProcessor
// 检测 cache 是否有变化,一把用作调试,默认是关闭的
cacheMutationDetector MutationDetector
// 构造 Reflector 需要
listerWatcher ListerWatcher
// 目标类型,给 Reflector 判断资源类型
objectType runtime.Object
// Reflector 进行重新同步周期
resyncCheckPeriod time.Duration
// 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
// 两个 bool 表达了三个状态:controller 启动前、已启动、已停止
started, stopped bool
startedLock sync.Mutex
// 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱
blockDeltas sync.Mutex
// Watch 返回 err 的回调函数
watchErrorHandler WatchErrorHandler
}
内部总管家 controller
-
Controller 作为核心中枢,集成了组件 Reflector、DeltaFIFO
-
DeltaFIFO 的消费
HandleDeltas
成为连接下游消费者的桥梁- 用于更新索引Indexer和缓存 Loacl Store
- 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
-
Controller 由 controller 结构体进行具体实现 —— 这里的 controller 指的是 Informer 内部的控制器:
- 在 K8s 中约定俗成:大写定义的 interface 接口,由对应小写定义的结构体进行实现。
结构体定义如下:
// k8s.io/client-go/tools/cache/controller.go
// 接口定义又哪些行为
// Controller is a generic controller framework.
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
// controller 的具体实现
// Controller is a generic controller framework.
type controller struct {
config Config // 包含着 ListAndWatch 函数,DeltaFIFO
reflector *Reflector // Reflector , 用于 ListAndWatch
reflectorMutex sync.RWMutex
clock clock.Clock
}
// Run 的时候,会创建 Reflector
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// Reflector 的构建依赖于 Config
r := NewReflector(
c.config.ListerWatcher, // ListAndWatch 函数
c.config.ObjectType,
c.config.Queue, // Delta FIFO
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r // Reflector
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
// processLoop 就是 HandleDeltas 函数
// 1. 用于更新索引Indexer和缓存 Loacl Store
// 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
wait.Until(c.processLoop, time.Second, stopCh)
}
// 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
// c.config.Process 就是 HandleDeltas 函数,在 config 初始化可以看到
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
Controller 中以 goroutine 协程方式启动 Run 方法,会启动 Reflector 的 ListAndWatch(),用于从 apiserver 拉取全量和监听增量资源,存储到 DeltaFIFO。接着,启动 processLoop 不断从 DeltaFIFO Pop 进行消费。在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas,一方面维护 Indexer 的 Add/Update/Delete,另一方面调用下游 sharedProcessor 进行 handler 处理。
连接下游的 HandleDeltas
DeltaFIFO 的消费 HandleDeltas
成为连接下游消费者的桥梁
- 用于更新索引Indexer和缓存 Loacl Store
- 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
// 资源的同步、添加、更新实践
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 重点!!!更新缓存和索引
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 将事件分发,进行处理
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 重点!!!更新缓存和索引
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
// 资源的删除时间
case Deleted:
// 重点!!!更新缓存和索引
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 重点!!! 通知事件的到来,给订阅的 Informer 发送消息通知
// 相应 Informer 的 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理
// 然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
controller 的配置管理 Config
- 此配置项基本上涵盖了 controller 创建所必须得
- Queue —— DeltaFIFO
- ListerWatcher —— Reflector 的 ListAndWatch
- Process —— 连接下游的 HandleDeltas(事件处理handler,同步缓存和索引)
// staging/src/k8s.io/client-go/tools/cache/controller.go
type Config struct {
// 实际由 DeltaFIFO 实现
Queue
// 构造 Reflector 需要
ListerWatcher
// Pop 出来的 obj 处理函数 连接下游的 HandleDeltas 函数
Process ProcessFunc
// 目标对象类型
ObjectType runtime.Object
// 全量重新同步周期
FullResyncPeriod time.Duration
// 是否进行重新同步的判断函数
ShouldResync ShouldResyncFunc
// 如果为 true,Process() 函数返回 err,则再次入队 re-queue
RetryOnError bool
// Watch 返回 err 的回调函数
WatchErrorHandler WatchErrorHandler
// Watch 分页大小
WatchListPageSize int64
}
controller 管理的 Reflector
Reflector 的主要职责是从 apiserver 拉取并持续监听(ListAndWatch) 相关资源类型的增删改(Add/Update/Delete)事件,存储在由 DeltaFIFO 实现的本地缓存(local Store) 中。
首先看一下 Reflector 结构体定义:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
// 通过 file:line 唯一标识的 name
name string
// 下面三个为了确认类型
expectedTypeName string
expectedType reflect.Type
expectedGVK *schema.GroupVersionKind
// 存储 interface: 具体由 DeltaFIFO 实现存储
store Store
// 用来从 apiserver 拉取全量和增量资源
listerWatcher ListerWatcher
// 下面两个用来做失败重试
backoffManager wait.BackoffManager
initConnBackoffManager wait.BackoffManager
// informer 使用者重新同步的周期
resyncPeriod time.Duration
// 判断是否满足可以重新同步的条件
ShouldResync func() bool
clock clock.Clock
// 是否要进行分页 List
paginatedResult bool
// 最后同步的资源版本号,以此为依据,watch 只会监听大于此值的资源
lastSyncResourceVersion string
// 最后同步的资源版本号是否可用
isLastSyncResourceVersionUnavailable bool
// 加把锁控制版本号
lastSyncResourceVersionMutex sync.RWMutex
// 每页大小
WatchListPageSize int64
// watch 失败回调 handler
watchErrorHandler WatchErrorHandler
}
从结构体定义可以看到,通过指定目标资源类型进行 ListAndWatch,并可进行分页相关设置。第一次拉取全量资源(目标资源类型) 后通过 syncWith 函数全量替换(Replace) 到 DeltaFIFO queue/items 中,之后通过持续监听 Watch(目标资源类型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消费。
watch 目标类型通过 Go reflect 反射实现如下:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
...
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
...
}
- 通过反射确认目标资源类型,所以命名为 Reflector 还是比较贴切的;
- List/Watch 的目标资源类型在NewSharedIndexInformer.ListerWatcher 进行了确定,但 Watch 还会在 watchHandler 中再次比较一下目标类型;
controller 管理的 DeltaFIFO
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
// 读写锁、条件变量
lock sync.RWMutex
cond sync.Cond
// kv 存储:objKey1->Deltas[obj1-Added, obj1-Updated...]
items map[string]Deltas
// 只存储所有 objKeys
queue []string
// 是否已经填充:通过 Replace() 接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
populated bool
// 通过 Replace() 接口将第一批对象放入队列的数量
initialPopulationCount int
// keyFunc 用来从某个 obj 中获取其对应的 objKey
keyFunc KeyFunc
// 已知对象,其实就是 Indexer
knownObjects KeyListerGetter
// 队列是否已经关闭
closed bool
// 以 Replaced 类型发送(为了兼容老版本的 Sync)
emitDeltaTypeReplaced bool
}
DeltaType 可分为以下类型:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced" // 第一次或重新同步
Sync DeltaType = "Sync" // 老版本重新同步叫 Sync
)
通过上面的 Reflector 分析可以知道,DeltaFIFO 的职责是通过队列加锁处理(queueActionLocked)、去重(dedupDeltas)、存储在由 DeltaFIFO 实现的本地缓存(local Store) 中,包括 queue(仅存 objKeys) 和 items(存 objKeys 和对应的 Deltas 增量变化),并通过 Pop 不断消费,通过 Process(item) 处理相关逻辑。
Reflector 的 ListAndWatch
// 接口定义
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister // 该接口 定义了 List 方法
Watcher // 该接口 定义了 Watch 方法
}
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// 接口的实现
// 接口的作用体 —— ListWatch struct
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
// 作用体 ListWatch struct 的构建
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// 方法的实现
// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
if !lw.DisableChunking {
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
}
return lw.ListFunc(options)
}
// Watch a set of apiserver resources
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
sharedIndexInformer 的 Run 函数
- 初始化 Config 包含(DeltaFIFO队列、ListAndWatch函数、HandleDeltas函数)
- 利用 Config 创建 controller
- controller 执行 Run 函数时,会利用 Config 创建 Reflector
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// DeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 注意此处的 Config
cfg := &Config{
Queue: fifo, // DeltaFIFO
ListerWatcher: s.listerWatcher, // ListAndWatch 函数
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// 重点!!!
// 处理 Deltas 的函数,也就是 handler(调用注册的 AddFunc、UpdateFunc、DeleteFunc)
// 同时负责同步 索引Indexer 和 缓存 Local Store
Process: s.HandleDeltas,
}
// 使用 Config 创建 共享Informer 内部的 controller
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 创建 共享Informer 内部的 controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// Run 的时候会创建 Reflector
s.controller.Run(stopCh)
}
// k8s.io/client-go/tools/cache/controller.go
// 接口定义又哪些行为
// Controller is a generic controller framework.
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
// controller 的具体实现
// Controller is a generic controller framework.
type controller struct {
config Config // 包含着 ListAndWatch 函数,DeltaFIFO
reflector *Reflector // Reflector , 用于 ListAndWatch
reflectorMutex sync.RWMutex
clock clock.Clock
}
// Run 的时候会创建 Reflector
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// Reflector 的构建依赖于 Config
r := NewReflector(
c.config.ListerWatcher, // ListAndWatch 函数
c.config.ObjectType,
c.config.Queue, // Delta FIFO
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r // Reflector
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
// processLoop 就是 HandleDeltas 函数
// 1. 用于更新索引Indexer和缓存 Loacl Store
// 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
wait.Until(c.processLoop, time.Second, stopCh)
}
// 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
// c.config.Process 就是 HandleDeltas 函数,在 config 初始化可以看到
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
更多推荐
所有评论(0)