认识Kubebuilder

前言

没有人会喜欢黑盒,在使用一个工具之前,我习惯于尽可能多地去了解它,不然用起来会觉得不踏实。Controller的工作流程已经很熟悉了,理解kubebuilder的源码应该也比较容易。因此,大概阅读了一下了kubebuilder的源码,本篇圈出其中几个重点,用以帮助理解和认识kubebuilder。

概念名词

下面几个概念名词非常的重要,文中会多次提及。

Own Resource

CRD一般设计用作管理k8s内置的各类资源组合,用以实现自定义的部署和运行逻辑,是一种上层封装,因此被封装的下层build in Resouce实例,就称之为 Own Resource.

Owner Resource

与上面的 Own Resource对应的,CRD资源实例作为上层管理单位,称之为 Owner Resource.

Kind & Resource

两者在绝大部分情况下说是类型一一对应的关系,例如所有的Pod resources所属的Kind都是Pod Kind,可以简单理解为Kind是resource的种类标识。

GVK & GVR

GVK = Group + Version + Kind组合而来的,资源种类描述术语,例如 deployment kind的GVK是 extensions/v1beata1/deployments

GVR = Group + Version + Resource组合而来的,资源实例描述术语,例如某个deployment的name是deploy-sample,那么它的GVR则是extensions/v1beata1/deploy-sample

Scheme

每种资源的都需要有对应的Scheme,Scheme结构体包含gvkToType和typeToGVK的字段映射关系,APIServer 根据Scheme来进行资源的序列化和反序列化。

Scheme struct如下:

type Scheme struct {
    // versionMap allows one to figure out the go type of an object with
    // the given version and name.
    gvkToType map[unversioned.GroupVersionKind]reflect.Type

    // typeToGroupVersion allows one to find metadata for a given go object.
    // The reflect.Type we index by should *not* be a pointer.
    typeToGVK map[reflect.Type][]unversioned.GroupVersionKind

    // unversionedTypes are transformed without conversion in ConvertToVersion.
    unversionedTypes map[reflect.Type]unversioned.GroupVersionKind

    // unversionedKinds are the names of kinds that can be created in the context of any group
    // or version
    // TODO: resolve the status of unversioned types.
    unversionedKinds map[string]reflect.Type

    // Map from version and resource to the corresponding func to convert
    // resource field labels in that version to internal version.
    fieldLabelConversionFuncs map[string]map[string]FieldLabelConversionFunc

    // defaulterFuncs is an array of interfaces to be called with an object to provide defaulting
    // the provided object must be a pointer.
    defaulterFuncs map[reflect.Type]func(interface{})

    // converter stores all registered conversion functions. It also has
    // default coverting behavior.
    converter *conversion.Converter

    // cloner stores all registered copy functions. It also has default
    // deep copy behavior.
    cloner *conversion.Cloner
}

组件

controller使用client-go包里的informer模式工作,向APIServer watch GVK下对应的GVR,并充分利用cache、index、queue,可参考这张图片再回顾一下这个工作流程:

与此对应的,kubebuilder大致有下面几种主要组件:

Manager

Kubebuilder 的最外层管理组件,负责初始化controller、cache、client。

Cache

Kubebuilder 的内部组件,负责生成SharedInformer,watch关注的GVK下的GVR的变化(增删改),以触发 Controller 的 Reconcile 逻辑。

Clients

controller工作中需要对对资源进行CURD,CURD操作封装到Client中来进行,其中的写操作(增删改)直接访问 APIServer,其中的读操作(查)对接的是本地的 Cache。

初始化

首先使用kubebuilder初始化一个CRD项目,以便展开进入kubebuilder的内部。

在初始化之前,首先想好CRD资源的名称,名称不要与现有的资源名称冲突,api groupVersion,建议使用自定义的api groupVersion与内置的区别开。

查看现有的所有resource

kubectl api-resources -o wide

查看现有的api groupVersion

kubectl api-versions

我的CRD 名称定为: Unit, api groupVersion定为 custom/v1

init

# 自定义
export CRD=Unit
export group=custom
export version=v1

mkdir -p CRD/${CRD} && cd CRD/${CRD}

export GO111MODULE=on

# 如果路径位于GOPATH/src下,go mod这一步可省略
go mod init ${CRD}

# domian可自定义
kubebuilder init --domain my.crd.com

# 为CRD生成API groupVersion
# kubebuilder create api --group custom --version v1 --kind Unit
kubebuilder create api --group ${group} --version ${version} --kind ${CRD}

需要交互两次:

目录创建完毕,使用IDE打开看看:

目录结构

mbp-16in:Unit ywq$ tree
.
├── Dockerfile  # 制作crd-controller镜像的ockerfile
├── Makefile  # make编译文件
├── PROJECT # 项目元数据
├── api   
│   └── v1
│       ├── groupversion_info.go # GVK信息、scheme生成的方法都在这里
│       ├── unit_types.go # 自定义CRD结构
│       └── zz_generated.deepcopy.go  # 资源对象的操作一开始都是建立在deepcopy出来的复制对象身上的
├── bin
│   └── manager # go打包的二进制文件
├── config # 所有最终生成的需要kubectl apply的的资源,按照功能进行分片成不同的目录,这里有些地方可以做些自定义的配置
│   ├── certmanager
│   │   ├── certificate.yaml
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── crd # crd的配置
│   │   ├── kustomization.yaml
│   │   ├── kustomizeconfig.yaml
│   │   └── patches
│   │       ├── cainjection_in_units.yaml
│   │       └── webhook_in_units.yaml
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   ├── manager_webhook_patch.yaml
│   │   └── webhookcainjection_patch.yaml 
│   ├── manager # manager的deployment在这里
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus  # metric暴露
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac  # rbac授权
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role_binding.yaml
│   │   ├── unit_editor_role.yaml
│   │   └── unit_viewer_role.yaml
│   ├── samples # Unit resource sample
│   │   └── custom_v1_unit.yaml
│   └── webhook # Unit webhook Service,用来接收APIServer转发而来的webhook请求
│       ├── kustomization.yaml
│       ├── kustomizeconfig.yaml
│       └── service.yaml
├── controllers
│   ├── suite_test.go
│   └── unit_controller.go # CRD controller的核心逻辑在这里
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go # Entrypoint

源码分析

从main.go入手:

func main() {
  ...
  // new manager
  mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme:             scheme,
    MetricsBindAddress: metricsAddr,
    Port:               9443,
    LeaderElection:     enableLeaderElection,
    LeaderElectionID:   "68e16627.my.crd.com",
  })
  if err != nil {
    setupLog.Error(err, "unable to start manager")
    os.Exit(1)
  }
  
  // register reconciler
  if err = (&controllers.UnitReconciler{
    Client: mgr.GetClient(),
    Log:    ctrl.Log.WithName("controllers").WithName("Unit"),
    Scheme: mgr.GetScheme(),
  }).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "Unit")
    os.Exit(1)
  }
  // +kubebuilder:scaffold:builder

  setupLog.Info("starting manager")
  // start manager
  if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    setupLog.Error(err, "problem running manager")
    os.Exit(1)
  }
}

main方法里有3个步骤:

  • new manager
  • register reconciler
  • start manager

这些内部的方法,都是分布在依赖的包里面的,不再是在Unit目录下。进去分别来分析一下

New manager

main.go:57

  mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{...}

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/alias.go:101

  NewManager = manager.New

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:229

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
  ...
  
  // 创建Cache对象,用做client的读请求,以及生成informer
  cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
  if err != nil {
    return nil, err
  }
  
  // 创建读请求的client,即apiReader,读请求走的Cache
  apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
  if err != nil {
    return nil, err
  }
  
  // 创建写请求的client,写请求直连APIServer
  writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
  if err != nil {
    return nil, err
  }
  // recorderProvider,记录event事件用的,kubectl describe可用到
  recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster)
  if err != nil {
    return nil, err
  }

  // controller多副本leader选举使用的
  resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
    LeaderElection:          options.LeaderElection,
    LeaderElectionID:        options.LeaderElectionID,
    LeaderElectionNamespace: options.LeaderElectionNamespace,
  })
  if err != nil {
    return nil, err
  }

  // 暴露/metrics给prometheus使用
  metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
  if err != nil {
    return nil, err
  }

  ...

  return &controllerManager{
    config:                config,
    scheme:                options.Scheme,
    cache:                 cache,
    fieldIndexes:          cache,
    // writeObj赋值给client字段
    client:                writeObj,
    apiReader:             apiReader,
    recorderProvider:      recorderProvider,
    resourceLock:          resourceLock,
    mapper:                mapper,
    metricsListener:       metricsListener,
    internalStop:          stop,
    internalStopper:       stop,
    port:                  options.Port,
    host:                  options.Host,
    certDir:               options.CertDir,
    leaseDuration:         *options.LeaseDuration,
    renewDeadline:         *options.RenewDeadline,
    retryPeriod:           *options.RetryPeriod,
    healthProbeListener:   healthProbeListener,
    readinessEndpointName: options.ReadinessEndpointName,
    livenessEndpointName:  options.LivenessEndpointName,
  }, nil
}

可以看到,这些步骤里面,值得继续深入的是NewCacheNewClient

NewCache

/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:246

// options在这里,展开来去看看NewCache方法
options = setOptionsDefaults(options)
...

// 使用options.NewCache方法来生成cache对象
cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
  if err != nil {
    return nil, err
  }


==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:351

func setOptionsDefaults(options Options) Options {
  ...

  // Allow newCache to be mocked
  if options.NewCache == nil {
    options.NewCache = cache.New
  }
  ...
}

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/cache/cache.go:110

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
  opts, err := defaultOpts(config, opts)
  if err != nil {
    return nil, err
  }
  im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
  return &informerCache{InformersMap: im}, nil
}

可以发现,这里就是根据配置,来生成所需要监测的每种GVK对应的Informer。至于需要监测的这些GVK在哪配置,后面的篇章中会提及。

NewClient

/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:256

// options在这里,展开来去看看NewClient方法
options = setOptionsDefaults(options)

// 使用options.NewClient方法来生成client对象
writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
  if err != nil {
    return nil, err
  }

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:351

// setOptionsDefaults set default values for Options fields
func setOptionsDefaults(options Options) Options {
  ...

  // Allow newClient to be mocked
  if options.NewClient == nil {
    options.NewClient = defaultNewClient
  }
  
  ...
}

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:320

func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
  // 写操作的直连APIServer的client
  c, err := client.New(config, options)
  if err != nil {
    return nil, err
  }

  return &client.DelegatingClient{
    Reader: &client.DelegatingReader{
      // 读操作走manager里面的cache
      CacheReader:  cache,
      // 写操作的直连client
      ClientReader: c,
    },
    Writer:       c,
    StatusClient: c,
  }, nil
}

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/client/client.go:54

// 利用scheme来取得给定的资源type所属的GVK。
// 简而言之,这里的client是通过scheme与APIServer直接进行序列化和反序列化交互的
func New(config *rest.Config, options Options) (Client, error) {
  if config == nil {
    return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
  }

  // Init a scheme if none provided
  if options.Scheme == nil {
    options.Scheme = scheme.Scheme
  }

  // Init a Mapper if none provided
  if options.Mapper == nil {
    var err error
    options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
    if err != nil {
      return nil, err
    }
  }

  dynamicClient, err := dynamic.NewForConfig(config)
  if err != nil {
    return nil, err
  }

  c := &client{
    typedClient: typedClient{
      cache: clientCache{
        config:         config,
        scheme:         options.Scheme,
        mapper:         options.Mapper,
        codecs:         serializer.NewCodecFactory(options.Scheme),
        resourceByType: make(map[reflect.Type]*resourceMeta),
      },
      paramCodec: runtime.NewParameterCodec(options.Scheme),
    },
    unstructuredClient: unstructuredClient{
      client:     dynamicClient,
      restMapper: options.Mapper,
    },
  }

  return c, nil
}

register reconciler

reconciler即controller,命名为reconciler,意为协调器更贴切,控制器的核心逻辑在这里面。

main.go:69

if err = (&controllers.UnitReconciler{
    Client: mgr.GetClient(),
    Log:    ctrl.Log.WithName("controllers").WithName("Unit"),
    Scheme: mgr.GetScheme(),
  }).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "Unit")
    os.Exit(1)
  }

==> controllers/unit_controller.go:49

func (r *UnitReconciler) SetupWithManager(mgr ctrl.Manager) error {
  return ctrl.NewControllerManagedBy(mgr).
    For(&customv1.Unit{}).
    Complete(r)
}

Complete方法是用作生成Builder.

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:128

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:134

// Build builds the Application ControllerManagedBy and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
  ...
  
  // Set the ControllerManagedBy
  if err := blder.doController(r); err != nil {
    return nil, err
  }

  // Set the Watch
  if err := blder.doWatch(); err != nil {
    return nil, err
  }

  return blder.ctrl, nil
}

Builder最主要的是doController()doWatch()方法,分别来看下。

doController

/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:145

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:213

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:35

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/controller/controller.go:63

// New returns a new Controller registered with the Manager.  The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
  if options.Reconciler == nil {
    return nil, fmt.Errorf("must specify Reconciler")
  }

  if len(name) == 0 {
    return nil, fmt.Errorf("must specify Name for Controller")
  }

  if options.MaxConcurrentReconciles <= 0 {
    options.MaxConcurrentReconciles = 1
  }

  // Inject dependencies into Reconciler
  if err := mgr.SetFields(options.Reconciler); err != nil {
    return nil, err
  }

  // Create controller with dependencies set
  c := &controller.Controller{
    Do:       options.Reconciler,
    Cache:    mgr.GetCache(),
    Config:   mgr.GetConfig(),
    Scheme:   mgr.GetScheme(),
    Client:   mgr.GetClient(),
    Recorder: mgr.GetEventRecorderFor(name),
    MakeQueue: func() workqueue.RateLimitingInterface {
      return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
    },
    MaxConcurrentReconciles: options.MaxConcurrentReconciles,
    Name:                    name,
  }

  // Add the controller as a Manager components
  return c, mgr.Add(c)
}

可以看出,doController方法是生成Controller,并将其注册进Manager的外层主体进行托管。

其中,Controller结构体实例内包含的字段如下:

c := &controller.Controller{
    // Reconciler只有一个接口方法Reconcile(),这个方法是CRD控制的核心逻辑,kubebuilder已经自动生成,但里面的逻辑需要填充,见下面
    Do:       options.Reconciler,  
    // Cache用来对接informer检测GVR状态并保存在缓存中,提供用作读
    Cache:    mgr.GetCache(),
    Config:   mgr.GetConfig(),
    // 用作资源实例Type的正反序列化
    Scheme:   mgr.GetScheme(),
    // Client用作写请求,直连APIServer
    Client:   mgr.GetClient(),
    // 记录event
    Recorder: mgr.GetEventRecorderFor(name),
    // workqueue
    MakeQueue: func() workqueue.RateLimitingInterface {
      return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
    },
    // 协调器的并发数限制
    MaxConcurrentReconciles: options.MaxConcurrentReconciles,
    Name:                    name,
  }

==> controllers/unit_controller.go:40

func (r *UnitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
   _ = context.Background()
   _ = r.Log.WithValues("unit", req.NamespacedName)

   // your logic here

   return ctrl.Result{}, nil
}

Reconcile()方法在这里,逻辑需要自己实现。后面的篇章中会描述我的需求、设计和代码实例。

doWatch
func (blder *Builder) doWatch() error {
  // Reconcile type
  src := &source.Kind{Type: blder.apiType}
  // register handler
  hdler := &handler.EnqueueRequestForObject{}
  
  // Watch CRD资源的变更请求
  err := blder.ctrl.Watch(src, hdler, blder.predicates...)
  if err != nil {
    return err
  }

  // Watch 被CRD管理的own resource的变更请求
  for _, obj := range blder.managedObjects {
    src := &source.Kind{Type: obj}
    hdler := &handler.EnqueueRequestForOwner{
      OwnerType:    blder.apiType,
      IsController: true,
    }
    if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
      return err
    }
  }

  
  for _, w := range blder.watchRequest {
    if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
      return err
    }

  }
  return nil
}

doWatch()主要干两件事:watch CRD 资源的变更,以及watch CRD 资源的own resouces的变更.watch到变更之后下一步做什么呢?当然是交给handler来处理,来看一下这里第二行代码中生成的handler是做什么的。

/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/handler/enqueue.go:34

type EnqueueRequestForObject struct{}

// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
  if evt.Meta == nil {
    enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
    return
  }
  q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
    Name:      evt.Meta.GetName(),
    Namespace: evt.Meta.GetNamespace(),
  }})
}

// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
  if evt.MetaOld != nil {
    q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
      Name:      evt.MetaOld.GetName(),
      Namespace: evt.MetaOld.GetNamespace(),
    }})
  } else {
    enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
  }

  if evt.MetaNew != nil {
    q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
      Name:      evt.MetaNew.GetName(),
      Namespace: evt.MetaNew.GetNamespace(),
    }})
  } else {
    enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
  }
}

// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
  if evt.Meta == nil {
    enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
    return
  }
  q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
    Name:      evt.Meta.GetName(),
    Namespace: evt.Meta.GetNamespace(),
  }})
}

不出意外,handler会增删查的写请求的对象的NamespacedName,压入workqueue里面,与此同时,在另一头检测workqueue的协调器Reconciler默默地开始运转。

需要准备的注册工作都做完了,下面就要回到main.go中,启动外层托管组件Manager了。

启动Manager

启动Manager分为两步,第一步准备好Cache组件,第二步启动controller组件

启动Cache

main.go:80

  if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    setupLog.Error(err, "problem running manager")
    os.Exit(1)
  }

上面ctrl.NewManager()方法最终返回的是controllerManager{}对象的指针,来controllerManager里面找一下Start()方法.

/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:403

func (cm *controllerManager) Start(stop <-chan struct{}) error {
  ...

  go cm.startNonLeaderElectionRunnables()
  
  // 多controller实例时要进行leader选举,获取leader lock的实例才开始工作。startNonLeaderElectionRunnables和startLeaderElectionRunnables内在的工作方式本质无区别。
  if cm.resourceLock != nil {
    err := cm.startLeaderElection()
    if err != nil {
      return err
    }
  } else {
    go cm.startLeaderElectionRunnables()
  }
  
  ...
}

/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:465

func (cm *controllerManager) startLeaderElectionRunnables() {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  
  // 重点关注这里的waitForCache
  cm.waitForCache()

  // Start the leader election Runnables after the cache has synced
  for _, c := range cm.leaderElectionRunnables {
    // Controllers block, but we want to return an error if any have an error starting.
    // Write any Start errors to a channel so we can return them
    ctrl := c
    go func() {
      if err := ctrl.Start(cm.internalStop); err != nil {
        cm.errSignal.SignalError(err)
      }
      // we use %T here because we don't have a good stand-in for "name",
      // and the full runnable might not serialize (mutexes, etc)
      log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
    }()
  }

  cm.startedLeader = true
}

==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:489


func (cm *controllerManager) waitForCache() {
  if cm.started {
    return
  }

  // Start the Cache. Allow the function to start the cache to be mocked out for testing
  if cm.startCache == nil {
    cm.startCache = cm.cache.Start
  }
  go func() {
    if err := cm.startCache(cm.internalStop); err != nil {
      cm.errSignal.SignalError(err)
    }
  }()

  // Wait for the caches to sync.
  // TODO(community): Check the return value and write a test
  cm.cache.WaitForCacheSync(cm.internalStop)
  cm.started = true
}

waitForCache的主要作用是启动Cache和等待Cache的首次同步完成。

启动cache的步骤则包括:创建FIFO queue、初始化informer、reflector、LocalStorage cache、index索引等。

启动controller

controller启动之后的工作模式分析之前的controller系列文章已经讲过很多次了,这里再快速回顾一遍。

/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/internal/controller/controller.go:146

func (c *Controller) Start(stop <-chan struct{}) error {
...

      // Launch workers to process resources
      log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
      for i := 0; i < c.MaxConcurrentReconciles; i++ {
        // 多个workder,间隔一定时间(1s)工作一次
         go wait.Until(c.worker, c.JitterPeriod, stop)
      }

...
}

func (c *Controller) worker() {
  for c.processNextWorkItem() {
  }
}

func (c *Controller) processNextWorkItem() bool {
  obj, shutdown := c.Queue.Get()
  if shutdown {
    // Stop working
    return false
  }
  
  defer c.Queue.Done(obj)
  return c.reconcileHandler(obj)
}

func (c *Controller) reconcileHandler(obj interface{}) bool {
  ...
  
  // RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
  // resource to be synced.
  if result, err := c.Do.Reconcile(req); err != nil {
    ...
  }
  
  ...
}

worker内部的最终逻辑回到了Reconcile方法,也即是需要在controllers/unit_controller.go:40中的Reconcile()自定义逻辑的方法,按照自定义的逻辑运行。

总结

如果之前看过内置资源的Controller的源码,对Controller工作方式有了解,那么理解Kubebuilder起来也是轻车熟路。

归纳下来,kubebuilder创建的controller做的事情也是跟文章最上面的流程图一样,只是kubebuilder经过了高度的封装后,便利程度到了仅需要实现Reconcile方法内部的逻辑即可,中间所有的流程都按照标准controller的运行方式替你包揽实现了。

本篇对kubebuilder的核心重点介绍到此结束,下一篇将开始正式介绍Unit CRD的设计思路,例如Unit会管理它的own resource,以及kubebuilder如何来帮助 授权管理own resource、同步Unit与own resource。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐