client-go之informers包源码分析
client-go之informers包源码分析informers包用于生成各种gvk对应的informer(注意这里这是针对原生k8s的对象)generic.go注意: 这里的informer和informers包是有区别的,虽然具体的informer底层都是包装了SharedIndexInformer,但是另一个属性是不相同的。这里是gvr,而informers包中是gr接口// Generi
·
informers包源码分析
informers包
用于生成各种gvk对应的informer(注意这里这是针对原生k8s的对象)
generic.go
注意: 这里的informer和informers包是有区别的,虽然具体的informer底层都是包装了SharedIndexInformer,但是另一个属性是不相同的。
这里是gvr,而informers包中是gr
- 接口
// GenericInformer 是 SharedIndexInformer 的类型,它将根据类型定位并委托给其他sharedInformer type GenericInformer interface { // 获取informer Informer() cache.SharedIndexInformer // 获取informer衍生出来的lister Lister() cache.GenericLister }
- 结构体
type genericInformer struct { // 具体的informer informer cache.SharedIndexInformer // 这里使用gr来表示对应的资源(ps: 为什么不适用gvr?) resource schema.GroupResource } // Informer 返回 SharedIndexInformer。 func (f *genericInformer) Informer() cache.SharedIndexInformer { return f.informer } // Lister 返回 GenericLister。 func (f *genericInformer) Lister() cache.GenericLister { return cache.NewGenericLister(f.Informer().GetIndexer(), f.resource) } // ForResource 提供对匹配类型(k8s原生Type)的GenericInformer func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
factory.go
注意: 这里的informer和informers包是有区别的,虽然具体的informer底层都是包装了SharedIndexInformer,但是另一个属性是不相同的。
这里是gvr,而informers包中是gr
- 函数
// WithCustomResyncConfig 为指定的 Informer 类型设置自定义重新同步周期 func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption { return func(factory *sharedInformerFactory) *sharedInformerFactory { for k, v := range resyncConfig { factory.customResync[reflect.TypeOf(k)] = v } return factory } } // WithTweakListOptions 在配置的 SharedInformerFactory 的所有listers上设置自定义过滤器。 func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption { return func(factory *sharedInformerFactory) *sharedInformerFactory { factory.tweakListOptions = tweakListOptions return factory } } // WithNamespace 将 SharedInformerFactory 限制为指定的命名空间。 func WithNamespace(namespace string) SharedInformerOption { return func(factory *sharedInformerFactory) *sharedInformerFactory { factory.namespace = namespace return factory } } // NewSharedInformerFactory 为所有命名空间构造一个新的 sharedInformerFactory 实例。 func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync) } // NewFilteredSharedInformerFactory 构造了 sharedInformerFactory 的新实例。 // 通过此 SharedInformerFactory 获得的lister将受到与此处指定的相同的tweakListOptions的约束(执行list方法时)。 // 已弃用:请改用 NewSharedInformerFactoryWithOptions func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions)) } // NewSharedInformerFactoryWithOptions 使用附加选项构造 SharedInformerFactory 的新实例。 func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // 配置所有的optins,设置factory的属性 for _, opt := range options { factory = opt(factory) } return factory }
- 接口
// SharedInformerFactory 为所有已知的API 组版本中的资源提供共享informer的接口定义。 type SharedInformerFactory interface { // client-go内部的SharedInformerFactory接口,提供了Start和InformerFor方法 internalinterfaces.SharedInformerFactory // 根据gvr获取GenericInformer ForResource(resource schema.GroupVersionResource) (GenericInformer, error) // 等待informer同步方法 WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool // 下面是informers包中定义获取各种gvr的informer接口 Admissionregistration() admissionregistration.Interface Internal() apiserverinternal.Interface Apps() apps.Interface Autoscaling() autoscaling.Interface Batch() batch.Interface Certificates() certificates.Interface Coordination() coordination.Interface Core() core.Interface Discovery() discovery.Interface Events() events.Interface Extensions() extensions.Interface Flowcontrol() flowcontrol.Interface Networking() networking.Interface Node() node.Interface Policy() policy.Interface Rbac() rbac.Interface Scheduling() scheduling.Interface Storage() storage.Interface }
- 结构体
// SharedInformerOption 定义 SharedInformerFactory 的功能选项类型。用来设置SharedInformerFactory的部分属性 type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory // Informer的构造工厂(其实包装了一个map) type sharedInformerFactory struct { // Clientset(k8s内部类型的restClient) client kubernetes.Interface // 限定命名空间 namespace string // 用来调整ListOptions(list查询的options)的函数 tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex // 默认的同步周期 defaultResync time.Duration // 自定义的同步周期(key: obj的type value:时间周期) customResync map[reflect.Type]time.Duration // 缓存的informers informers map[reflect.Type]cache.SharedIndexInformer // startInformers 用于跟踪哪些 Informers 已启动。这允许安全地多次调用 Start()。 startedInformers map[reflect.Type]bool } // Start 初始化所有请求的通知者 func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() // 遍历 for informerType, informer := range f.informers { // 判断是否存在 if !f.startedInformers[informerType] { // 开启goruntime启动 go informer.Run(stopCh) // 设置对应的type的启动状态为true f.startedInformers[informerType] = true } } } // WaitForCacheSync 等待所有启动的informer的缓存同步。 func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() // 获取所有已经started的informer informers := map[reflect.Type]cache.SharedIndexInformer{} for informerType, informer := range f.informers { if f.startedInformers[informerType] { informers[informerType] = informer } } return informers }() // 表示已经started的informer是否已同步完成 res := map[reflect.Type]bool{} for informType, informer := range informers { res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) } return res } // InternalInformerFor 使用内部Client(client-go定义好的k8s内部的client)返回 obj 的 SharedIndexInformer。newFunc是不同gvk的结构体对应的创建informer的函数 func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer } // 获取admissionregistration的group接口(用来获取各个version的接口) func (f *sharedInformerFactory) Admissionregistration() admissionregistration.Interface { return admissionregistration.New(f, f.namespace, f.tweakListOptions) }
admissionregistration包
以admissionregistration包做分析,其他的类似,不在分析
- interface.go
- 接口
// 提供对这个group的每个version的访问。 type Interface interface { // V1 提供对 V1包 中资源的共享informer的访问。 V1() v1.Interface // V1beta1 提供对 V1beta1包中资源的共享informer的访问。 V1beta1() v1beta1.Interface }
- 结构体
type group struct { factory internalinterfaces.SharedInformerFactory //SharedInformerFactory接口,一般是实现了其的结构体 namespace string // 限定命名空间 tweakListOptions internalinterfaces.TweakListOptionsFunc // 调整ListOptions的函数(用在lister的list方法之前) } // V1 返回一个新的 v1.Interface(接口的实现接头体对象)。 func (g *group) V1() v1.Interface { // 参考下面的v1包分析 return v1.New(g.factory, g.namespace, g.tweakListOptions) } // V1beta1 返回一个新的 v1beta1.Interface(接口的实现接头体对象)。 func (g *group) V1beta1() v1beta1.Interface { // 参考下面的v1beta1包分析 return v1beta1.New(g.factory, g.namespace, g.tweakListOptions) }
- 函数
// New 返回一个新的接口实现的结构体。 func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} }
- 接口
- v1
- interface.go
- 接口
// 提供访问此gv中所有resource的informer的权限。 type Interface interface { // MutatingWebhookConfigurations 返回一个 MutatingWebhookConfigurationInformer。 MutatingWebhookConfigurations() MutatingWebhookConfigurationInformer // ValidatingWebhookConfigurations 返回一个 ValidatingWebhookConfigurationInformer。 ValidatingWebhookConfigurations() ValidatingWebhookConfigurationInformer }
- 函数
// New 返回一个新的接口(接口实现的结构体对象)。 func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} }
- 结构体
// 所有属性都是从group(此group非原生group,是上面提到的group)中传递过来的 type version struct { factory internalinterfaces.SharedInformerFactory namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc } // MutatingWebhookConfigurations 返回一个 MutatingWebhookConfigurationInformer。 func (v *version) MutatingWebhookConfigurations() MutatingWebhookConfigurationInformer { return &mutatingWebhookConfigurationInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} } // ValidatingWebhookConfigurations 返回一个 ValidatingWebhookConfigurationInformer。 func (v *version) ValidatingWebhookConfigurations() ValidatingWebhookConfigurationInformer { return &validatingWebhookConfigurationInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} }
- 接口
- mutatingwebhookconfiguration.go
- 接口
// MutatingWebhookConfigurationInformer 定义共享informer和lister。 type MutatingWebhookConfigurationInformer interface { Informer() cache.SharedIndexInformer Lister() v1.MutatingWebhookConfigurationLister }
- 方法
// NewMutatingWebhookConfigurationInformer 为 MutatingWebhookConfiguration(k8s原生类型)类型构造一个新的 Informer。 // 最佳实践是使用 Informer 工厂来获得一个共享的 Informer 而不是获得独立的一个。这减少了内存占用和到服务器的连接数。 func NewMutatingWebhookConfigurationInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { return NewFilteredMutatingWebhookConfigurationInformer(client, resyncPeriod, indexers, nil) } // NewFilteredMutatingWebhookConfigurationInformer 为 MutatingWebhookConfiguration(k8s原生类型) 类型构造一个新的informer(带有TweakListOptionsFunc)。 // 最佳实践是使用 Informer 工厂来获得一个共享的 Informer 而不是获得独立的一个。这减少了内存占用和到服务器的连接数。 func NewFilteredMutatingWebhookConfigurationInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { // 调用tools/cache包下的方法(后面章节分析),用来创建一个带有indexer(缓存)的informer return cache.NewSharedIndexInformer( // 构造ListWatch接口,用来监听apiserver接口 &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AdmissionregistrationV1().MutatingWebhookConfigurations().List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AdmissionregistrationV1().MutatingWebhookConfigurations().Watch(context.TODO(), options) }, }, // MutatingWebhookConfiguration(kubernetes/api的type) &admissionregistrationv1.MutatingWebhookConfiguration{}, // 同步周期 resyncPeriod, // 一个map,key:索引name value:生成Index(indics的value)的key的函数 indexers, ) }
- 结构体
// 所有属性都是从version(此version非原生version,是上面提到的version)中传递过来的 type mutatingWebhookConfigurationInformer struct { factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc } // 作为参数传递给factory.InformerFor方法,用来创建informer func (f *mutatingWebhookConfigurationInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredMutatingWebhookConfigurationInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } // 调用factory的InformerFor(本节前面已分析,不在解释) func (f *mutatingWebhookConfigurationInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&admissionregistrationv1.MutatingWebhookConfiguration{}, f.defaultInformer) } // 调用listers包中对应gv下的NewMutatingWebhookConfigurationLister(后续章节分析),用来创建lister func (f *mutatingWebhookConfigurationInformer) Lister() v1.MutatingWebhookConfigurationLister { return v1.NewMutatingWebhookConfigurationLister(f.Informer().GetIndexer()) }
- 接口
- interface.go
- v1beta1
类似v1,这里不在做分析
更多推荐
已为社区贡献15条内容
所有评论(0)