k8s rc 控制器处理
就是在 controller manager 中创建 rc 管理器,数据结构中有 pod 的存储和 controller,pod 从 pod informer 中获取,rc 则通过 rc 控制器监听 apiserver 获取,最后对两者进行比较。type ReplicationManager struct { kubeClient clientset.Interface; podControl controller.Po...
·
即在 controller manager 中创建 rc 管理器(ReplicationManager)。其数据结构中包含 pod 的存储(podStore)和 rc 的存储(rcStore)以及各自对应的 controller:pod 从 pod informer 中获取,rc 则由 rc 控制器(rcController)监听 apiserver 获取,最后将两者进行比较。
type ReplicationManager struct { kubeClient clientset.Interface podControl controller.PodControlInterface // internalPodInformer is used to hold a personal informer. If we're using // a normal shared informer, then the informer will be started for us. If // we have a personal informer, we must start it ourselves. If you start // the controller using NewReplicationManager(passing SharedInformer), this // will be null internalPodInformer framework.SharedIndexInformer // An rc is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. burstReplicas int // To allow injection of syncReplicationController for testing. syncHandler func(rcKey string) error // A TTLCache of pod creates/deletes each rc expects to see. expectations *controller.UIDTrackingControllerExpectations // A store of replication controllers, populated by the rcController rcStore cache.StoreToReplicationControllerLister // Watches changes to all replication controllers rcController *framework.Controller // A store of pods, populated by the podController podStore cache.StoreToPodLister // Watches changes to all pods podController framework.ControllerInterface // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced func() bool lookupCache *controller.MatchingCache // Controllers that need to be synced queue *workqueue.Type }
最后调用 manageReplicas 函数:比较 rc 期望的副本数(rc.Spec.Replicas)与实际 pod 数量的差异,并据此创建或删除相应数量的 pod。
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) { diff := len(filteredPods) - int(rc.Spec.Replicas) rcKey, err := controller.KeyFunc(rc) if err != nil { glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) return } if diff < 0 { diff *= -1 if diff > rm.burstReplicas { diff = rm.burstReplicas } // TODO: Track UIDs of creates just like deletes. The problem currently // is we'd need to wait on the result of a create to record the pod's // UID, which would require locking *across* the create, which will turn // into a performance bottleneck. We should generate a UID for the pod // beforehand and store it via ExpectCreations. rm.expectations.ExpectCreations(rcKey, diff) wait := sync.WaitGroup{} wait.Add(diff) glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff) for i := 0; i < diff; i++ { go func() { defer wait.Done() if err := rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc); err != nil { // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) rm.expectations.CreationObserved(rcKey) rm.enqueueController(rc) utilruntime.HandleError(err) } }() } wait.Wait() } else if diff > 0 { if diff > rm.burstReplicas { diff = rm.burstReplicas } glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff) // No need to sort pods if we are about to delete all of them if rc.Spec.Replicas != 0 { // Sort the pods in the order such that not-ready < ready, unscheduled // < scheduled, and pending < running. This ensures that we delete pods // in the earlier stages whenever possible. 
sort.Sort(controller.ActivePods(filteredPods)) } // Snapshot the UIDs (ns/name) of the pods we're expecting to see // deleted, so we know to record their expectations exactly once either // when we see it as an update of the deletion timestamp, or as a delete. // Note that if the labels on a pod/rc change in a way that the pod gets // orphaned, the rs will only wake up after the expectations have // expired even if other pods are deleted. deletedPodKeys := []string{} for i := 0; i < diff; i++ { deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i])) } // We use pod namespace/name as a UID to wait for deletions, so if the // labels on a pod/rc change in a way that the pod gets orphaned, the // rc will only wake up after the expectation has expired. rm.expectations.ExpectDeletions(rcKey, deletedPodKeys) wait := sync.WaitGroup{} wait.Add(diff) for i := 0; i < diff; i++ { go func(ix int) { defer wait.Done() if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion podKey := controller.PodKey(filteredPods[ix]) glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name) rm.expectations.DeletionObserved(rcKey, podKey) rm.enqueueController(rc) utilruntime.HandleError(err) } }(i) } wait.Wait() } }
更多推荐
已为社区贡献16条内容
所有评论(0)