Before We Begin

Today we start a series on reading the controller-runtime source code. Before diving in, I recommend being familiar with the following topics; they will help you follow the source logic:

1. Basic usage of client-go
2. Scaffolding a simple controller-runtime project with kubebuilder
3. The core ideas behind informers

1. Setting Up the Source Environment

Reference: https://book.kubebuilder.io/cronjob-tutorial/cronjob-tutorial

2. Reading the Source

2.1 First Things First: The Key to the Problem Is Locating the Key Problem

First, locate the controller's core logic in main.go; if your code was generated by kubebuilder, it lives under the cmd directory. Setting aside the miscellaneous flag parsing and logger initialization, the main function boils down to three steps:

  1. Build the manager
  2. Register the custom Reconciler with the manager
  3. Start the manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
   Scheme:                 scheme,
   MetricsBindAddress:     metricsAddr,
   Port:                   9443,
   HealthProbeBindAddress: probeAddr,
   LeaderElection:         enableLeaderElection,
   LeaderElectionID:       "80807133.tutorial.kubebuilder.io",
   // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
   // when the Manager ends. This requires the binary to immediately end when the
   // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
   // speeds up voluntary leader transitions as the new leader don't have to wait
   // LeaseDuration time first.
   //
   // In the default scaffold provided, the program ends immediately after
   // the manager stops, so would be fine to enable this option. However,
   // if you are doing or is intended to do any operation such as perform cleanups
   // after the manager stops then its usage might be unsafe.
   // LeaderElectionReleaseOnCancel: true,
})
....
if err = (&controller.CronJobReconciler{
   Client: mgr.GetClient(),
   Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
   setupLog.Error(err, "unable to create controller", "controller", "CronJob")
   os.Exit(1)
}
......
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
   setupLog.Error(err, "problem running manager")
   os.Exit(1)
}

Next, I will walk through the code along these three paths.

2.2 ctrl.NewManager

The doc comment on this method says it "returns a new Manager for creating Controllers". It builds the manager that owns the controllers; the body is mostly initialization logic that ends up constructing the controllerManager struct.

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
   Scheme:                 scheme,
   MetricsBindAddress:     metricsAddr,
   Port:                   9443,
   HealthProbeBindAddress: probeAddr,
   LeaderElection:         enableLeaderElection,
   LeaderElectionID:       "80807133.tutorial.kubebuilder.io",
   // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
   // when the Manager ends. This requires the binary to immediately end when the
   // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
   // speeds up voluntary leader transitions as the new leader don't have to wait
   // LeaseDuration time first.
   //
   // In the default scaffold provided, the program ends immediately after
   // the manager stops, so would be fine to enable this option. However,
   // if you are doing or is intended to do any operation such as perform cleanups
   // after the manager stops then its usage might be unsafe.
   // LeaderElectionReleaseOnCancel: true,
})

The call above takes two arguments: ctrl.GetConfigOrDie() and the Options struct. Since initializing a Kubernetes client requires loading a kubeconfig, it is a safe guess that the first argument is where the kubeconfig-loading flow lives. Let's trace ctrl.GetConfigOrDie() and see how the kubeconfig is loaded.

2.2.1 How the kubeconfig Is Loaded

Following the call chain step by step, we arrive at the concrete implementation that loads the kubeconfig.

// This is the core logic
func loadConfig(context string) (config *rest.Config, configErr error) {
   // If a flag is specified with the config location, use that
   // 1. Where does `kubeconfig` come from? It is not a parameter of this function, so we can trace where it is initialized to find its source; see 1) below
   if len(kubeconfig) > 0 {
       // loadConfigWithContext appears to be the core config-loading logic; it is called twice in this function
      return loadConfigWithContext("", &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig}, context)
   }

   // If no kubeconfig flag was passed, look up the kubeconfig location from the environment and build the rest.Config from that file
   // RecommendedConfigPathEnvVar = "KUBECONFIG": the env var pointing at the kubeconfig location
   kubeconfigPath := os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
   if len(kubeconfigPath) == 0 {
         // Build rest.Config from the in-cluster service account token; see 2) below
      c, err := loadInClusterConfig()
      if err == nil {
         return c, nil
      }

      defer func() {
         if configErr != nil {
            log.Error(err, "unable to load in-cluster config")
         }
      }()
   }

   // Default loading rules for the kubeconfig: walk ~/.kube (and any paths listed in KUBECONFIG) to find a config file
   loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
   if _, ok := os.LookupEnv("HOME"); !ok {
      u, err := user.Current()
      if err != nil {
         return nil, fmt.Errorf("could not get current user: %w", err)
      }
      loadingRules.Precedence = append(loadingRules.Precedence, filepath.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))
   }
   // Core rest.Config initialization; see 3) below
   return loadConfigWithContext("", loadingRules, context)
}

1) Searching the same source file turns up where `kubeconfig` is initialized: it is parsed from a flag named kubeconfig. As long as the operator's start command passes the --kubeconfig flag, its value flows into the logic above.

func init() {
   RegisterFlags(flag.CommandLine)
}

const KubeconfigFlagName = "kubeconfig"
func RegisterFlags(fs *flag.FlagSet) {
   if fs == nil {
      fs = flag.CommandLine
   }
   // KubeconfigFlagName is "kubeconfig"
   if f := fs.Lookup(KubeconfigFlagName); f != nil {
      kubeconfig = f.Value.String()
   } else {
      fs.StringVar(&kubeconfig, KubeconfigFlagName, "", "Paths to a kubeconfig. Only required if out-of-cluster.")
   }
}

2) Building rest.Config from inside a Pod

// InClusterConfig returns a config object that uses the service account Kubernetes gives to pods.
// It is intended for clients that expect to run inside a pod on Kubernetes. If called from a process
// not running in a Kubernetes environment, it returns ErrNotInCluster.
func InClusterConfig() (*Config, error) {
   const (
      tokenFile  = "/var/run/secrets/kubernetes.io/serviceaccount/token"
      rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
   )
   // If you are curious about these two values, run `env` inside any pod: Kubernetes injects both variables into every container
   host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
   if len(host) == 0 || len(port) == 0 {
      return nil, ErrNotInCluster
   }

   token, err := os.ReadFile(tokenFile)
   if err != nil {
      return nil, err
   }

   tlsClientConfig := TLSClientConfig{}

   if _, err := certutil.NewPool(rootCAFile); err != nil {
       // This mainly validates that the CA file at this path can actually be loaded
      klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
   } else {
      tlsClientConfig.CAFile = rootCAFile
   }

   // Return the rest.Config
   return &Config{
      // TODO: switch to using cluster DNS.
      Host:            "https://" + net.JoinHostPort(host, port), // JoinHostPort formats host:port correctly for both IPv4 and IPv6
      TLSClientConfig: tlsClientConfig,
      BearerToken:     string(token),
      BearerTokenFile: tokenFile,
   }, nil
}

3) rest.Config initialization
This is the standard client-go routine for building a rest.Config, so I won't walk through it here.

func loadConfigWithContext(apiServerURL string, loader clientcmd.ClientConfigLoader, context string) (*rest.Config, error) {
   return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
      loader,
      &clientcmd.ConfigOverrides{
         ClusterInfo: clientcmdapi.Cluster{
            Server: apiServerURL,
         },
         CurrentContext: context,
      }).ClientConfig()
}
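To see this lookup order end to end, here is a minimal sketch of an out-of-cluster binary (nothing beyond standard controller-runtime usage): the --kubeconfig flag registered by the init() shown in 1) takes precedence, then the KUBECONFIG environment variable, then the in-cluster service account, and finally ~/.kube/config.

package main

import (
   "flag"
   "fmt"

   ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
   // the --kubeconfig flag is registered by the config package's init() shown in 1) above
   flag.Parse()
   // GetConfigOrDie runs the loadConfig logic from this section and exits if nothing can be loaded
   cfg := ctrl.GetConfigOrDie()
   fmt.Println("talking to the API server at", cfg.Host)
}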

2.2.2 Into ctrl.NewManager Itself

As noted above, the doc comment reads "returns a new Manager for creating Controllers"; the implementation is mostly initialization, culminating in the construction of the controllerManager struct.

func New(config *rest.Config, options Options) (Manager, error) {
   // Set default values for options fields
   // 2.1 Fill in defaults for any Options fields the user did not set explicitly
   options = setOptionsDefaults(options)
   // 2.2 Build the cluster with defaults; the cluster object is what actually talks to the API server
   cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
      clusterOptions.Scheme = options.Scheme
      clusterOptions.MapperProvider = options.MapperProvider
      clusterOptions.Logger = options.Logger
      clusterOptions.SyncPeriod = options.SyncPeriod
      clusterOptions.Namespace = options.Namespace
      clusterOptions.NewCache = options.NewCache
      clusterOptions.NewClient = options.NewClient
      clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
      clusterOptions.DryRunClient = options.DryRunClient
      clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
   })
   if err != nil {
      return nil, err
   }
   ......
   return &controllerManager{
      stopProcedureEngaged:          pointer.Int64(0),
      cluster:                       cluster,
      runnables:                     runnables,
      errChan:                       errChan,
      recorderProvider:              recorderProvider,
      resourceLock:                  resourceLock,
      metricsListener:               metricsListener,
      metricsExtraHandlers:          metricsExtraHandlers,
      controllerOptions:             options.Controller,
      logger:                        options.Logger,
      elected:                       make(chan struct{}),
      port:                          options.Port,
      host:                          options.Host,
      certDir:                       options.CertDir,
      tlsOpts:                       options.TLSOpts,
      webhookServer:                 options.WebhookServer,
      leaderElectionID:              options.LeaderElectionID,
      leaseDuration:                 *options.LeaseDuration,
      renewDeadline:                 *options.RenewDeadline,
      retryPeriod:                   *options.RetryPeriod,
      healthProbeListener:           healthProbeListener,
      readinessEndpointName:         options.ReadinessEndpointName,
      livenessEndpointName:          options.LivenessEndpointName,
      gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
      internalProceduresStop:        make(chan struct{}),
      leaderElectionStopped:         make(chan struct{}),
      leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
   }, nil
}

2.3 SetupWithManager: Registering the Custom Reconciler

Let's start with the Reconciler interface definition. This is one of the few places in the whole operator where you write your own code. Reconcile is your custom handling for change events (add, update, delete) on the target Kubernetes resource; you can think of it as folding the add/update/delete handlers of an informer's event handler into the single Reconcile(context.Context, Request) (Result, error) method, where you decide for yourself what state the object is in and perform the appropriate handling.

type Reconciler interface {
   Reconcile(context.Context, Request) (Result, error)
}
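For orientation, here is a minimal sketch of what such an implementation can look like, following the CronJob scaffold from the kubebuilder book; the batchv1 import path and the embedded client.Client come from that tutorial's scaffold, so treat them as assumptions rather than anything imposed by controller-runtime.

import (
   "context"
   "time"

   ctrl "sigs.k8s.io/controller-runtime"
   "sigs.k8s.io/controller-runtime/pkg/client"

   batchv1 "tutorial.kubebuilder.io/project/api/v1"
)

func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
   var cronJob batchv1.CronJob
   // r.Get comes from the client.Client embedded in CronJobReconciler (mgr.GetClient() in main)
   if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
      // the object may already be deleted; in that case there is nothing to requeue
      return ctrl.Result{}, client.IgnoreNotFound(err)
   }
   // ... compare desired vs. observed state, create/update/delete child Jobs, update status ...
   // ask to be reconciled again later so drift gets corrected even without new events
   return ctrl.Result{RequeueAfter: time.Minute}, nil
}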

It is wired up in main like this:

if err = (&controller.CronJobReconciler{
   Client: mgr.GetClient(),
   Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
   setupLog.Error(err, "unable to create controller", "controller", "CronJob")
   os.Exit(1)
}

Let's follow SetupWithManager. NewControllerManagedBy, For, and Owns merely build up the Builder struct, so I won't expand on them; the interesting part is Complete.

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
  .....

   // core logic
   return ctrl.NewControllerManagedBy(mgr).
      For(&batchv1.CronJob{}).
      Owns(&kbatch.Job{}).
      Complete(r)
}

Skipping some boilerplate, let's fast-forward to the core. The Build method below does two main things:

  • Core logic one: initialize the controller
  • Core logic two: set up the watches
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
   if r == nil {
      return nil, fmt.Errorf("must provide a non-nil Reconciler")
   }
   if blder.mgr == nil {
      return nil, fmt.Errorf("must provide a non-nil Manager")
   }
   if blder.forInput.err != nil {
      return nil, blder.forInput.err
   }

   // Set the ControllerManagedBy
   // Core logic one: initialize the controller
   if err := blder.doController(r); err != nil {
      return nil, err
   }

   // Set the Watch
    // Core logic two: set up the watches
   if err := blder.doWatch(); err != nil {
      return nil, err
   }

   return blder.ctrl, nil
}

We'll analyze these two paths one at a time.

2.3.1 Controller Initialization

func (blder *Builder) doController(r reconcile.Reconciler) error {
    ...... build ctrlOptions
   // Build the controller and return.
   // initialize the controller
   blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
   return err
}

Drilling into newController to see how this struct is built, the core implementation below does two things:

  • 1. Initialize a controller struct
  • 2. Add the controller to the controllerManager

func New(name string, mgr manager.Manager, options Options) (Controller, error) {
   c, err := NewUnmanaged(name, mgr, options)
   if err != nil {
      return nil, err
   }

   // Add the controller as a Manager components
   return c, mgr.Add(c)
}

mgr.Add(c) is straightforward: it just adds the newly created controller to the manager's internal bookkeeping. Let's look at which fields NewUnmanaged sets when it initializes the controller:

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {

   // Create controller with dependencies set
   return &controller.Controller{
       // This is the custom Reconciler
      Do: options.Reconciler,
       // Here is a work queue; remember it, it will come up again later
      MakeQueue: func() workqueue.RateLimitingInterface {
         return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
            Name: name,
         })
      },
      MaxConcurrentReconciles: options.MaxConcurrentReconciles,
      CacheSyncTimeout:        options.CacheSyncTimeout,
      Name:                    name,
      LogConstructor:          options.LogConstructor,
      RecoverPanic:            options.RecoverPanic,
      LeaderElected:           options.NeedLeaderElection,
   }, nil
}
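Since this queue will show up again when the controller starts its workers, here is a small standalone sketch of how a client-go rate-limiting work queue behaves. This is plain client-go usage, independent of controller-runtime itself:

package main

import (
   "fmt"

   "k8s.io/client-go/util/workqueue"
)

func main() {
   q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

   // producer side: an event handler enqueues a namespace/name key
   q.Add("default/cronjob-sample")

   // consumer side: a worker dequeues, processes, then marks the item done
   item, shutdown := q.Get()
   if shutdown {
      return
   }
   fmt.Println("processing", item)

   // on success, Forget resets the item's rate-limit history and Done releases it;
   // on failure we would call AddRateLimited(item) to retry with backoff instead
   q.Forget(item)
   q.Done(item)
   q.ShutDown()
}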

2.3.2 doWatch: Setting Up the Watches

The implementation is shown below. Two things go unexplained for now: what the source.Kind structure is for, and what allPredicates does. We'll leave those as open questions; here they are only defined, and we'll explain them when they are actually used.

func (blder *Builder) doWatch() error {
   // Reconcile type
   if blder.forInput.object != nil {
      obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
      if err != nil {
         return err
      }
      // What kind of source is this?
      src := source.Kind(blder.mgr.GetCache(), obj)
      // The event handler struct; it defines how events are turned into queue entries
      hdler := &handler.EnqueueRequestForObject{}
      allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
      allPredicates = append(allPredicates, blder.forInput.predicates...)
      // This looks like where the core work happens
      if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
         return err
      }
   }
   ...... the remaining cases follow the same pattern



Let's continue into blder.ctrl.Watch:

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
   c.mu.Lock()
   defer c.mu.Unlock()

   if !c.Started {
      // The first calls land here, before the controller has started; the src, handler, and predicates are just wrapped into a struct and stored in startWatches
      c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
      return nil
   }

   c.LogConstructor(nil).Info("Starting EventSource", "source", src)
   return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

As for src.Start below: although this branch is not executed in our main flow yet, its implementation helps us understand the source.Kind data structure, so let's take a small detour into Start. It does two things:

  • It starts a goroutine that keeps polling until the corresponding informer is available
  • It then registers the handler's processing logic on that informer
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
   prct ...predicate.Predicate) error {
  ....irrelevant logic omitted
   go func() {
      var (
         i       cache.Informer
         lastErr error
      )

      // Tries to get an informer until it returns true,
      // Keep polling until the corresponding informer can be obtained
      if err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {
         // Lookup the Informer from the Cache and add an EventHandler which populates the Queue
         i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
         if lastErr != nil {
            kindMatchErr := &meta.NoKindMatchError{}
            switch {
            case errors.As(lastErr, &kindMatchErr):
               log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
                  "kind", kindMatchErr.GroupKind)
            case runtime.IsNotRegisteredError(lastErr):
               log.Error(lastErr, "kind must be registered to the Scheme")
            default:
               log.Error(lastErr, "failed to get informer from cache")
            }
            return false, nil // Retry.
         }
         return true, nil
      }); err != nil {
         if lastErr != nil {
            ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
            return
         }
         ks.started <- err
         return
      }
      // Register the handler on the informer
      _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
      if err != nil {
         ks.started <- err
         return
      }
      if !ks.Cache.WaitForCacheSync(ctx) {
         // Would be great to return something more informative here
         ks.started <- errors.New("cache did not sync")
      }
      close(ks.started)
   }()

   return nil
}

By this point you may well be confused. Don't worry, we'll take it one step at a time; I was confused too when I first hit this tangle. First question: what is the concrete implementation behind this cache?

func New(cfg *rest.Config, opts Options) (Cache, error) {
  ....options parsing
   newCacheFunc := newCache(cfg, opts)
   var defaultCache Cache
   if len(opts.DefaultNamespaces) > 0 {
       // skip this branch for now
      defaultConfig := optionDefaultsToConfig(&opts)
      defaultCache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, opts.DefaultNamespaces, &defaultConfig)
   } else {
        // This builds the informerCache instance
      defaultCache = newCacheFunc(optionDefaultsToConfig(&opts), corev1.NamespaceAll)
   }

   if len(opts.ByObject) == 0 {
      return defaultCache, nil
   }

// Caches for different resources are stored in this struct, keyed by GVK
   delegating := &delegatingByGVKCache{
      scheme:       opts.Scheme,
      // ByObject, as tracing shows, is configured through the Cache field of the manager Options
      caches:       make(map[schema.GroupVersionKind]Cache, len(opts.ByObject)),
      defaultCache: defaultCache,
   }

   for obj, config := range opts.ByObject {
      gvk, err := apiutil.GVKForObject(obj, opts.Scheme)
      if err != nil {
         return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err)
      }
      var cache Cache
      if len(config.Namespaces) > 0 {
         cache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, config.Namespaces, nil)
      } else {
         cache = newCacheFunc(byObjectToConfig(config), corev1.NamespaceAll)
      }
      delegating.caches[gvk] = cache
   }

   return delegating, nil
}

// This function builds an informerCache instance
func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
   return func(config Config, namespace string) Cache {
      return &informerCache{
         scheme: opts.Scheme,
         Informers: internal.NewInformers(restConfig, &internal.InformersOpts{
            HTTPClient:   opts.HTTPClient,
            Scheme:       opts.Scheme,
            Mapper:       opts.Mapper,
            ResyncPeriod: *opts.SyncPeriod,
            Namespace:    namespace,
            Selector: internal.Selector{
               Label: config.LabelSelector,
               Field: config.FieldSelector,
            },
            Transform:             config.Transform,
            UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),
            NewInformer:           opts.newInformer,
         }),
         readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
      }
   }
}

This looks a lot like a sharedInformerFactory: a map cache holding one informer cache per GVK as a singleton. With delegatingByGVKCache in hand we can now locate the concrete implementation of GetInformer. Judging from the three methods below, this step is mainly about looking up the informerCache that corresponds to a given object type.

func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
   // Look up the per-GVK cache (or the default one) from delegatingByGVKCache's map
   cache, err := dbt.cacheForObject(obj)
   if err != nil {
      return nil, err
   }
   // The real work happens in informerCache's own GetInformer
   return cache.GetInformer(ctx, obj, opts...)
}
func (dbt *delegatingByGVKCache) cacheForObject(o runtime.Object) (Cache, error) {
   gvk, err := apiutil.GVKForObject(o, dbt.scheme)
   if err != nil {
      return nil, err
   }
   gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
   return dbt.cacheForGVK(gvk), nil
}

func (dbt *delegatingByGVKCache) cacheForGVK(gvk schema.GroupVersionKind) Cache {
   if specific, hasSpecific := dbt.caches[gvk]; hasSpecific {
      return specific
   }

   return dbt.defaultCache
}


func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
   gvk, err := apiutil.GVKForObject(obj, ic.scheme)
   if err != nil {
      return nil, err
   }

// Interested readers can dig further here; it mostly performs lookups and conversions and ends up returning a SharedIndexInformer
   _, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
   if err != nil {
      return nil, err
   }
   return i.Informer, nil
}
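As a side note, the same Cache.GetInformer API is reachable from our own code through the manager. The sketch below (the function name and the Pod type are just illustrative choices) fetches an informer for a built-in type and attaches a handler by hand, which is roughly what Kind.Start automates for the resources passed to For()/Owns(); the two-value AddEventHandler matches the controller-runtime version quoted in this article.

import (
   "context"
   "fmt"

   corev1 "k8s.io/api/core/v1"
   toolscache "k8s.io/client-go/tools/cache"
   "sigs.k8s.io/controller-runtime/pkg/manager"
)

func watchPods(ctx context.Context, mgr manager.Manager) error {
   inf, err := mgr.GetCache().GetInformer(ctx, &corev1.Pod{})
   if err != nil {
      return err
   }
   // register a handler, just like Kind.Start does with NewEventHandler(...).HandlerFuncs()
   _, err = inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) { fmt.Println("pod added") },
   })
   return err
}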

At this point we can move on to the line `_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())`. This is where everything ties back to informers.


func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {
   return &EventHandler{
      ctx:        ctx,
      handler:    handler,
      queue:      queue,
      predicates: predicates,
   }
}
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
   return cache.ResourceEventHandlerFuncs{
      AddFunc:    e.OnAdd,
      UpdateFunc: e.OnUpdate,
      DeleteFunc: e.OnDelete,
   }
}

The three handlers are largely the same, so let's look at OnAdd as a representative example of what actually happens.

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {
   c := event.CreateEvent{}

   // Pull Object out of the object
   if o, ok := obj.(client.Object); ok {
      c.Object = o
   } else {
      log.Error(nil, "OnAdd missing Object",
         "object", obj, "type", fmt.Sprintf("%T", obj))
      return
   }

  // The lines above are unremarkable, but this part deserves attention. Spoiler: we haven't analyzed predicates yet, and here they act as filters that drop events we don't care about
   for _, p := range e.predicates {
      if !p.Create(c) {
         return
      }
   }

   // Invoke create handler
   ctx, cancel := context.WithCancel(e.ctx)
   defer cancel()
   // 
   e.handler.Create(ctx, c, e.queue)
}

Since it has come up, let's find where predicates are set: the Builder exposes a WithEventFilter method for this. Concretely, you construct a predicate.Predicate implementation that defines filter logic per event type, which lets you drop change events you don't care about.

type Predicate interface {
   // Create returns true if the Create event should be processed
   Create(event.CreateEvent) bool

   // Delete returns true if the Delete event should be processed
   Delete(event.DeleteEvent) bool

   // Update returns true if the Update event should be processed
   Update(event.UpdateEvent) bool

   // Generic returns true if the Generic event should be processed
   Generic(event.GenericEvent) bool
}
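To make this concrete, here is a sketch of plugging a predicate into the builder chain we saw in SetupWithManager. predicate.Funcs is controller-runtime's ready-made adapter (any func left unset defaults to allowing the event); the CronJob import paths again follow the kubebuilder tutorial, so treat them as assumptions:

import (
   kbatch "k8s.io/api/batch/v1"
   ctrl "sigs.k8s.io/controller-runtime"
   "sigs.k8s.io/controller-runtime/pkg/event"
   "sigs.k8s.io/controller-runtime/pkg/predicate"

   batchv1 "tutorial.kubebuilder.io/project/api/v1"
)

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
   return ctrl.NewControllerManagedBy(mgr).
      For(&batchv1.CronJob{}).
      Owns(&kbatch.Job{}).
      WithEventFilter(predicate.Funcs{
         // only let update events through when the spec changed (generation bumped)
         UpdateFunc: func(e event.UpdateEvent) bool {
            return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
         },
      }).
      Complete(r)
}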

Let's keep tracing e.handler.Create(ctx, c, e.queue) to see what happens inside. Here the concrete handler is the one set up earlier: hdler := &handler.EnqueueRequestForObject{}.

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
   if evt.Object == nil {
      enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
      return
   }
   // Enqueue a reconcile.Request onto the work queue
   q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
      Name:      evt.Object.GetName(),
      Namespace: evt.Object.GetNamespace(),
   }})
}

Now we can sum up what source.Start does: when the informer observes a resource change, the handler.EnqueueRequestForObject{} handler is invoked, wraps the event into a reconcile.Request{}, and puts it on the controller's rate-limited work queue.

2.4 mgr.Start(ctrl.SetupSignalHandler())

This starts the controllerManager. I'll only excerpt the key code for analysis.

2.4.1 Cluster Startup = Informer Startup

....
// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
   if err != nil {
      return fmt.Errorf("failed to start caches: %w", err)
   }
}

We already saw how this cluster is initialized earlier:

return &cluster{
   config:           originalConfig,
   httpClient:       options.HTTPClient,
   scheme:           options.Scheme,
   cache:            cache,
   fieldIndexes:     cache,
   client:           clientWriter,
   apiReader:        clientReader,
   recorderProvider: recorderProvider,
   mapper:           mapper,
   logger:           options.Logger,
}, nil

Now let's look at what cm.add did: it adds a Runnable to the manager's runnables struct. The Runnable interface contains nothing but a Start() method.

func (cm *controllerManager) add(r Runnable) error {
   return cm.runnables.Add(r)
}

First the definition of runnables; it looks like Runnables are stored grouped by kind.

type runnables struct {
   HTTPServers    *runnableGroup
   Webhooks       *runnableGroup
   Caches         *runnableGroup
   LeaderElection *runnableGroup
   Others         *runnableGroup
}

type runnableGroup struct {
   ctx    context.Context
   cancel context.CancelFunc

   start        sync.Mutex
   startOnce    sync.Once
   started      bool
   startQueue   []*readyRunnable
   startReadyCh chan *readyRunnable

   stop     sync.RWMutex
   stopOnce sync.Once
   stopped  bool

   // errChan is the error channel passed by the caller
   // when the group is created.
   // All errors are forwarded to this channel once they occur.
   errChan chan error

   // ch is the internal channel where the runnables are read off from.
   ch chan *readyRunnable

   // wg is an internal sync.WaitGroup that allows us to properly stop
   // and wait for all the runnables to finish before returning.
   wg *sync.WaitGroup
}

The Add method confirms that guess:

func (r *runnables) Add(fn Runnable) error {
   switch runnable := fn.(type) {
   case *server:
      return r.HTTPServers.Add(fn, nil)
   case hasCache:
      return r.Caches.Add(fn, func(ctx context.Context) bool {
         return runnable.GetCache().WaitForCacheSync(ctx)
      })
   case webhook.Server:
      return r.Webhooks.Add(fn, nil)
   case LeaderElectionRunnable:
      if !runnable.NeedLeaderElection() {
         return r.Others.Add(fn, nil)
      }
      return r.LeaderElection.Add(fn, nil)
   default:
      return r.LeaderElection.Add(fn, nil)
   }
}

cluster is an implementation of hasCache, so let's see what happens next. The runnable and its readiness check are wrapped into a readyRunnable, and then one of two things happens:

  • If the group has not started yet, the runnable goes into the startQueue slice
  • If it has started, the runnable is sent to the r.ch channel

Next comes startup; let's see what Start does.

func (r *runnableGroup) Start(ctx context.Context) error {
   var retErr error

   r.startOnce.Do(func() {
      defer close(r.startReadyCh)

      // Start the internal reconciler.
      // start the internal reconcile loop, which launches every registered runnable
      go r.reconcile()

      // Start the group and queue up all
      // the runnables that were added prior.
      r.start.Lock()
      r.started = true
      // Remember the Add logic above? Here every readyRunnable queued in startQueue is pushed into the ch channel
      for _, rn := range r.startQueue {
         rn.signalReady = true
         r.ch <- rn
      }
      r.start.Unlock()

      // If we don't have any queue, return.
      if len(r.startQueue) == 0 {
         return
      }

      // Wait for all runnables to signal.
      // wait until every runnable has signalled that it started
      for {
         select {
         case <-ctx.Done():
            if err := ctx.Err(); !errors.Is(err, context.Canceled) {
               retErr = err
            }
            // A readyRunnable that has started is removed from startQueue
         case rn := <-r.startReadyCh:
            for i, existing := range r.startQueue {
               if existing == rn {
                  // Remove the item from the start queue.
                  r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
                  break
               }
            }
            // We're done waiting if the queue is empty, return.
            if len(r.startQueue) == 0 {
               return
            }
         }
      }
   })

   return retErr
}

Now let's circle back and look at what r.reconcile() does:

func (r *runnableGroup) reconcile() {
   for runnable := range r.ch {
     ....
      // Start the runnable.
      go func(rn *readyRunnable) {
         go func() {
             // once the readiness check passes
            if rn.Check(r.ctx) {
               if rn.signalReady {
                   // signal readiness via the startReadyCh channel
                  r.startReadyCh <- rn
               }
            }
         }()
         .....
         // run the runnable's Start function
         if err := rn.Start(r.ctx); err != nil {
            r.errChan <- err
         }
      }(runnable)
   }
}

For the cluster, the Start function is:

func (c *cluster) Start(ctx context.Context) error {
   defer c.recorderProvider.Stop(ctx)
   return c.cache.Start(ctx)
}

Tracing c.cache.Start down to its concrete implementation, we eventually reach the familiar informer.Run(ip.ctx.Done()), which is exactly how a raw informer is started.

2.4.2 Starting the Custom Reconciler

If you recall the SetupWithManager logic, the custom reconciler was wrapped into a Controller and placed into the cm.runnables.LeaderElection group. Let's keep following the code inside Start.

{
   ctx, cancel := context.WithCancel(context.Background())
   cm.leaderElectionCancel = cancel
   go func() {
       // With a resource lock configured, run leader election and wait until we become the leader
      if cm.resourceLock != nil {
         if err := cm.startLeaderElection(ctx); err != nil {
            cm.errChan <- err
         }
      } else {
         // Treat not having leader election enabled the same as being elected.
          // treated as already elected: run the core startup logic
         if err := cm.startLeaderElectionRunnables(); err != nil {
            cm.errChan <- err
         }
         close(cm.elected)
      }
   }()
}

One last step: cm.startLeaderElectionRunnables(). Tracing the core startup logic, notice that it is the same runnableGroup.Start we just saw for the cluster group.

func (r *runnableGroup) Start(ctx context.Context) error {
   var retErr error

   r.startOnce.Do(func() {
      defer close(r.startReadyCh)

      // Start the internal reconciler.
      go r.reconcile()

      // Start the group and queue up all
      // the runnables that were added prior.
      r.start.Lock()
      r.started = true
      for _, rn := range r.startQueue {
         rn.signalReady = true
         r.ch <- rn
      }
      r.start.Unlock()

      // If we don't have any queue, return.
      if len(r.startQueue) == 0 {
         return
      }

      // Wait for all runnables to signal.
      for {
         select {
         case <-ctx.Done():
            if err := ctx.Err(); !errors.Is(err, context.Canceled) {
               retErr = err
            }
         case rn := <-r.startReadyCh:
            for i, existing := range r.startQueue {
               if existing == rn {
                  // Remove the item from the start queue.
                  r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
                  break
               }
            }
            // We're done waiting if the queue is empty, return.
            if len(r.startQueue) == 0 {
               return
            }
         }
      }
   })

   return retErr
}

The key question is what the Controller's Start method does, so let's dive in.

func (c *Controller) Start(ctx context.Context) error {
   .....

   c.Queue = c.MakeQueue()
  ......
   err := func() error {
      defer c.mu.Unlock()

      // TODO(pwittrock): Reconsider HandleCrash
      defer utilruntime.HandleCrash()

      // NB(directxman12): launch the sources *before* trying to wait for the
      // caches to sync so that they have a chance to register their intendeded
      // caches.
      for _, watch := range c.startWatches {
         c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
         // Does this Start look familiar? It is the Kind.Start we analyzed above, which listens on the informer and feeds events into the queue
         if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
            return err
         }
      }
      ......
      wg.Add(c.MaxConcurrentReconciles)
      for i := 0; i < c.MaxConcurrentReconciles; i++ {
         go func() {
            defer wg.Done()
            // Run a worker thread that just dequeues items, processes them, and marks them done.
            // It enforces that the reconcileHandler is never invoked concurrently with the same object.
            for c.processNextWorkItem(ctx) {
            }
         }()
      }
      ....
   }()
   if err != nil {
      return err
   }
   .......
}

So far we've only covered how the producer side puts items onto the controller's queue, not how those items are consumed. The core of the consumer side is processNextWorkItem, which ultimately calls reconcileHandler:

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
  .......
  // c.Reconcile ultimately invokes the user-defined Reconciler via c.Do (see below)
   result, err := c.Reconcile(ctx, req)
   switch {
   case err != nil:
      if errors.Is(err, reconcile.TerminalError(nil)) {
         ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
      } else {
         c.Queue.AddRateLimited(req)
      }
      ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
      ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
      if !result.IsZero() {
         log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
      }
      log.Error(err, "Reconciler error")
   case result.RequeueAfter > 0:
      log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))
      // The result.RequeueAfter request will be lost, if it is returned
      // along with a non-nil error. But this is intended as
      // We need to drive to stable reconcile loops before queuing due
      // to result.RequestAfter
      c.Queue.Forget(obj)
      c.Queue.AddAfter(req, result.RequeueAfter)
      ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
   case result.Requeue:
      log.V(5).Info("Reconcile done, requeueing")
      c.Queue.AddRateLimited(req)
      ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
   default:
      log.V(5).Info("Reconcile successful")
      // Finally, if no error occurs we Forget this item so it does not
      // get queued again until another change happens.
      c.Queue.Forget(obj)
      ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
   }
}

First, what does c.Reconcile do? It simply calls the Reconcile method of the controller's Do field, and Do is precisely our custom Reconciler.

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
......
   return c.Do.Reconcile(ctx, req)
}

Now back to the switch in c.reconcileHandler(ctx, obj):

switch {
     // If Reconcile returned an error: terminal errors only bump a metric; anything else is re-enqueued with rate limiting
case err != nil:
   
   if errors.Is(err, reconcile.TerminalError(nil)) {
      ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
   } else {
      c.Queue.AddRateLimited(req)
   }
  .......
  // If the result sets RequeueAfter, re-enqueue after that delay
case result.RequeueAfter > 0:
   log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))
   // The result.RequeueAfter request will be lost, if it is returned
   // along with a non-nil error. But this is intended as
   // We need to drive to stable reconcile loops before queuing due
   // to result.RequestAfter
   c.Queue.Forget(obj)
   c.Queue.AddAfter(req, result.RequeueAfter)
   ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:
    // If the result asks for Requeue, re-enqueue immediately (rate limited)
   log.V(5).Info("Reconcile done, requeueing")
   c.Queue.AddRateLimited(req)
   ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:
      // Otherwise, forget the item so it is not retried
   log.V(5).Info("Reconcile successful")
   // Finally, if no error occurs we Forget this item so it does not
   // get queued again until another change happens.
   c.Queue.Forget(obj)
   ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}
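From the Reconciler author's point of view, the contract of that switch can be summed up with a small hypothetical helper (not part of controller-runtime, purely an illustration of the mapping; ctrl is the usual controller-runtime import alias):

// requeueDecision describes how reconcileHandler treats each kind of Reconcile return value.
func requeueDecision(result ctrl.Result, err error) string {
   switch {
   case err != nil:
      // terminal errors are the exception: they are counted but not retried
      return "AddRateLimited: retried with exponential backoff"
   case result.RequeueAfter > 0:
      return "Forget + AddAfter: requeued after the requested delay"
   case result.Requeue:
      return "AddRateLimited: requeued immediately, subject to rate limiting"
   default:
      return "Forget: dropped until the next watch event"
   }
}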

And with that, our walk through the controller-runtime source is complete.

Loose Ends

An analysis of the workqueue.RateLimitingInterface queue itself has not been written yet.
