《 Kubebuilder v2 使用指南 》-P3-认识Kubebuilder
认识Kubebuilder前言没有人会喜欢黑盒,在使用一个工具之前,我习惯于尽可能多地去了解它,不然用起来会觉得不踏实。Controller的工作流程已经很熟悉了,理解kubebuilder的源码应该也比较容易。因此,大概阅读了一下了kubebuilder的源码,本篇圈出其中几个重点,用以帮助理解和认识kubebuilder。概念名词下面几个概念名词非常的重要,文中会多次提及。Own Resour
认识Kubebuilder
前言
没有人会喜欢黑盒,在使用一个工具之前,我习惯于尽可能多地去了解它,不然用起来会觉得不踏实。Controller的工作流程已经很熟悉了,理解kubebuilder的源码应该也比较容易。因此,大概阅读了一下了kubebuilder的源码,本篇圈出其中几个重点,用以帮助理解和认识kubebuilder。
概念名词
下面几个概念名词非常的重要,文中会多次提及。
Own Resource
CRD一般设计用作管理k8s内置的各类资源组合,用以实现自定义的部署和运行逻辑,是一种上层封装,因此被封装的下层build in Resouce实例,就称之为 Own Resource.
Owner Resource
与上面的 Own Resource对应的,CRD资源实例作为上层管理单位,称之为 Owner Resource.
Kind & Resource
两者在绝大部分情况下说是类型一一对应的关系,例如所有的Pod resources所属的Kind都是Pod Kind,可以简单理解为Kind是resource的种类标识。
GVK & GVR
GVK = Group + Version + Kind组合而来的,资源种类描述术语,例如 deployment kind的GVK是 extensions/v1beata1/deployments
GVR = Group + Version + Resource组合而来的,资源实例描述术语,例如某个deployment的name是deploy-sample,那么它的GVR则是extensions/v1beata1/deploy-sample
Scheme
每种资源的都需要有对应的Scheme,Scheme结构体包含gvkToType和typeToGVK的字段映射关系,APIServer 根据Scheme来进行资源的序列化和反序列化。
Scheme struct如下:
type Scheme struct {
// versionMap allows one to figure out the go type of an object with
// the given version and name.
gvkToType map[unversioned.GroupVersionKind]reflect.Type
// typeToGroupVersion allows one to find metadata for a given go object.
// The reflect.Type we index by should *not* be a pointer.
typeToGVK map[reflect.Type][]unversioned.GroupVersionKind
// unversionedTypes are transformed without conversion in ConvertToVersion.
unversionedTypes map[reflect.Type]unversioned.GroupVersionKind
// unversionedKinds are the names of kinds that can be created in the context of any group
// or version
// TODO: resolve the status of unversioned types.
unversionedKinds map[string]reflect.Type
// Map from version and resource to the corresponding func to convert
// resource field labels in that version to internal version.
fieldLabelConversionFuncs map[string]map[string]FieldLabelConversionFunc
// defaulterFuncs is an array of interfaces to be called with an object to provide defaulting
// the provided object must be a pointer.
defaulterFuncs map[reflect.Type]func(interface{})
// converter stores all registered conversion functions. It also has
// default coverting behavior.
converter *conversion.Converter
// cloner stores all registered copy functions. It also has default
// deep copy behavior.
cloner *conversion.Cloner
}
组件
controller使用client-go包里的informer模式工作,向APIServer watch GVK下对应的GVR,并充分利用cache、index、queue,可参考这张图片再回顾一下这个工作流程:
与此对应的,kubebuilder大致有下面几种主要组件:
Manager
Kubebuilder 的最外层管理组件,负责初始化controller、cache、client。
Cache
Kubebuilder 的内部组件,负责生成SharedInformer,watch关注的GVK下的GVR的变化(增删改),以触发 Controller 的 Reconcile 逻辑。
Clients
controller工作中需要对对资源进行CURD,CURD操作封装到Client中来进行,其中的写操作(增删改)直接访问 APIServer,其中的读操作(查)对接的是本地的 Cache。
初始化
首先使用kubebuilder初始化一个CRD项目,以便展开进入kubebuilder的内部。
在初始化之前,首先想好CRD资源的名称,名称不要与现有的资源名称冲突,api groupVersion,建议使用自定义的api groupVersion与内置的区别开。
查看现有的所有resource:
kubectl api-resources -o wide
查看现有的api groupVersion:
kubectl api-versions
我的CRD 名称定为: Unit, api groupVersion定为 custom/v1
init
# 自定义
export CRD=Unit
export group=custom
export version=v1
mkdir -p CRD/${CRD} && cd CRD/${CRD}
export GO111MODULE=on
# 如果路径位于GOPATH/src下,go mod这一步可省略
go mod init ${CRD}
# domian可自定义
kubebuilder init --domain my.crd.com
# 为CRD生成API groupVersion
# kubebuilder create api --group custom --version v1 --kind Unit
kubebuilder create api --group ${group} --version ${version} --kind ${CRD}
需要交互两次:
目录创建完毕,使用IDE打开看看:
目录结构
mbp-16in:Unit ywq$ tree
.
├── Dockerfile # 制作crd-controller镜像的ockerfile
├── Makefile # make编译文件
├── PROJECT # 项目元数据
├── api
│ └── v1
│ ├── groupversion_info.go # GVK信息、scheme生成的方法都在这里
│ ├── unit_types.go # 自定义CRD结构
│ └── zz_generated.deepcopy.go # 资源对象的操作一开始都是建立在deepcopy出来的复制对象身上的
├── bin
│ └── manager # go打包的二进制文件
├── config # 所有最终生成的需要kubectl apply的的资源,按照功能进行分片成不同的目录,这里有些地方可以做些自定义的配置
│ ├── certmanager
│ │ ├── certificate.yaml
│ │ ├── kustomization.yaml
│ │ └── kustomizeconfig.yaml
│ ├── crd # crd的配置
│ │ ├── kustomization.yaml
│ │ ├── kustomizeconfig.yaml
│ │ └── patches
│ │ ├── cainjection_in_units.yaml
│ │ └── webhook_in_units.yaml
│ ├── default
│ │ ├── kustomization.yaml
│ │ ├── manager_auth_proxy_patch.yaml
│ │ ├── manager_webhook_patch.yaml
│ │ └── webhookcainjection_patch.yaml
│ ├── manager # manager的deployment在这里
│ │ ├── kustomization.yaml
│ │ └── manager.yaml
│ ├── prometheus # metric暴露
│ │ ├── kustomization.yaml
│ │ └── monitor.yaml
│ ├── rbac # rbac授权
│ │ ├── auth_proxy_client_clusterrole.yaml
│ │ ├── auth_proxy_role.yaml
│ │ ├── auth_proxy_role_binding.yaml
│ │ ├── auth_proxy_service.yaml
│ │ ├── kustomization.yaml
│ │ ├── leader_election_role.yaml
│ │ ├── leader_election_role_binding.yaml
│ │ ├── role_binding.yaml
│ │ ├── unit_editor_role.yaml
│ │ └── unit_viewer_role.yaml
│ ├── samples # Unit resource sample
│ │ └── custom_v1_unit.yaml
│ └── webhook # Unit webhook Service,用来接收APIServer转发而来的webhook请求
│ ├── kustomization.yaml
│ ├── kustomizeconfig.yaml
│ └── service.yaml
├── controllers
│ ├── suite_test.go
│ └── unit_controller.go # CRD controller的核心逻辑在这里
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── main.go # Entrypoint
源码分析
从main.go入手:
func main() {
...
// new manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
LeaderElection: enableLeaderElection,
LeaderElectionID: "68e16627.my.crd.com",
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// register reconciler
if err = (&controllers.UnitReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Unit"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Unit")
os.Exit(1)
}
// +kubebuilder:scaffold:builder
setupLog.Info("starting manager")
// start manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
main方法里有3个步骤:
- new manager
- register reconciler
- start manager
这些内部的方法,都是分布在依赖的包里面的,不再是在Unit目录下。进去分别来分析一下
New manager
main.go:57
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{...}
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/alias.go:101
NewManager = manager.New
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:229
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
...
// 创建Cache对象,用做client的读请求,以及生成informer
cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
return nil, err
}
// 创建读请求的client,即apiReader,读请求走的Cache
apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}
// 创建写请求的client,写请求直连APIServer
writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}
// recorderProvider,记录event事件用的,kubectl describe可用到
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster)
if err != nil {
return nil, err
}
// controller多副本leader选举使用的
resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
LeaderElection: options.LeaderElection,
LeaderElectionID: options.LeaderElectionID,
LeaderElectionNamespace: options.LeaderElectionNamespace,
})
if err != nil {
return nil, err
}
// 暴露/metrics给prometheus使用
metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
if err != nil {
return nil, err
}
...
return &controllerManager{
config: config,
scheme: options.Scheme,
cache: cache,
fieldIndexes: cache,
// writeObj赋值给client字段
client: writeObj,
apiReader: apiReader,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
}, nil
}
可以看到,这些步骤里面,值得继续深入的是NewCache
和NewClient
NewCache
/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:246
// options在这里,展开来去看看NewCache方法
options = setOptionsDefaults(options)
...
// 使用options.NewCache方法来生成cache对象
cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
return nil, err
}
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:351
func setOptionsDefaults(options Options) Options {
...
// Allow newCache to be mocked
if options.NewCache == nil {
options.NewCache = cache.New
}
...
}
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/cache/cache.go:110
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
可以发现,这里就是根据配置,来生成所需要监测的每种GVK对应的Informer。至于需要监测的这些GVK在哪配置,后面的篇章中会提及。
NewClient
/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:256
// options在这里,展开来去看看NewClient方法
options = setOptionsDefaults(options)
// 使用options.NewClient方法来生成client对象
writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:351
// setOptionsDefaults set default values for Options fields
func setOptionsDefaults(options Options) Options {
...
// Allow newClient to be mocked
if options.NewClient == nil {
options.NewClient = defaultNewClient
}
...
}
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/manager.go:320
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// 写操作的直连APIServer的client
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
Reader: &client.DelegatingReader{
// 读操作走manager里面的cache
CacheReader: cache,
// 写操作的直连client
ClientReader: c,
},
Writer: c,
StatusClient: c,
}, nil
}
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/client/client.go:54
// 利用scheme来取得给定的资源type所属的GVK。
// 简而言之,这里的client是通过scheme与APIServer直接进行序列化和反序列化交互的
func New(config *rest.Config, options Options) (Client, error) {
if config == nil {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}
// Init a scheme if none provided
if options.Scheme == nil {
options.Scheme = scheme.Scheme
}
// Init a Mapper if none provided
if options.Mapper == nil {
var err error
options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
if err != nil {
return nil, err
}
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
c := &client{
typedClient: typedClient{
cache: clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
resourceByType: make(map[reflect.Type]*resourceMeta),
},
paramCodec: runtime.NewParameterCodec(options.Scheme),
},
unstructuredClient: unstructuredClient{
client: dynamicClient,
restMapper: options.Mapper,
},
}
return c, nil
}
register reconciler
reconciler即controller,命名为reconciler,意为协调器更贴切,控制器的核心逻辑在这里面。
main.go:69
if err = (&controllers.UnitReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Unit"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Unit")
os.Exit(1)
}
==> controllers/unit_controller.go:49
func (r *UnitReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&customv1.Unit{}).
Complete(r)
}
Complete方法是用作生成Builder.
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:128
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:134
// Build builds the Application ControllerManagedBy and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
...
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
return blder.ctrl, nil
}
Builder最主要的是doController()
和doWatch()
方法,分别来看下。
doController
/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:145
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:213
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/builder/controller.go:35
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/controller/controller.go:63
// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler")
}
if len(name) == 0 {
return nil, fmt.Errorf("must specify Name for Controller")
}
if options.MaxConcurrentReconciles <= 0 {
options.MaxConcurrentReconciles = 1
}
// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
}
// Create controller with dependencies set
c := &controller.Controller{
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(name),
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Name: name,
}
// Add the controller as a Manager components
return c, mgr.Add(c)
}
可以看出,doController方法是生成Controller,并将其注册进Manager的外层主体进行托管。
其中,Controller结构体实例内包含的字段如下:
c := &controller.Controller{
// Reconciler只有一个接口方法Reconcile(),这个方法是CRD控制的核心逻辑,kubebuilder已经自动生成,但里面的逻辑需要填充,见下面
Do: options.Reconciler,
// Cache用来对接informer检测GVR状态并保存在缓存中,提供用作读
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
// 用作资源实例Type的正反序列化
Scheme: mgr.GetScheme(),
// Client用作写请求,直连APIServer
Client: mgr.GetClient(),
// 记录event
Recorder: mgr.GetEventRecorderFor(name),
// workqueue
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
},
// 协调器的并发数限制
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Name: name,
}
==> controllers/unit_controller.go:40
func (r *UnitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
_ = context.Background()
_ = r.Log.WithValues("unit", req.NamespacedName)
// your logic here
return ctrl.Result{}, nil
}
Reconcile()方法在这里,逻辑需要自己实现。后面的篇章中会描述我的需求、设计和代码实例。
doWatch
func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.apiType}
// register handler
hdler := &handler.EnqueueRequestForObject{}
// Watch CRD资源的变更请求
err := blder.ctrl.Watch(src, hdler, blder.predicates...)
if err != nil {
return err
}
// Watch 被CRD管理的own resource的变更请求
for _, obj := range blder.managedObjects {
src := &source.Kind{Type: obj}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.apiType,
IsController: true,
}
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
return err
}
}
for _, w := range blder.watchRequest {
if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
return err
}
}
return nil
}
doWatch()主要干两件事:watch CRD 资源的变更,以及watch CRD 资源的own resouces的变更.watch到变更之后下一步做什么呢?当然是交给handler来处理,来看一下这里第二行代码中生成的handler是做什么的。
/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/handler/enqueue.go:34
type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Meta == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.MetaOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaOld.GetName(),
Namespace: evt.MetaOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
}
if evt.MetaNew != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaNew.GetName(),
Namespace: evt.MetaNew.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
}
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Meta == nil {
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
不出意外,handler会增删查的写请求的对象的NamespacedName,压入workqueue里面,与此同时,在另一头检测workqueue的协调器Reconciler默默地开始运转。
需要准备的注册工作都做完了,下面就要回到main.go中,启动外层托管组件Manager了。
启动Manager
启动Manager分为两步,第一步准备好Cache组件,第二步启动controller组件
启动Cache
main.go:80
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
上面ctrl.NewManager()
方法最终返回的是controllerManager{}
对象的指针,来controllerManager里面找一下Start()
方法.
/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:403
func (cm *controllerManager) Start(stop <-chan struct{}) error {
...
go cm.startNonLeaderElectionRunnables()
// 多controller实例时要进行leader选举,获取leader lock的实例才开始工作。startNonLeaderElectionRunnables和startLeaderElectionRunnables内在的工作方式本质无区别。
if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
return err
}
} else {
go cm.startLeaderElectionRunnables()
}
...
}
/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:465
func (cm *controllerManager) startLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
// 重点关注这里的waitForCache
cm.waitForCache()
// Start the leader election Runnables after the cache has synced
for _, c := range cm.leaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
if err := ctrl.Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
// we use %T here because we don't have a good stand-in for "name",
// and the full runnable might not serialize (mutexes, etc)
log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
}()
}
cm.startedLeader = true
}
==> /Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/manager/internal.go:489
func (cm *controllerManager) waitForCache() {
if cm.started {
return
}
// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
}()
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)
cm.started = true
}
waitForCache的主要作用是启动Cache和等待Cache的首次同步完成。
启动cache的步骤则包括:创建FIFO queue、初始化informer、reflector、LocalStorage cache、index索引等。
启动controller
controller启动之后的工作模式分析之前的controller系列文章已经讲过很多次了,这里再快速回顾一遍。
/Users/ywq/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.5.0/pkg/internal/controller/controller.go:146
func (c *Controller) Start(stop <-chan struct{}) error {
...
// Launch workers to process resources
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
// 多个workder,间隔一定时间(1s)工作一次
go wait.Until(c.worker, c.JitterPeriod, stop)
}
...
}
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}
defer c.Queue.Done(obj)
return c.reconcileHandler(obj)
}
func (c *Controller) reconcileHandler(obj interface{}) bool {
...
// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
// resource to be synced.
if result, err := c.Do.Reconcile(req); err != nil {
...
}
...
}
worker内部的最终逻辑回到了Reconcile方法,也即是需要在controllers/unit_controller.go:40
中的Reconcile()
自定义逻辑的方法,按照自定义的逻辑运行。
总结
如果之前看过内置资源的Controller的源码,对Controller工作方式有了解,那么理解Kubebuilder起来也是轻车熟路。
归纳下来,kubebuilder创建的controller做的事情也是跟文章最上面的流程图一样,只是kubebuilder经过了高度的封装后,便利程度到了仅需要实现Reconcile方法内部的逻辑即可,中间所有的流程都按照标准controller的运行方式替你包揽实现了。
本篇对kubebuilder的核心重点介绍到此结束,下一篇将开始正式介绍Unit CRD的设计思路,例如Unit会管理它的own resource,以及kubebuilder如何来帮助 授权管理own resource、同步Unit与own resource。
更多推荐
所有评论(0)