【k8s源码篇之Informer篇2】理解Informer内部的运行逻辑
但是一个资源可能会有「多个 Informer 实例」监听着,而且「每个 Informer 的处理逻辑都不是一致的(AddFunc、DeleteFunc、UpdateFunc)」,因此如何通知这些 Informer 处理呢?其实他们共享着 —— 【Refactor】【Indexer】【Local Store】 —— 为了减少 Apiserver 的访问压力,及节约存储(因为都是关注着一种资源)【资源
·
参考
- (三)Kubernetes 源码剖析之学习Informer机制
- 如何高效掌控K8s资源变化?K8s Informer实现机制浅析
- 25 | 深入解析声明式API(二):编写自定义控制器
- k8s client-go informer中的processorlistener数据消费,缓存的分析
架构
Informer 和 Controller
- 便于理解的架构图
- 这里 Indexer「索引」 和 Local Store 「缓存」 是分开表示的
- 在源码级别,基本上是一起实现的,一个结构体内涵盖
Informer 简要架构
- 源码级简要理解
Informer 详细架构
- 源码级详细理解
带着问题去思考
编写 informer 时的 AddEventHandler 如何作用?
- 可以看出这三个函数都与【资源的变化处理】有关
- 因此我们下一步将查看与【资源的变化处理】相关的函数
studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueStudent, // 下面有这个函数解释,就是先将对象同步到缓存中,再放入 workqueue 队列
UpdateFunc: func(old, new interface{}) {
oldStudent := old.(*bolingcavalryv1.Student)
newStudent := new.(*bolingcavalryv1.Student)
if oldStudent.ResourceVersion == newStudent.ResourceVersion {
//版本一致,就表示没有实际更新的操作,立即返回
return
}
controller.enqueueStudent(new)
},
DeleteFunc: controller.enqueueStudentForDelete,
})
HandleDeltas
- 因为【DeltasFiFO】记录着【资源的变化】,通过查看源码,我们定位到此函数【HandleDeltas】
- 通过代码得知
- 首先根据事件类型,更新缓存和索引
- 之后将事件分发,进行处理,涉及到
s.processor.distribute
函数
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
// 资源的同步、添加、更新四件
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 更新缓存和索引
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 将事件分发,进行处理
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 下面有详细解析
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
// 资源的删除时间
case Deleted:
// 更新缓存和索引
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 将事件分发,进行处理
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
type sharedIndexInformer struct {
// 索引和 缓存 store
indexer Indexer
controller Controller
// 处理函数,将是重点
processor *sharedProcessor
// 检测 cache 是否有变化,一把用作调试,默认是关闭的
cacheMutationDetector MutationDetector
// 构造 Reflector 需要
listerWatcher ListerWatcher
// 目标类型,给 Reflector 判断资源类型
objectType runtime.Object
// Reflector 进行重新同步周期
resyncCheckPeriod time.Duration
// 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
// 两个 bool 表达了三个状态:controller 启动前、已启动、已停止
started, stopped bool
startedLock sync.Mutex
// 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱
blockDeltas sync.Mutex
// Watch 返回 err 的回调函数
watchErrorHandler WatchErrorHandler
}
distribute 处理分发函数
-
首先考虑为什么要有
distribute
分发函数,而是为什么sharedProcessor
处理器叫”共享“?-
是因为
ShareInformer
,不知道你是否还记得每创建一个Informer
都是基于ShareInformerFactory
-
// 这个是 Students 的控制器运行的 main 函数 // 其中涉及共享 Informer 工厂函数 func main() { // 省略若干行代码 ... // 关注 Student 资源的 「共享 Informer 制造工厂studentInformerFactory」 studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30) // 得到controller // 利用 「共享 Informer 制造工厂studentInformerFactory」生产出或获取已有的 「Student 某版本的 共享Informer 实例」 // 共享 Informer,只是共享了 Reflector(ListAndWatch 对Apiserver 监控),以及缓存 Indexer 和索引 Local Store // 目的是:减少同类型多个 Informer controller := NewController(kubeClient, studentClient, studentInformerFactory.Bolingcavalry().V1().Students()) //启动informer go studentInformerFactory.Start(stopCh) } // NewController returns a new student controller func NewController( // 省略若干行代码 ... controller := &Controller{ kubeclientset: kubeclientset, studentclientset: studentclientset, studentsLister: studentInformer.Lister(), studentsSynced: studentInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"), recorder: recorder, } // 基于共享 Informer ,添加自己的 Informer 处理逻辑 // 这部分可以理解为 每个 Controller 独立的 Informer 部分 studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueStudent, UpdateFunc: func(old, new interface{}) { oldStudent := old.(*bolingcavalryv1.Student) newStudent := new.(*bolingcavalryv1.Student) if oldStudent.ResourceVersion == newStudent.ResourceVersion { //版本一致,就表示没有实际更新的操作,立即返回 return } controller.enqueueStudent(new) }, DeleteFunc: controller.enqueueStudentForDelete, }) return controller }
-
其实他们共享着 —— 【Refactor】【Indexer】【Local Store】 —— 为了减少 Apiserver 的访问压力,及节约存储(因为都是关注着一种资源)
-
但是一个资源可能会有「多个 Informer 实例」监听着,而且「每个 Informer 的处理逻辑都不是一致的(AddFunc、DeleteFunc、UpdateFunc)」,因此如何通知这些 Informer 处理呢?
- 答案很简单 —— []*processorListener 数组(也叫切片)
- 每一个 Informer 的处理逻辑 —— 封装为一个 processorListener
- 当有【资源变化 Delta】产生时,便会通过 range,通知到所有的【processorListener】,即【 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)】
- 通过对下面源码的阅读便可理解
-
// k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
// 因为各 listeners 设置的 resyncPeriod 可能不一致
// 所以将没有设置(resyncPeriod = 0) 的归为 listeners 组,将设置了 resyncPeriod 的归到 syncingListeners 组;
listeners []*processorListener
// 如果某个 listener 在多个地方(sharedIndexInformer.resyncCheckPeriod,
// sharedIndexInformer.AddEventHandlerWithResyncPeriod)都设置了 resyncPeriod,则取最小值 minimumResyncPeriod;
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
// 遍历所属组全部 listeners,将数据投递到 processorListener 进行处理
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
processorListener —— Informer 的差异处
- 通过下面代码的查看,便可得知
handler ResourceEventHandler
是【 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)】- 也就是每个 Informer 的差异之处
// k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
// nextCh:数据从此通道中读出并调用handler函数处理,非缓冲通道
nextCh chan interface{}
// addCh:FIFO中POP出的数据通过distribute函数放入此通道中,非缓冲通道
addCh chan interface{}
// 此处即为 Informer 的处理逻辑 (AddFunc、DeleteFunc、UpdateFunc)
handler ResourceEventHandler
// pendingNotifications:addCh中读出数据放入nextCh中,如果阻塞,则放入此缓冲区域(k8s自定义对象)
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
}
// k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
// k8s.io/utils/buffer/ring_growing.go
// RingGrowing is a growing ring buffer.
// Not thread safe.
type RingGrowing struct {
data []interface{}
n int // Size of Data
beg int // First available element
readable int // Number of data items available
}
processorListener 中通道和环形缓存的作用
-
考虑这样一个问题?【资源变化的事件 Delta】很多时,【processorListener】处理不过来怎么办?
- 接下来我们就分析【processorListener】的【事件获取】【事件缓存】和【事件处理机制】
-
pop 函数的作用:
- 【Delta】首先传给【addCh】通道
- 若【nextCh】通道可以接收,那么【nextCh】直接接收【Delta通知】
- 若【nextCh】不能接收,就将【Delta通知】放入到【环形缓存pendingNotifications中】,之后【nextCh】空闲,便会消费【环形缓存pendingNotifications中的Delta通知】
- 【Delta】首先传给【addCh】通道
-
run 函数的作用:
- 从【nextCh】通道读取【Delta通知】,之后调用预先注册的回调函数【如p.handler.OnAdd】进行处理
- 之后就是 【handler函数的自定义筛选逻辑】将需要的事件放入到【workqueue】,等待【自定义的Controller消费】——【Controller】部分就不属于 Informer 逻辑了
-
run 和 pop 以各自的 goroutine 在后台运行
// k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
// nextCh:数据从此通道中读出并调用handler函数处理,非缓冲通道
nextCh chan interface{}
// addCh:FIFO中POP出的数据通过distribute函数放入此通道中,非缓冲通道
addCh chan interface{}
...
// pendingNotifications:addCh中读出数据放入nextCh中,如果阻塞,则放入此缓冲区域(k8s自定义对象)
pendingNotifications buffer.RingGrowing
}
// k8s.io/client-go/tools/cache/shared_informer.go
// Deleta 传输函数
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
// k8s.io/client-go/tools/cache/shared_informer.go
// Deleta 处理函数
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
// 调用 handler 函数,也就是预先注册的(AddFunc、UpdateFunc、DeleteFunc 函数),进行处理
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
更多推荐
已为社区贡献41条内容
所有评论(0)