简介

之前已经介绍过controller-runtime的总览(参见《sigs.k8s.io controller-runtime总览》)。
本文主要介绍pkg/builder的源码分析。

目录结构

  1. builder_suite_test.go 校验k8s环境 获取client config

    • 依赖ginkgo做集成测试,该文件会在该文件夹内的测试例执行之前执行
    • BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
    • 如果BeforeSuite执行失败,则这个测试集都不会被执行
  2. controller.go

    • Builder结构体 构建controller所需要的参数
    // Builder collects the parameters needed to construct a controller.
    type Builder struct {
    forInput         ForInput // the object type to reconcile, recorded by For
    ownsInput        []OwnsInput // object types appended by Owns; doWatch watches them with EnqueueRequestForOwner
    watchesInput     []WatchesInput // source and event-handler pairs appended by Watches, for watches that For/Owns cannot express
    mgr              manager.Manager // manager that supplies the scheme, logger and global controller options
    globalPredicates []predicate.Predicate // event filters applied to every watch
    ctrl             controller.Controller // the controller constructed by doController
    ctrlOptions      controller.Options // controller options, including the reconciler and the maximum number of concurrent reconciles
    name             string // controller name; NOTE(review): the original note says it defaults to the lowercased kind in a Prometheus-compatible form — getControllerName is not shown here, confirm
    }
    
    • ControllerManagedBy函数 根据提供的mgr生成一个新的builder
    // ControllerManagedBy returns a new Builder that only has its manager
    // set to m; every other field starts at its zero value.
    func ControllerManagedBy(m manager.Manager) *Builder {
    	blder := new(Builder)
    	blder.mgr = m
    	return blder
    }
    
    • ForInput结构体 作为For方法的入参 (reconcile的object)
    // ForInput holds what the For method records about the object to reconcile.
    type ForInput struct {
    object           client.Object // the object type to reconcile
    predicates       []predicate.Predicate // event filters for the watch on this object
    objectProjection objectProjection // passed to blder.project when the watch source is built; presumably selects the representation (metadata-only, unstructured, etc.) — confirm
    err              error // set by For when it is called more than once; Build returns it
    }
    
    • blder For方法 新建forInput,设置forInput的predicates并给blder的forInput赋值
    // For records the object type that the controller reconciles and applies
    // the given options to the new ForInput. It may be called only once: a
    // second call leaves the recorded input untouched and stores an error on
    // it, which Build reports later.
    func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
    	if blder.forInput.object == nil {
    		var in ForInput
    		in.object = object
    		for i := range opts {
    			opts[i].ApplyToFor(&in)
    		}
    		blder.forInput = in
    		return blder
    	}

    	// An object was already registered; remember the misuse.
    	blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
    	return blder
    }
    
    • OwnsInput结构体 作为Owns方法的入参 (由For对象所拥有的object,其事件会触发对owner即For对象的reconcile)
      // OwnsInput holds what the Owns method records about one owned object type.
      type OwnsInput struct {
      	object           client.Object // the owned object type to watch
      	predicates       []predicate.Predicate // event filters for the watch on this object
      	objectProjection objectProjection // passed to blder.project when the watch source is built; presumably selects the representation (metadata-only, unstructured, etc.) — confirm
      }
      
    • blder Owns方法 给blder的ownsInput追加对象,具体看blder.ownsInput的作用
    // Owns registers an additional object type to watch. The options are
    // applied to a fresh OwnsInput, which is then appended to blder.ownsInput.
    func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
    	var owned OwnsInput
    	owned.object = object
    	for i := range opts {
    		opts[i].ApplyToOwns(&owned)
    	}
    	blder.ownsInput = append(blder.ownsInput, owned)
    	return blder
    }
    
    • watchRequest结构体(对应下文Watches方法中使用的WatchesInput) 封装watch的source、事件处理器、predicates和objectProjection
    // watchRequest bundles a watch source with its event handler and options.
    // NOTE(review): the Builder field and the Watches/doWatch code use a type
    // named WatchesInput with exactly these fields; this declaration is
    // probably meant to be WatchesInput — confirm against upstream.
    type watchRequest struct {
    	src          source.Source // the source to watch
    	eventhandler handler.EventHandler // handler invoked for events from src
        predicates       []predicate.Predicate // event filters for this watch
        objectProjection objectProjection // projection applied when src is a *source.Kind
    }
    
    • blder Watches方法 给blder.watchesInput追加WatchesInput
    // Watches registers an arbitrary source together with the handler that
    // turns its events into reconcile requests. The options are applied to a
    // fresh WatchesInput, which is then appended to blder.watchesInput.
    func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder {
    	watch := WatchesInput{
    		src:          src,
    		eventhandler: eventhandler,
    	}
    	for i := range opts {
    		opts[i].ApplyToWatches(&watch)
    	}
    	blder.watchesInput = append(blder.watchesInput, watch)
    	return blder
    }
    
    • blder WithOptions方法 设置controller的Options
    • blder WithLogger方法 设置controller的Options.Log
    • blder WithEventFilter方法 给blder.globalPredicates追加过滤事件对象
    // WithEventFilter appends p to the global predicates, which doWatch
    // applies to every watch it registers, and returns the builder for chaining.
    func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
    	blder.globalPredicates = append(blder.globalPredicates, p)
    	return blder
    }
    
    • blder Named方法 给blder.name赋值
    • blder Complete方法 注意:这是核心方法,根据提供的reconciler生成blder.ctrl,并开启ctrl.Watch
        // Complete builds the controller via Build and discards the controller
        // value, returning only the error.
        func (blder *Builder) Complete(r reconcile.Reconciler) error {
        	if _, err := blder.Build(r); err != nil {
        		return err
        	}
        	return nil
        }
        
        // Build validates the builder's inputs, creates the controller and
        // registers its watches.
        //
        // It returns an error when r is nil, when no manager was provided, when
        // For recorded an error, when no object to reconcile was set, or when
        // creating the controller or registering the watches fails. On success it
        // returns the controller stored in blder.ctrl.
        func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
        	// The cases are evaluated top to bottom, matching the validation order.
        	switch {
        	case r == nil:
        		return nil, fmt.Errorf("must provide a non-nil Reconciler")
        	case blder.mgr == nil:
        		return nil, fmt.Errorf("must provide a non-nil Manager")
        	case blder.forInput.err != nil:
        		return nil, blder.forInput.err
        	case blder.forInput.object == nil:
        		// There must be an object to reconcile.
        		return nil, fmt.Errorf("must provide an object for reconciliation")
        	}

        	// Create the controller and store it in blder.ctrl.
        	err := blder.doController(r)
        	if err != nil {
        		return nil, err
        	}

        	// Register the watches for the reconciled type, the owned types and
        	// any extra sources.
        	err = blder.doWatch()
        	if err != nil {
        		return nil, err
        	}

        	return blder.ctrl, nil
        }
    
        // doWatch registers all watches on blder.ctrl: one for the reconciled
        // type (For), one per owned type (Owns) and one per extra source
        // (Watches). Every watch receives the global predicates followed by its
        // own predicates. It returns the first error from projecting an object
        // or from ctrl.Watch.
        func (blder *Builder) doWatch() error {
        	// Watch the type that is being reconciled.
        	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
        	if err != nil {
        		return err
        	}
        	src := &source.Kind{Type: typeForSrc}
        	hdler := &handler.EnqueueRequestForObject{}
        	// Copy the global predicates before appending. Appending directly to
        	// blder.globalPredicates could write into spare capacity of its backing
        	// array, so the slice handed to Watch would share storage with the
        	// builder. The Owns and Watches sections below copy in the same way.
        	allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        	allPredicates = append(allPredicates, blder.forInput.predicates...)
        	if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
        		return err
        	}

        	// Watch the types registered through Owns; their events enqueue the
        	// controlling owner, whose type is the reconciled object.
        	for _, own := range blder.ownsInput {
        		typeForSrc, err := blder.project(own.object, own.objectProjection)
        		if err != nil {
        			return err
        		}
        		src := &source.Kind{Type: typeForSrc}
        		hdler := &handler.EnqueueRequestForOwner{
        			OwnerType:    blder.forInput.object,
        			IsController: true,
        		}
        		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        		allPredicates = append(allPredicates, own.predicates...)
        		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
        			return err
        		}
        	}

        	// Watch the sources registered through Watches.
        	for _, w := range blder.watchesInput {
        		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        		allPredicates = append(allPredicates, w.predicates...)

        		// If the source of this watch is of type *source.Kind, project it.
        		// Note that this replaces the Type on the caller-supplied source.
        		if srckind, ok := w.src.(*source.Kind); ok {
        			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
        			if err != nil {
        				return err
        			}
        			srckind.Type = typeForSrc
        		}

        		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
        			return err
        		}
        	}
        	return nil
        }
    
        // doController fills in the unset controller options and creates the
        // controller, storing it in blder.ctrl.
        //
        // Defaults are only applied to options left at their zero value: the
        // reconciler falls back to r, the concurrency to the manager's
        // per-GroupKind setting, the cache sync timeout to the manager's global
        // value, and the logger to the manager's logger. It returns the error
        // from resolving the GVK or from newController.
        func (blder *Builder) doController(r reconcile.Reconciler) error {
        	// Manager-wide defaults.
        	globalOpts := blder.mgr.GetControllerOptions()

        	// Work on a copy of the builder's options.
        	opts := blder.ctrlOptions
        	if opts.Reconciler == nil {
        		opts.Reconciler = r
        	}

        	// Resolve the GVK of the type being reconciled.
        	gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
        	if err != nil {
        		return err
        	}

        	// Take the concurrency configured for this GroupKind, if any. A key
        	// missing from the map yields 0, which is ignored like any other
        	// non-positive value.
        	if opts.MaxConcurrentReconciles == 0 {
        		if n := globalOpts.GroupKindConcurrency[gvk.GroupKind().String()]; n > 0 {
        			opts.MaxConcurrentReconciles = n
        		}
        	}

        	// Take the global cache sync timeout when none was set explicitly.
        	if globalOpts.CacheSyncTimeout != nil && opts.CacheSyncTimeout == 0 {
        		opts.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
        	}

        	// Use the manager's logger when none was set, then tag it with the
        	// reconciled group and kind.
        	logger := opts.Log
        	if logger == nil {
        		logger = blder.mgr.GetLogger()
        	}
        	opts.Log = logger.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

        	// Create the controller; blder.ctrl is assigned even when an error is
        	// returned.
        	ctrl, err := newController(blder.getControllerName(gvk), blder.mgr, opts)
        	blder.ctrl = ctrl
        	return err
        }
    
  3. controller_test.go 测试操作builder的文件

  4. doc.go 提供一个log

  5. example_test.go 基于ReplicaSet的测试例子

  6. example_webhook_test.go 此示例使用Webhook构建器创建一个简单的Webhook,该Webhook由CRD ChaosPod的mgr进行管理。

  7. options.go 为controller提供设置Option的文件(设置Predicates和objectProjection)

  8. webhook.go

    • WebhookBuilder结构体 构建webhook所需要的参数
    // WebhookBuilder collects the parameters needed to register webhooks for
    // one API type.
    type WebhookBuilder struct {
    	apiType runtime.Object // the API type the webhooks are registered for, set by For
    	gvk     schema.GroupVersionKind // GVK of apiType, resolved in registerWebhooks
    	mgr     manager.Manager // manager that supplies the scheme and the webhook server
    	config  *rest.Config // Kubernetes client configuration; presumably populated by loadRestConfig — confirm
    }
    
    • WebhookManagedBy函数 根据提供的mgr生成一个新的webhookBuilder
    // WebhookManagedBy returns a new WebhookBuilder that only has its manager
    // set to m; every other field starts at its zero value.
    func WebhookManagedBy(m manager.Manager) *WebhookBuilder {
    	blder := new(WebhookBuilder)
    	blder.mgr = m
    	return blder
    }
    
    • blder For方法 给blder的apiType赋值
    // For records apiType as the type the webhooks are registered for and
    // returns the builder for chaining. A later call overwrites the earlier value.
    func (blder *WebhookBuilder) For(apiType runtime.Object) *WebhookBuilder {
      blder.apiType = apiType
      return blder
    }
    
    • blder Complete方法 注意了 ,这个是核心方法,为apiType对应的GVK注册DefaultingWebhook(改变)、ValidatingWebhook(验证)和ConversionWebhook(version转换)
    // Complete loads the client configuration and then registers the webhooks
    // for the recorded API type, returning any registration error.
    func (blder *WebhookBuilder) Complete() error {
    	// Load the client config first.
    	blder.loadRestConfig()

    	// Register whichever webhooks apply to the type.
    	if err := blder.registerWebhooks(); err != nil {
    		return err
    	}
    	return nil
    }
    
    // registerWebhooks resolves the GVK of the recorded API type and then
    // registers the defaulting (mutating), validating and conversion webhooks
    // in that order. It returns the error from the GVK lookup or from the
    // conversion webhook registration.
    func (blder *WebhookBuilder) registerWebhooks() error {
    	// Resolve the GVK; blder.gvk is assigned even when the lookup fails.
    	var gvkErr error
    	blder.gvk, gvkErr = apiutil.GVKForObject(blder.apiType, blder.mgr.GetScheme())
    	if gvkErr != nil {
    		return gvkErr
    	}

    	// Mutating and validating webhooks do not report errors.
    	blder.registerDefaultingWebhook()
    	blder.registerValidatingWebhook()

    	// The conversion webhook is the only registration that can fail.
    	return blder.registerConversionWebhook()
    }
    
    // registerDefaultingWebhook registers a mutating webhook for the API type
    // when it implements admission.Defaulter. Nothing is registered when the
    // interface is not implemented, when no webhook is produced, or when the
    // generated path already has a handler.
    func (blder *WebhookBuilder) registerDefaultingWebhook() {
    	defaulter, ok := blder.apiType.(admission.Defaulter)
    	if !ok {
    		log.Info("skip registering a mutating webhook, admission.Defaulter interface is not implemented", "GVK", blder.gvk)
    		return
    	}

    	// Build the mutating webhook from the defaulter.
    	mwh := admission.DefaultingWebhookFor(defaulter)
    	if mwh == nil {
    		return
    	}

    	// Skip registration when the path is already served.
    	path := generateMutatePath(blder.gvk)
    	if blder.isAlreadyHandled(path) {
    		return
    	}

    	log.Info("Registering a mutating webhook", "GVK", blder.gvk, "path", path)
    	blder.mgr.GetWebhookServer().Register(path, mwh)
    }
    
    // registerValidatingWebhook registers a validating webhook for the API
    // type when it implements admission.Validator. Nothing is registered when
    // the interface is not implemented, when no webhook is produced, or when
    // the generated path already has a handler.
    func (blder *WebhookBuilder) registerValidatingWebhook() {
    	validator, ok := blder.apiType.(admission.Validator)
    	if !ok {
    		log.Info("skip registering a validating webhook, admission.Validator interface is not implemented", "GVK", blder.gvk)
    		return
    	}

    	// Build the validating webhook from the validator.
    	vwh := admission.ValidatingWebhookFor(validator)
    	if vwh == nil {
    		return
    	}

    	// Skip registration when the path is already served.
    	path := generateValidatePath(blder.gvk)
    	if blder.isAlreadyHandled(path) {
    		return
    	}

    	log.Info("Registering a validating webhook", "GVK", blder.gvk, "path", path)
    	blder.mgr.GetWebhookServer().Register(path, vwh)
    }
    
    // registerConversionWebhook registers the shared "/convert" webhook when
    // conversion.IsConvertible reports the API type as convertible. A failed
    // check is logged and returned. The "enabled" message is logged for every
    // convertible type, even when "/convert" was already registered.
    func (blder *WebhookBuilder) registerConversionWebhook() error {
    	convertible, err := conversion.IsConvertible(blder.mgr.GetScheme(), blder.apiType)
    	if err != nil {
    		log.Error(err, "conversion check failed", "object", blder.apiType)
    		return err
    	}
    	if !convertible {
    		return nil
    	}

    	// The conversion endpoint is registered at most once.
    	if !blder.isAlreadyHandled("/convert") {
    		blder.mgr.GetWebhookServer().Register("/convert", &conversion.Webhook{})
    	}
    	log.Info("conversion webhook enabled", "object", blder.apiType)
    	return nil
    }
    
    // isAlreadyHandled reports whether the webhook server's mux already has a
    // handler registered for exactly path. It returns false when the mux has
    // not been created yet.
    func (blder *WebhookBuilder) isAlreadyHandled(path string) bool {
    	if blder.mgr.GetWebhookServer().WebhookMux == nil {
    		return false
    	}
    	// Ask the mux which pattern would serve a request for this path.
    	probe := &http.Request{URL: &url.URL{Path: path}}
    	h, pattern := blder.mgr.GetWebhookServer().WebhookMux.Handler(probe)
    	return pattern == path && h != nil
    }
    
    // generateMutatePath builds the mutating webhook path for gvk in the form
    // "/mutate-<group>-<version>-<kind>", with dots in the group replaced by
    // dashes and the kind lowercased.
    func generateMutatePath(gvk schema.GroupVersionKind) string {
    	group := strings.ReplaceAll(gvk.Group, ".", "-")
    	kind := strings.ToLower(gvk.Kind)
    	return "/mutate-" + group + "-" + gvk.Version + "-" + kind
    }
    
    // generateValidatePath builds the validating webhook path for gvk in the
    // form "/validate-<group>-<version>-<kind>", with dots in the group
    // replaced by dashes and the kind lowercased.
    func generateValidatePath(gvk schema.GroupVersionKind) string {
    	group := strings.ReplaceAll(gvk.Group, ".", "-")
    	kind := strings.ToLower(gvk.Kind)
    	return "/validate-" + group + "-" + gvk.Version + "-" + kind
    }
    
  9. webhook_test.go 钩子测试文件

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐