k8s概念入门之control-manager-针对1.1.版本阅读
control-manager资源控制器主要是为了控制各种资源的变更信息,例如pod的创建新增,副本控制器和账户控制器等信息,资源控制器的主要职责就是通过list-watch机制,从APIServer处获取所有的操作,从而将资源的操作依次解耦,通过不同的事件来驱动整个k8s的步骤。最容易理解的一张图(该图摘自于网络)如下所示;[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(i
control-manager
资源控制器主要是为了控制各种资源的变更信息,例如pod的创建新增,副本控制器和账户控制器等信息,资源控制器的主要职责就是通过list-watch机制,从APIServer处获取所有的操作,从而将资源的操作依次解耦,通过不同的事件来驱动整个k8s的步骤。最容易理解的一张图(该图摘自于网络)如下所示;
具体细节先不做考究,大致的流程便是如图中描述的几步流程,依次通过APIServer的list-watch功能将数据依次流转出去。因为本次查阅的k8s1.1版本的代码较早,些许概念可能与上图不符合,但是基础流程还是相同的。
流程分析
先进行代码流程的查阅,后面再分析其中的一些实现机制。
control-manager启动分析
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
s := app.NewCMServer() // 生成一个服务端
s.AddFlags(pflag.CommandLine) // 解析参数
util.InitFlags()
util.InitLogs()
defer util.FlushLogs()
verflag.PrintAndExitIfRequested()
if err := s.Run(pflag.CommandLine.Args()); err != nil { // 开始运行
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
因为control-manager会进行k8s中的各种的资源的管理分发工作,在启动的过程中就会设置一些默认的例如同步时间等参数。
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
s := CMServer{
Port: ports.ControllerManagerPort, // 本机监听端口
Address: net.ParseIP("127.0.0.1"), // 本机监听IP
ConcurrentEndpointSyncs: 5, // endpoint并发操作数量
ConcurrentRCSyncs: 5, // RC并发操作数量
ConcurrentDSCSyncs: 2, // DS并发操作数量
ConcurrentJobSyncs: 5, // Job并发操作数量
ServiceSyncPeriod: 5 * time.Minute, // 服务同步时间
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
NamespaceSyncPeriod: 5 * time.Minute, // 命名空间同步时间
PVClaimBinderSyncPeriod: 10 * time.Second,
HorizontalPodAutoscalerSyncPeriod: 30 * time.Second,
DeploymentControllerSyncPeriod: 30 * time.Second, // deployement同步时间
MinResyncPeriod: 12 * time.Hour,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
ClusterName: "kubernetes",
TerminatedPodGCThreshold: 12500,
VolumeConfigFlags: VolumeConfigFlags{
// default values here
PersistentVolumeRecyclerMinimumTimeoutNFS: 300,
PersistentVolumeRecyclerIncrementTimeoutNFS: 30,
PersistentVolumeRecyclerMinimumTimeoutHostPath: 60,
PersistentVolumeRecyclerIncrementTimeoutHostPath: 30,
},
}
return &s
}
// Run runs the CMServer. This should never exit.
func (s *CMServer) Run(_ []string) error {
if s.Kubeconfig == "" && s.Master == "" {
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
}
// This creates a client, first loading any specified kubeconfig
// file, and then overriding the Master flag, if non-empty.
kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.Kubeconfig},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.Master}}).ClientConfig() // 获取配置信息
if err != nil {
return err
}
kubeconfig.QPS = 20.0
kubeconfig.Burst = 30
kubeClient, err := client.New(kubeconfig) // 获取连接到master的客户端连接
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
go func() {
mux := http.NewServeMux() // 注册调试接口信息
healthz.InstallHandler(mux)
if s.EnableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
mux.Handle("/metrics", prometheus.Handler()) // 注册prometheus监控信息
server := &http.Server{
Addr: net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}()
go endpointcontroller.NewEndpointController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentEndpointSyncs, util.NeverStop) // endpointcontroller监控运行
go replicationControllerPkg.NewReplicationManager(kubeClient, s.resyncPeriod, replicationControllerPkg.BurstReplicas).
Run(s.ConcurrentRCSyncs, util.NeverStop) // RC资源监控运行
if s.TerminatedPodGCThreshold > 0 {
go gc.New(kubeClient, s.resyncPeriod, s.TerminatedPodGCThreshold).
Run(util.NeverStop)
}
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod) // node控制器运行
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
glog.Errorf("Failed to start service controller: %v", err) // 服务控制器运行
}
if s.AllocateNodeCIDRs {
if cloud == nil {
glog.Warning("allocate-node-cidrs is set, but no cloud provider specified. Will not manage routes.")
} else if routes, ok := cloud.Routes(); !ok {
glog.Warning("allocate-node-cidrs is set, but cloud provider does not support routes. Will not manage routes.")
} else {
routeController := routecontroller.New(routes, kubeClient, s.ClusterName, &s.ClusterCIDR)
routeController.Run(s.NodeSyncPeriod)
}
}
resourcequotacontroller.NewResourceQuotaController(kubeClient).Run(s.ResourceQuotaSyncPeriod)
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
var versionStrings []string
err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
if versionStrings, err = client.ServerAPIVersions(kubeconfig); err == nil {
return true, nil
}
glog.Errorf("Failed to get api versions from server: %v", err)
return false, nil
})
if err != nil {
glog.Fatalf("Failed to get api versions from server: %v", err)
}
versions := &unversioned.APIVersions{Versions: versionStrings}
resourceMap, err := kubeClient.Discovery().ServerResources()
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
namespacecontroller.NewNamespaceController(kubeClient, versions, s.NamespaceSyncPeriod).Run() // 命名空间运行
groupVersion := "extensions/v1beta1"
resources, found := resourceMap[groupVersion]
// TODO: this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "horizontalpodautoscalers") {
glog.Infof("Starting horizontal pod controller.")
podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient)).
Run(s.HorizontalPodAutoscalerSyncPeriod)
}
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
}
if containsResource(resources, "jobs") {
glog.Infof("Starting job controller")
go job.NewJobController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentJobSyncs, util.NeverStop)
}
if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
deployment.New(kubeClient).
Run(s.DeploymentControllerSyncPeriod)
}
}
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
var rootCA []byte
if s.RootCAFile != "" {
rootCA, err = ioutil.ReadFile(s.RootCAFile)
if err != nil {
return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err)
}
if _, err := util.CertsFromPEM(rootCA); err != nil {
return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
}
} else {
rootCA = kubeconfig.CAData
}
if len(s.ServiceAccountKeyFile) > 0 {
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
if err != nil {
glog.Errorf("Error reading key for service account token controller: %v", err)
} else {
serviceaccount.NewTokensController(
kubeClient,
serviceaccount.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
).Run()
}
}
serviceaccount.NewServiceAccountsController(
kubeClient,
serviceaccount.DefaultServiceAccountsControllerOptions(),
).Run() // 账户控制器运行
select {}
}
从主要的流程可知,在Run函数中主要就是通过配置文件然后开启所有的控制器的运行,控制器的运行就是从APIServer获取变更的数据信息,然后加工完成之后将数据回写到APIServer,然后通过数据更改的事件推动后续的资源控制。
Informer机制-endpointcontroller实现概述
资源控制最底层都使用了Informer的机制来实现的,故其他资源控制器就不展开赘述,当前先分析一下endpointcontroller的实现流程。
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(client *client.Client, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
e := &EndpointController{
client: client,
queue: workqueue.New(),
} // 保存客户端连接,新生成一个队列
e.serviceStore.Store, e.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return e.client.Services(api.NamespaceAll).List(labels.Everything()) // 获取服务
},
WatchFunc: func(rv string) (watch.Interface, error) {
return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) // 监控服务信息
},
}, // 设置ListWatch函数
&api.Service{},
// TODO: Can we have much longer period here?
FullServiceResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: e.enqueueService, // 新增的触发函数
UpdateFunc: func(old, cur interface{}) {
e.enqueueService(cur)
}, // 更新的触发函数
DeleteFunc: e.enqueueService, // 触发的函数
},
) // 生成一个Informer
e.podStore.Store, e.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod, // Pod新增的触发函数
UpdateFunc: e.updatePod, // Pod更新的触发函数
DeleteFunc: e.deletePod, // Pod删除的触发函数
},
)
return e
}
通过该初始化的函数可知,endpointcontrol生成了两个Informer并配置了不同的触发函数和ListWatch函数,再出现事件通知的情况下,就调用对应的触发函数进行处理。
// Runs e; will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go e.serviceController.Run(stopCh) // 开始serviceController的监控
go e.podController.Run(stopCh) // 开始podController的监控
for i := 0; i < workers; i++ {
go util.Until(e.worker, time.Second, stopCh) // 根据配置的worker的数量来启动worker消费变更的信息
}
go func() {
defer util.HandleCrash()
time.Sleep(5 * time.Minute) // give time for our cache to fill
e.checkLeftoverEndpoints()
}()
<-stopCh
e.queue.ShutDown() // 如果退出则关闭
}
通过Run函数的实现可知,基本上分成了接受变更消息然后通过队列传给了worker处理,从而提高处理性能。
func NewInformer(
lw cache.ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (cache.Store, *Controller) {
// This will hold the client state, as we know it.
clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc) // 获取状态
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) // 生成一个先入先出队列
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object) // 调用配置的更新触发函数
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object) // 调用新增回调函数
}
case cache.Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object) // 调用删除回调函数
}
}
return nil
},
}
return clientState, New(cfg)
}
再初始化所有cfg之后就调用了New函数来生成一个Controler,在Run的过程中最终也是调用的cfg生成的controler的Run方法。
func (c *Controller) Run(stopCh <-chan struct{}) {
defer util.HandleCrash()
r := cache.NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
) // 生成一个Reflector
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
r.RunUntil(stopCh) // 执行list-watch流程
util.Until(c.processLoop, time.Second, stopCh) // 一直循环执行processLoop函数
}
...
func (c *Controller) processLoop() {
for {
obj := c.config.Queue.Pop() // 从队列中获取对象
err := c.config.Process(obj) // 调用process处理分发请求
if err != nil {
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
此时,通过list-watch机制将获取到的变更的信息就会被EndpointController的注册的回调函数enqueueService、addPod、updatePod和deletePod方法,只要出现ServerController监控的信息改变则会调用enqueueService,只要出现Pod相关的资源数据改变则会调用addPod、updatePod和deletePod方法。
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *api.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
pod := obj.(*api.Pod)
services, err := e.getPodServiceMemberships(pod) // 获取关联的服务信息
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
return
}
for key := range services {
e.queue.Add(key) // 队列中添加所有关联的服务信息
}
}
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *api.Pod types.
func (e *EndpointController) updatePod(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
return
}
newPod := old.(*api.Pod)
services, err := e.getPodServiceMemberships(newPod) // 获取所有的服务信息
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
return
}
oldPod := cur.(*api.Pod)
// Only need to get the old services if the labels changed.
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
return
}
services = services.Union(oldServices)
}
for key := range services {
e.queue.Add(key) // 将更新的服务信息添加到队列中
}
}
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
if _, ok := obj.(*api.Pod); ok {
// Enqueue all the services that the pod used to be a member
// of. This happens to be exactly the same thing we do when a
// pod is added.
e.addPod(obj)
return
}
podKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
// TODO: keep a map of pods to services to handle this condition.
}
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) enqueueService(obj interface{}) {
key, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
e.queue.Add(key) // 添加到队列中
}
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *EndpointController) worker() {
for {
func() {
key, quit := e.queue.Get() // 从队列中取数据
if quit {
return
}
// Use defer: in the unlikely event that there's a
// panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e.queue.Done(key)
e.syncService(key.(string)) // 同步数据
}()
}
}
从处理流程可知,无论是增加都是通过转换成service来进行最终的处理,最终都会调用syncService函数来进行处理。
func (e *EndpointController) syncService(key string) {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := e.serviceStore.Store.GetByKey(key) // 从本地缓存中获取数据
if err != nil || !exists {
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
// the service is deleted, we will miss that deletion, so this
// doesn't completely solve the problem. See #6877.
namespace, name, err := cache.SplitMetaNamespaceKey(key) // 如果不在则通过client来重新拉取数据
if err != nil {
glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
// Don't retry, as the key isn't going to magically become understandable.
return
}
err = e.client.Endpoints(namespace).Delete(name)
if err != nil && !errors.IsNotFound(err) {
glog.Errorf("Error deleting endpoint %q: %v", key, err)
e.queue.Add(key) // Retry
}
return
}
service := obj.(*api.Service) // 获取service
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return
}
glog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
glog.Errorf("Error syncing service %q: %v", key, err)
e.queue.Add(key) // Retry
return
} // 获取所有的pods信息
subsets := []api.EndpointSubset{} // 通过pod来组件Endpoint信息
for i := range pods.Items {
pod := &pods.Items[i]
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portName := servicePort.Name
portProto := servicePort.Protocol
portNum, err := findPort(pod, servicePort)
if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
if len(pod.Status.PodIP) == 0 {
glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
if pod.DeletionTimestamp != nil {
glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
continue
}
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
if api.IsPodReady(pod) {
subsets = append(subsets, api.EndpointSubset{
Addresses: []api.EndpointAddress{epa},
Ports: []api.EndpointPort{epp},
})
} else {
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
subsets = append(subsets, api.EndpointSubset{
NotReadyAddresses: []api.EndpointAddress{epa},
Ports: []api.EndpointPort{epp},
})
}
}
}
subsets = endpoints.RepackSubsets(subsets)
// See if there's actually an update here.
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) // 通过client来连接APIServer确认该服务的信息
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
e.queue.Add(key) // Retry
return
}
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
}
newEndpoints := currentEndpoints // 设置新的endpoints的相关信息
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) // 如果没有则新建
} else {
// Pre-existing
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) // 如果有则更新
}
if err != nil {
glog.Errorf("Error updating endpoints: %v", err)
e.queue.Add(key) // Retry 如果出错就重试
}
}
从流程中可知,将所有的service信息进行最终的对比,将最后处理完成的数据提交到APIServer,继而触发下一个流程。至此一个Informer的运行与实现机制都概述完成,其他的几个控制器基本上也基于此流程实现。
总结
本文只是简单的了解和概述了资源控制器的作用,并概述了一下Informer的基础流程,主要是为了加深一下资源控制器的理解。如果在生成环境中要掌握好k8s还需要进一步阅读与实践最新版本的相关知识。由于本人才疏学浅,如有错误请批评指正。
更多推荐
所有评论(0)