sigs.k8s.io controller-runtime系列之一 builder分析
简介之前介绍过controller-runtime总览sigs.k8s.io controller-runtime总览 。本文主要介绍pkg/builder的源码分析。目录结构builder_suite_test.go 校验k8s环境获取client config依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,BeforeSuite和AfterSuite,会在所有测试例执行之前和之
·
简介
之前介绍过controller-runtime总览sigs.k8s.io controller-runtime总览 。
本文主要介绍pkg/builder的源码分析。
目录结构
-
builder_suite_test.go 校验k8s环境 获取client config
- 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
- BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
- 如果BeforeSuite执行失败,则这个测试集都不会被执行
-
controller.go
- Builder结构体 构建controller所需要的参数
type Builder struct { forInput ForInput //封装实现了runtime.Object的结构体 ownsInput []OwnsInput // 封装了apiType的owner,用于EnqueueRequestForOwner使用 watchesInput []WatchesInput // 封装type和evenHandler,一般使用owns和For代替,如果想扩展,可以使用 mgr manager.Manager // mgr 主要使用内部的参数构造controller globalPredicates []predicate.Predicate // 全局过滤事件 ctrl controller.Controller // 构造的controller ctrlOptions controller.Options // 构造的controller 的参数包括最大并发reconcile数和reconciler name string //默认是对应apiType的小写形式,符合普罗米修斯的命名格式 }
- ControllerManagedBy函数 根据提供的mgr生成一个新的builder
func ControllerManagedBy(m manager.Manager) *Builder { return &Builder{mgr: m} }
- ForInput结构体 作为For方法的入参 (reconcile的object)
type ForInput struct { object client.Object //封装实现了runtime.Object的结构体 predicates []predicate.Predicate //过滤事件 objectProjection objectProjection //标识我们可以发送/接收一个给定的resource(metadata-only, unstructured, etc) err error }
- blder For方法 新建forInput,设置forInput的predicates并给blder的forInput赋值
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder { if blder.forInput.object != nil { blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation") return blder } input := ForInput{object: object} for _, opt := range opts { opt.ApplyToFor(&input) } blder.forInput = input return blder }
- OwnsInput结构体 作为Owns方法的入参 (ForInput的owner的object)
type OwnsInput struct { object client.Object //封装实现了runtime.Object的结构体 predicates []predicate.Predicate //过滤事件 objectProjection objectProjection //标识我们可以发送/接收一个给定的resource(metadata-only, unstructured, etc) }
- blder Owns方法 给blder的ownsInput追加对象,具体看blder.ownsInput的作用
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { input := OwnsInput{object: object} for _, opt := range opts { opt.ApplyToOwns(&input) } blder.ownsInput = append(blder.ownsInput, input) return blder }
- watchRequest结构体 封装watch对象和事件处理器
type watchRequest struct { src source.Source eventhandler handler.EventHandler predicates []predicate.Predicate objectProjection objectProjection }
- blder Watches方法 给blder.watchesInput追加WatchesInput
func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder { input := WatchesInput{src: src, eventhandler: eventhandler} for _, opt := range opts { opt.ApplyToWatches(&input) } blder.watchesInput = append(blder.watchesInput, input) return blder }
- blder WithOptions方法 设置controller的Options
- blder WithLogger方法 设置controller的Options.Log
- blder WithEventFilter方法 给blder.globalPredicates追加过滤事件对象
func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder { blder.globalPredicates = append(blder.globalPredicates, p) return blder }
- blder Named方法 给blder.name赋值
- blder Complete方法 注意了 ,这个是核心方法,根据提供的reconciler生成blder.ctrl开启ctrl.watch
func (blder *Builder) Complete(r reconcile.Reconciler) error { //调用Build _, err := blder.Build(r) return err } func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { if r == nil { return nil, fmt.Errorf("must provide a non-nil Reconciler") } if blder.mgr == nil { return nil, fmt.Errorf("must provide a non-nil Manager") } if blder.forInput.err != nil { return nil, blder.forInput.err } // 校验要reconcile的对象是否存在 if blder.forInput.object == nil { return nil, fmt.Errorf("must provide an object for reconciliation") } // 创建controller赋值给blder.ctrl if err := blder.doController(r); err != nil { return nil, err } // 开启ctrl.watch(包括owner和apiType) if err := blder.doWatch(); err != nil { return nil, err } return blder.ctrl, nil } func (blder *Builder) doWatch() error { // Reconcile 对应的apitype typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } // 监听owns对应的type for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // 监听watchs对应的source for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err } srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { return err } } return nil } func (blder *Builder) doController(r reconcile.Reconciler) error { // 获取全局的opts globalOpts := blder.mgr.GetControllerOptions() // 获取controller的opts ctrlOptions := blder.ctrlOptions // 设置reconciler if ctrlOptions.Reconciler == nil { ctrlOptions.Reconciler = r } // 重新获取我们将要 reconciling的GVK gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) if err != nil { return err } // 设置最大并发Reconcile的数目 if ctrlOptions.MaxConcurrentReconciles == 0 { groupKind := gvk.GroupKind().String() // 如果获取当前的reconcile数大于0,就设置为当前的 if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 { ctrlOptions.MaxConcurrentReconciles = concurrency } } // 设置cache同步过期时间 if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil { ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout } // 设置controller的log if ctrlOptions.Log == nil { ctrlOptions.Log = blder.mgr.GetLogger() } ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind) //构造controller 调用pkg/controller的New方法 blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions) return err }
-
controller_test.go 测试操作builder的文件
-
doc.go 提供一个log
-
example_test.go 基于ReplicaSet的测试例子
-
example_webhook_test.go 此示例使用Webhook构建器创建一个简单的Webhook,该Webhook由CRD ChaosPod的mgr进行管理。
-
options.go 为controller提供设置Option的文件(设置Predicates和objectProjection)
-
webhook.go
- WebhookBuilder结构体 构建webhook所需要的参数
type WebhookBuilder struct { apiType runtime.Object // 钩子触发的apiType(实现了runtime.Object的结构体) gvk schema.GroupVersionKind // 钩子触发的apiType的gvk mgr manager.Manager // mgr提供构建webhook所需的属性 config *rest.Config // k8s的client配置 }
- WebhookManagedBy函数 根据提供的mgr生成一个新的webhookBuilder
func WebhookManagedBy(m manager.Manager) *WebhookBuilder { return &WebhookBuilder{mgr: m} }
- blder For方法 给blder的apiType赋值
func (blder *WebhookBuilder) For(apiType runtime.Object) *WebhookBuilder { blder.apiType = apiType return blder }
- blder Complete方法 注意了 ,这个是核心方法,为apiType对应的GVK注册DefaultingWebhook(改变)、ValidatingWebhook(验证)和ConversionWebhook(version转换)
func (blder *WebhookBuilder) Complete() error { // 加载client config blder.loadRestConfig() // 注册需要的webhook return blder.registerWebhooks() } func (blder *WebhookBuilder) registerWebhooks() error { // 获取gvk var err error blder.gvk, err = apiutil.GVKForObject(blder.apiType, blder.mgr.GetScheme()) if err != nil { return err } // 注册mutaing webhook blder.registerDefaultingWebhook() // 注册validate webhook blder.registerValidatingWebhook() // 注册conversion webhook err = blder.registerConversionWebhook() if err != nil { return err } return nil } func (blder *WebhookBuilder) registerDefaultingWebhook() { //类型转换 defaulter, isDefaulter := blder.apiType.(admission.Defaulter) if !isDefaulter { log.Info("skip registering a mutating webhook, admission.Defaulter interface is not implemented", "GVK", blder.gvk) return } //根据default validate使用pkg/admission获取mutaing webhook mwh := admission.DefaultingWebhookFor(defaulter) if mwh != nil { // 生成mutate path path := generateMutatePath(blder.gvk) // 校验该path是否已经注册 // 如果true,跳过 if !blder.isAlreadyHandled(path) { log.Info("Registering a mutating webhook", "GVK", blder.gvk, "path", path) blder.mgr.GetWebhookServer().Register(path, mwh) } } } func (blder *WebhookBuilder) registerValidatingWebhook() { validator, isValidator := blder.apiType.(admission.Validator) if !isValidator { log.Info("skip registering a validating webhook, admission.Validator interface is not implemented", "GVK", blder.gvk) return } vwh := admission.ValidatingWebhookFor(validator) if vwh != nil { path := generateValidatePath(blder.gvk) // Checking if the path is already registered. // If so, just skip it. if !blder.isAlreadyHandled(path) { log.Info("Registering a validating webhook", "GVK", blder.gvk, "path", path) blder.mgr.GetWebhookServer().Register(path, vwh) } } } func (blder *WebhookBuilder) registerConversionWebhook() error { ok, err := conversion.IsConvertible(blder.mgr.GetScheme(), blder.apiType) if err != nil { log.Error(err, "conversion check failed", "object", blder.apiType) return err } if ok { if !blder.isAlreadyHandled("/convert") { blder.mgr.GetWebhookServer().Register("/convert", &conversion.Webhook{}) } log.Info("conversion webhook enabled", "object", blder.apiType) } return nil } //判断path是否已经注册过 func (blder *WebhookBuilder) isAlreadyHandled(path string) bool { if blder.mgr.GetWebhookServer().WebhookMux == nil { return false } h, p := blder.mgr.GetWebhookServer().WebhookMux.Handler(&http.Request{URL: &url.URL{Path: path}}) if p == path && h != nil { return true } return false } //生成mutate path func generateMutatePath(gvk schema.GroupVersionKind) string { return "/mutate-" + strings.Replace(gvk.Group, ".", "-", -1) + "-" + gvk.Version + "-" + strings.ToLower(gvk.Kind) } //生成validate path func generateValidatePath(gvk schema.GroupVersionKind) string { return "/validate-" + strings.Replace(gvk.Group, ".", "-", -1) + "-" + gvk.Version + "-" + strings.ToLower(gvk.Kind) }
-
webhook_test.go 钩子测试文件
更多推荐
已为社区贡献15条内容
所有评论(0)