sigs.k8s.io controller-runtime系列之八 cluster分析
简介之前介绍过sigs.k8s.io controller-runtime系列之七certwatcher分析sigs.k8s.io controller-runtime-certwatcher 。本文主要介绍pkg/cluster的源码分析。目录结构cluster.goCluster结构体 提供了与集群交互的各种方法。type Cluster interface {// SetFields将设置对
·
简介
之前介绍过sigs.k8s.io controller-runtime系列之七 certwatcher分析sigs.k8s.io controller-runtime-certwatcher 。
本文主要介绍pkg/cluster的源码分析。
目录结构
- cluster.go
- Cluster结构体 提供了与集群交互的各种方法。
type Cluster interface { // SetFields将设置对象已实现inject接口的对象上的任何依赖项,例如inject.Client。 SetFields(interface{}) error // 获取一个初始化的配置 GetConfig() *rest.Config // 获取一个初始化的scheme GetScheme() *runtime.Scheme // 返回一个配置了Config的client GetClient() client.Client // 获取一个有client配置的field索引器 GetFieldIndexer() client.FieldIndexer // 获取一个cache GetCache() cache.Cache // 通过name获取EventRecorder GetEventRecorderFor(name string) record.EventRecorder // 获取RESTMapper GetRESTMapper() meta.RESTMapper // 获取一个read api server的Reader. GetAPIReader() client.Reader Start(ctx context.Context) error }
- Options结构体 群集配置的可能选项
type Options struct { // 用于解析runtime.Objects 到 GroupVersionKinds / Resources // 默认是kubernetes/client-go scheme.Scheme,如果你对scheme做了修改,最好将自己定义的scheme传入 Scheme *runtime.Scheme // 用于提供 rest mapper 用来映射 types 到 Kubernetes APIs MapperProvider func(c *rest.Config) (meta.RESTMapper, error) Logger logr.Logger // reconcile resource的最小周期. 越低同步频率越快, 但是,如果有许多被watch的资源,则会降低对更改的响应能力. // 默认是 10 hours, // 所有控制器的SyncPeriod之间会有10%的抖动,因此所有控制器不会同时发送列表请求。 SyncPeriod *time.Duration // 注意:如果指定了名称空间,控制器仍然可以监视群集范围的资源(例如节点)。 // 对于具有名称空间的资源,缓存将只保存所需命名空间中的对象。 Namespace string NewCache cache.NewCacheFunc NewClient NewClientFunc // 不适用缓存操作对象的slice ClientDisableCacheFor []client.Object DryRunClient bool // EventBroadcaster接受管理器发出的事件,并将它们发送到Kubernetes API(log、sink、watcher),使用此选项可自定义事件相关器和垃圾邮件过滤器 // 不推荐使用:如果管理器或控制器的生命周期比进程的生命周期短。 EventBroadcaster record.EventBroadcaster // makeBroadcaster允许将Broadcaster的创建推迟以避免goroutines泄漏(如果我们从来没有在manager中调用过Start方法) // 该方法应该总是返回"owned"(我理解的是manager中关联)broadcaster,并且必须有manager关闭 makeBroadcaster intrec.EventBroadcasterProducer newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) }
- New函数 构建一个全新的cluster
// 用来操作Options type Option func(*Options) func New(config *rest.Config, opts ...Option) (Cluster, error) { if config == nil { return nil, errors.New("must specify Config") } options := Options{} for _, opt := range opts { opt(&options) } // 设置Options 中fields的默认值 options = setOptionsDefaults(options) // 创建restmapper mapper, err := options.MapperProvider(config) if err != nil { options.Logger.Error(err, "Failed to get API Group-Resources") return nil, err } // Create the cache for the cached read client and registering informers // 创建缓存(读 client并注册informers) cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) if err != nil { return nil, err } clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper} // 创建client(read和write都是api server) apiReader, err := client.New(config, clientOptions) if err != nil { return nil, err } // 创建client(read是通过cache和write是api server) writeObj, err := options.NewClient(cache, config, clientOptions, options.ClientDisableCacheFor...) if err != nil { return nil, err } // 如果options.DryRunClient 为true,则覆盖writeObj为dryRun模式 // client在增删改查时可以通过option.DryRun的值来判断,如果是Get/List/Delete操作,可以执行,但是涉及obj的Update/Patch等操作时,直接忽略 if options.DryRunClient { writeObj = client.NewDryRunClient(writeObj) } recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } return &cluster{ config: config, scheme: options.Scheme, cache: cache, fieldIndexes: cache, client: writeObj, apiReader: apiReader, recorderProvider: recorderProvider, mapper: mapper, logger: options.Logger, }, nil } // 设置Options 中fields的默认值 func setOptionsDefaults(options Options) Options { // 如果options.Scheme为nil则设置为 Kubernetes client-go scheme if options.Scheme == nil { options.Scheme = scheme.Scheme } if options.MapperProvider == nil { // 设置MapperProvider options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { return apiutil.NewDynamicRESTMapper(c) } } // 允许用户定义如何创建一个client if options.NewClient == nil { options.NewClient = DefaultNewClient } if options.NewCache == nil { options.NewCache = cache.New } if options.newRecorderProvider == nil { options.newRecorderProvider = intrec.NewProvider } // 这与pkg/manager是重复的(manager.Options中也有定义), // 我们需要判定使用哪个,如果options.EventBroadcaster不为nil,则使用,否则赋值options.makeBroadcaster创建新的 if options.EventBroadcaster == nil { // defer initialization to avoid leaking by default options.makeBroadcaster = func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true } } else { options.makeBroadcaster = func() (record.EventBroadcaster, bool) { return options.EventBroadcaster, false } } if options.Logger == nil { options.Logger = logf.RuntimeLog.WithName("cluster") } return options }
- internal.go
- cluster结构体 内部对Cluster的实现。
// 和Cluster类似 不做介绍 type cluster struct { config *rest.Config scheme *runtime.Scheme cache cache.Cache client client.Client apiReader client.Reader fieldIndexes client.FieldIndexer recorderProvider *intrec.Provider mapper meta.RESTMapper logger logr.Logger } // 注入实现了inject.*中的接口config、client、reader、scheme、cache、mapper func (c *cluster) SetFields(i interface{}) error { if _, err := inject.ConfigInto(c.config, i); err != nil { return err } if _, err := inject.ClientInto(c.client, i); err != nil { return err } if _, err := inject.APIReaderInto(c.apiReader, i); err != nil { return err } if _, err := inject.SchemeInto(c.scheme, i); err != nil { return err } if _, err := inject.CacheInto(c.cache, i); err != nil { return err } if _, err := inject.MapperInto(c.mapper, i); err != nil { return err } return nil } // 启动cluster 开启cache(实际上是开启InformersMap下的所有Informer) func (c *cluster) Start(ctx context.Context) error { defer c.recorderProvider.Stop(ctx) return c.cache.Start(ctx) }
更多推荐
已为社区贡献15条内容
所有评论(0)