查看案例

字段含义
podAffinityPod 间的亲和性定义
podAntiAffinityPod 间的反亲和性定义
requiredDuringSchedulingIgnoredDuringExecution硬性要求,必须满足条件,保证分散部署的效果最好使用用此方式
preferredDuringSchedulingIgnoredDuringExecution软性要求,可以不完全满足,即有可能同一node上可以跑多个副本
requiredDuringSchedulingIgnoredDuringExecutionlabelSelector
topologyKey
preferredDuringSchedulingIgnoredDuringExecutionweight
podAffinityTermlabelSelector
topologyKey
topologyKey可以理解为 Node 的 Label,具有相同的 Label 的 Node,视为同一拓扑
如三个节点打上 Label :
- Node1 —— zone:beijing
- Node2 —— zone:shanghai
- Node3 —— zone:beijing
那么 Node1 和 Node3 为同一拓扑,Node2 为另一拓扑
topologyKey: kubernetes.io/hostname
上面为常见的配置,可以通过 kubectl get nodes --show-labels看到
节点上的 Lable,就具有此 kubernetes.io/hostname Label
因此就是将每个节点,作为一个独立的拓扑
apiVersion: v1
kind: Pod
metadata:
  name: test-pod
spec:
  affinity:
    # 首先根据 labelSelector 选择具有 service.cpaas.io/name: deployment-nginx  Label 的 所有 Pod
    # 接下来根据 podAffinity 亲和性,将此 pod 调度到与选中 Pod 中具有 topologyKey 的 Node 上
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            service.cpaas.io/name: deployment-nginx
        topologyKey: kubernetes.io/hostname
      - labelSelector:
          matchLabels:
            service.cpaas.io/name: deployment-busybox
        topologyKey: kubernetes.io/hostname
    # 首先根据 labelSelector 选择具有 key 为 a ,value为 b 或 c 的 Label 的 Pod
    # 接下来根据 podAntiAffinity,将此 pod 调度到与选中 Pod 中都不相同的 Node 上,该节点需要具有 topologyKey label
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: a
              operator: In
              values: ["b", "c"]
          topologyKey: kubernetes.io/hostname
  containers:
  - name: test-pod
    image: nginx:1.18


代码分析

代码路径:pkg/scheduler/framework/plugins/interpodaffinity

img

首先根据调度器框架,观察源码,可以看出实现了一下四个接口:

  1. PreFilter
  2. Filter
  3. PreScore
  4. Score

首先明确几点

  • 该插件是考虑 Pod 间的亲和性和反亲和性(就是新Pod 和 现存 Pod 的关系)
  • 但最终结果是将 Pod 调度到合适的 Node 上(因此要记录 Node 的信息)

1 | PreFilter

此步骤作用:

  1. 梳理出【现存哪些 Pod】 讨厌【新 Pod】,记录【满足条件的现存 Pod】 对应 Node 信息为 existingPodAntiAffinityMap
  2. 梳理出【新 Pod】喜欢【哪些现存Pod】,记录【满足条件的现存 Pod】 对应 Node 信息为 incomingPodAffinityMap
  3. 梳理出【新 Pod】讨厌【哪些现存Pod】,记录【满足条件的现存 Pod】 对应 Node 信息为 incomingPodAntiAffinityMap

所以可以小总结一下

  • existingPodAntiAffinityMap 和 incomingPodAntiAffinityMap 这些记录的节点,新 Pod 不喜欢
  • incomingPodAffinityMap 记录的节点,Pod 喜欢

问题 —— 为什么不梳理 【现存哪些 Pod】 喜欢【新 Pod】?

  • 因为现在是调度【新 Pod】,只要不被讨厌,不影响【现存 Pod 】就行,因此只需要可能会影响的【现存 Pod】

注意上面所说的【条件】—— 指的是【硬性要求 requiredDuringSchedulingIgnoredDuringExecution 】 —— 因此才考虑这么详细

// 这里只截取了 PreFilter 部分重要函数
// pkg/scheduler/framework/plugins/interpodaffinity/filtering.go

// 考虑现存 Pod 的 反亲和性 anti-affinity
// 简单理解:就是用现存 Pod 的 anti-affinity Terms 配置,要求 NewPod,记录下满足的 Node,说明这些节点不能调度(因为现存 Pod 排斥新 Pod)
// 这里的 anti-affinity Terms 是指 requiredDuringSchedulingIgnoredDuringExecution 定义的硬性要求
// 问题:为什么不考虑现存 Pod 的亲和性? —— 因为现存 Pod 的亲和性(是亲和他之前 Pod),在其调度的时候早已考虑,现在只需要考虑其反感的
// 代码级理解:
// 1. 遍历所有具有 anti-affinity 现存 Pod
// 2. 若即将调度的 NewPod 满足该 Pod 的 anti-affnity Terms,
// 3. 就记录到 existingPodAntiAffinityMap 中,key 为该 Pod 所在的 node 信息(topologyKey、topologyValue),value 为满足的 Terms 次数
// 例如 map{(hostname:node01):1}
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, nodesWithRequiredAntiAffinityPods)

// 考虑新 NewPod 的亲和性和反亲和性
// 简单理解: 就是用 NewPod 的 anti-affinity 和 affinity Terms 配置,要求现存的 Pod,记录下满足的 Node
// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
incomingPodAffinityMap, incomingPodAntiAffinityMap := getTPMapMatchingIncomingAffinityAntiAffinity(podInfo, allNodes)

2 | Filter

  • *framework.CycleState 将上面统计的信息传递过来
  • 现在的工作就是:
    • 传来了一个 Node 信息
    • 判断该 Node 与上面的 existingPodAntiAffinityMap、incomingPodAntiAffinityMap 、incomingPodAffinityMap 的关系
    • 若该 Node 满足条件,那么可以进入到下面的【打分阶段】
// pkg/scheduler/framework/plugins/interpodaffinity/filtering.go
func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if nodeInfo.Node() == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}

	state, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	if !satisfyPodAffinity(state, nodeInfo) {
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonAffinityNotMatch, ErrReasonAffinityRulesNotMatch)
	}

	if !satisfyPodAntiAffinity(state, nodeInfo) {
		return framework.NewStatus(framework.Unschedulable, ErrReasonAffinityNotMatch, ErrReasonAntiAffinityRulesNotMatch)
	}

	if !satisfyExistingPodsAntiAffinity(state, nodeInfo) {
		return framework.NewStatus(framework.Unschedulable, ErrReasonAffinityNotMatch, ErrReasonExistingAntiAffinityRulesNotMatch)
	}

	return nil
}

3 | PreScore

这部分主要看 processExistingPod 函数

  • 可以看出根据【现存 Pod】 和【新 Pod】的【软性要求preferredDuringSchedulingIgnoredDuringExecution】,对节点进行打分
// pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
// PreScore builds and writes cycle state used by Score and NormalizeScore.
func (pl *InterPodAffinity) PreScore(
	pCtx context.Context,
	cycleState *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) *framework.Status {
  
  // ... ...
	topoScores := make([]scoreMap, len(allNodes))
	index := int32(-1)
	processNode := func(i int) {
		nodeInfo := allNodes[i]
		if nodeInfo.Node() == nil {
			return
		}
		// Unless the pod being scheduled has affinity terms, we only
		// need to process pods with affinity in the node.
		podsToProcess := nodeInfo.PodsWithAffinity
		if hasAffinityConstraints || hasAntiAffinityConstraints {
			// We need to process all the pods.
			podsToProcess = nodeInfo.Pods
		}

		topoScore := make(scoreMap)
		for _, existingPod := range podsToProcess {
			pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
		}
		if len(topoScore) > 0 {
			topoScores[atomic.AddInt32(&index, 1)] = topoScore
		}
	}
	parallelize.Until(context.Background(), len(allNodes), processNode)

	for i := 0; i <= int(index); i++ {
		state.topologyScore.append(topoScores[i])
	}

	cycleState.Write(preScoreStateKey, state)
	return nil
}

func (pl *InterPodAffinity) processExistingPod(
	state *preScoreState,
	existingPod *framework.PodInfo,
	existingPodNodeInfo *framework.NodeInfo,
	incomingPod *v1.Pod,
	topoScore scoreMap,
) {
	existingPodNode := existingPodNodeInfo.Node()

	// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
	// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
	// value as that of <existingPods>`s node by the term`s weight.
	topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, existingPodNode, 1)

	// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
	// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
	// value as that of <existingPod>`s node by the term`s weight.
	topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, existingPodNode, -1)

	// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
	// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
	// value as that of <existingPod>'s node by the constant <args.hardPodAffinityWeight>
	if pl.args.HardPodAffinityWeight > 0 {
		for _, term := range existingPod.RequiredAffinityTerms {
			t := framework.WeightedAffinityTerm{AffinityTerm: term, Weight: pl.args.HardPodAffinityWeight}
			topoScore.processTerm(&t, incomingPod, existingPodNode, 1)
		}
	}

	// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
	// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
	// value as that of <existingPod>'s node by the term's weight.
	topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, existingPodNode, 1)

	// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
	// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
	// value as that of <existingPod>'s node by the term's weight.
	topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, existingPodNode, -1)
}

4 | Score

这部分就是,将节点的得分进行累计计算,返回此符合条件的节点的得分数

  • 注意,所有符合条件都会调用此函数,得到自己对应的分数
// pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
// Score invoked at the Score extension point.
// The "score" returned in this function is the sum of weights got from cycleState which have its topologyKey matching with the node's labels.
// it is normalized later.
// Note: the returned "score" is positive for pod-affinity, and negative for pod-antiaffinity.
func (pl *InterPodAffinity) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
	if err != nil || nodeInfo.Node() == nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
	}
	node := nodeInfo.Node()

	s, err := getPreScoreState(cycleState)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, err.Error())
	}
	var score int64
	for tpKey, tpValues := range s.topologyScore {
		if v, exist := node.Labels[tpKey]; exist {
			score += tpValues[v]
		}
	}

	return score, nil
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐