sigs.k8s.io controller-runtime系列之五 cache分析
简介之前介绍过sigs.k8s.io controller-runtime系列之四client分析sigs.k8s.io controller-runtime-client 。本文主要介绍pkg/cache的源码分析。目录结构cache_suite_test.go 注册测试GVK 校验k8s环境获取client config依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,Befor
·
简介
之前介绍过sigs.k8s.io controller-runtime系列之四 client分析sigs.k8s.io controller-runtime-client 。
本文主要介绍pkg/cache的源码分析。
目录结构
- cache_suite_test.go 注册测试GVK 校验k8s环境 获取client config
- 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
- BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
- 如果BeforeSuite执行失败,则这个测试集都不会被执行
- cache.go
- Cache接口 提供了充当client.Reader的实例,帮助驱动基于事件处理的Kubernetes对象并添加对象的filed索引
// Cache充当一个在缓存中存储对象的client type Cache interface { client.Reader // 用于Cache加载Informers并添加filed 索引(用于过滤和查找). Informers }
- Informers接口 为不同的gvk创建和fetch Informers
type Informers interface { // 为给定对应一个kind和resource的obj获取或者构造一个informer GetInformer(ctx context.Context, obj client.Object) (Informer, error) // 类似于 GetInformer,通过gvk获取 GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) // 启动该cache中所包含的所有的informers Start(ctx context.Context) error // 等待所有的cache同步 WaitForCacheSync(ctx context.Context) bool // 用于增加filed索引 client.FieldIndexer }
- Informer接口
type Informer interface { // 使用共享informer的重新同步周期将事件处理程序添加到共享informer。 // 单个handler的事件按顺序传递,但不同handler之间没有协调。 AddEventHandler(handler toolscache.ResourceEventHandler) // 使用特定的重新同步周期将事件处理程序添加到共享informer。 // 单个handler的事件按顺序传递,但不同handler之间没有协调。 AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) // 添加索引, 如果在store中已经存在,在调用该方法,则undefined AddIndexers(indexers toolscache.Indexers) error // informers下的store已经同步,则返回true HasSynced() bool }
- Options接口 在创建新的InformersMap的可选择的参数
type Options struct { // 用于将对象映射到 GroupVersionKinds Scheme *runtime.Scheme // Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources // 用于将 GroupVersionKinds映射到Resource Mapper meta.RESTMapper // informer重新同步的基本频率,默认是 defaultResyncTime. // informer之间的重新同步期间将增加 10% 的抖动,所以所有informer不会同时发送list请求。 Resync *time.Duration // 将缓存的 ListWatch 限制为所需的命名空间 // 默认t watches all namespaces Namespace string }
- New函数 初始化并返回新的cache
func New(config *rest.Config, opts Options) (Cache, error) { opts, err := defaultOpts(config, opts) if err != nil { return nil, err } // 下文会讲到,主要是内部实现的创建structured and unstructured objects的informer im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace) return &informerCache{InformersMap: im}, nil } // 检测opt,赋值默认属性 func defaultOpts(config *rest.Config, opts Options) (Options, error) { // 没有设置Scheme,使用默认的 if opts.Scheme == nil { opts.Scheme = scheme.Scheme } // 设置新的restmapper if opts.Mapper == nil { var err error opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config) if err != nil { log.WithName("setup").Error(err, "Failed to get API Group-Resources") return opts, fmt.Errorf("could not create RESTMapper from config") } } // 没有设置同步周期 默认为10 hours if opts.Resync == nil { opts.Resync = &defaultResyncTime } return opts, nil }
- multi_namespace_cache.go
- MultiNamespacedCacheBuilder函数 顾名思义,根据多个命名空间创建一个multicache(以一个map来包含多个cache)
// 将该cache的作用范围限定到一个命名空间list。 // 注意在大量命名空间中使用它时可能会遇到性能问题。 func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc { return func(config *rest.Config, opts Options) (Cache, error) { opts, err := defaultOpts(config, opts) if err != nil { return nil, err } caches := map[string]Cache{} for _, ns := range namespaces { opts.Namespace = ns c, err := New(config, opts) if err != nil { return nil, err } caches[ns] = c } return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme}, nil } }
- internal/cache_reader.go 实现了client.Reader
- CacheReader结构体 内部使用cache.Index来实现了单一类型的client.CacheReader接口
type CacheReader struct { // 修饰cache的索引器 indexer cache.Indexer // resource的gvk groupVersionKind schema.GroupVersionKind // resource的范围 (namespaced or cluster-scoped). scopeName apimeta.RESTScopeName }
- Get函数 检查对象的索引器并在找到数据的副本写入参数out
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object) error { if c.scopeName == apimeta.RESTScopeNameRoot { key.Namespace = "" } // 获取obj store的key storeKey := objectKeyToStoreKey(key) // 从索引器缓存中查找对象 obj, exists, err := c.indexer.GetByKey(storeKey) if err != nil { return err } // 如果没有get,则抛异常 if !exists { // Resource gets transformed into Kind in the error anyway, so this is fine return errors.NewNotFound(schema.GroupResource{ Group: c.groupVersionKind.Group, Resource: c.groupVersionKind.Kind, }, key.Name) } // 验证结果是一个 runtime.Object if _, isObj := obj.(runtime.Object); !isObj { // 通常 这是不会发生的 return fmt.Errorf("cache contained %T, which is not an Object", obj) } // 深拷贝来避免污染缓存 obj = obj.(runtime.Object).DeepCopyObject() // 拷贝在cache中的item的值到返回值中 outVal := reflect.ValueOf(out) objVal := reflect.ValueOf(obj) // 判断获取到的val和参数返回的type是否相同 if !objVal.Type().AssignableTo(outVal.Type()) { return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type()) } reflect.Indirect(outVal).Set(reflect.Indirect(objVal)) out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind) return nil }
- List函数 列出索引器中的项目并将它们写出到参数out
func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error { var objs []interface{} var err error listOpts := client.ListOptions{} listOpts.ApplyOptions(opts) if listOpts.FieldSelector != nil { // 目前只支持通过单一的FieldSelector,TODO通过组合多个索引、GetIndexers 等来支持更复杂的字段选择器 field, val, requiresExact := requiresExactMatch(listOpts.FieldSelector) if !requiresExact { return fmt.Errorf("non-exact field matches are not supported by the cache") } // 通过字段选择器列出所有对象.如果有且仅有一个namespace,则以其作为namespace的key. 否则使用 "all namespaces"作为namespace key. // indexname的表示形式"field:" + field,indexkey的表示形式是ns + "/" + baseKey objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val)) } else if listOpts.Namespace != "" { // indexname的表示形式namespace,indexkey的表示形式是ns objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace) } else { objs = c.indexer.List() } if err != nil { return err } // 标签选择器 var labelSel labels.Selector if listOpts.LabelSelector != nil { labelSel = listOpts.LabelSelector } runtimeObjs := make([]runtime.Object, 0, len(objs)) // 遍历通过indexer获取的items,过滤掉不符合label的item for _, item := range objs { obj, isObj := item.(runtime.Object) if !isObj { return fmt.Errorf("cache contained %T, which is not an Object", obj) } meta, err := apimeta.Accessor(obj) if err != nil { return err } if labelSel != nil { lbls := labels.Set(meta.GetLabels()) if !labelSel.Matches(lbls) { continue } } outObj := obj.DeepCopyObject() outObj.GetObjectKind().SetGroupVersionKind(c.groupVersionKind) runtimeObjs = append(runtimeObjs, outObj) } return apimeta.SetList(out, runtimeObjs) }
- internal包中的informers_map.go
- newSpecificInformersMap函数
// new一个新的specificInformersMap (和普通的InformersMap一样,但是没有实现WaitForCacheSync). func newSpecificInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string, createListWatcher createListWatcherFunc) *specificInformersMap { ip := &specificInformersMap{ config: config, Scheme: scheme, mapper: mapper, informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), codecs: serializer.NewCodecFactory(scheme), paramCodec: runtime.NewParameterCodec(scheme), resync: resync, startWait: make(chan struct{}), createListWatcher: createListWatcher, namespace: namespace, } return ip }
- MapEntry 结构体 包含了一个Informer的cached 数据
type MapEntry struct { // cached informer Informer cache.SharedIndexInformer // CacheReader 包装了 Informer 并为单一类型实现了 CacheReader 接口 Reader CacheReader }
- specificInformersMap结构体
// 创建和缓存基于runtime.Object和schema.GroupVersionKind的Informers. // 使用基于给定生成方案构建的标准参数编解码器。 type specificInformersMap struct { Scheme *runtime.Scheme // 和apiserver交互使用 config *rest.Config // GroupVersionKinds to Resources mapper meta.RESTMapper // 以map形式缓存 informers, key是 groupVersionKind informersByGVK map[schema.GroupVersionKind]*MapEntry // 用于创建一个新的 REST client codecs serializer.CodecFactory // 用于list and watch paramCodec runtime.ParameterCodec // 用于 stop informers stop <-chan struct{} resync time.Duration // 锁 用于访问informersByGVK使用 mu sync.RWMutex // 标识 informers是否已启动 started bool // 在informer开始后,该chan将被关闭 startWait chan struct{} // 创建listWatch的函数 createListWatcher createListWatcherFunc // namespace 是所有 ListWatches 被限制到的命名空间 // 如果为空,则代表 all namespaces namespace string }
- Start函数 开启informersByGVK中的所有Informers,并设置属性started为true。该方法是阻塞的
func (ip *specificInformersMap) Start(ctx context.Context) { func() { ip.mu.Lock() defer ip.mu.Unlock() ip.stop = ctx.Done() // 开启没有informer for _, informer := range ip.informersByGVK { go informer.Informer.Run(ctx.Done()) } ip.started = true close(ip.startWait) }() <-ctx.Done() }
- Get函数 如果gvk对应的specificInformer不存在,那么创建一个新的并添加到map中。返回map中对应gvk对应的informer
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { // 存在即返回 i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() defer ip.mu.RUnlock() i, ok := ip.informersByGVK[gvk] return i, ip.started, ok }() if !ok { var err error // 不存在就添加一个新的 if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { return started, nil, err } } // 如果已经开始了,但是还未同步结束 if started && !i.Informer.HasSynced() { // 知道同步完成,才返回informer,如此不会从旧的缓存中读取 if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) } } return started, i, nil }
- addInformerToMap函数 根据gvk添加informer到map中
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() // 检查map中是否已经存在. 如果存在直接返回。 if i, ok := ip.informersByGVK[gvk]; ok { return i, ip.started, nil } // 创建一个listWatch函数 var lw *cache.ListWatch lw, err := ip.createListWatcher(gvk, ip) if err != nil { return nil, false, err } // 创建一个IndexInformer ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, false, err } // 封装一个MapEntry实体,包含了Informer和Reader i := &MapEntry{ Informer: ni, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, } ip.informersByGVK[gvk] = i // 如果调用者已经start,那么开启新加入的informer if ip.started { go i.Informer.Run(ip.stop) } return i, ip.started, nil }
- createStructuredListWatch函数 创建listWatch结构化对象的ListWatch函数
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { // 获取映射obj和gvk mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, err } // 根据gvk获取对应的client client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs) if err != nil { return nil, err } listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List") listObj, err := ip.Scheme.New(listGVK) if err != nil { return nil, err } // TODO: 这里可以使用用户自定义的额context ctx := context.TODO() return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { res := listObj.DeepCopyObject() isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res) return res, err }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { // Watch needs to be set to true separately opts.Watch = true isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx) }, }, nil }
- internal包中的deleg_map.go
- InformersMap结构体
type InformersMap struct { structured *specificInformersMap unstructured *specificInformersMap metadata *specificInformersMap Scheme *runtime.Scheme }
- NewInformersMap函数
// 创建一个InformersMap func NewInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *InformersMap { return &InformersMap{ structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace), unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace), metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace), Scheme: scheme, } }
- InformersMap结构体
更多推荐
已为社区贡献15条内容
所有评论(0)