简介

之前介绍过sigs.k8s.io controller-runtime系列之四 client分析sigs.k8s.io controller-runtime-client
本文主要介绍pkg/cache的源码分析。

目录结构

  1. cache_suite_test.go 注册测试GVK 校验k8s环境 获取client config
    • 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
    • BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
    • 如果BeforeSuite执行失败,则这个测试集都不会被执行
  2. cache.go
    • Cache接口 提供了充当client.Reader的实例,帮助驱动基于事件处理的Kubernetes对象并添加对象的filed索引
    // Cache充当一个在缓存中存储对象的client
    type Cache interface {
    	client.Reader
    
    	// 用于Cache加载Informers并添加filed 索引(用于过滤和查找).
    	Informers
    }
    
    • Informers接口 为不同的gvk创建和fetch Informers
    type Informers interface {
        // 为给定对应一个kind和resource的obj获取或者构造一个informer
    	GetInformer(ctx context.Context, obj client.Object) (Informer, error)
    
    	// 类似于 GetInformer,通过gvk获取
    	GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
    
    	// 启动该cache中所包含的所有的informers
    	Start(ctx context.Context) error
    
        // 等待所有的cache同步
    	WaitForCacheSync(ctx context.Context) bool
    
        // 用于增加filed索引
    	client.FieldIndexer
    }
    
    • Informer接口
    type Informer interface {
        // 使用共享informer的重新同步周期将事件处理程序添加到共享informer。
        // 单个handler的事件按顺序传递,但不同handler之间没有协调。
    	AddEventHandler(handler toolscache.ResourceEventHandler)
    
        // 使用特定的重新同步周期将事件处理程序添加到共享informer。
    	// 单个handler的事件按顺序传递,但不同handler之间没有协调。
    	AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration)
    
        // 添加索引, 如果在store中已经存在,在调用该方法,则undefined
    	AddIndexers(indexers toolscache.Indexers) error
    
        // informers下的store已经同步,则返回true
    	HasSynced() bool
    }
    
    • Options接口 在创建新的InformersMap的可选择的参数
      type Options struct {
       // 用于将对象映射到 GroupVersionKinds
      	Scheme *runtime.Scheme
      
      	// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources
      	// 用于将 GroupVersionKinds映射到Resource
      	Mapper meta.RESTMapper
      
      	// informer重新同步的基本频率,默认是 defaultResyncTime.
       // informer之间的重新同步期间将增加 10% 的抖动,所以所有informer不会同时发送list请求。
      	Resync *time.Duration
      
      	// 将缓存的 ListWatch 限制为所需的命名空间
      	// 默认t watches all namespaces
      	Namespace string
      }
      
    • New函数 初始化并返回新的cache
    func New(config *rest.Config, opts Options) (Cache, error) {
    	opts, err := defaultOpts(config, opts)
    	if err != nil {
    		return nil, err
    	}
        // 下文会讲到,主要是内部实现的创建structured and unstructured objects的informer
    	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
    	return &informerCache{InformersMap: im}, nil
    }
    
    // 检测opt,赋值默认属性
    func defaultOpts(config *rest.Config, opts Options) (Options, error) {
    	// 没有设置Scheme,使用默认的
    	if opts.Scheme == nil {
    		opts.Scheme = scheme.Scheme
    	}
    
    	// 设置新的restmapper
    	if opts.Mapper == nil {
    		var err error
    		opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
    		if err != nil {
    			log.WithName("setup").Error(err, "Failed to get API Group-Resources")
    			return opts, fmt.Errorf("could not create RESTMapper from config")
    		}
    	}
    
    	// 没有设置同步周期  默认为10 hours
    	if opts.Resync == nil {
    		opts.Resync = &defaultResyncTime
    	}
    	return opts, nil
    }
    
  3. multi_namespace_cache.go
    • MultiNamespacedCacheBuilder函数 顾名思义,根据多个命名空间创建一个multicache(以一个map来包含多个cache)
    // 将该cache的作用范围限定到一个命名空间list。
    // 注意在大量命名空间中使用它时可能会遇到性能问题。
    func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
    	return func(config *rest.Config, opts Options) (Cache, error) {
    		opts, err := defaultOpts(config, opts)
    		if err != nil {
    			return nil, err
    		}
    		caches := map[string]Cache{}
    		for _, ns := range namespaces {
    			opts.Namespace = ns
    			c, err := New(config, opts)
    			if err != nil {
    				return nil, err
    			}
    			caches[ns] = c
    		}
    		return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme}, nil
    	}
    }
    
  4. internal/cache_reader.go 实现了client.Reader
    • CacheReader结构体 内部使用cache.Index来实现了单一类型的client.CacheReader接口
    type CacheReader struct {
    	// 修饰cache的索引器
    	indexer cache.Indexer
    
    	// resource的gvk
    	groupVersionKind schema.GroupVersionKind
    
    	// resource的范围 (namespaced or cluster-scoped).
    	scopeName apimeta.RESTScopeName
    }
    
    • Get函数 检查对象的索引器并在找到数据的副本写入参数out
      func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object) error {
      	if c.scopeName == apimeta.RESTScopeNameRoot {
      		key.Namespace = ""
      	}
       // 获取obj store的key
      	storeKey := objectKeyToStoreKey(key)
      
      	// 从索引器缓存中查找对象
      	obj, exists, err := c.indexer.GetByKey(storeKey)
      	if err != nil {
      		return err
      	}
      
      	// 如果没有get,则抛异常
      	if !exists {
      		// Resource gets transformed into Kind in the error anyway, so this is fine
      		return errors.NewNotFound(schema.GroupResource{
      			Group:    c.groupVersionKind.Group,
      			Resource: c.groupVersionKind.Kind,
      		}, key.Name)
      	}
      
      	// 验证结果是一个 runtime.Object
      	if _, isObj := obj.(runtime.Object); !isObj {
      		// 通常 这是不会发生的
      		return fmt.Errorf("cache contained %T, which is not an Object", obj)
      	}
      
      	// 深拷贝来避免污染缓存
      	obj = obj.(runtime.Object).DeepCopyObject()
      
      	// 拷贝在cache中的item的值到返回值中
      	outVal := reflect.ValueOf(out)
      	objVal := reflect.ValueOf(obj)
       // 判断获取到的val和参数返回的type是否相同
      	if !objVal.Type().AssignableTo(outVal.Type()) {
      		return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
      	}
      	reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
      	out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
      
      	return nil
      }
      
    • List函数 列出索引器中的项目并将它们写出到参数out
      func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
      	var objs []interface{}
      	var err error
      
      	listOpts := client.ListOptions{}
      	listOpts.ApplyOptions(opts)
      
      	if listOpts.FieldSelector != nil {
      		// 目前只支持通过单一的FieldSelector,TODO通过组合多个索引、GetIndexers 等来支持更复杂的字段选择器
      		field, val, requiresExact := requiresExactMatch(listOpts.FieldSelector)
      		if !requiresExact {
      			return fmt.Errorf("non-exact field matches are not supported by the cache")
      		}
      		// 通过字段选择器列出所有对象.如果有且仅有一个namespace,则以其作为namespace的key.  否则使用 "all namespaces"作为namespace key.
           // indexname的表示形式"field:" + field,indexkey的表示形式是ns + "/" + baseKey
      		objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
      	} else if listOpts.Namespace != "" {
           // indexname的表示形式namespace,indexkey的表示形式是ns
      		objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
      	} else {
      		objs = c.indexer.List()
      	}
      	if err != nil {
      		return err
      	}
      
       // 标签选择器
      	var labelSel labels.Selector
      	if listOpts.LabelSelector != nil {
      		labelSel = listOpts.LabelSelector
      	}
      
      	runtimeObjs := make([]runtime.Object, 0, len(objs))
       // 遍历通过indexer获取的items,过滤掉不符合label的item
      	for _, item := range objs {
      		obj, isObj := item.(runtime.Object)
      		if !isObj {
      			return fmt.Errorf("cache contained %T, which is not an Object", obj)
      		}
      		meta, err := apimeta.Accessor(obj)
      		if err != nil {
      			return err
      		}
      		if labelSel != nil {
      			lbls := labels.Set(meta.GetLabels())
      			if !labelSel.Matches(lbls) {
      				continue
      			}
      		}
      
      		outObj := obj.DeepCopyObject()
      		outObj.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
      		runtimeObjs = append(runtimeObjs, outObj)
      	}
      	return apimeta.SetList(out, runtimeObjs)
      }      
      
  5. internal包中的informers_map.go
    • newSpecificInformersMap函数
    // new一个新的specificInformersMap (和普通的InformersMap一样,但是没有实现WaitForCacheSync).
    func newSpecificInformersMap(config *rest.Config,
    	scheme *runtime.Scheme,
    	mapper meta.RESTMapper,
    	resync time.Duration,
    	namespace string,
    	createListWatcher createListWatcherFunc) *specificInformersMap {
    	ip := &specificInformersMap{
    		config:            config,
    		Scheme:            scheme,
    		mapper:            mapper,
    		informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
    		codecs:            serializer.NewCodecFactory(scheme),
    		paramCodec:        runtime.NewParameterCodec(scheme),
    		resync:            resync,
    		startWait:         make(chan struct{}),
    		createListWatcher: createListWatcher,
    		namespace:         namespace,
    	}
    	return ip
    }
    
    • MapEntry 结构体 包含了一个Informer的cached 数据
      type MapEntry struct {
      	// cached informer
      	Informer cache.SharedIndexInformer
      
      	// CacheReader 包装了 Informer 并为单一类型实现了 CacheReader 接口
      	Reader CacheReader
      }
      
    • specificInformersMap结构体
      // 创建和缓存基于runtime.Object和schema.GroupVersionKind的Informers.
      // 使用基于给定生成方案构建的标准参数编解码器。
      type specificInformersMap struct {
      
      	Scheme *runtime.Scheme
      
      	// 和apiserver交互使用
      	config *rest.Config
      
      	// GroupVersionKinds to Resources
      	mapper meta.RESTMapper
      
      	// 以map形式缓存 informers, key是 groupVersionKind
      	informersByGVK map[schema.GroupVersionKind]*MapEntry
      
      	// 用于创建一个新的 REST client
      	codecs serializer.CodecFactory
      
      	// 用于list and watch
      	paramCodec runtime.ParameterCodec
      
      	// 用于 stop informers
      	stop <-chan struct{}
      
      	resync time.Duration
      
      	// 锁 用于访问informersByGVK使用
      	mu sync.RWMutex
      
      	// 标识 informers是否已启动
      	started bool
      
      	// 在informer开始后,该chan将被关闭
      	startWait chan struct{}
      
      	// 创建listWatch的函数
      	createListWatcher createListWatcherFunc
      
      	// namespace 是所有 ListWatches 被限制到的命名空间
      	// 如果为空,则代表 all namespaces
      	namespace string
      }
      
      • Start函数 开启informersByGVK中的所有Informers,并设置属性started为true。该方法是阻塞的
      func (ip *specificInformersMap) Start(ctx context.Context) {
      	func() {
      		ip.mu.Lock()
      		defer ip.mu.Unlock()
      
      		ip.stop = ctx.Done()
      
      		// 开启没有informer
      		for _, informer := range ip.informersByGVK {
      			go informer.Informer.Run(ctx.Done())
      		}
      
      		ip.started = true
      		close(ip.startWait)
      	}()
      	<-ctx.Done()
      }
      
      • Get函数 如果gvk对应的specificInformer不存在,那么创建一个新的并添加到map中。返回map中对应gvk对应的informer
      func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
      	// 存在即返回
      	i, started, ok := func() (*MapEntry, bool, bool) {
      		ip.mu.RLock()
      		defer ip.mu.RUnlock()
      		i, ok := ip.informersByGVK[gvk]
      		return i, ip.started, ok
      	}()
      
      	if !ok {
      		var err error
             // 不存在就添加一个新的
      		if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
      			return started, nil, err
      		}
      	}
      
         // 如果已经开始了,但是还未同步结束
      	if started && !i.Informer.HasSynced() {
      		// 知道同步完成,才返回informer,如此不会从旧的缓存中读取
      		if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
      			return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
      		}
      	}
      
      	return started, i, nil
      }
      
      • addInformerToMap函数 根据gvk添加informer到map中
        func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
        	ip.mu.Lock()
        	defer ip.mu.Unlock()
        
        	// 检查map中是否已经存在.  如果存在直接返回。
        	if i, ok := ip.informersByGVK[gvk]; ok {
        		return i, ip.started, nil
        	}
        
        	// 创建一个listWatch函数
        	var lw *cache.ListWatch
        	lw, err := ip.createListWatcher(gvk, ip)
        	if err != nil {
        		return nil, false, err
        	}
          // 创建一个IndexInformer
        	ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
        		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
        	})
        	rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
        	if err != nil {
        		return nil, false, err
        	}
          // 封装一个MapEntry实体,包含了Informer和Reader
        	i := &MapEntry{
        		Informer: ni,
        		Reader:   CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
        	}
        	ip.informersByGVK[gvk] = i
        
        	// 如果调用者已经start,那么开启新加入的informer
        	if ip.started {
        		go i.Informer.Run(ip.stop)
        	}
        	return i, ip.started, nil
        }
        
      • createStructuredListWatch函数 创建listWatch结构化对象的ListWatch函数
        func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
        	// 获取映射obj和gvk
        	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
        	if err != nil {
        		return nil, err
        	}
          // 根据gvk获取对应的client
        	client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
        	if err != nil {
        		return nil, err
        	}
        	listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
        	listObj, err := ip.Scheme.New(listGVK)
        	if err != nil {
        		return nil, err
        	}
        
        	// TODO: 这里可以使用用户自定义的额context
        	ctx := context.TODO()
        
        	return &cache.ListWatch{
        		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
        			res := listObj.DeepCopyObject()
        			isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
        			err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
        			return res, err
        		},
        		// Setup the watch function
        		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
        			// Watch needs to be set to true separately
        			opts.Watch = true
        			isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
        			return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
        		},
        	}, nil
        }                
        
  6. internal包中的deleg_map.go
    • InformersMap结构体
      type InformersMap struct {
      
        structured   *specificInformersMap
        unstructured *specificInformersMap
        metadata     *specificInformersMap
      
        Scheme *runtime.Scheme
      }
      
    • NewInformersMap函数
      // 创建一个InformersMap
      func NewInformersMap(config *rest.Config,
      	scheme *runtime.Scheme,
      	mapper meta.RESTMapper,
      	resync time.Duration,
      	namespace string) *InformersMap {
      
      	return &InformersMap{
      		structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
      		unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
      		metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),
      
      		Scheme: scheme,
      	}
      }
      
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐