Kubelet Source Code Walkthrough (Part 1)
Preface
This article does not cover every detail of the implementation; it only walks through the main path. The code excerpts carry comments and can be skimmed. Most methods in the Kubernetes source are invoked through layers of interface wrapping, and one interface usually has several implementations (interface polymorphism). For how to find the concrete implementation behind an interface, see 《interface实现分析》; a later post will pick one call site and analyze it in detail.
How to debug
I debug remotely with the dlv tool. I set up a cluster with 3 masters and 3 nodes, stopped the kubelet on one VM, and started the kubelet source on that VM with the command below for debugging. The other components can be debugged the same way.
Note: trim or add flags in the startup command to match what the node actually needs (judge this by watching the logs).
dlv launch command:
dlv debug --headless --listen ":2345" --log --api-version 2 -- --runtime-cgroups=/systemd/system.slice --kubelet-cgroups=/systemd/system.slice --kubeconfig=/etc/kubernetes/kubelet.conf --pod-infra-container-image=xxx/pause:3.1 --config=/var/lib/kubelet/config.yaml --cgroup-driver=cgroupfs --network-plugin=cni
Kubelet service startup flow
kubelet service entry point
cmd/kubelet/kubelet.go
is the entry point. It is mainly responsible for validating parameters, creating the client that talks to the api-server, checking the permissions the kubelet runs with, starting the kubelet, and so on.
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewKubeletCommand(server.SetupSignalHandler())
logs.InitLogs()
defer logs.FlushLogs()
...
}
The concrete implementation is in cmd/kubelet/app/server.go:
func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
//set the default KubeletFlags values, including the docker endpoint, certificate paths, plugin directories, CIDR and so on
kubeletFlags := options.NewKubeletFlags()
//generate the default kubelet configuration
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.Fatal(err)
}
...
Run: func(cmd *cobra.Command, args []string) {
// use dynamic kubelet config, if enabled
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
// so that we get a complete validation at the same point where we can decide to reject dynamic config.
// This fixes the flag-precedence component of issue #63305.
// See issue #56171 for general details on flag precedence.
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer)
if err != nil {
klog.Fatal(err)
}
// add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController
// start the experimental docker shim, if enabled
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
klog.Fatal(err)
}
return
}
// run the kubelet
klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
klog.Fatal(err)
}
},
}
...
}
Inside NewKubeletCommand, the dynamic kubelet config part:
// use dynamic kubelet config, if enabled
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
// so that we get a complete validation at the same point where we can decide to reject dynamic config.
// This fixes the flag-precedence component of issue #63305.
// See issue #56171 for general details on flag precedence.
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
This sets up a watch on the kubelet's configuration: it watches whether the kubelet config has changed and, if it has, reloads it. It uses the Controller/Informer architecture that Kubernetes relies on everywhere, watching a ConfigMap object. Let's look further at BootstrapKubeletConfigController.
// BootstrapKubeletConfigController constructs and bootstrap a configuration controller
func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
}
if len(dynamicConfigDir) == 0 {
return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
}
// compute absolute path and bootstrap controller
dir, err := filepath.Abs(dynamicConfigDir)
if err != nil {
return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir)
}
// get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
c := dynamickubeletconfig.NewController(dir, transform)
kc, err := c.Bootstrap()
if err != nil {
return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
}
return kc, c, nil
}
-------------------------
// /pkg/kubelet/kubeletconfig/controller.go
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error)
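The real controller bootstraps from on-disk checkpoints and then watches the ConfigMap assigned to the node, but the underlying mechanism is the standard client-go Informer pattern mentioned above. Below is only a rough, self-contained sketch of that pattern, not the kubelet's actual controller; the kubeconfig path, namespace, and resync period are placeholder assumptions.

package main

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Placeholder kubeconfig path; adjust for your environment.
    cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/kubernetes/kubelet.conf")
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // Watch ConfigMaps in kube-system and react when one changes.
    factory := informers.NewSharedInformerFactoryWithOptions(client, 30*time.Second,
        informers.WithNamespace("kube-system"))
    informer := factory.Core().V1().ConfigMaps().Informer()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        UpdateFunc: func(oldObj, newObj interface{}) {
            cm := newObj.(*corev1.ConfigMap)
            fmt.Printf("ConfigMap %s/%s changed; a dynamic-config controller would re-checkpoint and reload here\n",
                cm.Namespace, cm.Name)
        },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, informer.HasSynced)
    select {} // block; in the kubelet this runs alongside the other loops
}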
graph LR
NewKubeletCommand-->Run
Run-->run
The Run function mainly starts the kubelet's dependent services and its listening ports:
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
//the main startup function
if err := run(s, kubeDeps, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
...
//all parameters are in place before startup; enter the startup flow
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
}
graph LR
PreparationDone-->RunKubelet
RunKubelet-->createAndInitKubelet
RunKubelet-->startKubelet
RunKubelet mainly does two things: createAndInitKubelet and startKubelet:
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
k, err := createAndInitKubelet(...)
...
// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
klog.Info("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.Info("Started kubelet")
}
return nil
}
createAndInitKubelet
func createAndInitKubelet(...){
...
k, err = kubelet.NewMainKubelet(...)
if err != nil {
return nil, err
}
k.BirthCry()
k.StartGarbageCollection()
}
NewMainKubelet instantiates a kubelet object and initializes each of its internal components (a minimal illustrative sketch follows the list below):
- containerGC // container garbage collection
- statusManager // pod status management
- imageManager // image management
- probeManager // container health probing
- gpuManager // GPU support
- PodCache // pod cache management
- secretManager // secret resource management
- configMapManager // configMap resource management
- InitNetworkPlugin // network plugin initialization
- PodManager // pod management, e.g., CRUD
- makePodSourceConfig // sources of pod metadata (FILE, URL, api-server)
- diskSpaceManager // disk space management
- ContainerRuntime // container runtime selection (docker or rkt)
- BirthCry // notify the api-server that the kubelet has started
- StartGarbageCollection // start the garbage collection loops
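The following is only an illustrative sketch with hypothetical types, not the real kubelet fields. It shows the structural idea: NewMainKubelet wires many mostly independent managers into one object, and Run later starts each of them in its own goroutine.

package main

import (
    "fmt"
    "time"
)

// manager is a hypothetical common shape; the real managers expose their own Start signatures.
type manager interface {
    Start(stopCh <-chan struct{})
}

type statusManager struct{}

func (statusManager) Start(stopCh <-chan struct{}) { fmt.Println("status manager started") }

type probeManager struct{}

func (probeManager) Start(stopCh <-chan struct{}) { fmt.Println("probe manager started") }

// kubeletLike loosely mirrors how NewMainKubelet aggregates its components.
type kubeletLike struct {
    managers []manager
}

func (k *kubeletLike) Run(stopCh <-chan struct{}) {
    for _, m := range k.managers {
        go m.Start(stopCh)
    }
}

func main() {
    stopCh := make(chan struct{})
    k := &kubeletLike{managers: []manager{statusManager{}, probeManager{}}}
    k.Run(stopCh)
    time.Sleep(100 * time.Millisecond)
    close(stopCh)
}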
startKubelet
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
// start the kubelet server
//the kubelet server exposes pod and node related information
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
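startKubelet keeps k.Run alive with wait.Until from k8s.io/apimachinery/pkg/util/wait: the function is re-invoked every period until the stop channel closes, and a period of 0 means "restart immediately if it ever returns". Here is a small standalone sketch of that helper; the 1-second period is only to make the output readable.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // wait.Until re-runs the function every period until stopCh is closed.
    go wait.Until(func() {
        fmt.Println("tick:", time.Now().Format(time.RFC3339))
    }, 1*time.Second, stopCh)

    time.Sleep(3 * time.Second)
    close(stopCh)
    time.Sleep(100 * time.Millisecond) // let the loop observe the stop channel
}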
graph LR
startKubelet-->Run
pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { //start the log server
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
//initialize modules: volumes, data directories, container logs,
//start the image manager, certificate manager, OOM watcher,
//and the resource analyzer
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
go kl.nodeLeaseController.Run(wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start loop to sync iptables util rules
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// Start component sync loops.
kl.statusManager.Start() //status management
kl.probeManager.Start() //probe management
// Start syncing RuntimeClasses if enabled.
kl.runtimeClassManager.Start(wait.NeverStop)
// Start the pod lifecycle event generator.
kl.pleg.Start() //start the pod lifecycle event generator
kl.syncLoop(updates, kl) //the main sync loop
}
==At this point the kubelet has finished starting up==
graph LR
Run-->syncLoop
syncLoop
syncLoop is the main loop for processing changes. It watches for changes from three channels (file, apiserver, and http) and creates a union of them. For any new change seen, will run a sync against desired state and running state. If no changes are seen to the configuration, will synchronize the last known desired state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
// preparation
for{
time.Sleep(duration)
kl.syncLoopIteration(...)
...
}
}
graph LR
syncLoop-->syncLoopIteration
syncLoopIteration receives events from several sources and runs a sync against the desired state and the running state.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
case e := <-plegCh:...
case <-syncCh:...
case update := <-kl.livenessManager.Updates():...
case <-housekeepingCh:...
}
return true
}
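The shape of this loop is an ordinary Go select that multiplexes several event sources into one worker. Below is a stripped-down sketch of the same pattern; the channel names and podUpdate type are made up for illustration and are not the kubelet's types.

package main

import (
    "fmt"
    "time"
)

type podUpdate struct{ op, name string }

func loopIteration(configCh <-chan podUpdate, syncCh, housekeepingCh <-chan time.Time) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false // source closed: stop the loop, as syncLoopIteration does
        }
        fmt.Println("config event:", u.op, u.name)
    case <-syncCh:
        fmt.Println("periodic sync")
    case <-housekeepingCh:
        fmt.Println("housekeeping")
    }
    return true
}

func main() {
    configCh := make(chan podUpdate, 1)
    syncTicker := time.NewTicker(500 * time.Millisecond)
    housekeepingTicker := time.NewTicker(1 * time.Second)
    configCh <- podUpdate{op: "ADD", name: "nginx"}

    for i := 0; i < 5; i++ {
        if !loopIteration(configCh, syncTicker.C, housekeepingTicker.C) {
            break
        }
    }
}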
syncLoopIteration reads from various channels and dispatches pods to the given handler. Take configCh as an example:
switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
// These are pods restored from the checkpoint. Treat them as new pods.
handler.HandlePodAdditions(u.Pods)
}
Everything ultimately lands in a SyncHandler (again implemented by Kubelet itself). Let's analyze HandlePodAdditions.
Creating a new pod
The code below strips out the parts unrelated to creation, such as logging and error checks.
//file:/pkg/kubelet/kubelet.go---2026
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
...
// Always add the pod to the pod manager. Kubelet relies on the pod manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
...
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}
kl.podManager.AddPod and kl.probeManager.AddPod(pod) only register the pod for tracking; the real creation work is done by dispatchWork, which routes back into kl.syncPod.
//file:/pkg/kubelet/kubelet.go---1464
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if runnable := kl.canRunPod(pod); !runnable.Admit {...}
// Update status in the status manager
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Create Cgroups for the pod and apply resource parameters to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// Make data directories for the pod
kl.makePodDataDirs(pod);
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
...
}
kubeGenericRuntimeManager.SyncPod
//file:/pkg/kubelet/kuberuntime/kuberuntime_manager.go---618
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
...
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
// Get podSandboxConfig for containers to start.
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit);
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular);
}
...
}
m.createPodSandbox and startContainer
In the pkg/kubelet/kuberuntime/ package, kuberuntime_manager.go defines the kubeGenericRuntimeManager struct and its interface-method implementations, but the internal helpers those methods depend on are spread across the other Go files in the package. In effect, the methods of a single "class" are split across several files, which together make up the kubeGenericRuntimeManager implementation.
SyncPod is also quite long. Its main logic is to compare the pod spec that was passed in with the pod actually running (for a newly created pod the latter is empty) and compute the difference, i.e. what needs to change. It then creates the infra (sandbox) container first, sets up its network, and finally creates the application containers one by one.
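As a deliberately simplified sketch of that "compute the difference" step: this is not the real computePodActions, and the types and rules are reduced to the bare idea of diffing desired containers against observed state.

package main

import "fmt"

type containerState string

const (
    running containerState = "running"
    exited  containerState = "exited"
)

type podActions struct {
    containersToStart []string
    containersToKill  []string
}

// computeActions compares the desired container names with the observed state.
func computeActions(desired []string, observed map[string]containerState) podActions {
    var a podActions
    for _, name := range desired {
        if st, ok := observed[name]; !ok || st != running {
            a.containersToStart = append(a.containersToStart, name)
        }
    }
    for name := range observed {
        found := false
        for _, d := range desired {
            if d == name {
                found = true
                break
            }
        }
        if !found {
            a.containersToKill = append(a.containersToKill, name)
        }
    }
    return a
}

func main() {
    desired := []string{"app", "sidecar"}
    observed := map[string]containerState{"app": exited, "old-sidecar": running}
    fmt.Printf("%+v\n", computeActions(desired, observed))
}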
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
// Step 2: create the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
// Step 4: execute the post start hook.
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
}
- The default sandbox image is gcr.io/google_containers/pause-amd64:3.0
- EnsureImageExists pulls the image when it is not present
- CreateContainer ends up calling the docker API with POST /containers/create
- CreateCheckpoint writes a file whose name is the container ID
- StartContainer ends up calling the docker API with POST /containers/{containerID}/start (see the raw-HTTP sketch after this list)
- The resolv.conf generated by docker is rewritten and shared by all containers in the pod
- InspectContainer ends up calling the docker API with GET /containers/{containerID}/json
- Networking for the container is set up through CNI: a loopback interface is created and the interface is put into promiscuous mode (via the commands ip link show dev / ip link set <bridgeName> promisc on).
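To make the POST /containers/{id}/start bullet concrete, here is a hedged sketch of hitting the Docker Engine API directly over its unix socket. The kubelet itself goes through the docker Go client rather than raw HTTP, and the container ID and socket path below are placeholders.

package main

import (
    "context"
    "fmt"
    "net"
    "net/http"
)

func main() {
    // Dial the local Docker daemon socket regardless of the URL host.
    tr := &http.Transport{
        DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
            return (&net.Dialer{}).DialContext(ctx, "unix", "/var/run/docker.sock")
        },
    }
    httpClient := &http.Client{Transport: tr}

    containerID := "0123456789ab" // placeholder ID
    resp, err := httpClient.Post("http://docker/containers/"+containerID+"/start", "application/json", nil)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // 204 No Content on success, 304 if the container is already started.
    fmt.Println("status:", resp.Status)
}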
The following is a debug walk-through of the PodSandbox path from create to start, all the way until the request is sent to docker. The other calls are similar and can be debugged the same way.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
if err != nil {
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
klog.Error(message)
return "", message, err
}
// Create pod logs directory
err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
if err != nil {
message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
klog.Errorf(message)
return "", message, err
}
runtimeHandler := ""
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
if err != nil {
message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
return "", message, err
}
if runtimeHandler != "" {
klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
}
}
//call RunPodSandbox
podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
if err != nil {
message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
klog.Error(message)
return "", message, err
}
return podSandBoxID, "", nil
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/kuberuntime/instrumented_services.go
func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
const operation = "run_podsandbox"
startTime := time.Now()
defer recordOperation(operation, startTime)
defer metrics.RunPodSandboxDuration.WithLabelValues(runtimeHandler).Observe(metrics.SinceInSeconds(startTime))
out, err := in.service.RunPodSandbox(config, runtimeHandler)
recordError(operation, err)
if err != nil {
metrics.RunPodSandboxErrors.WithLabelValues(runtimeHandler).Inc()
}
return out, err
}
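instrumented_services.go is a thin decorator around the real runtime service: every method records latency and errors, then delegates. A minimal sketch of the same wrapping pattern follows, with stand-in types rather than the kubelet's metrics helpers.

package main

import (
    "fmt"
    "time"
)

type runtimeService interface {
    RunPodSandbox(config string) (string, error)
}

type realService struct{}

func (realService) RunPodSandbox(config string) (string, error) {
    time.Sleep(10 * time.Millisecond)
    return "sandbox-123", nil
}

// instrumentedService wraps another runtimeService and records each call.
type instrumentedService struct {
    inner runtimeService
}

func (s instrumentedService) RunPodSandbox(config string) (string, error) {
    start := time.Now()
    out, err := s.inner.RunPodSandbox(config)
    // Stand-in for recordOperation / recordError.
    fmt.Printf("run_podsandbox took %v, err=%v\n", time.Since(start), err)
    return out, err
}

func main() {
    var svc runtimeService = instrumentedService{inner: realService{}}
    id, _ := svc.RunPodSandbox("cfg")
    fmt.Println("sandbox:", id)
}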
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/remote/remote_runtime.go
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
// Use 2 times longer timeout for sandbox operation (4 mins by default)
// TODO: Make the pod sandbox timeout configurable.
ctx, cancel := getContextWithTimeout(r.timeout * 2)
defer cancel()
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
RuntimeHandler: runtimeHandler,
})
if err != nil {
klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
return "", err
}
if resp.PodSandboxId == "" {
errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
klog.Errorf("RunPodSandbox failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.PodSandboxId, nil
}
The kubelet CRI gRPC client implementation
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go
func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
out := new(RunPodSandboxResponse)
err := grpc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/RunPodSandbox", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
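For completeness, this is roughly how one could talk to the same CRI endpoint that the generated client above wraps: dial the runtime's unix socket over gRPC and issue a request. The socket path (/var/run/dockershim.sock) and the use of the simple Version call are assumptions for a dockershim-era node, not taken from the kubelet code.

package main

import (
    "context"
    "fmt"
    "net"
    "time"

    "google.golang.org/grpc"
    runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

func main() {
    // Dial the CRI unix socket.
    dialer := func(ctx context.Context, addr string) (net.Conn, error) {
        return (&net.Dialer{}).DialContext(ctx, "unix", addr)
    }
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    conn, err := grpc.DialContext(ctx, "/var/run/dockershim.sock",
        grpc.WithInsecure(), grpc.WithContextDialer(dialer), grpc.WithBlock())
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    client := runtimeapi.NewRuntimeServiceClient(conn)
    resp, err := client.Version(ctx, &runtimeapi.VersionRequest{})
    if err != nil {
        panic(err)
    }
    fmt.Println("runtime:", resp.RuntimeName, resp.RuntimeVersion)
}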
The gRPC server side is implemented in dockershim; continuing:
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
config := r.GetConfig()
// Step 1: Pull the image for the sandbox.
image := defaultSandboxImage
podSandboxImage := ds.podSandboxImage
if len(podSandboxImage) != 0 {
image = podSandboxImage
}
// NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
// see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
// Only pull sandbox image when it's not present - v1.PullIfNotPresent.
if err := ensureSandboxImageExists(ds.client, image); err != nil {
return nil, err
}
// Step 2: Create the sandbox container.
if r.GetRuntimeHandler() != "" {
return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
}
createConfig, err := ds.makeSandboxDockerConfig(config, image)
if err != nil {
return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
}
createResp, err := ds.client.CreateContainer(*createConfig)
if err != nil {
createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
}
if err != nil || createResp == nil {
return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
}
resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}
ds.setNetworkReady(createResp.ID, false)
defer func(e *error) {
// Set networking ready depending on the error return of
// the parent function
if *e == nil {
ds.setNetworkReady(createResp.ID, true)
}
}(&err)
// Step 3: Create Sandbox Checkpoint.
if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
return nil, err
}
// Step 4: Start the sandbox container.
// Assume kubelet's garbage collector would remove the sandbox later, if
// startContainer failed.
err = ds.client.StartContainer(createResp.ID)
if err != nil {
return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
}
// Rewrite resolv.conf file generated by docker.
// NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
// not only for pods with host network: the resolver conf will be overwritten
// after sandbox creation to override docker's behaviour. This resolv.conf
// file is shared by all containers of the same pod, and needs to be modified
// only once per pod.
if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
containerInfo, err := ds.client.InspectContainer(createResp.ID)
if err != nil {
return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
}
if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
}
}
// Do not invoke network plugins if in hostNetwork mode.
if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
return resp, nil
}
// Step 5: Setup networking for the sandbox.
// All pod networking is setup by a CNI plugin discovered at startup time.
// This plugin assigns the pod ip, sets up routes inside the sandbox,
// creates interfaces etc. In theory, its jurisdiction ends with pod
// sandbox networking, but it might insert iptables rules or open ports
// on the host as well, to satisfy parts of the pod spec that aren't
// recognized by the CNI standard yet.
cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
networkOptions := make(map[string]string)
if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
// Build DNS options.
dnsOption, err := json.Marshal(dnsConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err)
}
networkOptions["dns"] = string(dnsOption)
}
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
if err != nil {
errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)}
// Ensure network resources are cleaned up even if the plugin
// succeeded but an error happened between that success and here.
err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
if err != nil {
errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err))
}
err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod)
if err != nil {
errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err))
}
return resp, utilerrors.NewAggregate(errList)
}
return resp, nil
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/instrumented_client.go
func (in instrumentedInterface) StartContainer(id string) error {
const operation = "start_container"
defer recordOperation(operation, time.Now())
err := in.client.StartContainer(id)
recordError(operation, err)
return err
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/kube_docker_client.go
func (d *kubeDockerClient) StartContainer(id string) error {
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
Here the chain from the gRPC call down to docker ends:
/workspace/goWorkspace/src/k8s.io/kubernetes/vendor/github.com/docker/docker/client/container_start.go
func (cli *Client) ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error {
query := url.Values{}
if len(options.CheckpointID) != 0 {
query.Set("checkpoint", options.CheckpointID)
}
if len(options.CheckpointDir) != 0 {
query.Set("checkpoint-dir", options.CheckpointDir)
}
resp, err := cli.post(ctx, "/containers/"+containerID+"/start", query, nil, nil)
ensureReaderClosed(resp)
return err
}
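For comparison, the same /containers/{id}/start call can be made directly with the Docker Go client, which is essentially what kubeDockerClient does minus the timeout bookkeeping; the container ID below is a placeholder.

package main

import (
    "context"
    "fmt"

    "github.com/docker/docker/api/types"
    "github.com/docker/docker/client"
)

func main() {
    // Build a client from DOCKER_HOST etc.; defaults to the local unix socket.
    cli, err := client.NewClientWithOpts(client.FromEnv)
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    containerID := "0123456789ab" // placeholder
    if err := cli.ContainerStart(context.Background(), containerID, types.ContainerStartOptions{}); err != nil {
        panic(err)
    }
    fmt.Println("started", containerID)
}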
References:
http://qiankunli.github.io/2018/12/31/kubernetes_source_kubelet.html#%E6%96%B0%E5%BB%BA-pod
https://cizixs.com/2017/06/07/kubelet-source-code-analysis-part-2/
https://toutiao.io/posts/z2e88b/preview