Analysis of the kubelet garbage collection mechanism: container garbage collection
The Kubernetes source analyzed in this post is v1.11.0.
(Sequence diagram: the kubelet executing StartGarbageCollection)
The sequence diagram shows the steps involved in starting the GC feature.
The implementing code lives in k8s.io/kubernetes/pkg/kubelet/kubelet.go:
// StartGarbageCollection starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
	// container garbage collection
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(); err != nil {
			glog.Errorf("Container garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}
			glog.V(vLevel).Infof("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)

	stopChan := make(chan struct{})
	defer close(stopChan)
	// when the high threshold is set to 100, stub the image GC manager
	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
		glog.V(2).Infof("ImageGCHighThresholdPercent is set 100, Disable image GC")
		go func() { stopChan <- struct{}{} }()
	}

	// image garbage collection
	prevImageGCFailed := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			if prevImageGCFailed {
				glog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
				// Only create an event for repeated failures
				kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
			} else {
				glog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
			}
			prevImageGCFailed = true
		} else {
			var vLevel glog.Level = 4
			if prevImageGCFailed {
				vLevel = 1
				prevImageGCFailed = false
			}
			glog.V(vLevel).Infof("Image garbage collection succeeded")
		}
	}, ImageGCPeriod, stopChan)
}
Let us first look at the container garbage collection mechanism.
Container garbage collection
Relevant code:
go wait.Until(func() {
	if err := kl.containerGC.GarbageCollect(); err != nil {
		glog.Errorf("Container garbage collection failed: %v", err)
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
		loggedContainerGCFailure = true
	} else {
		var vLevel glog.Level = 4
		if loggedContainerGCFailure {
			vLevel = 1
			loggedContainerGCFailure = false
		}
		glog.V(vLevel).Infof("Container garbage collection succeeded")
	}
}, ContainerGCPeriod, wait.NeverStop)
From this code we can see that container garbage collection is executed periodically: the period ContainerGCPeriod is 1 minute, and wait.Until re-runs the collection function at that interval (timing the next period from the end of the previous run), so a container GC pass is attempted roughly once per minute.
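For reference, both GC periods are declared as constants near the top of the same file; the snippet below is paraphrased from memory of the v1.11 source, so treat the comments as approximate:

// k8s.io/kubernetes/pkg/kubelet/kubelet.go (paraphrased from memory of v1.11):
const (
	// ContainerGCPeriod is the period for performing container garbage collection.
	ContainerGCPeriod = time.Minute
	// ImageGCPeriod is the period for performing image garbage collection.
	ImageGCPeriod = 5 * time.Minute
)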
The realContainerGC struct implements the ContainerGC interface; the relevant code is in k8s.io/kubernetes/pkg/kubelet/container/container_gc.go.
The ContainerGC interface has two methods:
// Implementation is thread-compatible.
type ContainerGC interface {
	// Garbage collect containers.
	GarbageCollect() error
	// Deletes all unused containers, including containers belonging to pods that are terminated but not deleted
	DeleteAllUnusedContainers() error
}
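The same container_gc.go file also defines ContainerGCPolicy, the set of knobs that realContainerGC (shown next) carries in its policy field. A paraphrase from memory; the field names are accurate, the comments approximate:

// ContainerGCPolicy specifies a policy for garbage collecting containers.
type ContainerGCPolicy struct {
	// Minimum age at which a container can be garbage collected, zero for no limit.
	MinAge time.Duration
	// Max number of dead containers any one pod (UID, container name) pair is allowed
	// to have, less than zero for no limit.
	MaxPerPodContainer int
	// Max number of total dead containers, less than zero for no limit.
	MaxContainers int
}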
The realContainerGC struct:
type realContainerGC struct {
	// Container runtime
	runtime Runtime
	// Policy for garbage collection.
	policy ContainerGCPolicy
	// sourcesReadyProvider provides the readiness of kubelet configuration sources.
	sourcesReadyProvider SourcesReadyProvider
}
This struct implements the two methods of the ContainerGC interface. The implementations are:
func (cgc *realContainerGC) GarbageCollect() error {
	return cgc.runtime.GarbageCollect(cgc.policy, cgc.sourcesReadyProvider.AllReady(), false)
}

func (cgc *realContainerGC) DeleteAllUnusedContainers() error {
	glog.Infof("attempting to delete unused containers")
	return cgc.runtime.GarbageCollect(cgc.policy, cgc.sourcesReadyProvider.AllReady(), true)
}
As we can see, both methods delegate the real work to runtime.GarbageCollect; the only difference is the last argument, where GarbageCollect passes false and DeleteAllUnusedContainers passes true. So we need to look at how cgc.runtime.GarbageCollect is implemented.
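Before moving on, it is worth noting who uses the second entry point: DeleteAllUnusedContainers (the true case) is not called by the periodic loop above but by the kubelet eviction manager when it needs to reclaim disk space immediately. The eviction package only depends on a narrow interface for this; the sketch below is paraphrased from memory of pkg/kubelet/eviction/types.go and may differ slightly in v1.11:

// ContainerGC is the narrow view of container garbage collection that the eviction
// manager needs: a single call that deletes every unused container right away.
type ContainerGC interface {
	// DeleteAllUnusedContainers deletes all unused containers, even those belonging to
	// pods that are terminated but not deleted.
	DeleteAllUnusedContainers() error
}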
To find the concrete implementation behind cgc.runtime.GarbageCollect, we first need to know where NewContainerGC is called and what is passed to it. It turns out NewContainerGC is called from NewMainKubelet, around line 708 of k8s.io/kubernetes/pkg/kubelet/kubelet.go.
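The call roughly looks like the following (paraphrased from memory of v1.11's NewMainKubelet, so the surrounding variable names may differ slightly): the GC policy is built from the kubelet's command-line settings and handed to NewContainerGC together with the container runtime and the sources-ready provider.

// Paraphrased from NewMainKubelet (kubelet.go); not an exact quote of v1.11.
containerGCPolicy := kubecontainer.ContainerGCPolicy{
	MinAge:             minimumGCAge.Duration,
	MaxPerPodContainer: int(maxPerPodContainerCount),
	MaxContainers:      int(maxContainerCount),
}

containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {
	return nil, err
}
klet.containerGC = containerGC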
The klet.containerRuntime object passed in there is created around lines 657-676 of k8s.io/kubernetes/pkg/kubelet/kubelet.go; the constructor, NewKubeGenericRuntimeManager, is defined around line 135 of k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go.
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
	kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
	klet.livenessManager,
	seccompProfileRoot,
	containerRefManager,
	machineInfo,
	klet,
	kubeDeps.OSInterface,
	klet,
	httpClient,
	imageBackOff,
	kubeCfg.SerializeImagePulls,
	float32(kubeCfg.RegistryPullQPS),
	int(kubeCfg.RegistryBurst),
	kubeCfg.CPUCFSQuota,
	runtimeService,
	imageService,
	kubeDeps.ContainerManager.InternalContainerLifecycle(),
	legacyLogProvider,
)
if err != nil {
	return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
NewKubeGenericRuntimeManager ultimately returns a kubeGenericRuntimeManager struct, which implements the KubeGenericRuntime interface. Let us look at the NewKubeGenericRuntimeManager code:
func NewKubeGenericRuntimeManager(
	recorder record.EventRecorder,
	livenessManager proberesults.Manager,
	seccompProfileRoot string,
	containerRefManager *kubecontainer.RefManager,
	machineInfo *cadvisorapi.MachineInfo,
	podStateProvider podStateProvider,
	osInterface kubecontainer.OSInterface,
	runtimeHelper kubecontainer.RuntimeHelper,
	httpClient types.HttpGetter,
	imageBackOff *flowcontrol.Backoff,
	serializeImagePulls bool,
	imagePullQPS float32,
	imagePullBurst int,
	cpuCFSQuota bool,
	runtimeService internalapi.RuntimeService,
	imageService internalapi.ImageManagerService,
	internalLifecycle cm.InternalContainerLifecycle,
	legacyLogProvider LegacyLogProvider,
) (KubeGenericRuntime, error) {
	kubeRuntimeManager := &kubeGenericRuntimeManager{
		recorder:            recorder,
		cpuCFSQuota:         cpuCFSQuota,
		seccompProfileRoot:  seccompProfileRoot,
		livenessManager:     livenessManager,
		containerRefManager: containerRefManager,
		machineInfo:         machineInfo,
		osInterface:         osInterface,
		runtimeHelper:       runtimeHelper,
		runtimeService:      newInstrumentedRuntimeService(runtimeService),
		imageService:        newInstrumentedImageManagerService(imageService),
		keyring:             credentialprovider.NewDockerKeyring(),
		internalLifecycle:   internalLifecycle,
		legacyLogProvider:   legacyLogProvider,
	}

	typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
	if err != nil {
		glog.Errorf("Get runtime version failed: %v", err)
		return nil, err
	}

	// Only matching kubeRuntimeAPIVersion is supported now
	// TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
	if typedVersion.Version != kubeRuntimeAPIVersion {
		glog.Errorf("Runtime api version %s is not supported, only %s is supported now",
			typedVersion.Version,
			kubeRuntimeAPIVersion)
		return nil, ErrVersionNotSupported
	}

	kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
	glog.Infof("Container runtime %s initialized, version: %s, apiVersion: %s",
		typedVersion.RuntimeName,
		typedVersion.RuntimeVersion,
		typedVersion.RuntimeApiVersion)

	// If the container logs directory does not exist, create it.
	// TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
	// new runtime interface
	if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
		if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
			glog.Errorf("Failed to create directory %q: %v", podLogsRootDirectory, err)
		}
	}

	kubeRuntimeManager.imagePuller = images.NewImageManager(
		kubecontainer.FilterEventRecorder(recorder),
		kubeRuntimeManager,
		imageBackOff,
		serializeImagePulls,
		imagePullQPS,
		imagePullBurst)

	kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
	kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)

	kubeRuntimeManager.versionCache = cache.NewObjectCache(
		func() (interface{}, error) {
			return kubeRuntimeManager.getTypedVersion()
		},
		versionCacheTTL,
	)

	return kubeRuntimeManager, nil
}
From the parameters of NewKubeGenericRuntimeManager we can see what the runtime manager is wired up with: an EventRecorder for event recording, the liveness probe results manager, a container reference manager, machine information (MachineInfo), the pod state provider (podStateProvider), image pull rate limiting (QPS/burst), the image service (imageService), container lifecycle handling and log management, image pull back-off, and so on.
The runtime's GarbageCollect method is implemented at lines 897-899 of k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go:
// GarbageCollect removes dead containers using the specified container gc policy.
func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
	return m.containerGC.GarbageCollect(gcPolicy, allSourcesReady, evictNonDeletedPods)
}
The real garbage collection work is done by the GarbageCollect method of the containerGC object, which cleans up containers according to the gcPolicy and the evictNonDeletedPods flag passed in (the sketch below shows where those policy values typically come from).
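The gcPolicy values ultimately come from kubelet flags. As an assumption to illustrate (check the flag documentation for your release for the exact defaults), the mapping is roughly: --minimum-container-ttl-duration feeds MinAge, --maximum-dead-containers-per-container feeds MaxPerPodContainer (commonly 1), and --maximum-dead-containers feeds MaxContainers (commonly -1, i.e. no global limit). For example:

// A hypothetical policy matching the common defaults described above.
gcPolicy := kubecontainer.ContainerGCPolicy{
	MinAge:             0,  // --minimum-container-ttl-duration
	MaxPerPodContainer: 1,  // --maximum-dead-containers-per-container
	MaxContainers:      -1, // --maximum-dead-containers (-1 = no global limit)
}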
The containerGC object's GarbageCollect method is implemented around line 381 of k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_gc.go:
// GarbageCollect removes dead containers using the specified container gc policy.
// Note that gc policy is not applied to sandboxes. Sandboxes are only removed when they are
// not ready and containing no containers.
//
// GarbageCollect consists of the following steps:
// * gets evictable containers which are not active and created more than gcPolicy.MinAge ago.
// * removes oldest dead containers for each pod by enforcing gcPolicy.MaxPerPodContainer.
// * removes oldest dead containers by enforcing gcPolicy.MaxContainers.
// * gets evictable sandboxes which are not ready and contains no containers.
// * removes evictable sandboxes.
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool, evictTerminatedPods bool) error {
	// Remove evictable containers
	if err := cgc.evictContainers(gcPolicy, allSourcesReady, evictTerminatedPods); err != nil {
		return err
	}

	// Remove sandboxes with zero containers
	// (i.e. sandboxes that no longer contain any containers)
	if err := cgc.evictSandboxes(evictTerminatedPods); err != nil {
		return err
	}

	// Remove pod sandbox log directory
	// (i.e. the pods' log directories)
	return cgc.evictPodLogsDirectories(allSourcesReady)
}
From the code comments we can see the execution logic.
GarbageCollect removes dead containers using the specified container GC policy. Note that the GC policy does not apply to sandboxes: sandboxes are only removed when they are not ready and contain no containers.
GarbageCollect consists of the following steps:
1. Get the evictable containers: dead containers that were created more than gcPolicy.MinAge ago.
2. Remove the oldest dead containers of each pod by enforcing gcPolicy.MaxPerPodContainer.
3. Remove the oldest dead containers overall by enforcing gcPolicy.MaxContainers.
For sandboxes the garbage collection logic is:
1. Get the evictable sandboxes: sandboxes that are not ready and contain no containers.
2. Remove those evictable sandboxes.
This is the execution logic of the container garbage collection mechanism; a toy example of how the two container limits interact follows below.
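As a toy illustration of the two container limits (the numbers are made up, not from the source): suppose three evict units currently hold 3, 2 and 2 dead containers, with MaxPerPodContainer = 1 and MaxContainers = 4. The per-unit cap first trims each unit to one dead container, leaving 3 in total, which is already under the global limit, so the second pass does nothing:

package main

import "fmt"

// Toy walk-through of the two limits enforced by evictContainers, using made-up numbers.
func main() {
	deadPerUnit := []int{3, 2, 2} // dead containers per evict unit (hypothetical)
	maxPerPodContainer := 1       // gcPolicy.MaxPerPodContainer
	maxContainers := 4            // gcPolicy.MaxContainers

	// Step 1: enforce the per-unit cap.
	total := 0
	for i := range deadPerUnit {
		if deadPerUnit[i] > maxPerPodContainer {
			deadPerUnit[i] = maxPerPodContainer
		}
		total += deadPerUnit[i]
	}
	fmt.Println(deadPerUnit, total) // [1 1 1] 3

	// Step 2: only if the total still exceeds the global cap, re-cap every unit to
	// MaxContainers / NumEvictUnits (at least 1) and then evict oldest-first.
	if total > maxContainers {
		perUnit := maxContainers / len(deadPerUnit)
		if perUnit < 1 {
			perUnit = 1
		}
		fmt.Println("re-cap each unit to", perUnit)
	}
}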
Next, let us analyze the implementation of containerGC.evictContainers, at lines 205-249 of k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_gc.go:
// evict all containers that are evictable
func (cgc *containerGC) evictContainers(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool, evictTerminatedPods bool) error {
	// Separate containers by evict units.
	// (collect all candidate containers: containers that are no longer running and
	// are older than the minimum age set in the GC policy)
	evictUnits, err := cgc.evictableContainers(gcPolicy.MinAge)
	if err != nil {
		return err
	}

	// Remove deleted pod containers if all sources are ready.
	if allSourcesReady {
		for key, unit := range evictUnits {
			if cgc.podStateProvider.IsPodDeleted(key.uid) || (cgc.podStateProvider.IsPodTerminated(key.uid) && evictTerminatedPods) {
				cgc.removeOldestN(unit, len(unit)) // Remove all.
				delete(evictUnits, key)
			}
		}
	}

	// Enforce max containers per evict unit.
	if gcPolicy.MaxPerPodContainer >= 0 {
		cgc.enforceMaxContainersPerEvictUnit(evictUnits, gcPolicy.MaxPerPodContainer)
	}

	// Enforce max total number of containers.
	if gcPolicy.MaxContainers >= 0 && evictUnits.NumContainers() > gcPolicy.MaxContainers {
		// Leave an equal number of containers per evict unit (min: 1).
		numContainersPerEvictUnit := gcPolicy.MaxContainers / evictUnits.NumEvictUnits()
		if numContainersPerEvictUnit < 1 {
			numContainersPerEvictUnit = 1
		}
		cgc.enforceMaxContainersPerEvictUnit(evictUnits, numContainersPerEvictUnit)

		// If we still need to evict, evict oldest first.
		numContainers := evictUnits.NumContainers()
		if numContainers > gcPolicy.MaxContainers {
			flattened := make([]containerGCInfo, 0, numContainers)
			for key := range evictUnits {
				flattened = append(flattened, evictUnits[key]...)
			}
			sort.Sort(byCreated(flattened))
			// perform the removal
			cgc.removeOldestN(flattened, numContainers-gcPolicy.MaxContainers)
		}
	}
	return nil
}
All of the removals go through the containerGC.removeOldestN function:
// removeOldestN removes the oldest toRemove containers and returns the resulting slice.
func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo {
	// Remove from oldest to newest (last to first).
	numToKeep := len(containers) - toRemove
	for i := len(containers) - 1; i >= numToKeep; i-- {
		if err := cgc.manager.removeContainer(containers[i].id); err != nil {
			glog.Errorf("Failed to remove container %q: %v", containers[i].id, err)
		}
	}
	// Assume we removed the containers so that we're not too aggressive.
	return containers[:numToKeep]
}
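Note that the caller keeps each slice sorted newest-first (sort.Sort(byCreated(...))), so the oldest entries sit at the end and removeOldestN trims them from the tail backwards. A tiny stand-alone illustration using placeholder strings instead of containerGCInfo values:

package main

import "fmt"

func main() {
	// Hypothetical data standing in for a newest-first []containerGCInfo slice.
	containers := []string{"newest", "newer", "middle", "old", "oldest"}
	toRemove := 2

	numToKeep := len(containers) - toRemove
	for i := len(containers) - 1; i >= numToKeep; i-- {
		fmt.Println("removing", containers[i]) // prints "oldest", then "old"
	}
	containers = containers[:numToKeep]
	fmt.Println("kept:", containers) // kept: [newest newer middle]
}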
The remaining two functions, containerGC.evictSandboxes and containerGC.evictPodLogsDirectories, are also in k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_gc.go, at lines 257-312 and 316-355 respectively.
The evictSandboxes code:
// evictSandboxes remove all evictable sandboxes. An evictable sandbox must
// meet the following requirements:
//   1. not in ready state
//   2. contains no containers.
//   3. belong to a non-existent (i.e., already removed) pod, or is not the
//      most recently created sandbox for the pod.
func (cgc *containerGC) evictSandboxes(evictTerminatedPods bool) error {
	containers, err := cgc.manager.getKubeletContainers(true)
	if err != nil {
		return err
	}

	// collect all the PodSandboxId of container
	sandboxIDs := sets.NewString()
	for _, container := range containers {
		sandboxIDs.Insert(container.PodSandboxId)
	}

	sandboxes, err := cgc.manager.getKubeletSandboxes(true)
	if err != nil {
		return err
	}

	sandboxesByPod := make(sandboxesByPodUID)
	for _, sandbox := range sandboxes {
		podUID := types.UID(sandbox.Metadata.Uid)
		sandboxInfo := sandboxGCInfo{
			id:         sandbox.Id,
			createTime: time.Unix(0, sandbox.CreatedAt),
		}

		// Set ready sandboxes to be active.
		if sandbox.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			sandboxInfo.active = true
		}

		// Set sandboxes that still have containers to be active.
		if sandboxIDs.Has(sandbox.Id) {
			sandboxInfo.active = true
		}

		sandboxesByPod[podUID] = append(sandboxesByPod[podUID], sandboxInfo)
	}

	// Sort the sandboxes by age.
	for uid := range sandboxesByPod {
		sort.Sort(sandboxByCreated(sandboxesByPod[uid]))
	}

	for podUID, sandboxes := range sandboxesByPod {
		if cgc.podStateProvider.IsPodDeleted(podUID) || (cgc.podStateProvider.IsPodTerminated(podUID) && evictTerminatedPods) {
			// Remove all evictable sandboxes if the pod has been removed.
			// Note that the latest dead sandbox is also removed if there is
			// already an active one.
			cgc.removeOldestNSandboxes(sandboxes, len(sandboxes))
		} else {
			// Keep latest one if the pod still exists.
			cgc.removeOldestNSandboxes(sandboxes, len(sandboxes)-1)
		}
	}
	return nil
}
The evictPodLogsDirectories code:
// evictPodLogsDirectories evicts all evictable pod logs directories. Pod logs directories
// are evictable if there are no corresponding pods.
func (cgc *containerGC) evictPodLogsDirectories(allSourcesReady bool) error {
	osInterface := cgc.manager.osInterface
	if allSourcesReady {
		// Only remove pod logs directories when all sources are ready.
		dirs, err := osInterface.ReadDir(podLogsRootDirectory)
		if err != nil {
			return fmt.Errorf("failed to read podLogsRootDirectory %q: %v", podLogsRootDirectory, err)
		}
		for _, dir := range dirs {
			name := dir.Name()
			podUID := types.UID(name)
			if !cgc.podStateProvider.IsPodDeleted(podUID) {
				continue
			}
			err := osInterface.RemoveAll(filepath.Join(podLogsRootDirectory, name))
			if err != nil {
				glog.Errorf("Failed to remove pod logs directory %q: %v", name, err)
			}
		}

		dirs, err = osInterface.ReadDir(podLogsFileDirectory)
		if err != nil {
			return fmt.Errorf("failed to read podLogsRootDirectory %q: %v", podLogsFileDirectory, err)
		}
		for _, dir := range dirs {
			name := dir.Name()
			uid := strings.Split(name, "_")
			if len(uid) > 2 {
				name = uid[len(uid)-2]
			}
			podUID := types.UID(name)
			if !cgc.podStateProvider.IsPodDeleted(podUID) {
				continue
			}
			err := osInterface.RemoveAll(filepath.Join(podLogsFileDirectory, dir.Name()))
			if err != nil {
				glog.Errorf("Failed to remove pod logs directory %q: %v", name, err)
			}
		}
	}

	// Remove dead container log symlinks.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	logSymlinks, _ := osInterface.Glob(filepath.Join(legacyContainerLogsDir, fmt.Sprintf("*.%s", legacyLogSuffix)))
	for _, logSymlink := range logSymlinks {
		if _, err := osInterface.Stat(logSymlink); os.IsNotExist(err) {
			err := osInterface.Remove(logSymlink)
			if err != nil {
				glog.Errorf("Failed to remove container log dead symlink %q: %v", logSymlink, err)
			}
		}
	}
	return nil
}
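For orientation, the directories referenced above live under /var/log on the node. The constants below are paraphrased from memory (the exact identifiers and comments in v1.11 may differ); the legacy symlink directory is the one kept around for cluster logging agents:

// Paths assumed by the log-directory GC above (paraphrased from memory):
const (
	podLogsRootDirectory   = "/var/log/pods"       // one directory per pod; removed once the pod is deleted
	legacyContainerLogsDir = "/var/log/containers" // per-container *.log symlinks; removed when the target file is gone
)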
This completes the analysis of the container garbage collection mechanism: the kubelet runs the container GC pass described above in a loop, once every ContainerGCPeriod (one minute).