Kubelet Source Code Analysis: Creation
Preface
kubelet is deployed as a standalone binary on each node in k8s; it is the node's monitor and manager. Everything we do with cluster resources is ultimately realized by the kubelet, for example creating a pod. So it is (in my view) the core of k8s.
![image.png](https://img-blog.csdnimg.cn/img_convert/f434af953e0b87be12f27be40ddf1509.png)
Creating the kubelet components
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
// Apply the feature gates from the kubelet configuration to the global (mutable) feature gate registry
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
// Validate the KubeletServer configuration values
if err := options.ValidateKubeletServer(s); err != nil {
return err
}
// Warn if MemoryQoS enabled with cgroups v1
if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
!isCgroup2UnifiedMode() {
klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
}
// Obtain Kubelet Lock File
// Acquire a lock file to prevent multiple kubelet instances from running concurrently
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
done := make(chan struct{})
if s.LockFilePath != "" {
klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %w", s.LockFilePath, err)
}
if s.ExitOnLockContention {
klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
}
}
// Register current configuration with /configz endpoint
// configz exposes a RESTful endpoint that returns the currently running configuration
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.ErrorS(err, "Failed to register kubelet configuration with configz")
}
if len(s.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
// About to get clients and such, detect standaloneMode
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s, featureGate)
if err != nil {
return err
}
}
if kubeDeps.Cloud == nil {
if !cloudprovider.IsExternal(s.CloudProvider) {
cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
if cloud != nil {
klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
}
kubeDeps.Cloud = cloud
}
}
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}
// if in standalone mode, indicate as much by setting all clients to nil
switch {
case standaloneMode:
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.InfoS("Standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
// Build the clients used to talk to the apiserver: the main client, the event client, and the heartbeat (health-check) client
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName)
if err != nil {
return err
}
if onHeartbeatFailure == nil {
return errors.New("onHeartbeatFailure must be a valid function other than nil")
}
kubeDeps.OnHeartbeatFailure = onHeartbeatFailure
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet client: %w", err)
}
// make a separate client for events
// Create a dedicated client for Event resources
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet event client: %w", err)
}
// make a separate client for heartbeat with throttling disabled and a timeout attached
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
// The timeout is the minimum of the lease duration and status update frequency
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
heartbeatClientConfig.QPS = float32(-1)
// Create the heartbeat client, used for apiserver <-> kubelet health reporting
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
}
}
// Authentication/authorization for the kubelet API, backed by x509 certificates, tokens, and webhooks
if kubeDeps.Auth == nil {
auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
runAuthenticatorCAReload(ctx.Done())
}
// The following sets up the cgroup root paths,
// i.e. the paths cAdvisor reads pod resource-usage information from.
var cgroupRoots []string
nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
// /kubepods
cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}
if s.RuntimeCgroups != "" {
// RuntimeCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
}
if s.SystemCgroups != "" {
// SystemCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}
// cAdvisor monitors container state and exposes it to collectors such as Prometheus
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}
// Setup event recorder if required.
// Event recorder
makeEventRecorder(kubeDeps, nodeName)
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
if err != nil {
return err
}
// The following handles resource reservation,
// reserving the resources needed by the system (kernel and system daemons)
reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
if err != nil {
return err
}
if reservedSystemCPUs.Size() > 0 {
// at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
if s.SystemReserved == nil {
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
}
// Resources reserved for the kubelet itself
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
// Resources reserved for the OS / system daemons
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
// Eviction thresholds: user-configured limits that sit one level above OOM. When a threshold is crossed the kubelet evicts pods, whereas OOM-killing pods is a kernel mechanism.
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
// QoS-level resource reservation settings
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
var cpuManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
} else if s.CPUManagerPolicyOptions != nil {
return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
}
}
// Create the ContainerManager
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
KubeletOOMScoreAdj: s.OOMScoreAdj,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
ReservedSystemCPUs: reservedSystemCPUs,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
ExperimentalTopologyManagerScope: s.TopologyManagerScope,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {
return err
}
}
// Apply the configured OOM score adjustment to the kubelet process
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
}
// Container runtime (e.g. docker): create the client connections to the remote runtime/image services
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint)
if err != nil {
return err
}
// Run the kubelet
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
if s.HealthzPort > 0 {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.ErrorS(err, "Failed to start healthz server")
}
}, 5*time.Second, wait.NeverStop)
}
return nil
}
Important component: kubelet.Server
For the meaning of each startup/configuration option, see the links provided at the end of this article.
Below we look at the kubelet.Dependencies struct.
kubelet.Dependencies
This struct holds the kubelet's global dependencies, such as the kube client, auth, cAdvisor, the event client, and so on. Literally it is the set of "kubelet dependencies", which in practice amounts to the kubelet's shared configuration.
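For orientation, here is a heavily abridged sketch of the struct. The field names and types are paraphrased from memory of the source around this release; the exact set varies between Kubernetes versions, and import paths are omitted in this abridged sketch:

type Dependencies struct {
	// Clients talking to the apiserver
	KubeClient      clientset.Interface
	EventClient     v1core.EventsGetter
	HeartbeatClient clientset.Interface

	Auth              server.AuthInterface    // authn/authz for the kubelet's own API
	CAdvisorInterface cadvisor.Interface      // container and node stats
	Cloud             cloudprovider.Interface
	ContainerManager  cm.ContainerManager     // cgroups, device/cpu/memory managers
	Mounter           mount.Interface
	OOMAdjuster       *oom.OOMAdjuster
	PodConfig         *config.PodConfig       // merged pod sources (apiserver, file, http)
	Recorder          record.EventRecorder
	VolumePlugins     []volume.VolumePlugin
	// ... many more fields omitted
}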
CAdvisor
Mainly provides static container information, state information, and the configured monitoring metrics.
It watches the cgroup paths and uses them to decide when a container has been created.
func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
namespacedName := namespacedContainerName{
Name: containerName,
}
// Check that the container didn't already exist.
if _, ok := m.containers[namespacedName]; ok {
return nil
}
handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.containerEnvMetadataWhiteList, m.inHostNamespace)
if err != nil {
return err
}
if !accept {
// ignoring this container.
klog.V(4).Infof("ignoring container %q", containerName)
return nil
}
collectorManager, err := collector.NewCollectorManager()
if err != nil {
return err
}
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
if err != nil {
return err
}
if !cgroups.IsCgroup2UnifiedMode() {
devicesCgroupPath, err := handler.GetCgroupPath("devices")
if err != nil {
klog.Warningf("Error getting devices cgroup path: %v", err)
} else {
cont.nvidiaCollector, err = m.nvidiaManager.GetCollector(devicesCgroupPath)
if err != nil {
klog.V(4).Infof("GPU metrics may be unavailable/incomplete for container %s: %s", cont.info.Name, err)
}
}
}
if m.includedMetrics.Has(container.PerfMetrics) {
perfCgroupPath, err := handler.GetCgroupPath("perf_event")
if err != nil {
klog.Warningf("Error getting perf_event cgroup path: %q", err)
} else {
cont.perfCollector, err = m.perfManager.GetCollector(perfCgroupPath)
if err != nil {
klog.Errorf("Perf event metrics will not be available for container %q: %v", containerName, err)
}
}
}
if m.includedMetrics.Has(container.ResctrlMetrics) {
cont.resctrlCollector, err = m.resctrlManager.GetCollector(containerName, func() ([]string, error) {
return cont.getContainerPids(m.inHostNamespace)
}, len(m.machineInfo.Topology))
if err != nil {
klog.V(4).Infof("resctrl metrics will not be available for container %s: %s", cont.info.Name, err)
}
}
// Add collectors
labels := handler.GetContainerLabels()
collectorConfigs := collector.GetCollectorConfigs(labels)
err = m.registerCollectors(collectorConfigs, cont)
if err != nil {
klog.Warningf("Failed to register collectors for %q: %v", containerName, err)
}
// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
m.containers[namespacedName] = cont
for _, alias := range cont.info.Aliases {
m.containers[namespacedContainerName{
Namespace: cont.info.Namespace,
Name: alias,
}] = cont
}
klog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)
contSpec, err := cont.handler.GetSpec()
if err != nil {
return err
}
contRef, err := cont.handler.ContainerReference()
if err != nil {
return err
}
newEvent := &info.Event{
ContainerName: contRef.Name,
Timestamp: contSpec.CreationTime,
EventType: info.EventContainerCreation,
}
err = m.eventHandler.AddEvent(newEvent)
if err != nil {
return err
}
// Start the container's housekeeping.
return cont.Start()
}
The logic is to watch the cgroup directories, i.e. to listen for container-creation events (only creation is analyzed here), and then create a ContainerHandler for the container. That type mainly implements the following methods:
- ContainerReference()
- GetSpec()
- GetStats()
- ListContainers()
- …
These methods correspond to the CRI architecture.
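For reference, an abridged sketch of cAdvisor's ContainerHandler interface (paraphrased from memory; the exact method set differs between cAdvisor versions, and imports are omitted in this abridged sketch):

type ContainerHandler interface {
	ContainerReference() (info.ContainerReference, error) // identity: name, aliases, namespace
	GetSpec() (info.ContainerSpec, error)                 // static spec: limits, creation time, ...
	GetStats() (*info.ContainerStats, error)              // a point-in-time usage sample
	ListContainers(listType ListType) ([]info.ContainerReference, error)
	GetCgroupPath(resource string) (string, error)
	GetContainerLabels() map[string]string
	Exists() bool
	Start()
	Cleanup()
	// ...
}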
ContainerManager
qosContainerManager
Its job is to create the cgroups that pods run under: through cgroupManager.Create it creates the Burstable and BestEffort cgroups under the root cgroup (/kubepods), along with the cpu, memory, and other controllers.
[root@master01 cgroup]# ls
blkio cpu cpuacct cpu,cpuacct cpuset devices freezer hugetlb memory net_cls net_cls,net_prio net_prio perf_event pids systemd
Note the controllers listed above: once resource limits are set, the cpu, memory, and other values under kubepods are updated dynamically from the sum of all pods' resources (yes, this really is updated dynamically).
Per-pod resource limits are set in the directories below; as you can see they are configured separately under kubepods-besteffort and kubepods-burstable.
[root@master01 cgroup]# ls cpu/kubepods.slice/kubepods-besteffort.slice/
cgroup.clone_children cpuacct.usage_percpu cpu.shares kubepods-besteffort-pod93e35254_9260_4f0b_a89d_af89e95e2f24.slice tasks
After startup it periodically fetches the pods, determines each pod's resource-limit (QoS) class, and writes the updated values into the system.
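To illustrate that "sum of all pods" update, here is a minimal sketch, not the actual qosContainerManager code, of how the Burstable cgroup's cpu.shares could be derived from the CPU requests of all Burstable pods (1024 shares correspond to one full core):

package sketch

import (
	v1 "k8s.io/api/core/v1"
)

// burstableCPUShares sums the CPU requests of every Burstable pod and
// converts the total milli-CPU to cgroup cpu.shares.
func burstableCPUShares(pods []*v1.Pod) uint64 {
	var milliCPU int64
	for _, pod := range pods {
		if pod.Status.QOSClass != v1.PodQOSBurstable {
			continue
		}
		for _, c := range pod.Spec.Containers {
			milliCPU += c.Resources.Requests.Cpu().MilliValue()
		}
	}
	return uint64(milliCPU) * 1024 / 1000
}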
topologyManager
deviceManager
Starting with Kubernetes 1.8, the recommended way to consume high-performance hardware such as GPUs, FPGAs, NICs, and InfiniBand is through device plugins.
It creates a server that hardware plugins register with; Start mainly starts this server listening on a unix socket.
The most important method is Allocate(), which calls into the plugin to request resources.
// Works like a filter: resources are allocated first; if the allocation succeeds, the pod can be admitted onto this node
func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
pod := attrs.Pod
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := m.deviceManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
}
if m.cpuManager != nil {
err = m.cpuManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
}
}
if m.memoryManager != nil {
err = m.memoryManager.Allocate(pod, &container)
if err != nil {
return admission.GetPodAdmitResult(err)
}
}
}
return admission.GetPodAdmitResult(nil)
}
cpuManager
The excerpt below is from the periodic reconcile logic: if the CPU set recorded in the desired state differs from the one last applied, the container's cpuset is updated through the container runtime.
cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
lcset := m.lastUpdateState.GetCPUSetOrDefault(string(pod.UID), container.Name)
if !cset.Equals(lcset) {
err = m.updateContainerCPUSet(containerID, cset)
}
m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)
func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
// TODO: Consider adding a `ResourceConfigForContainer` helper in
// helpers_linux.go similar to what exists for pods.
// It would be better to pass the full container resources here instead of
// this patch-like partial resources.
return m.containerRuntime.UpdateContainerResources(
containerID,
&runtimeapi.LinuxContainerResources{
CpusetCpus: cpus.String(),
})
}
memoryManager
It mainly works in support of QoS.
The CPU and memory managers primarily address the performance problems introduced by NUMA; on machines without NUMA they can be disabled, since CPU and memory on a single node are otherwise managed through QoS.
Kubelet
PodConfig
The main job of this component is to watch pod resources, using spec.nodeName as the filter to retrieve the pods scheduled to this node, and then act on them, for example by running their containers.
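A minimal client-go sketch of that same idea, watching only the pods whose spec.nodeName matches this node. This is not the kubelet's actual implementation (PodConfig also merges file and HTTP sources), and the node name below is a placeholder:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig() // assumes we run inside the cluster
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Only pods scheduled to this node, the same filter PodConfig's apiserver source uses.
	selector := fields.OneTermEqualSelector("spec.nodeName", "node-1").String()
	w, err := client.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(),
		metav1.ListOptions{FieldSelector: selector})
	if err != nil {
		panic(err)
	}
	for ev := range w.ResultChan() {
		fmt.Printf("pod event: %s\n", ev.Type)
	}
}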
secretManager
Works the same way as the configmap manager; the functionality is identical.
It provides the methods for fetching secrets and configmaps; the mechanism is similar to informers, essentially a re-implementation of that idea.
configMapManager
livenessManager
readinessManager
startupManager
podCache
Caches pod status information; mainly used by the PLEG.
It has no Start method, which means it is purely a store that holds each pod's status.
type Cache interface {
Get(types.UID) (*PodStatus, error)
Set(types.UID, *PodStatus, error, time.Time)
// GetNewerThan is a blocking call that only returns the status
// when it is newer than the given time.
GetNewerThan(types.UID, time.Time) (*PodStatus, error)
Delete(types.UID)
UpdateTime(time.Time)
}
podManager
The pod manager records all pods known to the current kubelet.
Every time a pod addition triggers the syncLoop, the pod manager's add/set method is called to add the pod to the manager, and all other components retrieve pods through this manager.
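An abridged sketch of the pod manager's interface (paraphrased; the exact method set varies by Kubernetes version):

package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// Manager is an abridged, paraphrased sketch of the kubelet pod manager interface.
type Manager interface {
	GetPods() []*v1.Pod // all pods known to this kubelet
	GetPodByUID(uid types.UID) (*v1.Pod, bool)
	GetPodByFullName(podFullName string) (*v1.Pod, bool)
	SetPods(pods []*v1.Pod) // replace the whole set
	AddPod(pod *v1.Pod)
	UpdatePod(pod *v1.Pod)
	DeletePod(pod *v1.Pod)
	// mirror-pod helpers omitted
}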
statusManager
The status manager is mainly responsible for syncing pod status to the apiserver.
After it starts, it listens on a channel; each event triggers an update of the pod's status to the apiserver. The channel is fed through SetPodStatus().
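A simplified sketch of that channel-driven pattern (assumed names, not the real kubelet types): SetPodStatus records the status and signals the sync loop, which then pushes the status to the apiserver.

package sketch

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// statusManager is a toy version of the pattern; fields are assumed to be
// initialized by a constructor.
type statusManager struct {
	mu          sync.Mutex
	statuses    map[types.UID]v1.PodStatus
	syncRequest chan types.UID
}

// SetPodStatus records the latest status and wakes the sync loop.
func (m *statusManager) SetPodStatus(uid types.UID, status v1.PodStatus) {
	m.mu.Lock()
	m.statuses[uid] = status
	m.mu.Unlock()
	m.syncRequest <- uid
}

// Start consumes the channel; syncToAPIServer stands in for the real call that
// patches pod.status against the apiserver.
func (m *statusManager) Start(syncToAPIServer func(types.UID, v1.PodStatus)) {
	go func() {
		for uid := range m.syncRequest {
			m.mu.Lock()
			status := m.statuses[uid]
			m.mu.Unlock()
			syncToAPIServer(uid, status)
		}
	}()
}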
podWorkers
This component performs the actual series of operations on a pod after it has changed. It is mainly invoked from the syncLoop and is the core of the core.
klet.podWorkers = newPodWorkers(
klet.syncPod,
klet.syncTerminatingPod,
klet.syncTerminatedPod,
kubeDeps.Recorder,
klet.workQueue,
klet.resyncInterval,
backOffPeriod,
klet.podCache,
)
runtimeService
The runtime client, i.e. the CRI client.
containerRuntime
The container runtime; a wrapper around the runtime service above.
runtimeState
Mainly records the runtime's network and storage state; this is the state of the runtime itself, not the state of the containers it runs.
s, err := kl.containerRuntime.Status()
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
containerLogManager
The container log manager.
StatsProvider
Provides runtime statistics such as CPU and memory usage. It is more comprehensive than the PLEG, which only cares about whether a pod is running, created, or deleted. You can think of it as a wrapper over the stats/status components above.
func (p *Provider) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorapiv1.ContainerInfoRequest) (*cadvisorapiv1.ContainerInfo, error) {
podUID = types.UID(p.podManager.TranslatePodUID(podUID))
pods, err := p.runtimeCache.GetPods()
if err != nil {
return nil, err
}
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
container := pod.FindContainerByName(containerName)
ci, err := p.cadvisor.DockerContainer(container.ID.ID, req)
if err != nil {
return nil, err
}
return &ci, nil
}
Because it is itself just a wrapper over the stats components, it does not need a Start method.
pleg
Pod lifecycle event generator.
It fetches the current state of each pod (that is, the states of the pod's containers) and reacts to changes; a simplified relist sketch follows the Start snippet below.
func (g *GenericPLEG) Start() {
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
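A greatly simplified sketch of what relist does (assumed types, not the real GenericPLEG code): list the containers through the runtime, diff against the previous observation, and emit lifecycle events onto the channel that syncLoop consumes.

package sketch

import "k8s.io/apimachinery/pkg/types"

type podLifecycleEvent struct {
	PodID types.UID
	State string // e.g. "running", "exited"
}

// listContainers returns, per pod UID, the current state of each container ID.
type listContainers func() map[types.UID]map[string]string

func relist(list listContainers, old map[types.UID]map[string]string,
	events chan<- podLifecycleEvent) map[types.UID]map[string]string {

	current := list()
	for uid, containers := range current {
		for id, state := range containers {
			if old[uid][id] != state { // container state changed since the last relist
				events <- podLifecycleEvent{PodID: uid, State: state}
			}
		}
	}
	return current // becomes "old" for the next relist period
}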
containerGC
Container garbage collection.
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
errors := []error{}
// Remove evictable containers
if err := cgc.evictContainers(gcPolicy, allSourcesReady, evictNonDeletedPods); err != nil {
errors = append(errors, err)
}
// Remove sandboxes with zero containers
if err := cgc.evictSandboxes(evictNonDeletedPods); err != nil {
errors = append(errors, err)
}
// Remove pod sandbox log directory
if err := cgc.evictPodLogsDirectories(allSourcesReady); err != nil {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}
go wait.Until(func() {
if err := kl.containerGC.GarbageCollect(); err != nil {
// error event and logging elided in this excerpt
} else {
// success bookkeeping elided in this excerpt
}
}, ContainerGCPeriod, wait.NeverStop)
imageManager
The image manager, mainly responsible for image garbage collection.
go wait.Until(func() {
if err := kl.imageManager.GarbageCollect(); err != nil {
// error event and logging elided in this excerpt
} else {
// success bookkeeping elided in this excerpt
}
}, ImageGCPeriod, wait.NeverStop)
serverCertificateManager
Rotates the kubelet's serving certificate.
ProbeManager
The probe manager.
Its job is to create a worker for each container probe, according to the probe type (readiness, startup, liveness).
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
The failing container is then restarted, i.e. the pod is effectively redeployed.
The readiness probe, by contrast, directly updates the pod's status in the apiserver.
switch probeType {
case readiness:
w.spec = container.ReadinessProbe
w.resultsManager = m.readinessManager
w.initialValue = results.Failure
case liveness:
w.spec = container.LivenessProbe
w.resultsManager = m.livenessManager
w.initialValue = results.Success
case startup:
w.spec = container.StartupProbe
w.resultsManager = m.startupManager
w.initialValue = results.Unknown
}
All three probe types are implemented by the same worker; when resultsManager.Set is called, the result is sent onto the updates channel.
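A condensed sketch of that worker pattern (assumed names, not the real prober types): each worker periodically runs its probe and publishes the result through a results manager, whose updates channel is what the syncLoop selects on.

package sketch

import "time"

type Result int

const (
	Failure Result = iota
	Success
)

type resultsManager struct {
	updates chan Result
}

func (m *resultsManager) Set(r Result)           { m.updates <- r }
func (m *resultsManager) Updates() <-chan Result { return m.updates }

// runWorker probes on every tick until stop is closed.
func runWorker(probe func() Result, period time.Duration, rm *resultsManager, stop <-chan struct{}) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			rm.Set(probe())
		case <-stop:
			return
		}
	}
}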
tokenManager
It mainly obtains ServiceAccount tokens and is used primarily by the volumePluginMgr.
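The underlying mechanism is the TokenRequest API. Here is a minimal client-go sketch of requesting a short-lived ServiceAccount token; the token manager is essentially a caching layer over this kind of call, and the namespace/name here are just examples:

package sketch

import (
	"context"
	"fmt"

	authenticationv1 "k8s.io/api/authentication/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// requestToken asks the apiserver for a token bound to the given ServiceAccount.
func requestToken(client kubernetes.Interface, namespace, serviceAccount string) (string, error) {
	exp := int64(3600)
	tr := &authenticationv1.TokenRequest{
		Spec: authenticationv1.TokenRequestSpec{
			ExpirationSeconds: &exp,
		},
	}
	resp, err := client.CoreV1().ServiceAccounts(namespace).
		CreateToken(context.TODO(), serviceAccount, tr, metav1.CreateOptions{})
	if err != nil {
		return "", fmt.Errorf("token request failed: %w", err)
	}
	return resp.Status.Token, nil
}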
volumePluginMgr
Manages the volume plugins.
It fetches CSIDriver objects; that resource configures a CSI plugin, e.g. attachRequired / SkipAttach() to skip the attach step, and fsGroupPolicy, which defines whether the underlying volume supports changing ownership and permissions before it is mounted.
It is the backbone of the CSI plugin support; the volumeManager actually calls into it to do the work.
pluginManager
Registers plugins and creates the CSINode resource. Other components use it mainly to obtain the plugin's csiClient. The CSINode resource carries the plugin's per-node configuration, for example **allocatable.count**, the upper bound on the number of volumes managed by the CSI driver that can be used on this node; the scheduler uses this value to decide whether another pod can still be scheduled here.
The csiClient is mainly used to invoke methods such as NodeStageVolume and NodePublishVolume.
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
csiDrivers.Set(pluginName, Driver{
endpoint: endpoint,
highestSupportedVersion: highestSupportedVersion,
})
//
csi, err := newCsiDriverClient(csiDriverName(pluginName))
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
}
volumeManager
Global directory (deviceMountPath): /var/lib/kubelet/plugins/csi-nfsplugin/
Volume management has two parts, the desiredStateOfWorldPopulator and the reconciler, similar in spirit to the pluginManager above: the desiredStateOfWorldPopulator records the desired state, i.e. the volumes the pods want mounted, and the reconciler performs the attach/mount operations needed to converge on that desiredStateOfWorld.
Based on the volume specified by the PVC, the plugin is looked up from the volumePluginMgr.
The plugin's Attach() operation is run; its purpose is to create a VolumeAttachment object, which attaches the given volume to the given node. The CSI controller watches that resource and performs the actual attach to the host machine; ultimately NodeStageVolume is called.
It then waits for the attach to report success. Note that a CSI plugin may not implement the attach capability, which is mainly handled by the external-attacher; if the plugin does not implement it, the kubelet performs the operation itself. MountDevice() mounts the device into the global directory.
SetUp() mounts the global directory into the pod; ultimately NodePublishVolume is called.
func (rc *reconciler) mountOrAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize)
volumeToMount.DevicePath = devicePath
if cache.IsVolumeNotAttachedError(err) {
rc.waitForVolumeAttach(volumeToMount)
} else if !volMounted || cache.IsRemountRequiredError(err) {
rc.mountAttachedVolumes(volumeToMount, err)
} else if cache.IsFSResizeRequiredError(err) {
fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
}
}
}
Note: for dynamically provisioned PVs, the PV controller calls Provision() -> CreateVolume() to create the corresponding sub-directory on the storage server. For static PVs, which are created manually rather than provisioned dynamically, there is no per-volume directory to create, so CreateVolume is never called and the flow goes straight to Attach() and SetUp().
evictionManager
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
// if we have nothing to do, just return
// The configured resource thresholds
thresholds := m.config.Thresholds
if m.dedicatedImageFs == nil {
// diskInfoProvider is the StatsProvider,
// which supplies the current runtime stats and machine information.
hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs()
if ok != nil {
return nil
}
m.dedicatedImageFs = &hasImageFs
// Map each signal (resource type) to a pod-ranking function
m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
// Map each signal to its node-level reclaim (GC) functions
m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
}
// Function that returns all active pods
activePods := podFunc()
updateStats := true
// Get the current node and pod stats summary
summary, err := m.summaryProvider.Get(updateStats)
if err != nil {
klog.ErrorS(err, "Eviction manager: failed to get summary stats")
return nil
}
// make observations and get a function to derive pod usage stats relative to those observations.
// Build a snapshot of observations by aggregating the node and pod information in the summary, e.g. current memory usage
observations, statsFunc := makeSignalObservations(summary)
debugLogObservations("observations", observations)
// determine the set of thresholds met independent of grace period
// Determine which thresholds are currently exceeded by observed usage
thresholds = thresholdsMet(thresholds, observations, false)
debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)
// Merge in previously met thresholds that have not yet been resolved, so they are still acted upon
if len(m.thresholdsMet) > 0 {
thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
}
// rank the thresholds by eviction priority
sort.Sort(byEvictionPriority(thresholds))
thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
// First try to reclaim node-level resources (GC); if the threshold is still exceeded afterwards, start evicting pods
m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim)
// Get the pod-ranking (eviction ordering) function for this signal
rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
// Rank the active pods for eviction
rank(activePods, statsFunc)
// Evict pods
for i := range activePods {
pod := activePods[i]
gracePeriodOverride := int64(0)
if !isHardEvictionThreshold(thresholdToReclaim) {
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
}
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
if m.evictPod(pod, gracePeriodOverride, message, annotations) {
metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
return []*v1.Pod{pod}
}
}
return nil
}
shutdownManager
Gracefully shuts down the kubelet; during shutdown it uses admission to reject all new pods.
setNodeStatusFuncs
Sets the node's status fields.
tryUpdateNodeStatus pushes the node status to the apiserver.
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
for i, f := range kl.setNodeStatusFuncs {
klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
if err := f(node); err != nil {
klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
}
}
}
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
// if cloud is not nil, we expect the cloud resource sync manager to exist
var nodeAddressesFunc func() ([]v1.NodeAddress, error)
if kl.cloud != nil {
nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
}
var validateHostFunc func() error
if kl.appArmorValidator != nil {
validateHostFunc = kl.appArmorValidator.ValidateHost
}
var setters []func(n *v1.Node) error
setters = append(setters,
nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
nodestatus.DaemonEndpoints(kl.daemonEndpoints),
nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
nodestatus.GoRuntime(),
)
// Volume limits
setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
setters = append(setters,
nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
// these side-effects by decoupling the decisions to send events and partial status recording
// from the Node setters.
kl.recordNodeSchedulableEvent,
)
return setters
}
Extras
Container runtime directories
By default a container's filesystem uses overlayfs, a layered filesystem made of lower layers (the read-only base image layers) and an upper layer (the writable layer in which all of the container's changes are recorded).
The merged directory is the union of the upper and lower directories; these merge rules are exactly the logic overlayfs implements.
After OverlayFS is assembled with a mount like the one shown at the end of this section, the following rules apply:
- When a file with the same name exists in both lowerdir and upperdir, the lowerdir file is hidden and the user only sees the upperdir file.
- Among multiple lowerdirs, a same-named file in a lower-priority lowerdir is hidden.
- If a directory with the same name exists in both, the contents of the lowerdir and upperdir directories are merged.
- When the user modifies data in the merged dir that comes from upperdir, the data is written directly back to its original location in upperdir; deletion works the same way.
- When the user modifies data in the merged dir that comes from lowerdir, nothing in lowerdir changes, because lowerdir is read-only. Overlayfs first copies the file from lowerdir into upperdir (the copy-up feature); subsequent modifications or deletions happen on the copy in upperdir, and the original file in lowerdir is hidden.
- If a directory comes purely from lowerdir, or is a merge of lowerdir and upperdir, the rename system call is not supported by default (mv still works, since it can fall back to copying); rename support requires CONFIG_OVERLAY_FS_REDIRECT_DIR.
Merge rules
- If the same file exists in both the upper and lower directories, the upper directory's file wins. For example, if a.txt exists in both upper and lower, the upper directory's a.txt is used.
- If the same directory exists in both the upper and lower directories, their contents are merged. For example, if a directory test exists in both, the contents of upper/test and lower/test are combined.
overlay on # what is mounted at the directory below
/run/containerd/io.containerd.runtime.v2.task/k8s.io/0aefb24c56bbfd8ebf42c0f9e11b627cfb31207de0c12dff99af1b8b3637cb88/rootfs # the merged layer, i.e. the combined view
type overlay # the mount type is overlay
(rw, # read-write
relatime,
lowerdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/18/fs:
/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/17/fs:
/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/16/fs, # the lowerdir layers, read-only
upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1037/fs,
# the writable layer
workdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1037/work
# the work directory overlayfs uses internally while merging; it is not visible to us, the result appears in the merged layer
)
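A minimal Go sketch of producing an overlay mount like the one shown above (Linux only, must run as root; the /tmp paths are placeholders, not real containerd snapshot paths):

package main

import (
	"fmt"
	"syscall"
)

func main() {
	data := "lowerdir=/tmp/lower2:/tmp/lower1," + // read-only layers; the left-most has the highest priority
		"upperdir=/tmp/upper," + // the writable layer
		"workdir=/tmp/work" // scratch space overlayfs needs for copy-up

	// Equivalent to: mount -t overlay overlay -o <data> /tmp/merged
	if err := syscall.Mount("overlay", "/tmp/merged", "overlay", 0, data); err != nil {
		fmt.Println("mount failed:", err)
		return
	}
	fmt.Println("overlay mounted at /tmp/merged")
}

Note the lowerdir ordering: in the mount output above, the first listed lowerdir is the top-most of the read-only layers.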
References
https://kubernetes.io/zh-cn/docs/tasks/administer-cluster/kubelet-config-file/ (kubelet configuration reference)
https://cloud.tencent.com/developer/article/1804676 (qosContainerManager)
https://blog.csdn.net/qjm1993/article/details/103237944 (topology manager explained)
https://blog.csdn.net/kyle18826138721/article/details/117636884 (volume manager)
https://blog.csdn.net/kyle18826138721/article/details/118268855 (CSI driver registration analysis)
https://blog.csdn.net/qq_34556414/article/details/120004267 (why CSI exists)
https://blog.csdn.net/kyle18826138721/article/details/117386142 (the AD controller and the kubelet's volume manager share similar logic; both can perform Attach/Detach, but only one of kube-controller-manager and kubelet actually does so, controlled by the kubelet flag --enable-controller-attach-detach. Setting it to true lets kube-controller-manager's AD controller handle Attach/Detach and disables the kubelet from doing it; the default is true)
https://blog.hdls.me/16255765577465.html (CSI explained)
https://zhuanlan.zhihu.com/p/415765175 (overlayfs structure explained)
https://cloud.tencent.com/developer/article/1684523 (OverlayFS design and implementation)
https://zhuanlan.zhihu.com/p/436450556 (how OverlayFS file deletion works)