上一篇blog讲解到到从apiserver获取需要更新的pod的信息,然后传入syncLoopIteration。那么需要变化的pod不是只有上面上面三个,因为本地的pod状态可能发生变化,譬如本地的容器退出,那么怎么去让本地退出的容器重新启动呢?答案在于另一个输入源码plegch这个channel中。
下面看看syncLoopIteration这几个输入源

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {}

configCh就是上一篇介绍的update ;syncCh 和housekeepingCh是两个定时任务,分别是每隔一秒和两秒运行一次,plegCh是本地就是同步本地变化,如果本地真实pod状态和本地pod缓存的状态不一致,则会把变化发到这个管道里面。pkg/kubelet/pleg/generic.go,
先介绍一个pod记录的结构体

type podRecord struct {
    old     *kubecontainer.Pod
    current *kubecontainer.Pod
}

很简单,就是记录当前pod状态和之前记录状态的两个属性。

func (g *GenericPLEG) relist() {
//获取本地pod真实状态
podList, err := g.runtime.GetPods(true)
//设置本地新状态
pods := kubecontainer.Pods(podList)
g.podRecords.setCurrent(pods)
//和老状态比较,通过computeEvents产生event,如果新老状态一致则返回nil,不一致,如果新状态是running,这返回容器启动状态,如果新状态是exit,则返回容器died,详细的看generateEvents这个方法。这个event就是上面plegch里面的内容。
for _, container := range allContainers {
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e)
            }
        }
}
//更新本地缓存
g.updateCache(pod, pid)
g.podRecords.update(pid)

syncLoopIteration这个事件的来源说清楚了。具体看看syncLoopIteration是怎样运行的

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))

            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
            ....

syncLoopIteration本质上就是根据不同的事件类型去执行不同的方法。已创建pod为例,handler.HandlePodAdditions(u.Pods),代码任然在pkg/kubelet/kubelet.go

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()

        kl.podManager.AddPod(pod)
//如果是mirror pod则直接跳过
        if kubepod.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        ...
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        kl.probeManager.AddPod(pod)
    }
}

上面的dispatchWork负责分发任务,具体看下图
这里写图片描述
通过分发,最终调用,kubelet.go里面的syncPod方法,

func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)

kl.statusManager.SetPodStatus(pod, apiPodStatus)

pcm := kl.containerManager.NewPodContainerManager()

if kubepod.IsStaticPod(pod) ...

kl.makePodDataDirs(pod)

kl.volumeManager.WaitForAttachAndMount(pod)

kl.getPullSecretsForPod(pod)

kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
}

最终还是通过运行时SyncPod去启动容器,具体代码在pkg/kubelet/kuberuntime/kuberuntime_manager.go里面,

func (m *kubeGenericRuntimeManager) SyncPod

通过六个步骤

//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create init containers.
//  6. Create normal containers.

到了第六步才是启动业务容器的代码pkg/kubelet/kuberuntime/kuberuntime_container.go

func (m *kubeGenericRuntimeManager) startContainer

它也是分为一下几个步骤

// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)

至此,容器启动的代码已经介绍完毕,但还有很多细节,譬如存储挂载网络分配等细节,后面再慢慢阐述!

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐