sigs.k8s.io controller-runtime系列之三 controller分析
简介之前介绍过sigs.k8s.io controller-runtime系列之二manager分析sigs.k8s.io controller-runtime-manager 。本文主要介绍pkg/controller的源码分析。目录结构controller_suite_test.go 注册测试GVK 校验k8s环境获取client config依赖ginkgo做集成测试,表示该文件夹内的测试例
·
简介
之前介绍过sigs.k8s.io controller-runtime系列之二 manager分析sigs.k8s.io controller-runtime-manager 。
本文主要介绍pkg/controller的源码分析。
目录结构
- controller_suite_test.go 注册测试GVK 校验k8s环境 获取client config
- 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
- BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
- 如果BeforeSuite执行失败,则这个测试集都不会被执行
- controller.go
- Manager接口 构建controller所需要的参数
type Manager interface { // Cluster 提供了许多方法来操作集群 cluster.Cluster // 该方法将设置manager得一些属性到实现了Runnable的结构体(实现了inject接口)中,并在mgr.Start启动时启动 // 如果实现了Runnable的结构体也实现了LeaderElectionRunnable接口,将工作在leader election 模式下(被选举为leader后运行) // 如果实现了Runnable的结构体没有实现LeaderElectionRunnable接口,将工作在 non-leaderelection 模式下(一直运行) Add(Runnable) error // 标识mgr是否为leader closed状态为leader // Elected为关闭状态的两种情况: // 1.mgr成为leader // 2.该mgr再集群中不是多个的存在,不需要leader选举,one is leader Elected() <-chan struct{} // 添加除了defaultMetricsEndpoint以外的任意指标(path/handler) // 对于一些诊断性的端点可能十分有用,值得注意的是,这些端点大多是比较敏感的,最好不要公开暴漏 // 对于简单的 path -> handler形式的映射方式不能满足需求, 我们可以提供一个server/listener形式的Runnable,调用Add方法添加到mgr AddMetricsExtraHandler(path string, handler http.Handler) error // 添加一个健康检查 AddHealthzCheck(name string, check healthz.Checker) error // 添加一个就绪检查 AddReadyzCheck(name string, check healthz.Checker) error // 启动所有已注册的控制器并阻塞,直到取消context为止 // 如果启动任何控制器时出错,则返回错误 // 如果开启了选举, 如果该mgr的leader lock丢失,那么处理程序必须立即退出, // 否则组件再失去leader lock的时候还会继续运行 Start(ctx context.Context) error // 获取webhook server GetWebhookServer() *webhook.Server // 获取该mgr的log GetLogger() logr.Logger // 返回全局的controller options GetControllerOptions() v1alpha1.ControllerConfigurationSpec }
- Options结构体 创建mgr的参数 提供创建mgr选填的参数
type Options struct { // 同时Reconciles的最大个数. 默认是 1. MaxConcurrentReconciles int // 实现了Reconciler接口的对象,用来reconcile需要最终达到的状态 Reconciler reconcile.Reconciler // 用于限制请求入队的频率。 // 默认为MaxOfRateLimiter,同时具有整体(BucketRateLimiter)和逐项速率限制(ItemExponentialFailureRateLimiter 间隔 baseDelay*2^<num-failures>)。 // 整体是一个令牌桶,每项是指数的. RateLimiter ratelimiter.RateLimiter // 日志是用于此控制器的记录器,并通过上下文字段传递给每个reconcile request Log logr.Logger // 等待同步缓存的时间限制.默认为 2 分钟. CacheSyncTimeout time.Duration }
- Controller接口 管理来自source.Sources的请求并协调一致的工作队列,以使系统状态与对象Spec中指定的状态相匹配。
type Controller interface { // 通过协调对象的名称空间/名称调用 reconcile.Reconciler // Watch接受Source提供的事件,并使用EventHandler将reconcile.Request放入队列以响应事件 // 可以通过predicates来过滤事件. Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error // 开启controller. Start(ctx context.Context) error // 返回此控制器logger GetLogger() logr.Logger }
- New函数 使用mgr生成新控制器。 mgr将确保在启动控制器之前已同步共享的缓存
func New(name string, mgr manager.Manager, options Options) (Controller, error) { c, err := NewUnmanaged(name, mgr, options) if err != nil { return nil, err } // 将控制器添加为Manager组件(可以参考 manager篇) return c, mgr.Add(c) } // 返回一个新的还未添加到管理器中的控制器. 调用者负责启动返回的控制器mgr.Start. func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) { if options.Reconciler == nil { return nil, fmt.Errorf("must specify Reconciler") } if len(name) == 0 { return nil, fmt.Errorf("must specify Name for Controller") } if options.Log == nil { options.Log = mgr.GetLogger() } // 如果最大同时reconcile数小于等于0,则设置为1,也是默认值 if options.MaxConcurrentReconciles <= 0 { options.MaxConcurrentReconciles = 1 } // 如果同步cache时间等于0,则设置为2分钟,也是默认值 if options.CacheSyncTimeout == 0 { options.CacheSyncTimeout = 2 * time.Minute } // 如果入队速率限制器为空,则设置为MaxOfRateLimiter,也是默认值 if options.RateLimiter == nil { options.RateLimiter = workqueue.DefaultControllerRateLimiter() } // mgr 注入属性到 Reconciler if err := mgr.SetFields(options.Reconciler); err != nil { return nil, err } // 创建一个controller return &controller.Controller{ Do: options.Reconciler, MakeQueue: func() workqueue.RateLimitingInterface { return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name) }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, CacheSyncTimeout: options.CacheSyncTimeout, SetFields: mgr.SetFields, Name: name, Log: options.Log.WithName("controller").WithName(name), }, nil }
- controller_test.go 测试操作controller的文件
- doc.go 暂时没用,做一个解释文档
- example_test.go 测试例子
- controller_integration_test.go 集成测试例子
- controllerutil包中的controllerutil.go 一般用在reconcile中
- AlreadyOwnedError 结构体 如果您要分配ownerReferences的对象中已有Controller=true的ref,
那么AlreadyOwnedError将返回错误
type AlreadyOwnedError struct { Object metav1.Object Owner metav1.OwnerReference }
- SetControllerReference 函数
func SetControllerReference(owner, controlled metav1.Object, scheme *runtime.Scheme) error { // 验证owner是否实现了runtime.Object. ro, ok := owner.(runtime.Object) if !ok { return fmt.Errorf("%T is not a runtime.Object, cannot call SetControllerReference", owner) } // 验证owner和object的namespace,有两种情况会产生异常: // 1.owner的namespace不为空,object的namespace为空 // 2.owner的namespace不为空,object的namespace不等于owner的namespace if err := validateOwner(owner, controlled); err != nil { return err } // 根据scheme和controlled对应的runtime.object 获取gvk gvk, err := apiutil.GVKForObject(ro, scheme) if err != nil { return err } // 构造一个OwnerReference,设置Controller和BlockOwnerDeletion为true ref := metav1.OwnerReference{ APIVersion: gvk.GroupVersion().String(), Kind: gvk.Kind, Name: owner.GetName(), UID: owner.GetUID(), BlockOwnerDeletion: pointer.BoolPtr(true), Controller: pointer.BoolPtr(true), } // 如果controlled的OwnerReferences中包含了Controller=true并且其对应的gvk和上面构造的gvk不一致,抛异常 if existing := metav1.GetControllerOf(controlled); existing != nil && !referSameObject(*existing, ref) { return newAlreadyOwnedError(controlled, *existing) } // 更新controlled对应的OwnerReferences中Controller=true并且其对应的gvk和ref的gvk一致的引用 upsertOwnerRef(ref, controlled) return nil }
- SetOwnerReference 函数
如果已经存在对同一对象的引用,那么它将被新提供的版本覆盖。
该方法使您可以声明owner对object具有依赖关系,而无需将其指定为控制器(Controller=true)func SetOwnerReference(owner, object metav1.Object, scheme *runtime.Scheme) error { // 验证owner是否实现了runtime.Object. ro, ok := owner.(runtime.Object) if !ok { return fmt.Errorf("%T is not a runtime.Object, cannot call SetOwnerReference", owner) } // 验证owner和object的namespace,有两种情况会产生异常: // 1.owner的namespace不为空,object的namespace为空 // 2.owner的namespace不为空,object的namespace不等于owner的namespace if err := validateOwner(owner, object); err != nil { return err } // 根据scheme和controlled对应的runtime.object 获取gvk gvk, err := apiutil.GVKForObject(ro, scheme) if err != nil { return err } // 构造一个OwnerReference,无需设置Controller和BlockOwnerDeletion ref := metav1.OwnerReference{ APIVersion: gvk.GroupVersion().String(), Kind: gvk.Kind, UID: owner.GetUID(), Name: owner.GetName(), } // 更新controlled对应的OwnerReferences中Controller=true并且其对应的gvk和ref的gvk一致的引用 upsertOwnerRef(ref, object) return nil }
- CreateOrUpdate 函数 在Kubernetes集群中创建或更新给定对象。
该obj reconciled的所需状态必须使用回调MutateFn中存在的state。
无论创建还是更新对象,都将调用MutateFn
func CreateOrUpdate(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) { // 获取obj的namespace和name所组成的object key := client.ObjectKeyFromObject(obj) if err := c.Get(ctx, key, obj); err != nil { // 不存在,新建 if !errors.IsNotFound(err) { return OperationResultNone, err } if err := mutate(f, key, obj); err != nil { return OperationResultNone, err } if err := c.Create(ctx, obj); err != nil { return OperationResultNone, err } return OperationResultCreated, nil } // 存在该obj 深拷贝对应的obj existing := obj.DeepCopyObject() if err := mutate(f, key, obj); err != nil { return OperationResultNone, err } // 如果mutate后 DeepEqual相等 则表示没有改变 if equality.Semantic.DeepEqual(existing, obj) { return OperationResultNone, nil } // mutate后 status改变了 update if err := c.Update(ctx, obj); err != nil { return OperationResultNone, err } return OperationResultUpdated, nil }
- CreateOrPatch函数 在Kubernetes集群中创建或修补给定对象
该obj reconciled的所需状态必须使用回调MutateFn中存在的state。
无论创建还是更新对象,都将调用MutateFn
func CreateOrPatch(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) { // 获取obj的namespace和name所组成的object key := client.ObjectKeyFromObject(obj) if err := c.Get(ctx, key, obj); err != nil { // 不存在,新建 if !errors.IsNotFound(err) { return OperationResultNone, err } if f != nil { if err := mutate(f, key, obj); err != nil { return OperationResultNone, err } } if err := c.Create(ctx, obj); err != nil { return OperationResultNone, err } return OperationResultCreated, nil } // 为对象及其可能的状态创建补丁。 objPatch := client.MergeFrom(obj.DeepCopyObject().(client.Object)) statusPatch := client.MergeFrom(obj.DeepCopyObject().(client.Object)) // 创建原始对象的副本,并将该副本转换为非结构化数据(即map[string]interface{} 属性为key 值为value) // ToUnstructured : 如果obj实现了Unstructured接口则直接obj.(Unstructured),否则使用反射转化 before, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj.DeepCopyObject()) if err != nil { return OperationResultNone, err } // 尝试从资源中提取status,以便以后进行比较 // NestedFieldCopy: 参数fields,嵌套字段,从before中嵌套获取filed的value beforeStatus, hasBeforeStatus, err := unstructured.NestedFieldCopy(before, "status") if err != nil { return OperationResultNone, err } // 如果资源包含状态,则将其从非结构化副本中删除,以避免以后不必要的修补. if hasBeforeStatus { unstructured.RemoveNestedField(before, "status") } // 修改obj的status. if f != nil { if err := mutate(f, key, obj); err != nil { return OperationResultNone, err } } // 将资源转换为非结构化资源,以与我们之前的副本进行比较。 after, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return OperationResultNone, err } // 尝试从资源中提取status,以便以后进行比较 afterStatus, hasAfterStatus, err := unstructured.NestedFieldCopy(after, "status") if err != nil { return OperationResultNone, err } // 如果资源包含状态,则将其从非结构化副本中删除,以避免以后不必要的修补(只update resource 因为status是子资源). if hasAfterStatus { unstructured.RemoveNestedField(after, "status") } result := OperationResultNone // 如果before和after 不相等 if !reflect.DeepEqual(before, after) { // 仅仅更新resource(不包含status) if err := c.Patch(ctx, obj, objPatch); err != nil { return result, err } result = OperationResultUpdated } // 如果befor或者after都有status字段,并且 beforeStatus和afterStatus不相等 if (hasBeforeStatus || hasAfterStatus) && !reflect.DeepEqual(beforeStatus, afterStatus) { // 仅仅更新status字段 if err := c.Status().Patch(ctx, obj, statusPatch); err != nil { return result, err } // 如果之前更新了resource,那么result跟新为 及更新了resource又更新了status if result == OperationResultUpdated { result = OperationResultUpdatedStatus } else { result = OperationResultUpdatedStatusOnly } } return result, nil }
- MutateFn函数结构体 将现有对象的status更改为所需状态。
type MutateFn func() error
- AddFinalizer函数 给obj添加一个finalizer(obj的Finalizers属性中不存在)
- RemoveFinalizer函数 删除obj中Finalizers中的finalizer(obj的Finalizers属性中存在)
- ContainsFinalizer函数 判断obj中Finalizers中是否存在finalizer
- CreateOrUpdate 函数 在Kubernetes集群中创建或更新给定对象。
- AlreadyOwnedError 结构体 如果您要分配ownerReferences的对象中已有Controller=true的ref,
更多推荐
已为社区贡献15条内容
所有评论(0)