This article studies the source code of controller-runtime v0.11.2.

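Frameworks such as kubebuilder and operator-sdk are thin wrappers around controller-runtime that let developers quickly generate project scaffolding. This article uses a project scaffolded with kubebuilder as the controller-runtime demo for the source code analysis.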

1. Scaffolding the project with kubebuilder

Create the scaffolding project:

kubebuilder init --domain blog.com

Create the API:

kubebuilder create api --group apps --version v1alpha1 --kind Application

The resulting project layout is:

$ tree application-controller
application-controller
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1alpha1
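│       ├── application_types.go # where the custom resource (CRD) types are defined
│       ├── groupversion_info.go # common GroupVersion metadata used for CRD generation, plus the Scheme registration helpers
│       └── zz_generated.deepcopy.go # generated implementations of the runtime.Object interface, chiefly DeepCopy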
├── bin
│   └── controller-gen
├── config
│   ├── crd # YAML for deploying the CRD
│   ├── default
│   ├── manager # YAML for deploying the Controller
│   ├── prometheus
│   ├── rbac # RBAC permissions the Controller needs at runtime
│   └── samples
├── controllers
│   ├── application_controller.go # where the custom Controller's business logic is implemented
│   └── suite_test.go
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go # entry point of the Controller

2. Overall architecture

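  1. The Manager runs multiple Controllers. It initializes shared dependencies such as the Cache and the Client and provides them to every Runnable.
  2. The Client wraps CRUD operations on resources. Reads are served from the local Cache, while writes go directly to the API Server.
  3. Inside the controller process, the Cache uses the Scheme to sync every resource object in the API Server that the Controller cares about. Its core is the Informer for each relevant resource. The Informer watches create/update/delete operations on that resource and triggers the Controller's Reconcile logic.
  4. The Controller is where the business logic lives. A Manager may own several Controllers, and usually we only need to implement the Reconcile method. Predicates are event filters that let a Controller drop events it does not care about.
  5. Webhooks are where admission control is implemented. There are two kinds:
     • a MutatingAdmissionWebhook requires implementing the Defaulter interface;
     • a ValidatingAdmissionWebhook requires implementing the Validator interface.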
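Item 2 says that reads and writes take different paths. The small Go sketch below illustrates that split. It does not import controller-runtime.

The following types and functions are hypothetical stand-ins:

- `apiServer` plays the API Server, and writes go straight to it.
- `informerSync` plays the informer and copies that state into the local cache.
- `localCache` and `splitClient` complete the picture: reads are served only from the cache.

This is why a Get issued right after a Create may not see the new object yet.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// apiServer is a hypothetical stand-in for the Kubernetes API Server.
type apiServer struct {
	mu      sync.Mutex
	objects map[string]string // "namespace/name" -> spec
}

// localCache is a hypothetical stand-in for the informer-backed Cache.
type localCache struct {
	mu      sync.RWMutex
	objects map[string]string
}

// splitClient reads from the cache and writes to the API server.
type splitClient struct {
	server *apiServer
	cache  *localCache
}

var errNotFound = errors.New("not found")

func newSplitClient() *splitClient {
	return &splitClient{
		server: &apiServer{objects: map[string]string{}},
		cache:  &localCache{objects: map[string]string{}},
	}
}

// Get is served from the local cache only.
func (c *splitClient) Get(key string) (string, error) {
	c.cache.mu.RLock()
	defer c.cache.mu.RUnlock()
	if v, ok := c.cache.objects[key]; ok {
		return v, nil
	}
	return "", errNotFound
}

// Create goes straight to the API server and does not touch the cache.
func (c *splitClient) Create(key, spec string) {
	c.server.mu.Lock()
	defer c.server.mu.Unlock()
	c.server.objects[key] = spec
}

// informerSync plays the Informer's role: copy server state into the cache.
func informerSync(server *apiServer, cache *localCache) {
	server.mu.Lock()
	defer server.mu.Unlock()
	cache.mu.Lock()
	defer cache.mu.Unlock()
	for k, v := range server.objects {
		cache.objects[k] = v
	}
}

func main() {
	c := newSplitClient()
	c.Create("default/app", "replicas=1")
	_, err := c.Get("default/app")
	fmt.Println("before sync:", err) // before sync: not found
	informerSync(c.server, c.cache)
	v, _ := c.Get("default/app")
	fmt.Println("after sync:", v) // after sync: replicas=1
}
```

In controller-runtime itself, some reads must bypass the cache. For those, it offers options such as ClientDisableCacheFor (passed to cluster.New in the Manager's New()) and the reader returned by mgr.GetAPIReader().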

3. main.go

The main.go generated by kubebuilder is the entry point of the whole project:

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))

	utilruntime.Must(appsv1alpha1.AddToScheme(scheme))
	//+kubebuilder:scaffold:scheme
}

func main() {
	var metricsAddr string
	var enableLeaderElection bool
	var probeAddr string
	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")
	opts := zap.Options{
		Development: true,
	}
	opts.BindFlags(flag.CommandLine)
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

	// 1)init Manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                 scheme,
		MetricsBindAddress:     metricsAddr,
		Port:                   9443,
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "fcd03b9b.blog.com",
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// 2)init Controller
	if err = (&controllers.ApplicationReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "Application")
		os.Exit(1)
	}
	//+kubebuilder:scaffold:builder

	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	// 3)start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

main.go does three things:

  1. Initialize the Manager.
  2. Pass the Manager's Client to the Controller, then call SetupWithManager() with the Manager to initialize the Controller.
  3. Start the Manager.

4. Manager

Manager is an interface that initializes shared dependencies. It is defined as follows:

// pkg/manager/manager.go
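// Manager initializes shared dependencies such as the Cache and the Client, and provides them to Runnables
type Manager interface {
	// cluster.Cluster provides the Cache and the Client
	cluster.Cluster

	// Add sets the required dependencies on the component and starts it when Start is called
	Add(Runnable) error

	// Start starts all registered Controllers and blocks until the context is cancelled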
	Start(ctx context.Context) error
	...
}

type Runnable interface {
  
	Start(context.Context) error
}

The Manager manages the lifecycle of Runnables (adding and starting them), and a Controller is just one kind of Runnable. Specifically, the Manager:

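  1. holds the dependencies shared by Runnables: client, cache, scheme, etc.
  2. provides getters (e.g. GetClient()) plus a simple dependency-injection mechanism (runtime/inject)
  3. supports leader election, which only needs to be enabled through options, and provides a signal handler for graceful shutdown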
1) Instantiating the Manager
// pkg/manager/manager.go
func New(config *rest.Config, options Options) (Manager, error) {
	// 1) fill in default values for the options
	options = setOptionsDefaults(options)

	// 2) create the cluster
	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
		clusterOptions.Scheme = options.Scheme
		clusterOptions.MapperProvider = options.MapperProvider
		clusterOptions.Logger = options.Logger
		clusterOptions.SyncPeriod = options.SyncPeriod
		clusterOptions.Namespace = options.Namespace
		clusterOptions.NewCache = options.NewCache
		clusterOptions.NewClient = options.NewClient
		clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
		clusterOptions.DryRunClient = options.DryRunClient
		clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
	})
	if err != nil {
		return nil, err
	}

	recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
	if err != nil {
		return nil, err
	}

	leaderConfig := options.LeaderElectionConfig
	if leaderConfig == nil {
		leaderConfig = rest.CopyConfig(config)
	}
	resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
		LeaderElection:             options.LeaderElection,
		LeaderElectionResourceLock: options.LeaderElectionResourceLock,
		LeaderElectionID:           options.LeaderElectionID,
		LeaderElectionNamespace:    options.LeaderElectionNamespace,
	})
	if err != nil {
		return nil, err
	}

	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
	if err != nil {
		return nil, err
	}

	metricsExtraHandlers := make(map[string]http.Handler)

	healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
	if err != nil {
		return nil, err
	}

	errChan := make(chan error)
	// 3) create the runnables
	runnables := newRunnables(errChan)

	// 4) assign the cluster and the runnables to controllerManager
	return &controllerManager{
		stopProcedureEngaged:          pointer.Int64(0),
		cluster:                       cluster,
		runnables:                     runnables,
		errChan:                       errChan,
		recorderProvider:              recorderProvider,
		resourceLock:                  resourceLock,
		metricsListener:               metricsListener,
		metricsExtraHandlers:          metricsExtraHandlers,
		controllerOptions:             options.Controller,
		logger:                        options.Logger,
		elected:                       make(chan struct{}),
		port:                          options.Port,
		host:                          options.Host,
		certDir:                       options.CertDir,
		webhookServer:                 options.WebhookServer,
		leaseDuration:                 *options.LeaseDuration,
		renewDeadline:                 *options.RenewDeadline,
		retryPeriod:                   *options.RetryPeriod,
		healthProbeListener:           healthProbeListener,
		readinessEndpointName:         options.ReadinessEndpointName,
		livenessEndpointName:          options.LivenessEndpointName,
		gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
		internalProceduresStop:        make(chan struct{}),
		leaderElectionStopped:         make(chan struct{}),
		leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
	}, nil
}

func newRunnables(errChan chan error) *runnables {
	// four RunnableGroups: Webhooks, Caches, LeaderElection (runnables that require leader election) and Others
	return &runnables{
		Webhooks:       newRunnableGroup(errChan),
		Caches:         newRunnableGroup(errChan),
		LeaderElection: newRunnableGroup(errChan),
		Others:         newRunnableGroup(errChan),
	}
}

New() instantiates the Manager. Its main steps are:

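  1. Call setOptionsDefaults() to fill in default values for the options.
  2. Create the cluster.
  3. Create the runnables, which are split into four RunnableGroups: Webhooks, Caches, LeaderElection (runnables that require leader election) and Others.
  4. Assign the cluster and the runnables to controllerManager. The controllerManager struct is the concrete implementation of the Manager interface.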
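The split into four groups can be sketched as a type switch. The sketch is modeled on, not copied from, the Manager's `runnables.Add` in v0.11. The types `hasCache`, `webhookServer`, `cluster`, `controller` and `plainRunnable` are hypothetical local stand-ins.

The sketch assigns groups as follows:

- Anything that owns a cache (like the cluster) goes to Caches.
- The webhook server goes to Webhooks.
- A runnable that reports `NeedLeaderElection() == false` goes to Others.
- Everything else is treated as needing leader election.

```go
package main

import "fmt"

// Hypothetical stand-ins for the interfaces the Manager checks.
type hasCache interface{ GetCache() string }

type LeaderElectionRunnable interface{ NeedLeaderElection() bool }

type webhookServer struct{}

type cluster struct{}

func (cluster) GetCache() string { return "informer cache" }

type controller struct{ needLeaderElection bool }

func (c controller) NeedLeaderElection() bool { return c.needLeaderElection }

type plainRunnable struct{}

// classify picks the RunnableGroup for a runnable, modeled on the type
// switch in the Manager's runnables.Add.
func classify(r interface{}) string {
	switch v := r.(type) {
	case hasCache:
		return "Caches"
	case *webhookServer:
		return "Webhooks"
	case LeaderElectionRunnable:
		if !v.NeedLeaderElection() {
			return "Others"
		}
		return "LeaderElection"
	default:
		// runnables that say nothing are assumed to need leader election
		return "LeaderElection"
	}
}

func main() {
	for _, r := range []interface{}{cluster{}, &webhookServer{}, controller{needLeaderElection: true}, controller{}, plainRunnable{}} {
		fmt.Printf("%T -> %s\n", r, classify(r))
	}
}
```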
2) Starting the Manager
// pkg/manager/internal.go
func (cm *controllerManager) Start(ctx context.Context) (err error) {
	cm.Lock()
	if cm.started {
		cm.Unlock()
		return errors.New("manager already started")
	}
	var ready bool
	defer func() {
		if !ready {
			cm.Unlock()
		}
	}()

	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	stopComplete := make(chan struct{})
	defer close(stopComplete)
	defer func() {
		stopErr := cm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	if err := cm.add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}

	if cm.metricsListener != nil {
		cm.serveMetrics()
	}

	if cm.healthProbeListener != nil {
		cm.serveHealthProbes()
	}

	// start the Webhooks runnables
	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// start the Caches runnables
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// start the runnables that do not need leader election
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	{
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				// run leader election
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {
				// no resource lock configured: start the leader-election runnables directly
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}

	ready = true
	cm.Unlock()
	select {
	case <-ctx.Done():
		return nil
	case err := <-cm.errChan:
		return err
	}
}

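Start() starts the Manager, which essentially means starting every Runnable (such as a Controller) that was added to it. The groups are started in a fixed order: Webhooks, then Caches, then Others. The LeaderElection group is started once leader election is won (via startLeaderElection), or directly when no resource lock is configured. Start() then blocks until the context is cancelled or a runnable reports an error on errChan.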

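3) Summary

1) Create the Manager

  • Create and register the scheme
  • Create the cluster
  • Create the runnable groups that will hold the Runnables

2) Register Runnables

Add each Runnable to the matching runnable group via Add()

3) Start the Manager

Start the Runnables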

5. Controller

1) Creating the Controller

The Controller implementation generated by kubebuilder:

type ApplicationReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// Reconcile implements the Controller's business logic
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx)

	return ctrl.Result{}, nil
}

// SetupWithManager adds the Controller to the Manager
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&appsv1alpha1.Application{}).
		Complete(r)
}

ApplicationReconciler has two methods:

  1. Reconcile() implements the Controller's business logic
  2. SetupWithManager() adds the Controller to the Manager

SetupWithManager() first calls ctrl.NewControllerManagedBy(), which returns a new controller builder (a Builder object):

// pkg/builder/controller.go
// Builder builds Controllers
type Builder struct {
	forInput         ForInput
	ownsInput        []OwnsInput
	watchesInput     []WatchesInput
	mgr              manager.Manager
	globalPredicates []predicate.Predicate
	ctrl             controller.Controller
	ctrlOptions      controller.Options
	name             string
}

// ControllerManagedBy returns a new controller builder
func ControllerManagedBy(m manager.Manager) *Builder {
	return &Builder{mgr: m}
}

controller-runtime wraps Controller construction in a Builder struct. The Manager is passed to the builder, and then the builder's For() method is called:

// pkg/builder/controller.go
// ForInput holds the information set by For
type ForInput struct {
	object           client.Object
	predicates       []predicate.Predicate
	objectProjection objectProjection
	err              error
}

// For defines the type of object being reconciled
// and configures ControllerManagedBy to respond to create/delete/update events by reconciling the object.
// Calling For is equivalent to calling Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
	if blder.forInput.object != nil {
		blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
		return blder
	}
	input := ForInput{object: object}
	for _, opt := range opts {
		opt.ApplyToFor(&input)
	}

	blder.forInput = input
	return blder
}

For() defines the type of object we want to reconcile. Next comes the most important method, Complete():

// pkg/builder/controller.go
func (blder *Builder) Complete(r reconcile.Reconciler) error {
	// build the Controller by calling Build
	_, err := blder.Build(r)
	return err
}

func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
	if r == nil {
		return nil, fmt.Errorf("must provide a non-nil Reconciler")
	}
	if blder.mgr == nil {
		return nil, fmt.Errorf("must provide a non-nil Manager")
	}
	if blder.forInput.err != nil {
		return nil, blder.forInput.err
	}
	if blder.forInput.object == nil {
		return nil, fmt.Errorf("must provide an object for reconciliation")
	}

	// set up the Controller
	if err := blder.doController(r); err != nil {
		return nil, err
	}

	// set up the watches
	if err := blder.doWatch(); err != nil {
		return nil, err
	}

	return blder.ctrl, nil
}
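The For()/Build() validation can be mirrored in a few lines. This is a sketch only:

- `toyBuilder`, `toyController` and `controllerManagedBy` are hypothetical.
- Kinds are plain strings instead of `client.Object`s.
- The name derivation assumes the real builder's default of lowercasing the Kind when no explicit name is set.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// toyBuilder is a hypothetical, stripped-down builder.Builder that only
// tracks the For kind; kinds are plain strings instead of client.Objects.
type toyBuilder struct {
	forKind string
	forErr  error
}

type toyController struct {
	name    string
	watches []string
}

func controllerManagedBy() *toyBuilder { return &toyBuilder{} }

// For may be called only once; a second call records an error for Build.
func (b *toyBuilder) For(kind string) *toyBuilder {
	if b.forKind != "" {
		b.forErr = errors.New("For(...) should only be called once")
		return b
	}
	b.forKind = kind
	return b
}

// Build repeats the validation order of Builder.Build, then does a
// miniature doController (derive a name) and doWatch (watch the For kind).
func (b *toyBuilder) Build(reconcile func(key string) error) (*toyController, error) {
	if reconcile == nil {
		return nil, errors.New("must provide a non-nil Reconciler")
	}
	if b.forErr != nil {
		return nil, b.forErr
	}
	if b.forKind == "" {
		return nil, errors.New("must provide an object for reconciliation")
	}
	c := &toyController{name: strings.ToLower(b.forKind)}
	c.watches = append(c.watches, b.forKind)
	return c, nil
}

func main() {
	c, err := controllerManagedBy().For("Application").Build(func(string) error { return nil })
	fmt.Println(c.name, c.watches, err) // application [Application] <nil>
	_, err = controllerManagedBy().For("Application").For("Pod").Build(func(string) error { return nil })
	fmt.Println(err) // For(...) should only be called once
}
```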

Complete() builds the Controller by calling Build(). The two key steps there are doController() and doWatch(). Let's look at doController() first:

// pkg/builder/controller.go
func (blder *Builder) doController(r reconcile.Reconciler) error {
	globalOpts := blder.mgr.GetControllerOptions()

	ctrlOptions := blder.ctrlOptions
	if ctrlOptions.Reconciler == nil {
		ctrlOptions.Reconciler = r
	}

	// get the GVK of the For object
	gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
	if err != nil {
		return err
	}

	if ctrlOptions.MaxConcurrentReconciles == 0 {
		groupKind := gvk.GroupKind().String()

		if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
			ctrlOptions.MaxConcurrentReconciles = concurrency
		}
	}

	if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
		ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
	}

	if ctrlOptions.Log.GetSink() == nil {
		ctrlOptions.Log = blder.mgr.GetLogger()
	}
	ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

	// construct the Controller
	blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
	return err
}

doController() derives the Controller's name from the GVK of the resource object. It then instantiates the actual Controller through newController(), which defaults to controller.New:

// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	c, err := NewUnmanaged(name, mgr, options)
	if err != nil {
		return nil, err
	}

	// register the Controller with the Manager
	return c, mgr.Add(c)
}

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.Log.GetSink() == nil {
		options.Log = mgr.GetLogger()
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	if options.CacheSyncTimeout == 0 {
		options.CacheSyncTimeout = 2 * time.Minute
	}

	if options.RateLimiter == nil {
		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}

	// inject dependencies into the Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// create the Controller and wire up its dependencies
	return &controller.Controller{
		Do: options.Reconciler,
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		CacheSyncTimeout:        options.CacheSyncTimeout,
		SetFields:               mgr.SetFields,
		Name:                    name,
		Log:                     options.Log.WithName("controller").WithName(name),
		RecoverPanic:            options.RecoverPanic,
	}, nil
}

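NewUnmanaged() is where the Controller is actually instantiated. It fills in defaults: MaxConcurrentReconciles = 1, CacheSyncTimeout = 2 minutes and workqueue.DefaultControllerRateLimiter(). Once the Controller exists, New() calls mgr.Add() to hand it over to the Manager for management.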

The Controller struct is defined as follows:

// pkg/internal/controller/controller.go
type Controller struct {
  
	Name string

	// maximum number of concurrent Reconciles that can run; defaults to 1
	MaxConcurrentReconciles int

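	// Do provides the Reconcile() method containing the Controller's sync logic.
	// Reconcile() may be called at any time with the Name and Namespace of an object, and drives the cluster's actual state toward the desired state declared by that object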
	Do reconcile.Reconciler

	// MakeQueue creates the work queue when the Controller starts
	MakeQueue func() workqueue.RateLimitingInterface

	Queue workqueue.RateLimitingInterface

	// SetFields injects dependencies into other objects such as Sources, EventHandlers and Predicates
	SetFields func(i interface{}) error

	mu sync.Mutex

	// Started is true once the Controller has been started
	Started bool

	ctx context.Context

	CacheSyncTimeout time.Duration

	// startWatches holds the watches registered before the Controller starts; they are started in Start
	startWatches []watchDescription

	Log logr.Logger

	RecoverPanic bool
}

// watchDescription bundles an event Source, the EventHandler that enqueues its events, and the Predicates that filter them
type watchDescription struct {
	src        source.Source
	handler    handler.EventHandler
	predicates []predicate.Predicate
}
2) Starting the Controller

Controller implements the Start() method of the Runnable interface. Once the Controller is registered with the Manager, the Manager calls the Controller's Start() method when it starts:

// pkg/internal/controller/controller.go
func (c *Controller) Start(ctx context.Context) error {
	c.mu.Lock()
	if c.Started {
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
	}

	c.initMetrics()

	c.ctx = ctx

	// create the work queue
	c.Queue = c.MakeQueue()
	go func() {
		<-ctx.Done()
		c.Queue.ShutDown()
	}()

	wg := &sync.WaitGroup{}
	err := func() error {
		defer c.mu.Unlock()

		defer utilruntime.HandleCrash()

		for _, watch := range c.startWatches {
			c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

			// start each watch in startWatches
			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err
			}
		}

		c.Log.Info("Starting Controller")

		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {
				continue
			}

			if err := func() error {
				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
				defer cancel()

				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
					c.Log.Error(err, "Could not wait for Cache to sync")
					return err
				}

				return nil
			}(); err != nil {
				return err
			}
		}

		c.startWatches = nil

		c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
		wg.Add(c.MaxConcurrentReconciles)
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go func() {
				defer wg.Done()
				// each worker processes items until the queue shuts down
				for c.processNextWorkItem(ctx) {
				}
			}()
		}

		c.Started = true
		return nil
	}()
	if err != nil {
		return err
	}

	<-ctx.Done()
	c.Log.Info("Shutdown signal received, waiting for all workers to finish")
	wg.Wait()
	c.Log.Info("All workers finished")
	return nil
}

Start() launches MaxConcurrentReconciles worker goroutines, each of which loops on c.processNextWorkItem() to process resource objects. processNextWorkItem() looks like this:

// pkg/internal/controller/controller.go
// processNextWorkItem pops an item off the work queue and tries to process it by calling reconcileHandler
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	// pop an item from the queue
	obj, shutdown := c.Queue.Get()
	if shutdown {
		return false
	}

	// mark the item as done once processing finishes
	defer c.Queue.Done(obj)

	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

	// process the item with reconcileHandler
	c.reconcileHandler(ctx, obj)
	return true
}

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
	reconcileStartTS := time.Now()
	defer func() {
		c.updateMetrics(time.Since(reconcileStartTS))
	}()

	req, ok := obj.(reconcile.Request)
	if !ok {
		c.Queue.Forget(obj)
		c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
		return
	}

	log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
	ctx = logf.IntoContext(ctx, log)

	// call Reconcile to process the item; this is where our own business logic runs
	result, err := c.Reconcile(ctx, req)
	switch {
	case err != nil:
		// on error, re-add the item to the rate-limited queue
		c.Queue.AddRateLimited(req)
		ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
		log.Error(err, "Reconciler error")
	case result.RequeueAfter > 0:
		// if the Result has RequeueAfter > 0, Forget the item (reset its rate-limit backoff) and re-add it after the delay
		c.Queue.Forget(obj)
		c.Queue.AddAfter(req, result.RequeueAfter)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
	case result.Requeue:
		// requeue with rate limiting
		c.Queue.AddRateLimited(req)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
	default:
		// no error: Forget the item so it is not queued again until another change happens
		c.Queue.Forget(obj)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
	}
}

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
	if c.RecoverPanic {
		defer func() {
			if r := recover(); r != nil {
				for _, fn := range utilruntime.PanicHandlers {
					fn(r)
				}
				err = fmt.Errorf("panic: %v [recovered]", r)
			}
		}()
	}
	log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
	ctx = logf.IntoContext(ctx, log)
	return c.Do.Reconcile(ctx, req)
}

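reconcileHandler() is where each item is actually processed. It wraps event handling and error handling. The real processing is exposed to developers through c.Do.Reconcile(), so as developers we only need to put our business logic in Reconcile().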

The return values of c.Do.Reconcile() decide whether the item is requeued, checked in this order:

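  • If an error is returned, the item is re-added to the rate-limited queue (AddRateLimited).
  • If result.RequeueAfter > 0, the item is first forgotten (Forget) and then re-added after result.RequeueAfter (AddAfter).
  • If result.Requeue is true, the item is re-added to the rate-limited queue.
  • Otherwise the item is simply forgotten.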

(Figure: Controller startup flow, not reproduced)

3) How the Controller watches events

As mentioned in the section on creating the Controller, Complete() builds the Controller via Build(), which calls doController() and doWatch(). Here is doWatch():

// pkg/builder/controller.go
func (blder *Builder) doWatch() error {
	// Reconcile type
	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
	if err != nil {
		return err
	}
	// create a source.Source for the For type
	src := &source.Kind{Type: typeForSrc}
	// create a handler.EventHandler that enqueues the object itself
	hdler := &handler.EnqueueRequestForObject{}
	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
	// register the watch through the Controller's Watch method
	if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
		return err
	}

	for _, own := range blder.ownsInput {
		typeForSrc, err := blder.project(own.object, own.objectProjection)
		if err != nil {
			return err
		}
		src := &source.Kind{Type: typeForSrc}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.forInput.object,
			IsController: true,
		}
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, own.predicates...)
		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
			return err
		}
	}

	for _, w := range blder.watchesInput {
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, w.predicates...)

		if srckind, ok := w.src.(*source.Kind); ok {
			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
			if err != nil {
				return err
			}
			srckind.Type = typeForSrc
		}

		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
			return err
		}
	}
	return nil
}

doWatch() calls the Controller's Watch() method, shown below:

// pkg/internal/controller/controller.go
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	// if the Controller has not started yet, store the watchDescription in startWatches; those watches are started when the Controller starts
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	c.Log.Info("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
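The deferral in Watch() can be reproduced with a toy controller. The sketch makes these simplifications:

- `toyController`, `source` and `staticSource` are hypothetical.
- The queue is a buffered channel.
- The real Start() steps of waiting for caches to sync and launching workers are omitted.

Watches registered before Start() are remembered and then started when Start() runs. Watches added afterwards start immediately.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// source is a hypothetical stand-in for source.Source.
type source interface {
	Start(queue chan<- string) error
}

// staticSource emits a fixed list of keys when started.
type staticSource struct{ keys []string }

func (s staticSource) Start(queue chan<- string) error {
	for _, k := range s.keys {
		queue <- k
	}
	return nil
}

// toyController keeps only the fields needed to show the deferral.
type toyController struct {
	mu           sync.Mutex
	started      bool
	queue        chan string
	startWatches []source
}

// Watch mirrors Controller.Watch: before Start the source is only
// remembered; after Start it is started right away.
func (c *toyController) Watch(src source) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.started {
		c.startWatches = append(c.startWatches, src)
		return nil
	}
	return src.Start(c.queue)
}

// Start creates the queue, starts the deferred watches and clears the list.
func (c *toyController) Start() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.started {
		return errors.New("controller was started more than once")
	}
	c.queue = make(chan string, 16) // buffered so sources never block in this demo
	for _, src := range c.startWatches {
		if err := src.Start(c.queue); err != nil {
			return err
		}
	}
	c.startWatches = nil
	c.started = true
	return nil
}

func main() {
	c := &toyController{}
	_ = c.Watch(staticSource{keys: []string{"default/a"}})
	fmt.Println("deferred watches:", len(c.startWatches)) // deferred watches: 1
	_ = c.Start()
	fmt.Println("first item:", <-c.queue) // first item: default/a
}
```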

At this point the Controller has not started yet, so the watchDescription is appended to startWatches. When the Controller starts, the Controller.Start() method shown in "2) Starting the Controller" iterates over startWatches and starts each watch. The relevant excerpt is:

// pkg/internal/controller/controller.go (excerpt from Controller.Start)
for _, watch := range c.startWatches {
	c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

	// 1) start each watch in startWatches
	if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
		return err
	}
}

At 1) we can see that what is ultimately called is the Start() method of the Source. A Source is an origin of events, such as Create, Update and Delete operations on resource objects. These events are processed by EventHandlers, which enqueue reconcile.Requests:

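  • Kind handles events from inside the cluster (e.g. Create, Update and Delete operations on resource objects)
  • Channel handles events from outside the cluster (e.g. GitHub webhook callbacks, polling external URLs)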
// pkg/source/source.go
type Source interface {
	// Start is internal and should only be called by the Controller; it registers an EventHandler with the Informer so that reconcile.Requests get enqueued
	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

Source is an interface. As doWatch() shows, the value passed in is a source.Kind, and that struct implements source.Source:

// pkg/source/source.go
// Kind provides a source of events originating inside the cluster from Watches (e.g. Pod Create events)
type Kind struct {
	// Type is the type of object to watch, e.g. &v1.Pod{}
	Type client.Object

	// cache is used to watch the APIs
	cache cache.Cache

	started     chan error
	startCancel func()
}

func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {
	if ks.Type == nil {
		return fmt.Errorf("must specify Kind.Type")
	}

	if ks.cache == nil {
		return fmt.Errorf("must call CacheInto on Kind before calling Start")
	}

	ctx, ks.startCancel = context.WithCancel(ctx)
	ks.started = make(chan error)
	go func() {
		var (
			i       cache.Informer
			lastErr error
		)

		if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
			// get the Informer for ks.Type from the Cache (polled every 10s until it succeeds or ctx is done)
			i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
			if lastErr != nil {
				kindMatchErr := &meta.NoKindMatchError{}
				if errors.As(lastErr, &kindMatchErr) {
					log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
						"kind", kindMatchErr.GroupKind)
				}
				return false, nil
			}
			return true, nil
		}); err != nil {
			if lastErr != nil {
				ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
				return
			}
			ks.started <- err
			return
		}

		// register the EventHandler with the Informer
		i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
		if !ks.cache.WaitForCacheSync(ctx) {
			ks.started <- errors.New("cache did not sync")
		}
		close(ks.started)
	}()

	return nil
}

watch.src.Start() actually calls Kind.Start(), which obtains the Informer for the resource type and registers the event handler. The value passed to AddEventHandler() is an internal.EventHandler. That struct implements client-go's ResourceEventHandler interface, i.e. the OnAdd()/OnUpdate()/OnDelete() methods:

// pkg/source/internal/eventsource.go
// EventHandler implements client-go's cache.ResourceEventHandler interface
type EventHandler struct {
	EventHandler handler.EventHandler
	Queue        workqueue.RateLimitingInterface
	Predicates   []predicate.Predicate
}

func (e EventHandler) OnAdd(obj interface{}) {
	// event for a Kubernetes object being created
	c := event.CreateEvent{}

	if o, ok := obj.(client.Object); ok {
		c.Object = o
	} else {
		log.Error(nil, "OnAdd missing Object",
			"object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}

	// Predicates filter events: every Predicate's Create must return true
	for _, p := range e.Predicates {
		if !p.Create(c) {
			return
		}
	}

	// call the EventHandler's Create method
	e.EventHandler.Create(c, e.Queue)
}

func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
	u := event.UpdateEvent{}

	if o, ok := oldObj.(client.Object); ok {
		u.ObjectOld = o
	} else {
		log.Error(nil, "OnUpdate missing ObjectOld",
			"object", oldObj, "type", fmt.Sprintf("%T", oldObj))
		return
	}

	if o, ok := newObj.(client.Object); ok {
		u.ObjectNew = o
	} else {
		log.Error(nil, "OnUpdate missing ObjectNew",
			"object", newObj, "type", fmt.Sprintf("%T", newObj))
		return
	}

	for _, p := range e.Predicates {
		if !p.Update(u) {
			return
		}
	}

	// call the EventHandler's Update method
	e.EventHandler.Update(u, e.Queue)
}

func (e EventHandler) OnDelete(obj interface{}) {
	d := event.DeleteEvent{}

	var ok bool
	if _, ok = obj.(client.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Error(nil, "Error decoding objects.  Expected cache.DeletedFinalStateUnknown",
				"type", fmt.Sprintf("%T", obj),
				"object", obj)
			return
		}

		obj = tombstone.Obj
	}

	if o, ok := obj.(client.Object); ok {
		d.Object = o
	} else {
		log.Error(nil, "OnDelete missing Object",
			"object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}

	for _, p := range e.Predicates {
		if !p.Delete(d) {
			return
		}
	}

	// call the EventHandler's Delete method
	e.EventHandler.Delete(d, e.Queue)
}

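EventHandler implements client-go's cache.ResourceEventHandler interface. Each callback first runs the Predicates to filter the event. Only events that pass every Predicate reach the real handling, which is delegated to a handler.EventHandler. doWatch() shows that, for the For object, this is a &handler.EnqueueRequestForObject{}, so that type implements the actual event handling logic.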

// pkg/handler/enqueue.go
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the event
type EnqueueRequestForObject struct{}

func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
		return
	}
	// add a Request to the work queue
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}

func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
	switch {
	// if the new object is non-nil, enqueue it
	case evt.ObjectNew != nil:
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.ObjectNew.GetName(),
			Namespace: evt.ObjectNew.GetNamespace(),
		}})
	// otherwise, if the old object is non-nil, enqueue it
	case evt.ObjectOld != nil:
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.ObjectOld.GetName(),
			Namespace: evt.ObjectOld.GetNamespace(),
		}})
	default:
		enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
	}
}

func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
		return
	}
	// add a Request to the work queue
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}

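The Create()/Update()/Delete() implementations of EnqueueRequestForObject show that what goes into the work queue is not the plain unique key string (namespace/name) that client-go controllers conventionally enqueue, but a wrapped reconcile.Request. The object's unique key can still be obtained from it easily.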
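A simplified sketch of why wrapping the key in a struct still works well with the work queue. NamespacedName and Request below are local copies modelled on types.NamespacedName and reconcile.Request (the real Request embeds types.NamespacedName). The toy queue is an assumption that only imitates how client-go's workqueue skips items that are already pending; because Request is a comparable struct, it can be used directly as a map key.

```go
package main

import "fmt"

// NamespacedName is a simplified copy of types.NamespacedName.
type NamespacedName struct {
	Namespace, Name string
}

// String returns "namespace/name", the same shape as the classic client-go key.
func (n NamespacedName) String() string { return n.Namespace + "/" + n.Name }

// Request mirrors reconcile.Request, which embeds NamespacedName.
type Request struct {
	NamespacedName
}

// queue is a hypothetical toy work queue that keeps at most one pending copy
// of each item, similar to the "dirty" set in client-go's workqueue.
type queue struct {
	items []Request
	dirty map[Request]struct{}
}

func newQueue() *queue { return &queue{dirty: map[Request]struct{}{}} }

func (q *queue) Add(r Request) {
	if _, ok := q.dirty[r]; ok {
		return // already pending: events for the same object collapse
	}
	q.dirty[r] = struct{}{}
	q.items = append(q.items, r)
}

func main() {
	q := newQueue()
	r := Request{NamespacedName{Namespace: "default", Name: "app-a"}}
	q.Add(r) // e.g. from Create
	q.Add(r) // e.g. from an Update of the same object
	fmt.Println(len(q.items), q.items[0].String()) // 1 default/app-a
}
```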

In short, watch.src.Start() is what initializes the Informer and registers the event handlers on it.


[Figure: Controller event-watching flow]

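4) Summary

1) How is the Controller managed by the Manager?

As a component, the Controller implements the Runnable interface. The Manager's Add() method adds it to the Manager, and when the Manager starts, it calls the Controller's Start() method.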

2) How is a Controller created?

Through the builder provided by controller-runtime: it assembles the required Controller options and then calls controller.New().

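3) How is Reconcile invoked?

controller.New() (via NewUnmanaged()) sets our Reconciler on the Controller as its Do field, so whenever the workqueue contains an item, it is handed to Reconcile for processing.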

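4) How are Reconcile's arguments and return values handled?

The argument is wrapped in a reconcile.Request, which is essentially just the namespace and name.

The return value is wrapped in a reconcile.Result, through which we control whether the item is requeued.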

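5) How does the Controller add event handlers?

The builder's doWatch() method creates the corresponding handler and a Source of type Kind. When the Controller starts, it starts the Source, and Kind.Start() registers the handler with the corresponding Informer in the Cache.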

6) How does the Controller filter events?

In the builder you can add Predicates that decide whether create, update, and delete events are processed.

7) What else does the Builder provide?

The Builder also lets you set Controller options with WithOptions() and add event filters with WithEventFilter().

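6. Cache

The Cache synchronizes the resources the Controller cares about. Its core is a GVK -> Informer mapping: each Informer watches create/delete/update operations on the resources (GVRs) of its GVK and thereby triggers the Controller's Reconcile logic. The Cache interface is defined as follows: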

// pkg/cache/cache.go
type Cache interface {
	// Get and list Kubernetes cluster resources from the Cache
	client.Reader

	// Create or fetch the Informer for each GVK, and add indexes to the corresponding Informer
	Informers
}

type Informers interface {
	GetInformer(ctx context.Context, obj client.Object) (Informer, error)

	GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)

	Start(ctx context.Context) error

	WaitForCacheSync(ctx context.Context) bool

	client.FieldIndexer
}

1) Cache dependency injection

A type that wants the Cache injected must implement the InjectCache() method, i.e., satisfy the inject.Cache interface:

// pkg/runtime/inject/inject.go
type Cache interface {
	InjectCache(cache cache.Cache) error
}

func CacheInto(c cache.Cache, i interface{}) (bool, error) {
	if s, ok := i.(Cache); ok {
		return true, s.InjectCache(c)
	}
	return false, nil
}
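The injection pattern is just an interface type assertion. A minimal sketch, with Cache, CacheInjector, cacheInto and MyReconciler as hypothetical stand-ins for cache.Cache, inject.Cache, inject.CacheInto and a user-defined reconciler:

```go
package main

import "fmt"

// Cache is a hypothetical stand-in for cache.Cache.
type Cache struct{ Name string }

// CacheInjector plays the role of inject.Cache: any type that wants a Cache
// only has to implement InjectCache.
type CacheInjector interface {
	InjectCache(c *Cache) error
}

// cacheInto mirrors inject.CacheInto: inject only if i opts in.
func cacheInto(c *Cache, i interface{}) (bool, error) {
	if s, ok := i.(CacheInjector); ok {
		return true, s.InjectCache(c)
	}
	return false, nil
}

// MyReconciler is a hypothetical reconciler that opts in to Cache injection.
type MyReconciler struct{ cache *Cache }

func (r *MyReconciler) InjectCache(c *Cache) error {
	r.cache = c
	return nil
}

func main() {
	shared := &Cache{Name: "informer-cache"}
	r := &MyReconciler{}
	injected, err := cacheInto(shared, r)
	fmt.Println(injected, err, r.cache.Name) // true <nil> informer-cache

	// A value without InjectCache is simply skipped.
	injected, _ = cacheInto(shared, struct{}{})
	fmt.Println(injected) // false
}
```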

During Controller creation, NewUnmanaged() is where the Controller is actually instantiated. It calls the controllerManager's SetFields() method to inject dependencies into the Reconciler:

// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	c, err := NewUnmanaged(name, mgr, options)
	if err != nil {
		return nil, err
	}

	// Register the controller with the manager
	return c, mgr.Add(c)
}

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.Log.GetSink() == nil {
		options.Log = mgr.GetLogger()
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	if options.CacheSyncTimeout == 0 {
		options.CacheSyncTimeout = 2 * time.Minute
	}

	if options.RateLimiter == nil {
		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}

	// Inject dependencies into the Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// Create the Controller and wire up its dependencies
	return &controller.Controller{
		Do: options.Reconciler,
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		CacheSyncTimeout:        options.CacheSyncTimeout,
		SetFields:               mgr.SetFields,
		Name:                    name,
		Log:                     options.Log.WithName("controller").WithName(name),
		RecoverPanic:            options.RecoverPanic,
	}, nil
}

The controllerManager's SetFields() method is shown below:

// pkg/manager/internal.go
func (cm *controllerManager) SetFields(i interface{}) error {
	// Call the cluster's SetFields first
	if err := cm.cluster.SetFields(i); err != nil {
		return err
	}
	if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
		return err
	}
	if _, err := inject.StopChannelInto(cm.internalProceduresStop, i); err != nil {
		return err
	}
	if _, err := inject.LoggerInto(cm.logger, i); err != nil {
		return err
	}

	return nil
}
// pkg/cluster/internal.go
func (c *cluster) SetFields(i interface{}) error {
	if _, err := inject.ConfigInto(c.config, i); err != nil {
		return err
	}
	if _, err := inject.ClientInto(c.client, i); err != nil {
		return err
	}
	if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
		return err
	}
	if _, err := inject.SchemeInto(c.scheme, i); err != nil {
		return err
	}
	if _, err := inject.CacheInto(c.cache, i); err != nil {
		return err
	}
	if _, err := inject.MapperInto(c.mapper, i); err != nil {
		return err
	}
	return nil
}

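The controllerManager holds the Cache instance. Its SetFields() method is handed to the Controller (the SetFields field set in NewUnmanaged()), and the Controller's Watch() method calls it to inject the Cache into the Source: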

// pkg/internal/controller/controller.go
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Inject the Cache (and other dependencies) into the Source
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	// If the Controller has not started yet, record a watchDescription in startWatches; those watches are started when the Controller starts
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	c.Log.Info("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
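The two branches of Watch() (defer while not started, start immediately otherwise) reduce to the sketch below. miniController and its string sources are hypothetical; appending to running stands in for src.Start(c.ctx, evthdler, c.Queue, prct...), and Start() replays startWatches the way the Controller starts its deferred watches when it starts.

```go
package main

import (
	"fmt"
	"sync"
)

// watchDescription is a simplified version of the struct Watch() records.
type watchDescription struct{ src string }

// miniController is hypothetical and keeps only the Watch/Start bookkeeping.
type miniController struct {
	mu           sync.Mutex
	started      bool
	startWatches []watchDescription
	running      []string // sources that have been "started"
}

func (c *miniController) Watch(src string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.started {
		// Not started yet: remember the watch for later.
		c.startWatches = append(c.startWatches, watchDescription{src: src})
		return
	}
	c.running = append(c.running, src) // stands in for src.Start(...)
}

func (c *miniController) Start() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, w := range c.startWatches {
		c.running = append(c.running, w.src)
	}
	c.startWatches = nil
	c.started = true
}

func main() {
	c := &miniController{}
	c.Watch("Kind(Application)") // before Start: deferred
	fmt.Println(c.running)       // []
	c.Start()
	c.Watch("Kind(Deployment)") // after Start: started right away
	fmt.Println(c.running)      // [Kind(Application) Kind(Deployment)]
}
```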

2) Cache instantiation

When the Manager is instantiated it creates a cluster, and creating the cluster calls options.NewCache to instantiate the Cache. By default that is cache.New():

// pkg/cache/cache.go
func New(config *rest.Config, opts Options) (Cache, error) {
	// Apply default options
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.DefaultSelector, opts.Scheme)
	if err != nil {
		return nil, err
	}
	disableDeepCopyByGVK, err := convertToDisableDeepCopyByGVK(opts.UnsafeDisableDeepCopyByObject, opts.Scheme)
	if err != nil {
		return nil, err
	}
	// Initialize the InformersMap
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK)
	return &informerCache{InformersMap: im}, nil
}

NewInformersMap is implemented as follows:

// pkg/cache/internal/deleg_map.go
func NewInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string,
	selectors SelectorsByGVK,
	disableDeepCopy DisableDeepCopyByGVK,
) *InformersMap {
	// Build separate informer maps for structured, unstructured, and metadata objects
	return &InformersMap{
		structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
		unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
		metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),

		Scheme: scheme,
	}
}

func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
	namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
	// Pass in createStructuredListWatch, which performs List and Watch for a given GVK
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch)
}

createStructuredListWatch is implemented as follows:

// pkg/cache/internal/informers_map.go
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, err
	}

	client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
	if err != nil {
		return nil, err
	}
	listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
	listObj, err := ip.Scheme.New(listGVK)
	if err != nil {
		return nil, err
	}

	ctx := context.TODO()
	return &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			ip.selectors(gvk).ApplyToList(&opts)
			res := listObj.DeepCopyObject()
			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
			isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
			return res, err
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			ip.selectors(gvk).ApplyToList(&opts)
			opts.Watch = true
			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
			isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
		},
	}, nil
}
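The REST call built by ListFunc/WatchFunc boils down to a URL. As a sketch, the hypothetical helper resourcePath below reproduces the standard Kubernetes REST path layout together with the isNamespaceScoped rule above, and listKind mirrors how listGVK is derived. Both helpers only illustrate the logic; the real request is issued by the REST client created with apiutil.RESTClientForGVK. The "applications" resource name assumes the kubebuilder defaults used in this article.

```go
package main

import "fmt"

// resourcePath sketches the path requested via
// client.Get().NamespaceIfScoped(...).Resource(...).
// group == "" means the legacy core group, which is served under /api.
func resourcePath(group, version, resource, namespace string, clusterScoped bool) string {
	prefix := "/apis/" + group + "/" + version
	if group == "" {
		prefix = "/api/" + version
	}
	// Same rule as isNamespaceScoped above: add the namespace segment only when
	// a namespace is set AND the resource is not cluster-scoped (RESTScopeNameRoot).
	if namespace != "" && !clusterScoped {
		return prefix + "/namespaces/" + namespace + "/" + resource
	}
	return prefix + "/" + resource
}

// listKind mirrors listGVK above: the list type is the Kind plus "List".
func listKind(kind string) string { return kind + "List" }

func main() {
	fmt.Println(resourcePath("apps.blog.com", "v1alpha1", "applications", "default", false))
	// /apis/apps.blog.com/v1alpha1/namespaces/default/applications
	fmt.Println(resourcePath("", "v1", "nodes", "default", true))
	// /api/v1/nodes
	fmt.Println(listKind("Application")) // ApplicationList
}
```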

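Cache instantiation flow:

  1. NewInformersMap: builds separate InformersMaps for structured, unstructured, and metadata objects
  2. newStructuredInformersMap (and its unstructured/metadata counterparts): creates and caches Informers keyed by the combination of object type (structured, unstructured, metadata) and GVK
  3. Define the List-Watch functions: one List-Watch implementation per object type, used to List and Watch a GVK

During initialization, the Cache mainly creates the InformersMap. The Informer for a GVK registered in the Scheme is created the first time that GVK is requested and stored in the informersByGVK map, which provides the GVK-to-Informer mapping. Each Informer uses the List-Watch functions to List and Watch its GVK.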
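A sketch of the GVK-to-Informer mapping with get-or-create semantics. GVK, informer and informersByGVK are simplified, hypothetical types; the double-checked locking mirrors what specificInformersMap does when an informer for a GVK is first requested. In the real code, an informer added after the map has already started is also started immediately, which this sketch leaves out.

```go
package main

import (
	"fmt"
	"sync"
)

// GVK is a simplified schema.GroupVersionKind.
type GVK struct{ Group, Version, Kind string }

// informer is a hypothetical placeholder for a SharedIndexInformer.
type informer struct{ gvk GVK }

// informersByGVK builds an informer for a GVK only the first time it is asked for.
type informersByGVK struct {
	mu      sync.RWMutex
	m       map[GVK]*informer
	created int // how many informers were actually constructed
}

func (ip *informersByGVK) Get(gvk GVK) *informer {
	// Fast path: read lock only.
	ip.mu.RLock()
	inf, ok := ip.m[gvk]
	ip.mu.RUnlock()
	if ok {
		return inf
	}
	ip.mu.Lock()
	defer ip.mu.Unlock()
	// Re-check under the write lock in case another goroutine created it.
	if existing, ok := ip.m[gvk]; ok {
		return existing
	}
	inf = &informer{gvk: gvk} // real code builds a ListWatch and an informer here
	ip.m[gvk] = inf
	ip.created++
	return inf
}

func main() {
	ip := &informersByGVK{m: map[GVK]*informer{}}
	app := GVK{"apps.blog.com", "v1alpha1", "Application"}
	a1 := ip.Get(app)
	a2 := ip.Get(app)
	fmt.Println(a1 == a2, ip.created) // true 1
}
```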

3) Cache startup

The Cache is started when the Manager starts. The Manager's Start() method:

// pkg/manager/internal.go
func (cm *controllerManager) Start(ctx context.Context) (err error) {
	cm.Lock()
	if cm.started {
		cm.Unlock()
		return errors.New("manager already started")
	}
	var ready bool
	defer func() {
		if !ready {
			cm.Unlock()
		}
	}()

	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	stopComplete := make(chan struct{})
	defer close(stopComplete)
	defer func() {
		stopErr := cm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	// Register the cluster with the manager
	if err := cm.add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}

	if cm.metricsListener != nil {
		cm.serveMetrics()
	}

	if cm.healthProbeListener != nil {
		cm.serveHealthProbes()
	}

	// Start the webhook runnables
	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// Start the cache runnables
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	// Start the runnables that do not need leader election
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		if err != wait.ErrWaitTimeout {
			return err
		}
	}

	{
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				// Start leader election
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {
				// Start the runnables that need leader election
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}

	ready = true
	cm.Unlock()
	select {
	case <-ctx.Done():
		return nil
	case err := <-cm.errChan:
		return err
	}
}

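Start() first calls cm.add() to register the cluster, which contains the cache, with the manager, and then calls cm.runnables.Caches.Start() to start the cache runnables. This starts the registered cluster, which ultimately calls InformersMap's Start() method, whose core job is to start all the Informers: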

// pkg/cache/internal/deleg_map.go
func (m *InformersMap) Start(ctx context.Context) error {
	go m.structured.Start(ctx)
	go m.unstructured.Start(ctx)
	go m.metadata.Start(ctx)
	<-ctx.Done()
	return nil
}
// pkg/cache/internal/informers_map.go
func (ip *specificInformersMap) Start(ctx context.Context) {
	func() {
		ip.mu.Lock()
		defer ip.mu.Unlock()

		ip.stop = ctx.Done()

		// Start every Informer
		for _, informer := range ip.informersByGVK {
			go informer.Informer.Run(ctx.Done())
		}

		ip.started = true
		close(ip.startWait)
	}()
	<-ctx.Done()
}

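As described in the section on how the Controller watches events, the Controller first registers EventHandlers for specific resources with the Informers. The Cache then starts those Informers here, and each Informer sends requests to the API Server to establish a connection. When an Informer detects a resource change, the EventHandler registered by the Controller decides whether to push it onto the queue.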

[Figure: Cache startup flow]

7. Client

1) Client instantiation

When the Manager is instantiated it creates a cluster, and creating the cluster calls options.NewClient to instantiate the Client. By default that is DefaultNewClient():

// pkg/cluster/cluster.go
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}

	return client.NewDelegatingClient(client.NewDelegatingClientInput{
		CacheReader:     cache,
		Client:          c,
		UncachedObjects: uncachedObjects,
	})
}

DefaultNewClient() calls client.NewDelegatingClient():

// pkg/client/split.go
func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
	uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
	for _, obj := range in.UncachedObjects {
		gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme())
		if err != nil {
			return nil, err
		}
		uncachedGVKs[gvk] = struct{}{}
	}

	return &delegatingClient{
		scheme: in.Client.Scheme(),
		mapper: in.Client.RESTMapper(),
		Reader: &delegatingReader{
			CacheReader:       in.CacheReader,
			ClientReader:      in.Client,
			scheme:            in.Client.Scheme(),
			uncachedGVKs:      uncachedGVKs,
			cacheUnstructured: in.CacheUnstructured,
		},
		Writer:       in.Client,
		StatusClient: in.Client,
	}, nil
}

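The Reader wraps a CacheReader, which is the Cache passed in earlier, while the Writer is the plain Client. So when the Client is used, reads query the local Cache and writes go directly to the API Server. The exceptions are the GVKs listed in UncachedObjects and, unless CacheUnstructured is set, unstructured objects: reads for those go through ClientReader to the API Server.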
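The read-side routing can be sketched as a small router. All types below are hypothetical; this delegatingReader keys on a kind string instead of a GVK and leaves out the unstructured-object handling controlled by CacheUnstructured. Writes are not shown because they always go to the API Server.

```go
package main

import "fmt"

// Reader is a hypothetical, minimal read interface.
type Reader interface {
	Get(kind, key string) string
}

// cacheReader stands in for the informer-backed Cache.
type cacheReader struct{}

func (cacheReader) Get(kind, key string) string { return "cache:" + kind + "/" + key }

// apiReader stands in for the direct API Server client.
type apiReader struct{}

func (apiReader) Get(kind, key string) string { return "apiserver:" + kind + "/" + key }

// delegatingReader sends reads to the cache unless the kind is marked uncached.
type delegatingReader struct {
	cache, client Reader
	uncached      map[string]bool
}

func (d delegatingReader) Get(kind, key string) string {
	if d.uncached[kind] {
		return d.client.Get(kind, key)
	}
	return d.cache.Get(kind, key)
}

func main() {
	r := delegatingReader{
		cache:    cacheReader{},
		client:   apiReader{},
		uncached: map[string]bool{"Secret": true}, // hypothetical UncachedObjects
	}
	fmt.Println(r.Get("Application", "default/app-a")) // cache:Application/default/app-a
	fmt.Println(r.Get("Secret", "default/token"))       // apiserver:Secret/default/token
}
```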

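2) Summary

When the Manager is created, it creates a Cluster component, which in turn creates the Cache and the Client. In controller-runtime, the Cache and Client are essentially wrappers around client-go's Informer and RESTClient. Other components, such as Reconcilers and Sources, then have the Cache, Client and other dependencies injected through the corresponding SetFields() method so that they can use them.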

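8. Conclusion

The overall controller-runtime workflow:

First, the Controller registers eventHandlers for specific resources with the Informers. The Cache then starts the Informers, which send requests to the API Server and establish connections. When an Informer detects a resource change, the eventHandler registered by the Controller decides whether to push it onto the queue. When an item is pushed onto the queue, the Controller takes it off and runs the user's Reconciler.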

