k8s源码分析--kubelet中pod处理流程(续)
源码为k8s v1.3.0稳定版本(7) updatepod起的协程在何时退出再kubelet中有两个过程涉及到两个处理函数 一个是: HandlePodRemoves-》deletePod-》kl.podWorkers.ForgetWorker(pod.UID)-》removeWorker 另外一个是: HandlePodCleanups -》Forget
源码为k8s v1.3.0稳定版本
(7) updatepod起的协程在何时退出
再kubelet中有两个过程涉及到两个处理函数
一个是: HandlePodRemoves-》deletePod-》kl.podWorkers.ForgetWorker(pod.UID)-》removeWorker
另外一个是: HandlePodCleanups -》ForgetNonExistingPodWorkers-》 然后循环遍历所有的pod -》 removeWorker
(8)mainloop中的处理
step1:接收updatepod函数发送过来的事件消息,不断的进行处理
step2: 比较最后同步时间
step3: 进行同步处理,同步处理的函数为kubelet中的syncPod函数
k8s.io\kubernetes\pkg\kubelet\kubelet.go:1703
step4:更新最后同步的时间
step5:执行完成的回调函数:OnCompleteFunc
step6:wrapUp中设置一个时间间隔(待确定具体工作流程)
(9) Kubelet中,syncPod的处理
从pod管理往下走,pod管理最后进入到kubelet中的 syncPod,在 sycnPod中,会进入到下一级的sycnPod
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
然后在Cache 中进行状态同步
kl.reasonCache.Update(pod.UID, result)
// syncPod is the transaction script for the sync of a single pod. // // Arguments: // // pod - the pod to sync // mirrorPod - the mirror pod for the pod to sync, if it is a static pod // podStatus - the current status (TODO: always from the status manager?) // updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE, DELETE) // // The workflow is: // * If the pod is being created, record pod worker start latency // * Call generateAPIPodStatus to prepare an api.PodStatus for the pod // * If the pod is being seen as running for the first time, record pod // start latency // * Update the status of the pod in the status manager // * Kill the pod if it should not be running // * Create a mirror pod if the pod is a static pod, and does not // already have a mirror pod // * Create the data directories for the pod if they do not exist // * Wait for volumes to attach/mount // * Fetch the pull secrets for the pod // * Call the container runtime's SyncPod callback // * Update the traffic shaping for the pod's ingress and egress limits // // If any step if this workflow errors, the error is returned, and is repeated // on the next syncPod call. func (kl *Kubelet) syncPod(o syncPodOptions) error { // pull out the required options pod := o.pod mirrorPod := o.mirrorPod podStatus := o.podStatus updateType := o.updateType // if we want to kill a pod, do it now! if updateType == kubetypes.SyncPodKill { killPodOptions := o.killPodOptions if killPodOptions == nil || killPodOptions.PodStatusFunc == nil { return fmt.Errorf("kill pod options are required if update type is kill") } apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus) kl.statusManager.SetPodStatus(pod, apiPodStatus) // we kill the pod with the specified grace period since this is a termination if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil { // there was an error killing the pod, so we return that error directly utilruntime.HandleError(err) return err } return nil } // Latency measurements for the main workflow are relative to the // (first time the pod was seen by the API server. var firstSeenTime time.Time if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok { firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get() } // Record pod worker start latency if being created // TODO: make pod workers record their own latencies if updateType == kubetypes.SyncPodCreate { if !firstSeenTime.IsZero() { // This is the first time we are syncing the pod. Record the latency // since kubelet first saw the pod if firstSeenTime is set. metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime)) } else { glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID) } } // Generate final API pod status with pod and status manager status apiPodStatus := kl.generateAPIPodStatus(pod, podStatus) // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576) // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and // set pod IP to hostIP directly in runtime.GetPodStatus podStatus.IP = apiPodStatus.PodIP // Record the time it takes for the pod to become running. existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID) if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning && !firstSeenTime.IsZero() { metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime)) } // Update status in the status manager kl.statusManager.SetPodStatus(pod, apiPodStatus) // Kill pod if it should not be running if errOuter := canRunPod(pod); errOuter != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed { if errInner := kl.killPod(pod, nil, podStatus, nil); errInner != nil { errOuter = fmt.Errorf("error killing pod: %v", errInner) utilruntime.HandleError(errOuter) } // there was no error killing the pod, but the pod cannot be run, so we return that err (if any) return errOuter } // Create Mirror Pod for Static Pod if it doesn't already exist if kubepod.IsStaticPod(pod) { podFullName := kubecontainer.GetPodFullName(pod) deleted := false if mirrorPod != nil { if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) { // The mirror pod is semantically different from the static pod. Remove // it. The mirror pod will get recreated later. glog.Errorf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod)) if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil { glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err) } else { deleted = true } } } if mirrorPod == nil || deleted { glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod)) if err := kl.podManager.CreateMirrorPod(pod); err != nil { glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err) } } } // Make data directories for the pod if err := kl.makePodDataDirs(pod); err != nil { glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err) return err } // Wait for volumes to attach/mount defaultedPod, _, err := kl.defaultPodLimitsForDownwardApi(pod, nil) if err != nil { return err } if err := kl.volumeManager.WaitForAttachAndMount(defaultedPod); err != nil { kl.recorder.Eventf(pod, api.EventTypeWarning, kubecontainer.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err) glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err) return err } // Fetch the pull secrets for the pod pullSecrets, err := kl.getPullSecretsForPod(pod) if err != nil { glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err) return err } // Call the container runtime's SyncPod callback result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff) kl.reasonCache.Update(pod.UID, result) if err = result.Error(); err != nil { return err } // early successful exit if pod is not bandwidth-constrained if !kl.shapingEnabled() { return nil } // Update the traffic shaping for the pod's ingress and egress limits ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) if err != nil { return err } if egress != nil || ingress != nil { if podUsesHostNetwork(pod) { kl.recorder.Event(pod, api.EventTypeWarning, kubecontainer.HostNetworkNotSupported, "Bandwidth shaping is not currently supported on the host network") } else if kl.shaper != nil { if len(apiPodStatus.PodIP) > 0 { err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", apiPodStatus.PodIP), egress, ingress) } } else { kl.recorder.Event(pod, api.EventTypeWarning, kubecontainer.UndefinedShaper, "Pod requests bandwidth shaping, but the shaper is undefined") } } return nil }
step1: generateAPIPodStatus处理
step2: GetPodStatus and SetPodStatus
step3: canRunPod判断
step4: IsStaticPod处理
step5: makePodDataDirs处理
step6:// Wait for volumes to attach/mount
defaultPodLimitsForDownwardApi
WaitForAttachAndMount
step7:// Fetch the pull secrets for the pod
getPullSecretsForPod
step8:kl.containerRuntime.SyncPod
kl.reasonCache.Update(pod.UID, result)
step9:kl.shapingEnabled()
step10:// Update the traffic shaping for the pod's ingress and egress limits
bandwidth.ExtractPodBandwidthResources
更多推荐
所有评论(0)