sigs.k8s.io controller-runtime系列之二 manager分析
简介之前介绍过sigs.k8s.io controller-runtime系列之一builder分析sigs.k8s.io controller-runtime-builder 。本文主要介绍pkg/manager的源码分析。目录结构manager_suite_test.go校验k8s环境获取client config依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,BeforeSu
·
简介
之前介绍过sigs.k8s.io controller-runtime系列之一 builder分析sigs.k8s.io controller-runtime-builder 。
本文主要介绍pkg/manager的源码分析。
目录结构
- manager_suite_test.go 校验k8s环境 获取client config
- 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
- BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
- 如果BeforeSuite执行失败,则这个测试集都不会被执行
- manager.go
- Manager接口 构建controller所需要的参数
type Manager interface { // Cluster 提供了许多方法来操作集群 cluster.Cluster // 该方法将设置manager得一些属性到实现了Runnable的结构体(实现了inject接口)中,并在mgr.Start启动时启动 // 如果实现了Runnable的结构体也实现了LeaderElectionRunnable接口,将工作在leader election 模式下(被选举为leader后运行) // 如果实现了Runnable的结构体没有实现LeaderElectionRunnable接口,将工作在 non-leaderelection 模式下(一直运行) Add(Runnable) error // 标识mgr是否为leader closed状态为leader // Elected为关闭状态的两种情况: // 1.mgr成为leader // 2.该mgr再集群中不是多个的存在,不需要leader选举,one is leader Elected() <-chan struct{} // 添加除了defaultMetricsEndpoint以外的任意指标(path/handler) // 对于一些诊断性的端点可能十分有用,值得注意的是,这些端点大多是比较敏感的,最好不要公开暴漏 // 对于简单的 path -> handler形式的映射方式不能满足需求, 我们可以提供一个server/listener形式的Runnable,调用Add方法添加到mgr AddMetricsExtraHandler(path string, handler http.Handler) error // 添加一个健康检查 AddHealthzCheck(name string, check healthz.Checker) error // 添加一个就绪检查 AddReadyzCheck(name string, check healthz.Checker) error // 启动所有已注册的控制器并阻塞,直到取消context为止 // 如果启动任何控制器时出错,则返回错误 // 如果开启了选举, 如果该mgr的leader lock丢失,那么处理程序必须立即退出, // 否则组件再失去leader lock的时候还会继续运行 Start(ctx context.Context) error // 获取webhook server GetWebhookServer() *webhook.Server // 获取该mgr的log GetLogger() logr.Logger // 返回全局的controller options GetControllerOptions() v1alpha1.ControllerConfigurationSpec }
- Options结构体 创建mgr的参数 提供创建mgr选填的参数
type Options struct { // 解析 GroupVersionKinds / Resources的对应转化 Scheme *runtime.Scheme // 提供了 rest 格式的映射, 用来构建 Kubernetes APIs MapperProvider func(c *rest.Config) (meta.RESTMapper, error) // 同步周期(reconcile的最小时间间隔),默认为10小时,可以手动修改设置 // 所有控制器的SyncPeriod之间将有10%的抖动,因此所有控制器将不会同时发送列表请求。 // // 同步周期发生的两个原因: // 1. 为了防止控制器中的错误导致对象不被重新排队,除此之外应该重新排队。 // 2. 为了确保不会在controller-runtime或其依赖项时发生未知错误,导致对象不被重新排队,除此之外应重新排队/应将其从队列中删除/不应该将其从队列中删除。 // // 如果你想 // 1. 为避免错过watch事件 // 2. 轮询无法监视的服务, // 那么我们建议您不要更改默认期限 SyncPeriod *time.Duration // mgr的log,默认是全局的log.Log Logger logr.Logger // 确定启动mgr时是否使用领导者选举 LeaderElection bool // 确定用于leader选举的resource lock, // 默认是 "configmapsleases". 仅当你知道自己要作什么才改变此值. // 否则, 控制器的用户可能会遇到多个正在运行的实例, // 每个实例在升级过程中都通过不同的资源锁获得了leader,因此并发地对同一资源进行操作. // 如果要迁移到"leases" resource lock, 您可以先迁移到相应的multilock(“ configmapsleases”或“ endpointsleases”)来完成此操作,这将在多个资源上获得一个leader锁. // 将所有用户迁移到 multilock之后, 你可以继续迁移到"leases". // 请注意,用户可能会跳过您的控制器版本. // // 注意:在conntroller-runtime版本v0.7之前,资源锁定设置为“ configmaps”。 // 在为控制器规划正确的迁移路径时,请记住这一点。 LeaderElectionResourceLock string // 确定将在其中创建leader election resource的名称空间. LeaderElectionNamespace string // 持有leader lock的资源的名称. LeaderElectionID string // 覆盖构建领导者选举客户端的默认配置. LeaderElectionConfig *rest.Config // 定义leader是否应该在mgr结束时自动down // 管理器停止时立即结束, 否则,此设置不安全 // 自己设置此为true可加快自动leader的过渡,因为新leader无需先等待 LeaseDuration时间 LeaderElectionReleaseOnCancel bool // 非领导候选人将等待获得领导的持续时间。 // 这是根据最后观察到的ack的时间来衡量的. // 默认15 seconds. LeaseDuration *time.Duration // acting的controlplane重新获取leader的持续时间 // 默认 10 seconds. RenewDeadline *time.Duration // LeaderElector clients 应在尝试获取锁之间等待的时间 // 默认 2 seconds. RetryPeriod *time.Duration // 限制mgr的cache以监视该命名空间中的对象 // 默认是 all namespaces // // 注意:如果指定了名称空间,则控制器仍可以监视cluster-scoped的资源(例如Node)。 // 对于namespaced资源,cache将仅保存所需命名空间中的对象. Namespace string // 控制器应绑定到的TCP地址以提供普罗米修斯指标. // 可以将其设置为“ 0”以禁用指标服务. MetricsBindAddress string // 控制器应绑定的TCP地址,以便为运行健康探测服务 HealthProbeBindAddress string // 准备就绪探针端点名称, 默认是 "readyz" ReadinessEndpointName string // 存活探针端点名称, 默认是 "healthz" LivenessEndpointName string // webhook服务器服务的端口.用于设置webhook.Server.Port Port int // Webhook服务器绑定到的主机名.用于设置webhook.Server.Host. Host string // 包含服务器密钥和证书的目录. // 如果未设置,则webhook服务器将在{TempDir}/k8s-webhook-server/serving-certs中查找服务器密钥和证书 // 服务器密钥和证书必须分别命名为tls.key和tls.crt CertDir string // 以下是所有用户可以使用的功能,以自定义将要注入的值。 // 创建要使用的cache被mgr使用 // 默认是 cache.NewCacheFunc NewCache cache.NewCacheFunc // 创建要使用的client被mgr使用. // 如果未设置,将创建默认的DelegatingClient,它包含了cache和client,将cache用于读取,将client用于写入 NewClient cluster.NewClientFunc // 告诉客户端,如果使用了任何缓存,则绕对给定对象的读 ClientDisableCacheFor []client.Object // 指定是否应将client配置为强制执行 dryRun模式 DryRunClient bool // 记录mgr发出的事件并将其发送到Kubernetes API // 使用它自定义事件收集和垃圾邮件过滤器 // // 不推荐使用:如果mgr或controller的生命周期短于进程的生命周期,则使用此方法可能会导致goroutine泄漏 EventBroadcaster record.EventBroadcaster // 在mgr实际返回停止之前,可运行停止的持续时间 // 要禁用正常关机,请设置为time.Duration(0) // 要使用正常关机而没有超时,请将其设置为负值,例如time.Duration(-1) // 出于安全原因,如果领导者选举租约丢失,将跳过正常关闭。 GracefulShutdownTimeout *time.Duration // 在此管理器中注册的控制器的全局配置选项 Controller v1alpha1.ControllerConfigurationSpec // 如果我们从不在此mgr上调用“Start”,则可以将broadcaster的创建推迟以避免泄漏goroutines。 // 它还会返回这是否是“ownd”broadcaster,因此应该与mgr一起停止. makeBroadcaster intrec.EventBroadcasterProducer // 依赖注入 newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) newMetricsListener func(addr string) (net.Listener, error) newHealthProbeListener func(addr string) (net.Listener, error) }
- Runnable结构体 Runnable允许启动组件。非常重要的一点是,Start必须阻塞直到运行完成
type Runnable interface { Start(context.Context) error }
- RunnableFunc 使用函数实现Runnable
type RunnableFunc func(context.Context) error func (r RunnableFunc) Start(ctx context.Context) error { return r(ctx) }
- LeaderElectionRunnable接口 定义是否需要在领导者选举模式下运行Runnable
type LeaderElectionRunnable interface { NeedLeaderElection() bool }
- New函数 为创建controller返回一个新的mgr
func New(config *rest.Config, options Options) (Manager, error) { // 设置options默认值 options = setOptionsDefaults(options) // 通过options 创建一个新的cluster 提供给mgr cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme clusterOptions.MapperProvider = options.MapperProvider clusterOptions.Logger = options.Logger clusterOptions.SyncPeriod = options.SyncPeriod clusterOptions.Namespace = options.Namespace clusterOptions.NewCache = options.NewCache clusterOptions.NewClient = options.NewClient clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor clusterOptions.DryRunClient = options.DryRunClient clusterOptions.EventBroadcaster = options.EventBroadcaster }) if err != nil { return nil, err } // 创建记录器提供程序以为组件注入事件记录器 recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } // 创建资源锁以启用领导者选举 leaderConfig := options.LeaderElectionConfig if leaderConfig == nil { leaderConfig = rest.CopyConfig(config) } resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{ LeaderElection: options.LeaderElection, LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, }) if err != nil { return nil, err } // 创建指标侦听器. 如果地址无效或已在使用中,则会抛出错误. metricsListener, err := options.newMetricsListener(options.MetricsBindAddress) if err != nil { return nil, err } // 默认情况下,我们没有额外的端点可在指标http服务器上公开。但是可以定义 metricsExtraHandlers := make(map[string]http.Handler) // 创建运行状况探测器侦听器。如果绑定将抛出错误 healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress) if err != nil { return nil, err } return &controllerManager{ cluster: cluster, recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, controllerOptions: options.Controller, logger: options.Logger, elected: make(chan struct{}), port: options.Port, host: options.Host, certDir: options.CertDir, leaseDuration: *options.LeaseDuration, renewDeadline: *options.RenewDeadline, retryPeriod: *options.RetryPeriod, healthProbeListener: healthProbeListener, readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), }, nil }
- manager_test.go 测试操作manager的文件
- doc.go 暂时没用,做一个解释文档
- example_test.go 测试例子
- internal.go controller-runtime的内部实现manager
- controllerManager 结构体 实现了mgr
type controllerManager struct { // cluster具有多种与集群交互的方法。该属性是必需的。 cluster cluster.Cluster // controllerManager将deps注入并启动的Controllers的集合 // 这些Runnable由领导选举进行管理. leaderElectionRunnables []Runnable // controllerManager将deps注入并启动的Webhook服务器的集合。 // 这些Runnable不会被牵头选举所阻止 nonLeaderElectionRunnables []Runnable // 用于生成事件记录器,该事件记录器将注入到控制器中(EventHandlers, Sources and Predicates). recorderProvider *intrec.Provider // 资源lock 选举leader resourceLock resourcelock.Interface // 定义mgr在关闭时是否应退出领导者租约 leaderElectionReleaseOnCancel bool // 用于提供普罗米修斯指标 metricsListener net.Listener // 包含要在提供指标的http服务器上注册的额外处理程序。 metricsExtraHandlers map[string]http.Handler // 用于提供活动性探针 healthProbeListener net.Listener // 准备就绪探针端点名称 readinessEndpointName string // 活动度探针端点名称 livenessEndpointName string // Readyz探针处理程序 readyzHandler *healthz.Handler // Healthz探针处理器 healthzHandler *healthz.Handler // mu sync.Mutex started bool startedLeader bool healthzStarted bool errChan chan error // 全局控制器选项。 controllerOptions v1alpha1.ControllerConfigurationSpec logger logr.Logger // 用于取消领导人选举。它与internalStopper不同,因为出于安全原因,我们在失去领导人选举时需要os.Exit(), // 意味着必须将其推迟到完成gracefulShutdown之后。 leaderElectionCancel context.CancelFunc // 一个内部通道,用于向停止程序发出信号,表示LeaderElection.Run(...)函数已返回,并且可以继续进行关闭 leaderElectionStopped chan struct{} // 停止程序已启用。换句话说,我们不应在管理器中添加其他任何内容 stopProcedureEngaged bool // 当此管理者成为一组管理者的领导者时被关闭,因为赢得了领导者选举,或者因为未配置领导者选举。 elected chan struct{} caches []hasCache port int host string certDir string webhookServer *webhook.Server // 同manager leaseDuration time.Duration // 同manager renewDeadline time.Duration // 同manager retryPeriod time.Duration // 保持当前正在运行的可运行runnable,以便我们可以在退出mgr之前等待它们退出 waitForRunnable sync.WaitGroup // 是可运行停止到在mgr实际返回停止之前的持续时间. gracefulShutdownTimeout time.Duration // 领导人选举租约丢失时调用 onStoppedLeading func() // 是可以在关机期间使用的上下文。在gracefulShutdownTimeout结束后它将被取消。在internalStop // 关闭之前一定不能访问它,因为它将为nil shutdownCtx context.Context internalCtx context.Context internalCancel context.CancelFunc // 用于协调管理器内部正确关闭服务器. 此通道也用于依赖项注入. internalProceduresStop chan struct{} }
- hasCache 接口 实现了Runnable
type hasCache interface { Runnable GetCache() cache.Cache }
- Add 方法 在r上设置依赖项,并将其添加到Runnables列表中以启动。
func (cm *controllerManager) Add(r Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() if cm.stopProcedureEngaged { return errors.New("can't accept new runnable as stop procedure is already engaged") } // 在r上设置依赖项 if err := cm.SetFields(r); err != nil { return err } var shouldStart bool // 添加runnable到nonLeaderElectionRunnables和leaderElectionRunnables if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { shouldStart = cm.started cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) } else if hasCache, ok := r.(hasCache); ok { cm.caches = append(cm.caches, hasCache) } else { shouldStart = cm.startedLeader cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) } // 开启r if shouldStart { // If already started, start the controller cm.startRunnable(r) } return nil }
- AddMetricsExtraHandler 方法 在提供指标的http服务器的路径上提供了额外的处理程序。
func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error { if path == defaultMetricsEndpoint { return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint) } cm.mu.Lock() defer cm.mu.Unlock() _, found := cm.metricsExtraHandlers[path] if found { return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path) } cm.metricsExtraHandlers[path] = handler cm.logger.V(2).Info("Registering metrics http server extra handler", "path", path) return nil }
- serveMetrics方法 开启serve metrics
func (cm *controllerManager) serveMetrics() { // 获取处理path //metrics的handler handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, }) // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics mux := http.NewServeMux() mux.Handle(defaultMetricsEndpoint, handler) func() { cm.mu.Lock() defer cm.mu.Unlock() // 开启自定义的metricsHandler for path, extraHandler := range cm.metricsExtraHandlers { mux.Handle(path, extraHandler) } }() server := http.Server{ Handler: mux, } // start metrics server cm.startRunnable(RunnableFunc(func(_ context.Context) error { cm.logger.Info("starting metrics server", "path", defaultMetricsEndpoint) if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { return err } return nil })) // 关闭metrics service 如果有signal <-cm.internalProceduresStop if err := server.Shutdown(cm.shutdownCtx); err != nil { cm.errChan <- err } }
- Start方法 mgr启动程序 包括启动controller,Metrics,health,NonLeaderElectionRunnables/LeaderElectionRunnables等
func (cm *controllerManager) Start(ctx context.Context) (err error) { if err := cm.Add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // 此chan指示停止已完成,也即是,所有可运行对象均已返回,或者在停止请求时超时 stopComplete := make(chan struct{}) defer close(stopComplete) // 在关闭stopComplete之后必须推迟此操作,否则我们将陷入死锁. defer func() { // https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg stopErr := cm.engageStopProcedure(stopComplete) if stopErr != nil { if err != nil { err = utilerrors.NewAggregate([]error{err, stopErr}) } else { err = stopErr } } }() cm.errChan = make(chan error) // 无论控制器是否为领导者,都应提供度量标准。 // (如果我们不为非领导者提供指标,则Prometheus仍会抓取 pod,但会拒绝连接) if cm.metricsListener != nil { go cm.serveMetrics() } // 服务健康探针 if cm.healthProbeListener != nil { go cm.serveHealthProbes() } go cm.startNonLeaderElectionRunnables() go func() { if cm.resourceLock != nil { err := cm.startLeaderElection() if err != nil { cm.errChan <- err } } else { // Treat not having leader election enabled the same as being elected. cm.startLeaderElectionRunnables() close(cm.elected) } }() select { case <-ctx.Done(): // We are done return nil case err := <-cm.errChan: // Error starting or running a runnable return err } }
- signals 用于通知信号的包
更多推荐
已为社区贡献15条内容
所有评论(0)