kubelet

前言

本文没有去列出细节逻辑实现,只列出主干逻辑,代码中有注解可以简单阅读一下。k8s源码大多方法以interface层层包装的形式调用,一个interface会有较多实现(interface多态),代码中的interface的具体实现可以参考《interface实现分析》,后续会就一处调用进行详细分析。

如何debug

我是利用dlv工具远程调试的,远端搭建了一个3master、3node的k8s集群,停止了一个vm的kubelet,在vm上用以下命令启动kubelet源码,进行调试。至于其他组件调试,也可以通过这种方式。
需要注意:启动命令中的参数,多去少补(通过观察日志)
dlv启动命令dlv debug --headless --listen ":2345" --log --api-version 2 -- --runtime-cgroups=/systemd/system.slice --kubelet-cgroups=/systemd/system.slice --kubeconfig=/etc/kubernetes/kubelet.conf --pod-infra-container-image=xxx/pause:3.1 --config=/var/lib/kubelet/config.yaml --cgroup-driver=cgroupfs --network-plugin=cni

Kubelet服务启动流程

 

 
9371663-0dd9cbd30c54a73d.png
image.png


kubelet服务入口

 

cmd/kubelet/kubelet.go,主要负责校验参数,创建和 api-server 交互的 client 及对运行 kubelet 权限检测,启动 Kubelet 等等

// main is the kubelet entry point (cmd/kubelet/kubelet.go). As excerpted it
// seeds math/rand from the current time, builds the kubelet command via
// app.NewKubeletCommand (passing the channel returned by
// server.SetupSignalHandler() as its stop channel), and initializes logging
// with a deferred flush. The remainder of the body is elided ("...").
func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewKubeletCommand(server.SetupSignalHandler())
    logs.InitLogs()
    defer logs.FlushLogs()
    ...
}

具体实现cmd/kubelet/app/server.go

// NewKubeletCommand builds the kubelet *cobra.Command (cmd/kubelet/app/server.go).
// Parts of the command literal are elided ("...") in this excerpt.
//
// The Run callback shown here:
//   - bootstraps the dynamic kubelet config controller when a dynamic config
//     directory is set, replacing kubeletConfig if the controller returns one;
//   - constructs a KubeletServer from the flags and configuration;
//   - builds the default KubeletDeps and attaches the config controller;
//   - either runs only the experimental dockershim and returns, or calls Run
//     to start the kubelet.
//
// Every error on these paths terminates the process through klog.Fatal.
func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
    cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
    cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
    // Build the default KubeletFlags values (per the article: docker settings,
    // certificate paths, plugin directories, CIDR and so on).
    kubeletFlags := options.NewKubeletFlags()
    // Build the default kubelet configuration.
    kubeletConfig, err := options.NewKubeletConfiguration()
    // programmer error
    if err != nil {
        klog.Fatal(err)
    }

    ...
        Run: func(cmd *cobra.Command, args []string) {

            // use dynamic kubelet config, if enabled
            var kubeletConfigController *dynamickubeletconfig.Controller
            if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
                var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
                dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
                    func(kc *kubeletconfiginternal.KubeletConfiguration) error {
                        // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
                        // so that we get a complete validation at the same point where we can decide to reject dynamic config.
                        // This fixes the flag-precedence component of issue #63305.
                        // See issue #56171 for general details on flag precedence.
                        return kubeletConfigFlagPrecedence(kc, args)
                    })
                if err != nil {
                    klog.Fatal(err)
                }
                // If we should just use our existing, local config, the controller will return a nil config
                if dynamicKubeletConfig != nil {
                    kubeletConfig = dynamicKubeletConfig
                    // Note: flag precedence was already enforced in the controller, prior to validation,
                    // by our above transform function. Now we simply update feature gates from the new config.
                    if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                        klog.Fatal(err)
                    }
                }
            }

            // construct a KubeletServer from kubeletFlags and kubeletConfig
            kubeletServer := &options.KubeletServer{
                KubeletFlags:         *kubeletFlags,
                KubeletConfiguration: *kubeletConfig,
            }

            // use kubeletServer to construct the default KubeletDeps
            kubeletDeps, err := UnsecuredDependencies(kubeletServer)
            if err != nil {
                klog.Fatal(err)
            }

            // add the kubelet config controller to kubeletDeps
            kubeletDeps.KubeletConfigController = kubeletConfigController

            // start the experimental docker shim, if enabled
            if kubeletServer.KubeletFlags.ExperimentalDockershim {
                if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
                    klog.Fatal(err)
                }
                return
            }

            // run the kubelet
            klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
            if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
                klog.Fatal(err)
            }
        },
    }

...
}

func NewKubeletCommand中

// use dynamic kubelet config, if enabled
            var kubeletConfigController *dynamickubeletconfig.Controller
            if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
                var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
                dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
                    func(kc *kubeletconfiginternal.KubeletConfiguration) error {
                        // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
                        // so that we get a complete validation at the same point where we can decide to reject dynamic config.
                        // This fixes the flag-precedence component of issue #63305.
                        // See issue #56171 for general details on flag precedence.
                        return kubeletConfigFlagPrecedence(kc, args)
                    })
                if err != nil {
                    klog.Fatal(err)
                }
                // If we should just use our existing, local config, the controller will return a nil config
                if dynamicKubeletConfig != nil {
                    kubeletConfig = dynamicKubeletConfig
                    // Note: flag precedence was already enforced in the controller, prior to validation,
                    // by our above transform function. Now we simply update feature gates from the new config.
                    if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                        klog.Fatal(err)
                    }
                }
            }

该功能主要是新建一个watch的功能,主要是用来watch kubelet的配置文件是否改变,如果已经改变,那么就重新load kubelet的配置文件。用的是kubernetes常用到的Controller,也就是Informer的架构,watch ConfigMap对象。进一步查看BootstrapKubeletConfigController:

// BootstrapKubeletConfigController constructs and bootstraps a dynamic Kubelet
// configuration controller rooted at dynamicConfigDir.
//
// transform is handed to the controller unchanged (see dynamickubeletconfig.NewController).
//
// It returns an error when the DynamicKubeletConfig feature gate is disabled,
// when dynamicConfigDir is empty, when the directory cannot be resolved to an
// absolute path, or when the controller's Bootstrap call fails. On success it
// returns whatever configuration Bootstrap produced together with the
// controller itself.
func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
    if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
        return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
    }
    if len(dynamicConfigDir) == 0 {
        return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
    }

    // compute absolute path and bootstrap controller
    dir, err := filepath.Abs(dynamicConfigDir)
    if err != nil {
        // Include the underlying error; without it the caller cannot tell why
        // the path could not be resolved.
        return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s, error: %v", dynamicConfigDir, err)
    }
    // get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
    c := dynamickubeletconfig.NewController(dir, transform)
    kc, err := c.Bootstrap()
    if err != nil {
        return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
    }
    return kc, c, nil
}

-------------------------
// /pkg/kubelet/kubeletconfig/controller.go
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error)
graph LR
NewKubeletCommand-->RUN
RUN-->run

这个函数主要是用来启动各种依赖的服务以及kubelet的监听端口

// Run is the exported kubelet start function. It logs the build version,
// performs OS-specific initialization, and then delegates to run. Errors from
// either step are wrapped with a short description of the step that failed.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
    // Log the version up front to help debugging.
    klog.Infof("Version: %+v", version.Get())

    osErr := initForOS(s.KubeletFlags.WindowsService)
    if osErr != nil {
        return fmt.Errorf("failed OS init: %v", osErr)
    }

    // run is the main startup function.
    runErr := run(s, kubeDeps, stopCh)
    if runErr != nil {
        return fmt.Errorf("failed to run Kubelet: %v", runErr)
    }

    return nil
}
// run performs the actual kubelet startup. The preparation steps are elided in
// this excerpt ("..."); once they are done it calls RunKubelet with s.RunOnce
// and returns any error RunKubelet reports. The remainder of the function,
// including its final return, is not shown here.
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
    ...
    // All startup parameters are prepared at this point; enter the startup flow.
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }
}
graph LR
准备完成-->RunKubelet
RunKubelet-->CreateAndInitKubelet
RunKubelet-->startKubelet

在 RunKubelet 中主要做 createAndInitKubelet 和 startKubelet 两件事:

// RunKubelet creates the kubelet via createAndInitKubelet and then starts it.
// Setup code and the createAndInitKubelet arguments are elided ("...").
//
// When runOnce is true it calls k.RunOnce with the pod config updates and
// returns a wrapped error if that fails; otherwise it calls startKubelet.
// It returns nil once the chosen path has been started.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ...
    k, err := createAndInitKubelet(...)
    ...
    // process pods and exit.
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        klog.Info("Started kubelet as runonce")
    } else {
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
        klog.Info("Started kubelet")
    }
    return nil
}

createAndInitKubelet

// createAndInitKubelet instantiates the kubelet with kubelet.NewMainKubelet
// and, on success, calls BirthCry and StartGarbageCollection on it. The
// parameter list, return values and surrounding code are elided in this
// excerpt.
func createAndInitKubelet(...){
    ...
    k, err = kubelet.NewMainKubelet(...)
    if err != nil {
        return nil, err
    }
    k.BirthCry()

    k.StartGarbageCollection()
}

NewMainKubelet 实例化一个 kubelet 对象,并对 kubelet 内部各个 component 进行初始化工作:

  • containerGC // 容器的垃圾回收
  • statusManager // pod 状态的管理
  • imageManager // 镜像的管理
  • probeManager // 容器健康检测
  • gpuManager // GPU 的支持
  • PodCache // Pod 缓存的管理
  • secretManager // secret 资源的管理
  • configMapManager // configMap 资源的管理
  • InitNetworkPlugin // 网络插件的初始化
  • PodManager // 对 pod 的管理, e.g., CRUD
  • makePodSourceConfig // pod 元数据的来源 (FILE, URL, api-server)
  • diskSpaceManager // 磁盘空间的管理
  • ContainerRuntime // 容器运行时的选择(docker 或 rkt)
  • BirthCry // 通知 api-server 服务 kubelet 启动
  • StartGarbageCollection // 开启垃圾回收服务

startKubelet

// startKubelet launches the kubelet's long-running goroutines:
//   - the main loop, k.Run, fed with pod config updates and driven by
//     wait.Until with a zero period and wait.NeverStop;
//   - the kubelet server, only when enableServer is true;
//   - the read-only server, only when kubeCfg.ReadOnlyPort is positive;
//   - the pod resources endpoint, only when the KubeletPodResources feature
//     gate is enabled.
//
// It does not wait for any of them; it returns right after starting them.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
    // Main kubelet loop.
    runKubelet := func() {
        k.Run(podCfg.Updates())
    }
    go wait.Until(runKubelet, 0, wait.NeverStop)

    // Kubelet server (per the article: serves pod and node information).
    if enableServer {
        serveIP := net.ParseIP(kubeCfg.Address)
        servePort := uint(kubeCfg.Port)
        go k.ListenAndServe(serveIP, servePort, kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
    }

    // Read-only server.
    if kubeCfg.ReadOnlyPort > 0 {
        readOnlyIP := net.ParseIP(kubeCfg.Address)
        readOnlyPort := uint(kubeCfg.ReadOnlyPort)
        go k.ListenAndServeReadOnly(readOnlyIP, readOnlyPort)
    }

    // Pod resources endpoint, behind its feature gate.
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}
graph LR
startKubelet-->Run

pkg/kubelet/kubelet.go

// Run starts the kubelet's background components and then enters the sync
// loop (pkg/kubelet/kubelet.go). As excerpted here it:
//   - installs a default log file server for /var/log/ when none is set;
//   - warns when no API server client is configured;
//   - starts the cloud resource sync manager (if present), the volume manager,
//     node status syncing, the node lease controller, the periodic runtime-up
//     check, network util syncing and the pod killer as goroutines;
//   - starts the status manager, probe manager, runtime class manager and PLEG;
//   - calls kl.syncLoop with the updates channel and the kubelet as handler.
//
// NOTE(review): the irregular indentation below presumably comes from
// conditionals that were removed when the excerpt was trimmed.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {   // Start the log server.
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    
    if kl.kubeClient == nil {
        klog.Warning("No api server defined - no node status update will be sent.")
    }

    // Start the cloud provider sync manager
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }
    
    // (Code elided in this excerpt.) Initialize modules, including volumes,
    // data directories and container logs.
    // Start the image manager, the certificate manager and the OOM manager.
    // Start the resource analyzer.

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
        go kl.fastStatusUpdateOnce()
        // start syncing lease
            go kl.nodeLeaseController.Run(wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start loop to sync iptables util rules
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start component sync loops.
    kl.statusManager.Start() // status management
    kl.probeManager.Start() // probe management

    // Start syncing RuntimeClasses if enabled.
        kl.runtimeClassManager.Start(wait.NeverStop)

    // Start the pod lifecycle event generator.
    kl.pleg.Start() // start the pod lifecycle event generator (PLEG)
    kl.syncLoop(updates, kl) // enter the sync loop
}

==至此kubelet启动完成==

graph LR
Run-->syncLoop

syncLoop

syncLoop is the main loop for processing changes. It watches for changes from three channels (file, apiserver, and http) and creates a union of them. For any new change seen, will run a sync against desired state and running state. If no changes are seen to the configuration, will synchronize the last known desired state every sync-frequency seconds. Never returns.

// syncLoop is the kubelet's main processing loop. This is a pseudo-code
// outline of it: after some preparation (elided) it loops forever, sleeping
// for a duration and then calling kl.syncLoopIteration. The real arguments and
// the rest of the loop body are elided ("...").
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    // Preparation work (elided).
    for{
        time.Sleep(duration)
        kl.syncLoopIteration(...)
        ...
    }
}
graph LR
syncLoop-->syncLoopIteration

syncLoopIteration 接收来自多个方向的消息,run a sync against desired state and running state

// syncLoopIteration waits for one event from any of its sources and handles
// it. The sources selected over are: configCh (pod updates), plegCh (pod
// lifecycle events), syncCh and housekeepingCh (timer ticks), and the liveness
// manager's update channel. The case bodies are elided ("...") in this
// excerpt; as shown, the function returns true after the select completes.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
    case e := <-plegCh:...
    case <-syncCh:...
    case update := <-kl.livenessManager.Updates():...
    case <-housekeepingCh:...
    }
    return true
}

syncLoopIteration reads from various channels and dispatches pods to the given handler. 以configCh 为例

switch u.Op {
case kubetypes.ADD:
    handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
    handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
    handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
    handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
    // DELETE is treated as a UPDATE because of graceful deletion.
    handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
    // These are pods restored from the checkpoint. Treat them as new pods.
    handler.HandlePodAdditions(u.Pods)
}

最终的立足点还是 syncHandler(还是Kubelet 自己实现的),下面分析下 HandlePodAdditions

新建 pod开始

 

 
9371663-e54d8adbe4a7eb0e.png
image.png


代码中去掉了跟创建 无关的部分,删减了日志、错误校验等

 

//file:/pkg/kubelet/kubelet.go---2026

// HandlePodAdditions handles pods newly added from a config source. It sorts
// the pods by creation time and then, for each pod: registers it with the pod
// manager, looks up its mirror pod, dispatches a SyncPodCreate work item via
// kl.dispatchWork, and registers it with the probe manager.
//
// Parts of the loop body are elided ("..."), including where `start` is
// defined.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        ...
        // Always add the pod to the pod manager. Kubelet relies on the pod manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)
        ...
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        kl.probeManager.AddPod(pod)
    }
}

kl.podManager.AddPod 和 kl.probeManager.AddPod(pod) 都只是将pod 纳入跟踪,真正创建pod的是dispatchWork,然后又转回 kl.syncPod

//file:/pkg/kubelet/kubelet.go---1464

// syncPod syncs a single pod. This excerpt keeps only the steps relevant to
// pod creation (error handling and logging were removed by the article):
// generate the API pod status, check whether the pod can run, record the
// status in the status manager, obtain a pod container manager, create the
// pod's data directories, fetch its image pull secrets, and finally call the
// container runtime's SyncPod. Code before and after these steps is elided
// ("..."), including where pod and podStatus are derived from o.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
    ...
    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if runnable := kl.canRunPod(pod); !runnable.Admit {...}
    // Update status in the status manager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    // Create Cgroups for the pod and apply resource parameters to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // Make data directories for the pod
    kl.makePodDataDirs(pod);
    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)
    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
    ...
}

kubeGenericRuntimeManager.SyncPod

//file:/pkg/kubelet/kuberuntime/kuberuntime_manager.go---618

// SyncPod brings the running pod in line with the desired pod. This excerpt
// shows step 1 (compute the sandbox/container changes), step 4 (create the pod
// sandbox and generate its config), step 5 (start the next init container, if
// any) and step 6 (start each container listed in ContainersToStart).
// Steps 2-3, error handling and result bookkeeping are elided ("..."), as is
// the definition of podIP.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    ...
    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)       
    // Get podSandboxConfig for containers to start.
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    // Step 5: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); 
    }
    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); 
    }
    ...
}

m.createPodSandbox 和 startContainer

pkg/kubelet/kuberuntime/包中,kuberuntime_manager.go 定义了 kubeGenericRuntimeManager struct 及其接口方法实现,但接口方法的内部依赖方法 分散在 package 下的其它go文件中。其本质是将 一个“类方法”分散在了多个go 文件中,多个文件合起来 组成了kubeGenericRuntimeManager 类实现。

这个方法的内容也非常多,它的主要逻辑是先比较传递过来的 pod 信息和实际运行的 pod(对于新建 pod 来说后者为空),计算出两者的差别,也就是需要更新的地方。然后先创建 infra 容器,配置好网络,然后再逐个创建应用容器。

// startContainer starts one container inside an existing pod sandbox. The
// excerpt lists the call sequence only (error checks, return statements and
// the definitions of restartCount and kubeContainerID were removed by the
// article):
//  1. ensure the image exists via the image puller;
//  2. generate the container config, create the container through the runtime
//     service, and run the internal pre-start lifecycle hook;
//  3. start the container through the runtime service;
//  4. run the container's PostStart lifecycle handler.
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
    // Step 1: pull the image.
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
    // Step 2: create the container.
    ref, err := kubecontainer.GenerateContainerRef(pod, container)
    containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    // Step 3: start the container.
    err = m.runtimeService.StartContainer(containerID)
    // Step 4: execute the post start hook.
    msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
}
 
9371663-5b6a6412b15207e4.png
image.png
  • 默认sandbox image为 gcr.io/google_containers/pause-amd64:3.0
  • ensureImageExists 当镜像不存在时进行拉取工作
  • CreateContainer最终通过docker API POST方法调用 /containers/create
  • CreateCheckpoint写入文件,文件名为容器ID
  • StartContainer最终通过docker API POST方法调用 /containers/containerID/start
  • 重新写入resolv.conf由docker产生,pod里的容器共享
  • InspectContainer最终通过docker API GET方法调用 /containers/containerID/json
  • 为容器建立网络,通过CNI建立网络,建立loopback接口,建立网络设置为混杂模式(调用命令ip link show dev / ip link set bridgeName promisc on)。

以下为debug,PodSandbox从create到start的过程,一直到请求发送docker结束,其他调用与本次调用相似,可以自行debug

// createPodSandbox creates and runs a pod sandbox for pod.
//
// It generates the sandbox config for the given attempt, creates the pod's log
// directory (mode 0755), resolves the runtime handler when the RuntimeClass
// feature gate is enabled and a runtime class manager is set, and then asks
// the runtime service to run the sandbox.
//
// It returns the sandbox ID on success. On failure it returns an empty ID, a
// human-readable message describing the failed step, and the underlying error.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        return "", message, err
    }

    // Create pod logs directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    if err != nil {
        message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
        // message is already formatted; log it verbatim rather than as a format
        // string, so any '%' in it is not reinterpreted.
        klog.Error(message)
        return "", message, err
    }

    runtimeHandler := ""
    if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
        runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
        if err != nil {
            message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
            return "", message, err
        }
        if runtimeHandler != "" {
            klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
        }
    }

    // Run the pod sandbox through the runtime service.
    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    if err != nil {
        message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        return "", message, err
    }

    return podSandBoxID, "", nil
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/kuberuntime/instrumented_services.go

// RunPodSandbox wraps the underlying runtime service's RunPodSandbox with
// instrumentation: it records the operation and its error status, observes the
// call duration on RunPodSandboxDuration labelled by runtimeHandler, and
// increments RunPodSandboxErrors when the call fails. The wrapped call's
// results are returned unchanged.
func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    const operation = "run_podsandbox"
    startTime := time.Now()
    defer recordOperation(operation, startTime)
    // Arguments of a deferred call are evaluated when the defer statement
    // executes, so the elapsed time must be computed inside a closure;
    // otherwise the observed duration is taken before the sandbox call runs.
    defer func() {
        metrics.RunPodSandboxDuration.WithLabelValues(runtimeHandler).Observe(metrics.SinceInSeconds(startTime))
    }()

    out, err := in.service.RunPodSandbox(config, runtimeHandler)
    recordError(operation, err)
    if err != nil {
        metrics.RunPodSandboxErrors.WithLabelValues(runtimeHandler).Inc()
    }
    return out, err
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/remote/remote_runtime.go

// RunPodSandbox asks the remote runtime, through the runtime client, to create
// and start a pod sandbox from config using the given runtimeHandler. It
// returns the sandbox ID, or an error when the call fails or the response
// carries an empty ID.
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    // Sandbox operations get twice the regular timeout (4 mins by default).
    // TODO: Make the pod sandbox timeout configurable.
    ctx, cancel := getContextWithTimeout(r.timeout * 2)
    defer cancel()

    request := &runtimeapi.RunPodSandboxRequest{
        Config:         config,
        RuntimeHandler: runtimeHandler,
    }
    resp, err := r.runtimeClient.RunPodSandbox(ctx, request)
    if err != nil {
        klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
        return "", err
    }

    if sandboxID := resp.PodSandboxId; sandboxID != "" {
        return sandboxID, nil
    }

    // The call succeeded but no sandbox ID came back: report it as an error.
    errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
    klog.Errorf("RunPodSandbox failed: %s", errorMessage)
    return "", errors.New(errorMessage)
}

kubelet cri grpc-client实现

/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go

// RunPodSandbox is the gRPC client stub for the RunPodSandbox method of
// runtime.v1alpha2.RuntimeService. It invokes the method on the client's
// connection and returns the decoded response, or nil and the error when the
// invocation fails.
func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
    const method = "/runtime.v1alpha2.RuntimeService/RunPodSandbox"

    reply := &RunPodSandboxResponse{}
    if err := grpc.Invoke(ctx, method, in, reply, c.cc, opts...); err != nil {
        return nil, err
    }
    return reply, nil
}

grpc-server实现在dockershim中,继续

/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go

// RunPodSandbox is the dockershim (CRI gRPC server side) implementation of
// RunPodSandbox. In order, it:
//  1. picks the sandbox image (ds.podSandboxImage, falling back to
//     defaultSandboxImage) and ensures it exists;
//  2. creates the sandbox container, rejecting any non-empty RuntimeHandler;
//  3. writes a sandbox checkpoint keyed by the container ID;
//  4. starts the sandbox container and, when a DNS config is present,
//     rewrites the resolv.conf file docker generated;
//  5. sets up pod networking via ds.network.SetUpPod, unless the pod uses the
//     node's network namespace, in which case it returns right after step 4.
//
// If network setup fails it tears the pod network down, stops the sandbox
// container, and returns the response together with an aggregate of all
// errors encountered.
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
    config := r.GetConfig()

    // Step 1: Pull the image for the sandbox.
    image := defaultSandboxImage
    podSandboxImage := ds.podSandboxImage
    if len(podSandboxImage) != 0 {
        image = podSandboxImage
    }

    // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
    // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
    // Only pull sandbox image when it's not present - v1.PullIfNotPresent.
    if err := ensureSandboxImageExists(ds.client, image); err != nil {
        return nil, err
    }

    // Step 2: Create the sandbox container.
    if r.GetRuntimeHandler() != "" {
        return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
    }
    createConfig, err := ds.makeSandboxDockerConfig(config, image)
    if err != nil {
        return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
    }
    createResp, err := ds.client.CreateContainer(*createConfig)
    if err != nil {
        createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
    }

    if err != nil || createResp == nil {
        return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
    }
    resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}

    // NOTE(review): the deferred func below reads the function-level err
    // variable, not the value actually returned (the results are unnamed).
    // Paths that return an error without leaving it in that variable -- e.g.
    // the shadowed err inside the DNS blocks, or the aggregate returned after
    // a SetUpPod failure when TearDownPod and StopContainer both succeed --
    // appear to still mark the network as ready. Confirm this is intended.
    ds.setNetworkReady(createResp.ID, false)
    defer func(e *error) {
        // Set networking ready depending on the error return of
        // the parent function
        if *e == nil {
            ds.setNetworkReady(createResp.ID, true)
        }
    }(&err)

    // Step 3: Create Sandbox Checkpoint.
    if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
        return nil, err
    }

    // Step 4: Start the sandbox container.
    // Assume kubelet's garbage collector would remove the sandbox later, if
    // startContainer failed.
    err = ds.client.StartContainer(createResp.ID)
    if err != nil {
        return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
    }

    // Rewrite resolv.conf file generated by docker.
    // NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
    // not only for pods with host network: the resolver conf will be overwritten
    // after sandbox creation to override docker's behaviour. This resolv.conf
    // file is shared by all containers of the same pod, and needs to be modified
    // only once per pod.
    if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
        containerInfo, err := ds.client.InspectContainer(createResp.ID)
        if err != nil {
            return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
        }

        if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
            return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
        }
    }

    // Do not invoke network plugins if in hostNetwork mode.
    if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
        return resp, nil
    }

    // Step 5: Setup networking for the sandbox.
    // All pod networking is setup by a CNI plugin discovered at startup time.
    // This plugin assigns the pod ip, sets up routes inside the sandbox,
    // creates interfaces etc. In theory, its jurisdiction ends with pod
    // sandbox networking, but it might insert iptables rules or open ports
    // on the host as well, to satisfy parts of the pod spec that aren't
    // recognized by the CNI standard yet.
    cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
    networkOptions := make(map[string]string)
    if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
        // Build DNS options.
        dnsOption, err := json.Marshal(dnsConfig)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err)
        }
        networkOptions["dns"] = string(dnsOption)
    }
    err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
    if err != nil {
        errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)}

        // Ensure network resources are cleaned up even if the plugin
        // succeeded but an error happened between that success and here.
        err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
        if err != nil {
            errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err))
        }

        err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod)
        if err != nil {
            errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err))
        }

        return resp, utilerrors.NewAggregate(errList)
    }

    return resp, nil
}

/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/instrumented_client.go

// StartContainer starts the container with the given id through the wrapped
// docker client, recording the operation (with its start time) and the
// resulting error status. The wrapped client's error is returned unchanged.
func (in instrumentedInterface) StartContainer(id string) error {
    const operation = "start_container"

    // Capture the start time before the call; the deferred recorder receives it.
    begin := time.Now()
    defer recordOperation(operation, begin)

    startErr := in.client.StartContainer(id)
    recordError(operation, startErr)
    return startErr
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/kube_docker_client.go

// StartContainer starts the container with the given id through the docker
// client, using a context obtained from d.getTimeoutContext. If contextError
// reports an error for that context, it is returned in place of the result of
// the start call.
func (d *kubeDockerClient) StartContainer(id string) error {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()

    startErr := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})

    // A context-level error takes precedence over the start error.
    ctxErr := contextError(ctx)
    if ctxErr != nil {
        return ctxErr
    }
    return startErr
}

grpc调用到docker至此结束

/workspace/goWorkspace/src/k8s.io/kubernetes/vendor/github.com/docker/docker/client/container_start.go

// ContainerStart sends a POST to /containers/<containerID>/start. The
// "checkpoint" and "checkpoint-dir" query parameters are included only when
// the corresponding option is non-empty. The response reader is closed before
// returning, and the error from the request is returned as-is.
func (cli *Client) ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error {
    params := url.Values{}
    if options.CheckpointID != "" {
        params.Set("checkpoint", options.CheckpointID)
    }
    if options.CheckpointDir != "" {
        params.Set("checkpoint-dir", options.CheckpointDir)
    }

    endpoint := "/containers/" + containerID + "/start"
    resp, err := cli.post(ctx, endpoint, params, nil, nil)
    ensureReaderClosed(resp)
    return err
}

参考资料:

http://qiankunli.github.io/2018/12/31/kubernetes_source_kubelet.html#%E6%96%B0%E5%BB%BA-pod
https://cizixs.com/2017/06/07/kubelet-source-code-analysis-part-2/
https://toutiao.io/posts/z2e88b/preview

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐