ReplicationManager 是在 controller manager 中创建的 RC 管理器。其数据结构中同时包含 pod 和 rc 的存储(store)以及各自的控制器(controller):pod 从 pod informer 中获取,rc 则由 rcController 监听 apiserver 获取,最后将两者进行比较。

// ReplicationManager bundles the state needed to reconcile replication
// controllers with their pods: local stores for both resource kinds, the
// controllers that populate those stores, the pending create/delete
// expectations, and the work queue of controllers awaiting a sync.
type ReplicationManager struct {
   // kubeClient is presumably the API client used to reach the apiserver;
   // its use is not visible in this excerpt — TODO confirm.
   kubeClient clientset.Interface
   // podControl performs the actual pod creations and deletions issued by
   // manageReplicas (CreatePods / DeletePod).
   podControl controller.PodControlInterface

   // internalPodInformer is used to hold a personal informer.  If we're using
   // a normal shared informer, then the informer will be started for us.  If
   // we have a personal informer, we must start it ourselves.   If you start
   // the controller using NewReplicationManager(passing SharedInformer), this
   // will be nil.
   internalPodInformer framework.SharedIndexInformer

   // An rc is temporarily suspended after creating/deleting these many replicas.
   // It resumes normal action after observing the watch events for them.
   // manageReplicas uses this value as the upper bound on the number of pods
   // it creates or deletes in a single pass.
   burstReplicas int
   // To allow injection of syncReplicationController for testing.
   syncHandler func(rcKey string) error

   // A TTLCache of pod creates/deletes each rc expects to see.
   expectations *controller.UIDTrackingControllerExpectations

   // A store of replication controllers, populated by the rcController
   rcStore cache.StoreToReplicationControllerLister
   // Watches changes to all replication controllers
   rcController *framework.Controller
   // A store of pods, populated by the podController
   podStore cache.StoreToPodLister
   // Watches changes to all pods
   podController framework.ControllerInterface
   // podStoreSynced returns true if the pod store has been synced at least once.
   // Added as a member to the struct to allow injection for testing.
   podStoreSynced func() bool

   // lookupCache is presumably a cache of pod-to-controller matches; its use
   // is not visible in this excerpt — TODO confirm.
   lookupCache *controller.MatchingCache

   // Controllers that need to be synced
   queue *workqueue.Type
}

 

最后调用 manageReplicas 函数:比较 rc 期望的副本数与实际 pod 数量之间的差异,并据此创建或删除 pod。

// manageReplicas compares the number of pods in filteredPods with the replica
// count requested by rc.Spec.Replicas and issues pod creations or deletions to
// close the gap.
//
// At most rm.burstReplicas pods are created or deleted per call. The expected
// creations/deletions are registered with rm.expectations before any request
// is sent; every request that fails is subtracted from those expectations
// again, the rc is re-enqueued, and the error is passed to
// utilruntime.HandleError. The call blocks until all requests issued in this
// pass have returned. If no key can be derived for rc, the error is logged and
// nothing else happens.
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) {
	diff := len(filteredPods) - int(rc.Spec.Replicas)
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return
	}

	switch {
	case diff < 0:
		// Too few pods: create the missing ones, limited to one burst.
		missing := -diff
		if rm.burstReplicas < missing {
			missing = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		rm.expectations.ExpectCreations(rcKey, missing)

		var pending sync.WaitGroup
		pending.Add(missing)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, rc.Spec.Replicas, missing)

		createOne := func() {
			defer pending.Done()
			createErr := rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
			if createErr == nil {
				return
			}
			// The informer will never report this pod, so take it back out
			// of the expected creations.
			glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
			rm.expectations.CreationObserved(rcKey)
			rm.enqueueController(rc)
			utilruntime.HandleError(createErr)
		}
		for remaining := missing; remaining > 0; remaining-- {
			go createOne()
		}
		pending.Wait()

	case diff > 0:
		// Too many pods: delete the surplus, limited to one burst.
		surplus := diff
		if rm.burstReplicas < surplus {
			surplus = rm.burstReplicas
		}
		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, surplus)

		// Ordering only matters when some pods survive. The sort puts
		// not-ready before ready, unscheduled before scheduled and pending
		// before running, so pods in the earliest stages are removed first.
		if rc.Spec.Replicas != 0 {
			sort.Sort(controller.ActivePods(filteredPods))
		}

		// Record the keys (namespace/name) of the pods about to be deleted so
		// each one is accounted for exactly once, whether it shows up as a
		// deletion-timestamp update or as a delete event. Because the key
		// stands in for a UID, a label change that orphans one of these pods
		// leaves the rc waiting until the expectation expires.
		doomedKeys := []string{}
		for idx := 0; idx < surplus; idx++ {
			doomedKeys = append(doomedKeys, controller.PodKey(filteredPods[idx]))
		}
		rm.expectations.ExpectDeletions(rcKey, doomedKeys)

		var pending sync.WaitGroup
		pending.Add(surplus)
		for idx := 0; idx < surplus; idx++ {
			go func(victim *api.Pod) {
				defer pending.Done()
				deleteErr := rm.podControl.DeletePod(rc.Namespace, victim.Name, rc)
				if deleteErr == nil {
					return
				}
				// The informer will never report this deletion, so take it
				// back out of the expected deletions.
				podKey := controller.PodKey(victim)
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, deleteErr, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				rm.enqueueController(rc)
				utilruntime.HandleError(deleteErr)
			}(filteredPods[idx])
		}
		pending.Wait()
	}
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐