k8s 之 cache缓存机制
Scheduler完成调度流程的决策、为pod选择了一个node节点之后, 此时还未进行后续的Bind操作, 但实际上资源已经分配给该pod, 这时会先更新到本地缓存(即假定绑定, assume), 然后再等待apiserver进行数据的广播, 最终由kubelet执行实际的创建。快照是对Cache某一时刻的复制, 随着时间的推移, Cache的状态在持续更新, kube-scheduler在调度一个Pod的时候需要获取Cache的快照。
K8S里的几种缓存
- lruCache: 可以设置一个固定的大小, 每次新增元素时淘汰最久未被使用的元素
- singleCache
- LRUExpireCache: 底层基于lruCache实现, 添加元素的时候会计算超时时间, 获取元素的时候判断有没有过期, 和guava cache类似, 用法见下面的小例子
- client-go cache (ThreadSafeStore): 内部是一个带读写锁的map, 可以根据对象计算索引key, 然后根据索引key找到缓存的对象
- scheduler cache
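其中LRUExpireCache来自k8s.io/apimachinery/pkg/util/cache包, 用法大致如下。这是一个示意小例子, 具体以该包实际API为准:
package main

import (
	"fmt"
	"time"

	utilcache "k8s.io/apimachinery/pkg/util/cache"
)

func main() {
	// 最多缓存128个元素, 超出后按LRU淘汰最久未被使用的元素
	c := utilcache.NewLRUExpireCache(128)

	// 添加元素时带上TTL, 获取元素时会判断是否已过期
	c.Add("pod-key", "node-1", 10*time.Millisecond)

	if v, ok := c.Get("pod-key"); ok {
		fmt.Println("cached:", v)
	}

	// 过期或者被LRU淘汰后, Get返回ok=false
	time.Sleep(20 * time.Millisecond)
	if _, ok := c.Get("pod-key"); !ok {
		fmt.Println("expired")
	}
}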
重点讲一下 Scheduler Cache
缓存常见的作用就是加快处理速度, 平时我们使用缓存时关注的问题一般是:
- 缓存和数据源的数据一致性
- 缓存读写的线程安全
- 缓存读写性能
带着问题我们看下scheduler cache是怎么设计的
1. 缓存的目的
- cache谁? kubernetes的信息都存储在etcd中, 而访问etcd的唯一途径是apiserver, 所以准确地说是缓存etcd中的信息。scheduler cache还有一个作用: bind操作会先写cache, 再异步调用apiserver接口完成bind, 这种先写缓存再异步落库的方式也值得借鉴。
- cache哪些信息? 调度器需要将Pod调度到满足需求的Node上, 所以cache至少要缓存Pod和Node信息, 这样才能减少kube-scheduler对apiserver的访问, 提高调度性能。
- 为什么要cache? 为了加快调度。Cache不仅缓存了Pod和Node信息, 关键的假定绑定(assume/bind)流程也是依赖缓存实现的。
2. scheduler cache
type schedulerCache struct {
stop <-chan struct{} // 用来通知schedulerCache停止的chan,schedulerCache有自己的协程
ttl time.Duration // 假定Pod一旦完成绑定,就要在指定的时间内确认,否则就会超时,ttl就是指定的过期时间,默认30秒
period time.Duration // 定时清理过期的Pod,定时周期默认是1秒钟
// This mutex guards all fields within this cache struct.
mu sync.RWMutex // cache内部所有字段的读写都由这把读写锁保护, 以此保证线程安全
// a set of assumed pod keys. // 假定绑定的pod key set 集合
// The key could further be used to get an entry in podStates.
assumedPods sets.String
// a map from pod key to podState.
podStates map[string]*podState // 根据pod key找到podState
nodes map[string]*nodeInfoListItem
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem // headNode 指向最近更新的nodeInfo
nodeTree *nodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
schedulerCache = chan + 锁 + map + 双向链表 + 头指针的一个结构体
每次node更新, 都会把更新后的node版本号(Generation)加一, 然后把它移动到双向链表的头部。
已经有了nodes(map类型)变量, 为什么还要再加一个双向链表的变量?
- nodes是一个map, 用于根据Node的名字快速找到Node。
- 链表是按更新先后排序的: 每次node有更新, 版本号加一, 并移动到链表头部。一方面链表节点的移动效率高, 比数组和切片快; 另一方面增量更新快照时从链表头开始遍历, 因为版本号从高到低排列, 只要遍历到版本号不大于快照版本的节点就可以提前结束。
- 我理解这是一个以空间换时间的数据结构, 示意代码见下面。
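下面是map+双向链表这部分的示意代码(简化版, 基于上文schedulerCache的字段, 细节以kube-scheduler源码为准):
// nodeInfoListItem: 双向链表节点, 包着一个NodeInfo
type nodeInfoListItem struct {
	info *framework.NodeInfo
	next *nodeInfoListItem
	prev *nodeInfoListItem
}

// moveNodeInfoToHead: node有任何更新时, 把对应链表节点摘下来挂到表头。
// 这样链表天然按Generation从大到小有序, 增量更新快照时从头遍历即可
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		return
	}
	if ni == cache.headNode { // 已经在表头, 不用动
		return
	}
	// 先从原位置摘除
	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	// 再挂到表头
	if cache.headNode != nil {
		cache.headNode.prev = ni
	}
	ni.next = cache.headNode
	ni.prev = nil
	cache.headNode = ni
}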
3. snapshot快照机制
当scheduler取到一个待调度的pod时, 需要从Cache中获取当前集群的快照数据(此时集群中node的统计信息), 供后续调度流程使用。
快照是对Cache某一时刻的复制,随着时间的推移,Cache的状态在持续更新,kube-scheduler在调度一个Pod的时候需要获取Cache的快照。相比于直接访问Cache,快照可以解决如下几个问题:
- 快照生成之后不会再有任何变化, 可以理解为只读, 访问快照不需要加锁来保证原子性
- 快照和Cache实现了读写分离, 这种无锁化的设计可以避免大量加锁造成Cache访问性能下降
- 每次调度前都会更新快照信息。如果k8s集群信息变更, 快照的数据可能不是最新的, 但这并不影响调度: 同一时间只有一个调度流程在执行, 而且如果bind失败, pod还会重新入队调度, 最终保证一致性。
4. 增量更新
// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
type Snapshot struct { // Snapshot是node和nodeTree的快照, 在每次调度开始的时候获取一个快照.
// nodeInfoMap a map of node name to a snapshot of its NodeInfo.
nodeInfoMap map[string]*framework.NodeInfo
// nodeInfoList is the list of nodes as ordered in the cache's nodeTree.
nodeInfoList []*framework.NodeInfo
// havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
// required anti-affinity terms.
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
generation int64 // 调用Cache.UpdateSnapshot的时候只需要更新快照之后有变化的Node即可
}
快照 = map + nodeList + 版本号组成的一个结构体
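结合这个结构, 每个调度周期开始时对快照的典型用法大致如下(示意代码: internalcache假设指scheduler内部的cache包, schedCache是假设已构造好的Cache实例, 错误处理从简):
// 快照对象只创建一次, 之后每调度一个Pod之前增量刷新一次
snapshot := internalcache.NewEmptySnapshot()

// 每次调度前: 把Cache中Generation大于快照generation的node同步过来
if err := schedCache.UpdateSnapshot(snapshot); err != nil {
	klog.Error(err)
}

// 之后的Filter/Score只读快照, 不再访问Cache, 也就不需要加锁
nodeInfos, err := snapshot.NodeInfos().List()
if err != nil {
	klog.Error(err)
}
for _, ni := range nodeInfos {
	_ = ni.Node() // 快照保证这里的Node()不为nil
}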
5. 过期机制
Scheduler完成调度流程的决策、为pod选择了一个node节点之后, 此时还未进行后续的Bind操作, 但实际上资源已经分配给该pod, 这时会先更新到本地缓存(即假定绑定, assume), 然后再等待apiserver进行数据的广播, 最终由kubelet执行实际的创建。
但如果因为某些原因, pod后续的确认事件一直没有被监听到, 就需要把对应的pod从缓存中删除, 并释放其对node资源的占用。
6. cache的生命周期
在scheduler cache中pod会有一个内部的状态机: Initial、Assumed、Added、Deleted、Expired, 实际上所有的操作都是围绕着该状态机在进行, 状态含义如下:
- Initial: 初始状态, 刚从apiserver监听到(也可能监听到的是一个已经完成调度分配的pod)
- Assumed: 在scheduler中完成调度决策、发起bind操作的pod(还未确认真正绑定完成)
- Added: 可能是监听到一个已经完成实际调度的pod(从Initial到Added); 也可能是经过调度决策后收到确认事件(从Assumed到Added); 还包括后续pod的更新(Update)。Added语义上就是往Cache中正式添加一个Pod
- Deleted: 某个pod被监听到删除事件, 只有被Added过的pod才可以被Deleted
- Expired: Assumed pod经过一段时间后一直没有收到确认的Add事件, 被过期清理
源码注释中关于Pod状态机的注释 :
// State Machine of a pod's events in scheduler's cache:
//
//
//   +-------------------------------------------+  +----+
//   |                            Add            |  |    |
//   |                                           |  |    | Update
//   +      Assume                        Add    v  v    |
//Initial +--------> Assumed +------------+---> Added <--+
//   ^                +   +               |       +
//   |                |   |               |       |
//   |                |   |           Add |       | Remove
//   |                |   |               |       |
//   |                |   |               +       |
//   +----------------+   +-----------> Expired   +----> Deleted
//         Forget             Expire
//
上面的状态机大致描述了kube-scheduler调度一个Pod时, 该Pod在Cache中的状态流转; cache内部用podState结构记录每个pod的这些信息, 见下面的示意。
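podState大致长这样(示意, 字段以源码为准):
type podState struct {
	pod *v1.Pod
	// 只对assumed pod有意义: FinishBinding之后设置的过期时间点, 超时即被清理(Expired)
	deadline *time.Time
	// bind还没结束之前不允许过期清理, FinishBinding时置为true
	bindingFinished bool
}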
数据结构
1. nodeTree
type NodeTree struct {
tree map[string]*nodeArray // 存储zone到该zone下node列表(nodeArray)的映射
zones []string // 存储所有的zone名字
zoneIndex int // 记录轮询到哪个zone, 用于在zone之间round-robin地取node
numNodes int // node总数
mu sync.RWMutex // 保护以上字段的读写锁
}
2. nodeArray
nodeArray负责存储一个zone下面的所有node节点,并且通过lastIndex记录当前zone分配的节点索引
type nodeArray struct {
nodes []string
lastIndex int
}
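zoneIndex和lastIndex配合, 可以在多个zone之间轮询(round-robin)取node, 让node列表在zone之间打散。简化示意如下(非源码原文, 省略加锁和lastIndex重置等细节):
// next 在各zone之间轮询取下一个node名字(简化版)
func (nt *NodeTree) next() string {
	if len(nt.zones) == 0 {
		return ""
	}
	exhausted := 0
	for exhausted < len(nt.zones) {
		if nt.zoneIndex >= len(nt.zones) {
			nt.zoneIndex = 0
		}
		za := nt.tree[nt.zones[nt.zoneIndex]]
		nt.zoneIndex++
		if za.lastIndex < len(za.nodes) {
			name := za.nodes[za.lastIndex]
			za.lastIndex++
			return name
		}
		exhausted++ // 这个zone的node已经取完
	}
	return "" // 所有zone都取完了
}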
3. Cache接口
interface.go
从接口方法可以看出, Cache主要的方法是围绕pod和node展开的, 而Pod和Node源数据信息存储在etcd中.
type Cache interface {
// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests. // 只给测试用的方法, 有点怪.
NodeCount() int
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
PodCount() (int, error)
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
// After expiration, its information would be subtracted.
AssumePod(pod *v1.Pod) error // 更新缓存, 假定绑定
// FinishBinding signals that cache for assumed pod can be expired
FinishBinding(pod *v1.Pod) error // Bind是一个异步过程, Bind完成后调用这个接口, 通知Cache可以开始对该assumed pod计时, 超时未确认就会过期清理
// ForgetPod removes an assumed pod from cache.
ForgetPod(pod *v1.Pod) error // 调用AssumePod之后如果后续步骤(比如Bind)失败, 就调用这个接口取消假定、释放资源
// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
// If added back, the pod's information would be added again.
AddPod(pod *v1.Pod) error // 添加Pod既确认了假定的Pod,也会将假定过期的Pod重新添加回来
// UpdatePod removes oldPod's information and adds newPod's information.
UpdatePod(oldPod, newPod *v1.Pod) error // 更新pod, 内部其实是删除了再添加pod
// RemovePod removes a pod. The pod's information would be subtracted from assigned node.
RemovePod(pod *v1.Pod) error // 删除pod
// GetPod returns the pod from the cache with the same namespace and the
// same name of the specified pod.
GetPod(pod *v1.Pod) (*v1.Pod, error) // 从缓存中获取pod
// IsAssumedPod returns true if the pod is assumed and not expired.
IsAssumedPod(pod *v1.Pod) (bool, error) // pod是否假定绑定了
// AddNode adds overall information about node.
// It returns a clone of added NodeInfo object.
AddNode(node *v1.Node) *framework.NodeInfo // 下面的都是node相关的接口, 添加node
// UpdateNode updates overall information about node.
// It returns a clone of updated NodeInfo object.
UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo // 更新node
// RemoveNode removes overall information about node.
RemoveNode(node *v1.Node) error // 删除node
// UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
UpdateSnapshot(nodeSnapshot *Snapshot) error // 把Cache的当前内容增量同步到传入的快照里, 每个调度周期开始时调用
// Dump produces a dump of the current cache.
Dump() *Dump // 也是用于调试的
}
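结合这些接口, 一次成功调度中Cache相关的典型调用顺序大致如下(伪代码示意, 其中schedCache、chosenNode、bindToAPIServer均为假设的名字, 错误处理从简):
// 1. 调度决策完成: 先在本地Cache中"假定"绑定, 占住资源
assumedPod := pod.DeepCopy()
assumedPod.Spec.NodeName = chosenNode
if err := schedCache.AssumePod(assumedPod); err != nil {
	return err
}

// 2. 另起协程异步执行真正的Bind(调用apiserver)
go func() {
	if err := bindToAPIServer(assumedPod); err != nil {
		// Bind失败: 取消假定, 释放Cache中占用的资源
		_ = schedCache.ForgetPod(assumedPod)
		return
	}
	// Bind成功: 通知Cache开始ttl计时, 等待informer的Add事件触发AddPod来最终确认
	_ = schedCache.FinishBinding(assumedPod)
}()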
4. NodeInfo
node的各种信息
// NodeInfo is node level aggregated information.
type NodeInfo struct {
// Overall node information.
node *v1.Node // 通用的node结构体
// Pods running on the node. // 当前node上运行中的pod
Pods []*PodInfo
// The subset of pods with affinity.
PodsWithAffinity []*PodInfo // 亲和性
// The subset of pods with required anti-affinity.
PodsWithRequiredAntiAffinity []*PodInfo // 反亲和性
// Ports allocated on the node.
UsedPorts HostPortInfo // 已分配的端口
// Total requested resources of all pods on this node. This includes assumed
// pods, which scheduler has sent for binding, but may not be scheduled yet.
Requested *Resource // 该节点上所有Pod(含assumed)请求资源的总和
// Total requested resources of all pods on this node with a minimum value
// applied to each container's CPU and memory requests. This does not reflect
// the actual resource requests for this node, but is used to avoid scheduling
// many zero-request pods onto one node.
NonZeroRequested *Resource // 与Requested类似, 但对未声明request的容器按默认最小值计, 避免把大量零请求的Pod挤到同一个node
// We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
// as int64, to avoid conversions and accessing map.
Allocatable *Resource // 节点可分配的资源总量(来自Node.Status.Allocatable)
// ImageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
ImageStates map[string]*ImageStateSummary
// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
// scheduling cycle.
// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
TransientInfo *TransientSchedulerInfo
// Whenever NodeInfo changes, generation is bumped.
// This is used to avoid cloning it if the object didn't change.
Generation int64
}
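把一个pod加到NodeInfo上时, 这些字段大致这样聚合(简化示意, 非NodeInfo.AddPod源码原文, calculateResource为假设的辅助函数):
func addPodToNodeInfo(n *NodeInfo, podInfo *PodInfo) {
	n.Pods = append(n.Pods, podInfo)
	// request资源累加到Requested; NonZeroRequested对未声明request的容器按默认最小值计
	res, non0CPU, non0Mem := calculateResource(podInfo.Pod)
	n.Requested.MilliCPU += res.MilliCPU
	n.Requested.Memory += res.Memory
	n.NonZeroRequested.MilliCPU += non0CPU
	n.NonZeroRequested.Memory += non0Mem
	// 另外还要登记端口占用(UsedPorts), 维护PodsWithAffinity/PodsWithRequiredAntiAffinity子集
	// 任何修改都要递增Generation, 供快照增量更新时判断哪些node变过
	n.Generation = nextGeneration()
}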
5. scheduler Cache
type schedulerCache struct {
stop <-chan struct{} // 用来通知schedulerCache停止的chan,schedulerCache有自己的协程
ttl time.Duration // 假定Pod一旦完成绑定,就要在指定的时间内确认,否则就会超时,ttl就是指定的过期时间,默认30秒
period time.Duration // 定时清理过期的Pod,定时周期默认是1秒钟
// This mutex guards all fields within this cache struct.
mu sync.RWMutex // cache的线程安全是靠这个读写锁保护的
// a set of assumed pod keys. // 假定绑定的pod key set 集合
// The key could further be used to get an entry in podStates.
assumedPods sets.String
// a map from pod key to podState.
podStates map[string]*podState // 根据pod key找到podState
nodes map[string]*nodeInfoListItem
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem // headNode 指向最近更新的nodeInfo
nodeTree *nodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
6. AssumePod
当kube-scheduler找到最优的Node调度Pod的时候会调用AssumePod假定Pod调度, 再通过另一个协程异步Bind。假定其实就是预先占住资源, kube-scheduler调度下一个Pod的时候不会把这部分资源抢走, 直到收到AddPod确认调度成功, 或者Bind失败后调用ForgetPod取消假定。
// 同步更新缓存, 异步bind 就叫assume
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok { // 已经在缓存里, 报错, 正常应该不会进来, 防御型的报错.
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps // 更新pod state
cache.assumedPods.Insert(key) // assumedPods保存所有假定绑定的pod的key
return nil
}
7. ForgetPod
假定Pod预先占用了一些资源,如果之后的操作(比如Bind)有什么错误,就需要取消假定调度,释放出资源
// 假定Pod预先占用了一些资源,如果之后的操作有什么错误,就需要取消假定bind,释放出资源
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
}
switch {
// Only assumed pod can be forgotten.
case ok && cache.assumedPods.Has(key):
err := cache.removePod(pod)
if err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
}
return nil
}
8. FinishBinding
当假定Pod绑定完成后,需要调用FinishBinding通知Cache开始计时,直到假定Pod过期如果依然没有收到AddPod的请求,则将过期假定Pod删除
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests deterministic by injecting now as an argument
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
currState, ok := cache.podStates[key]
if ok && cache.assumedPods.Has(key) { // assumedPods 已经保存进去了
dl := now.Add(cache.ttl) // 计算cache过期时间
currState.bindingFinished = true
currState.deadline = &dl // 设置过期时间, cache里会有协程定时清理过期的cache.
}
return nil
}
9. AddPod
当Pod Bind成功,kube-scheduler会收到消息,然后调用AddPod确认调度结果
// 当Pod Bind成功,会收到informer消息,然后调用AddPod更新调度结果
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods.Has(key): // pod有assumed记录
if currState.pod.Spec.NodeName != pod.Spec.NodeName { // 真实分配的node和assumed的不一致, 什么路径会走到这里.
// The pod was added to a different node than it was assumed to.
klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
// Clean this up.
if err = cache.removePod(currState.pod); err != nil {
klog.Errorf("removing pod error: %v", err)
}
cache.addPod(pod)
}
delete(cache.assumedPods, key) // 删除assumed记录
cache.podStates[key].deadline = nil // 清理cache过期时间
cache.podStates[key].pod = pod
case !ok:
// Pod was expired. We should add it back. // 走到这里说明cache过期被清理了, 最终还是以informer下发的为准.
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
default:
return fmt.Errorf("pod %v was already in added state", key)
}
return nil
}
10. RemovePod
kube-scheduler收到删除Pod的请求,如果Pod在Cache中,就需要调用RemovePod
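RemovePod的大致逻辑如下(示意代码, 非源码原文, 以实际版本为准): 只有已经Added且不处于assumed状态的pod才能被移除, 移除时从对应node上扣减资源并删除podState。
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	currState, ok := cache.podStates[key]
	switch {
	// assumed pod不会直接收到Remove事件, 正常路径是先收到Add事件变为Added, 再被Remove
	case ok && !cache.assumedPods.Has(key):
		// 从node上扣减该pod占用的资源, 并把node移到链表头
		if err := cache.removePod(currState.pod); err != nil {
			return err
		}
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
	}
	return nil
}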
11. AddNode
有新的Node添加到集群,kube-scheduler调用该接口通知Cache。
// 通知cache新增node
func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(node.Name)
cache.nodeTree.addNode(node) // 新增node
cache.addNodeImageStates(node, n.info)
n.info.SetNode(node)
return n.info.Clone()
}
12. RemoveNode
Node从集群中删除,kube-scheduler调用该接口通知Cache。
// RemoveNode removes a node from the cache's tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, being the node tree
// the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is skipped from snapshots.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
return fmt.Errorf("node %v is not found", node.Name)
}
n.info.RemoveNode()
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.info.Pods) == 0 {
cache.removeNodeInfoFromList(node.Name)
} else {
cache.moveNodeInfoToHead(node.Name)
}
if err := cache.nodeTree.removeNode(node); err != nil {
return err
} // nodeTree里删除node
cache.removeNodeImageStates(node)
return nil
}
13. 后期清理协程函数run
Cache有自己的协程,就是用来清理假定到期的Pod
func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
} // 启动一个协程, 每隔period(默认1秒)执行一次清理函数, 收到stop信号后退出
// 清理过期的pod
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
cache.cleanupAssumedPods(time.Now())
}
// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
defer cache.updateMetrics()
// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
}
if !ps.bindingFinished {
klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
if now.After(*ps.deadline) {
klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
if err := cache.expirePod(key, ps); err != nil {
klog.Errorf("ExpirePod failed for %s: %v", key, err)
}
}
}
}
// 清理过期的pod
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
if err := cache.removePod(ps.pod); err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
return nil
}
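这个清理协程在构造Cache时就被拉起来了, 构造函数大致如下(示意, 清理周期默认1秒, ttl由调用方传入、默认30秒):
const cleanAssumedPeriod = 1 * time.Second

// New 返回一个Cache实现, 并启动后台清理协程, 收到stop信号后退出
func New(ttl time.Duration, stop <-chan struct{}) Cache {
	cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
	cache.run()
	return cache
}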
14. UpdateSnapshot
增量更新快照
Cache存在的核心目的就是给kube-scheduler提供Node的视图(快照), 让kube-scheduler根据Node的状态调度新的Pod。
// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at
// beginning of every scheduling cycle. // 每次调度前都会调用这个方法更新快照
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
cache.mu.Lock() // 获取锁更新快照 (其实调度只有一个线程在执行)
defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
// Get the last generation of the snapshot.
snapshotGeneration := nodeSnapshot.generation
// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
// or removed from the cache.
updateAllLists := false // 是否需要更新snapshot快照里所有的node
// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
updateNodesHavePodsWithAffinity := false // 更新有亲和性的pod
// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with required anti-affinity to NOT having pods with required
// anti-affinity or the other way around.
updateNodesHavePodsWithRequiredAntiAffinity := false // 更新反亲和性的pod
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
for node := cache.headNode; node != nil; node = node.next {
if node.info.Generation <= snapshotGeneration { // node的版本不大于快照版本, 说明它在上次快照之后没有更新过; 链表按版本从高到低排序, 后面的也不用再看, 直接结束
// all the nodes are updated before the existing snapshot. We are done.
break
}
if balancedVolumesEnabled && node.info.TransientInfo != nil { // 好像和存储Volumes相关
// Transient scheduler info is reset here.
node.info.TransientInfo.ResetTransientSchedulerInfo()
}
if np := node.info.Node(); np != nil {
existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
if !ok {
updateAllLists = true
existing = &framework.NodeInfo{}
nodeSnapshot.nodeInfoMap[np.Name] = existing
}
clone := node.info.Clone() // 把cache里的node信息拷贝一份放到clone里
// We track nodes that have pods with affinity, here we check if this node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around. // 判断快照里的亲和性有没有变化
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
updateNodesHavePodsWithAffinity = true
}
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
// We need to preserve the original pointer of the NodeInfo struct since it
// is used in the NodeInfoList, which we may not update.
*existing = *clone // 将拷贝的更新到快照中
}
}
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.generation = cache.headNode.info.Generation //快照的版本修改为head的版本, head的是最大的版本
}
// 如果nodeSnapshot中node的数量大于nodeTree中的数量,说明有node被删除
// Comparing to pods in nodeTree.
// Deleted nodes get removed from the tree, but they might remain in the nodes map
// if they still have non-deleted Pods.
if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
updateAllLists = true
}
// 如果需要更新Node的全量或者亲和性或者反亲和性列表,则更新nodeSnapshot中的Node列表
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
}
if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
", length of NodeInfoMap=%v, length of nodes in cache=%v"+
", trying to recover",
len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
klog.Error(errMsg)
// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
return fmt.Errorf(errMsg)
}
return nil
}
// 更新快照
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
nodesList, err := cache.nodeTree.list() // node tree的list视图
if err != nil {
klog.Error(err)
}
for _, nodeName := range nodesList { // 遍历所有的node
if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
}
} else {
for _, nodeInfo := range snapshot.nodeInfoList {
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
}
}
}
版本号
// nextGeneration: Let's make sure history never forgets the name...
// Increments the generation number monotonically ensuring that generation numbers never collide.
// Collision of the generation numbers would be particularly problematic if a node was deleted and
// added back with the same name. See issue#63262.
func nextGeneration() int64 {
return atomic.AddInt64(&generation, 1)
}