想系统学习k8s源码,云原生的可以加:mkjnnm

dc.sync

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}

	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

下面来分析一下dc.sync方法,以下两种情况下,都会调用dc.sync,然后直接return:
(1)判断deployment的.Spec.Paused属性值是否为true,是则调用dc.sync做处理,调用完成后直接return;
(2)先调用dc.isScalingEvent,检查deployment对象是否处于 scaling 状态,是则调用dc.sync做处理,调用完成后直接return。

关于Paused字段

deployment的.Spec.Paused为true时代表该deployment处于暂停状态,false则代表处于正常状态。当deployment处于暂停状态时,deployment对象的PodTemplateSpec的任何修改都不会触发deployment的更新,当.Spec.Paused再次赋值为false时才会触发deployment更新。

dc.sync主要逻辑:
(1)调用dc.getAllReplicaSetsAndSyncRevision获取最新的replicaset对象以及旧的replicaset对象列表;
(2)调用dc.scale,判断是否需要进行扩缩容操作,需要则进行扩缩容操作;
(3)当deployment的.Spec.Paused为true且不需要做回滚操作时,调dc.cleanupDeployment, 根据deployment配置的保留历史版本数(.Spec.RevisionHistoryLimit)以及replicaset的创建时 间,把最老的旧的replicaset给删除清理掉;
(4)调用dc.syncDeploymentStatus,计算并更新deployment对象的status字段。

dc.scale

dc.scale主要作用是处理deployment的扩缩容操作,其主要逻辑如下:
(1)调用deploymentutil.FindActiveOrLatest,判断是否只有最新的replicaset对象的副本数不为0,是则找到最新的replicaset对象,并判断其副本数是否与deployment期望副本数一致,是则直接return,否则调用dc.scaleReplicaSetAndRecordEvent更新其副本数为deployment的期望副本数;
(2)当最新的replicaset对象的副本数与deployment期望副本数一致,且旧的replicaset对象中有副本数不为0的,则从旧的replicset对象列表中找出副本数不为0的replicaset,调用dc.scaleReplicaSetAndRecordEvent将其副本数缩容为0,然后return;
(3)当最新的replicaset对象的副本数与deployment期望副本数不一致,旧的replicaset对象中有副本数不为0的,且deployment的更新策略为滚动更新,说明deployment可能正在滚动更新,则按一定的比例对新旧replicaset进行扩缩容操作,保证滚动更新的稳定性。

	// There are old replica sets with pods and the new replica set is not saturated.
	// We need to proportionally scale all replica sets (new and old) in case of a
	// rolling deployment.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

		// Number of additional replicas that can be either added or removed from the total
		// replicas count. These replicas should be distributed proportionally to the active
		// replica sets.
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		// The additional replicas should be distributed proportionally amongst the active
		// replica sets from the larger to the smaller in size replica set. Scaling direction
		// drives what happens in case we are trying to scale replica sets of the same size.
		// In such a case when scaling up, we should scale up newer replica sets first, and
		// when scaling down, we should scale down older replica sets first.
		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"

		case deploymentReplicasToAdd < 0:
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}

		// Iterate over all active replica sets and estimate proportions for each of them.
		// The absolute value of deploymentReplicasAdded should never exceed the absolute
		// value of deploymentReplicasToAdd.
		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		logger := klog.FromContext(ctx)
		for i := range allRSs {
			rs := allRSs[i]

			// Estimate proportions if we have replicas to add, otherwise simply populate
			// nameToSize with the current sizes for each replica set.
			if deploymentReplicasToAdd != 0 {
				proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		// Update all replica sets
		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftovers to the largest replica set.
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}

			// TODO: Use transactions when we have them.
			if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				// Return as soon as we fail, the deployment is requeued
				return err
			}
		}
	}

Deployment 会在 .spec.strategy.type==RollingUpdate时,采取 滚动更新的方式更新 Pod。你可以指定 maxUnavailablemaxSurge 来控制滚动更新过程。

最大不可用

.spec.strategy.rollingUpdate.maxUnavailable 是一个可选字段, 用来指定更新过程中不可用的 Pod 的个数上限。该值可以是绝对数字(例如,5),也可以是所需 Pod 的百分比(例如,10%)。百分比值会转换成绝对数并去除小数部分。 如果 .spec.strategy.rollingUpdate.maxSurge 为 0,则此值不能为 0。 默认值为 25%。

例如,当此值设置为 30% 时,滚动更新开始时会立即将旧 ReplicaSet 缩容到期望 Pod 个数的70%。 新 Pod 准备就绪后,可以继续缩容旧有的 ReplicaSet,然后对新的 ReplicaSet 扩容, 确保在更新期间可用的 Pod 总数在任何时候都至少为所需的 Pod 个数的 70%。

最大峰值

.spec.strategy.rollingUpdate.maxSurge 是一个可选字段,用来指定可以创建的超出期望 Pod 个数的 Pod 数量。此值可以是绝对数(例如,5)或所需 Pod 的百分比(例如,10%)。 如果 MaxUnavailable 为 0,则此值不能为 0。百分比值会通过向上取整转换为绝对数。 此字段的默认值为 25%。

例如,当此值为 30% 时,启动滚动更新后,会立即对新的 ReplicaSet 扩容,同时保证新旧 Pod 的总数不超过所需 Pod 总数的 130%。一旦旧 Pod 被杀死,新的 ReplicaSet 可以进一步扩容, 同时确保更新期间的任何时候运行中的 Pod 总数最多为所需 Pod 总数的 130%。

dc.RolloutRolling

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐