Preface

The kubelet is the Kubernetes component deployed on its own on every node; it acts as the node's monitor and manager. Everything we do with cluster resources is ultimately materialized by the kubelet, creating a pod for example, so (in my view) it is the core of Kubernetes.
![image.png](https://img-blog.csdnimg.cn/img_convert/f434af953e0b87be12f27be40ddf1509.png)

Creating the kubelet components

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer
	// Apply the feature-gate overrides declared in the KubeletConfiguration to the global feature gates
	err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}
	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
	// Validate the values in the KubeletServer config
	if err := options.ValidateKubeletServer(s); err != nil {
		return err
	}

	// Warn if MemoryQoS enabled with cgroups v1
	if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
		!isCgroup2UnifiedMode() {
		klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
	}
	// Obtain Kubelet Lock File
	// Lock file: prevents multiple kubelet instances from running concurrently
	if s.ExitOnLockContention && s.LockFilePath == "" {
		return errors.New("cannot exit on lock file contention: no lock file specified")
	}
	done := make(chan struct{})
	if s.LockFilePath != "" {
		klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
		if err := flock.Acquire(s.LockFilePath); err != nil {
			return fmt.Errorf("unable to acquire file lock on %q: %w", s.LockFilePath, err)
		}
		if s.ExitOnLockContention {
			klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
				return err
			}
		}
	}

	// Register current configuration with /configz endpoint
	// configz exposes a REST endpoint that returns the configuration the kubelet is currently running with
	err = initConfigz(&s.KubeletConfiguration)
	if err != nil {
		klog.ErrorS(err, "Failed to register kubelet configuration with configz")
	}

	if len(s.ShowHiddenMetricsForVersion) > 0 {
		metrics.SetShowHidden()
	}

	// About to get clients and such, detect standaloneMode
	standaloneMode := true
	if len(s.KubeConfig) > 0 {
		standaloneMode = false
	}

	if kubeDeps == nil {
		kubeDeps, err = UnsecuredDependencies(s, featureGate)
		if err != nil {
			return err
		}
	}

	if kubeDeps.Cloud == nil {
		if !cloudprovider.IsExternal(s.CloudProvider) {
			cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
			cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
			if err != nil {
				return err
			}
			if cloud != nil {
				klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
			}
			kubeDeps.Cloud = cloud
		}
	}

	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
	if err != nil {
		return err
	}

	// if in standalone mode, indicate as much by setting all clients to nil
	switch {
	case standaloneMode:
		kubeDeps.KubeClient = nil
		kubeDeps.EventClient = nil
		kubeDeps.HeartbeatClient = nil
		klog.InfoS("Standalone mode, no API client")

	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
		// Build the clients used to talk to the apiserver: the main client, the event client and the heartbeat (health-check) client
		clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName)
		if err != nil {
			return err
		}
		if onHeartbeatFailure == nil {
			return errors.New("onHeartbeatFailure must be a valid function other than nil")
		}
		kubeDeps.OnHeartbeatFailure = onHeartbeatFailure

		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet client: %w", err)
		}

		// make a separate client for events
		// Create a dedicated client for Event resources
		eventClientConfig := *clientConfig
		eventClientConfig.QPS = float32(s.EventRecordQPS)
		eventClientConfig.Burst = int(s.EventBurst)
		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet event client: %w", err)
		}

		// make a separate client for heartbeat with throttling disabled and a timeout attached
		heartbeatClientConfig := *clientConfig
		heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
		// The timeout is the minimum of the lease duration and status update frequency
		leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
		if heartbeatClientConfig.Timeout > leaseTimeout {
			heartbeatClientConfig.Timeout = leaseTimeout
		}

		heartbeatClientConfig.QPS = float32(-1)
		// Create the heartbeat client, used for the apiserver <-> kubelet heartbeat (node status / lease updates)
		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
		}
	}

	// Authentication and authorization for the kubelet API, built mainly from x509 client certificates, tokens and webhooks
	if kubeDeps.Auth == nil {
		auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
		runAuthenticatorCAReload(ctx.Done())
	}

	// The block below collects the cgroup root paths.
	// They mainly tell cAdvisor which paths to read pod resource-usage data from.
	var cgroupRoots []string
	nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
	// /kubepods
	cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
	kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
	if err != nil {
		klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
	} else if kubeletCgroup != "" {
		cgroupRoots = append(cgroupRoots, kubeletCgroup)
	}

	// (the lookup of the container runtime's cgroup is elided in this excerpt)
	if err != nil {
		klog.InfoS("Failed to get the container runtime's cgroup. Runtime system container metrics may be missing.", "err", err)
	} else if s.RuntimeCgroups != "" {
		// RuntimeCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
	}

	if s.SystemCgroups != "" {
		// SystemCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, s.SystemCgroups)
	}

	// cAdvisor monitors container state and exposes it to collectors such as Prometheus
	if kubeDeps.CAdvisorInterface == nil {
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.RemoteRuntimeEndpoint)
		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}

	// Setup event recorder if required.
	// Event recorder
	makeEventRecorder(kubeDeps, nodeName)

	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
			s.CgroupRoot = "/"
		}

		machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
		if err != nil {
			return err
		}
		// The block below handles resource reservation.
		// Reserve CPUs for the operating system / kernel.
		reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
		if err != nil {
			return err
		}
		if reservedSystemCPUs.Size() > 0 {
			// at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
			klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
			if s.KubeReserved != nil {
				delete(s.KubeReserved, "cpu")
			}
			if s.SystemReserved == nil {
				s.SystemReserved = make(map[string]string)
			}
			s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
			klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
		}
		// Resources reserved for the kubelet and other Kubernetes system daemons
		kubeReserved, err := parseResourceList(s.KubeReserved)
		if err != nil {
			return err
		}
		// Resources reserved for the operating system / kernel
		systemReserved, err := parseResourceList(s.SystemReserved)
		if err != nil {
			return err
		}
		// Hard eviction thresholds. These sit one level above the OOM killer: when a threshold is crossed the kubelet evicts pods, whereas OOM killing of processes is a kernel mechanism.
		var hardEvictionThresholds []evictionapi.Threshold
		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
			if err != nil {
				return err
			}
		}
		// Parse the QOSReserved settings (the share of a resource reserved for higher QoS classes)
		experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
		if err != nil {
			return err
		}

		devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

		var cpuManagerPolicyOptions map[string]string
		if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
			if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
				cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
			} else if s.CPUManagerPolicyOptions != nil {
				return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
					s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
			}
		}
		// Create the ContainerManager
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				KubeletOOMScoreAdj:    s.OOMScoreAdj,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				KubeletRootDir:        s.RootDirectory,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					ReservedSystemCPUs:       reservedSystemCPUs,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				QOSReserved:                             *experimentalQOSReserved,
				ExperimentalCPUManagerPolicy:            s.CPUManagerPolicy,
				ExperimentalCPUManagerPolicyOptions:     cpuManagerPolicyOptions,
				ExperimentalCPUManagerReconcilePeriod:   s.CPUManagerReconcilePeriod.Duration,
				ExperimentalMemoryManagerPolicy:         s.MemoryManagerPolicy,
				ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
				ExperimentalPodPidsLimit:                s.PodPidsLimit,
				EnforceCPULimits:                        s.CPUCFSQuota,
				CPUCFSQuotaPeriod:                       s.CPUCFSQuotaPeriod.Duration,
				ExperimentalTopologyManagerPolicy:       s.TopologyManagerPolicy,
				ExperimentalTopologyManagerScope:        s.TopologyManagerScope,
			},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)

		if err != nil {
			return err
		}
	}

	// Apply the configured OOM score adjustment to the kubelet process itself
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
	}

	// Container runtime (CRI): establish the client connections to the remote runtime and image services (e.g. containerd or Docker)
	err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint)
	if err != nil {
		return err
	}

	// Run the kubelet
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}

	if s.HealthzPort > 0 {
		mux := http.NewServeMux()
		healthz.InstallHandler(mux)
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
			if err != nil {
				klog.ErrorS(err, "Failed to start healthz server")
			}
		}, 5*time.Second, wait.NeverStop)
	}
	return nil
}

Important component: kubelet.Server

For the meaning of each startup flag, please refer to the links listed in the references at the end of this article.
Next, let's look at the kubelet.Dependencies struct.

kubelet.Dependencies

This struct holds the kubelet's global configuration and shared dependencies, such as the KubeClient, Auth, the cAdvisor interface and the EventClient. The name translates to "kubelet dependencies", which in practice amounts to the kubelet's common plumbing.

CAdvisor

It mainly provides static container information, runtime state, and the configured monitoring metrics.
It watches the cgroup paths and uses them to determine when a container has been created.

func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
	namespacedName := namespacedContainerName{
		Name: containerName,
	}

	// Check that the container didn't already exist.
	if _, ok := m.containers[namespacedName]; ok {
		return nil
	}

	handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.containerEnvMetadataWhiteList, m.inHostNamespace)
	if err != nil {
		return err
	}
	if !accept {
		// ignoring this container.
		klog.V(4).Infof("ignoring container %q", containerName)
		return nil
	}
	collectorManager, err := collector.NewCollectorManager()
	if err != nil {
		return err
	}

	logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
	cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
	if err != nil {
		return err
	}

	if !cgroups.IsCgroup2UnifiedMode() {
		devicesCgroupPath, err := handler.GetCgroupPath("devices")
		if err != nil {
			klog.Warningf("Error getting devices cgroup path: %v", err)
		} else {
			cont.nvidiaCollector, err = m.nvidiaManager.GetCollector(devicesCgroupPath)
			if err != nil {
				klog.V(4).Infof("GPU metrics may be unavailable/incomplete for container %s: %s", cont.info.Name, err)
			}
		}
	}
	if m.includedMetrics.Has(container.PerfMetrics) {
		perfCgroupPath, err := handler.GetCgroupPath("perf_event")
		if err != nil {
			klog.Warningf("Error getting perf_event cgroup path: %q", err)
		} else {
			cont.perfCollector, err = m.perfManager.GetCollector(perfCgroupPath)
			if err != nil {
				klog.Errorf("Perf event metrics will not be available for container %q: %v", containerName, err)
			}
		}
	}

	if m.includedMetrics.Has(container.ResctrlMetrics) {
		cont.resctrlCollector, err = m.resctrlManager.GetCollector(containerName, func() ([]string, error) {
			return cont.getContainerPids(m.inHostNamespace)
		}, len(m.machineInfo.Topology))
		if err != nil {
			klog.V(4).Infof("resctrl metrics will not be available for container %s: %s", cont.info.Name, err)
		}
	}

	// Add collectors
	labels := handler.GetContainerLabels()
	collectorConfigs := collector.GetCollectorConfigs(labels)
	err = m.registerCollectors(collectorConfigs, cont)
	if err != nil {
		klog.Warningf("Failed to register collectors for %q: %v", containerName, err)
	}

	// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
	m.containers[namespacedName] = cont
	for _, alias := range cont.info.Aliases {
		m.containers[namespacedContainerName{
			Namespace: cont.info.Namespace,
			Name:      alias,
		}] = cont
	}

	klog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)

	contSpec, err := cont.handler.GetSpec()
	if err != nil {
		return err
	}

	contRef, err := cont.handler.ContainerReference()
	if err != nil {
		return err
	}

	newEvent := &info.Event{
		ContainerName: contRef.Name,
		Timestamp:     contSpec.CreationTime,
		EventType:     info.EventContainerCreation,
	}
	err = m.eventHandler.AddEvent(newEvent)
	if err != nil {
		return err
	}
	// Start the container's housekeeping.
	return cont.Start()
}

The logic is to watch the cgroup directories, which effectively means watching for container-creation events (only creation is analysed here), and then to create a ContainerHandler for the new container. That handler mainly implements the following methods:

  • ContainerReference()
  • GetSpec()
  • GetStats()
  • ListContainers()

These methods line up with the CRI architecture; a deliberately simplified view of the handler interface is sketched below.
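
The sketch below is only for orientation and is an assumption on my part: the real interface lives in cAdvisor's container package, has many more methods, and uses cAdvisor's info types rather than these placeholder types.

package handlersketch

// containerHandler is a simplified stand-in for cAdvisor's ContainerHandler: it
// answers "who is this container", "what is its spec", "what is it using right now",
// and "which containers live underneath it".
type containerHandler interface {
	ContainerReference() (name string, err error)   // identity (name and aliases)
	GetSpec() (spec map[string]string, err error)   // static spec: limits, labels, creation time
	GetStats() (stats map[string]uint64, err error) // point-in-time usage numbers
	ListContainers() ([]string, error)              // child cgroups / containers
}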

ContainerManager

qosContainerManager

Its job is to create the cgroups for pods: through cgroupManager.Create it creates the Burstable and BestEffort groups under the root pod cgroup (/kubepods), together with their cpu, memory and other controllers.

[root@master01 cgroup]# ls
blkio  cpu  cpuacct  cpu,cpuacct  cpuset  devices  freezer  hugetlb  memory  net_cls  net_cls,net_prio  net_prio  perf_event  pids  systemd

Note the entries above: once pods set resource limits, the cpu, memory and similar values under kubepods are updated dynamically from the sum of all pods' resources (yes, this really is updated dynamically).
The per-pod limits themselves are set in the directories below; as you can see, they are managed separately per QoS class through kubepods-besteffort and kubepods-burstable.

[root@master01 cgroup]# ls cpu/kubepods.slice/kubepods-besteffort.slice/
cgroup.clone_children  cpuacct.usage_percpu  cpu.shares                                                         kubepods-besteffort-pod93e35254_9260_4f0b_a89d_af89e95e2f24.slice  tasks

After it starts, it periodically lists the pods, determines the resource-limit type (QoS class) of each, and writes the updated values back into the system. The QoS-class-to-cgroup mapping is illustrated by the small sketch below.
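
This is a hedged illustration, not kubelet code: the function and its package are made up, and the systemd cgroup driver spells these groups as the kubepods-*.slice names visible in the listings above.

package main

import (
	"fmt"
	"path"
)

// qosParentCgroup maps a pod's QoS class to the parent cgroup it is placed under
// with the cgroupfs driver. Guaranteed pods sit directly under the root pod cgroup,
// while Burstable and BestEffort pods get their own sub-groups, which the real
// qosContainerManager creates and keeps updated.
func qosParentCgroup(qosClass string) string {
	root := "/kubepods"
	switch qosClass {
	case "Burstable":
		return path.Join(root, "burstable")
	case "BestEffort":
		return path.Join(root, "besteffort")
	default: // "Guaranteed"
		return root
	}
}

func main() {
	fmt.Println(qosParentCgroup("BestEffort")) // /kubepods/besteffort
}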

topologyManager
deviceManager

Since Kubernetes 1.8, the officially recommended way to consume high-performance hardware such as GPUs, FPGAs, NICs and InfiniBand is through device plugins.
The device manager creates a gRPC server that hardware plugins register against; Start() mainly starts that server listening on a unix socket.
The most important method is Allocate(), which calls into the plugin to request resources.

// Works like an admission filter: resources are allocated first, and if the allocation succeeds the pod can be placed on this node
func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	pod := attrs.Pod

	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
		err := m.deviceManager.Allocate(pod, &container)
		if err != nil {
			return admission.GetPodAdmitResult(err)
		}

		if m.cpuManager != nil {
			err = m.cpuManager.Allocate(pod, &container)
			if err != nil {
				return admission.GetPodAdmitResult(err)
			}
		}

		if m.memoryManager != nil {
			err = m.memoryManager.Allocate(pod, &container)
			if err != nil {
				return admission.GetPodAdmitResult(err)
			}
		}
	}

	return admission.GetPodAdmitResult(nil)
}
cpuManager

During reconciliation the cpuManager compares the CPU set a container should have with the one it last applied, and only calls down into the runtime when they differ (simplified excerpt):

cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
lcset := m.lastUpdateState.GetCPUSetOrDefault(string(pod.UID), container.Name)
if !cset.Equals(lcset) {
	err = m.updateContainerCPUSet(containerID, cset)
}
m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)

func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
	// TODO: Consider adding a `ResourceConfigForContainer` helper in
	// helpers_linux.go similar to what exists for pods.
	// It would be better to pass the full container resources here instead of
	// this patch-like partial resources.
	return m.containerRuntime.UpdateContainerResources(
		containerID,
		&runtimeapi.LinuxContainerResources{
			CpusetCpus: cpus.String(),
		})
}
memoryManager

It mainly works in support of QoS.
The CPU and memory managers primarily address the performance problems introduced by NUMA; on machines without NUMA they can be turned off, in which case the node's cpu and memory are managed through QoS alone.

Kubelet

PodConfig

This component watches Pod resources, using spec.nodeName as the filter so that only pods scheduled to this node are retrieved; the kubelet then acts on them, for example by running their containers. A minimal sketch of such a node-scoped list/watch follows.
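
A minimal sketch, under the assumption that the apiserver pod source does essentially this internally; the function name here is illustrative, but fields.OneTermEqualSelector and cache.NewListWatchFromClient are real client-go calls.

package podsource

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// newNodePodListWatch returns a ListWatch restricted to pods whose spec.nodeName
// equals this node's name, so the kubelet only ever sees pods scheduled to it.
func newNodePodListWatch(client clientset.Interface, nodeName string) *cache.ListWatch {
	selector := fields.OneTermEqualSelector("spec.nodeName", nodeName)
	return cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, selector)
}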

secretManager

Much the same as the ConfigMap manager; the two work identically.
It provides the methods for fetching Secrets and ConfigMaps and works very much like the informer mechanism, essentially a re-implementation of it.

configMapManager
livenessManager
readinessManager
startupManager
podCache

Caches pod status information, mainly for use by the PLEG.
It has no Start method, which means it is purely a store holding the status that corresponds to each pod.

type Cache interface {
	Get(types.UID) (*PodStatus, error)
	Set(types.UID, *PodStatus, error, time.Time)
	// GetNewerThan is a blocking call that only returns the status
	// when it is newer than the given time.
	GetNewerThan(types.UID, time.Time) (*PodStatus, error)
	Delete(types.UID)
	UpdateTime(time.Time)
}
podManager

The pod manager records all pods currently known to this kubelet.
Every time an added pod triggers the syncLoop, the pod is added to the manager (podManager.set(pod)), and all other components fetch pods through this manager; a toy sketch of that role follows.
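
A toy sketch of that "single source of pods" role, under the assumption that the real manager also indexes pods by full name, UID and mirror-pod relationships; the type and method names below are made up for illustration.

package podmanager

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// podStore keeps the pods this kubelet knows about; other components read from it
// instead of going back to the apiserver.
type podStore struct {
	mu   sync.RWMutex
	pods map[types.UID]*v1.Pod
}

func newPodStore() *podStore {
	return &podStore{pods: map[types.UID]*v1.Pod{}}
}

// AddPod is what the sync loop would call when a new pod shows up.
func (s *podStore) AddPod(pod *v1.Pod) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pods[pod.UID] = pod
}

// GetPods is what every other component would use to look pods up.
func (s *podStore) GetPods() []*v1.Pod {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]*v1.Pod, 0, len(s.pods))
	for _, p := range s.pods {
		out = append(out, p)
	}
	return out
}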

statusManager

The status manager, mainly responsible for pushing pod status to the apiserver.
Once started it listens on a channel; every event on that channel means "sync pod status to the apiserver", and the channel is triggered through SetPodStatus(). A simplified sketch of this trigger-channel pattern follows.
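
A simplified sketch of the pattern only, assuming the wiring described above; the real status_manager additionally versions and batches per-pod statuses, and the names below are made up.

package statussketch

// syncTrigger mimics how SetPodStatus nudges the sync loop without blocking:
// the new status is recorded, then a token is dropped onto a buffered channel
// that the Start loop drains, pushing the recorded statuses to the apiserver.
type syncTrigger struct {
	kick    chan struct{}
	syncAll func() // e.g. "write all dirty pod statuses to the apiserver"
}

func newSyncTrigger(syncAll func()) *syncTrigger {
	return &syncTrigger{kick: make(chan struct{}, 1), syncAll: syncAll}
}

// SetPodStatus would store the new status, then request a sync.
func (t *syncTrigger) SetPodStatus() {
	select {
	case t.kick <- struct{}{}: // request a sync
	default: // a sync is already pending; coalesce
	}
}

// Start runs the loop that performs the apiserver updates.
func (t *syncTrigger) Start(stop <-chan struct{}) {
	go func() {
		for {
			select {
			case <-stop:
				return
			case <-t.kick:
				t.syncAll()
			}
		}
	}()
}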

podWorkers

podWorkers performs the actual series of operations on a pod after it has changed. It is driven mainly by the syncLoop and is the core of the core.

	klet.podWorkers = newPodWorkers(
		klet.syncPod,
		klet.syncTerminatingPod,
		klet.syncTerminatedPod,

		kubeDeps.Recorder,
		klet.workQueue,
		klet.resyncInterval,
		backOffPeriod,
		klet.podCache,
	)
runtimeService

The runtime client.
In other words, the CRI client.

containerRuntime

The container runtime, a wrapper around the runtime client above.

runtimeState

Records the runtime's network and storage state; note this is the state of the runtime itself, not of the containers it has started.

s, err := kl.containerRuntime.Status()
// (error handling elided in this excerpt)
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
	kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
}
containerLogManager

The container log manager (it takes care of rotating container log files).

StatsProvider

Provides the container runtime's stats, such as CPU and memory usage. It is more comprehensive than the PLEG, which only cares about coarse states like running, created or deleted. It can be viewed as a wrapper over the status machinery above.

// abbreviated excerpt: error handling has been elided
func (p *Provider) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorapiv1.ContainerInfoRequest) (*cadvisorapiv1.ContainerInfo, error) {
	podUID = types.UID(p.podManager.TranslatePodUID(podUID))
	pods, err := p.runtimeCache.GetPods()
	pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
	container := pod.FindContainerByName(containerName)
	ci, err := p.cadvisor.DockerContainer(container.ID.ID, req)
	return &ci, nil
}

Because it is itself a wrapper over the stats components, it does not need a Start method.

pleg

Pod lifecycle event generation.
It obtains the current state of every pod (that is, the state of the containers in the pod) and reacts to changes in that state; the relist comparison is sketched after the code below.

func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
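Conceptually, relist compares the container states seen during the previous pass with the current ones and emits an event for every difference. The sketch below is an assumption-level simplification, not the real GenericPLEG code.

package plegsketch

// podState is the coarse state pleg cares about, keyed by container ID,
// e.g. "running" or "exited".
type podState map[string]string

// lifecycleEvent is what would be handed on to the sync loop.
type lifecycleEvent struct {
	ContainerID string
	State       string
}

// diff returns one event per container whose state changed since the last relist.
func diff(previous, current podState) []lifecycleEvent {
	var events []lifecycleEvent
	for id, state := range current {
		if previous[id] != state {
			events = append(events, lifecycleEvent{ContainerID: id, State: state})
		}
	}
	// containers that disappeared entirely since the previous pass
	for id := range previous {
		if _, ok := current[id]; !ok {
			events = append(events, lifecycleEvent{ContainerID: id, State: "removed"})
		}
	}
	return events
}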
containerGC

Container garbage collection.

func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
	errors := []error{}
	// Remove evictable containers
	if err := cgc.evictContainers(gcPolicy, allSourcesReady, evictNonDeletedPods); err != nil {
		errors = append(errors, err)
	}

	// Remove sandboxes with zero containers
	if err := cgc.evictSandboxes(evictNonDeletedPods); err != nil {
		errors = append(errors, err)
	}

	// Remove pod sandbox log directory
	if err := cgc.evictPodLogsDirectories(allSourcesReady); err != nil {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}

go wait.Until(func() {
	if err := kl.containerGC.GarbageCollect(); err != nil {
		klog.ErrorS(err, "Container garbage collection failed") // (event recording elided in this excerpt)
	}
}, ContainerGCPeriod, wait.NeverStop)
imageManager

The image manager; its main job is image garbage collection.

go wait.Until(func() {
	if err := kl.imageManager.GarbageCollect(); err != nil {
		klog.ErrorS(err, "Image garbage collection failed") // (event recording elided in this excerpt)
	}
}, ImageGCPeriod, wait.NeverStop)
serverCertificateManager

Rotates the kubelet's serving certificate.

ProbeManager

The probe manager.
Its job is to create a worker for each container probe, keyed by probe type (readiness, startup, liveness).

case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
When the liveness probe reports a failure, the pod is synced again and the failing container is redeployed (restarted).

The readiness probe, by contrast, updates the pod's status in the apiserver directly.

switch probeType {
	case readiness:
		w.spec = container.ReadinessProbe
		w.resultsManager = m.readinessManager
		w.initialValue = results.Failure
	case liveness:
		w.spec = container.LivenessProbe
		w.resultsManager = m.livenessManager
		w.initialValue = results.Success
	case startup:
		w.spec = container.StartupProbe
		w.resultsManager = m.startupManager
		w.initialValue = results.Unknown
	}

All three probe types are implemented by the same worker; calling resultsManager.Set is what pushes the result onto the update channel. A simplified worker loop is sketched below.
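
A simplified sketch of the per-probe worker loop, assuming the behaviour described above; the real prober worker additionally tracks failure/success thresholds and container restarts, and the function below is made up for illustration.

package probesketch

import "time"

// runProbeWorker probes at a fixed period until stop is closed and reports every
// result. Liveness results can trigger a pod sync, while readiness results are
// written straight into the pod's status.
func runProbeWorker(stop <-chan struct{}, period time.Duration, probe func() bool, report func(healthy bool)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			report(probe())
		}
	}
}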

tokenManager

Mainly fetches ServiceAccount tokens; it is used chiefly by the volumePluginMgr (for projected service-account-token volumes). The TokenRequest call it ultimately relies on is sketched below.
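
A minimal sketch using client-go's TokenRequest subresource; the namespace, service account name and audience below are illustrative values, not anything the kubelet hard-codes.

package tokensketch

import (
	"context"

	authenticationv1 "k8s.io/api/authentication/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// requestServiceAccountToken asks the apiserver to mint a short-lived token for a
// service account; this is the kind of token the token manager caches and hands to
// projected volumes.
func requestServiceAccountToken(ctx context.Context, client clientset.Interface) (*authenticationv1.TokenRequest, error) {
	expiration := int64(3600) // one hour
	tr := &authenticationv1.TokenRequest{
		Spec: authenticationv1.TokenRequestSpec{
			Audiences:         []string{"https://kubernetes.default.svc"},
			ExpirationSeconds: &expiration,
		},
	}
	return client.CoreV1().ServiceAccounts("default").CreateToken(ctx, "default", tr, metav1.CreateOptions{})
}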

volumePluginMgr

存储插件的管理
获取CSIDriver,该资源是配置csi插件的,比如配置attachRequired,skipAttach() ,跳过挂接操作,fsGroupPolicy ,定义底层卷是否支持在挂载之前更改卷的所有权和权限。
它是csi插件的主体,volumemanage 其实是调用它来实现功能。

pluginManager

Registers plugins and creates the CSINode resource. Other components use it mainly to obtain a plugin's csiClient. The CSINode resource also carries per-node configuration for the plugin, for example **allocatable.count**, the upper bound on the number of independent CSI-managed volumes usable on the node; the scheduler uses the current value to decide whether another pod can be scheduled.
The csiClient obtained here is mainly used to run NodeStageVolume, NodePublishVolume and similar calls.

func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
	highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
	csiDrivers.Set(pluginName, Driver{
		endpoint:                endpoint,
		highestSupportedVersion: highestSupportedVersion,
	})
    // 
	csi, err := newCsiDriverClient(csiDriverName(pluginName))

	ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
	defer cancel()

	driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)

	err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
}

volumeManager

The global directory (deviceMountPath), e.g.: /var/lib/kubelet/plugins/csi-nfsplugin/
Volume management has two parts, the desiredStateOfWorldPopulator and the reconciler, similar in spirit to the pluginManager above: the desiredStateOfWorldPopulator records the desired outcome, i.e. the volumes pods want mounted, and the reconciler performs the series of mount operations needed to reach that desiredStateOfWorld.
The plugin is looked up in volumePluginMgr based on the volume the PVC references.
The plugin is run: the Attacher's Attach() operation creates a VolumeAttachment object, which requests that the given volume be attached to the given node; the CSI controller side watches that resource and performs the attach to the physical machine.
Then we wait for the attach to be reported successful. Note that a CSI plugin may not implement attach at all; attach is normally handled by the external-attacher, and if the plugin does not need it the kubelet simply continues with the mount itself. MountDevice() mounts the device into the global directory, which is where NodeStageVolume is ultimately called.
SetUp() then mounts the global directory into the pod, which ultimately calls NodePublishVolume.


func (rc *reconciler) mountOrAttachVolumes() {
	// Ensure volumes that should be attached/mounted are attached/mounted.
	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
		volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize)
		volumeToMount.DevicePath = devicePath
		if cache.IsVolumeNotAttachedError(err) {
			rc.waitForVolumeAttach(volumeToMount)
		} else if !volMounted || cache.IsRemountRequiredError(err) {
			rc.mountAttachedVolumes(volumeToMount, err)
		} else if cache.IsFSResizeRequiredError(err) {
			fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
			rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
		}
	}
}

Note: for a dynamically provisioned PV, the PV controller / provisioner must call Provision() -> CreateVolume() to create a corresponding sub-directory on the storage server. A static PV is not dynamically allocated but specified by hand, so there is no need to create its own sub-directory on the storage server and CreateVolume is never called; the flow goes straight through Attach() and SetUp().

evictionManager

The eviction manager periodically compares resource usage against the configured thresholds and evicts pods when a threshold remains exceeded (annotated excerpt below):

func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
	// if we have nothing to do, just return
	// the configured resource thresholds
	thresholds := m.config.Thresholds
	if m.dedicatedImageFs == nil {
		// diskInfoProvider is the StatsProvider,
		// which supplies the current runtime state and machine information.
		hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs()
		if ok != nil {
			return nil
		}
		m.dedicatedImageFs = &hasImageFs
		// record the pod-ranking function for each resource signal
		m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
		// record the corresponding node-level reclaim (GC) functions for each resource signal
		m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
	}
	// function that returns all active pods
	activePods := podFunc()
	updateStats := true
	// fetch the current node and pod stats summary
	summary, err := m.summaryProvider.Get(updateStats)
	if err != nil {
		klog.ErrorS(err, "Eviction manager: failed to get summary stats")
		return nil
	}
	// make observations and get a function to derive pod usage stats relative to those observations.
	// build a snapshot of observations by aggregating the node and pod data in the summary, e.g. current memory usage
	observations, statsFunc := makeSignalObservations(summary)
	debugLogObservations("observations", observations)

	// determine the set of thresholds met independent of grace period
	// determine which thresholds are exceeded by the current usage
	thresholds = thresholdsMet(thresholds, observations, false)
	debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)
	// keep thresholds that were met previously and are not yet resolved, and merge them in
	if len(m.thresholdsMet) > 0 {
		thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
		thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
	}
	// rank the thresholds by eviction priority
	sort.Sort(byEvictionPriority(thresholds))
	thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
	// first try to reclaim node-level resources (e.g. via GC); if the threshold is still exceeded, pod eviction begins
	m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim) 

	// pick the eviction-ranking function for the resource signal being reclaimed
	rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
	// rank the active pods for eviction
	rank(activePods, statsFunc)
	// evict pods
	for i := range activePods {
		pod := activePods[i]
		gracePeriodOverride := int64(0)
		if !isHardEvictionThreshold(thresholdToReclaim) {
			gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
		}
		message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
		if m.evictPod(pod, gracePeriodOverride, message, annotations) {
			metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
			return []*v1.Pod{pod}
		}
	}
	return nil
}
shutdownManager

Shuts the kubelet down gracefully; during shutdown an admission handler is used to reject all new pods.

setNodeStatusFuncs

Sets the node's status.
tryUpdateNodeStatus pushes the node status to the apiserver.

go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
	for i, f := range kl.setNodeStatusFuncs {
		klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
		if err := f(node); err != nil {
			klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
		}
	}
}
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
	var validateHostFunc func() error
	if kl.appArmorValidator != nil {
		validateHostFunc = kl.appArmorValidator.ValidateHost
	}
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),
	)
	// Volume limits
	setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))

	setters = append(setters,
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	)
	return setters
}

Extras

Container runtime directories

By default containers use OverlayFS, a layered filesystem consisting of lower layers (the base directories, which are read-only) and an upper layer (the writable layer, where everything the container modifies is recorded).
The merged view combines the upper and lower directories, and those merge rules are exactly the logic OverlayFS implements; a minimal mount sketch follows.
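
A minimal sketch of performing such an overlay mount from Go (the paths are illustrative; the equivalent shell command is mount -t overlay overlay -o lowerdir=...,upperdir=...,workdir=... /merged):

package overlaysketch

import "golang.org/x/sys/unix"

// mountOverlay merges two read-only lower layers with a writable upper layer at
// /merged. workdir must be an empty directory on the same filesystem as upperdir;
// overlayfs uses it as scratch space (e.g. during copy-up).
func mountOverlay() error {
	data := "lowerdir=/lower2:/lower1,upperdir=/upper,workdir=/work"
	return unix.Mount("overlay", "/merged", "overlay", 0, data)
}
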
After an OverlayFS merge via a mount like the one above, the following rules apply:
• When a file with the same name exists in both lowerdir and upperdir, the lowerdir file is hidden and the user only sees the upperdir file.
• Among several lowerdirs, a same-named file in a lower-priority lowerdir is hidden.
• If a directory with the same name exists in both, the contents of the lowerdir and upperdir directories are merged.
• When the user modifies data in the merged dir that came from upperdir, the change is written directly back into the original location inside upperdir; deletion works the same way.
• When the user modifies data in the merged dir that came from lowerdir, nothing in lowerdir changes. lowerdir is read-only, so OverlayFS first copies the file up into upperdir (the copy-up feature of OverlayFS); subsequent modifications or deletions act on the copy in upperdir, and the original file in lowerdir is hidden.
• If a directory comes purely from lowerdir, or is a merge of lowerdir and upperdir, the rename syscall is not supported on it by default (renaming via mv still works). Supporting rename requires CONFIG_OVERLAY_FS_REDIRECT_DIR.
Merge rules

  1. If the same file exists in both the upper and lower directories, the upper directory's version wins. For example, if a.txt exists in both upper and lower, the a.txt from the upper directory is used.
  2. If the same directory exists in both the upper and lower directories, their contents are merged. For example, if a test directory exists in both upper and lower, the contents of upper/test and lower/test are combined.
overlay on  # what is mounted at the path below
/run/containerd/io.containerd.runtime.v2.task/k8s.io/0aefb24c56bbfd8ebf42c0f9e11b627cfb31207de0c12dff99af1b8b3637cb88/rootfs  # the merged view, i.e. the container's root filesystem
type overlay  # the mount type is overlay
(rw,  # read-write
relatime,
lowerdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/18/fs:
/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/17/fs:
/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/16/fs,  # lowerdir: the read-only image layers
upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1037/fs,  # upperdir: the writable layer
workdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1037/work  # workdir: internal scratch space used by overlayfs; it is not visible in the merged view
)

References

https://kubernetes.io/zh-cn/docs/tasks/administer-cluster/kubelet-config-file/ (kubelet configuration file reference)
https://cloud.tencent.com/developer/article/1804676 (qosContainerManager)
https://blog.csdn.net/qjm1993/article/details/103237944 (topology manager explained)
https://blog.csdn.net/kyle18826138721/article/details/117636884 (volume manager)
https://blog.csdn.net/kyle18826138721/article/details/118268855 (CSI driver registration analysis)
https://blog.csdn.net/qq_34556414/article/details/120004267 (why CSI exists)
https://blog.csdn.net/kyle18826138721/article/details/117386142 (the AD Controller and the kubelet's volume manager share similar logic and can both perform Attach/Detach, but only one of kube-controller-manager and kubelet actually does so, controlled by the kubelet flag --enable-controller-attach-detach; setting it to true enables kube-controller-manager's AD controller for Attach/Detach and disables the kubelet's own Attach/Detach handling, and the default is true)
https://blog.hdls.me/16255765577465.html (CSI explained)
https://zhuanlan.zhihu.com/p/415765175 (OverlayFS structure explained)
https://cloud.tencent.com/developer/article/1684523 (OverlayFS design and implementation)
https://zhuanlan.zhihu.com/p/436450556 (how OverlayFS file deletion works)
