简介

之前介绍过sigs.k8s.io controller-runtime系列之二 manager分析sigs.k8s.io controller-runtime-manager
本文主要介绍pkg/controller的源码分析。

目录结构

  1. controller_suite_test.go 注册测试GVK 校验k8s环境 获取client config
    • 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
    • BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
    • 如果BeforeSuite执行失败,则这个测试集都不会被执行
  2. controller.go
    • Manager接口 构建controller所需要的参数
    type Manager interface {
        // Cluster 提供了许多方法来操作集群
        cluster.Cluster
    
        // 该方法将设置manager得一些属性到实现了Runnable的结构体(实现了inject接口)中,并在mgr.Start启动时启动
        // 如果实现了Runnable的结构体也实现了LeaderElectionRunnable接口,将工作在leader election 模式下(被选举为leader后运行)
        // 如果实现了Runnable的结构体没有实现LeaderElectionRunnable接口,将工作在 non-leaderelection 模式下(一直运行)
    	Add(Runnable) error
    
        // 标识mgr是否为leader  closed状态为leader
        // Elected为关闭状态的两种情况:
        // 1.mgr成为leader
        // 2.该mgr再集群中不是多个的存在,不需要leader选举,one is leader
        Elected() <-chan struct{}
        
        // 添加除了defaultMetricsEndpoint以外的任意指标(path/handler)
        // 对于一些诊断性的端点可能十分有用,值得注意的是,这些端点大多是比较敏感的,最好不要公开暴漏
        // 对于简单的 path -> handler形式的映射方式不能满足需求, 我们可以提供一个server/listener形式的Runnable,调用Add方法添加到mgr
        AddMetricsExtraHandler(path string, handler http.Handler) error
        
        // 添加一个健康检查
        AddHealthzCheck(name string, check healthz.Checker) error
        
        // 添加一个就绪检查
        AddReadyzCheck(name string, check healthz.Checker) error
        
        // 启动所有已注册的控制器并阻塞,直到取消context为止
        // 如果启动任何控制器时出错,则返回错误
        // 如果开启了选举, 如果该mgr的leader lock丢失,那么处理程序必须立即退出,
        // 否则组件再失去leader lock的时候还会继续运行
        Start(ctx context.Context) error
        
        // 获取webhook server
        GetWebhookServer() *webhook.Server
        
        // 获取该mgr的log
        GetLogger() logr.Logger
        
        // 返回全局的controller options
        GetControllerOptions() v1alpha1.ControllerConfigurationSpec
    }
    
    • Options结构体 创建mgr的参数 提供创建mgr选填的参数
    type Options struct {
    	// 同时Reconciles的最大个数. 默认是 1.
    	MaxConcurrentReconciles int
    
    	// 实现了Reconciler接口的对象,用来reconcile需要最终达到的状态
    	Reconciler reconcile.Reconciler
    
    	// 用于限制请求入队的频率。
    	// 默认为MaxOfRateLimiter,同时具有整体(BucketRateLimiter)和逐项速率限制(ItemExponentialFailureRateLimiter 间隔 baseDelay*2^<num-failures>)。
    	// 整体是一个令牌桶,每项是指数的.
    	RateLimiter ratelimiter.RateLimiter
    
    	// 日志是用于此控制器的记录器,并通过上下文字段传递给每个reconcile request
    	Log logr.Logger
    
    	// 等待同步缓存的时间限制.默认为 2 分钟.
    	CacheSyncTimeout time.Duration
    }
    
    • Controller接口 管理来自source.Sources的请求并协调一致的工作队列,以使系统状态与对象Spec中指定的状态相匹配。
    type Controller interface {
    	// 通过协调对象的名称空间/名称调用
    	reconcile.Reconciler
    
    	// Watch接受Source提供的事件,并使用EventHandler将reconcile.Request放入队列以响应事件
    	// 可以通过predicates来过滤事件.
    	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
    
    	// 开启controller.
    	Start(ctx context.Context) error
    
    	// 返回此控制器logger
    	GetLogger() logr.Logger
    }
    
    • New函数 使用mgr生成新控制器。 mgr将确保在启动控制器之前已同步共享的缓存
    func New(name string, mgr manager.Manager, options Options) (Controller, error) {
    	c, err := NewUnmanaged(name, mgr, options)
    	if err != nil {
    		return nil, err
    	}
    
    	// 将控制器添加为Manager组件(可以参考 manager篇)
    	return c, mgr.Add(c)
    }
    
    // 返回一个新的还未添加到管理器中的控制器. 调用者负责启动返回的控制器mgr.Start.
    func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
    	if options.Reconciler == nil {
    		return nil, fmt.Errorf("must specify Reconciler")
    	}
    
    	if len(name) == 0 {
    		return nil, fmt.Errorf("must specify Name for Controller")
    	}
    
    	if options.Log == nil {
    		options.Log = mgr.GetLogger()
    	}
    
        // 如果最大同时reconcile数小于等于0,则设置为1,也是默认值
    	if options.MaxConcurrentReconciles <= 0 {
    		options.MaxConcurrentReconciles = 1
    	}
        // 如果同步cache时间等于0,则设置为2分钟,也是默认值
    	if options.CacheSyncTimeout == 0 {
    		options.CacheSyncTimeout = 2 * time.Minute
    	}
        // 如果入队速率限制器为空,则设置为MaxOfRateLimiter,也是默认值
    	if options.RateLimiter == nil {
    		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
    	}
    
    	// mgr 注入属性到 Reconciler
    	if err := mgr.SetFields(options.Reconciler); err != nil {
    		return nil, err
    	}
    
    	// 创建一个controller
    	return &controller.Controller{
    		Do: options.Reconciler,
    		MakeQueue: func() workqueue.RateLimitingInterface {
    			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
    		},
    		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
    		CacheSyncTimeout:        options.CacheSyncTimeout,
    		SetFields:               mgr.SetFields,
    		Name:                    name,
    		Log:                     options.Log.WithName("controller").WithName(name),
    	}, nil
    }
    
  3. controller_test.go 测试操作controller的文件
  4. doc.go 暂时没用,做一个解释文档
  5. example_test.go 测试例子
  6. controller_integration_test.go 集成测试例子
  7. controllerutil包中的controllerutil.go 一般用在reconcile中
    • AlreadyOwnedError 结构体 如果您要分配ownerReferences的对象中已有Controller=true的ref,
      那么AlreadyOwnedError将返回错误
    type AlreadyOwnedError struct {
    	Object metav1.Object
    	Owner  metav1.OwnerReference
    }
    
    • SetControllerReference 函数
    func SetControllerReference(owner, controlled metav1.Object, scheme *runtime.Scheme) error {
    	// 验证owner是否实现了runtime.Object.
    	ro, ok := owner.(runtime.Object)
    	if !ok {
    		return fmt.Errorf("%T is not a runtime.Object, cannot call SetControllerReference", owner)
    	}
     // 验证owner和object的namespace,有两种情况会产生异常:
     // 1.owner的namespace不为空,object的namespace为空
     // 2.owner的namespace不为空,object的namespace不等于owner的namespace
    	if err := validateOwner(owner, controlled); err != nil {
    		return err
    	}
    
    	// 根据scheme和controlled对应的runtime.object 获取gvk
    	gvk, err := apiutil.GVKForObject(ro, scheme)
    	if err != nil {
    		return err
    	}
     // 构造一个OwnerReference,设置Controller和BlockOwnerDeletion为true
    	ref := metav1.OwnerReference{
    		APIVersion:         gvk.GroupVersion().String(),
    		Kind:               gvk.Kind,
    		Name:               owner.GetName(),
    		UID:                owner.GetUID(),
    		BlockOwnerDeletion: pointer.BoolPtr(true),
    		Controller:         pointer.BoolPtr(true),
    	}
    
    	// 如果controlled的OwnerReferences中包含了Controller=true并且其对应的gvk和上面构造的gvk不一致,抛异常
    	if existing := metav1.GetControllerOf(controlled); existing != nil && !referSameObject(*existing, ref) {
    		return newAlreadyOwnedError(controlled, *existing)
    	}
    
    	// 更新controlled对应的OwnerReferences中Controller=true并且其对应的gvk和ref的gvk一致的引用
    	upsertOwnerRef(ref, controlled)
    	return nil
    }
    
    • SetOwnerReference 函数
      如果已经存在对同一对象的引用,那么它将被新提供的版本覆盖。
      该方法使您可以声明owner对object具有依赖关系,而无需将其指定为控制器(Controller=true)
      func SetOwnerReference(owner, object metav1.Object, scheme *runtime.Scheme) error {
      	// 验证owner是否实现了runtime.Object.
      	ro, ok := owner.(runtime.Object)
      	if !ok {
      		return fmt.Errorf("%T is not a runtime.Object, cannot call SetOwnerReference", owner)
      	}
        // 验证owner和object的namespace,有两种情况会产生异常:
        // 1.owner的namespace不为空,object的namespace为空
        // 2.owner的namespace不为空,object的namespace不等于owner的namespace
      	if err := validateOwner(owner, object); err != nil {
      		return err
      	}
      
      	// 根据scheme和controlled对应的runtime.object 获取gvk
      	gvk, err := apiutil.GVKForObject(ro, scheme)
      	if err != nil {
      		return err
      	}
        // 构造一个OwnerReference,无需设置Controller和BlockOwnerDeletion
      	ref := metav1.OwnerReference{
      		APIVersion: gvk.GroupVersion().String(),
      		Kind:       gvk.Kind,
      		UID:        owner.GetUID(),
      		Name:       owner.GetName(),
      	}
      
      	// 更新controlled对应的OwnerReferences中Controller=true并且其对应的gvk和ref的gvk一致的引用
      	upsertOwnerRef(ref, object)
      	return nil
      
      }
      
      • CreateOrUpdate 函数 在Kubernetes集群中创建或更新给定对象。
        该obj reconciled的所需状态必须使用回调MutateFn中存在的state。
        无论创建还是更新对象,都将调用MutateFn
      func CreateOrUpdate(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) {
      	// 获取obj的namespace和name所组成的object
         key := client.ObjectKeyFromObject(obj)
      	if err := c.Get(ctx, key, obj); err != nil {
             // 不存在,新建
      		if !errors.IsNotFound(err) {
      			return OperationResultNone, err
      		}
      		if err := mutate(f, key, obj); err != nil {
      			return OperationResultNone, err
      		}
      		if err := c.Create(ctx, obj); err != nil {
      			return OperationResultNone, err
      		}
      		return OperationResultCreated, nil
      	}
         // 存在该obj 深拷贝对应的obj
      	existing := obj.DeepCopyObject()
      	if err := mutate(f, key, obj); err != nil {
      		return OperationResultNone, err
      	}
         // 如果mutate后 DeepEqual相等 则表示没有改变
      	if equality.Semantic.DeepEqual(existing, obj) {
      		return OperationResultNone, nil
      	}
         // mutate后  status改变了  update
      	if err := c.Update(ctx, obj); err != nil {
      		return OperationResultNone, err
      	}
      	return OperationResultUpdated, nil
      }
      
      • CreateOrPatch函数 在Kubernetes集群中创建或修补给定对象
        该obj reconciled的所需状态必须使用回调MutateFn中存在的state。
        无论创建还是更新对象,都将调用MutateFn
      func CreateOrPatch(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) {
      	// 获取obj的namespace和name所组成的object
         key := client.ObjectKeyFromObject(obj)
      	if err := c.Get(ctx, key, obj); err != nil {
             // 不存在,新建
      		if !errors.IsNotFound(err) {
      			return OperationResultNone, err
      		}
      		if f != nil {
      			if err := mutate(f, key, obj); err != nil {
      				return OperationResultNone, err
      			}
      		}
      		if err := c.Create(ctx, obj); err != nil {
      			return OperationResultNone, err
      		}
      		return OperationResultCreated, nil
      	}
      
      	// 为对象及其可能的状态创建补丁。
      	objPatch := client.MergeFrom(obj.DeepCopyObject().(client.Object))
      	statusPatch := client.MergeFrom(obj.DeepCopyObject().(client.Object))
      
      	// 创建原始对象的副本,并将该副本转换为非结构化数据(即map[string]interface{}  属性为key 值为value)
         // ToUnstructured : 如果obj实现了Unstructured接口则直接obj.(Unstructured),否则使用反射转化
      	before, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj.DeepCopyObject())
      	if err != nil {
      		return OperationResultNone, err
      	}
      
      	// 尝试从资源中提取status,以便以后进行比较
         // NestedFieldCopy: 参数fields,嵌套字段,从before中嵌套获取filed的value
      	beforeStatus, hasBeforeStatus, err := unstructured.NestedFieldCopy(before, "status")
      	if err != nil {
      		return OperationResultNone, err
      	}
      
      	// 如果资源包含状态,则将其从非结构化副本中删除,以避免以后不必要的修补.
      	if hasBeforeStatus {
      		unstructured.RemoveNestedField(before, "status")
      	}
      
      	// 修改obj的status.
      	if f != nil {
      		if err := mutate(f, key, obj); err != nil {
      			return OperationResultNone, err
      		}
      	}
      
      	// 将资源转换为非结构化资源,以与我们之前的副本进行比较。
      	after, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
      	if err != nil {
      		return OperationResultNone, err
      	}
      
      	// 尝试从资源中提取status,以便以后进行比较
      	afterStatus, hasAfterStatus, err := unstructured.NestedFieldCopy(after, "status")
      	if err != nil {
      		return OperationResultNone, err
      	}
      
      	// 如果资源包含状态,则将其从非结构化副本中删除,以避免以后不必要的修补(只update resource  因为status是子资源).
      	if hasAfterStatus {
      		unstructured.RemoveNestedField(after, "status")
      	}
      
      	result := OperationResultNone
      
         // 如果before和after 不相等
      	if !reflect.DeepEqual(before, after) {
      		// 仅仅更新resource(不包含status)
      		if err := c.Patch(ctx, obj, objPatch); err != nil {
      			return result, err
      		}
      		result = OperationResultUpdated
      	}
      
         // 如果befor或者after都有status字段,并且 beforeStatus和afterStatus不相等
      	if (hasBeforeStatus || hasAfterStatus) && !reflect.DeepEqual(beforeStatus, afterStatus) {
      		// 仅仅更新status字段
      		if err := c.Status().Patch(ctx, obj, statusPatch); err != nil {
      			return result, err
      		}
             // 如果之前更新了resource,那么result跟新为  及更新了resource又更新了status
      		if result == OperationResultUpdated {
      			result = OperationResultUpdatedStatus
      		} else {
      			result = OperationResultUpdatedStatusOnly
      		}
      	}
      	return result, nil
      }
      
      • MutateFn函数结构体 将现有对象的status更改为所需状态。
      type MutateFn func() error
      
      • AddFinalizer函数 给obj添加一个finalizer(obj的Finalizers属性中不存在)
      • RemoveFinalizer函数 删除obj中Finalizers中的finalizer(obj的Finalizers属性中存在)
      • ContainsFinalizer函数 判断obj中Finalizers中是否存在finalizer
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐