总结:

k8s版本基于v1.23
本文分析了informer的相关源码,当读完本文读者就应该可以了解到一个要实现一个自定义controller,应该做一些什么事情。
文章内容如下:
	1:informer创建流程
	2:informer.AddEventHandler添加处理函数
	3:informer启动流程 !!!
	4:informer.Lister
	5:等待informer同步完成
	6:自定义controller笔记

1:informer创建流程

//1: 创建Informer。informers.NewSharedInformerFactory.Core().V1().Events().Informer()
/*
	本质上就是构造一个Core对象,然后把InformerFactory对象传递给他,同理,构造一个V1对象,并把core对象的factory对象传递给v1,
	同理构造EventInformer对象,然后把V1的factory对象传递给Events对象,注意,到目前为止都没有创建Informer,也就是Factory内部的Informers为0,
	只有调用Informer()函数,而Informer函数内部调用InformerFor的时候才会创建Informer
	下面以eventInformer为例,如果是podInformer,则直接换成v1.podInformer就行了,这一块k8s所有informer的源代码都一样的流程
*/
informers.NewSharedInformerFactory
  ......
	v1.eventInformer.Informer //每个Informer派生类都实现了informer方法,此方法是把类型以及对应的默认构造函数注册到传进来的informerfactory内部的map中,
                              //即map[informerType]informerObj,如果key=informerType已存在,则直接返回已存在的informer,如果不存在,则创建并注册
      v1.eventInformer.defaultInformer//前面注册的eventInformer类的默认构造函数
        v1.NewFilteredEventInformer   //真正的构造函数,每次调用都会传递一个全新的indexers给通过NewFilteredEventInformer函数创建出来的informer
								      //我们前面在v1.eventInformer.defaultInformer函数中会把此型注册到factory的map中,这样factory内部就有了这个类型的informer
									  //所以下次我们再次调用SharedInformerFactory.Core().V1().Events().Informer()时就会返回已有的informer,
							          //所有通过此工厂对象返回的同一类型的informer,实际都是同一个informer,
                                      //所以这也就是sharedInformerFactory中shared的由来,即sharedInfromerFactory返回的是一个sharedIndexerInformer对象
                                      //一个informer有一个indexers,这是sharedIndexerInformer中Indexer的由来
									  //注意,每次调用NewSharedInformerFactory都是创建一个不同的factory,每个factory都有自己的map,所以不同的factory返回的肯定是不同的informer
		  indexers:=Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} //创建indexers,即map[key]indexFunc,其他文章详解cache.cache
          cache.NewSharedIndexInformer     //注意,虽然有许多Informer派生类,但是因为他们底层都是用cache.NewSharedIndexInformer,所以返回的informer全部都是SharedIndexInformer
            store.NewIndexer               //根据indexers创建Indexer对象,Indexer具体为cache.cache
			cache.NewCacheMutationDetector //创建一个mutationDetector,可以用来检测缓存是否已经被修改,但是无法检测是谁修改

2:informer.AddEventHandler添加处理函数

//2:添加处理函数。informers.NewSharedInformerFactory.Core().V1().Events().Informer().AddEventHandle
cache.sharedIndexInformer.AddEventHandler
  cache.sharedIndexInformer.AddEventHandlerWithResyncPerio
    cache.newProcessListener //创建一个listener对象,整个listener对象负责把process发来的队列数据发给通过AddEventHandler函数注册的处理函数(AddFunc/UpdateFunc/DeleteFunc)
	cache.sharedProcessor.addListener //把listener对象添加到sharedIndexInformer对象内部的sharedProcess对象中,这个sharedProcess就是专门用来启动listerner的
	  go cache.processorListener.run  //下文详解
      go cache.processorListener.pop  //下文详解

3:informer启动流程 !!!

//3: 启动Informer。informers.NewSharedInformerFactory().Start()
cache.SharedInformerFactory.Start
  go cache.sharedIndexInformer.Run                   //循环调用每个sharedIndexerInformer对象的run方法,记住,所有sharedInfromerFactory创建的对象都是sharedIndexerInformer
													 //总结就是三件事:
													 //1:创建deletafifo队列;2:启动listener;3:启动controller
	cache.NewDeltaFIFOWithOptions                    //创建deltafifo队列,并把队列和sharedInformer内部的indexer联系起来,队列的默认key函数为cache.MetaNamespaceKeyFunc 
                                                     //!!!!queue中存放的元素是对象的引用,因为go是自动回收,所以只需创建一次,然后到处引用就行
                                                     //!!!!对象引用后面也会存放在indexer里面,queue之所以需要indexer,是因为queue的删除操作要参考indexer,即queue不会写indexer对象
	cache.New                                        //创建一个controller,controller用来处理队列里的元素,实际处理函数为cache.sharedIndexInformer.HandleDeltas
	cache.sharedProcessor.run                        //开启所有监听器
      go cache.processorListener.run                 //另开一个线程去运行listener,注意,这是listener,不是lister,别看错了,默认是每秒运行一次,每次运行处理所有chan里面的对象
                                                     //根据通知类型然后调用对应的处理函数,这个处理函数就是我们通过Informer().AddEventHandler函数添加的处理函数  
                                                     //cache.ResourceEventHandlerFuncs对象实现了ResourceEventHandler接口
													 //run函数负责监听nextChan,nextChan中的元素就是发生的事件                   
		cache.ResourceEventHandlerFuncs.OnUpdate
	      cache.ResourceEventHandlerFuncs.UpdateFunc
	    cache.ResourceEventHandlerFuncs.OnAdd
		  cache.ResourceEventHandlerFuncs.AddFunc
		cache.ResourceEventHandlerFuncs.OnDelete
	      cache.ResourceEventHandlerFuncs.AddFunc
      go cache.processorListener.pop                 //pop函数负责监听addChan,然后再把这个chan传递给run函数监听的nextChan函数
													 //pop使用了一个环形缓冲区notificationChan来缓冲和中转,即addChan->notificationChan->nextChan
	go cache.controller.Run                          //总结:1:创建并运行Reflector获取数据;2:运行controller.processLoop转发数据到addChan,然后listener的pop方法会把数据转发给run方法
	  cache.NewReflector                             //创建reflector,并把cache.NewSharedIndexInformer函数中创建的listerwatcher传递给这个reflector 
                                                     //reflector就是用来从apiserver获取事件,并保存到指定的地方,就是cache.sharedIndexInformer.Run函数中创建的deltafifo_queue
                                                     //在处理queue中事件的时候会把数据写到indexer中,所以最终我们就能在informer中通过indexer来获取到reflector从apiserver获取的对象了
                                                     //queue实现了cache.Queue接口,而cache.Queue接口包含了cache.store接口,所以queue可以用作store
	  go cache.Reflector.Run                         //开一个线程去运行refector,reflector会把数据保存到指定的queue,
                                                   
		cache.Reflector.ListAndWatch                 //两步,第一步:list,第二步:watch,list和watch操作都会把事件保存到store中,这个store对象就是前面传进来的queue
                                                     //因为queue实现了cache.store接口
		  pager.New                                  //创建一个分页器,list是分页操作
		  pager.List                                 //执行list操作,循环读取,直到读完所有的数据,会把读取的数据放到一个list里面返回
			cache.ListWatch.list                     //然后这个list函数内部就是调用我们在v1.NewFilteredEventInformer函数中调用cache.NewSharedIndexInformer函数中创建的listwatch,
                                                     //就是clientset的list方法,注意因为是不同的v1.NewFilteredXXXXInformer,所以创建的SharedIndexInformer不同
                                                     //所以调用cache.NewSharedIndexInformer时传递的listfunc和watchfun不同,即用的是clientset的不同资源的list方法
                                                     //即v1.NewFilteredEventInformer就是client.CoreV1().Events(namespace).List(context.TODO(), options)
                                                     //即v1.NewFilteredPodInformer就是client.CoreV1().Pods(namespace).List(context.TODO(), options)
                                                       
			meta.EachListItem                        //把list获取的所有对象都放到一个list里面
		    meta.ExtractList                         //从list中提取所有对象,list中读取的原始对象,这里就是通过反射获取原始对象的类型(runtime.Object or runtime.Unknow)
                                                     //其中runtime.unknow表示该类型我们的scheme.codec解析失败的对象
			cache.Reflector.syncWith               
              cache.DeltaQueue.Replace               //把list操作获取的所有数据都保存到内部的deltaqueue中,即我们前面创建的deltaqueue队列,
                                                     //注意:对于queue中的indexer,queue只读不写,queue中的indexer由cache.sharedIndexInformer.HandleDeltas方法写入对象
		  cache.ListWatch.Watch                      //watch操作,对于eventInformer就是client.CoreV1().Events(namespace).Watch(context.TODO(), options)
		  cache.Reflector.watchHandler               //watch机制是另开了一个线程,通过chan通信,watchhandler就是监听这个chan,然后根据事件类型调用内部store对象的对应方法   
            cache.DeltaQueue.Update
            cache.DeltaQueue.Add
            cache.DeltaQueue.Delete
	  cache.controller.processLoop                    //从queue读取事件,写入indexer,然后分发给listener pop所监听的addChan,然后pop函数再转发给run监听的nextChan
        queue.DeltaFIFO.Pop                           //一个死循环,不断调用队列的pop函数来处理队列里的元素,如果没有就会阻塞
	      cache.sharedIndexInformer.HandleDeltas      //处理一个元素,干三件事:1:记录更改的对象;2:更新indexer;2:分发事件,即把事件写入addChan
			cache.defaultCacheMutationDetector.AddObj //记录被修改的对象到mutationDetector对象内部
			case:
			  sharedIndexerInformer.Indexer.Update               //更新Indexer,即把对象的引用保存到indexer对象,indexer只是一个接口,indexer的实际类型为cache.cache
																 //!!!!indexer才是对象的最终存储的地方,informer里面的queue相当于一个数据或事件的中转站,即缓存apiserver过来的数据
                                                                 //reflector通过listwatch把对象放到informer的queue里面,然后cache.controller.processLoop不断从queue读取数据/事件
                                                                 //获取到数据后再把数据写入indexer中
			  cache.sharedProcessor.distribute                   //分发事件
				cache.processorListener.add
	  			  cache.processorListener.addCh <- notification  //实际的分发操作就是把结果写入chan就行了           
			case:
			  sharedIndexerInformer.Indexer.Add
			  cache.sharedProcessor.distribute  
			case:
			  sharedIndexerInformer.Indexer.Delete
              cache.sharedProcessor.distribute  

4:informer.Lister

//4:informer.Lister
//4.1:创建informer.Lister。informers.NewSharedInformerFactory.Core().V1().Events().Lister
cache.eventInformer.Lister
  cache.eventInformer.Informer           //这个Informer函数即上面分析的informers.NewSharedInformerFactory.Core().V1().Events().Informer()函数,他用于获取和创建informer
                                         //所以lister的逻辑就是先从factory中获取event资源对应的informer,如果存在,则返回已存在的,如果不存在,则创建
  cache.sharedIndexInformer.GetIndexer   //获取informer内部的indexer对象,因为go是自动回收,所以对象创建一次然后到处引用就行,所以indexer中保存了原始对象的引用
                                         //indexer本质上也是一个store对象,他是一个提供了索引功能的store对象,所以indexer可用来存储数据,而前面说的queue是一个先进先出的store对象
  v1.NewEventLister                      //创建一个v1.eventLister对象,这个函数会把informer中的indexer对象的引用存放到新创建的eventLister对象中  


//4.2:通过Lister从informer中获取对象。informers.NewSharedInformerFactory.Core().V1().Events().Lister().List()
v1.eventLister.List              //返回indexer中的所有数据
  cache.ListAll                  //把indexer中所有的对象都添加到一个list中
   cache.List()                  //返回indexer中的所有对象
     cache.threadSafeMap.List    //就是遍历内部的map,然后返回所有的value


//4.3:通过lister从informer中检索对象。informers.NewSharedInformerFactory.Core().V1().Events().Lister().Events().Get()
//informers.NewSharedInformerFactory.Core().V1().Events().Lister().Events()会返回一个eventNamespaceLister对象,该对象可以检索指定命名空间下的所有对象
v1.eventNamespaceLister.List
  cache.ListAllByNamespace      //就是从indexer中检索指定命名空间下由get参数指定的对象,
								//!!!informer.Lister是直接读indexer,如果本地缓存没有就直接报错了
v1.eventNamepsaceLister.Get
  cache.cache.GetByKey          //从indexer中查找,informer.Lister是直接读indexer,如果本地缓存没有就直接报错了


暂停一下,我们捋一下存储结构:
	informer中含有一个indexer,一个indexer内部是一个cache.cache,cache.cache内部是一个cache.threadSafeMap,threadSafeMap内部是一个带锁的map
	我们可以通过informer.Lister()来创建一个可以检索informer内部的indexer,至此,结束

5:等待informer同步完成

//5:等待同步(略)。cache.WaitForCacheSync
//这个简单,就是不断地调用传进来的检查函数,如果检查函数返回true,说明该infromer应该同步成功了,如果所有的检查函数都返回true,就说明所有的informer都同步成功了

6:自定义controller笔记

//6:自定义controller
/*
   这个就简单了,前面我们已经分析了informer的整个流程,可以知道informer的reflector会自动从apiserver拉取数据,因为处理能力有限,所以informer的reflector就把数据或者事件缓存到事件队列queue
   然后从queue事件队列取元素,然后把数据保存到indexer,然后对每一个数据调用一次对应的处理函数,所以如果我们要实现一个自定义的controller,那么思路就很清晰了,我们一共需要两个东西
   第一个是一个事件队列或者叫数据队列,即queue,用来存储发生了什么事件或者来了什么数据,然后是一个indexer,即数据存储的地方,我们不断从事件队列取出事件,这个事件可以是某个对象的key
   然后去indexer中通过这个key获取数据,indexer是informer内部的,但是我们可以通过informer.Lister方法访问到,所以我们只需要把informer传递给我们自定义的controller就行了,这样就由informer
   内部的indexer去自动存储数据,就不需要自己手动取存储这些对象了,我们直接从indexer检索就行了。另一方面,informer内部reflector用到的事件队列是informer内部的,我们外部是无法感知的,
   但是informer内部对于每个事件都会调用一次informer.AddEventHandler中的函数,所以我们就需要利用这个机会,把事件转存到我们自己的事件队列所以我们要自定义一个queue,
   然后在informer.AddEventHandler中的AddFunc/UpdateFunc/DeleteFunc中把事件转存到我们自己的事件队列,然后开一个死循环去读取我们自己的事件队列就行了。
   实现一个自定义controller的逻辑如下:
	1:创建一个并发安全的事件队列,即queue。之后我们要把他同时传递给controller和informer的AddEventHandler。
	  这个queue我们存放的元素有两种选择:
	    1:直接存放原始对象,这样我们在controller里取出的对象就直接是完整对象了,就可以不用再去informer中查找了,即indexers就可以不需要了,即可以不把informer传递给我们的controller
		2:放原始对象的key,然后在controller里再根据这个key去查找原始对象。这种方式要求我们在创建controller的时候要同时把queue和informer传递给controller,因为数据是存放在informer里面的
	2:在informer.AddEventHandler中添加Add/Update/Delete函数,该处理函数把informer框架传进来的对象或者对象的key转存到我们自己的queue中,这样就把informer和这个queue联系起来了
	3:把queue传递给我们创建的controller,这样我们的controller就能异步获取到所有数据了,接下来的处理就很简单了:开n个工作者线程,不断去轮询我们自己的queue,如果有元素就处理,没有就阻塞
	笔记1:informer中对象默认的key函数是cache.MetaNamespaceKeyFunc即key=namespace/objname
	笔记2:我们这里informer是通过sharedInformerFactory来创建informer对象,也就是说indexer是factory帮我们创建并传递给informer的,如果我们不用factory,而是直接使用
          cache.NewSharedIndexInformer,那么我们从函数参数可以发现我们需要手动创建indexer,手动创建ListerWatch
*/

附录:几个函数的源代码

//factory的InformerFor函数:如果存在则直接返回,如果没有则创建一个全新的,并保存到map
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)  //这个就是对应的v1.eventInformer.defaultInformer函数
	f.informers[informerType] = informer

	return informer
}


//实际构造eventInformer构造函数详解
v1.NewFilteredEventInformer  
  return cache.NewSharedIndexInformer(    //这是通用的Informer构造函数,就是填充参数并简单地返回一个SharedIndexInformer对象
	&cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {             //list函数
			return client.CoreV1().Events(namespace).List(context.TODO(), options)      //本质就是client-go去询问ApiServer
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {          //watch函数
			return client.CoreV1().Events(namespace).Watch(context.TODO(), options)     //本质就是client-go去询问ApiServer
		},
	},
	&corev1.Event{},                                                                    //这是此informer监听的对象,会通过反射把查询结果转换为此对象
	resyncPeriod, 
	indexers,                                                                           //数据存放位置
  )


//构造实际的存储对象
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),    //实际是cache.threadSafeMap实际就是一个带锁的结构体,内部包含了一个用来存储的indexer
		keyFunc:      keyFunc,                                    //这个keyFunc默认为cache.DeletionHandlingMetaNamespaceKeyFunc
	}
}

//key函数
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return MetaNamespaceKeyFunc(obj)
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐