k8s controller-manager: DaemonSet controller source code analysis
The analysis is based on the Kubernetes 1.9 release source code.
Source tree layout:
cmd/kube-controller-manager/app/core.go // startup code for the daemonset controller (this file also starts many other controllers)
/pkg/controller/daemon
.
├── BUILD
├── doc.go
├── daemon_controller.go // core logic of the daemonset controller
├── daemon_controller_test.go
├── OWNERS
├── update.go
├── update_test.go
The daemonset controller is initialized as follows:
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
// Initialize the DaemonSetsController struct
dsc := &DaemonSetsController{
kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
},
crControl: controller.RealControllerRevisionControl{
KubeClient: kubeClient,
},
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
suspendedDaemonPods: map[string]sets.String{},
}
// For daemonset add/update/delete events, derive the object's key (in the form namespace/name) and add it to the work queue (a minimal sketch of this enqueue/worker pattern follows after this function)
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ds := obj.(*extensions.DaemonSet)
glog.V(4).Infof("Adding daemon set %s", ds.Name)
dsc.enqueueDaemonSet(ds)
},
UpdateFunc: func(old, cur interface{}) {
oldDS := old.(*extensions.DaemonSet)
curDS := cur.(*extensions.DaemonSet)
glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
dsc.enqueueDaemonSet(curDS)
},
DeleteFunc: dsc.deleteDaemonset,
})
dsc.dsLister = daemonSetInformer.Lister()
// Used to check whether the daemonset informer cache has synced
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
// ControllerRevision (history) events are handled by addHistory/updateHistory/deleteHistory
historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addHistory,
UpdateFunc: dsc.updateHistory,
DeleteFunc: dsc.deleteHistory,
})
dsc.historyLister = historyInformer.Lister()
dsc.historyStoreSynced = historyInformer.Informer().HasSynced
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
// addPod: 1. When a pod add event is observed, first check whether the pod is being deleted (a non-empty deletionTimestamp means the pod is Terminating);
// 2. Check the pod's OwnerReference. If the pod is not controlled by a daemonset, ignore it; if the daemonset found via the
// OwnerReference has a UID that does not match the one recorded in the OwnerReference, ignore the pod; otherwise enqueue that daemonset.
// updatePod: 1. If the old and new pods have the same resource version, return;
// 2. If the OwnerReferences of the old and new pods differ and the old one is not nil, resolve the old reference as in step 2 of addPod and enqueue the result if it is not nil;
// 3. If the new pod's OwnerReference is not nil, resolve it as in step 2 of addPod and enqueue the result if it is not nil, otherwise return;
// 4. If the old pod was not ready but the new pod is, and the daemonset's minReadySeconds (the minimum time a daemon pod must be up to count as available) is greater than 0, enqueue the daemonset after a delay of minReadySeconds+1 seconds and return;
// 5. Look up the daemonsets matching the new pod's labels; if the labels or OwnerReference changed between the old and new pod, enqueue every daemonset in that list.
// deletePod: 1. Get the pod's controlling OwnerReference. If it is nil but the pod has a node name, call requeueSuspendedDaemonPods, which re-enqueues the daemonsets recorded as suspended on that node (those whose pods want to run there but should not be scheduled);
// 2. If the OwnerReference is not nil, resolve the corresponding daemonset; if none is found, fall back to requeueSuspendedDaemonPods for the pod's node, otherwise enqueue the daemonset.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
})
dsc.podLister = podInformer.Lister()
dsc.podStoreSynced = podInformer.Informer().HasSynced
// addNode: list all daemonsets and run nodeShouldRunDaemonPod for each one against the newly added node; enqueue every daemonset that should run a pod there.
// updateNode: if the node's labels, taints and conditions are unchanged between the old and new object, ignore the event;
// otherwise list all daemonsets and run nodeShouldRunDaemonPod against both the old and the new node for each one; if the shouldSchedule or shouldContinueRunning results differ, enqueue that daemonset.
// nodeShouldRunDaemonPod is covered later in this article.
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,
},
)
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
dsc.nodeLister = nodeInformer.Lister()
// the daemonset sync (business logic) handler
dsc.syncHandler = dsc.syncDaemonSet
// the daemonset enqueue functions
dsc.enqueueDaemonSet = dsc.enqueue
dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
return dsc, nil
}
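The enqueue path above reduces each daemonset to a namespace/name key and hands that key to the rate-limited work queue; a worker later pops the key and calls syncHandler with it. Below is a minimal, illustrative sketch of that pattern, not the actual controller code: the miniController type and its method names are invented, and only cache.DeletionHandlingMetaNamespaceKeyFunc and the workqueue types come from client-go.
package sketch

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// miniController is a stripped-down stand-in for DaemonSetsController.
type miniController struct {
	queue       workqueue.RateLimitingInterface
	syncHandler func(key string) error
}

// enqueue mirrors dsc.enqueue: derive the "namespace/name" key and add it to the queue.
func (c *miniController) enqueue(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		fmt.Printf("couldn't get key for object %+v: %v\n", obj, err)
		return
	}
	c.queue.Add(key)
}

// processNextWorkItem mirrors the worker loop: pop a key, run syncHandler,
// requeue with rate limiting on failure, and reset the limiter on success.
func (c *miniController) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.syncHandler(key.(string)); err != nil {
		c.queue.AddRateLimited(key)
		return true
	}
	c.queue.Forget(key)
	return true
}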
The daemonset's core sync logic lives in syncDaemonSet:
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime))
}()
// Split the key into namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// Look up the ds by namespace and name; if the ds has already been deleted, drop its expectations and return
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.V(3).Infof("daemon set has been deleted %v", key)
dsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
}
everything := metav1.LabelSelector{}
if reflect.DeepEqual(ds.Spec.Selector, &everything) {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
return nil
}
// Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
// then we do not want to call manage on foo until the daemon pods have been created.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}
// Construct histories of the DaemonSet, and get the hash of current history
// A history is one revision of the ds. Each revision is stored in a ControllerRevision resource, which holds the annotations and template of that DaemonSet version; kubectl rollout undo takes a specific ControllerRevision and replaces the DaemonSet's template with the one stored in it,
// which is equivalent to rolling the DaemonSet template back to a previous version with another command such as kubectl edit or kubectl apply. ControllerRevision is only available in Kubernetes 1.7 and later.
// Note that DaemonSet revisions only roll forward: after a rollback completes, the revision number (the .revision field) of the ControllerRevision being rolled back to is bumped. For example, with revisions 1 and 2 in the system, rolling back from 2 to 1
// turns the ControllerRevision with .revision: 1 into .revision: 3.
// constructHistory looks up all histories of the given ds and updates the current revision number, creating one if none exists; it returns the current history and the list of old histories.
cur, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
// Get the hash from the controller-revision-hash label of the current history
hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
// If the ds is being deleted, or SatisfiedExpectations returns false, only update the ds status. SatisfiedExpectations returns true when the pending add and del counts in the ControlleeExpectations are both <= 0, or when the expectations are older than 5 minutes and have therefore expired.
if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) {
// Only update status.
// 1. getNodesToDaemonPods returns a map from node name to the list of pods that belong to the given ds;
// 2. Loop over the nodes and use nodeShouldRunDaemonPod to decide whether the ds should run a pod on each node. If so, increment desiredNumberScheduled; then, using the map from step 1, increment currentNumberScheduled if a pod is already scheduled there.
// Take that node's pods from the map, sort them by creation time and keep the first one (to guard against multiple pods on one node). If that pod is Ready, increment numberReady; if it has also been Ready for at least minReadySeconds, it is available, so increment numberAvailable.
// If the ds's .spec.templateGeneration matches the pod's pod-template-generation label, or the hash matches the pod's controller-revision-hash label, increment updatedNumberScheduled.
// If the ds should not run a pod on the node but one is scheduled there anyway, increment numberMisscheduled.
// 3. Once every node has been processed, write desiredNumberScheduled, currentNumberScheduled, numberReady, numberAvailable and updatedNumberScheduled back into the ds status (a simplified sketch of this bookkeeping follows after syncDaemonSet).
return dsc.updateDaemonSetStatus(ds, hash)
}
// manage reconciles the daemon pods scheduled to and running on the nodes: it works out which nodes should run a daemon pod for this ds but do not yet, and which nodes run one that they should not, then calls syncNodes to create or delete those pods.
// manage is analyzed in detail below.
err = dsc.manage(ds, hash)
if err != nil {
return err
}
// Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(dsKey) {
switch ds.Spec.UpdateStrategy.Type {
case extensions.OnDeleteDaemonSetStrategyType:
case extensions.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds, hash)
}
if err != nil {
return err
}
}
// Clean up old histories
err = dsc.cleanupHistory(ds, old)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
}
return dsc.updateDaemonSetStatus(ds, hash)
}
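The status-only path above (updateDaemonSetStatus) boils down to per-node bookkeeping. The following is a simplified, illustrative sketch of how the counters described in the comments are accumulated for a single node; countForNode, statusCounts and podReadySince are invented names, and updatedNumberScheduled is omitted for brevity.
package sketch

import (
	"sort"
	"time"

	"k8s.io/api/core/v1"
)

// statusCounts gathers the DaemonSet status fields as they are accumulated node by node.
type statusCounts struct {
	desiredNumberScheduled int
	currentNumberScheduled int
	numberMisscheduled     int
	numberReady            int
	numberAvailable        int
}

// countForNode applies the per-node rules described above. shouldRun is the result of
// nodeShouldRunDaemonPod for this node; pods are this DaemonSet's pods on the node.
func countForNode(c *statusCounts, shouldRun bool, pods []*v1.Pod, minReadySeconds int32, now time.Time) {
	scheduled := len(pods) > 0
	switch {
	case shouldRun:
		c.desiredNumberScheduled++
		if !scheduled {
			return
		}
		c.currentNumberScheduled++
		// If several daemon pods ended up on the node, judge readiness by the oldest one.
		sort.Slice(pods, func(i, j int) bool {
			return pods[i].CreationTimestamp.Time.Before(pods[j].CreationTimestamp.Time)
		})
		pod := pods[0]
		if readySince, ready := podReadySince(pod); ready {
			c.numberReady++
			if minReadySeconds <= 0 || readySince.Add(time.Duration(minReadySeconds)*time.Second).Before(now) {
				c.numberAvailable++
			}
		}
	case scheduled:
		// Running on a node where it should not be.
		c.numberMisscheduled++
	}
}

// podReadySince returns when the pod became Ready and whether it is Ready now.
func podReadySince(pod *v1.Pod) (time.Time, bool) {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.PodReady {
			return cond.LastTransitionTime.Time, cond.Status == v1.ConditionTrue
		}
	}
	return time.Time{}, false
}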
The manage function reconciles the daemon pods on each node: based on the ds it works out which nodes should, or should not, be running a daemon pod, and finally calls syncNodes to apply the changes:
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, hash string) error {
// getNodesToDaemonPods returns a map from node name to the list of pods that belong to the given ds
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
}
var nodesNeedingDaemonPods, podsToDelete []string
var failedPodsObserved int
// Loop over the nodes: kill daemon pods running on a node where they are not wanted, and create daemon pods on nodes that should run one but do not yet
for _, node := range nodeList {
// nodeShouldRunDaemonPod checks the node against the ds with a set of preconditions (much like the scheduler's predicates) and returns three booleans:
// wantToRun is true when the ds wants to run a pod on this node, ignoring conditions such as DiskPressure or insufficient resources;
// shouldSchedule is true when the daemon pod should be scheduled onto this node but is not running there yet;
// shouldContinueRunning is true when a daemon pod already running on this node should keep running there.
// nodeShouldRunDaemonPod is covered in detail later.
wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
continue
}
// Pods of this ds already on the node
daemonPods, exists := nodeToDaemonPods[node.Name]
dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
// First remove this ds from the node's suspended list; it is re-added below when it wants to run but should not be scheduled
dsc.removeSuspendedDaemonPods(node.Name, dsKey)
switch {
case wantToRun && !shouldSchedule:
// The daemon pod wants to run on this node but should not be scheduled there: record it in the suspended list
dsc.addSuspendedDaemonPods(node.Name, dsKey)
case shouldSchedule && !exists:
// The daemon pod should run on this node but does not exist there yet: add the node name to nodesNeedingDaemonPods so a pod gets created
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
case shouldContinueRunning:
// If a daemon pod on the node is in the Failed phase, delete it;
// if no daemon pod is left running on the node, it will be recreated in the next sync
var daemonPodsRunning []*v1.Pod
for _, pod := range daemonPods {
// Skip pods that are being deleted
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodFailed {
msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
glog.V(2).Infof(msg)
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
failedPodsObserved++
} else {
// Daemon pods currently running on the node
daemonPodsRunning = append(daemonPodsRunning, pod)
}
}
// If more than one daemon pod is running on the node, keep the oldest one and delete the rest
if len(daemonPodsRunning) > 1 {
sort.Sort(podByCreationTimestamp(daemonPodsRunning))
for i := 1; i < len(daemonPodsRunning); i++ {
// Record the names of the pods to delete in podsToDelete
podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
}
}
case !shouldContinueRunning && exists:
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
// The daemon pod should not run on this node but is already there: delete all of this ds's daemon pods on the node (this happens, for example, when a node's labels are changed after the pods were placed)
for _, pod := range daemonPods {
podsToDelete = append(podsToDelete, pod.Name)
}
}
}
// Label new pods using the hash label value of the current history when creating them
// syncNodes deletes the given pods and creates new daemon pods on the nodes listed in nodesNeedingDaemonPods.
// Creation is batched: the batch size starts at 1 and doubles after each fully successful batch until every pod is created, and each pod in a batch is created in its own goroutine (a sketch of this slow-start batching follows after manage).
// Each pod in podsToDelete is likewise deleted in its own goroutine.
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err
}
// Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop
if failedPodsObserved > 0 {
return fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
}
return nil
}
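The slow-start batching that syncNodes uses for pod creation, batch size 1, doubling after each fully successful batch, one goroutine per pod, can be sketched roughly as below; slowStartCreate and createOnNode are illustrative names under those assumptions, not the actual implementation.
package sketch

import "sync"

// slowStartCreate creates one daemon pod per node name in nodes via createOnNode.
// It starts with a batch of 1, runs each creation in the batch in its own goroutine,
// and doubles the batch size after every batch that finished without errors.
func slowStartCreate(nodes []string, createOnNode func(node string) error) (created int) {
	remaining := nodes
	for batchSize := minInt(1, len(remaining)); batchSize > 0; batchSize = minInt(2*batchSize, len(remaining)) {
		batch := remaining[:batchSize]
		remaining = remaining[batchSize:]

		errCh := make(chan error, len(batch))
		var wg sync.WaitGroup
		for _, node := range batch {
			wg.Add(1)
			go func(node string) {
				defer wg.Done()
				if err := createOnNode(node); err != nil {
					errCh <- err
				}
			}(node)
		}
		wg.Wait()

		created += len(batch) - len(errCh)
		if len(errCh) > 0 {
			// Stop growing the batches on the first failure; the remaining
			// nodes are handled by a later sync.
			break
		}
	}
	return created
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}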
nodeShouldRunDaemonPod runs a series of checks against the node and the daemon pod to decide whether the pod should be scheduled onto that node, based mainly on the pod's tolerations and the node's resource usage, and returns wantToRun, shouldSchedule and shouldContinueRunning:
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *extensions.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
newPod := NewPod(ds, node.Name)
// Because these bools require an && of all their required conditions, we start
// with all bools set to true and set a bool to false if a condition is not met.
// A bool should probably not be set to true after this line.
wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
// If the daemon set specifies a node name, check that it matches with node.Name.
// If the ds pins a node name and it does not match this node's name, wantToRun, shouldSchedule and shouldContinueRunning are all false
if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
return false, false, false, nil
}
// simulate first adds tolerations to the pod so that it is not evicted by the node controller when the node has these problems:
// node.kubernetes.io/not-ready: NoExecute, node.kubernetes.io/unreachable: NoExecute,
// node.kubernetes.io/disk-pressure: NoSchedule, node.kubernetes.io/memory-pressure: NoSchedule.
// If the pod is a critical pod and ExperimentalCriticalPodAnnotation is enabled, node.kubernetes.io/out-of-disk: NoSchedule is added as well (a sketch of these tolerations follows after this function).
// It then collects all pods running on the node (excluding this ds's pods) and builds the nodeInfo: the requested ports, affinity and resources such as CPU, memory, GPU and storage.
// Finally simulate runs the scheduler predicates (GeneralPredicates and PodToleratesNodeTaints) to decide whether the daemon pod can be scheduled onto the node; critical pods get slightly different predicate treatment.
reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
if err != nil {
glog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
return false, false, false, err
}
var insufficientResourceErr error
for _, r := range reasons {
glog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
switch reason := r.(type) {
case *predicates.InsufficientResourceError:
insufficientResourceErr = reason
case *predicates.PredicateFailureError:
var emitEvent bool
// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
switch reason {
// intentional
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrNodeLabelPresenceViolated,
// this one is probably intentional since it's a workaround for not having
// pod hard anti affinity.
predicates.ErrPodNotFitsHostPorts:
return false, false, false, nil
case predicates.ErrTaintsTolerationsNotMatch:
// DaemonSet is expected to respect taints and tolerations
fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
if err != nil {
return false, false, false, err
}
if !fitsNoExecute {
return false, false, false, nil
}
wantToRun, shouldSchedule = false, false
// unintentional
case
predicates.ErrDiskConflict,
predicates.ErrVolumeZoneConflict,
predicates.ErrMaxVolumeCountExceeded,
predicates.ErrNodeUnderMemoryPressure,
predicates.ErrNodeUnderDiskPressure:
// wantToRun and shouldContinueRunning are likely true here. They are
// absolutely true at the time of writing the comment. See first comment
// of this method.
shouldSchedule = false
emitEvent = true
// unexpected
case
predicates.ErrPodAffinityNotMatch,
predicates.ErrServiceAffinityViolated:
glog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
default:
glog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
emitEvent = true
}
if emitEvent {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
}
}
}
// only emit this event if insufficient resource is the only thing
// preventing the daemon pod from scheduling
if shouldSchedule && insufficientResourceErr != nil {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
shouldSchedule = false
}
return
}
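For reference, the tolerations that simulate() is described as adding to the daemon pod before running the predicates can be sketched as follows. daemonPodTolerations is an illustrative helper, not the actual function, and in the real code the critical-pod toleration additionally depends on the ExperimentalCriticalPodAnnotation feature gate being enabled.
package sketch

import "k8s.io/api/core/v1"

// daemonPodTolerations returns the tolerations described above, so node problems such
// as not-ready, unreachable or memory pressure neither evict nor block the daemon pod.
func daemonPodTolerations(criticalPod bool) []v1.Toleration {
	tolerations := []v1.Toleration{
		{Key: "node.kubernetes.io/not-ready", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoExecute},
		{Key: "node.kubernetes.io/unreachable", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoExecute},
		{Key: "node.kubernetes.io/disk-pressure", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule},
		{Key: "node.kubernetes.io/memory-pressure", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule},
	}
	if criticalPod {
		// Only for critical pods (and only when the feature gate mentioned above is enabled).
		tolerations = append(tolerations, v1.Toleration{
			Key: "node.kubernetes.io/out-of-disk", Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule,
		})
	}
	return tolerations
}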