controller-manager

The resource controllers handle change information for the various resources, for example newly created pods, and include components such as the replication controller and the service-account controller. Their main responsibility is to obtain all operations from the APIServer through the list-watch mechanism, decoupling the handling of resource operations and driving the rest of Kubernetes step by step through different events. The easiest figure to understand (taken from the Internet) is shown below:

[figure: list-watch event flow between the APIServer and the controllers]

Without digging into the details yet, the overall process is roughly the few steps described in the figure: data is streamed out step by step through the APIServer's list-watch capability. The code examined here is from the fairly old Kubernetes 1.1 release, so a few concepts may not match the figure above, but the basic flow is the same.
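
Before diving into the source, here is a minimal, self-contained sketch of the list-watch idea in Go. None of the types below come from Kubernetes; they only illustrate the pattern of seeding local state with a list call and then reacting to a stream of change events.

// Toy illustration of list-watch: list once, then consume change events.
package main

import "fmt"

// Event is a hypothetical change notification from the APIServer.
type Event struct {
	Type   string // "ADDED", "MODIFIED", "DELETED"
	Object string // the resource the event refers to
}

// list stands in for the initial "list" call against the APIServer.
func list() []string {
	return []string{"pod-a", "pod-b"}
}

// watch stands in for the long-running "watch" call; here it just
// replays a fixed set of events over a channel.
func watch() <-chan Event {
	ch := make(chan Event)
	go func() {
		defer close(ch)
		ch <- Event{Type: "ADDED", Object: "pod-c"}
		ch <- Event{Type: "DELETED", Object: "pod-a"}
	}()
	return ch
}

func main() {
	// 1. List: seed the local view of the world.
	for _, obj := range list() {
		fmt.Println("initial object:", obj)
	}
	// 2. Watch: react to changes as events, which is what keeps the
	//    controllers decoupled from one another.
	for ev := range watch() {
		fmt.Printf("event %s for %s\n", ev.Type, ev.Object)
	}
}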

Flow analysis

We first walk through the code flow, and then analyze some of the implementation mechanisms behind it.

controller-manager startup analysis
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	s := app.NewCMServer()          // create the controller-manager server with default config
	s.AddFlags(pflag.CommandLine)   // register command-line flags on the shared flag set

	util.InitFlags()
	util.InitLogs()
	defer util.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := s.Run(pflag.CommandLine.Args()); err != nil {  // start running
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

Because the controller-manager manages and dispatches the various resources in Kubernetes, a set of defaults such as sync periods and concurrency limits is filled in at startup, as the constructor below shows; a small flag-handling sketch follows it.

// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
	s := CMServer{
		Port:                              ports.ControllerManagerPort,  // local listen port
		Address:                           net.ParseIP("127.0.0.1"),     // local listen address
		ConcurrentEndpointSyncs:           5,                            // number of concurrent endpoint syncs
		ConcurrentRCSyncs:                 5,                            // number of concurrent RC syncs
		ConcurrentDSCSyncs:                2,                            // number of concurrent DaemonSet syncs
		ConcurrentJobSyncs:                5,                            // number of concurrent Job syncs
		ServiceSyncPeriod:                 5 * time.Minute,              // service sync period
		NodeSyncPeriod:                    10 * time.Second,
		ResourceQuotaSyncPeriod:           10 * time.Second,
		NamespaceSyncPeriod:               5 * time.Minute,              // namespace sync period
		PVClaimBinderSyncPeriod:           10 * time.Second,
		HorizontalPodAutoscalerSyncPeriod: 30 * time.Second,
		DeploymentControllerSyncPeriod:    30 * time.Second,             // deployment sync period
		MinResyncPeriod:                   12 * time.Hour,
		RegisterRetryCount:                10,
		PodEvictionTimeout:                5 * time.Minute,
		ClusterName:                       "kubernetes",
		TerminatedPodGCThreshold:          12500,
		VolumeConfigFlags: VolumeConfigFlags{
			// default values here
			PersistentVolumeRecyclerMinimumTimeoutNFS:        300,
			PersistentVolumeRecyclerIncrementTimeoutNFS:      30,
			PersistentVolumeRecyclerMinimumTimeoutHostPath:   60,
			PersistentVolumeRecyclerIncrementTimeoutHostPath: 30,
		},
	}
	return &s
}
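
As a rough illustration of how defaults like these surface as command-line options, here is a toy sketch using the standard library flag package; the flag names below are made up for illustration, while the real CMServer registers its options through pflag in AddFlags.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Hypothetical flags mirroring a couple of the defaults above.
	serviceSyncPeriod := flag.Duration("service-sync-period", 5*time.Minute, "period for syncing services")
	concurrentRCSyncs := flag.Int("concurrent-rc-syncs", 5, "number of replication controllers allowed to sync concurrently")
	flag.Parse()

	fmt.Println("service sync period:", *serviceSyncPeriod)
	fmt.Println("concurrent RC syncs:", *concurrentRCSyncs)
}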


// Run runs the CMServer.  This should never exit.
func (s *CMServer) Run(_ []string) error {
	if s.Kubeconfig == "" && s.Master == "" {
		glog.Warningf("Neither --kubeconfig nor --master was specified.  Using default API client.  This might not work.")
	}

	// This creates a client, first loading any specified kubeconfig
	// file, and then overriding the Master flag, if non-empty.
	kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.Kubeconfig},
		&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.Master}}).ClientConfig()    // build the client config
	if err != nil {
		return err
	}

	kubeconfig.QPS = 20.0
	kubeconfig.Burst = 30

	kubeClient, err := client.New(kubeconfig)   // create the client connection to the master
	if err != nil {
		glog.Fatalf("Invalid API configuration: %v", err)
	}

	go func() {
		mux := http.NewServeMux()   // register debug and health endpoints
		healthz.InstallHandler(mux)
		if s.EnableProfiling {
			mux.HandleFunc("/debug/pprof/", pprof.Index)
			mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
			mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		}
		mux.Handle("/metrics", prometheus.Handler())  // 注册prometheus监控信息

		server := &http.Server{
			Addr:    net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)),
			Handler: mux,
		}
		glog.Fatal(server.ListenAndServe())
	}()

	go endpointcontroller.NewEndpointController(kubeClient, s.resyncPeriod).
		Run(s.ConcurrentEndpointSyncs, util.NeverStop)  // run the endpoint controller

	go replicationControllerPkg.NewReplicationManager(kubeClient, s.resyncPeriod, replicationControllerPkg.BurstReplicas).
		Run(s.ConcurrentRCSyncs, util.NeverStop)       // run the replication controller manager

	if s.TerminatedPodGCThreshold > 0 {
		go gc.New(kubeClient, s.resyncPeriod, s.TerminatedPodGCThreshold).
			Run(util.NeverStop)   
	}

	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
	if err != nil {
		glog.Fatalf("Cloud provider could not be initialized: %v", err)
	}

	nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
		s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
		util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
		s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
	nodeController.Run(s.NodeSyncPeriod)  // run the node controller

	serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)  // run the service controller
	if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
		glog.Errorf("Failed to start service controller: %v", err)
	}

	if s.AllocateNodeCIDRs {
		if cloud == nil {
			glog.Warning("allocate-node-cidrs is set, but no cloud provider specified. Will not manage routes.")
		} else if routes, ok := cloud.Routes(); !ok {
			glog.Warning("allocate-node-cidrs is set, but cloud provider does not support routes. Will not manage routes.")
		} else {
			routeController := routecontroller.New(routes, kubeClient, s.ClusterName, &s.ClusterCIDR)
			routeController.Run(s.NodeSyncPeriod)
		}
	}

	resourcequotacontroller.NewResourceQuotaController(kubeClient).Run(s.ResourceQuotaSyncPeriod)

	// If apiserver is not running we should wait for some time and fail only then. This is particularly
	// important when we start apiserver and controller manager at the same time.
	var versionStrings []string
	err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
		if versionStrings, err = client.ServerAPIVersions(kubeconfig); err == nil {
			return true, nil
		}
		glog.Errorf("Failed to get api versions from server: %v", err)
		return false, nil
	})
	if err != nil {
		glog.Fatalf("Failed to get api versions from server: %v", err)
	}
	versions := &unversioned.APIVersions{Versions: versionStrings}

	resourceMap, err := kubeClient.Discovery().ServerResources()
	if err != nil {
		glog.Fatalf("Failed to get supported resources from server: %v", err)
	}

	namespacecontroller.NewNamespaceController(kubeClient, versions, s.NamespaceSyncPeriod).Run()   // run the namespace controller

	groupVersion := "extensions/v1beta1"
	resources, found := resourceMap[groupVersion]
	// TODO: this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver
	if containsVersion(versions, groupVersion) && found {
		glog.Infof("Starting %s apis", groupVersion)
		if containsResource(resources, "horizontalpodautoscalers") {
			glog.Infof("Starting horizontal pod controller.")
			podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient)).
				Run(s.HorizontalPodAutoscalerSyncPeriod)
		}

		if containsResource(resources, "daemonsets") {
			glog.Infof("Starting daemon set controller")
			go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod).
				Run(s.ConcurrentDSCSyncs, util.NeverStop)
		}

		if containsResource(resources, "jobs") {
			glog.Infof("Starting job controller")
			go job.NewJobController(kubeClient, s.resyncPeriod).
				Run(s.ConcurrentJobSyncs, util.NeverStop)
		}

		if containsResource(resources, "deployments") {
			glog.Infof("Starting deployment controller")
			deployment.New(kubeClient).
				Run(s.DeploymentControllerSyncPeriod)
		}
	}

	pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
	pvclaimBinder.Run()

	pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
	if err != nil {
		glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
	}
	pvRecycler.Run()

	var rootCA []byte

	if s.RootCAFile != "" {
		rootCA, err = ioutil.ReadFile(s.RootCAFile)
		if err != nil {
			return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err)
		}
		if _, err := util.CertsFromPEM(rootCA); err != nil {
			return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
		}
	} else {
		rootCA = kubeconfig.CAData
	}

	if len(s.ServiceAccountKeyFile) > 0 {
		privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
		if err != nil {
			glog.Errorf("Error reading key for service account token controller: %v", err)
		} else {
			serviceaccount.NewTokensController(
				kubeClient,
				serviceaccount.TokensControllerOptions{
					TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
					RootCA:         rootCA,
				},
			).Run()
		}
	}

	serviceaccount.NewServiceAccountsController(
		kubeClient,
		serviceaccount.DefaultServiceAccountsControllerOptions(),
	).Run()   // run the service-account controller

	select {}
}

From the main flow we can see that the Run function essentially uses the configuration to start every controller. Each controller fetches changed data from the APIServer, processes it, and writes the result back to the APIServer, and the resulting change events then drive the subsequent resource control steps.
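
The essence of this startup is a set of independent, periodic sync loops. A stripped-down sketch of that pattern (with made-up controller bodies and a hand-rolled stand-in for util.Until, not the real controller code) could look like this:

package main

import (
	"fmt"
	"time"
)

// until repeatedly calls f every period until stopCh is closed,
// roughly mimicking the behaviour of util.Until used above.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(period):
			f()
		}
	}
}

func main() {
	stopCh := make(chan struct{})

	// Hypothetical "controllers": each is just a periodic sync loop
	// started in its own goroutine, as Run does for the real controllers.
	go until(func() { fmt.Println("endpoint controller: sync") }, 200*time.Millisecond, stopCh)
	go until(func() { fmt.Println("replication controller: sync") }, 300*time.Millisecond, stopCh)

	// The real Run blocks forever with `select {}`; here we stop after a
	// second so the sketch terminates.
	time.Sleep(1 * time.Second)
	close(stopCh)
	time.Sleep(100 * time.Millisecond) // give the goroutines time to exit
}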

The Informer mechanism: an overview of the endpointcontroller implementation

At the lowest level all resource controllers are implemented on top of the Informer mechanism, so instead of going through each of them, we first analyze the implementation flow of the endpointcontroller.

// NewEndpointController returns a new *EndpointController.
func NewEndpointController(client *client.Client, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
	e := &EndpointController{
		client: client,
		queue:  workqueue.New(),
	}   // save the client connection and create a new work queue

	e.serviceStore.Store, e.serviceController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return e.client.Services(api.NamespaceAll).List(labels.Everything()) // list all services
			},
			WatchFunc: func(rv string) (watch.Interface, error) {
				return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)   // watch for service changes
			},
		},   // configure the ListWatch functions
		&api.Service{},
		// TODO: Can we have much longer period here?
		FullServiceResyncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc: e.enqueueService,   // handler for additions
			UpdateFunc: func(old, cur interface{}) {
				e.enqueueService(cur)
			},                            // handler for updates
			DeleteFunc: e.enqueueService, // handler for deletions
		},
	)   // create the service Informer

	e.podStore.Store, e.podController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
			},
			WatchFunc: func(rv string) (watch.Interface, error) {
				return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
			},
		},
		&api.Pod{},
		resyncPeriod(),
		framework.ResourceEventHandlerFuncs{
			AddFunc:    e.addPod,     // handler for pod additions
			UpdateFunc: e.updatePod,  // handler for pod updates
			DeleteFunc: e.deletePod,  // handler for pod deletions
		},
	)

	return e
}

This constructor shows that the endpointcontroller creates two Informers, each configured with its own ListWatch functions and event handlers; when an event notification arrives, the corresponding handler is invoked to process it.
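
framework.ResourceEventHandlerFuncs is essentially a struct of callback fields chosen per event type. A self-contained sketch of that dispatch idea (hypothetical types, not the real framework ones):

package main

import "fmt"

// handlerFuncs mimics the shape of framework.ResourceEventHandlerFuncs:
// a plain struct of callbacks, one per event type.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(old, cur interface{})
	DeleteFunc func(obj interface{})
}

// dispatch routes an event to the matching callback, skipping nil ones.
func dispatch(h handlerFuncs, eventType string, old, cur interface{}) {
	switch eventType {
	case "add":
		if h.AddFunc != nil {
			h.AddFunc(cur)
		}
	case "update":
		if h.UpdateFunc != nil {
			h.UpdateFunc(old, cur)
		}
	case "delete":
		if h.DeleteFunc != nil {
			h.DeleteFunc(cur)
		}
	}
}

func main() {
	h := handlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("added:", obj) },
		UpdateFunc: func(old, cur interface{}) { fmt.Println("updated:", old, "->", cur) },
		DeleteFunc: func(obj interface{}) { fmt.Println("deleted:", obj) },
	}
	dispatch(h, "add", nil, "service-a")
	dispatch(h, "update", "service-a", "service-a-v2")
	dispatch(h, "delete", nil, "service-a")
}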

// Runs e; will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	defer util.HandleCrash()
	go e.serviceController.Run(stopCh) // start the service Informer
	go e.podController.Run(stopCh)     // start the pod Informer
	for i := 0; i < workers; i++ {
		go util.Until(e.worker, time.Second, stopCh)  // start the configured number of workers to consume changes
	}
	go func() {
		defer util.HandleCrash()
		time.Sleep(5 * time.Minute) // give time for our cache to fill
		e.checkLeftoverEndpoints()
	}()
	<-stopCh
	e.queue.ShutDown()   // shut down the queue when stopping
}

The Run implementation shows that the work is basically split into receiving change notifications and handing them through the queue to the workers for processing, which improves throughput.
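
A minimal sketch of this producer/consumer split, using a plain buffered channel in place of the real workqueue (which additionally deduplicates keys and tracks in-flight items):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// The queue decouples the informer callbacks (producers) from the
	// sync workers (consumers); here it is just a buffered channel.
	queue := make(chan string, 16)

	var wg sync.WaitGroup
	const workers = 3
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for key := range queue {
				// Stand-in for syncService(key).
				fmt.Printf("worker %d syncing %s\n", id, key)
			}
		}(i)
	}

	// Stand-in for the informer callbacks enqueuing service keys.
	for _, key := range []string{"default/svc-a", "default/svc-b", "kube-system/dns"} {
		queue <- key
	}
	close(queue) // the real controller instead calls queue.ShutDown() on stop
	wg.Wait()
}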

func NewInformer(
	lw cache.ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (cache.Store, *Controller) {
	// This will hold the client state, as we know it.
	clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)  // store holding the client's known state

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)  // create a delta FIFO queue

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(cache.Deltas) {
				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)   // invoke the configured update handler
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)           // invoke the add handler
					}
				case cache.Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)          // invoke the delete handler
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}

After the cfg is initialized, New is called to create a Controller; during Run, it is ultimately the Run method of the Controller built from this cfg that gets called.

func (c *Controller) Run(stopCh <-chan struct{}) {
	defer util.HandleCrash()
	r := cache.NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)   // create a Reflector

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	r.RunUntil(stopCh)                               // run the list-watch loop

	util.Until(c.processLoop, time.Second, stopCh)   // keep executing processLoop in a loop
}

...

func (c *Controller) processLoop() {
	for {
		obj := c.config.Queue.Pop()   // pop an object from the queue
		err := c.config.Process(obj)  // call Process to handle and dispatch it
		if err != nil {
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

At this point, the change information obtained through the list-watch mechanism is handed to the callbacks registered by the EndpointController: enqueueService, addPod, updatePod and deletePod. Whenever information watched by the serviceController changes, enqueueService is called; whenever pod-related resource data changes, addPod, updatePod or deletePod is called.

// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *api.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
	pod := obj.(*api.Pod)
	services, err := e.getPodServiceMemberships(pod)   // find the services this pod belongs to
	if err != nil {
		glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
		return
	}
	for key := range services {
		e.queue.Add(key)   // enqueue every related service key
	}
}

// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *api.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
	if api.Semantic.DeepEqual(old, cur) {
		return
	}
	newPod := old.(*api.Pod)
	services, err := e.getPodServiceMemberships(newPod)  // get the services the pod belongs to
	if err != nil {
		glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
		return
	}

	oldPod := cur.(*api.Pod)
	// Only need to get the old services if the labels changed.
	if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
		oldServices, err := e.getPodServiceMemberships(oldPod)
		if err != nil {
			glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
			return
		}
		services = services.Union(oldServices)
	}
	for key := range services {
		e.queue.Add(key)   // enqueue the affected service keys
	}
}

// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
	if _, ok := obj.(*api.Pod); ok {
		// Enqueue all the services that the pod used to be a member
		// of. This happens to be exactly the same thing we do when a
		// pod is added.
		e.addPod(obj)      			
		return
	}
	podKey, err := keyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
	}
	glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)

	// TODO: keep a map of pods to services to handle this condition.
}

// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) enqueueService(obj interface{}) {
	key, err := keyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
	}

	e.queue.Add(key)   // add the service key to the queue
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *EndpointController) worker() {
	for {
		func() {
			key, quit := e.queue.Get()  // get an item from the queue
			if quit {
				return
			}
			// Use defer: in the unlikely event that there's a
			// panic, we'd still like this to get marked done--
			// otherwise the controller will not be able to sync
			// this service again until it is restarted.
			defer e.queue.Done(key)
			e.syncService(key.(string)) // sync the service
		}()
	}
}
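
The addPod/updatePod callbacks above rely on getPodServiceMemberships to map a pod to the services whose selectors match its labels. A self-contained sketch of that matching idea, using toy types rather than the real api.Service and api.Pod:

package main

import "fmt"

// toyService and toyPod stand in for api.Service and api.Pod.
type toyService struct {
	Name     string
	Selector map[string]string // nil means "no selector"; matches nothing in this toy
}

type toyPod struct {
	Name   string
	Labels map[string]string
}

// selectorMatches reports whether every key/value in the selector is
// present in the pod's labels, which is how label selectors behave.
func selectorMatches(selector, labels map[string]string) bool {
	if selector == nil {
		return false
	}
	for k, v := range selector {
		if labels[k] != v {
			return false
		}
	}
	return true
}

// podServiceMemberships returns the names of the services a pod belongs to.
func podServiceMemberships(pod toyPod, services []toyService) []string {
	var members []string
	for _, svc := range services {
		if selectorMatches(svc.Selector, pod.Labels) {
			members = append(members, svc.Name)
		}
	}
	return members
}

func main() {
	services := []toyService{
		{Name: "frontend", Selector: map[string]string{"app": "web"}},
		{Name: "backend", Selector: map[string]string{"app": "api"}},
	}
	pod := toyPod{Name: "web-1", Labels: map[string]string{"app": "web", "tier": "frontend"}}
	fmt.Println("pod", pod.Name, "belongs to services:", podServiceMemberships(pod, services))
}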

From the handling flow we can see that every event, whether an add, update or delete, is ultimately converted into a service key for final processing, and the syncService function is what eventually handles it.

func (e *EndpointController) syncService(key string) {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
	}()
	obj, exists, err := e.serviceStore.Store.GetByKey(key)    // read the service from the local cache
	if err != nil || !exists {
		// Delete the corresponding endpoint, as the service has been deleted.
		// TODO: Please note that this will delete an endpoint when a
		// service is deleted. However, if we're down at the time when
		// the service is deleted, we will miss that deletion, so this
		// doesn't completely solve the problem. See #6877.
		namespace, name, err := cache.SplitMetaNamespaceKey(key)  // the service is gone, so split the key to delete its endpoints
		if err != nil {
			glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
			// Don't retry, as the key isn't going to magically become understandable.
			return
		}
		err = e.client.Endpoints(namespace).Delete(name)
		if err != nil && !errors.IsNotFound(err) {
			glog.Errorf("Error deleting endpoint %q: %v", key, err)
			e.queue.Add(key) // Retry
		}
		return
	}

	service := obj.(*api.Service)   // cast the cached object to a service
	if service.Spec.Selector == nil {
		// services without a selector receive no endpoints from this controller;
		// these services will receive the endpoints that are created out-of-band via the REST API.
		return
	}

	glog.V(5).Infof("About to update endpoints for service %q", key)
	pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
	if err != nil {
		// Since we're getting stuff from a local cache, it is
		// basically impossible to get this error.
		glog.Errorf("Error syncing service %q: %v", key, err)
		e.queue.Add(key) // Retry
		return
	}    // list all pods matching the service selector

	subsets := []api.EndpointSubset{}  // build the endpoint subsets from the pods
	for i := range pods.Items {
		pod := &pods.Items[i]

		for i := range service.Spec.Ports {
			servicePort := &service.Spec.Ports[i]

			portName := servicePort.Name   
			portProto := servicePort.Protocol
			portNum, err := findPort(pod, servicePort)
			if err != nil {
				glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
				continue
			}
			if len(pod.Status.PodIP) == 0 {
				glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
				continue
			}
			if pod.DeletionTimestamp != nil {
				glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
				continue
			}

			epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
			epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
				Kind:            "Pod",
				Namespace:       pod.ObjectMeta.Namespace,
				Name:            pod.ObjectMeta.Name,
				UID:             pod.ObjectMeta.UID,
				ResourceVersion: pod.ObjectMeta.ResourceVersion,
			}}
			if api.IsPodReady(pod) {
				subsets = append(subsets, api.EndpointSubset{
					Addresses: []api.EndpointAddress{epa},
					Ports:     []api.EndpointPort{epp},
				})
			} else {
				glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
				subsets = append(subsets, api.EndpointSubset{
					NotReadyAddresses: []api.EndpointAddress{epa},
					Ports:             []api.EndpointPort{epp},
				})
			}
		}
	}
	subsets = endpoints.RepackSubsets(subsets)

	// See if there's actually an update here.
	currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)  // query the APIServer for the service's current endpoints
	if err != nil {
		if errors.IsNotFound(err) {
			currentEndpoints = &api.Endpoints{
				ObjectMeta: api.ObjectMeta{
					Name:   service.Name,
					Labels: service.Labels,
				},
			}
		} else {
			glog.Errorf("Error getting endpoints: %v", err)
			e.queue.Add(key) // Retry
			return
		}
	}
	if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
		glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
		return
	}
	newEndpoints := currentEndpoints  // build the new endpoints object
	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels

	if len(currentEndpoints.ResourceVersion) == 0 {
		// No previous endpoints, create them
		_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)  // no previous endpoints, create them
	} else {
		// Pre-existing
		_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)   // endpoints already exist, update them
	}
	if err != nil {
		glog.Errorf("Error updating endpoints: %v", err)
		e.queue.Add(key) // retry on error
	}
}

The flow shows that all the service information is compared at the end and the final result is committed to the APIServer, which in turn triggers the next stage. This completes the overview of how one Informer runs and is implemented; the other controllers are basically built on the same flow.
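
The final compare-and-commit step is a generic desired-versus-current pattern: compute the desired object, fetch the current one, and create or update only when they differ. A small sketch of that idea, with a toy endpoints type and an in-memory map standing in for the APIServer rather than the real client calls:

package main

import (
	"fmt"
	"reflect"
)

// toyEndpoints stands in for api.Endpoints: a service name plus addresses.
type toyEndpoints struct {
	Service   string
	Addresses []string
}

// store is a hypothetical stand-in for the APIServer's view of endpoints.
var store = map[string]toyEndpoints{}

// reconcile writes desired into the store only when it actually differs
// from what is currently recorded, mirroring the DeepEqual check above.
func reconcile(desired toyEndpoints) {
	current, exists := store[desired.Service]
	switch {
	case !exists:
		store[desired.Service] = desired
		fmt.Println("created endpoints for", desired.Service)
	case reflect.DeepEqual(current, desired):
		fmt.Println("endpoints unchanged for", desired.Service, "- skipping update")
	default:
		store[desired.Service] = desired
		fmt.Println("updated endpoints for", desired.Service)
	}
}

func main() {
	reconcile(toyEndpoints{Service: "frontend", Addresses: []string{"10.0.0.1:80"}})
	reconcile(toyEndpoints{Service: "frontend", Addresses: []string{"10.0.0.1:80"}})                // no-op
	reconcile(toyEndpoints{Service: "frontend", Addresses: []string{"10.0.0.1:80", "10.0.0.2:80"}}) // update
}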

Summary

This article only gives a brief look at what the resource controllers do and outlines the basic Informer flow, mainly to deepen the understanding of resource controllers. To really master Kubernetes in a production environment you still need to read about and practice with the latest versions. My knowledge is limited, so please point out any mistakes.
