在Node节点上,k8s是通过kubelet删除具体的pod。

而用户删除pod一般通过kubectl delete pod 命令,或者利用http直接调用api-server暴露的接口去删除pod。因此在分析kubelet删除具体pod之前,先分析用户删除pod的命令是如何一步步传达给具体node上的(以kubectl delete pod 命令为例)。

用户删除Pod流程

k8s中采用cobra框架作为命令行的实现。kubectl delete操作通过NewCmdDelete中的RunDelete函数实现。

pkg/kubectl/cmd/delete.go

NewCmdDelete源码

func NewCmdDelete(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
	deleteFlags := NewDeleteCommandFlags("containing the resource to delete.")

	cmd := &cobra.Command{
		Use: "delete ([-f FILENAME] | TYPE [(NAME | -l label | --all)])",
		DisableFlagsInUseLine: true,
		Short:   i18n.T("Delete resources by filenames, stdin, resources and names, or by resources and label selector"),
		Long:    delete_long,
		Example: delete_example,
		Run: func(cmd *cobra.Command, args []string) {
			o := deleteFlags.ToOptions(nil, streams)
			cmdutil.CheckErr(o.Complete(f, args, cmd))
			cmdutil.CheckErr(o.Validate(cmd))
			cmdutil.CheckErr(o.RunDelete())
		},
		SuggestFor: []string{"rm"},
	}

	deleteFlags.AddFlags(cmd)

	cmdutil.AddIncludeUninitializedFlag(cmd)
	return cmd
}

 可以看出,是非常典型的cobra框架,Use字段说明delete的用法,Short和long字段分别表示delete命令的短和长的说明。Run字段则是正式运行delete命令的操作,匿名函数中依次调用了Complete、Valodata和RunDelete函数。Complete函数主要是通过visitor模式构造builder(通过传入的参数)。

includeUninitialized := cmdutil.ShouldIncludeUninitialized(cmd, false)
	r := f.NewBuilder().
		Unstructured().
		ContinueOnError().
		NamespaceParam(cmdNamespace).DefaultNamespace().
		FilenameParam(enforceNamespace, &o.FilenameOptions).
		LabelSelectorParam(o.LabelSelector).
		FieldSelectorParam(o.FieldSelector).
		IncludeUninitialized(includeUninitialized).
		SelectAllParam(o.DeleteAll).
		ResourceTypeOrNameArgs(false, args...).RequireObject(false).
		Flatten().
		Do()

FilenameParam是这里唯一指定builder的资源参数的方法,即把命令行传入的filenames参数作为资源参数。Flatten方法则告诉Builder,这里的资源对象其实是一个数组,需要builder构造一个FlattenListVisitor来遍历Visit数组中的每个资源项目,Do方法则返回一个result对象,里面包括与资源相关的visitor对象,这个visitor对象包含了一个可由用户指定(编写)的匿名回调函数,这个回调函数就是资源处理的最终逻辑。

Do函数:/vendor/k8s.io/cli-runtime/pkg/genericclioptions/resource/builder.go

unc (b *Builder) Do() *Result {
        ......
	r.visitor = NewDecoratedVisitor(r.visitor, helpers...)
	if b.continueOnError {
		r.visitor = ContinueOnErrorVisitor{r.visitor}
	}
	return r
}

 Do函数调用NewDecoratedVisitor方法生成一个Visitor对象。来看看NewDecoratedVisitor方法:

/vendor/k8s.io/cli-runtime/pkg/genericclioptions/resource/visitor.go

type DecoratedVisitor struct {
	visitor    Visitor
	decorators []VisitorFunc
}

// NewDecoratedVisitor will create a visitor that invokes the provided visitor functions before
// the user supplied visitor function is invoked, giving them the opportunity to mutate the Info
// object or terminate early with an error.
func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor {
	if len(fn) == 0 {
		return v
	}
	return DecoratedVisitor{v, fn}
}

// Visit implements Visitor
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
	return v.visitor.Visit(func(info *Info, err error) error {
		if err != nil {
			return err
		}
		for i := range v.decorators {
			if err := v.decorators[i](info, nil); err != nil {
				return err
			}
		}
		return fn(info, nil)
	})
}

 可以看到,newdecoratedVisitor将创建一个调用用户提供的visitorFunc的Visitor。Visit函数是Visitor的实现,Visit函数中调用了用户提供的visitorFunc。那么visitorFunc函数在哪提供呢?继续往下看代码。

先小结builder和visitor的操作:

这段build的操作目的目标是根据命令行输入的资源相关的参数,创建针对性的visitor对象来获取相关资源,最后遍历相关的所有visitor对象,触发用户指定的visitorFun回调函数来处理每个具体的资源,最终完成资源对象的业务处理逻辑。这个visitorFun回调函数就在下面的RunDelete函数中实现。具体往下看。

NewCmdDelete函数在调用完Complete函数构造完visitor对象后,调用RunDelete函数来处理该对象。RunDelete函数实现如下,函数中直接调用DeleteResult函数。

func (o *DeleteOptions) RunDelete() error {
	return o.DeleteResult(o.Result)
} 
func (o *DeleteOptions) DeleteResult(r *resource.Result) error {
	found := 0
	if o.IgnoreNotFound {
		r = r.IgnoreErrors(errors.IsNotFound)
	}
	deletedInfos := []*resource.Info{}
	uidMap := kubectlwait.UIDMap{}
	err := r.Visit(func(info *resource.Info, err error) error {
		if err != nil {
			return err
		}
		deletedInfos = append(deletedInfos, info)
		found++

		options := &metav1.DeleteOptions{}
		if o.GracePeriod >= 0 {
			options = metav1.NewDeleteOptions(int64(o.GracePeriod))
		}
		policy := metav1.DeletePropagationBackground
		if !o.Cascade {
			policy = metav1.DeletePropagationOrphan
		}
		options.PropagationPolicy = &policy

		response, err := o.deleteResource(info, options)
		if err != nil {
			return err
		}

代码中的Visit中的匿名函数func就是result对象的回调处理函数。该函数对请求的资源进行处理(向apiserver发送删除请求)。具体是调用了deleteResource函数。该这个函数实现如下。

pkg/kubectl/cmd/delete.go

func (o *DeleteOptions) deleteResource(info *resource.Info, deleteOptions *metav1.DeleteOptions) (runtime.Object, error) {
	deleteResponse, err := resource.NewHelper(info.Client, info.Mapping).DeleteWithOptions(info.Namespace, info.Name, deleteOptions)
	if err != nil {
		return nil, cmdutil.AddSourceToErr("deleting", info.Source, err)
	}

	o.PrintObj(info)
	return deleteResponse, nil
}
可以看到调用了DeleteWithOptions方法,该方法实现如下:
vendor/k8s.io/cli-runtime/pkg/genericclioptions/resource/helper.go
func (m *Helper) DeleteWithOptions(namespace, name string, options *metav1.DeleteOptions) (runtime.Object, error) {
	return m.RESTClient.Delete().
		NamespaceIfScoped(namespace, m.NamespaceScoped).
		Resource(m.Resource).
		Name(name).
		Body(options).
		Do().
		Get()
}

很明显,这里调用了RESTClient的delete post方法向apiserver发送post请求。Do方法中具体发送http request。
以上就是kubectl发送pod delete事件的流程。


apiserver删除pod流程

接下来分析apiserver接收到kubectl的delete pod请求的响应。
k8s的apisever使用的是go-restful架构来处理http请求。核心处理方法是install和registerResourceHandlers。
install方法主要是调用registerResourceHandlers方法对每个api进行注册和路由绑定(WebService绑定)。具体代码如下:
staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, ws, errors
}

install方法先创建了一个websevice。然后将所有的api 路径都存入一个数组:paths。对该数组排序(sort)。然后利用for range遍历数组的所有元素,调用registerResourceHandlers方法来对每个api路径注册,也就是和对应的storage以及Webservice绑定。

这里的storage指的是后端etcd的存储。storage变量是个map,Key是REST API的path,Value是rest.Storage接口,该接口就是一个通用的符合Restful要求的资源存储接口。

接下来详细分析registerResourceHandlers代码,由于代码较长,进行分段分析。

/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
	......
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)

 首先对资源的后端存储storage(etcd的存储)进行验证,判断那些方法是storage所支持的。

接下来将所有支持的方法存入action数组中:

	// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		// Add actions at the resource path: /api/apiVersion/resource
		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

		// Add actions at the item path: /api/apiVersion/resource/{name}
		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

然后,遍历actions数组,在一个switch语句中,为所有元素定义路由。如贴出的case "GET"这一块,首先创建并包装一个handler对象,然后调用WebService的一系列方法,创建一个route对象,将handler绑定到这个route上。后面还有case "PUT"、case "DELETE"等一系列case,不一一贴出。最后,将route加入routes数组中。

		case "DELETE": // Delete a resource.
			article := getArticleForNoun(kind, " ")
			doc := "delete" + article + kind
			if isSubresource {
				doc = "delete " + subresource + " of" + article + kind
			}
			handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
			route := ws.DELETE(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("delete"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Writes(versionedStatus).
				Returns(http.StatusOK, "OK", versionedStatus).
				Returns(http.StatusAccepted, "Accepted", versionedStatus)
			if isGracefulDeleter {
				route.Reads(versionedDeleterObject)
				if err := addObjectParams(ws, route, versionedDeleteOptions); err != nil {
					return nil, err
				}
			}

 注意,上述代码中的Operation绑定字段,就是绑定具体handler函数的地方。对于delete操作,handler函数为restfulDeleteResource。

接着,遍历route数组,将route加入WebService中。

		for _, route := range routes {
			route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
				Group:   reqScope.Kind.Group,
				Version: reqScope.Kind.Version,
				Kind:    reqScope.Kind.Kind,
			})
			route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
			ws.Route(route)
		}

 这样,Install方法就通过调用registerResourceHandlers方法,完成了WebService与APIResource的绑定。

那么对于delete的操作,肯定是调用registerResourceHandlers方法中,delete api的route绑定的operation handler函数。即restfulDeleteResource。

接下来分析该函数代码:

/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go

func restfulDeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.DeleteResource(r, allowsOptions, scope, admit)(res.ResponseWriter, req.Request)
	}
}

 该函数调用了DeleteResource进行处理。

DeleteResource主要是返回了具体的resource delete操作处理函数。

代码很长,这里贴出最关键的部分。

trace.Step("About to delete object from database")
		wasDeleted := true
		result, err := finishRequest(timeout, func() (runtime.Object, error) {
			obj, deleted, err := r.Delete(ctx, name, options)
			wasDeleted = deleted
			return obj, err
		})
		if err != nil {
			scope.err(err, w, req)
			return
		}
		trace.Step("Object deleted from database")

 可以看到r.Delete方法执行了具体的delete操作。

逐步分析该方法:

/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

func (e *Store) Delete(ctx context.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, false, err
	}
	obj := e.NewFunc()
	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	if err := e.Storage.Get(ctx, key, "", obj, false); err != nil {
		return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
	}
	// support older consumers of delete by treating "nil" as delete immediately
	if options == nil {
		options = metav1.NewDeleteOptions(0)
	}
	var preconditions storage.Preconditions
	if options.Preconditions != nil {
		preconditions.UID = options.Preconditions.UID
	}
	graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
	if err != nil {
		return nil, false, err
	}

 首先调用BeforeDelete方法,该方法是查看要删除的resource对象支不支持graceful删除,也就是优雅的删除。如果支持,则graceful变量为true。并且在BeforeDelete方法中,改变了pod的两个字段:

DeletionTimestampDeletionGracePeriodSeconds

/vendor/k8s.io/apiserver/pkg/registry/rest/delete.go

	if !gracefulStrategy.CheckGracefulDelete(ctx, obj, options) {
		return false, false, nil
	}
	now := metav1.NewTime(metav1.Now().Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
	objectMeta.SetDeletionTimestamp(&now)
	objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)
	// If it's the first graceful deletion we are going to set the DeletionTimestamp to non-nil.
	// Controllers of the object that's being deleted shouldn't take any nontrivial actions, hence its behavior changes.
	// Thus we need to bump object's Generation (if set). This handles generation bump during graceful deletion.
	// The bump for objects that don't support graceful deletion is handled in pkg/registry/generic/registry/store.go.
	if objectMeta.GetGeneration() > 0 {
		objectMeta.SetGeneration(objectMeta.GetGeneration() + 1)
	}

 第一个是设置删除的时间戳,设置的为now。第二个是优雅删除的时间限制,由用户设定。

继续分析Delete方法:

	shouldUpdateFinalizers, _ := deletionFinalizersForGarbageCollection(ctx, e, accessor, options)
	// TODO: remove the check, because we support no-op updates now.
	if graceful || pendingFinalizers || shouldUpdateFinalizers {
		err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, obj)
	}

	// !deleteImmediately covers all cases where err != nil. We keep both to be future-proof.
	if !deleteImmediately || err != nil {
		return out, false, err
	}

 在这里,如果graceful为真true,那么会调用updateForGracefulDeletionAndFinalizers方法,该方法用于改变pod的两个字段,

DeletionTimestampDeletionGracePeriodSeconds。调用的还是BeforeDelete方法。

func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name, key string, options *metav1.DeleteOptions, preconditions storage.Preconditions, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) {
	lastGraceful := int64(0)
	var pendingFinalizers bool
	out = e.NewFunc()
	err = e.Storage.GuaranteedUpdate(
		ctx,
		key,
		out,
		false, /* ignoreNotFound */
		&preconditions,
		storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
			graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options)
			if err != nil {

 调用完updateForGracefulDeletionAndFinalizers方法后,会返回deleteImmediately字段,表示是否需要立即删除,也就是不支持graceful。

然后对deleteImmediately字段进行判断,如果为false,也即是不需要立即删除,则Delete方法返回。如果为true,则继续往下走,即调用storage.delete对resource真正删除(etcd中删除)。

	if !deleteImmediately || err != nil {
		return out, false, err
	}

	// Going further in this function is not useful when we are
	// performing a dry-run request. Worse, it will actually
	// override "out" with the version of the object in database
	// that doesn't have the finalizer and deletiontimestamp set
	// (because the update above was dry-run too). If we already
	// have that version available, let's just return it now,
	// otherwise, we can call dry-run delete that will get us the
	// latest version of the object.
	if dryrun.IsDryRun(options.DryRun) && out != nil {
		return out, true, nil
	}

	// delete immediately, or no graceful deletion supported
	glog.V(6).Infof("going to delete %s from registry: ", name)
	out = e.NewFunc()
	if err := e.Storage.Delete(ctx, key, out, &preconditions, dryrun.IsDryRun(options.DryRun)); err != nil {
		// Please refer to the place where we set ignoreNotFound for the reason
		// why we ignore the NotFound error .
		if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
			// The lastExisting object may not be the last state of the object
			// before its deletion, but it's the best approximation.
			out, err := e.finalizeDelete(ctx, lastExisting, true)
			return out, true, err
		}
		return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
	}
	out, err = e.finalizeDelete(ctx, out, true)
	return out, true, err

 

注意,如果不需要立即删除,即支持graceful删除,则delete方法不删除etcd中的resource数据,直接返回。即删除到此结束。apiserver处理逻辑到此结束(或称为中断)。

那么为什么不会删除etcd中的信息呢?接下来就是kubelet的逻辑了。这里先放一个钩子,等分析完kubelet删除逻辑后,再回来分析。

kubelet删除pod

与kubelet新建pod的流程类似,kubelet利用syncLoopIteration方法监听api server。一旦出现删除事件,则调用HandlePodUpdates方法处理,该方法与kubelet add pod事件调用的方法的功能很类似。对于每一个delete类型的pod。

首先调用podmanager的updatePod方法来更新pod的信息。

然后判断该pod是否为mirror pod。如果是,则调用handleMirrorPod方法来处理该pod。

否则利用GetMirrorPodByPod方法得到该pod的mirror pod,接着调用dispatchWork将pod 删除分配给具体的worker处理。

dispatchwork方法在上篇pod创建流程中已经详细分析,在此不再赘述,感兴趣的同学可以翻上一篇文章。

dispatchwork方法调用了UpdatePod方法对pod进行删除。

UpdatePod方法也在上篇文章中描述过了,这里简单说下该方法的流程:

首先对该worker上锁,p.podLock.Lock(),保证处理期间不会被其他进程调用。这个操作很细节。

然后利用podUpdates字典判断,pod id作为key,如果当前pod还没有启动过go routine,则启动一个,并创建一个channel用于传递pod信息,go runtine中调用了managePodLoop方法,对pod进行处理。

managePodLoop方法中会调用syncPod方法具体处理,syncPod方法对pod的新建、更新、删除等操作都进行了处理。

syncPod方法:

/pkg/kubelet/kubelet.go

	// if we want to kill a pod, do it now!
	if updateType == kubetypes.SyncPodKill {
		killPodOptions := o.killPodOptions
		if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
			return fmt.Errorf("kill pod options are required if update type is kill")
		}
		apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		// we kill the pod with the specified grace period since this is a termination
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			// there was an error killing the pod, so we return that error directly
			utilruntime.HandleError(err)
			return err
		}
		return nil
	}

 syncPod方法一开始就对是否删除pod事件进行了判断,如果是,则执行killPod方法对pod进行删除。注意killpod的最后一个参数是PodTerminationGracePeriodSecondsOverride,这个是优雅删除pod的时间限制。也就是说现在pod的删除是优雅的停止。

killPod方法位于/pkg/kubelet/kubelet_pods.go中,主要调用了containerRuntime.KillPod方法来利用container的runtime来杀掉pod中所有的容器。

if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)

 containerRuntime.KillPod方法位于/pkg/kubelet/kuberuntime/kuberuntime_manager.go


func (m *kubeGenericRuntimeManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
	err := m.killPodWithSyncResult(pod, runningPod, gracePeriodOverride)
	return err.Error()
}

调用了killPodWithSyncResult方法来删除pod。

func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
	killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
	for _, containerResult := range killContainerResults {
		result.AddSyncResult(containerResult)
	}

	// stop sandbox, the sandbox will be removed in GarbageCollect
	killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
	result.AddSyncResult(killSandboxResult)
	// Stop all sandboxes belongs to same pod
	for _, podSandbox := range runningPod.Sandboxes {
		if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
			killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
			glog.Errorf("Failed to stop sandbox %q", podSandbox.ID)
		}
	}

	return
}

killPodWithSyncResult方法中,首先杀掉所有运行的container ,然后删除pod的sandbox。可以看到删除顺序是和创建pod的顺序相反。

杀掉所有运行的container:

killContainersWithSyncResult

/pkg/kubelet/kuberuntime/kuberuntime_container.go

func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
	containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
	wg := sync.WaitGroup{}

	wg.Add(len(runningPod.Containers))
	for _, container := range runningPod.Containers {
		go func(container *kubecontainer.Container) {
			defer utilruntime.HandleCrash()
			defer wg.Done()

			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
			if err := m.killContainer(pod, container.ID, container.Name, "Need to kill Pod", gracePeriodOverride); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
			}
			containerResults <- killContainerResult
		}(container)
	}
	wg.Wait()
	close(containerResults)

	for containerResult := range containerResults {
		syncResults = append(syncResults, containerResult)
	}
	return
}

在 killContainersWithSyncResult函数中,利用go routine多协程来对Pod中的每一个container进行删除。删除container的方法是

killContainer。在多协程处理时,killContainersWithSyncResult利用了sync.WaitGroup特性对每个协程进行等待,直到所有协程运行完毕,主协程才会继续。

接下来分析删除container的killContainer方法

/pkg/kubelet/kuberuntime/kuberuntime_container.go

KillContainer方法主要步骤如下:

1. 判断需删除的pod和container是否为空。

2. 设定优雅删除的时限。

	// From this point , pod and container must be non-nil.
	gracePeriod := int64(minimumGracePeriodInSeconds)
	switch {
	case pod.DeletionGracePeriodSeconds != nil:
		gracePeriod = *pod.DeletionGracePeriodSeconds
	case pod.Spec.TerminationGracePeriodSeconds != nil:
		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
	}

	glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)

 其中TerminationGracePeriodSeconds可以在yaml文件中进行设置,默认为30秒。这个时间是,给pod发出关闭指令时,k8s将会给应用发送SIGTERM信号,程序只需要捕获SIGTERM信号并做相应处理即可。配置为k8s会等待设定的秒后关闭。也就是TerminationGracePeriodSeconds是pod接收到sigterm信号后,应用能够优雅关闭的时间。

这里的DeletionGracePeriodSeconds是由api server设置的。在上面分析api server时,api serve设置了DeletionGracePeriodSeconds值。

3. 运行pre stop hook:executePreStopHook方法。如果用户设置了pre stop,则会执行,pre stop主要是为了业务在pod删除前前,能够优雅的停止,比如注销等操作。

4. 停止container:StopContainer方法。

container删掉后,回到killPodWithSyncResult方法,接下来是停止sandbox。

	// Stop all sandboxes belongs to same pod
	for _, podSandbox := range runningPod.Sandboxes {
		if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
			killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
			glog.Errorf("Failed to stop sandbox %q", podSandbox.ID)
		}
	}

 调用StopPodSandbox方法来删除sandbox,也就是pause容器。

在StopPodSandbox方法中调用了runtimeClient.StopPodSandbox方法来删除。

StopPodSandbox:/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go

func (c *runtimeServiceClient) StopPodSandbox(ctx context.Context, in *StopPodSandboxRequest, opts ...grpc.CallOption) (*StopPodSandboxResponse, error) {
	out := new(StopPodSandboxResponse)
	err := grpc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/StopPodSandbox", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

 到此为止,kubelet对pod进行优雅的删除,资源优雅的释放。但这还没有结束。

kubelet在最开始,也就是刚启动的run方法中,在调用syncLoop进行事件监听前,调用了statusmanager的start方法。

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		go kl.runtimeClassManager.Run(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}

statusmanager是用来时刻与api server同步状态的client。不会停止:

statusmanager.start():/pkg/kubelet/status/status_manager.go

func (m *manager) Start() {
	// Don't start the status manager if we don't have a client. This will happen
	// on the master, where the kubelet is responsible for bootstrapping the pods
	// of the master components.
	if m.kubeClient == nil {
		glog.Infof("Kubernetes client is nil, not starting status manager.")
		return
	}

	glog.Info("Starting to sync pod status with apiserver")
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share the same go routine to avoid sync races.
	go wait.Forever(func() {
		select {
		case syncRequest := <-m.podStatusChannel:
			glog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
				syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
			m.syncPod(syncRequest.podUID, syncRequest.status)
		case <-syncTicker:
			m.syncBatch()
		}
	}, 0)
}

 方法中调用了statusmanager的syncPod方法来具体同步pod状态:

在该方法的最后,有这么一段代码:

statusmanager.syncPod

/pkg/kubelet/status/status_manager.go

	// We don't handle graceful deletion of mirror pods.
	if m.canBeDeleted(pod, status.status) {
		deleteOptions := metav1.NewDeleteOptions(0)
		// Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
		deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
		if err != nil {
			glog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
			return
		}
		glog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
		m.deletePodStatus(uid)
	}

 这段代码会调用canBeDeleted方法来判断pod是否已经优雅的停止,也就是资源是否删除干净。

canBeDeleted方法里最后调用了PodResourcesAreReclaimed方法来判断资源是否删除干净,如果删除干净,则返回true。

于是如果canBeDeleted返回true,则表示pod已经优雅的停止,于是调用api接口,向api server发送delete,再次删除pod。

这次的grace 时间设置为0:metav1.NewDeleteOptions(0)。表示这次是强制删除Pod。因此,apiserver会再次收到DELETE的

请求,继续执行DELETE handler的流程。与第一次不同的时,这次是强制删除Pod,所以会执行完整的过程,apiserver去etcd

删除最终的Pod信息。这里解释了上述api server的红字部分。

也即是说,kubelet的DELETE操作其实监听到的是Pod的更新事件,更新的就是时间戳和时间限制。Pod删除之后,执行的是REMOVE操作。

kubelet接收到事件变化之后,转化为REMOVE事件,完成Pod的最终清理工作。至此,Pod删除流程结束。

REMOVE事件会调用HandlePodRemoves进行处理。该方法调用probeManager的RemovePod方法来最后删除pod遗留的一些资源,比如探针prober worker等。

因此,在stke的kubelet日志中,可以看到对于pod删除,一开始会是delete事件,到最后是remove事件,remove事件也表示,pod彻底从node上删除。

 

以上就是Pod删除的完整流程。以一张流程图进行总结:

 

其中有一些关键点,总结如下:
1、apiserver handler执行了两次,第一次主要是修改Pod信息,设置DeletionTimestamp和DeletionGracePeriodSeconds信息,第二次去数据库etcd删除Pod信息;
2、kubelet通过检测到Pod内的资源已经完全释放之后,触发了第二次删除事件,且是强制删除Pod;
3、kubelet的DELETE操作其实监听到的是Pod的更新事件,Pod删除之后,执行的是REMOVE操作;

Logo

开源、云原生的融合云平台

更多推荐