If you are interested in analyses of the Kubernetes components and overall architecture, see my earlier source-code readings. Today we look at the container startup process through just one function, SyncPod, which is the most important function in kubelet's pod-creation path. The method breaks down into six steps:

//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create init containers.
//  6. Create normal containers.

Let's go through them one by one:

1. Compute the sandbox and container changes

This part of the code takes stock of the container state changes: it works out which containers need to be created and which need to be killed.

    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodContainerChanges(pod, podStatus)
    glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(api.Scheme, pod)
        if err != nil {
            glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, "SandboxChanged", "Pod sandbox changed, it will be killed and re-created.")
        } else {
            glog.V(4).Infof("SyncPod received new pod %q, will create a new sandbox for it", format.Pod(pod))
        }
    }

The concrete implementation is in m.computePodContainerChanges:

func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podContainerSpecChanges {
    glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)

    sandboxChanged, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
    changes := podContainerSpecChanges{
        CreateSandbox:        sandboxChanged,
        SandboxID:            sandboxID,
        Attempt:              attempt,
        ContainersToStart:    make(map[int]string),
        ContainersToKeep:     make(map[kubecontainer.ContainerID]int),
        InitContainersToKeep: make(map[kubecontainer.ContainerID]int),
        ContainersToKill:     make(map[kubecontainer.ContainerID]containerToKillInfo),
    }

    // check the status of init containers.
    initFailed := false
    // always reset the init containers if the sandbox is changed.
    if !sandboxChanged {
        // Keep all successfully completed containers. If there are failing containers,
        // only keep the first failing one.
        initFailed = checkAndKeepInitContainers(pod, podStatus, changes.InitContainersToKeep)
    }
    changes.InitFailed = initFailed

    // check the status of containers.
    for index, container := range pod.Spec.Containers {
        containerStatus := podStatus.FindContainerStatusByName(container.Name)
        if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
            if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
                message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
                glog.Info(message)
                changes.ContainersToStart[index] = message
            }
            continue
        }
        if sandboxChanged {
            if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
                message := fmt.Sprintf("Container %+v's pod sandbox is dead, the container will be recreated.", container)
                glog.Info(message)
                changes.ContainersToStart[index] = message
            }
            continue
        }

        if initFailed {
            // Initialization failed and Container exists.
            // If we have an initialization failure everything will be killed anyway.
            // If RestartPolicy is Always or OnFailure we restart containers that were running before.
            if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
                message := fmt.Sprintf("Failed to initialize pod. %q will be restarted.", container.Name)
                glog.V(1).Info(message)
                changes.ContainersToStart[index] = message
            }
            continue
        }

        expectedHash := kubecontainer.HashContainer(&container)
        containerChanged := containerStatus.Hash != expectedHash
        if containerChanged {
            message := fmt.Sprintf("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.",
                pod.Name, container.Name, containerStatus.Hash, expectedHash)
            glog.Info(message)
            changes.ContainersToStart[index] = message
            continue
        }

        liveness, found := m.livenessManager.Get(containerStatus.ID)
        if !found || liveness == proberesults.Success {
            changes.ContainersToKeep[containerStatus.ID] = index
            continue
        }
        if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
            message := fmt.Sprintf("pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name)
            glog.Info(message)
            changes.ContainersToStart[index] = message
        }
    }

    // Don't keep init containers if they are the only containers to keep.
    if !sandboxChanged && len(changes.ContainersToStart) == 0 && len(changes.ContainersToKeep) == 0 {
        changes.InitContainersToKeep = make(map[kubecontainer.ContainerID]int)
    }

    // compute containers to be killed
    runningContainerStatuses := podStatus.GetRunningContainerStatuses()
    for _, containerStatus := range runningContainerStatuses {
        _, keep := changes.ContainersToKeep[containerStatus.ID]
        _, keepInit := changes.InitContainersToKeep[containerStatus.ID]
        if !keep && !keepInit {
            var podContainer *v1.Container
            var killMessage string
            for i, c := range pod.Spec.Containers {
                if c.Name == containerStatus.Name {
                    podContainer = &pod.Spec.Containers[i]
                    killMessage = changes.ContainersToStart[i]
                    break
                }
            }

            changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
                name:      containerStatus.Name,
                container: podContainer,
                message:   killMessage,
            }
        }
    }

    return changes
}

The containers that need to be started are recorded in the ContainersToStart map (keyed by the container's index in pod.Spec.Containers), and the containers that need to be killed are recorded in the ContainersToKill map (keyed by the running container's ID); note that these are plain maps, not channels.
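
As a minimal, self-contained illustration of that keying (the values are made up, and the real ContainersToKeep key type is kubecontainer.ContainerID rather than a plain string):

    package main

    import "fmt"

    func main() {
        // ContainersToStart: index into pod.Spec.Containers -> reason message.
        containersToStart := map[int]string{
            0: "Container app is dead, but RestartPolicy says that we should restart it.",
        }
        // ContainersToKeep: running container ID -> index into pod.Spec.Containers
        // (ID simplified to a string here).
        containersToKeep := map[string]int{
            "docker://abc123": 1,
        }
        for idx, reason := range containersToStart {
            fmt.Printf("start spec.containers[%d]: %s\n", idx, reason)
        }
        for id, idx := range containersToKeep {
            fmt.Printf("keep %s (spec.containers[%d])\n", id, idx)
        }
    }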

2. Kill the pod whose sandbox has changed

This step handles the case where the sandbox has changed and the pod has to be rebuilt; switching to a different pause image, for example, triggers this operation.

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) {
        if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 {
            glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
        } else {
            glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
        }

        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
            return
        }
    } 
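
For context, killPodWithSyncResult essentially stops every running container in the pod and then stops the pod sandbox(es). Below is a minimal sketch of that sequence, using a hypothetical pared-down interface (the real code kills the containers in parallel and records a SyncResult for each operation):

    package sketch

    // podKiller is a hypothetical stand-in for the runtime calls involved.
    type podKiller interface {
        StopContainer(containerID string, gracePeriodSeconds int64) error
        StopPodSandbox(sandboxID string) error
    }

    // killPodSketch mirrors the rough order of operations: stop the containers
    // first, then the sandbox(es) that hosted them.
    func killPodSketch(rt podKiller, containerIDs, sandboxIDs []string, grace int64) error {
        for _, id := range containerIDs {
            if err := rt.StopContainer(id, grace); err != nil {
                return err
            }
        }
        for _, id := range sandboxIDs {
            if err := rt.StopPodSandbox(id); err != nil {
                return err
            }
        }
        return nil
    }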

3. Kill containers that should not be running

Here kubelet kills every running container in the pod that step 1 decided should not be kept, for example because the pod is being deleted or the container has to be recreated.

    // Step 3: kill any running containers in this pod which are not to keep.
    for containerID, containerInfo := range podContainerChanges.ContainersToKill {
        glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
        killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
        result.AddSyncResult(killContainerResult)
        if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
            killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
            glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
            return
        }
    }

Below, the concrete container runtime is invoked to do the actual stop:

func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, reason string, gracePeriodOverride *int64) error {
    var containerSpec *v1.Container
    if pod != nil {
        containerSpec = kubecontainer.GetContainerSpec(pod, containerName)
    } else {
        // Restore necessary information if one of the specs is nil.
        restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID)
        if err != nil {
            return err
        }
        pod, containerSpec = restoredPod, restoredContainer
    }
    // From this point , pod and container must be non-nil.
    gracePeriod := int64(minimumGracePeriodInSeconds)
    switch {
    case pod.DeletionGracePeriodSeconds != nil:
        gracePeriod = *pod.DeletionGracePeriodSeconds
    case pod.Spec.TerminationGracePeriodSeconds != nil:
        gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
    }

    glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)

    // Run the pre-stop lifecycle hooks if applicable.
    if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil {
        gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
    }
    // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
    if gracePeriod < minimumGracePeriodInSeconds {
        gracePeriod = minimumGracePeriodInSeconds
    }
    if gracePeriodOverride != nil {
        gracePeriod = *gracePeriodOverride
        glog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
    }

    err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
    if err != nil {
        glog.Errorf("Container %q termination failed with gracePeriod %d: %v", containerID.String(), gracePeriod, err)
    } else {
        glog.V(3).Infof("Container %q exited normally", containerID.String())
    }

    message := fmt.Sprintf("Killing container with id %s", containerID.String())
    if reason != "" {
        message = fmt.Sprint(message, ":", reason)
    }
    m.generateContainerEvent(containerID, v1.EventTypeNormal, events.KillingContainer, message)
    m.containerRefManager.ClearRef(containerID)

    return err
}

m.runtimeService.StopContainer stops the container with the computed graceful-shutdown period. Every container runtime implements the StopContainer interface, and the runtime service is wired up when kubelet starts; that code was analyzed in an earlier post, so I won't repeat it here.
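
For reference, the CRI runtime service behind m.runtimeService exposes a StopContainer call of roughly this shape in this era of the code (the grace period travels as a timeout in seconds); the exact definition lives under pkg/kubelet/api:

    package sketch

    // containerStopper is an approximation of the relevant slice of the CRI
    // runtime service interface, shown here only to make the call above concrete.
    type containerStopper interface {
        // StopContainer stops a running container within the given
        // graceful-shutdown timeout (in seconds).
        StopContainer(containerID string, timeout int64) error
    }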

4. Create the sandbox

The reason kubelet introduces the sandbox is to establish a runtime-agnostic standard for the pod's base container; for now you can simply think of it as the pause container. All of the pod's networking hangs off this infrastructure container.
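
To make the last sentence concrete: with the Docker runtime, application containers are created so that they join the sandbox (pause) container's network namespace, conceptually the same as docker run with --net=container:<pause-id>. A trivial illustration (the sandbox ID is just a placeholder); the SyncPod code for this step follows:

    package sketch

    import "fmt"

    // networkModeFor shows, conceptually, how an application container is
    // pointed at the sandbox (pause) container's network namespace under Docker.
    func networkModeFor(sandboxID string) string {
        return fmt.Sprintf("container:%s", sandboxID)
    }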

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 {
        var msg string
        var err error

        glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
            return
        }
        glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
            result.Fail(err)
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
            podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
            glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
        }
    }

Just like the kill path above, this calls into the corresponding container runtime to do the work:

func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        glog.Error(message)
        return "", message, err
    }

    // Create pod logs directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    if err != nil {
        message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
        glog.Errorf(message)
        return "", message, err
    }

    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
    if err != nil {
        message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
        glog.Error(message)
        return "", message, err
    }

    return podSandBoxID, "", nil
}

For Docker, RunPodSandbox is implemented in pkg/kubelet/dockershim/docker_sandbox.go. It boils down to pulling the image and then starting the container; the only part that deserves extra attention is the CNI networking, which I'll cover in a later code analysis. Back to the main thread: once the infrastructure container has been created, the init containers come next.
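
As a rough sketch of that sequence (this is not the actual dockershim code; the interface and method names below are illustrative stand-ins): make sure the pause image is present, create and start the sandbox container, then let the network plugin (CNI) plumb the pod's network:

    package sketch

    import "fmt"

    // sandboxRuntime is a hypothetical, pared-down stand-in for the Docker
    // client used by dockershim; none of these method names are the real ones.
    type sandboxRuntime interface {
        PullImage(image string) error
        CreateContainer(name, image string) (string, error)
        StartContainer(id string) error
        SetUpPodNetwork(sandboxID string) error // the CNI part in the real code
    }

    // runPodSandboxSketch mirrors the rough sequence: make sure the pause image
    // is present, create and start the sandbox container, then let the network
    // plugin plumb the pod's network namespace.
    func runPodSandboxSketch(rt sandboxRuntime, pauseImage, sandboxName string) (string, error) {
        if err := rt.PullImage(pauseImage); err != nil {
            return "", fmt.Errorf("pulling pause image: %v", err)
        }
        id, err := rt.CreateContainer(sandboxName, pauseImage)
        if err != nil {
            return "", fmt.Errorf("creating sandbox container: %v", err)
        }
        if err := rt.StartContainer(id); err != nil {
            return "", fmt.Errorf("starting sandbox container: %v", err)
        }
        if err := rt.SetUpPodNetwork(id); err != nil {
            return "", fmt.Errorf("setting up pod network: %v", err)
        }
        return id, nil
    }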

5. Start the init containers

Init containers do initialization work for the application containers, for example pre-loading some dynamic resources from the network.
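
Before the real code below, here is a toy illustration of the ordering contract that findNextInitContainerToRun enforces: init containers run one at a time, in the order declared in the pod spec, and each must exit successfully before the next one starts (the status type is a simplified stand-in, not the real kubecontainer.ContainerStatus):

    package sketch

    // initStatus is a simplified stand-in for a container status.
    type initStatus struct {
        Name     string
        Running  bool
        Exited   bool
        ExitCode int
    }

    // nextInitContainer returns the name of the next init container to run,
    // whether an earlier one has failed, and whether all of them are done.
    func nextInitContainer(specOrder []string, statuses map[string]initStatus) (next string, failed, done bool) {
        for _, name := range specOrder {
            st, found := statuses[name]
            switch {
            case !found:
                return name, false, false // never started: run it next
            case st.Running:
                return "", false, false // still running: wait for it
            case st.Exited && st.ExitCode != 0:
                return name, true, false // failed: restart it, subject to RestartPolicy and back-off
            }
        }
        return "", false, true // every init container exited 0
    }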

    // Step 5: start init containers.
    status, next, done := findNextInitContainerToRun(pod, podStatus)
    if status != nil && status.ExitCode != 0 {
        // container initialization has failed, flag the pod as failed
        initContainerResult := kubecontainer.NewSyncResult(kubecontainer.InitContainer, status.Name)
        initContainerResult.Fail(kubecontainer.ErrRunInitContainer, fmt.Sprintf("init container %q exited with %d", status.Name, status.ExitCode))
        result.AddSyncResult(initContainerResult)
        if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
            utilruntime.HandleError(fmt.Errorf("error running pod %q init container %q, restart=Never: %#v", format.Pod(pod), status.Name, status))
            return
        }
        utilruntime.HandleError(fmt.Errorf("Error running pod %q init container %q, restarting: %#v", format.Pod(pod), status.Name, status))
    }
    if next != nil {
        if len(podContainerChanges.ContainersToStart) == 0 {
            glog.V(4).Infof("No containers to start, stopping at init container %+v in pod %v", next.Name, format.Pod(pod))
            return
        }

        // If we need to start the next container, do so now then exit
        container := next
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
            return
        }

        glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
            return
        }

        // Successfully started the container; clear the entry in the failure
        glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
        return
    }
    if !done {
        // init container still running
        glog.V(4).Infof("An init container is still running in pod %v", format.Pod(pod))
        return
    }
    if podContainerChanges.InitFailed {
        glog.V(4).Infof("Not all init containers have succeeded for pod %v", format.Pod(pod))
        return
    }

The method starts the init container via m.startContainer.
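
m.startContainer is shared by init containers and regular containers. In this era of the code it roughly does four things: pull the image, create the container through the CRI runtime, start it, and run the PostStart lifecycle hook. A minimal sketch under a hypothetical interface (not the real signatures):

    package sketch

    // containerStarter is a hypothetical stand-in for the dependencies that
    // startContainer uses (image puller, CRI runtime service, hook runner).
    type containerStarter interface {
        EnsureImageExists(image string) error
        CreateContainer(sandboxID, name, image string) (string, error)
        StartContainer(containerID string) error
        RunPostStartHook(containerID string) error
    }

    // startContainerSketch mirrors the rough sequence: pull, create, start,
    // then run the PostStart hook.
    func startContainerSketch(rt containerStarter, sandboxID, name, image string) (string, error) {
        if err := rt.EnsureImageExists(image); err != nil {
            return "", err // image pull failures / back-off are surfaced here
        }
        id, err := rt.CreateContainer(sandboxID, name, image)
        if err != nil {
            return "", err
        }
        if err := rt.StartContainer(id); err != nil {
            return "", err
        }
        // In the real code a failed PostStart hook causes the container to be killed.
        if err := rt.RunPostStartHook(id); err != nil {
            return id, err
        }
        return id, nil
    }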

6. Start the application containers

Only at this step are the user-defined application containers actually started.

    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
            continue
        }

        glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
            continue
        }
    }

It reads the containers that need to be started from the podContainerChanges.ContainersToStart map and then creates each container in the pod one by one in a for loop.
That wraps up the pod sync code: six steps in total.
