简介

之前介绍过sigs.k8s.io controller-runtime系列之七 certwatcher分析sigs.k8s.io controller-runtime-certwatcher
本文主要介绍pkg/cluster的源码分析。

目录结构

  1. cluster.go
    • Cluster结构体 提供了与集群交互的各种方法。
    type Cluster interface {
    	// SetFields将设置对象已实现inject接口的对象上的任何依赖项,例如inject.Client。 
    	SetFields(interface{}) error
    
    	// 获取一个初始化的配置
    	GetConfig() *rest.Config
    
    	// 获取一个初始化的scheme
    	GetScheme() *runtime.Scheme
    
    	// 返回一个配置了Config的client
    	GetClient() client.Client
    
    	// 获取一个有client配置的field索引器
    	GetFieldIndexer() client.FieldIndexer
    
    	// 获取一个cache
    	GetCache() cache.Cache
    
    	// 通过name获取EventRecorder
    	GetEventRecorderFor(name string) record.EventRecorder
    
    	// 获取RESTMapper
    	GetRESTMapper() meta.RESTMapper
    
    	// 获取一个read api server的Reader.
    	GetAPIReader() client.Reader
    
    	Start(ctx context.Context) error
    }
    
    • Options结构体 群集配置的可能选项
    type Options struct {
    
        // 用于解析runtime.Objects 到 GroupVersionKinds / Resources
        // 默认是kubernetes/client-go scheme.Scheme,如果你对scheme做了修改,最好将自己定义的scheme传入
    	Scheme *runtime.Scheme
    
    	// 用于提供 rest mapper 用来映射 types 到 Kubernetes APIs
    	MapperProvider func(c *rest.Config) (meta.RESTMapper, error)
    
    	Logger logr.Logger
    
    	// reconcile resource的最小周期. 越低同步频率越快, 但是,如果有许多被watch的资源,则会降低对更改的响应能力. 
        // 默认是 10 hours,
    	// 所有控制器的SyncPeriod之间会有10%的抖动,因此所有控制器不会同时发送列表请求。
    	SyncPeriod *time.Duration
    
    	// 注意:如果指定了名称空间,控制器仍然可以监视群集范围的资源(例如节点)。
        // 对于具有名称空间的资源,缓存将只保存所需命名空间中的对象。 
    	Namespace string
    
    	NewCache cache.NewCacheFunc
    
    	NewClient NewClientFunc
    
    	// 不适用缓存操作对象的slice
    	ClientDisableCacheFor []client.Object
    
    	DryRunClient bool
    
    	// EventBroadcaster接受管理器发出的事件,并将它们发送到Kubernetes API(log、sink、watcher),使用此选项可自定义事件相关器和垃圾邮件过滤器
    	// 不推荐使用:如果管理器或控制器的生命周期比进程的生命周期短。 
    	EventBroadcaster record.EventBroadcaster
    
    	// makeBroadcaster允许将Broadcaster的创建推迟以避免goroutines泄漏(如果我们从来没有在manager中调用过Start方法)
        // 该方法应该总是返回"owned"(我理解的是manager中关联)broadcaster,并且必须有manager关闭
    	makeBroadcaster intrec.EventBroadcasterProducer
    
    	newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
    }
    
    • New函数 构建一个全新的cluster
    // 用来操作Options
    type Option func(*Options)
    
    func New(config *rest.Config, opts ...Option) (Cluster, error) {
    	if config == nil {
    		return nil, errors.New("must specify Config")
    	}
    
    	options := Options{}
    	for _, opt := range opts {
    		opt(&options)
    	}
        // 设置Options 中fields的默认值
    	options = setOptionsDefaults(options)
    
    	// 创建restmapper
    	mapper, err := options.MapperProvider(config)
    	if err != nil {
    		options.Logger.Error(err, "Failed to get API Group-Resources")
    		return nil, err
    	}
    
    	// Create the cache for the cached read client and registering informers
    	// 创建缓存(读 client并注册informers)
    	cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
    	if err != nil {
    		return nil, err
    	}
    
    	clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
        // 创建client(read和write都是api server)
    	apiReader, err := client.New(config, clientOptions)
    	if err != nil {
    		return nil, err
    	}
        
        // 创建client(read是通过cache和write是api server)
    	writeObj, err := options.NewClient(cache, config, clientOptions, options.ClientDisableCacheFor...)
    	if err != nil {
    		return nil, err
    	}
    
        // 如果options.DryRunClient 为true,则覆盖writeObj为dryRun模式
        // client在增删改查时可以通过option.DryRun的值来判断,如果是Get/List/Delete操作,可以执行,但是涉及obj的Update/Patch等操作时,直接忽略
    	if options.DryRunClient {
    		writeObj = client.NewDryRunClient(writeObj)
    	}
    
    	recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
    	if err != nil {
    		return nil, err
    	}
    
    	return &cluster{
    		config:           config,
    		scheme:           options.Scheme,
    		cache:            cache,
    		fieldIndexes:     cache,
    		client:           writeObj,
    		apiReader:        apiReader,
    		recorderProvider: recorderProvider,
    		mapper:           mapper,
    		logger:           options.Logger,
    	}, nil
    }
    
    // 设置Options 中fields的默认值
    func setOptionsDefaults(options Options) Options {
    	// 如果options.Scheme为nil则设置为 Kubernetes client-go scheme
    	if options.Scheme == nil {
    		options.Scheme = scheme.Scheme
    	}
    
    	if options.MapperProvider == nil {
            // 设置MapperProvider
    		options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) {
    			return apiutil.NewDynamicRESTMapper(c)
    		}
    	}
    
    	// 允许用户定义如何创建一个client
    	if options.NewClient == nil {
    		options.NewClient = DefaultNewClient
    	}
    
    	if options.NewCache == nil {
    		options.NewCache = cache.New
    	}
    
    	if options.newRecorderProvider == nil {
    		options.newRecorderProvider = intrec.NewProvider
    	}
    
        // 这与pkg/manager是重复的(manager.Options中也有定义),
    	// 我们需要判定使用哪个,如果options.EventBroadcaster不为nil,则使用,否则赋值options.makeBroadcaster创建新的
    	if options.EventBroadcaster == nil {
    		// defer initialization to avoid leaking by default
    		options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
    			return record.NewBroadcaster(), true
    		}
    	} else {
    		options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
    			return options.EventBroadcaster, false
    		}
    	}
    
    	if options.Logger == nil {
    		options.Logger = logf.RuntimeLog.WithName("cluster")
    	}
    
    	return options
    }
    
  2. internal.go
    • cluster结构体 内部对Cluster的实现。
    // 和Cluster类似  不做介绍
    type cluster struct {
    	config *rest.Config
    
    	scheme *runtime.Scheme
    
    	cache cache.Cache
    
    	client client.Client
    
    	apiReader client.Reader
    
    	fieldIndexes client.FieldIndexer
    
    	recorderProvider *intrec.Provider
    
    	mapper meta.RESTMapper
    
    	logger logr.Logger
    }  
     
    // 注入实现了inject.*中的接口config、client、reader、scheme、cache、mapper
    func (c *cluster) SetFields(i interface{}) error {
    	if _, err := inject.ConfigInto(c.config, i); err != nil {
    		return err
    	}
    	if _, err := inject.ClientInto(c.client, i); err != nil {
    		return err
    	}
    	if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
    		return err
    	}
    	if _, err := inject.SchemeInto(c.scheme, i); err != nil {
    		return err
    	}
    	if _, err := inject.CacheInto(c.cache, i); err != nil {
    		return err
    	}
    	if _, err := inject.MapperInto(c.mapper, i); err != nil {
    		return err
    	}
    	return nil
    }
    
    // 启动cluster  开启cache(实际上是开启InformersMap下的所有Informer)
    func (c *cluster) Start(ctx context.Context) error {
    	defer c.recorderProvider.Stop(ctx)
    	return c.cache.Start(ctx)
    }
    
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐