本博客分析的kubernetes的源码是v1.11.0

kubelet执行StartGarbageCollection的时序图

grpic

通过时序图,就可以看得出启动GC功能的步骤了

具体实现的逻辑代码 k8s.io/kubernetes/pkg/kubelet/kubelet.go

// StartGarbageCollection starts the kubelet's background garbage collection
// loops: one for containers and one for images.
//
// Each loop runs in its own goroutine and is driven by wait.Until with
// wait.NeverStop, so this method returns immediately after launching them.
// When ImageGCHighThresholdPercent is 100 the image loop is not started.
func (kl *Kubelet) StartGarbageCollection() {
    // Container garbage collection.
    // loggedContainerGCFailure is only touched from inside the loop closure;
    // it remembers whether the previous run failed so that the first success
    // after a failure is logged at a more visible verbosity.
    loggedContainerGCFailure := false
    go wait.Until(func() {
        if err := kl.containerGC.GarbageCollect(); err != nil {
            glog.Errorf("Container garbage collection failed: %v", err)
            kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
            loggedContainerGCFailure = true
        } else {
            var vLevel glog.Level = 4
            if loggedContainerGCFailure {
                vLevel = 1
                loggedContainerGCFailure = false
            }

            glog.V(vLevel).Infof("Container garbage collection succeeded")
        }
    }, ContainerGCPeriod, wait.NeverStop)

    // When the high threshold is set to 100, image GC is disabled: return
    // before the image loop is ever started. A stop channel owned by this
    // method must not be used for that purpose, because closing it on return
    // would also stop the loop in the enabled case, and a late send on the
    // closed channel would panic.
    if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
        glog.V(2).Infof("ImageGCHighThresholdPercent is set 100, Disable image GC")
        return
    }

    // Image garbage collection.
    // prevImageGCFailed is only touched from inside the loop closure; a single
    // failure is logged, while consecutive failures also emit a warning event.
    prevImageGCFailed := false
    go wait.Until(func() {
        if err := kl.imageManager.GarbageCollect(); err != nil {
            if prevImageGCFailed {
                glog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
                // Only create an event for repeated failures
                kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
            } else {
                glog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
            }
            prevImageGCFailed = true
        } else {
            var vLevel glog.Level = 4
            if prevImageGCFailed {
                vLevel = 1
                prevImageGCFailed = false
            }

            glog.V(vLevel).Infof("Image garbage collection succeeded")
        }
    }, ImageGCPeriod, wait.NeverStop)
}

首先来看容器垃圾回收机制


容器垃圾回收机制

相关代码

// Excerpt from StartGarbageCollection: the container GC loop. The closure is
// passed to wait.Until together with ContainerGCPeriod and wait.NeverStop.
go wait.Until(func() {
        if err := kl.containerGC.GarbageCollect(); err != nil {
            // Failure: log it, record a warning event, and remember that we failed.
            glog.Errorf("Container garbage collection failed: %v", err)
            kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
            loggedContainerGCFailure = true
        } else {
            // Success is logged at verbosity 4, or at verbosity 1 when the
            // previous run failed, after which the failure flag is reset.
            var vLevel glog.Level = 4
            if loggedContainerGCFailure {
                vLevel = 1
                loggedContainerGCFailure = false
            }

            glog.V(vLevel).Infof("Container garbage collection succeeded")
        }
    }, ContainerGCPeriod, wait.NeverStop)

从代码可以看出,容器的垃圾回收是周期性执行的,周期ContainerGCPeriod=1min。需要注意的是,wait.Until是对JitterUntil的封装,抖动因子为0且sliding=true,因此并没有引入随机抖动;计时器在每次回调执行完成之后才重新开始计时,所以实际的执行间隔约为1min加上单次垃圾回收本身的耗时

结构体 realContainerGC实现了ContainerGC接口,相关的代码在k8s.io/kubernetes/pkg/kubelet/container/container_gc.go 文件中

ContainerGC 接口的方法有两个

// ContainerGC is the interface used to trigger container garbage collection.
// Implementation is thread-compatible.
type ContainerGC interface {
    // GarbageCollect garbage collects containers and returns any error encountered.
    GarbageCollect() error
    // DeleteAllUnusedContainers deletes all unused containers, including
    // containers belonging to pods that are terminated but not deleted.
    DeleteAllUnusedContainers() error
}

结构体realContainerGC

// realContainerGC implements ContainerGC by forwarding every request to the
// container runtime's GarbageCollect method, together with the configured
// policy and the current readiness of the configuration sources.
type realContainerGC struct {
    // Container runtime
    runtime Runtime

    // Policy for garbage collection.
    policy ContainerGCPolicy

    // sourcesReadyProvider provides the readiness of kubelet configuration sources.
    sourcesReadyProvider SourcesReadyProvider
}

该结构体实现了ContainerGC接口的两个方法
具体实现为

// GarbageCollect runs a regular garbage collection pass: it delegates to the
// runtime with the configured policy and the current sources-ready state, and
// passes false as the final flag.
func (cgc *realContainerGC) GarbageCollect() error {
    sourcesReady := cgc.sourcesReadyProvider.AllReady()
    return cgc.runtime.GarbageCollect(cgc.policy, sourcesReady, false)
}

// DeleteAllUnusedContainers logs the attempt and then delegates to the runtime
// with the configured policy and the current sources-ready state, passing true
// as the final flag.
func (cgc *realContainerGC) DeleteAllUnusedContainers() error {
    glog.Infof("attempting to delete unused containers")
    sourcesReady := cgc.sourcesReadyProvider.AllReady()
    return cgc.runtime.GarbageCollect(cgc.policy, sourcesReady, true)
}

可以看得出,都是通过runtime.GarbageCollect接口实现的具体逻辑,调用的方式一个传false 一个传true

所以,我们有必要查看cgc.runtime.GarbageCollect的实现方式
为了查看cgc.runtime.GarbageCollect方法的具体实现逻辑代码,就必须知道函数NewContainerGC是在哪里调用的,以及参数是如何传入的
经查看,NewContainerGC是在函数NewMainKubelet中调用的,具体实现逻辑代码在k8s.io/kubernetes/pkg/kubelet/kubelet.go文件的第708行
而klet.containerRuntime这个对象是在k8s.io/kubernetes/pkg/kubelet/kubelet.go文件的第657-676行New出来的,具体的实现函数NewKubeGenericRuntimeManager在k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go的第135行

// Excerpt from NewMainKubelet: build the generic runtime manager.
// klet is passed twice; per the NewKubeGenericRuntimeManager signature those
// positions are the podStateProvider and runtimeHelper parameters.
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
        kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
        klet.livenessManager,
        seccompProfileRoot,
        containerRefManager,
        machineInfo,
        klet,
        kubeDeps.OSInterface,
        klet,
        httpClient,
        imageBackOff,
        kubeCfg.SerializeImagePulls,
        float32(kubeCfg.RegistryPullQPS),
        int(kubeCfg.RegistryBurst),
        kubeCfg.CPUCFSQuota,
        runtimeService,
        imageService,
        kubeDeps.ContainerManager.InternalContainerLifecycle(),
        legacyLogProvider,
    )
    if err != nil {
        return nil, err
    }
    // The same runtime manager instance is stored in three kubelet fields.
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime
    klet.runner = runtime

NewKubeGenericRuntimeManager最终返回实现了KubeGenericRuntime接口的 kubeGenericRuntimeManager结构体

分析函数NewKubeGenericRuntimeManager
代码

// NewKubeGenericRuntimeManager builds a kubeGenericRuntimeManager from the
// supplied dependencies and returns it as a KubeGenericRuntime.
//
// It returns an error when the runtime version cannot be queried, and
// ErrVersionNotSupported when the version reported by the runtime differs
// from kubeRuntimeAPIVersion.
func NewKubeGenericRuntimeManager(
    recorder record.EventRecorder,
    livenessManager proberesults.Manager,
    seccompProfileRoot string,
    containerRefManager *kubecontainer.RefManager,
    machineInfo *cadvisorapi.MachineInfo,
    podStateProvider podStateProvider,
    osInterface kubecontainer.OSInterface,
    runtimeHelper kubecontainer.RuntimeHelper,
    httpClient types.HttpGetter,
    imageBackOff *flowcontrol.Backoff,
    serializeImagePulls bool,
    imagePullQPS float32,
    imagePullBurst int,
    cpuCFSQuota bool,
    runtimeService internalapi.RuntimeService,
    imageService internalapi.ImageManagerService,
    internalLifecycle cm.InternalContainerLifecycle,
    legacyLogProvider LegacyLogProvider,
) (KubeGenericRuntime, error) {
    // The runtime and image services are stored wrapped by the
    // newInstrumented* helpers rather than as the raw arguments.
    kubeRuntimeManager := &kubeGenericRuntimeManager{
        recorder:            recorder,
        cpuCFSQuota:         cpuCFSQuota,
        seccompProfileRoot:  seccompProfileRoot,
        livenessManager:     livenessManager,
        containerRefManager: containerRefManager,
        machineInfo:         machineInfo,
        osInterface:         osInterface,
        runtimeHelper:       runtimeHelper,
        runtimeService:      newInstrumentedRuntimeService(runtimeService),
        imageService:        newInstrumentedImageManagerService(imageService),
        keyring:             credentialprovider.NewDockerKeyring(),
        internalLifecycle:   internalLifecycle,
        legacyLogProvider:   legacyLogProvider,
    }

    // Query the runtime for its version before wiring anything else.
    typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
    if err != nil {
        glog.Errorf("Get runtime version failed: %v", err)
        return nil, err
    }

    // Only matching kubeRuntimeAPIVersion is supported now
    // TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
    if typedVersion.Version != kubeRuntimeAPIVersion {
        glog.Errorf("Runtime api version %s is not supported, only %s is supported now",
            typedVersion.Version,
            kubeRuntimeAPIVersion)
        return nil, ErrVersionNotSupported
    }

    kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
    glog.Infof("Container runtime %s initialized, version: %s, apiVersion: %s",
        typedVersion.RuntimeName,
        typedVersion.RuntimeVersion,
        typedVersion.RuntimeApiVersion)

    // If the container logs directory does not exist, create it.
    // TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
    // new runtime interface
    // A failure to create the directory is only logged; construction continues.
    if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
        if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
            glog.Errorf("Failed to create directory %q: %v", podLogsRootDirectory, err)
        }
    }

    // The remaining components take the manager itself as a dependency, so
    // they are attached after the struct literal above has been built.
    kubeRuntimeManager.imagePuller = images.NewImageManager(
        kubecontainer.FilterEventRecorder(recorder),
        kubeRuntimeManager,
        imageBackOff,
        serializeImagePulls,
        imagePullQPS,
        imagePullBurst)
    kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
    // Note: the container GC receives the runtimeService argument as passed in,
    // not the instrumented wrapper stored on the manager.
    kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)

    // versionCache is built from a getter that calls getTypedVersion, with
    // versionCacheTTL as its second argument.
    kubeRuntimeManager.versionCache = cache.NewObjectCache(
        func() (interface{}, error) {
            return kubeRuntimeManager.getTypedVersion()
        },
        versionCacheTTL,
    )

    return kubeRuntimeManager, nil
}

NewKubeGenericRuntimeManager函数的入参可以知道
该函数拥有EventRecorder事件记录的功能,liveness探针、容器映射管理、获取机器信息MachineInfo的功能,pod的状态podStateProvider、镜像pull流量管理(QPS)、镜像服务imageService、容器的生命周期以及日志管理、镜像back off等功能

具体实现GarbageCollect函数的逻辑代码在k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go文件 897-899

// GarbageCollect removes dead containers according to gcPolicy. The work is
// done by the manager's containerGC; the policy and both flags are forwarded
// to it unchanged and its error is returned as-is.
func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
    collector := m.containerGC
    return collector.GarbageCollect(gcPolicy, allSourcesReady, evictNonDeletedPods)
}

真正调用的垃圾回收机制的具体实现是containerGC对象的GarbageCollect方法,
根据传入的gcPolicy evictNonDeletedPods的值对容器进行清理操作

containerGC对象的GarbageCollect方法的代码实现在k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_gc.go文件381

// GarbageCollect removes dead containers using the specified container gc policy.
// The gc policy is not applied to sandboxes: a sandbox is only removed when it
// is not ready and contains no containers.
//
// The pass runs three stages in order and stops at the first error:
//   1. evictContainers: evict containers that are not active and were created
//      more than gcPolicy.MinAge ago, enforcing gcPolicy.MaxPerPodContainer
//      and gcPolicy.MaxContainers (oldest dead containers go first).
//   2. evictSandboxes: remove evictable sandboxes.
//   3. evictPodLogsDirectories: remove pod log directories.
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool, evictTerminatedPods bool) error {
    // Stage 1: evictable containers.
    err := cgc.evictContainers(gcPolicy, allSourcesReady, evictTerminatedPods)
    if err != nil {
        return err
    }

    // Stage 2: sandboxes that no longer hold any container.
    err = cgc.evictSandboxes(evictTerminatedPods)
    if err != nil {
        return err
    }

    // Stage 3: pod sandbox log directories.
    return cgc.evictPodLogsDirectories(allSourcesReady)
}

根据代码注释,我们可以知道执行的逻辑

GarbageCollect使用指定的容器gc policy策略删除死掉的容器。
请注意,gc策略不适用于沙箱。 sandboxes仅在not ready且不包含容器的情况下被删除
 GarbageCollect包含以下步骤:
  1.获取比gcPolicy.MinAge更长的且死掉的容器。
  2.通过强制执行gcPolicy.MaxPerPodContainer来删除每个pod的最旧的死容器。
  3.通过强制执行gcPolicy.MaxContainers删除最旧的死容器。
  对sandboxes的垃圾回收处理逻辑
  1.获得not ready且不包含容器的可回收(evictable)沙箱。
  2.删除这些可回收的沙箱。

这个就是容器的垃圾回收机制的执行逻辑

接下来分析containerGC.evictContainers的实现逻辑 k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_gc.go文件205-249

// evictContainers removes every container that is eligible for eviction.
//
// Candidates come from evictableContainers(gcPolicy.MinAge), grouped by evict
// unit. When allSourcesReady is true, all containers of deleted pods (and of
// terminated pods when evictTerminatedPods is set) are removed outright. The
// remaining units are then trimmed to gcPolicy.MaxPerPodContainer and, in
// total, to gcPolicy.MaxContainers; a negative limit disables that check.
// Only a failure to list the candidates is returned as an error.
func (cgc *containerGC) evictContainers(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool, evictTerminatedPods bool) error {
    // Group the candidate containers by evict unit.
    units, err := cgc.evictableContainers(gcPolicy.MinAge)
    if err != nil {
        return err
    }

    // Containers of pods that are gone are removed entirely, but only once
    // all sources are ready.
    if allSourcesReady {
        for key, unit := range units {
            if !cgc.podStateProvider.IsPodDeleted(key.uid) && !(cgc.podStateProvider.IsPodTerminated(key.uid) && evictTerminatedPods) {
                continue
            }
            cgc.removeOldestN(unit, len(unit)) // Remove all.
            delete(units, key)
        }
    }

    // Per-unit limit.
    if gcPolicy.MaxPerPodContainer >= 0 {
        cgc.enforceMaxContainersPerEvictUnit(units, gcPolicy.MaxPerPodContainer)
    }

    // Global limit: nothing more to do when it is disabled or already met.
    if gcPolicy.MaxContainers < 0 || units.NumContainers() <= gcPolicy.MaxContainers {
        return nil
    }

    // Give every evict unit an equal share of the global budget (at least 1).
    perUnit := gcPolicy.MaxContainers / units.NumEvictUnits()
    if perUnit < 1 {
        perUnit = 1
    }
    cgc.enforceMaxContainersPerEvictUnit(units, perUnit)

    // Still over budget: flatten all units and evict the oldest first.
    remaining := units.NumContainers()
    if remaining <= gcPolicy.MaxContainers {
        return nil
    }
    flattened := make([]containerGCInfo, 0, remaining)
    for _, unit := range units {
        flattened = append(flattened, unit...)
    }
    sort.Sort(byCreated(flattened))
    cgc.removeOldestN(flattened, remaining-gcPolicy.MaxContainers)
    return nil
}

所有的删除操作都是通过函数containerGC.removeOldestN进行的

// removeOldestN removes the toRemove oldest containers and returns the slice
// of containers that were kept. The loop walks from the end of the slice
// towards the front, so the oldest entries are expected at the end. A failed
// removal is logged and otherwise ignored.
func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo {
    keep := len(containers) - toRemove

    // Walk from the last element back to the first one past the kept prefix.
    for pos := len(containers); pos > keep; pos-- {
        victim := containers[pos-1]
        err := cgc.manager.removeContainer(victim.id)
        if err != nil {
            glog.Errorf("Failed to remove container %q: %v", victim.id, err)
        }
    }

    // Report the containers as removed even if a removal failed, so that
    // callers are not overly aggressive.
    return containers[:keep]
}

另外的函数containerGC.evictSandboxes 以及containerGC.evictPodLogsDirectories这两个函数都是在k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_gc.go文件中,分别在257-312行和316-355行

代码evictSandboxes

// evictSandboxes removes all evictable sandboxes. A sandbox is evictable when
// it meets all of the following:
//   1. it is not in the ready state;
//   2. it contains no containers;
//   3. it belongs to a non-existent (i.e., already removed) pod, or it is not
//      the most recently created sandbox for its pod.
//
// Only a failure to list containers or sandboxes is returned as an error.
func (cgc *containerGC) evictSandboxes(evictTerminatedPods bool) error {
    allContainers, err := cgc.manager.getKubeletContainers(true)
    if err != nil {
        return err
    }

    // IDs of every sandbox that is still referenced by some container.
    inUse := sets.NewString()
    for i := range allContainers {
        inUse.Insert(allContainers[i].PodSandboxId)
    }

    allSandboxes, err := cgc.manager.getKubeletSandboxes(true)
    if err != nil {
        return err
    }

    // Group sandboxes by pod UID. A sandbox is marked active when it is ready
    // or still has containers.
    byPod := make(sandboxesByPodUID)
    for _, sb := range allSandboxes {
        uid := types.UID(sb.Metadata.Uid)
        byPod[uid] = append(byPod[uid], sandboxGCInfo{
            id:         sb.Id,
            createTime: time.Unix(0, sb.CreatedAt),
            active:     sb.State == runtimeapi.PodSandboxState_SANDBOX_READY || inUse.Has(sb.Id),
        })
    }

    // Order each pod's sandboxes by age.
    for _, podSandboxes := range byPod {
        sort.Sort(sandboxByCreated(podSandboxes))
    }

    for podUID, podSandboxes := range byPod {
        // Keep the latest one if the pod still exists.
        toRemove := len(podSandboxes) - 1
        if cgc.podStateProvider.IsPodDeleted(podUID) || (cgc.podStateProvider.IsPodTerminated(podUID) && evictTerminatedPods) {
            // The pod has been removed: every evictable sandbox goes. Note
            // that the latest dead sandbox is also removed if there is
            // already an active one.
            toRemove = len(podSandboxes)
        }
        cgc.removeOldestNSandboxes(podSandboxes, toRemove)
    }
    return nil
}

evictPodLogsDirectories的代码

// evictPodLogsDirectories evicts all evictable pod logs directories. Pod logs
// directories are evictable if there are no corresponding pods.
//
// When allSourcesReady is true, the entries of podLogsRootDirectory and of
// podLogsFileDirectory are scanned and those whose pod is reported as deleted
// are removed. Dangling legacy container log symlinks are removed regardless
// of allSourcesReady.
//
// An error is returned only when one of the two directories cannot be read;
// individual removal failures are logged and skipped.
func (cgc *containerGC) evictPodLogsDirectories(allSourcesReady bool) error {
    osInterface := cgc.manager.osInterface
    if allSourcesReady {
        // Only remove pod logs directories when all sources are ready.
        dirs, err := osInterface.ReadDir(podLogsRootDirectory)
        if err != nil {
            return fmt.Errorf("failed to read podLogsRootDirectory %q: %v", podLogsRootDirectory, err)
        }
        for _, dir := range dirs {
            // Here the entry name itself is used as the pod UID.
            name := dir.Name()
            podUID := types.UID(name)
            if !cgc.podStateProvider.IsPodDeleted(podUID) {
                continue
            }
            err := osInterface.RemoveAll(filepath.Join(podLogsRootDirectory, name))
            if err != nil {
                glog.Errorf("Failed to remove pod logs directory %q: %v", name, err)
            }
        }

        dirs, err = osInterface.ReadDir(podLogsFileDirectory)
        if err != nil {
            // Name the directory that actually failed to be read.
            return fmt.Errorf("failed to read podLogsFileDirectory %q: %v", podLogsFileDirectory, err)
        }
        for _, dir := range dirs {
            dirName := dir.Name()
            // For names with more than two "_"-separated parts, the
            // second-to-last part is taken as the pod UID; otherwise the whole
            // name is used.
            // NOTE(review): the exact naming scheme is not visible here — confirm.
            uid := dirName
            if parts := strings.Split(dirName, "_"); len(parts) > 2 {
                uid = parts[len(parts)-2]
            }
            if !cgc.podStateProvider.IsPodDeleted(types.UID(uid)) {
                continue
            }
            if err := osInterface.RemoveAll(filepath.Join(podLogsFileDirectory, dirName)); err != nil {
                // Log the directory name, not the UID extracted from it.
                glog.Errorf("Failed to remove pod logs directory %q: %v", dirName, err)
            }
        }
    }

    // Remove dead container log symlinks.
    // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
    // NOTE(review): the Glob error is discarded; presumably only a malformed
    // pattern can fail here — confirm against the OSInterface implementation.
    logSymlinks, _ := osInterface.Glob(filepath.Join(legacyContainerLogsDir, fmt.Sprintf("*.%s", legacyLogSuffix)))
    for _, logSymlink := range logSymlinks {
        // Stat reporting "not exist" for a path that Glob returned is treated
        // as a dead symlink.
        if _, err := osInterface.Stat(logSymlink); os.IsNotExist(err) {
            err := osInterface.Remove(logSymlink)
            if err != nil {
                glog.Errorf("Failed to remove container log dead symlink %q: %v", logSymlink, err)
            }
        }
    }
    return nil
}

至此,容器垃圾回收机制已经全部分析完成:kubelet按ContainerGCPeriod(1min)的周期循环执行容器垃圾回收(每次执行完成后才开始下一轮计时)

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐