K8S里的几种缓存

  • lruCache

    可以设置一个固定的大小, 每次新增的时候会淘汰最晚未被使用的元素

  • singleCache

  • LRUExpireCache

    底层基于lruCache实现的, 添加元素的时候会计算超时时间, 获取元素的时候判断元素有没有过期, 和guava cache类似

  • client-go cache

    ThreadSafeStore : 内部是一个有读写锁的map, 可以根据对象计算索引key, 然后根据索引key找到缓存的对象.

  • scheduler cache

重点讲一下 Scheduler Cache

缓存的常见就是加快处理速度, 平时我们使用缓存缓存关注的问题一般是:

  • 缓存和数据源的数据一致性

  • 缓存读写的线程安全

  • 缓存读写性能

    带着问题我们看下scheduler cache是怎么设计的

1. 缓存的目的
  1. cache谁?

    kubernetes的信息都存储在etcd中,而访问kubernetes的etcd的唯一方法是通过apiserver,所以准确的说是缓存etcd的信息。

    scheduler cache还有个作用就是bind操作会先写cache, 再异步调用api server接口bind, 这种先写cache再异步写库的方式也是可以借鉴的.

  2. cache哪些信息?

    调度器需要将Pod调度到满足需求的Node上,所以cache至少要缓存Pod和Node信息,这样才能提高kube-scheduler访问apiserver的性能。

  3. 为什么要cache?

    为了加快调度。Cache不仅缓存了Pod和Node信息,关键的bind操作也是依赖缓存实现的.

2. scheduler cache

在这里插入图片描述

type schedulerCache struct {
	stop   <-chan struct{}	// 用来通知schedulerCache停止的chan,schedulerCache有自己的协程
	ttl    time.Duration	// 假定Pod一旦完成绑定,就要在指定的时间内确认,否则就会超时,ttl就是指定的过期时间,默认30秒
	period time.Duration	// 定时清理过期的Pod,定时周期默认是1秒钟

	// This mutex guards all fields within this cache struct.
	mu sync.RWMutex		// cache不是线程安全的, 对cache的读写要先获取锁.
	// a set of assumed pod keys.	// 假定绑定的pod key set 集合
	// The key could further be used to get an entry in podStates.
	assumedPods sets.String
	// a map from pod key to podState.
	podStates map[string]*podState	// 根据pod key找到podState
	nodes     map[string]*nodeInfoListItem
	// headNode points to the most recently updated NodeInfo in "nodes". It is the
	// head of the linked list.
	headNode *nodeInfoListItem	// headNode 指向最近更新的nodeInfo
	nodeTree *nodeTree
	// A map from image name to its imageState.
	imageStates map[string]*imageState
}

schedulerCache = chan + 锁 + map + 双向链表 + 头指针的一个结构体
每次node更新, 都会把更新后的node版本加一, 然后移动到双向链表的第一个.

已经有了nodes(map类型)变量,为什么还要再加一个双向链表的变量 ?

  • nodes是一个map, 用于根据Node的名字快速找到Node.
  • 链表是按照顺序排过序的, 每次node有更新, 都会版本加一, 并且移动到最前面. 首先链表的移动效率比较高, 比数组和切片快. 然后再增量更新的时候, 从第一个开始遍历, 因为版本号是从高到低的, 只要遍历到版本号比快照小的就可以结束了.
  • 我理解的这是一个以空间换时间的数据结构
3. snapshot快照机制

当scheduler获取一个待调度的pod, 则需要从Cache中获取当前集群中的快照数据(当前此时集群中node的统计信息), 用于后续调度流程中使用

快照是对Cache某一时刻的复制,随着时间的推移,Cache的状态在持续更新,kube-scheduler在调度一个Pod的时候需要获取Cache的快照。相比于直接访问Cache,快照可以解决如下几个问题:

  • 快照不会再有任何变化,可以理解为只读,那么访问快照不需要加锁保证保证原子性
  • 无锁化的设计, 快照和Cache让读写分离, 可以避免使用锁造成Cache访问性能下降
  • 每次调度前都会更新快照信息. 如果k8s集群信息变更, 快照的数据可能不是最新鲜的, 但是不影响调度, 同一时间只有一个进程在调度. 并且如果bind失败还会进行重试的. 保证最终一致性.

在这里插入图片描述

4. 增量更新
// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
type Snapshot struct {	// Snapshot是node和nodeTree的快照, 在每次调度开始的时候获取一个快照.
	// nodeInfoMap a map of node name to a snapshot of its NodeInfo.
	nodeInfoMap map[string]*framework.NodeInfo
	// nodeInfoList is the list of nodes as ordered in the cache's nodeTree.
	nodeInfoList []*framework.NodeInfo
	// havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
	havePodsWithAffinityNodeInfoList []*framework.NodeInfo
	// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
	// required anti-affinity terms.
	havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
	generation                                   int64	// 调用Cache.UpdateSnapshot的时候只需要更新快照之后有变化的Node即可
}

快照 = map + nodeList + 版本号组成的一个结构体

5. 过期机制

Scheduler进行完成调度流程的决策之后,为pod选择了一个node节点,此时还未进行后续的Bind操作,但实际上资源已经分配给该pod, 此时会先更新到本地缓存(),然后再等待apiserver进行数据的广播并且最终被kubelet来进行实际的调度

但如果因为某些原因导致pod后续的事件都没有被监听到,则需要将对应的pod资源进行删除,并删除对node资源的占用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ioWAhymB-1665633389704)(/uploads/beaa92984f148333c3f55ababa851669/image.png)]

6. cache的生命周期

在scheduler cache中pod会一个内部的状态机:initial、Assumed、Expired、Added、Delete,实际上所有的操作都是围绕着该状态机在进行,状态如下:

  • Initial: 初始化完成从apiserver监听到(也可能是监听到一个已经完成分配的pod)
  • Assumed: 在scheduler中完成分配最终完成bind操作的pod(未实际分配)
  • Added: 首先监听到事件可能是一个已经完成实际调度的pod(即从initial到Added),其次可能是经过调度决策后,被实际调度(从Assumed到Added),最后则是后续pod的更新(Update), Added语义上其实就是往Cache中添加一个Pod状态
  • Deleted: 某个pod被监听到删除事件,只有被Added过的数据才可以被Deleted
  • Expired: Assumed pod经过一段时间后没有感知到真正的分配事件被删除

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-659hn4Kl-1665633389705)(/uploads/4e65404b4a0511adf17f1d5af2bb59f2/image.png)]

源码注释中关于Pod状态机的注释 :

// State Machine of a pod's events in scheduler's cache:
//
//
//   +-------------------------------------------+  +----+
//   |                            Add            |  |    |
//   |                                           |  |    | Update
//   +      Assume                Add            v  v    |
//Initial +--------> Assumed +------------+---> Added <--+
//   ^                +   +               |       +
//   |                |   |               |       |
//   |                |   |           Add |       | Remove
//   |                |   |               |       |
//   |                |   |               +       |
//   +----------------+   +-----------> Expired   +----> Deleted
//         Forget             Expire
//

上面总结中描述了kube-scheduler大致调度一个Pod的流程

数据结构

1. nodeTree
type NodeTree struct {
	tree      map[string]*nodeArray // 存储zone和zone下面的node信息
	zones     []string              // 存储zones
	zoneIndex int
	numNodes  int
	mu        sync.RWMutex
}
2. nodeArray

nodeArray负责存储一个zone下面的所有node节点,并且通过lastIndex记录当前zone分配的节点索引

type nodeArray struct {
	nodes     []string
	lastIndex int
}
3. Cache接口

interface.go

从接口方法可以看出, Cache主要的方法是围绕pod和node展开的, 而Pod和Node源数据信息存储在etcd中.

type Cache interface {
	// NodeCount returns the number of nodes in the cache.
	// DO NOT use outside of tests.	// 只给测试用的方法, 有点怪.
	NodeCount() int

	// PodCount returns the number of pods in the cache (including those from deleted nodes).
	// DO NOT use outside of tests.
	PodCount() (int, error)

	// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
	// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
	// After expiration, its information would be subtracted.
	AssumePod(pod *v1.Pod) error	// 更新缓存, 假定绑定

	// FinishBinding signals that cache for assumed pod can be expired
	FinishBinding(pod *v1.Pod) error	// Bind是一个异步过程,当Bind完成后需要调用这个接口更新Cache,

	// ForgetPod removes an assumed pod from cache.
	ForgetPod(pod *v1.Pod) error	// 调用AssumePod后如果遇到错误失败了,就需要调用这个接口

	// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
	// If added back, the pod's information would be added again.
	AddPod(pod *v1.Pod) error		// 添加Pod既确认了假定的Pod,也会将假定过期的Pod重新添加回来

	// UpdatePod removes oldPod's information and adds newPod's information.
	UpdatePod(oldPod, newPod *v1.Pod) error		// 更新pod, 内部其实是删除了再添加pod

	// RemovePod removes a pod. The pod's information would be subtracted from assigned node.
	RemovePod(pod *v1.Pod) error	// 删除pod

	// GetPod returns the pod from the cache with the same namespace and the
	// same name of the specified pod.
	GetPod(pod *v1.Pod) (*v1.Pod, error)	// 从缓存中获取pod

	// IsAssumedPod returns true if the pod is assumed and not expired.
	IsAssumedPod(pod *v1.Pod) (bool, error)	// pod是否假定绑定了

	// AddNode adds overall information about node.
	// It returns a clone of added NodeInfo object.
	AddNode(node *v1.Node) *framework.NodeInfo	// 下面的都是node相关的接口, 添加node

	// UpdateNode updates overall information about node.
	// It returns a clone of updated NodeInfo object.
	UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo	// 更新node

	// RemoveNode removes overall information about node.
	RemoveNode(node *v1.Node) error		// 删除node

	// UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache.
	// The node info contains aggregated information of pods scheduled (including assumed to be)
	// on this node.
	// The snapshot only includes Nodes that are not deleted at the time this function is called.
	// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
	UpdateSnapshot(nodeSnapshot *Snapshot) error	// 更新信息到cache里, 这里有个快照的概念.

	// Dump produces a dump of the current cache.
	Dump() *Dump		// 也是用于调试的
}
4. NodeInfo

node的各种信息

// NodeInfo is node level aggregated information.
type NodeInfo struct {
	// Overall node information.
	node *v1.Node					// 通用的node结构体

	// Pods running on the node.	// 当前node上运行中的pod
	Pods []*PodInfo

	// The subset of pods with affinity.
	PodsWithAffinity []*PodInfo		// 亲和性

	// The subset of pods with required anti-affinity.
	PodsWithRequiredAntiAffinity []*PodInfo		// 反亲和性

	// Ports allocated on the node.
	UsedPorts HostPortInfo			// 已分配的端口

	// Total requested resources of all pods on this node. This includes assumed
	// pods, which scheduler has sent for binding, but may not be scheduled yet.
	Requested *Resource				// 已分配的所有资源的总和
	// Total requested resources of all pods on this node with a minimum value
	// applied to each container's CPU and memory requests. This does not reflect
	// the actual resource requests for this node, but is used to avoid scheduling
	// many zero-request pods onto one node.
	NonZeroRequested *Resource
	// We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
	// as int64, to avoid conversions and accessing map.
	Allocatable *Resource

	// ImageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
	// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
	// state information.
	ImageStates map[string]*ImageStateSummary

	// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
	// scheduling cycle.
	// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
	TransientInfo *TransientSchedulerInfo

	// Whenever NodeInfo changes, generation is bumped.
	// This is used to avoid cloning it if the object didn't change.
	Generation int64
}
5. scheduler Cache
type schedulerCache struct {
	stop   <-chan struct{}	// 用来通知schedulerCache停止的chan,schedulerCache有自己的协程
	ttl    time.Duration	// 假定Pod一旦完成绑定,就要在指定的时间内确认,否则就会超时,ttl就是指定的过期时间,默认30秒
	period time.Duration	// 定时清理过期的Pod,定时周期默认是1秒钟

	// This mutex guards all fields within this cache struct.
	mu sync.RWMutex		// cache的线程安全是靠这个读写锁保护的
	// a set of assumed pod keys.	// 假定绑定的pod key set 集合
	// The key could further be used to get an entry in podStates.
	assumedPods sets.String
	// a map from pod key to podState.
	podStates map[string]*podState	// 根据pod key找到podState
	nodes     map[string]*nodeInfoListItem
	// headNode points to the most recently updated NodeInfo in "nodes". It is the
	// head of the linked list.
	headNode *nodeInfoListItem	// headNode 指向最近更新的nodeInfo
	nodeTree *nodeTree
	// A map from image name to its imageState.
	imageStates map[string]*imageState
}
6. AssumePod

当kube-scheduler找到最优的Node调度Pod的时候会调用AssumePod假定Pod调度,在通过另一个协程异步Bind。假定其实就是预先占住资源,kube-scheduler调度下一个Pod的时候不会把这部分资源抢走,直到收到确认消息AddPod确认调度成功,或者是Bind失败ForgetPod取消.

// 同步更新缓存, 异步bind 就叫assume
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()
	if _, ok := cache.podStates[key]; ok {	// 已经在缓存里, 报错, 正常应该不会进来, 防御型的报错.
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}

	cache.addPod(pod)
	ps := &podState{
		pod: pod,
	}
	cache.podStates[key] = ps	// 更新pod state
	cache.assumedPods.Insert(key)	// assumedPods保存所有绑定的pod
	return nil
}
7. ForgetPod

假定Pod预先占用了一些资源,如果之后的操作(比如Bind)有什么错误,就需要取消假定调度,释放出资源

// 假定Pod预先占用了一些资源,如果之后的操作有什么错误,就需要取消假定bind,释放出资源
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
		return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
	}

	switch {
	// Only assumed pod can be forgotten.
	case ok && cache.assumedPods.Has(key):
		err := cache.removePod(pod)
		if err != nil {
			return err
		}
		delete(cache.assumedPods, key)
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
	}
	return nil
}
8. FinishBinding

当假定Pod绑定完成后,需要调用FinishBinding通知Cache开始计时,直到假定Pod过期如果依然没有收到AddPod的请求,则将过期假定Pod删除

func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
	return cache.finishBinding(pod, time.Now())
}

// finishBinding exists to make tests determinitistic by injecting now as an argument
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
	currState, ok := cache.podStates[key]
	if ok && cache.assumedPods.Has(key) {	// assumedPods 已经保存进去了
		dl := now.Add(cache.ttl)	// 计算cache过期时间
		currState.bindingFinished = true
		currState.deadline = &dl	// 设置过期时间, cache里会有协程定时清理过期的cache.
	}
	return nil
}
9. AddPod

当Pod Bind成功,kube-scheduler会收到消息,然后调用AddPod确认调度结果

// 当Pod Bind成功,会收到informer消息,然后调用AddPod更新调度结果
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	case ok && cache.assumedPods.Has(key):	// pod有assumed记录
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {	// 真实分配的node和assumed的不一致, 什么路径会走到这里.
			// The pod was added to a different node than it was assumed to.
			klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
			// Clean this up.
			if err = cache.removePod(currState.pod); err != nil {
				klog.Errorf("removing pod error: %v", err)
			}
			cache.addPod(pod)
		}
		delete(cache.assumedPods, key)	// 删除assumed记录
		cache.podStates[key].deadline = nil	// 清理cache过期时间
		cache.podStates[key].pod = pod
	case !ok:
		// Pod was expired. We should add it back.	// 走到这里说明cache过期被清理了, 最终还是以informer下发的为准.
		cache.addPod(pod)
		ps := &podState{
			pod: pod,
		}
		cache.podStates[key] = ps
	default:
		return fmt.Errorf("pod %v was already in added state", key)
	}
	return nil
}
10. RemovePod

kube-scheduler收到删除Pod的请求,如果Pod在Cache中,就需要调用RemovePod

// RemoveNode removes a node from the cache's tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, being the node tree
// the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is skipped from snapshots.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		return fmt.Errorf("node %v is not found", node.Name)
	}
	n.info.RemoveNode()
	// We remove NodeInfo for this node only if there aren't any pods on this node.
	// We can't do it unconditionally, because notifications about pods are delivered
	// in a different watch, and thus can potentially be observed later, even though
	// they happened before node removal.
	if len(n.info.Pods) == 0 {
		cache.removeNodeInfoFromList(node.Name)
	} else {
		cache.moveNodeInfoToHead(node.Name)
	}
	if err := cache.nodeTree.removeNode(node); err != nil {
		return err
	}	// nodeTree里删除node
	cache.removeNodeImageStates(node)
	return nil
}
11. AddNode

有新的Node添加到集群,kube-scheduler调用该接口通知Cache。

// 通知cache新增node
func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[node.Name] = n
	} else {
		cache.removeNodeImageStates(n.info.Node())
	}
	cache.moveNodeInfoToHead(node.Name)

	cache.nodeTree.addNode(node)	// 新增node
	cache.addNodeImageStates(node, n.info)
	n.info.SetNode(node)
	return n.info.Clone()
}
12. RemoveNode

Node从集群中删除,kube-scheduler调用该接口通知Cache。

// RemoveNode removes a node from the cache's tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, being the node tree
// the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is skipped from snapshots.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		return fmt.Errorf("node %v is not found", node.Name)
	}
	n.info.RemoveNode()
	// We remove NodeInfo for this node only if there aren't any pods on this node.
	// We can't do it unconditionally, because notifications about pods are delivered
	// in a different watch, and thus can potentially be observed later, even though
	// they happened before node removal.
	if len(n.info.Pods) == 0 {
		cache.removeNodeInfoFromList(node.Name)
	} else {
		cache.moveNodeInfoToHead(node.Name)
	}
	if err := cache.nodeTree.removeNode(node); err != nil {
		return err
	}	// nodeTree里删除node
	cache.removeNodeImageStates(node)
	return nil
}
13. 后期清理协程函数run

Cache有自己的协程,就是用来清理假定到期的Pod

func (cache *schedulerCache) run() {
	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}	// 启动一个协程,每隔一段时间,就去运行函数,直到接收到结束信号就关闭这个协程
// 清理过期的pod
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
	cache.cleanupAssumedPods(time.Now())
}

// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	defer cache.updateMetrics()

	// The size of assumedPods should be small
	for key := range cache.assumedPods {
		ps, ok := cache.podStates[key]
		if !ok {
			klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
		}
		if !ps.bindingFinished {
			klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
				ps.pod.Namespace, ps.pod.Name)
			continue
		}
		if now.After(*ps.deadline) {
			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
			if err := cache.expirePod(key, ps); err != nil {
				klog.Errorf("ExpirePod failed for %s: %v", key, err)
			}
		}
	}
}
// 清理过期的pod
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
	if err := cache.removePod(ps.pod); err != nil {
		return err
	}
	delete(cache.assumedPods, key)
	delete(cache.podStates, key)
	return nil
}
14. UpdateSnapshot

增量更新快照

Cache存在的核心目的就是给kube-scheduler提供Node镜像,让kube-scheduler根据Node的状态调度新的Pod。

// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at
// beginning of every scheduling cycle.	// 每次调度前都会调用这个方法更新快照
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	cache.mu.Lock()		// 获取锁更新快照 (其实调度只有一个线程在执行)
	defer cache.mu.Unlock()
	balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

	// Get the last generation of the snapshot.
	snapshotGeneration := nodeSnapshot.generation

	// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
	// or removed from the cache.
	updateAllLists := false		// 是否需要更新snapshot快照里所有的node
	// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with affinity to NOT having pods with affinity or the other
	// way around.
	updateNodesHavePodsWithAffinity := false	// 更新有亲和性的pod
	// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with required anti-affinity to NOT having pods with required
	// anti-affinity or the other way around.
	updateNodesHavePodsWithRequiredAntiAffinity := false	// 更新反亲和性的pod

	// Start from the head of the NodeInfo doubly linked list and update snapshot
	// of NodeInfos updated after the last snapshot.
	for node := cache.headNode; node != nil; node = node.next {
		if node.info.Generation <= snapshotGeneration {	// 小于这个版本, 代表node信息没有过期, 不需要更新.
			// all the nodes are updated before the existing snapshot. We are done.
			break
		}
		if balancedVolumesEnabled && node.info.TransientInfo != nil {	// 好像和存储Volumes相关
			// Transient scheduler info is reset here.
			node.info.TransientInfo.ResetTransientSchedulerInfo()
		}
		if np := node.info.Node(); np != nil {
			existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
			if !ok {
				updateAllLists = true
				existing = &framework.NodeInfo{}
				nodeSnapshot.nodeInfoMap[np.Name] = existing
			}
			clone := node.info.Clone()	// 把cache里的node信息拷贝一份放到clone里
			// We track nodes that have pods with affinity, here we check if this node changed its
			// status from having pods with affinity to NOT having pods with affinity or the other
			// way around.	// 判断快照里的亲和性有没有变化
			if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
				updateNodesHavePodsWithAffinity = true
			}
			if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
				updateNodesHavePodsWithRequiredAntiAffinity = true
			}
			// We need to preserve the original pointer of the NodeInfo struct since it
			// is used in the NodeInfoList, which we may not update.
			*existing = *clone	// 将拷贝的更新到快照中
		}
	}
	// Update the snapshot generation with the latest NodeInfo generation.
	if cache.headNode != nil {
		nodeSnapshot.generation = cache.headNode.info.Generation	//快照的版本修改为head的版本, head的是最大的版本
	}
	// 如果nodeSnapshot中node的数量大于nodeTree中的数量,说明有node被删除
	// Comparing to pods in nodeTree.
	// Deleted nodes get removed from the tree, but they might remain in the nodes map
	// if they still have non-deleted Pods.
	if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
		cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
		updateAllLists = true
	}
	// 如果需要更新Node的全量或者亲和性或者反亲和性列表,则更新nodeSnapshot中的Node列表
	if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
	}

	if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
			", length of NodeInfoMap=%v, length of nodes in cache=%v"+
			", trying to recover",
			len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
			len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
		klog.Error(errMsg)
		// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
		// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
		return fmt.Errorf(errMsg)
	}

	return nil
}
// 更新快照
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
	snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	if updateAll {
		// Take a snapshot of the nodes order in the tree
		snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
		nodesList, err := cache.nodeTree.list()		// node tree的list视图
		if err != nil {
			klog.Error(err)
		}
		for _, nodeName := range nodesList {	// 遍历所有的node
			if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
				snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
				if len(nodeInfo.PodsWithAffinity) > 0 {
					snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
				}
				if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
					snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
				}
			} else {
				klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
			}
		}
	} else {
		for _, nodeInfo := range snapshot.nodeInfoList {
			if len(nodeInfo.PodsWithAffinity) > 0 {
				snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
			}
			if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
				snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
			}
		}
	}
}

版本号

// nextGeneration: Let's make sure history never forgets the name...
// Increments the generation number monotonically ensuring that generation numbers never collide.
// Collision of the generation numbers would be particularly problematic if a node was deleted and
// added back with the same name. See issue#63262.
func nextGeneration() int64 {
   return atomic.AddInt64(&generation, 1)
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐