(二)informer源码阅读
k8s版本基于v1.23本文分析了informer的相关源码,当读完本文读者就应该可以了解到一个要实现一个自定义controller,应该做一些什么事情。文章内容如下:1:informer创建流程2:informer.AddEventHandler添加处理函数3:informer启动流程 !!!4:informer.Lister5:等待informer同步完成6:自定义controller笔记
·
总结:
k8s版本基于v1.23
本文分析了informer的相关源码,当读完本文读者就应该可以了解到一个要实现一个自定义controller,应该做一些什么事情。
文章内容如下:
1:informer创建流程
2:informer.AddEventHandler添加处理函数
3:informer启动流程 !!!
4:informer.Lister
5:等待informer同步完成
6:自定义controller笔记
1:informer创建流程
//1: 创建Informer。informers.NewSharedInformerFactory.Core().V1().Events().Informer()
/*
本质上就是构造一个Core对象,然后把InformerFactory对象传递给他,同理,构造一个V1对象,并把core对象的factory对象传递给v1,
同理构造EventInformer对象,然后把V1的factory对象传递给Events对象,注意,到目前为止都没有创建Informer,也就是Factory内部的Informers为0,
只有调用Informer()函数,而Informer函数内部调用InformerFor的时候才会创建Informer
下面以eventInformer为例,如果是podInformer,则直接换成v1.podInformer就行了,这一块k8s所有informer的源代码都一样的流程
*/
informers.NewSharedInformerFactory
......
v1.eventInformer.Informer //每个Informer派生类都实现了informer方法,此方法是把类型以及对应的默认构造函数注册到传进来的informerfactory内部的map中,
//即map[informerType]informerObj,如果key=informerType已存在,则直接返回已存在的informer,如果不存在,则创建并注册
v1.eventInformer.defaultInformer//前面注册的eventInformer类的默认构造函数
v1.NewFilteredEventInformer //真正的构造函数,每次调用都会传递一个全新的indexers给通过NewFilteredEventInformer函数创建出来的informer
//我们前面在v1.eventInformer.defaultInformer函数中会把此型注册到factory的map中,这样factory内部就有了这个类型的informer
//所以下次我们再次调用SharedInformerFactory.Core().V1().Events().Informer()时就会返回已有的informer,
//所有通过此工厂对象返回的同一类型的informer,实际都是同一个informer,
//所以这也就是sharedInformerFactory中shared的由来,即sharedInfromerFactory返回的是一个sharedIndexerInformer对象
//一个informer有一个indexers,这是sharedIndexerInformer中Indexer的由来
//注意,每次调用NewSharedInformerFactory都是创建一个不同的factory,每个factory都有自己的map,所以不同的factory返回的肯定是不同的informer
indexers:=Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} //创建indexers,即map[key]indexFunc,其他文章详解cache.cache
cache.NewSharedIndexInformer //注意,虽然有许多Informer派生类,但是因为他们底层都是用cache.NewSharedIndexInformer,所以返回的informer全部都是SharedIndexInformer
store.NewIndexer //根据indexers创建Indexer对象,Indexer具体为cache.cache
cache.NewCacheMutationDetector //创建一个mutationDetector,可以用来检测缓存是否已经被修改,但是无法检测是谁修改
2:informer.AddEventHandler添加处理函数
//2:添加处理函数。informers.NewSharedInformerFactory.Core().V1().Events().Informer().AddEventHandle
cache.sharedIndexInformer.AddEventHandler
cache.sharedIndexInformer.AddEventHandlerWithResyncPerio
cache.newProcessListener //创建一个listener对象,整个listener对象负责把process发来的队列数据发给通过AddEventHandler函数注册的处理函数(AddFunc/UpdateFunc/DeleteFunc)
cache.sharedProcessor.addListener //把listener对象添加到sharedIndexInformer对象内部的sharedProcess对象中,这个sharedProcess就是专门用来启动listerner的
go cache.processorListener.run //下文详解
go cache.processorListener.pop //下文详解
3:informer启动流程 !!!
//3: 启动Informer。informers.NewSharedInformerFactory().Start()
cache.SharedInformerFactory.Start
go cache.sharedIndexInformer.Run //循环调用每个sharedIndexerInformer对象的run方法,记住,所有sharedInfromerFactory创建的对象都是sharedIndexerInformer
//总结就是三件事:
//1:创建deletafifo队列;2:启动listener;3:启动controller
cache.NewDeltaFIFOWithOptions //创建deltafifo队列,并把队列和sharedInformer内部的indexer联系起来,队列的默认key函数为cache.MetaNamespaceKeyFunc
//!!!!queue中存放的元素是对象的引用,因为go是自动回收,所以只需创建一次,然后到处引用就行
//!!!!对象引用后面也会存放在indexer里面,queue之所以需要indexer,是因为queue的删除操作要参考indexer,即queue不会写indexer对象
cache.New //创建一个controller,controller用来处理队列里的元素,实际处理函数为cache.sharedIndexInformer.HandleDeltas
cache.sharedProcessor.run //开启所有监听器
go cache.processorListener.run //另开一个线程去运行listener,注意,这是listener,不是lister,别看错了,默认是每秒运行一次,每次运行处理所有chan里面的对象
//根据通知类型然后调用对应的处理函数,这个处理函数就是我们通过Informer().AddEventHandler函数添加的处理函数
//cache.ResourceEventHandlerFuncs对象实现了ResourceEventHandler接口
//run函数负责监听nextChan,nextChan中的元素就是发生的事件
cache.ResourceEventHandlerFuncs.OnUpdate
cache.ResourceEventHandlerFuncs.UpdateFunc
cache.ResourceEventHandlerFuncs.OnAdd
cache.ResourceEventHandlerFuncs.AddFunc
cache.ResourceEventHandlerFuncs.OnDelete
cache.ResourceEventHandlerFuncs.AddFunc
go cache.processorListener.pop //pop函数负责监听addChan,然后再把这个chan传递给run函数监听的nextChan函数
//pop使用了一个环形缓冲区notificationChan来缓冲和中转,即addChan->notificationChan->nextChan
go cache.controller.Run //总结:1:创建并运行Reflector获取数据;2:运行controller.processLoop转发数据到addChan,然后listener的pop方法会把数据转发给run方法
cache.NewReflector //创建reflector,并把cache.NewSharedIndexInformer函数中创建的listerwatcher传递给这个reflector
//reflector就是用来从apiserver获取事件,并保存到指定的地方,就是cache.sharedIndexInformer.Run函数中创建的deltafifo_queue
//在处理queue中事件的时候会把数据写到indexer中,所以最终我们就能在informer中通过indexer来获取到reflector从apiserver获取的对象了
//queue实现了cache.Queue接口,而cache.Queue接口包含了cache.store接口,所以queue可以用作store
go cache.Reflector.Run //开一个线程去运行refector,reflector会把数据保存到指定的queue,
cache.Reflector.ListAndWatch //两步,第一步:list,第二步:watch,list和watch操作都会把事件保存到store中,这个store对象就是前面传进来的queue
//因为queue实现了cache.store接口
pager.New //创建一个分页器,list是分页操作
pager.List //执行list操作,循环读取,直到读完所有的数据,会把读取的数据放到一个list里面返回
cache.ListWatch.list //然后这个list函数内部就是调用我们在v1.NewFilteredEventInformer函数中调用cache.NewSharedIndexInformer函数中创建的listwatch,
//就是clientset的list方法,注意因为是不同的v1.NewFilteredXXXXInformer,所以创建的SharedIndexInformer不同
//所以调用cache.NewSharedIndexInformer时传递的listfunc和watchfun不同,即用的是clientset的不同资源的list方法
//即v1.NewFilteredEventInformer就是client.CoreV1().Events(namespace).List(context.TODO(), options)
//即v1.NewFilteredPodInformer就是client.CoreV1().Pods(namespace).List(context.TODO(), options)
meta.EachListItem //把list获取的所有对象都放到一个list里面
meta.ExtractList //从list中提取所有对象,list中读取的原始对象,这里就是通过反射获取原始对象的类型(runtime.Object or runtime.Unknow)
//其中runtime.unknow表示该类型我们的scheme.codec解析失败的对象
cache.Reflector.syncWith
cache.DeltaQueue.Replace //把list操作获取的所有数据都保存到内部的deltaqueue中,即我们前面创建的deltaqueue队列,
//注意:对于queue中的indexer,queue只读不写,queue中的indexer由cache.sharedIndexInformer.HandleDeltas方法写入对象
cache.ListWatch.Watch //watch操作,对于eventInformer就是client.CoreV1().Events(namespace).Watch(context.TODO(), options)
cache.Reflector.watchHandler //watch机制是另开了一个线程,通过chan通信,watchhandler就是监听这个chan,然后根据事件类型调用内部store对象的对应方法
cache.DeltaQueue.Update
cache.DeltaQueue.Add
cache.DeltaQueue.Delete
cache.controller.processLoop //从queue读取事件,写入indexer,然后分发给listener pop所监听的addChan,然后pop函数再转发给run监听的nextChan
queue.DeltaFIFO.Pop //一个死循环,不断调用队列的pop函数来处理队列里的元素,如果没有就会阻塞
cache.sharedIndexInformer.HandleDeltas //处理一个元素,干三件事:1:记录更改的对象;2:更新indexer;2:分发事件,即把事件写入addChan
cache.defaultCacheMutationDetector.AddObj //记录被修改的对象到mutationDetector对象内部
case:
sharedIndexerInformer.Indexer.Update //更新Indexer,即把对象的引用保存到indexer对象,indexer只是一个接口,indexer的实际类型为cache.cache
//!!!!indexer才是对象的最终存储的地方,informer里面的queue相当于一个数据或事件的中转站,即缓存apiserver过来的数据
//reflector通过listwatch把对象放到informer的queue里面,然后cache.controller.processLoop不断从queue读取数据/事件
//获取到数据后再把数据写入indexer中
cache.sharedProcessor.distribute //分发事件
cache.processorListener.add
cache.processorListener.addCh <- notification //实际的分发操作就是把结果写入chan就行了
case:
sharedIndexerInformer.Indexer.Add
cache.sharedProcessor.distribute
case:
sharedIndexerInformer.Indexer.Delete
cache.sharedProcessor.distribute
4:informer.Lister
//4:informer.Lister
//4.1:创建informer.Lister。informers.NewSharedInformerFactory.Core().V1().Events().Lister
cache.eventInformer.Lister
cache.eventInformer.Informer //这个Informer函数即上面分析的informers.NewSharedInformerFactory.Core().V1().Events().Informer()函数,他用于获取和创建informer
//所以lister的逻辑就是先从factory中获取event资源对应的informer,如果存在,则返回已存在的,如果不存在,则创建
cache.sharedIndexInformer.GetIndexer //获取informer内部的indexer对象,因为go是自动回收,所以对象创建一次然后到处引用就行,所以indexer中保存了原始对象的引用
//indexer本质上也是一个store对象,他是一个提供了索引功能的store对象,所以indexer可用来存储数据,而前面说的queue是一个先进先出的store对象
v1.NewEventLister //创建一个v1.eventLister对象,这个函数会把informer中的indexer对象的引用存放到新创建的eventLister对象中
//4.2:通过Lister从informer中获取对象。informers.NewSharedInformerFactory.Core().V1().Events().Lister().List()
v1.eventLister.List //返回indexer中的所有数据
cache.ListAll //把indexer中所有的对象都添加到一个list中
cache.List() //返回indexer中的所有对象
cache.threadSafeMap.List //就是遍历内部的map,然后返回所有的value
//4.3:通过lister从informer中检索对象。informers.NewSharedInformerFactory.Core().V1().Events().Lister().Events().Get()
//informers.NewSharedInformerFactory.Core().V1().Events().Lister().Events()会返回一个eventNamespaceLister对象,该对象可以检索指定命名空间下的所有对象
v1.eventNamespaceLister.List
cache.ListAllByNamespace //就是从indexer中检索指定命名空间下由get参数指定的对象,
//!!!informer.Lister是直接读indexer,如果本地缓存没有就直接报错了
v1.eventNamepsaceLister.Get
cache.cache.GetByKey //从indexer中查找,informer.Lister是直接读indexer,如果本地缓存没有就直接报错了
暂停一下,我们捋一下存储结构:
informer中含有一个indexer,一个indexer内部是一个cache.cache,cache.cache内部是一个cache.threadSafeMap,threadSafeMap内部是一个带锁的map
我们可以通过informer.Lister()来创建一个可以检索informer内部的indexer,至此,结束
5:等待informer同步完成
//5:等待同步(略)。cache.WaitForCacheSync
//这个简单,就是不断地调用传进来的检查函数,如果检查函数返回true,说明该infromer应该同步成功了,如果所有的检查函数都返回true,就说明所有的informer都同步成功了
6:自定义controller笔记
//6:自定义controller
/*
这个就简单了,前面我们已经分析了informer的整个流程,可以知道informer的reflector会自动从apiserver拉取数据,因为处理能力有限,所以informer的reflector就把数据或者事件缓存到事件队列queue
然后从queue事件队列取元素,然后把数据保存到indexer,然后对每一个数据调用一次对应的处理函数,所以如果我们要实现一个自定义的controller,那么思路就很清晰了,我们一共需要两个东西
第一个是一个事件队列或者叫数据队列,即queue,用来存储发生了什么事件或者来了什么数据,然后是一个indexer,即数据存储的地方,我们不断从事件队列取出事件,这个事件可以是某个对象的key
然后去indexer中通过这个key获取数据,indexer是informer内部的,但是我们可以通过informer.Lister方法访问到,所以我们只需要把informer传递给我们自定义的controller就行了,这样就由informer
内部的indexer去自动存储数据,就不需要自己手动取存储这些对象了,我们直接从indexer检索就行了。另一方面,informer内部reflector用到的事件队列是informer内部的,我们外部是无法感知的,
但是informer内部对于每个事件都会调用一次informer.AddEventHandler中的函数,所以我们就需要利用这个机会,把事件转存到我们自己的事件队列所以我们要自定义一个queue,
然后在informer.AddEventHandler中的AddFunc/UpdateFunc/DeleteFunc中把事件转存到我们自己的事件队列,然后开一个死循环去读取我们自己的事件队列就行了。
实现一个自定义controller的逻辑如下:
1:创建一个并发安全的事件队列,即queue。之后我们要把他同时传递给controller和informer的AddEventHandler。
这个queue我们存放的元素有两种选择:
1:直接存放原始对象,这样我们在controller里取出的对象就直接是完整对象了,就可以不用再去informer中查找了,即indexers就可以不需要了,即可以不把informer传递给我们的controller
2:放原始对象的key,然后在controller里再根据这个key去查找原始对象。这种方式要求我们在创建controller的时候要同时把queue和informer传递给controller,因为数据是存放在informer里面的
2:在informer.AddEventHandler中添加Add/Update/Delete函数,该处理函数把informer框架传进来的对象或者对象的key转存到我们自己的queue中,这样就把informer和这个queue联系起来了
3:把queue传递给我们创建的controller,这样我们的controller就能异步获取到所有数据了,接下来的处理就很简单了:开n个工作者线程,不断去轮询我们自己的queue,如果有元素就处理,没有就阻塞
笔记1:informer中对象默认的key函数是cache.MetaNamespaceKeyFunc即key=namespace/objname
笔记2:我们这里informer是通过sharedInformerFactory来创建informer对象,也就是说indexer是factory帮我们创建并传递给informer的,如果我们不用factory,而是直接使用
cache.NewSharedIndexInformer,那么我们从函数参数可以发现我们需要手动创建indexer,手动创建ListerWatch
*/
附录:几个函数的源代码
//factory的InformerFor函数:如果存在则直接返回,如果没有则创建一个全新的,并保存到map
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
informer = newFunc(f.client, resyncPeriod) //这个就是对应的v1.eventInformer.defaultInformer函数
f.informers[informerType] = informer
return informer
}
//实际构造eventInformer构造函数详解
v1.NewFilteredEventInformer
return cache.NewSharedIndexInformer( //这是通用的Informer构造函数,就是填充参数并简单地返回一个SharedIndexInformer对象
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { //list函数
return client.CoreV1().Events(namespace).List(context.TODO(), options) //本质就是client-go去询问ApiServer
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { //watch函数
return client.CoreV1().Events(namespace).Watch(context.TODO(), options) //本质就是client-go去询问ApiServer
},
},
&corev1.Event{}, //这是此informer监听的对象,会通过反射把查询结果转换为此对象
resyncPeriod,
indexers, //数据存放位置
)
//构造实际的存储对象
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}), //实际是cache.threadSafeMap实际就是一个带锁的结构体,内部包含了一个用来存储的indexer
keyFunc: keyFunc, //这个keyFunc默认为cache.DeletionHandlingMetaNamespaceKeyFunc
}
}
//key函数
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return MetaNamespaceKeyFunc(obj)
}
更多推荐
已为社区贡献6条内容
所有评论(0)