简介

之前介绍过sigs.k8s.io controller-runtime系列之一 builder分析sigs.k8s.io controller-runtime-builder
本文主要介绍pkg/manager的源码分析。

目录结构

  1. manager_suite_test.go 校验k8s环境 获取client config
    • 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
    • BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
    • 如果BeforeSuite执行失败,则这个测试集都不会被执行
  2. manager.go
    • Manager接口 构建controller所需要的参数
    type Manager interface {
        // Cluster 提供了许多方法来操作集群
        cluster.Cluster
    
        // 该方法将设置manager得一些属性到实现了Runnable的结构体(实现了inject接口)中,并在mgr.Start启动时启动
        // 如果实现了Runnable的结构体也实现了LeaderElectionRunnable接口,将工作在leader election 模式下(被选举为leader后运行)
        // 如果实现了Runnable的结构体没有实现LeaderElectionRunnable接口,将工作在 non-leaderelection 模式下(一直运行)
    	Add(Runnable) error
    
        // 标识mgr是否为leader  closed状态为leader
        // Elected为关闭状态的两种情况:
        // 1.mgr成为leader
        // 2.该mgr再集群中不是多个的存在,不需要leader选举,one is leader
        Elected() <-chan struct{}
        
        // 添加除了defaultMetricsEndpoint以外的任意指标(path/handler)
        // 对于一些诊断性的端点可能十分有用,值得注意的是,这些端点大多是比较敏感的,最好不要公开暴漏
        // 对于简单的 path -> handler形式的映射方式不能满足需求, 我们可以提供一个server/listener形式的Runnable,调用Add方法添加到mgr
        AddMetricsExtraHandler(path string, handler http.Handler) error
        
        // 添加一个健康检查
        AddHealthzCheck(name string, check healthz.Checker) error
        
        // 添加一个就绪检查
        AddReadyzCheck(name string, check healthz.Checker) error
        
        // 启动所有已注册的控制器并阻塞,直到取消context为止
        // 如果启动任何控制器时出错,则返回错误
        // 如果开启了选举, 如果该mgr的leader lock丢失,那么处理程序必须立即退出,
        // 否则组件再失去leader lock的时候还会继续运行
        Start(ctx context.Context) error
        
        // 获取webhook server
        GetWebhookServer() *webhook.Server
        
        // 获取该mgr的log
        GetLogger() logr.Logger
        
        // 返回全局的controller options
        GetControllerOptions() v1alpha1.ControllerConfigurationSpec
    }
    
    • Options结构体 创建mgr的参数 提供创建mgr选填的参数
    type Options struct {
    	// 解析 GroupVersionKinds / Resources的对应转化
    	Scheme *runtime.Scheme
    
    	// 提供了 rest 格式的映射, 用来构建 Kubernetes APIs
    	MapperProvider func(c *rest.Config) (meta.RESTMapper, error)
    
    	// 同步周期(reconcile的最小时间间隔),默认为10小时,可以手动修改设置
        // 所有控制器的SyncPeriod之间将有10%的抖动,因此所有控制器将不会同时发送列表请求。
    	//
    	// 同步周期发生的两个原因:
    	// 1. 为了防止控制器中的错误导致对象不被重新排队,除此之外应该重新排队。
    	// 2. 为了确保不会在controller-runtime或其依赖项时发生未知错误,导致对象不被重新排队,除此之外应重新排队/应将其从队列中删除/不应该将其从队列中删除。
    	//
    	// 如果你想
    	// 1. 为避免错过watch事件
    	// 2. 轮询无法监视的服务,
    	// 那么我们建议您不要更改默认期限
    	SyncPeriod *time.Duration
    
    	// mgr的log,默认是全局的log.Log
    	Logger logr.Logger
    
    	// 确定启动mgr时是否使用领导者选举
    	LeaderElection bool
    
    	// 确定用于leader选举的resource lock,
    	// 默认是 "configmapsleases". 仅当你知道自己要作什么才改变此值.
    	// 否则, 控制器的用户可能会遇到多个正在运行的实例,
        // 每个实例在升级过程中都通过不同的资源锁获得了leader,因此并发地对同一资源进行操作.
    	// 如果要迁移到"leases" resource lock, 您可以先迁移到相应的multilock(“ configmapsleases”或“ endpointsleases”)来完成此操作,这将在多个资源上获得一个leader锁. 
        // 将所有用户迁移到 multilock之后, 你可以继续迁移到"leases".
    	// 请注意,用户可能会跳过您的控制器版本.
    	//
    	// 注意:在conntroller-runtime版本v0.7之前,资源锁定设置为“ configmaps”。 
        // 在为控制器规划正确的迁移路径时,请记住这一点。
    	LeaderElectionResourceLock string
    
    	// 确定将在其中创建leader election resource的名称空间.
    	LeaderElectionNamespace string
    
    	// 持有leader lock的资源的名称.
    	LeaderElectionID string
    
    	// 覆盖构建领导者选举客户端的默认配置.
    	LeaderElectionConfig *rest.Config
    
    	// 定义leader是否应该在mgr结束时自动down 
    	// 管理器停止时立即结束, 否则,此设置不安全
    	// 自己设置此为true可加快自动leader的过渡,因为新leader无需先等待 LeaseDuration时间
    	LeaderElectionReleaseOnCancel bool
    
    	// 非领导候选人将等待获得领导的持续时间。
        // 这是根据最后观察到的ack的时间来衡量的. 
        // 默认15 seconds.
    	LeaseDuration *time.Duration
    
    	// acting的controlplane重新获取leader的持续时间
    	// 默认 10 seconds.
    	RenewDeadline *time.Duration
    
    	// LeaderElector clients 应在尝试获取锁之间等待的时间
    	// 默认 2 seconds.
    	RetryPeriod *time.Duration
    
    	// 限制mgr的cache以监视该命名空间中的对象
    	// 默认是 all namespaces
    	//
    	// 注意:如果指定了名称空间,则控制器仍可以监视cluster-scoped的资源(例如Node)。
    	// 对于namespaced资源,cache将仅保存所需命名空间中的对象.
    	Namespace string
    
    	// 控制器应绑定到的TCP地址以提供普罗米修斯指标.
    	// 可以将其设置为“ 0”以禁用指标服务.
    	MetricsBindAddress string
    
    	// 控制器应绑定的TCP地址,以便为运行健康探测服务
    	HealthProbeBindAddress string
    
    	// 准备就绪探针端点名称, 默认是 "readyz"
    	ReadinessEndpointName string
    
    	// 存活探针端点名称, 默认是 "healthz"
    	LivenessEndpointName string
    
    	// webhook服务器服务的端口.用于设置webhook.Server.Port
    	Port int
    	// Webhook服务器绑定到的主机名.用于设置webhook.Server.Host.
    	Host string
    
    	// 包含服务器密钥和证书的目录.
    	// 如果未设置,则webhook服务器将在{TempDir}/k8s-webhook-server/serving-certs中查找服务器密钥和证书
    	// 服务器密钥和证书必须分别命名为tls.key和tls.crt
    	CertDir string
    
    	// 以下是所有用户可以使用的功能,以自定义将要注入的值。
    
    	// 创建要使用的cache被mgr使用
    	// 默认是 cache.NewCacheFunc
    	NewCache cache.NewCacheFunc
    
    	// 创建要使用的client被mgr使用.
    	// 如果未设置,将创建默认的DelegatingClient,它包含了cache和client,将cache用于读取,将client用于写入
    	NewClient cluster.NewClientFunc
    
    	// 告诉客户端,如果使用了任何缓存,则绕对给定对象的读
    	ClientDisableCacheFor []client.Object
    
    	// 指定是否应将client配置为强制执行 dryRun模式
    	DryRunClient bool
    
    	// 记录mgr发出的事件并将其发送到Kubernetes API
    	// 使用它自定义事件收集和垃圾邮件过滤器
    	//
    	// 不推荐使用:如果mgr或controller的生命周期短于进程的生命周期,则使用此方法可能会导致goroutine泄漏
    	EventBroadcaster record.EventBroadcaster
    
    	// 在mgr实际返回停止之前,可运行停止的持续时间
    	// 要禁用正常关机,请设置为time.Duration(0)
    	// 要使用正常关机而没有超时,请将其设置为负值,例如time.Duration(-1)
    	// 出于安全原因,如果领导者选举租约丢失,将跳过正常关闭。
    	GracefulShutdownTimeout *time.Duration
    
    	// 在此管理器中注册的控制器的全局配置选项
    	Controller v1alpha1.ControllerConfigurationSpec
    
    	// 如果我们从不在此mgr上调用“Start”,则可以将broadcaster的创建推迟以避免泄漏goroutines。
    	// 它还会返回这是否是“ownd”broadcaster,因此应该与mgr一起停止.
    	makeBroadcaster intrec.EventBroadcasterProducer
    
    	// 依赖注入
    	newRecorderProvider    func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
    	newResourceLock        func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
    	newMetricsListener     func(addr string) (net.Listener, error)
    	newHealthProbeListener func(addr string) (net.Listener, error)
    }
    
    • Runnable结构体 Runnable允许启动组件。非常重要的一点是,Start必须阻塞直到运行完成
    type Runnable interface {
    	Start(context.Context) error
    }
    
    • RunnableFunc 使用函数实现Runnable
    type RunnableFunc func(context.Context) error
    
    func (r RunnableFunc) Start(ctx context.Context) error {
     return r(ctx)
    }
    
    • LeaderElectionRunnable接口 定义是否需要在领导者选举模式下运行Runnable
    type LeaderElectionRunnable interface {
    	NeedLeaderElection() bool
    }
    
    • New函数 为创建controller返回一个新的mgr
    func New(config *rest.Config, options Options) (Manager, error) {
    	// 设置options默认值
    	options = setOptionsDefaults(options)
    
        // 通过options 创建一个新的cluster 提供给mgr
    	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
    		clusterOptions.Scheme = options.Scheme
    		clusterOptions.MapperProvider = options.MapperProvider
    		clusterOptions.Logger = options.Logger
    		clusterOptions.SyncPeriod = options.SyncPeriod
    		clusterOptions.Namespace = options.Namespace
    		clusterOptions.NewCache = options.NewCache
    		clusterOptions.NewClient = options.NewClient
    		clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
    		clusterOptions.DryRunClient = options.DryRunClient
    		clusterOptions.EventBroadcaster = options.EventBroadcaster
    	})
    	if err != nil {
    		return nil, err
    	}
    
    	// 创建记录器提供程序以为组件注入事件记录器
    	recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
    	if err != nil {
    		return nil, err
    	}
    
    	// 创建资源锁以启用领导者选举
    	leaderConfig := options.LeaderElectionConfig
    	if leaderConfig == nil {
    		leaderConfig = rest.CopyConfig(config)
    	}
    	resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
    		LeaderElection:             options.LeaderElection,
    		LeaderElectionResourceLock: options.LeaderElectionResourceLock,
    		LeaderElectionID:           options.LeaderElectionID,
    		LeaderElectionNamespace:    options.LeaderElectionNamespace,
    	})
    	if err != nil {
    		return nil, err
    	}
    
    	// 创建指标侦听器. 如果地址无效或已在使用中,则会抛出错误.
    	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
    	if err != nil {
    		return nil, err
    	}
    
    	// 默认情况下,我们没有额外的端点可在指标http服务器上公开。但是可以定义
    	metricsExtraHandlers := make(map[string]http.Handler)
    
    	// 创建运行状况探测器侦听器。如果绑定将抛出错误
    	healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
    	if err != nil {
    		return nil, err
    	}
    
    	return &controllerManager{
    		cluster:                 cluster,
    		recorderProvider:        recorderProvider,
    		resourceLock:            resourceLock,
    		metricsListener:         metricsListener,
    		metricsExtraHandlers:    metricsExtraHandlers,
    		controllerOptions:       options.Controller,
    		logger:                  options.Logger,
    		elected:                 make(chan struct{}),
    		port:                    options.Port,
    		host:                    options.Host,
    		certDir:                 options.CertDir,
    		leaseDuration:           *options.LeaseDuration,
    		renewDeadline:           *options.RenewDeadline,
    		retryPeriod:             *options.RetryPeriod,
    		healthProbeListener:     healthProbeListener,
    		readinessEndpointName:   options.ReadinessEndpointName,
    		livenessEndpointName:    options.LivenessEndpointName,
    		gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
    		internalProceduresStop:  make(chan struct{}),
    		leaderElectionStopped:   make(chan struct{}),
    	}, nil
    }
    
  3. manager_test.go 测试操作manager的文件
  4. doc.go 暂时没用,做一个解释文档
  5. example_test.go 测试例子
  6. internal.go controller-runtime的内部实现manager
    • controllerManager 结构体 实现了mgr
    type controllerManager struct {
    	// cluster具有多种与集群交互的方法。该属性是必需的。
    	cluster cluster.Cluster
    
    	// controllerManager将deps注入并启动的Controllers的集合
    	// 这些Runnable由领导选举进行管理.
    	leaderElectionRunnables []Runnable
    	// controllerManager将deps注入并启动的Webhook服务器的集合。 
        // 这些Runnable不会被牵头选举所阻止
    	nonLeaderElectionRunnables []Runnable
    
    	// 用于生成事件记录器,该事件记录器将注入到控制器中(EventHandlers, Sources and Predicates).
    	recorderProvider *intrec.Provider
    
    	// 资源lock 选举leader
    	resourceLock resourcelock.Interface
    
    	// 定义mgr在关闭时是否应退出领导者租约
    	leaderElectionReleaseOnCancel bool
    
    	// 用于提供普罗米修斯指标
    	metricsListener net.Listener
    
    	// 包含要在提供指标的http服务器上注册的额外处理程序。
    	metricsExtraHandlers map[string]http.Handler
    
    	// 用于提供活动性探针
    	healthProbeListener net.Listener
    
    	// 准备就绪探针端点名称
    	readinessEndpointName string
    
    	// 活动度探针端点名称
    	livenessEndpointName string
    
    	// Readyz探针处理程序
    	readyzHandler *healthz.Handler
    
    	// Healthz探针处理器
    	healthzHandler *healthz.Handler
    
        //
    	mu             sync.Mutex
    	started        bool
    	startedLeader  bool
    	healthzStarted bool
    	errChan        chan error
    
    	// 全局控制器选项。
    	controllerOptions v1alpha1.ControllerConfigurationSpec
    
    	logger logr.Logger
    
    	// 用于取消领导人选举。它与internalStopper不同,因为出于安全原因,我们在失去领导人选举时需要os.Exit(),
    	// 意味着必须将其推迟到完成gracefulShutdown之后。
    	leaderElectionCancel context.CancelFunc
    
    	// 一个内部通道,用于向停止程序发出信号,表示LeaderElection.Run(...)函数已返回,并且可以继续进行关闭
    	leaderElectionStopped chan struct{}
    
    	// 停止程序已启用。换句话说,我们不应在管理器中添加其他任何内容
    	stopProcedureEngaged bool
    
    	// 当此管理者成为一组管理者的领导者时被关闭,因为赢得了领导者选举,或者因为未配置领导者选举。
    	elected chan struct{}
    
    	caches []hasCache
    
    	port int
    	host string
    	
    	certDir string
    
    	webhookServer *webhook.Server
    
        // 同manager
    	leaseDuration time.Duration
    	// 同manager
    	renewDeadline time.Duration
    	// 同manager
    	retryPeriod time.Duration
    
    	// 保持当前正在运行的可运行runnable,以便我们可以在退出mgr之前等待它们退出
    	waitForRunnable sync.WaitGroup
    
    	// 是可运行停止到在mgr实际返回停止之前的持续时间.
    	gracefulShutdownTimeout time.Duration
    
    	// 领导人选举租约丢失时调用
    	onStoppedLeading func()
    
    	// 是可以在关机期间使用的上下文。在gracefulShutdownTimeout结束后它将被取消。在internalStop 
        // 关闭之前一定不能访问它,因为它将为nil
    	shutdownCtx context.Context
    
    	internalCtx    context.Context
    	internalCancel context.CancelFunc
    
    	// 用于协调管理器内部正确关闭服务器. 此通道也用于依赖项注入.
    	internalProceduresStop chan struct{}
    }
    
    • hasCache 接口 实现了Runnable
    type hasCache interface {
    	Runnable
    	GetCache() cache.Cache
    }
    
    • Add 方法 在r上设置依赖项,并将其添加到Runnables列表中以启动。
      func (cm *controllerManager) Add(r Runnable) error {
      	cm.mu.Lock()
      	defer cm.mu.Unlock()
      	if cm.stopProcedureEngaged {
      		return errors.New("can't accept new runnable as stop procedure is already engaged")
      	}
      
      	// 在r上设置依赖项
      	if err := cm.SetFields(r); err != nil {
      		return err
      	}
      
      	var shouldStart bool
      
      	// 添加runnable到nonLeaderElectionRunnables和leaderElectionRunnables
      	if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
      		shouldStart = cm.started
      		cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
      	} else if hasCache, ok := r.(hasCache); ok {
      		cm.caches = append(cm.caches, hasCache)
      	} else {
      		shouldStart = cm.startedLeader
      		cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
      	}
      
        // 开启r
      	if shouldStart {
      		// If already started, start the controller
      		cm.startRunnable(r)
      	}
      
      	return nil
      }
      
      • AddMetricsExtraHandler 方法 在提供指标的http服务器的路径上提供了额外的处理程序。
      func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
      	if path == defaultMetricsEndpoint {
      		return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
      	}
      
      	cm.mu.Lock()
      	defer cm.mu.Unlock()
      
      	_, found := cm.metricsExtraHandlers[path]
      	if found {
      		return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
      	}
      
      	cm.metricsExtraHandlers[path] = handler
      	cm.logger.V(2).Info("Registering metrics http server extra handler", "path", path)
      	return nil
      }
      
      • serveMetrics方法 开启serve metrics
      func (cm *controllerManager) serveMetrics() {
         // 获取处理path //metrics的handler
      	handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
      		ErrorHandling: promhttp.HTTPErrorOnError,
      	})
      	// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
      	mux := http.NewServeMux()
      	mux.Handle(defaultMetricsEndpoint, handler)
      
      	func() {
      		cm.mu.Lock()
      		defer cm.mu.Unlock()
      
             // 开启自定义的metricsHandler
      		for path, extraHandler := range cm.metricsExtraHandlers {
      			mux.Handle(path, extraHandler)
      		}
      	}()
      
      	server := http.Server{
      		Handler: mux,
      	}
      	// start metrics server
      	cm.startRunnable(RunnableFunc(func(_ context.Context) error {
      		cm.logger.Info("starting metrics server", "path", defaultMetricsEndpoint)
      		if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
      			return err
      		}
      		return nil
      	}))
      
      	// 关闭metrics service 如果有signal
      	<-cm.internalProceduresStop
      	if err := server.Shutdown(cm.shutdownCtx); err != nil {
      		cm.errChan <- err
      	}
      }
      
      • Start方法 mgr启动程序 包括启动controller,Metrics,health,NonLeaderElectionRunnables/LeaderElectionRunnables等
      func (cm *controllerManager) Start(ctx context.Context) (err error) {
      	if err := cm.Add(cm.cluster); err != nil {
      		return fmt.Errorf("failed to add cluster to runnables: %w", err)
      	}
      	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
      
      	// 此chan指示停止已完成,也即是,所有可运行对象均已返回,或者在停止请求时超时
      	stopComplete := make(chan struct{})
      	defer close(stopComplete)
      	// 在关闭stopComplete之后必须推迟此操作,否则我们将陷入死锁.
      	defer func() {
      		// https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
      		stopErr := cm.engageStopProcedure(stopComplete)
      		if stopErr != nil {
      			if err != nil {
      				err = utilerrors.NewAggregate([]error{err, stopErr})
      			} else {
      				err = stopErr
      			}
      		}
      	}()
      
      	cm.errChan = make(chan error)
      
      	// 无论控制器是否为领导者,都应提供度量标准。 
         // (如果我们不为非领导者提供指标,则Prometheus仍会抓取 pod,但会拒绝连接)
      	if cm.metricsListener != nil {
      		go cm.serveMetrics()
      	}
      
      	// 服务健康探针
      	if cm.healthProbeListener != nil {
      		go cm.serveHealthProbes()
      	}
      
      	go cm.startNonLeaderElectionRunnables()
      
      	go func() {
      		if cm.resourceLock != nil {
      			err := cm.startLeaderElection()
      			if err != nil {
      				cm.errChan <- err
      			}
      		} else {
      			// Treat not having leader election enabled the same as being elected.
      			cm.startLeaderElectionRunnables()
      			close(cm.elected)
      		}
      	}()
      
      	select {
      	case <-ctx.Done():
      		// We are done
      		return nil
      	case err := <-cm.errChan:
      		// Error starting or running a runnable
      		return err
      	}
      }
      
  7. signals 用于通知信号的包
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐