k8s source code analysis -- the docker_manager in kubelet
The source analyzed is the k8s v1.3.0 stable release.
We start from pod management with SyncPod: the kubelet's syncPod calls docker_manager's SyncPod function.
(1) Processing in the SyncPod function
This is a key function, and its processing flow is fairly long.
step1: computePodContainerChanges checks what has changed in the pod and returns the following information:
podContainerChangesSpec{
    StartInfraContainer:  createPodInfraContainer,
    InfraChanged:         changed,
    InfraContainerId:     podInfraContainerID,
    InitFailed:           initFailed,
    InitContainersToKeep: initContainersToKeep,
    ContainersToStart:    containersToStart,
    ContainersToKeep:     containersToKeep,
}
step2: If InfraChanged is set, emit an event:
dm.recorder.Eventf(pod, api.EventTypeNormal, "InfraChanged", "Pod infrastructure changed, it will be killed and re-created.")
step3: If the condition below holds, the pod is killed, including the infra container (the exact kill flow is not traced further here; a small sketch of this decision appears after the step list):
containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0)
step4: pruneInitContainersBeforeStart prunes terminated init containers.
step5: createPodInfraContainer creates the infra (pause) container.
step6: dm.networkPlugin.SetUpPod sets up the pod network.
step7: InspectContainer fetches the infra container's JSON.
step8: Hairpin mode handling (hairpin.SetUpContainerPid).
step9: determineContainerIP determines the pod IP.
step10: next, status, done := findActiveInitContainer(pod, podStatus) finds the next init container to run.
step11: Iterate over idx := range containerChanges.ContainersToStart and call tryContainerStart to create each regular container.
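To make the kill-or-keep decision from step3 concrete, here is a minimal, self-contained sketch. It is not kubelet code: the struct below only mirrors the relevant fields of podContainerChangesSpec, with simplified map key types.

package main

import "fmt"

// changes mirrors the fields of podContainerChangesSpec that drive the
// kill/keep decision in step3; the real type lives in docker_manager.go.
type changes struct {
    StartInfraContainer bool
    ContainersToKeep    map[string]int
    ContainersToStart   map[int]string
}

// killEverything reproduces the step3 condition: either the infra container
// must be restarted, or nothing is worth keeping or starting, so the whole
// pod (including the infra container) is killed.
func killEverything(c changes) bool {
    return c.StartInfraContainer ||
        (len(c.ContainersToKeep) == 0 && len(c.ContainersToStart) == 0)
}

func main() {
    c := changes{
        StartInfraContainer: false,
        ContainersToKeep:    map[string]int{"docker-id-1": 0},
        ContainersToStart:   map[int]string{1: "container app restarted"},
    }
    fmt.Println(killEverything(c)) // false: only unwanted containers are killed
}

When the condition is false, SyncPod falls into the else branch instead and only kills running containers that are neither in ContainersToKeep nor in InitContainersToKeep.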
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    start := time.Now()
    defer func() {
        metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
    }()

    g_docker_manager_count++
    glog.Infof("DockerManager_syncPod is called, pod %q, g_docker_manager_count=%d", pod.UID, g_docker_manager_count)

    containerChanges, err := dm.computePodContainerChanges(pod, podStatus)
    if err != nil {
        result.Fail(err)
        return
    }
    glog.V(3).Infof("Got container changes for pod %q: %+v", format.Pod(pod), containerChanges)

    if containerChanges.InfraChanged {
        dm.recorder.Eventf(pod, api.EventTypeNormal, "InfraChanged", "Pod infrastructure changed, it will be killed and re-created.")
    }
    if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
        if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
            glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", format.Pod(pod))
        } else {
            glog.V(4).Infof("Killing Infra Container for %q, will start new one", format.Pod(pod))
        }

        // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
        // TODO(random-liu): We'll use pod status directly in the future
        killResult := dm.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            return
        }
    } else {
        // Otherwise kill any running containers in this pod which are not specified as ones to keep.
        runningContainerStatues := podStatus.GetRunningContainerStatuses()
        for _, containerStatus := range runningContainerStatues {
            _, keep := containerChanges.ContainersToKeep[kubecontainer.DockerID(containerStatus.ID.ID)]
            _, keepInit := containerChanges.InitContainersToKeep[kubecontainer.DockerID(containerStatus.ID.ID)]
            if !keep && !keepInit {
                glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerStatus.Name, containerStatus.ID, format.Pod(pod))
                // attempt to find the appropriate container policy
                var podContainer *api.Container
                var killMessage string
                for i, c := range pod.Spec.Containers {
                    if c.Name == containerStatus.Name {
                        podContainer = &pod.Spec.Containers[i]
                        killMessage = containerChanges.ContainersToStart[i]
                        break
                    }
                }
                killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerStatus.Name)
                result.AddSyncResult(killContainerResult)
                if err := dm.KillContainerInPod(containerStatus.ID, podContainer, pod, killMessage, nil); err != nil {
                    killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                    glog.Errorf("Error killing container %q(id=%q) for pod %q: %v", containerStatus.Name, containerStatus.ID, format.Pod(pod), err)
                    return
                }
            }
        }
    }

    // Keep terminated init containers fairly aggressively controlled
    dm.pruneInitContainersBeforeStart(pod, podStatus, containerChanges.InitContainersToKeep)

    // We pass the value of the podIP down to runContainerInPod, which in turn
    // passes it to various other functions, in order to facilitate
    // functionality that requires this value (hosts file and downward API)
    // and avoid races determining the pod IP in cases where a container
    // requires restart but the podIP isn't in the status manager yet.
    //
    // We default to the IP in the passed-in pod status, and overwrite it if the
    // infra container needs to be (re)started.
    podIP := ""
    if podStatus != nil {
        podIP = podStatus.IP
    }

    // If we should create infra container then we do it first.
    podInfraContainerID := containerChanges.InfraContainerId
    if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
        glog.V(4).Infof("Creating pod infra container for %q", format.Pod(pod))
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, PodInfraContainerName)
        result.AddSyncResult(startContainerResult)
        var msg string
        podInfraContainerID, err, msg = dm.createPodInfraContainer(pod)
        if err != nil {
            startContainerResult.Fail(err, msg)
            glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q: %s", err, format.Pod(pod), msg)
            return
        }

        setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod))
        result.AddSyncResult(setupNetworkResult)
        if !kubecontainer.IsHostNetworkPod(pod) {
            glog.V(3).Infof("Calling network plugin %s to setup pod for %s", dm.networkPlugin.Name(), format.Pod(pod))
            err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID())
            if err != nil {
                // TODO: (random-liu) There shouldn't be "Skipping pod" in sync result message
                message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v; Skipping pod", format.Pod(pod), dm.networkPlugin.Name(), err)
                setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message)
                glog.Error(message)

                // Delete infra container
                killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName)
                result.AddSyncResult(killContainerResult)
                if delErr := dm.KillContainerInPod(kubecontainer.ContainerID{
                    ID:   string(podInfraContainerID),
                    Type: "docker"}, nil, pod, message, nil); delErr != nil {
                    killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error())
                    glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr)
                }
                return
            }

            // Setup the host interface unless the pod is on the host's network (FIXME: move to networkPlugin when ready)
            podInfraContainer, err := dm.client.InspectContainer(string(podInfraContainerID))
            if err != nil {
                glog.Errorf("Failed to inspect pod infra container: %v; Skipping pod %q", err, format.Pod(pod))
                result.Fail(err)
                return
            }
            if dm.configureHairpinMode {
                if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil {
                    glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err)
                }
            }

            // Overwrite the podIP passed in the pod status, since we just started the infra container.
            podIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)
        }
    }

    next, status, done := findActiveInitContainer(pod, podStatus)
    if status != nil {
        if status.ExitCode != 0 {
            // container initialization has failed, flag the pod as failed
            initContainerResult := kubecontainer.NewSyncResult(kubecontainer.InitContainer, status.Name)
            initContainerResult.Fail(kubecontainer.ErrRunInitContainer, fmt.Sprintf("init container %q exited with %d", status.Name, status.ExitCode))
            result.AddSyncResult(initContainerResult)
            if pod.Spec.RestartPolicy == api.RestartPolicyNever {
                utilruntime.HandleError(fmt.Errorf("error running pod %q init container %q, restart=Never: %+v", format.Pod(pod), status.Name, status))
                return
            }
            utilruntime.HandleError(fmt.Errorf("Error running pod %q init container %q, restarting: %+v", format.Pod(pod), status.Name, status))
        }
    }

    // Note: when configuring the pod's containers anything that can be configured by pointing
    // to the namespace of the infra container should use namespaceMode. This includes things like the net namespace
    // and IPC namespace. PID mode cannot point to another container right now.
    // See createPodInfraContainer for infra container setup.
    namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
    pidMode := getPidMode(pod)

    if next != nil {
        if len(containerChanges.ContainersToStart) == 0 {
            glog.V(4).Infof("No containers to start, stopping at init container %+v in pod %v", next.Name, format.Pod(pod))
            return
        }

        // If we need to start the next container, do so now then exit
        container := next
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)

        // containerChanges.StartInfraContainer causes the containers to be restarted for config reasons
        if !containerChanges.StartInfraContainer {
            isInBackOff, err, msg := dm.doBackOff(pod, container, podStatus, backOff)
            if isInBackOff {
                startContainerResult.Fail(err, msg)
                glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
                return
            }
        }

        glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
        if err, msg := dm.tryContainerStart(container, pod, podStatus, pullSecrets, namespaceMode, pidMode, podIP); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
            return
        }

        // Successfully started the container; clear the entry in the failure
        glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
        return
    }

    if !done {
        // init container still running
        glog.V(4).Infof("An init container is still running in pod %v", format.Pod(pod))
        return
    }

    if containerChanges.InitFailed {
        // init container still running
        glog.V(4).Infof("Not all init containers have succeeded for pod %v", format.Pod(pod))
        return
    }

    // Start regular containers
    for idx := range containerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)

        // containerChanges.StartInfraContainer causes the containers to be restarted for config reasons
        if !containerChanges.StartInfraContainer {
            isInBackOff, err, msg := dm.doBackOff(pod, container, podStatus, backOff)
            if isInBackOff {
                startContainerResult.Fail(err, msg)
                glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
                continue
            }
        }

        glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
        if err, msg := dm.tryContainerStart(container, pod, podStatus, pullSecrets, namespaceMode, pidMode, podIP); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
            continue
        }
    }
    return
}
(2) Processing in the createPodInfraContainer function
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
// If any error occurs in this function, it will return a brief error and a detailed error message.
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubecontainer.DockerID, error, string) {
    start := time.Now()
    defer func() {
        metrics.ContainerManagerLatency.WithLabelValues("createPodInfraContainer").Observe(metrics.SinceInMicroseconds(start))
    }()

    // Use host networking if specified.
    netNamespace := ""
    var ports []api.ContainerPort

    if kubecontainer.IsHostNetworkPod(pod) {
        netNamespace = namespaceModeHost
    } else if dm.networkPlugin.Name() == "cni" || dm.networkPlugin.Name() == "kubenet" {
        netNamespace = "none"
    } else {
        // Docker only exports ports from the pod infra container. Let's
        // collect all of the relevant ports and export them.
        for _, container := range pod.Spec.InitContainers {
            ports = append(ports, container.Ports...)
        }
        for _, container := range pod.Spec.Containers {
            ports = append(ports, container.Ports...)
        }
    }

    container := &api.Container{
        Name:            PodInfraContainerName,
        Image:           dm.podInfraContainerImage,
        Ports:           ports,
        ImagePullPolicy: podInfraContainerImagePullPolicy,
        Env:             dm.podInfraContainerEnv,
    }

    // No pod secrets for the infra container.
    // The message isn't needed for the Infra container
    if err, msg := dm.imagePuller.PullImage(pod, container, nil); err != nil {
        return "", err, msg
    }

    // Currently we don't care about restart count of infra container, just set it to 0.
    id, err := dm.runContainerInPod(pod, container, netNamespace, getIPCMode(pod), getPidMode(pod), "", 0)
    if err != nil {
        return "", kubecontainer.ErrRunContainer, err.Error()
    }

    return kubecontainer.DockerID(id.ID), nil, ""
}
step1: If host networking is requested, or the network plugin is cni/kubenet, no ports are exported; otherwise collect the ports of all init and app containers so Docker can export them from the infra container (see the sketch after this list).
step2: Build the infra container object (api.Container with PodInfraContainerName and dm.podInfraContainerImage).
step3: Pull the image via dm.imagePuller.PullImage.
step4: runContainerInPod starts the container.
step5: Return the infra container's ID as kubecontainer.DockerID(id.ID).
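As a small illustration of step1, the sketch below uses simplified stand-in types (not the real api.Container) to show how the infra container ends up declaring the ports of every init and app container when Docker itself has to publish them.

package main

import "fmt"

// containerPort and container are simplified stand-ins for api.ContainerPort
// and api.Container, used only to illustrate the port collection in step1.
type containerPort struct {
    Name          string
    ContainerPort int
}

type container struct {
    Name  string
    Ports []containerPort
}

// collectInfraPorts mirrors the loop in createPodInfraContainer: when Docker
// must export ports (no cni/kubenet plugin and not host networking), the
// infra container has to declare the ports of every init and app container.
func collectInfraPorts(initContainers, containers []container) []containerPort {
    var ports []containerPort
    for _, c := range initContainers {
        ports = append(ports, c.Ports...)
    }
    for _, c := range containers {
        ports = append(ports, c.Ports...)
    }
    return ports
}

func main() {
    apps := []container{{Name: "web", Ports: []containerPort{{Name: "http", ContainerPort: 80}}}}
    fmt.Println(collectInfraPorts(nil, apps)) // [{http 80}]
}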
(3) Processing in the tryContainerStart function
// tryContainerStart attempts to pull and start the container, returning an error and a reason string if the start
// was not successful.
func (dm *DockerManager) tryContainerStart(container *api.Container, pod *api.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, namespaceMode, pidMode, podIP string) (err error, reason string) {
    err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets)
    if err != nil {
        return err, msg
    }

    if container.SecurityContext != nil && container.SecurityContext.RunAsNonRoot != nil && *container.SecurityContext.RunAsNonRoot {
        err := dm.verifyNonRoot(container)
        if err != nil {
            return kubecontainer.ErrVerifyNonRoot, err.Error()
        }
    }

    // For a new container, the RestartCount should be 0
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
        restartCount = containerStatus.RestartCount + 1
    }

    // TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
    _, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, pidMode, podIP, restartCount)
    if err != nil {
        // TODO(bburns) : Perhaps blacklist a container after N failures?
        return kubecontainer.ErrRunContainer, err.Error()
    }
    return nil, ""
}
step1: Pull the image.
step2: FindContainerStatusByName looks up any previous status and sets restartCount (sketched below).
step3: runContainerInPod starts the container.
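The restart-count handling in step2 is easy to miss. The following sketch (with a stand-in type, not kubecontainer's ContainerStatus) captures the rule: a brand-new container starts at 0, a restarted one gets the previous count plus one.

package main

import "fmt"

// previousStatus is a simplified stand-in for kubecontainer.ContainerStatus;
// only RestartCount matters for this illustration.
type previousStatus struct {
    RestartCount int
}

// nextRestartCount mirrors step2 of tryContainerStart.
func nextRestartCount(prev *previousStatus) int {
    if prev == nil {
        return 0
    }
    return prev.RestartCount + 1
}

func main() {
    fmt.Println(nextRestartCount(nil))                              // 0
    fmt.Println(nextRestartCount(&previousStatus{RestartCount: 2})) // 3
}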
(4) Processing in the runContainerInPod function
// Run a single container from a pod. Returns the docker container ID
// If do not need to pass labels, just pass nil.
func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Container, netMode, ipcMode, pidMode, podIP string, restartCount int) (kubecontainer.ContainerID, error) {
    start := time.Now()
    defer func() {
        metrics.ContainerManagerLatency.WithLabelValues("runContainerInPod").Observe(metrics.SinceInMicroseconds(start))
    }()

    ref, err := kubecontainer.GenerateContainerRef(pod, container)
    if err != nil {
        glog.Errorf("Can't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
    } else {
        glog.V(5).Infof("Generating ref for container %s: %#v", container.Name, ref)
    }

    opts, err := dm.runtimeHelper.GenerateRunContainerOptions(pod, container, podIP)
    if err != nil {
        return kubecontainer.ContainerID{}, fmt.Errorf("GenerateRunContainerOptions: %v", err)
    }

    utsMode := ""
    if kubecontainer.IsHostNetworkPod(pod) {
        utsMode = namespaceModeHost
    }

    oomScoreAdj := dm.calculateOomScoreAdj(pod, container)

    id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount, oomScoreAdj)
    if err != nil {
        return kubecontainer.ContainerID{}, fmt.Errorf("runContainer: %v", err)
    }

    // Remember this reference so we can report events about this container
    if ref != nil {
        dm.containerRefManager.SetRef(id, ref)
    }

    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        msg, handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
            err := fmt.Errorf("PostStart handler: %v", handlerErr)
            dm.generateFailedContainerEvent(id, pod.Name, kubecontainer.FailedPostStartHook, msg)
            dm.KillContainerInPod(id, container, pod, err.Error(), nil)
            return kubecontainer.ContainerID{}, err
        }
    }

    // Container information is used in adjusting OOM scores, adding ndots and getting the logPath.
    containerInfo, err := dm.client.InspectContainer(id.ID)
    if err != nil {
        return kubecontainer.ContainerID{}, fmt.Errorf("InspectContainer: %v", err)
    }

    // Create a symbolic link to the Docker container log file using a name which captures the
    // full pod name, the container name and the Docker container ID. Cluster level logging will
    // capture these symbolic filenames which can be used for search terms in Elasticsearch or for
    // labels for Cloud Logging.
    containerLogFile := containerInfo.LogPath
    symlinkFile := LogSymlink(dm.containerLogsDir, kubecontainer.GetPodFullName(pod), container.Name, id.ID)
    if err = dm.os.Symlink(containerLogFile, symlinkFile); err != nil {
        glog.Errorf("Failed to create symbolic link to the log file of pod %q container %q: %v", format.Pod(pod), container.Name, err)
    }

    // Check if current docker version is higher than 1.10. Otherwise, we have to apply OOMScoreAdj instead of using docker API.
    // TODO: Remove this logic after we stop supporting docker version < 1.10.
    if err = dm.applyOOMScoreAdjIfNeeded(pod, container, containerInfo); err != nil {
        return kubecontainer.ContainerID{}, err
    }

    // The addNDotsOption call appends the ndots option to the resolv.conf file generated by docker.
    // This resolv.conf file is shared by all containers of the same pod, and needs to be modified only once per pod.
    // we modify it when the pause container is created since it is the first container created in the pod since it holds
    // the networking namespace.
    if container.Name == PodInfraContainerName && utsMode != namespaceModeHost {
        err = addNDotsOption(containerInfo.ResolvConfPath)
        if err != nil {
            return kubecontainer.ContainerID{}, fmt.Errorf("addNDotsOption: %v", err)
        }
    }

    return id, err
}
step1: GenerateContainerRef builds an ObjectReference to the container for event reporting.
step2: GenerateRunContainerOptions builds the run options (mounts, env, ports, etc.).
step3: calculateOomScoreAdj computes the OOM score adjustment.
step4: runContainer creates and starts the container.
step5: dm.runner.Run(id, pod, container, container.Lifecycle.PostStart) runs the PostStart hook, if any.
step6: InspectContainer fetches the container's details.
step7: Create a symbolic link to the Docker container log file (see the sketch after this list):
symlinkFile := LogSymlink(dm.containerLogsDir, kubecontainer.GetPodFullName(pod), container.Name, id.ID)
step8: applyOOMScoreAdjIfNeeded applies the OOM score adjustment for Docker versions older than 1.10.
step9: addNDotsOption appends the ndots option to the pod's resolv.conf (infra container only).
step10: Return the container's ID.
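For step7, the symlink name is what cluster-level logging later searches on. The sketch below assumes the naming scheme <podFullName>_<containerName>-<dockerID>.log; the exact format lives in the LogSymlink helper, so treat this as an illustration rather than the authoritative source.

package main

import (
    "fmt"
    "path"
)

// logSymlink sketches the naming used in step7 when kubelet symlinks the
// Docker json log file into its own log directory. The assumed scheme is
// <podFullName>_<containerName>-<dockerID>.log.
func logSymlink(containerLogsDir, podFullName, containerName, dockerID string) string {
    return path.Join(containerLogsDir, fmt.Sprintf("%s_%s-%s.log", podFullName, containerName, dockerID))
}

func main() {
    fmt.Println(logSymlink("/var/log/containers", "web-1_default", "nginx", "0123456789ab"))
    // /var/log/containers/web-1_default_nginx-0123456789ab.log
}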
(5) Processing in the runContainer function
func (dm *DockerManager) runContainer(
    pod *api.Pod,
    container *api.Container,
    opts *kubecontainer.RunContainerOptions,
    ref *api.ObjectReference,
    netMode string,
    ipcMode string,
    utsMode string,
    pidMode string,
    restartCount int,
    oomScoreAdj int) (kubecontainer.ContainerID, error) {

    dockerName := KubeletContainerName{
        PodFullName:   kubecontainer.GetPodFullName(pod),
        PodUID:        pod.UID,
        ContainerName: container.Name,
    }

    securityOpts, err := dm.getSecurityOpts(pod, container.Name)
    if err != nil {
        return kubecontainer.ContainerID{}, err
    }

    // Pod information is recorded on the container as labels to preserve it in the event the pod is deleted
    // while the Kubelet is down and there is no information available to recover the pod.
    // TODO: keep these labels up to date if the pod changes
    labels := newLabels(container, pod, restartCount, dm.enableCustomMetrics)

    // TODO(random-liu): Remove this when we start to use new labels for KillContainerInPod
    if container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
        // TODO: This is kind of hacky, we should really just encode the bits we need.
        // TODO: This is hacky because the Kubelet should be parameterized to encode a specific version
        // and needs to be able to migrate this whenever we deprecate v1. Should be a member of DockerManager.
        if data, err := runtime.Encode(api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"}), pod); err == nil {
            labels[kubernetesPodLabel] = string(data)
        } else {
            glog.Errorf("Failed to encode pod: %s for prestop hook", pod.Name)
        }
    }

    memoryLimit := container.Resources.Limits.Memory().Value()
    cpuRequest := container.Resources.Requests.Cpu()
    cpuLimit := container.Resources.Limits.Cpu()
    nvidiaGPULimit := container.Resources.Limits.NvidiaGPU()
    var cpuShares int64
    // If request is not specified, but limit is, we want request to default to limit.
    // API server does this for new containers, but we repeat this logic in Kubelet
    // for containers running on existing Kubernetes clusters.
    if cpuRequest.IsZero() && !cpuLimit.IsZero() {
        cpuShares = milliCPUToShares(cpuLimit.MilliValue())
    } else {
        // if cpuRequest.Amount is nil, then milliCPUToShares will return the minimal number
        // of CPU shares.
        cpuShares = milliCPUToShares(cpuRequest.MilliValue())
    }

    var devices []dockercontainer.DeviceMapping
    if nvidiaGPULimit.Value() != 0 {
        // Experimental. For now, we hardcode /dev/nvidia0 no matter what the user asks for
        // (we only support one device per node).
        devices = []dockercontainer.DeviceMapping{
            {"/dev/nvidia0", "/dev/nvidia0", "mrw"},
            {"/dev/nvidiactl", "/dev/nvidiactl", "mrw"},
            {"/dev/nvidia-uvm", "/dev/nvidia-uvm", "mrw"},
        }
    }

    podHasSELinuxLabel := pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.SELinuxOptions != nil
    binds := makeMountBindings(opts.Mounts, podHasSELinuxLabel)

    // The reason we create and mount the log file in here (not in kubelet) is because
    // the file's location depends on the ID of the container, and we need to create and
    // mount the file before actually starting the container.
    // TODO(yifan): Consider to pull this logic out since we might need to reuse it in
    // other container runtime.
    _, containerName, cid := BuildDockerName(dockerName, container)
    if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 {
        // Because the PodContainerDir contains pod uid and container name which is unique enough,
        // here we just add an unique container id to make the path unique for different instances
        // of the same container.
        containerLogPath := path.Join(opts.PodContainerDir, cid)
        fs, err := os.Create(containerLogPath)
        if err != nil {
            // TODO: Clean up the previouly created dir? return the error?
            glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err)
        } else {
            fs.Close() // Close immediately; we're just doing a `touch` here
            b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath)
            binds = append(binds, b)
        }
    }

    hc := &dockercontainer.HostConfig{
        Binds:          binds,
        NetworkMode:    dockercontainer.NetworkMode(netMode),
        IpcMode:        dockercontainer.IpcMode(ipcMode),
        UTSMode:        dockercontainer.UTSMode(utsMode),
        PidMode:        dockercontainer.PidMode(pidMode),
        ReadonlyRootfs: readOnlyRootFilesystem(container),
        Resources: dockercontainer.Resources{
            Memory:     memoryLimit,
            MemorySwap: -1,
            CPUShares:  cpuShares,
            Devices:    devices,
        },
        SecurityOpt: securityOpts,
    }

    // If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig
    result, err := dm.checkDockerAPIVersion(dockerV110APIVersion)
    if err != nil {
        glog.Errorf("Failed to check docker api version: %v", err)
    } else if result >= 0 {
        hc.OomScoreAdj = oomScoreAdj
    }

    if dm.cpuCFSQuota {
        // if cpuLimit.Amount is nil, then the appropriate default value is returned to allow full usage of cpu resource.
        cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue())
        hc.CPUQuota = cpuQuota
        hc.CPUPeriod = cpuPeriod
    }

    if len(opts.CgroupParent) > 0 {
        hc.CgroupParent = opts.CgroupParent
    }

    dockerOpts := dockertypes.ContainerCreateConfig{
        Name: containerName,
        Config: &dockercontainer.Config{
            Env:        makeEnvList(opts.Envs),
            Image:      container.Image,
            WorkingDir: container.WorkingDir,
            Labels:     labels,
            // Interactive containers:
            OpenStdin: container.Stdin,
            StdinOnce: container.StdinOnce,
            Tty:       container.TTY,
        },
        HostConfig: hc,
    }

    // Set network configuration for infra-container
    if container.Name == PodInfraContainerName {
        setInfraContainerNetworkConfig(pod, netMode, opts, &dockerOpts)
    }

    setEntrypointAndCommand(container, opts, dockerOpts)

    glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd)

    securityContextProvider := securitycontext.NewSimpleSecurityContextProvider()
    securityContextProvider.ModifyContainerConfig(pod, container, dockerOpts.Config)
    securityContextProvider.ModifyHostConfig(pod, container, dockerOpts.HostConfig)

    createResp, err := dm.client.CreateContainer(dockerOpts)
    if err != nil {
        dm.recorder.Eventf(ref, api.EventTypeWarning, kubecontainer.FailedToCreateContainer, "Failed to create docker container with error: %v", err)
        return kubecontainer.ContainerID{}, err
    }
    if len(createResp.Warnings) != 0 {
        glog.V(2).Infof("Container %q of pod %q created with warnings: %v", container.Name, format.Pod(pod), createResp.Warnings)
    }
    dm.recorder.Eventf(ref, api.EventTypeNormal, kubecontainer.CreatedContainer, "Created container with docker id %v", utilstrings.ShortenString(createResp.ID, 12))

    if err = dm.client.StartContainer(createResp.ID); err != nil {
        dm.recorder.Eventf(ref, api.EventTypeWarning, kubecontainer.FailedToStartContainer,
            "Failed to start container with docker id %v with error: %v", utilstrings.ShortenString(createResp.ID, 12), err)
        return kubecontainer.ContainerID{}, err
    }
    dm.recorder.Eventf(ref, api.EventTypeNormal, kubecontainer.StartedContainer, "Started container with docker id %v", utilstrings.ShortenString(createResp.ID, 12))

    return kubecontainer.DockerID(createResp.ID).ContainerID(), nil
}
step1: Create the container: createResp, err := dm.client.CreateContainer(dockerOpts)
step2: Start the container: dm.client.StartContainer(createResp.ID)
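One detail worth calling out in runContainer is how CPU shares are derived when a request is missing but a limit is set. The sketch below is a reconstruction under assumptions: the constants and the milliCPUToShares-style helper are not copied from the source, only the defaulting rule visible in runContainer is.

package main

import "fmt"

const (
    minShares    = 2    // assumed minimum number of cgroup CPU shares
    sharesPerCPU = 1024 // one full CPU is assumed to map to 1024 shares
)

// milliCPUToShares is a sketch of the helper referenced in runContainer.
func milliCPUToShares(milliCPU int64) int64 {
    if milliCPU == 0 {
        return minShares
    }
    shares := milliCPU * sharesPerCPU / 1000
    if shares < minShares {
        return minShares
    }
    return shares
}

// cpuSharesFor mirrors the request/limit defaulting in runContainer: if the
// request is unset but a limit exists, shares are derived from the limit.
func cpuSharesFor(requestMilli, limitMilli int64) int64 {
    if requestMilli == 0 && limitMilli != 0 {
        return milliCPUToShares(limitMilli)
    }
    return milliCPUToShares(requestMilli)
}

func main() {
    fmt.Println(cpuSharesFor(0, 500))   // 512, derived from the limit
    fmt.Println(cpuSharesFor(250, 500)) // 256, derived from the request
}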
(6) CreateContainer and StartContainer in the docker client
k8s.io/kubernetes/pkg/kubelet/dockertools/kube_docker_client.go
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()
    // we provide an explicit default shm size as to not depend on docker daemon.
    // TODO: evaluate exposing this as a knob in the API
    if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
        opts.HostConfig.ShmSize = defaultShmSize
    }
    createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
    if ctxErr := contextError(ctx); ctxErr != nil {
        return nil, ctxErr
    }
    if err != nil {
        return nil, err
    }
    return &createResp, nil
}

func (d *kubeDockerClient) StartContainer(id string) error {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()
    err := d.client.ContainerStart(ctx, id)
    if ctxErr := contextError(ctx); ctxErr != nil {
        return ctxErr
    }
    return err
}
Going one level lower, the docker client calls the Docker REST API, which is where the create command is actually issued:
// ContainerCreate creates a new container based in the given configuration.
// It can be associated with a name, but it's not mandatory.
func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (types.ContainerCreateResponse, error) {
    var response types.ContainerCreateResponse
    query := url.Values{}
    if containerName != "" {
        query.Set("name", containerName)
    }

    body := configWrapper{
        Config:           config,
        HostConfig:       hostConfig,
        NetworkingConfig: networkingConfig,
    }

    serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
    if err != nil {
        if serverResp != nil && serverResp.statusCode == 404 && strings.Contains(err.Error(), "No such image") {
            return response, imageNotFoundError{config.Image}
        }
        return response, err
    }

    err = json.NewDecoder(serverResp.body).Decode(&response)
    ensureReaderClosed(serverResp)
    return response, err
}
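For reference, the two REST calls above reduce to plain HTTP POSTs against the Docker Engine API. The standalone sketch below is not kubelet or docker/engine-api code; the unix socket path, the unversioned API paths, and the image name are illustrative assumptions.

package main

import (
    "bytes"
    "context"
    "fmt"
    "net"
    "net/http"
)

func main() {
    // Talk to the Docker daemon over its default unix socket.
    client := &http.Client{
        Transport: &http.Transport{
            DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
                return net.Dial("unix", "/var/run/docker.sock")
            },
        },
    }

    // POST /containers/create with the container config as the JSON body.
    body := bytes.NewBufferString(`{"Image": "gcr.io/google_containers/pause:2.0"}`)
    resp, err := client.Post("http://docker/containers/create?name=example", "application/json", body)
    if err != nil {
        fmt.Println("create failed:", err)
        return
    }
    resp.Body.Close()

    // POST /containers/{id}/start starts the container created above
    // (the id would normally be parsed from the create response).
    resp, err = client.Post("http://docker/containers/example/start", "application/json", nil)
    if err != nil {
        fmt.Println("start failed:", err)
        return
    }
    resp.Body.Close()
    fmt.Println("start status:", resp.Status)
}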