
The Deployment controller is one of the most commonly used controllers in kube-controller-manager and is responsible for managing Deployment resources. A Deployment, in essence, deploys a stateless workload on a Kubernetes cluster by managing ReplicaSets and Pods.

Deployment and the controller pattern

In Kubernetes, the Pod is the smallest unit of deployment, and Pod replicas are managed by a ReplicaSet (RS); a Deployment is a higher-level abstraction built on top of ReplicaSets.

The Deployment controller sits on top of ReplicaSets and can manage several of them. Every time the image version (more precisely, the Pod template) changes, a new ReplicaSet is created and replaces the old one; multiple ReplicaSets may exist at the same time, but only one of them actually runs the replicas.

With a Deployment object you can easily do the following:

  • Create ReplicaSets and Pods
  • Roll out upgrades (without stopping the old version) and roll back to a previous version
  • Scale up and down smoothly
  • Pause and resume a Deployment

Deployment operations

1. Deployment resource definition

2. Deployment example

3. Applying the manifest

root@k8s-master01: kubectl apply -f deployment-demo.yaml
deployment.apps/deployment-demo created

# Check the Deployment
root@k8s-master01:~/yaml/chapter08# kubectl get deployments.apps
NAME              READY   UP-TO-DATE   AVAILABLE   AGE
deployment-demo   4/4     4            4           12s

# Check the Pods
root@k8s-master01:~/yaml/chapter08# kubectl get pods -l 'app=demoapp,release=stable'
NAME                              READY   STATUS    RESTARTS   AGE
deployment-demo-fb544c5d8-2687q   1/1     Running   0          2m16s
deployment-demo-fb544c5d8-2t6q4   1/1     Running   0          2m16s
deployment-demo-fb544c5d8-pkgzn   1/1     Running   0          2m16s
deployment-demo-fb544c5d8-w52qp   1/1     Running   0          2m16s
# The first segment of each Pod name is the Deployment name and the last segment is a random suffix; the middle part, fb544c5d8, is the hash of the ReplicaSet's Pod template (the template field).

# Check the ReplicaSet
root@k8s-master01:~/yaml/chapter08# kubectl get replicasets.apps
NAME                        DESIRED   CURRENT   READY   AGE
deployment-demo-fb544c5d8   4         4         4       4m5s

# Whenever the Pod template changes, the template hash changes too, which leads to a new ReplicaSet and triggers a rollout of the Deployment.
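
To build intuition for where that hash comes from, here is a simplified, self-contained approximation: the real controller (pkg/controller.ComputeHash) deep-hashes the PodTemplateSpec with an FNV-32a hasher plus an optional collision count and encodes the sum with rand.SafeEncodeString, whereas the sketch below just hashes a JSON serialization of a stand-in template struct.

package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
)

// podTemplate is a stand-in for core/v1 PodTemplateSpec, for illustration only.
type podTemplate struct {
	Labels map[string]string `json:"labels"`
	Image  string            `json:"image"`
}

// computeTemplateHash approximates the pod-template-hash: any change to the
// template produces a different hash, hence a different ReplicaSet name.
func computeTemplateHash(t podTemplate, collisionCount *int32) string {
	hasher := fnv.New32a()
	data, _ := json.Marshal(t) // the real code deep-hashes the object instead of marshalling it
	hasher.Write(data)
	if collisionCount != nil {
		fmt.Fprintf(hasher, "%d", *collisionCount)
	}
	// The real code encodes the sum with rand.SafeEncodeString; hex is close enough here.
	return fmt.Sprintf("%x", hasher.Sum32())
}

func main() {
	t := podTemplate{Labels: map[string]string{"app": "demoapp"}, Image: "ikubernetes/demoapp:v1.0"}
	fmt.Println(computeTemplateHash(t, nil))
}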

4. Describing the Deployment

root@k8s-master01:~/yaml/chapter08# kubectl describe deployments.apps deployment-demo
Name:                   deployment-demo
Namespace:              default
CreationTimestamp:      Wed, 21 Apr 2024 13:23:13 +0000
Labels:                 <none>
Annotations:            deployment.kubernetes.io/revision: 1
Selector:               app=demoapp,release=stable
Replicas:               4 desired | 4 updated | 4 total | 4 available | 0 unavailable
StrategyType:           RollingUpdate      # a template change triggers a rolling update
MinReadySeconds:        0
RollingUpdateStrategy:  25% max unavailable, 25% max surge     # rolling-update parameters
Pod Template:
  Labels:  app=demoapp
           release=stable
  Containers:
   demoapp:
    Image:        ikubernetes/demoapp:v1.0
    Port:         80/TCP
    Host Port:    0/TCP
    Environment:  <none>
    Mounts:       <none>
  Volumes:        <none>
Conditions:
  Type           Status  Reason
  ----           ------  ------
  Available      True    MinimumReplicasAvailable
  Progressing    True    NewReplicaSetAvailable
OldReplicaSets:  <none>
NewReplicaSet:   deployment-demo-fb544c5d8 (4/4 replicas created)
Events:
  Type    Reason             Age   From                   Message
  ----    ------             ----  ----                   -------
  Normal  ScalingReplicaSet  19m   deployment-controller  Scaled up replica set deployment-demo-fb544c5d8 to 4

5. Updating

Use kubectl scale to scale the Deployment out to 5 replicas:

kubectl scale deployments/deployment-demo --replicas=5

To update the application image to v2, use the set image subcommand with the Deployment name and a container=image pair (the container in this Deployment is named demoapp):

kubectl set image deployments/deployment-demo demoapp=ikubernetes/demoapp:v2.0

Rollback:

// View the Deployment's rollout history
root@k8s-master01:~# kubectl rollout history deployment deployment-demo
deployment.apps/deployment-demo
REVISION  CHANGE-CAUSE
1         <none>
2         <none>   # this is the current revision


// To quickly roll back to the previous revision:
root@k8s-master01:~# kubectl rollout undo deployment deployment-demo
deployment.apps/deployment-demo rolled back

// Roll back to a specific revision
kubectl rollout undo deployment <deployment-name> --to-revision=<revision-number>

How the controller works

The Deployment controller's (dc) informers watch three kinds of resources: Deployments, ReplicaSets, and Pods. For Deployments and ReplicaSets it handles Add, Update, and Delete events; for Pods it only handles Delete events.
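
As a rough sketch, the event handlers are wired up in NewDeploymentController along these lines (abridged from memory; the exact closure signatures and handler method names vary slightly between versions):

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { dc.addDeployment(logger, obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { dc.updateDeployment(logger, oldObj, newObj) },
	DeleteFunc: func(obj interface{}) { dc.deleteDeployment(logger, obj) },
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { dc.addReplicaSet(logger, obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { dc.updateReplicaSet(logger, oldObj, newObj) },
	DeleteFunc: func(obj interface{}) { dc.deleteReplicaSet(logger, obj) },
})
// Pods only matter when they disappear (e.g. to unblock a Recreate rollout).
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	DeleteFunc: func(obj interface{}) { dc.deletePod(logger, obj) },
})

The controller struct itself and the way kube-controller-manager starts it look like this: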

// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl controller.RSControlInterface
	client    clientset.Interface

	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(ctx context.Context, dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.TypedRateLimitingInterface[string]
}


func startDeploymentController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
	dc, err := deployment.NewDeploymentController(
		ctx,
		controllerContext.InformerFactory.Apps().V1().Deployments(),
		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
	return nil, true, nil
}

Next, let's look at the core processing logic of the DeploymentController:

dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue

The core of dc is a Deployment work queue fed through enqueueDeployment, plus a Deployment reconciler, syncHandler.

Tracing enqueueDeployment, you can see that every event registered with the informers that can be associated with a Deployment ends up calling enqueueDeployment with that Deployment object. Since enqueueDeployment is initialized to dc.enqueue, the object is effectively handed to enqueue, which turns the Deployment's namespace and name into a key string and adds it to the queue. The queue is a rate-limited work queue of type workqueue.TypedRateLimitingInterface[string].

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.Add(key)
}

Next, let's look at syncHandler. Following the source, when the controller manager starts the controller it calls its Run method, and Run (via worker) eventually invokes syncHandler:

// Run begins watching and syncing.
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()

	// Start events processing pipeline.
	dc.eventBroadcaster.StartStructuredLogging(3)
	dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
	defer dc.eventBroadcaster.Shutdown()

	defer dc.queue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting controller", "controller", "deployment")
	defer logger.Info("Shutting down controller", "controller", "deployment")

	if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dc.worker, time.Second)
	}

	<-ctx.Done()
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker(ctx context.Context) {
	for dc.processNextWorkItem(ctx) {
	}
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(ctx, key)
	dc.handleErr(ctx, err, key)

	return true
}

dc.worker is simple: it loops over processNextWorkItem. processNextWorkItem takes an element (a Deployment key) off the queue and hands it to syncHandler, which, as registered above, is dc.syncDeployment. If the queue is empty, processNextWorkItem blocks on dc.queue.Get(); once a key is obtained, syncDeployment processes it and the result is passed to handleErr. In short: dequeue a key, sync the Deployment it names, record the outcome.
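
handleErr decides whether a failed key should be retried. A minimal sketch of the usual pattern (the real method also logs and distinguishes a few error kinds; maxRetries here is an assumed constant):

func (dc *DeploymentController) handleErr(ctx context.Context, err error, key string) {
	if err == nil {
		// Success: reset the rate limiter's memory of this key.
		dc.queue.Forget(key)
		return
	}

	const maxRetries = 15 // assumption; the real controller defines a similar limit
	if dc.queue.NumRequeues(key) < maxRetries {
		// Requeue with exponential backoff.
		dc.queue.AddRateLimited(key)
		return
	}

	// Too many failures: give up until a new event re-enqueues the key.
	utilruntime.HandleError(err)
	dc.queue.Forget(key)
}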

Let's now focus on the syncDeployment function:

// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

Fetching the Deployment object

Inside this function, the key taken from the queue (in the form namespace/deployment_name) is first split into the namespace and the Deployment name, and the Deployment object (the full Go object behind the YAML you normally see) is then fetched from the local indexer for that namespace.

The indexer here is an indexed store: when an informer observes a resource change, it stores the resulting object (such as a Deployment) as key/value pairs in the indexer, which is a thread-safe map. A small usage example of the key helpers follows the excerpt below.

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}
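
For reference, keys of this form are produced and parsed by client-go helpers (controller.KeyFunc used in enqueue is cache.DeletionHandlingMetaNamespaceKeyFunc, which for live objects behaves like MetaNamespaceKeyFunc). A small standalone usage example, with an illustrative object literal:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	d := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "deployment-demo"},
	}

	key, _ := cache.MetaNamespaceKeyFunc(d)
	fmt.Println(key) // default/deployment-demo

	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Println(ns, name) // default deployment-demo
}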

If the Deployment no longer exists in the local store, it has already been deleted, so there is nothing to do and the function returns.

Note that when a user deletes a Deployment through the API or with kubectl delete, it is not removed right away: the API server only sets the deletionTimestamp field in its metadata and returns HTTP 202 (Accepted). The actual deletion is carried out by the garbage collector (see https://kubernetes.io/docs/concepts/overview/working-with-objects/finalizers/). Once the garbage collector removes the Deployment, the informer registered in dc observes the change and deletes the cached copy, which is when the IsNotFound branch below is taken.


if errors.IsNotFound(err) {
  klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
  return nil
}

Next, the Deployment's selector is checked. An empty selector would match every Pod, so the controller refuses to act on it: it records a warning event, bumps ObservedGeneration if necessary, and returns.

everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}

The ReplicaSets belonging to this Deployment are then collected from the local cache based on labels. There are three cases:

  1. The RS already belongs to the Deployment (its labels match the Deployment's selector) and its ownerReference points to the Deployment: it is added to the list.
  2. The RS's labels match the Deployment's selector but it has no controller ownerReference yet: it is an orphan that the Deployment adopts and adds to the list.
  3. The RS's labels no longer match the Deployment's selector but its ownerReference still points to the Deployment: the Deployment releases it (removes the ownerReference), turning it into an orphan that the garbage collector will eventually delete.

Which everyday operations lead to these cases? (A simplified decision sketch follows the code excerpt below.)

  • Case 1: ordinary operations on ReplicaSets the Deployment already owns, such as scaling (changing the replicas field); the ownership between RS and Deployment is unchanged.
  • Case 2: an RS whose labels match the selector but that has lost (or never had) a controlling owner, for example one orphaned by an earlier deletion; note that the ReplicaSets the controller creates itself after a template change already carry the ownerReference.
  • Case 3: the Deployment's selector was changed, so ReplicaSets that used to match no longer do.

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
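
A simplified, self-contained sketch of that claim decision (the real implementation goes through ReplicaSetControllerRefManager and issues adopt/release patches; the names below are illustrative):

// claimDecision says what the Deployment should do with one ReplicaSet.
type claimDecision int

const (
	keep    claimDecision = iota // already owned and still matching
	adopt                        // matching but unowned: take ownership
	release                      // owned but no longer matching: drop ownership
	ignore                       // unrelated ReplicaSet
)

// decide mirrors the three cases above. ownedByMe reports whether the RS's
// controller ownerReference points at this Deployment; selectorMatches reports
// whether the Deployment's selector matches the RS's labels.
func decide(ownedByMe, selectorMatches bool) claimDecision {
	switch {
	case ownedByMe && selectorMatches:
		return keep
	case !ownedByMe && selectorMatches:
		return adopt // only if the RS has no other controller owner
	case ownedByMe && !selectorMatches:
		return release
	default:
		return ignore
	}
}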

Checking for a deletion marker

The controller then checks whether the Deployment carries a deletionTimestamp. As mentioned above, deleting a Deployment through the API or kubectl does not remove it immediately; a deletionTimestamp is written into its metadata instead. If deletionTimestamp is set, syncStatusOnly is called to synchronize only the status. What does that status sync involve?

When deletionTimestamp is set, only the metadata has changed, so this function does not modify the Deployment or its ReplicaSets; it merely recomputes and writes back the Deployment's status (roughly sketched after the code below).

if d.DeletionTimestamp != nil {
    return dc.syncStatusOnly(ctx, d, rsList)
}
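
Roughly, syncStatusOnly looks like this (abridged from memory; it lists the ReplicaSets without creating a new one and then only writes back status):

// syncStatusOnly updates the Deployment's status; it never creates, scales,
// or deletes ReplicaSets.
func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// createIfNotExisted=false: do not create a new ReplicaSet here.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}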

Checking for a pause

if err = dc.checkPausedConditions(ctx, d); err != nil {
	return err
}

if d.Spec.Paused {
	return dc.sync(ctx, d, rsList)
}

If the user pauses the Deployment, for example with

kubectl rollout pause deploy/nginx

then paused: true is set in the Deployment's spec.

In that case, checkPausedConditions adds the following condition to the Deployment's status:


  - lastTransitionTime: "2023-03-21T14:03:52Z"
    lastUpdateTime: "2023-03-21T14:03:52Z"
    message: Deployment is paused
    reason: DeploymentPaused
    status: Unknown
    type: Progressing

Checking for a rollback

Next, the controller checks whether the change to the Deployment was caused by a rollback request; if so, rollback is executed:

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

The rollback logic: check whether the requested revision is 0. If it is 0, the target is the previous revision; if no previous revision can be found, the rollback is abandoned. Otherwise, find the ReplicaSet with the requested revision, copy its Pod template into the Deployment, and update it through the API (if the target ReplicaSet's template is identical to the Deployment's current one, nothing is rolled out).
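
A self-contained sketch of just that revision-selection step (the map of revisions and the function name are illustrative, not the controller's actual API; the real code reads the deployment.kubernetes.io/revision annotation on each ReplicaSet):

// pickRollbackTarget returns the name of the ReplicaSet whose template should
// be copied back into the Deployment. Revision 0 means "the previous revision".
func pickRollbackTarget(requested int64, revisions map[string]int64) (rsName string, ok bool) {
	if requested == 0 {
		// Find the second-highest revision, i.e. the one just before the latest.
		var latest, previous int64
		for _, rev := range revisions {
			if rev > latest {
				previous, latest = latest, rev
			} else if rev > previous {
				previous = rev
			}
		}
		if previous == 0 {
			return "", false // no previous revision: abandon the rollback
		}
		requested = previous
	}

	for name, rev := range revisions {
		if rev == requested {
			return name, true // copy this RS's template into the Deployment
		}
	}
	return "", false // requested revision not found: clear the rollback spec and continue
}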

Checking for a scaling event

The check compares the Deployment's current replicas with the deployment.kubernetes.io/desired-replicas annotation on each active ReplicaSet (one that still has Pod replicas). If any of them differ, this is a scaling event and sync is called to reconcile the scale (see the sketch after the code excerpt below):

	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}
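
In simplified form, the scaling-event check boils down to comparing the desired replica count with the value recorded on each active ReplicaSet; a sketch under that assumption (the annotation key is the real one, the function shape is illustrative):

// isScalingEventSketch returns true if any active ReplicaSet was created for a
// different desired replica count than the Deployment currently asks for.
// recordedDesired holds, per active ReplicaSet, the value parsed from its
// deployment.kubernetes.io/desired-replicas annotation.
func isScalingEventSketch(desired int32, recordedDesired []int32) bool {
	for _, recorded := range recordedDesired {
		if recorded != desired {
			return true
		}
	}
	return false
}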

Choosing the update strategy

Finally comes the last decision in dc: whether the Deployment's update strategy is Recreate or RollingUpdate:

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}

RollingUpdate is the common case for a Deployment. Before walking through the rollout itself, it helps to understand the two parameters of the rolling-update strategy, maxUnavailable and maxSurge:

  • maxUnavailable is the maximum number of Pods that may be unavailable during the update;
  • maxSurge is the maximum number of extra Pods that may be created beyond the desired replica count;
  • both can be given as an absolute number or as a percentage; a percentage is resolved against the Deployment's replicas, with maxSurge rounded up and maxUnavailable rounded down (see the example below).
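
For instance, with replicas: 4 and the default 25% / 25% shown in the describe output above, both values resolve to 1. A small sketch using the apimachinery helper (the helper function is real; the standalone program around it is only for illustration):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	replicas := 4
	surge := intstr.FromString("25%")
	unavailable := intstr.FromString("25%")

	// maxSurge rounds up, maxUnavailable rounds down.
	maxSurge, _ := intstr.GetScaledValueFromIntOrPercent(&surge, replicas, true)
	maxUnavailable, _ := intstr.GetScaledValueFromIntOrPercent(&unavailable, replicas, false)

	fmt.Println(maxSurge, maxUnavailable) // 1 1
}
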
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

  1. First, all ReplicaSets belonging to the Deployment are fetched;
  2. reconcileNewReplicaSet reconciles the new ReplicaSet's replica count, creating new Pods while keeping the extra replicas within maxSurge (the scale-up arithmetic is sketched after this list);
  3. reconcileOldReplicaSets reconciles the old ReplicaSets' replica counts, deleting old Pods while keeping the number of unavailable Pods within maxUnavailable;
  4. Finally, obsolete ReplicaSets are cleaned up and the Deployment's status is updated.
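
The scale-up arithmetic in step 2 is roughly the following (a simplified version of the NewRSNewReplicas helper; parameter names are simplified):

// newRSTargetReplicas computes how many replicas the new ReplicaSet should
// have during a rolling update: grow it as far as possible without letting the
// total Pod count exceed desired+maxSurge, and never beyond desired itself.
func newRSTargetReplicas(desired, maxSurge, currentNewRS, currentTotal int32) int32 {
	maxTotal := desired + maxSurge
	scaleUp := maxTotal - currentTotal
	if scaleUp < 0 {
		scaleUp = 0 // already at or above the surge limit
	}
	if currentNewRS+scaleUp > desired {
		return desired
	}
	return currentNewRS + scaleUp
}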

The other update mode is Recreate, which is rather blunt: the old ReplicaSets are scaled straight down to 0 and only then is the new ReplicaSet brought up, so all old Pods terminate at once and the service is unavailable for a while. For that reason this mode is rarely used.
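
Abridged, the Recreate path looks roughly like this (reproduced from memory; helper names may differ slightly across versions):

func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
	// Don't create the new ReplicaSet yet: scaling down must happen first.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// Scale all old ReplicaSets down to 0.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Wait until no old Pods are running before bringing up the new ones.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Now create the new ReplicaSet (if needed) and scale it up to the desired count.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}
	if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
		return err
	}

	// Clean up old ReplicaSets and sync the Deployment status.
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}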

To sum up dc's overall logic: events from the Deployment, ReplicaSet, and Pod informers enqueue Deployment keys; workers dequeue each key and run syncDeployment, which first handles the special cases (deletion, empty selector, pause, rollback, scaling) and otherwise carries out a Recreate or RollingUpdate rollout.
