第十四课 k8s源码学习和二次开发原理篇-调度器原理

tags:

  • k8s
  • 源码学习

categories:

  • 源码学习
  • 二次开发

第一节 调度器工作原理

1.1 调度器调度流程

  1. kube-scheduler 是 kubernetes 的核心组件之一,主要负责整个集群资源的调度功能,根据特定的调度算法和策略,将 Pod 调度到最优的工作节点上面去,从而更加合理、更加充分的利用集群的资源,这也是我们选择使用 kubernetes 一个非常重要的理由。(这里是1.19的源码)
  2. 默认情况下,kube-scheduler 提供的默认调度器能够满足我们绝大多数的要求,我们前面和大家接触的示例也基本上用的默认的策略,都可以保证我们的 Pod 可以被分配到资源充足的节点上运行。
  3. 但是在实际的线上项目中,可能我们自己会比 kubernetes 更加了解我们自己的应用,比如我们希望一个 Pod 只能运行在特定的几个节点上,或者这几个节点只能用来运行特定类型的应用,这就需要我们的调度器能够可控。
  4. kube-scheduler 的主要作用就是根据特定的调度算法和调度策略将 Pod 调度到合适的 Node 节点上去,是一个独立的二进制程序,启动之后会一直监听 API Server,获取到 PodSpec.NodeName 为空的 Pod,对每个 Pod 都会创建一个 binding。
    在这里插入图片描述
  5. 这个过程在我们看来好像比较简单,但在实际的生产环境中,需要考虑的问题就有很多了:
    • 如何保证全部的节点调度的公平性?要知道并不是所有节点资源配置一定都是一样的
    • 如何保证每个节点都能被分配资源?
    • 集群资源如何能够被高效利用?
    • 集群资源如何才能被最大化使用?
    • 如何保证 Pod 调度的性能和效率?
    • 用户是否可以根据自己的实际需求定制自己的调度策略?
  6. 考虑到实际环境中的各种复杂情况,kubernetes 的调度器采用插件化的形式实现,可以方便用户进行定制或者二次开发,我们可以自定义一个调度器并以插件形式和 kubernetes 进行集成。
  7. kubernetes 调度器的源码位于 kubernetes/pkg/scheduler 中,其中 Scheduler 创建和运行的核心程序,对应的代码在 pkg/scheduler/scheduler.go,如果要查看 kube-scheduler 的入口程序,对应的代码在 cmd/kube-scheduler/scheduler.go
  8. 调度主要分为以下几个部分:
    • 首先是预选过程,过滤掉不满足条件的节点,这个过程称为 Predicates(过滤)
    • 然后是优选过程,对通过的节点按照优先级排序,称之为 Priorities(打分)
    • 最后从中选择优先级最高的节点,如果中间任何一步骤有错误,就直接返回错误
  9. Predicates 阶段首先遍历全部节点,过滤掉不满足条件的节点,属于强制性规则,这一阶段输出的所有满足要求的节点将被记录并作为第二阶段的输入,如果所有的节点都不满足条件,那么 Pod 将会一直处于 Pending 状态,直到有节点满足条件,在这期间调度器会不断的重试。
  10. 在部署应用的时候,如果发现有 Pod 一直处于 Pending 状态,那么就是没有满足调度条件的节点,这个时候可以去检查下节点资源是否可用。
  11. Priorities 阶段即再次对节点进行筛选,如果有多个节点都满足条件的话,那么系统会按照节点的优先级(priorites)大小对节点进行排序,最后选择优先级最高的节点来部署 Pod 应用。
  12. 下面是调度过程的简单示意图:
    在这里插入图片描述
  13. 更详细的流程是这样的:
    • 首先,客户端通过 API Server 的 REST API 或者 kubectl 工具创建 Pod 资源
    • API Server 收到用户请求后,存储相关数据到 etcd 数据库中
    • 调度器监听 API Server 查看到还未被调度(bind)的 Pod 列表,循环遍历地为每个 Pod 尝试分配节点,这个分配过程就是我们上面提到的两个阶段:
      • 预选阶段(Predicates),过滤节点,调度器用一组规则过滤掉不符合要求的 Node 节点,比如 Pod 设置了资源的 request,那么可用资源比 Pod 需要的资源少的主机显然就会被过滤掉
      • 优选阶段(Priorities),为节点的优先级打分,将上一阶段过滤出来的 Node 列表进行打分,调度器会考虑一些整体的优化策略,比如把 Deployment 控制的多个 Pod 副本尽量分布到不同的主机上,使用最低负载的主机等等策略
    • 经过上面的阶段过滤后选择打分最高的 Node 节点和 Pod 进行 binding 操作,然后将结果存储到 etcd 中 最后被选择出来的 Node 节点对应的 kubelet 去执行创建 Pod 的相关操作(当然也是 watch APIServer 发现的)。

1.2 调度器调度框架

  1. 目前调度器已经全部通过插件的方式实现了调度框架,默认开启的调度插件如以下代码所示:
// pkg/scheduler/algorithmprovider/registry.go
func getDefaultConfig() *schedulerapi.Plugins {
    return &schedulerapi.Plugins{
        QueueSort: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: queuesort.Name},
            },
        },
        PreFilter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.FitName},
                {Name: nodeports.Name},
                {Name: podtopologyspread.Name},
                {Name: interpodaffinity.Name},
                {Name: volumebinding.Name},
            },
        },
        Filter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: nodeunschedulable.Name},
                {Name: noderesources.FitName},
                {Name: nodename.Name},
                {Name: nodeports.Name},
                {Name: nodeaffinity.Name},
                {Name: volumerestrictions.Name},
                {Name: tainttoleration.Name},
                {Name: nodevolumelimits.EBSName},
                {Name: nodevolumelimits.GCEPDName},
                {Name: nodevolumelimits.CSIName},
                {Name: nodevolumelimits.AzureDiskName},
                {Name: volumebinding.Name},
                {Name: volumezone.Name},
                {Name: podtopologyspread.Name},
                {Name: interpodaffinity.Name},
            },
        },
        PostFilter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultpreemption.Name},
            },
        },
        PreScore: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: interpodaffinity.Name},
                {Name: podtopologyspread.Name},
                {Name: tainttoleration.Name},
            },
        },
        Score: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.BalancedAllocationName, Weight: 1},
                {Name: imagelocality.Name, Weight: 1},
                {Name: interpodaffinity.Name, Weight: 1},
                {Name: noderesources.LeastAllocatedName, Weight: 1},
                {Name: nodeaffinity.Name, Weight: 1},
                {Name: nodepreferavoidpods.Name, Weight: 10000},
                // Weight is doubled because:// - This is a score coming from user preference.// - It makes its signal comparable to NodeResourcesLeastAllocated.
                {Name: podtopologyspread.Name, Weight: 2},
                {Name: tainttoleration.Name, Weight: 1},
            },
        },
        Reserve: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: volumebinding.Name},
            },
        },
        PreBind: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: volumebinding.Name},
            },
        },
        Bind: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultbinder.Name},
            },
        },
    }
}
  1. 从上面我们可以看出调度器的一系列算法由各种插件在调度的不同阶段来完成,下面我们就先来了解下调度框架。
  2. 调度框架定义了一组扩展点,用户可以实现扩展点定义的接口来定义自己的调度逻辑(我们称之为扩展),并将扩展注册到扩展点上,调度框架在执行调度工作流时,遇到对应的扩展点时,将调用用户注册的扩展。调度框架在预留扩展点时,都是有特定的目的,有些扩展点上的扩展可以改变调度程序的决策方法,有些扩展点上的扩展只是发送一个通知。
  3. 我们知道每当调度一个 Pod 时,都会按照两个过程来执行:调度过程和绑定过程
  4. 调度过程为 Pod 选择一个合适的节点,绑定过程则将调度过程的决策应用到集群中(也就是在被选定的节点上运行 Pod),将调度过程和绑定过程合在一起,称之为调度上下文(scheduling context)。需要注意的是调度过程是同步运行的(同一时间点只为一个 Pod 进行调度),绑定过程可异步运行(同一时间点可并发为多个 Pod 执行绑定)。
  5. 调度过程和绑定过程遇到如下情况时会中途退出:
    • 调度程序认为当前没有该 Pod 的可选节点
    • 内部错误
  6. 这个时候,该 Pod 将被放回到 待调度队列,并等待下次重试。

1.3 调度器的拓展点

  1. 下图展示了调度框架中的调度上下文及其中的扩展点,一个扩展可以注册多个扩展点,以便可以执行更复杂的有状态的任务。
    在这里插入图片描述
  2. Sort 扩展用于对 Pod 的待调度队列进行排序,以决定先调度哪个 Pod,Sort 扩展本质上只需要实现一个方法 Less(Pod1, Pod2) 用于比较两个 Pod 谁更优先获得调度即可,同一时间点只能有一个 Sort 插件生效。
  3. Pre-filter 扩展用于对 Pod 的信息进行预处理,或者检查一些集群或 Pod 必须满足的前提条件,然后将其存入缓存中待 Filter 扩展点执行的时候使用,如果 pre-filter 返回了 error,则调度过程终止。
  4. Filter 扩展用于排除那些不能运行该 Pod 的节点,对于每一个节点,调度器将按顺序执行 filter 扩展;如果任何一个 filter 将节点标记为不可选,则余下的 filter 扩展将不会被执行,调度器可以同时对多个节点执行 filter 扩展。
  5. Post-filter 如果在 Filter 扩展点全部节点都被过滤掉了,没有合适的节点进行调度,才会执行 PostFilter 扩展点,如果启用了 Pod 抢占特性,那么会在这个扩展点进行抢占操作,可以用于 logs/metircs。
  6. PreScore 扩展会对 Score 扩展点的数据做一些预处理操作,然后将其存入缓存中待 Score 扩展点执行的时候使用。
  7. Score 扩展用于为所有可选节点进行打分,调度器将针对每一个节点调用每个 Sore 扩展,评分结果是一个范围内的整数,代表最小和最大分数。在 normalize scoring 阶段,调度器将会把每个 score 扩展对具体某个节点的评分结果和该扩展的权重合并起来,作为最终评分结果。
  8. Normalize score 扩展在调度器对节点进行最终排序之前修改每个节点的评分结果,注册到该扩展点的扩展在被调用时,将获得同一个插件中的 score 扩展的评分结果作为参数,调度框架每执行一次调度,都将调用所有插件中的一个 normalize score 扩展一次。
  9. Reserve 是一个通知性质的扩展点,有状态的插件可以使用该扩展点来获得节点上为 Pod 预留的资源,该事件发生在调度器将 Pod 绑定到节点之前,目的是避免调度器在等待 Pod 与节点绑定的过程中调度新的 Pod 到节点上时,发生实际使用资源超出可用资源的情况(因为绑定 Pod 到节点上是异步发生的)。这是调度过程的最后一个步骤,Pod 进入 reserved 状态以后,要么在绑定失败时触发 Unreserve 扩展,要么在绑定成功时,由 Post-bind 扩展结束绑定过程。
  10. Permit 扩展在每个 Pod 调度周期的最后调用,用于阻止或者延迟 Pod 与节点的绑定。Permit 扩展可以做下面三件事中的一项:
    • approve(批准):当所有的 permit 扩展都 approve 了 Pod 与节点的绑定,调度器将继续执行绑定过程
    • deny(拒绝):如果任何一个 permit 扩展 deny 了 Pod 与节点的绑定,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展
    • wait(等待):如果一个 permit 扩展返回了 wait,则 Pod 将保持在 permit 阶段,直到被其他扩展 approve,如果超时事件发生,wait 状态变成 deny,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展
  11. WaitOnPermit 扩展与 Permit 扩展点配合使用实现延时调度功能(内部默认实现的)。
  12. Pre-bind 扩展用于在 Pod 绑定之前执行某些逻辑。例如,pre-bind 扩展可以将一个基于网络的数据卷挂载到节点上,以便 Pod 可以使用。如果任何一个 pre-bind 扩展返回错误,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展。
  13. Bind 扩展用于将 Pod 绑定到节点上:
    • 只有所有的 pre-bind 扩展都成功执行了,bind 扩展才会执行
    • 调度框架按照 bind 扩展注册的顺序逐个调用 bind 扩展
    • 具体某个 bind 扩展可以选择处理或者不处理该 Pod
    • 如果某个 bind 扩展处理了该 Pod 与节点的绑定,余下的 bind 扩展将被忽略
  14. Post-bind 是一个通知性质的扩展:
    • Post-bind 扩展在 Pod 成功绑定到节点上之后被动调用
    • Post-bind 扩展是绑定过程的最后一个步骤,可以用来执行资源清理的动作
  15. Unreserve 是一个通知性质的扩展(上图没画出来),如果为 Pod 预留了资源,Pod 又在被绑定过程中被拒绝绑定,则 unreserve 扩展将被调用。Unreserve 扩展应该释放已经为 Pod 预留的节点上的计算资源。在一个插件中,reserve 扩展和 unreserve 扩展应该成对出现
  16. 如果我们要实现自己的插件,必须向调度框架注册插件并完成配置,另外还必须实现扩展点接口,对应的扩展点接口我们可以在源码 pkg/scheduler/framework/v1alpha1/interface.go 文件中找到。接下来我们会对整个 Kubernetes Scheduler 的调度流程以及其中的每一个扩展点进行代码层面的分析。最后,会在调度算法详解 中分析常见的调度插件的实现逻辑。

第二节 调度器启动流程分析

2.1 调度器调度参数

  1. kube-scheduler 组件有很多可以配置的启动参数,其核心也是通过 cobra 开发的一个 CLI 工具,所以要掌握 kube-scheduler 的启动配置,需要我们对 cobra 有一个基本的了解,kube-scheduler 主要有两种类型的配置参数:
    • 调度策略相关的参数,例如启用那些调度插件,以及给某些调度插件配置一些参数
    • 通用参数,就是一些普通的参数,比如配置服务端口号等等
  2. 这里我们主要是了解调度器的核心调度框架和算法,所以主要关注第一种参数即可。

2.2 调度器参数配置

  1. kube-scheduler 的启动入口位于 cmd/kube-scheduler/scheduler.go 文件,该文件中就包含一个 main 入口函数:
// cmd/kube-scheduler/scheduler.go
func main() {
	rand.Seed(time.Now().UnixNano())
	
	// 初始化 Cobra.Command 对象
	command := app.NewSchedulerCommand()
	
  // 将命令行参数进行标准化(_替换成-)
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  // 初始化日志
	logs.InitLogs()
	defer logs.FlushLogs()
	
  // 执行命令
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
  1. 其中最核心的就是通过 app.NewSchedulerCommand() 或者一个 Cobra 的 Command 对象,然后最下面调用 command.Execute() 函数执行这个命令,所以核心就是 NewSchedulerCommand 函数的实现:
// cmd/kube-scheduler/app/server.go

// Option 配置一个 framework.Registry
type Option func(runtime.Registry) error

// NewSchedulerCommand 使用默认参数和 registryOptions 创建一个 *cobra.Command 对象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	// 获取默认的配置信息
  opts, err := options.NewOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `......`,
    // 真正执行的函数入口
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, opts, registryOptions...); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
		......
	}
	......
	return cmd
}
  1. 如果我们熟悉 Cobra 的基本用法的话应该知道当我们执行 Cobra 的命令的时候,实际上真正执行的是 Cobra.Command 对象中的 Run 函数,也就是这里的 runCommand 函数:
// cmd/kube-scheduler/app/server.go
if err := runCommand(cmd, opts, registryOptions...); err != nil {
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(1)
}
  1. 其中有两个非常重要的参数 opts 与 registryOptions,opts 是一个 Options 对象,该参数包含所有的运行 Scheduler 需要的参数:
// cmd/kube-scheduler/app/options/options.go

// Options 拥有所有运行 Scheduler 需要的参数
type Options struct {
  // 默认值,如果设置了 ConfigFile 或 InsecureServing 中的值,这些设置将被覆盖
  // KubeSchedulerConfiguration 类似与 Deployment 都是k8s的资源对象,这是这个对象是用于配置调度器使用的
	ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration

	SecureServing           *apiserveroptions.SecureServingOptionsWithLoopback
  // 可为 Healthz 和 metrics 配置两个不安全的标志
  CombinedInsecureServing *CombinedInsecureServingOptions
	Authentication          *apiserveroptions.DelegatingAuthenticationOptions
	Authorization           *apiserveroptions.DelegatingAuthorizationOptions
	Metrics                 *metrics.Options
	Logs                    *logs.Options
	Deprecated              *DeprecatedOptions

	// ConfigFile 指定是调度程序服务的配置文件的位置
	ConfigFile string

  // WriteConfigTo 将默认配置写入的文件路径
	WriteConfigTo string

	Master string
}
  1. 其中第一个参数 ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration 是我们需要重点关注的用于配置调度策略相关参数的地方,通过 NewOptions() 来获取默认配置参数:
// cmd/kube-scheduler/app/options/options.go

// NewOptions 返回一个默认的调度器应用 options 参数。
func NewOptions() (*Options, error) {
	cfg, err := newDefaultComponentConfig()
	if err != nil {
		return nil, err
	}
	......
	o := &Options{
		ComponentConfig: *cfg,
		......
	}
	......
	return o, nil
}
  1. 上面是初始化 Options 的函数,这里我们只关心核心的 ComponentConfig 参数,该参数是通过函数 newDefaultComponentConfig() 来生成默认的配置:
// cmd/kube-scheduler/app/options/options.go

func newDefaultComponentConfig() (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
	versionedCfg := kubeschedulerconfigv1beta1.KubeSchedulerConfiguration{}
	// 可用于配置是否开启 Debug 相关特性,比如 profiling
  versionedCfg.DebuggingConfiguration = *configv1alpha1.NewRecommendedDebuggingConfiguration()

	kubeschedulerscheme.Scheme.Default(&versionedCfg)
	cfg := kubeschedulerconfig.KubeSchedulerConfiguration{}
	if err := kubeschedulerscheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
		return nil, err
	}
	return &cfg, nil
}
  1. 上面的函数会创建一个默认的 KubeSchedulerConfiguration 对象,用于配置调度器,默认配置参数通过 Options 构造完成后,在构造整个 cobra.Command 命令后会为其添加命令行参数:
// cmd/kube-scheduler/app/server.go

// NewSchedulerCommand 使用默认参数和 registryOptions 创建一个 *cobra.Command 对象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	// 获取默认的配置信息
  opts, err := options.NewOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		......
	}
	fs := cmd.Flags()
  // 调用 Options 的 Flags 方法
	namedFlagSets := opts.Flags()
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
	// 将默认的所有参数添加到 cmd.Flags 中去
  for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}
	......
	return cmd
}
  1. 其中的 opts.Flags() 方法就是将默认的 Options 配置转换成命令行参数的函数:
// cmd/kube-scheduler/app/options/options.go

func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
	fs := nfs.FlagSet("misc")
	fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, `The path to the configuration file. The following flags can overwrite fields in this file:
  --address
  --port
  --use-legacy-policy-config
  --policy-configmap
  --policy-config-file
  --algorithm-provider`)
	fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
	fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

	o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
	o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
	o.Authentication.AddFlags(nfs.FlagSet("authentication"))
	o.Authorization.AddFlags(nfs.FlagSet("authorization"))
	o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)

	options.BindLeaderElectionFlags(&o.ComponentConfig.LeaderElection, nfs.FlagSet("leader election"))
	utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
	o.Metrics.AddFlags(nfs.FlagSet("metrics"))
	o.Logs.AddFlags(nfs.FlagSet("logs"))

	return nfs
}
  1. 其中第一个参数 --config 就可以用来指定配置文件。到这里我们就获取到了调度器所有默认的配置参数了。

2.3启动调度器

  1. 接下来分析真正运行调度器的 runCommand 函数的实现。
// cmd/kube-scheduler/app/server.go

// 运行调度器的真正函数
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	// 比如执行 --version 这样的操作,则打印版本后直接退出了
  verflag.PrintAndExitIfRequested()
	cliflag.PrintFlags(cmd.Flags())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 根据命令行 args 和 options 创建完整的配置和调度程序
	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil {
		return err
	}
	
  // 如果指定了 WriteConfigTo 参数
	if len(opts.WriteConfigTo) > 0 {
    // 将配置写入到指定的文件中
		if err := options.WriteConfigFile(opts.WriteConfigTo, &cc.ComponentConfig); err != nil {
			return err
		}
		klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
		return nil
	}
  // 真正去启动调度器
	return Run(ctx, cc, sched)
}
  1. 上面的函数首先判断是否是执行类似于 --version 这样的操作,如果是这打印后直接退出,然后根据命令行参数和选项通过 Setup 函数构造 CompletedConfig 配置和 Scheduler 调度器对象。
// cmd/kube-scheduler/app/server.go/

// 根据命令行参数和选项构造完整的配置和调度器对象
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	// 校验命令行选项
  if errs := opts.Validate(); len(errs) > 0 {
		return nil, nil, utilerrors.NewAggregate(errs)
	}
	// 获取调度器Config对象,该对象拥有一个调度器所有的上下文信息
	c, err := opts.Config()
	if err != nil {
		return nil, nil, err
	}

	// 获取 completed 配置
	cc := c.Complete()

	outOfTreeRegistry := make(runtime.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return nil, nil, err
		}
	}

	recorderFactory := getRecorderFactory(&cc)
	// 创建调度器 这里很重要 要自己定义调度器算法就在这
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	if err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}
  1. 该函数首先调用 opts.Validate() 函数对所有参数进行校验,接着使用 opts.Config() 函数创建 *schedulerappconfig.Config 对象,该对象拥有一个调度器所有的上下文信息。
// cmd/kube-scheduler/app/options/options.go

// Config 返回一个调度器配置对象
func (o *Options) Config() (*schedulerappconfig.Config, error) {
	......
	c := &schedulerappconfig.Config{}
	if err := o.ApplyTo(c); err != nil {
		return nil, err
	}

	// 创建 kube 客户端
	client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
	if err != nil {
		return nil, err
	}

	......
	c.Client = client
	c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
	c.PodInformer = scheduler.NewPodInformer(client, 0)
	c.LeaderElection = leaderElectionConfig

	return c, nil
}
  1. 上面函数的核心是通过 o.ApplyTo(c) 函数将 Options 转换成了 *schedulerappconfig.Config 对象,
// cmd/kube-scheduler/app/options/options.go

// 将调度程序 options 转换成调度程序应用配置
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
	if len(o.ConfigFile) == 0 {
		c.ComponentConfig = o.ComponentConfig

		// 如果未加载任何配置文件,则应用 deprecated flags(这是旧的行为)
    o.Deprecated.ApplyTo(&c.ComponentConfig)
		if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
			return err
		}
	} else {
		cfg, err := loadConfigFromFile(o.ConfigFile)
		if err != nil {
			return err
		}
		if err := validation.ValidateKubeSchedulerConfiguration(cfg).ToAggregate(); err != nil {
			return err
		}
		c.ComponentConfig = *cfg
		......
	}
	......
	return nil
}
  1. 上面的转换函数会首先判断是否配置了 ConfigFile(也就是 --config 参数),如果配置了则会加载对应的配置文件转换成对应的 KubeSchedulerConfiguration 对象,然后校验有效性,如果都正常则将其赋给 schedulerappconfig.Config 的 ComponentConfig 属性;如果没有配置 ConfigFile,则使用旧的参数进行配置。
  2. 接着会去调用 scheduler.New() 函数去构造一个真正的调度器对象,该函数的具体实现如下所示:
// pkg/scheduler/scheduler.go

// 配置调度器
type Option func(*schedulerOptions)

var defaultSchedulerOptions = schedulerOptions{
	profiles: []schedulerapi.KubeSchedulerProfile{
    // Profiles 的默认插件是从算法提供程序配置的
		{SchedulerName: v1.DefaultSchedulerName},  // 默认的调度器名称为 default-scheduler
	},
	schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
		Provider: defaultAlgorithmSourceProviderName(),  // 默认的算法源提供器名称为 DefaultProvider
	},
	percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
	podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
	podMaxBackoffSeconds:     int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}

// 返回一个 Scheduler 对象
func New(client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	podInformer coreinformers.PodInformer,
	recorderFactory profile.RecorderFactory,
	stopCh <-chan struct{},
	opts ...Option) (*Scheduler, error) {

	stopEverything := stopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}
  // 默认的调度器配置
	options := defaultSchedulerOptions
	for _, opt := range opts {
    // 将默认的调度器配置调用 Option 重新配置一次
		opt(&options)
	}

	......

	var sched *Scheduler
  // SchedulerAlgorithmSource 是调度程序算法的源
  // 包含Policy与Provider两种方式,必须指定一个源字段,并且源字段是互斥的
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// 从一个算法 provider 中创建配置,这是我们现在需要重点关注的方式
		sc, err := configurator.createFromProvider(*source.Provider)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
		}
		sched = sc
	case source.Policy != nil:
		// 从用户指定的策略源中创建配置,这是以前的扩展方式
		policy := &schedulerapi.Policy{}
		......
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
	......
  // addAllEventHandlers 是在测试和 Scheduler 中使用的帮助程序函数,用于为各种 informers 添加事件处理程序
	addAllEventHandlers(sched, informerFactory, podInformer)
	return sched, nil
}
  1. 首先将默认的调度器配置通过传递的 Option 参数进行一一配置,然后重点就是根据应用过后的配置来判断调度算法的源是 Provider 还是 Policy 方式,我们现在的重点是调度框架,所以主要关注 Provider 这种配置,Policy 是以前的扩展调度器的方式。所以调度器的实例化核心是通过 configurator.createFromProvider(*source.Provider) 该函数来实现的。
// pkg/scheduler/factory.go

// 从一组已注册的插件集合中创建一个调度器
func (c *Configurator) create() (*Scheduler, error) {
	var extenders []framework.Extender
	var ignoredExtendedResources []string
	if len(c.extenders) != 0 {
		// Extender 方式扩展调度器
    ......
	}

  ......

	// Profiles 需要提供有效的 queue sort 插件
	lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
  // 将优先级队列初始化为调度队列
	podQueue := internalqueue.NewSchedulingQueue(
		lessFn,
		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
		internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
		internalqueue.WithPodNominator(nominator),
	)

  ......
  // 创建一个 genericScheduler 对象,该对象实现了 ScheduleAlgorithm 接口,具体的调度实现就是这个对象实现的
	algo := core.NewGenericScheduler(
		c.schedulerCache,
		c.nodeInfoSnapshot,
		extenders,
		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
		c.disablePreemption,
		c.percentageOfNodesToScore,
	)

	return &Scheduler{
		SchedulerCache:  c.schedulerCache,
		Algorithm:       algo,
		Profiles:        profiles,
		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
		StopEverything:  c.StopEverything,
		SchedulingQueue: podQueue,
	}, nil
}

// createFromProvider 从注册的算法提供器来创建调度器
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
	klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
  // 获取算法提供器集合
	r := algorithmprovider.NewRegistry()
  // 获取指定算法提供器的插件集合
	defaultPlugins, exist := r[providerName]
	if !exist {
		return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
	}
  
	for i := range c.profiles {
		prof := &c.profiles[i]
		plugins := &schedulerapi.Plugins{}
		plugins.Append(defaultPlugins)
    // Apply 合并来自自定义插件的插件配置
		plugins.Apply(prof.Plugins)
		prof.Plugins = plugins
	}
	return c.create()
}
  1. 通过上面的一些列操作后就实例化了真正的调度器对象,最后我们需要去启动一系列的资源对象的事件监听程序,比如 Pod、Node 等对象,上面实例化函数中通过 addAllEventHandlers(sched, informerFactory, podInformer) 来实现的,关于这些资源对象对应的 onAdd、onUpdate、onDelete 操作均在 pkg/scheduler/eventhandlers.go 文件中实现的,这样比如当创建一个 Pod 过后,我们的调度器通过 watch 就会在 onAdd 事件中接收到该操作,然后我们就可以根据 queue sort 插件将器加入到带调度队列中去开始调度了。
  2. 最后就是去调用 Run 函数来真正启动调度器了,首先会等待所有的 cache 同步完成,然后开始进行调度操作。
// cmd/kube-scheduler/app/server.go/

// Run 根据指定的配置执行调度程序,仅在出现错误或上下文完成时才返回
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

	......

	// 启动 healthz 以及 metrics 相关服务
	......

	// 启动所有 informers
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// 调度之前等待所有 caches 同步完成
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// 开启了 leader election
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// 如果没有开启 Leader election,这直接调用调度器对象的 Run 函数
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	// sched.scheduleOne这里才是真正一个个pod去调度的方法
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
  1. 接下来我们就接着来分析是如何进行具体的 Pod 调度的。

第三节 调度 Pod 流程

2.1 调度队列

  1. 前面我们分析了 kube-scheduler 组件如何接收命令行参数,用传递的参数构造一个 Scheduler 对象,最终启动了调度器。调度器启动后就可以开始为未调度的 Pod 进行调度操作了,下面主要来分析调度器是如何对一个 Pod 进行调度操作的。
  2. 调度器启动后最终是调用 Scheduler 下面的 Run() 函数来开始调度 Pod,如下所示代码:
// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
  1. 首先会等待所有的 cache 同步完成,然后开始执行 SchedulingQueue 的 Run() 函数,SchedulingQueue 是一个队列接口,用于存储待调度的 Pod,该接口遵循类似于 cache.FIFOcache.Heap 这样的数据结构,要弄明白调度器是如何去调度 Pod 的,我们就首先需要弄清楚这个结构:
// pkg/scheduler/internal/queue/scheduling_queue.go

// 用于存储带调度 Pod 的队列接口
type SchedulingQueue interface {
	framework.PodNominator
	// AddUnschedulableIfNotPresent 将无法调度的 Pod 添加回调度队列
  // podSchedulingCycle表示可以通过调用 SchedulingCycle() 返回的当前调度周期号
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
  // SchedulingCycle 返回由调度队列缓存的当前调度周期数。 
  // 通常,只要弹出一个 Pod(例如调用 Pop() 函数),就增加此数字。
	SchedulingCycle() int64
  
  // 下面是通用队列相关操作
  // Pop 删除队列的头并返回它。 
  // 如果队列为空,它将阻塞,并等待直到新元素添加到队列中
	Pop() (*framework.QueuedPodInfo, error)
  // 往队列中添加一个 Pod
	Add(pod *v1.Pod) error
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error

	MoveAllToActiveOrBackoffQueue(event string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
  // 关闭 SchedulingQueue,以便等待 pop 元素的 goroutine 可以正常退出
	Close()
	// NumUnschedulablePods 返回 SchedulingQueue 中存在的不可调度 Pod 的数量
	NumUnschedulablePods() int
	// 启动管理队列的goroutine
	Run()
}
  1. SchedulingQueue 是一个用于存储带调度 Pod 的队列接口,在构造 Scheduler 对象的时候我们可以了解到调度器中是如何实现这个队列接口的:
// pkg/scheduler/factory.go

// Profiles are required to have equivalent queue sort plugins.
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
	lessFn,
	internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
	internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
	internalqueue.WithPodNominator(nominator),
)
......
return &Scheduler{
	......
	NextPod:         internalqueue.MakeNextPodFunc(podQueue),
  ......
	SchedulingQueue: podQueue,
}, nil
  1. 可以看到上面的 internalqueue.NewSchedulingQueue 就是创建的一个 SchedulingQueue 对象,定义如下所示:
// pkg/scheduler/internal/queue/scheduling_queue.go

// 初始化一个优先级队列作为一个新的调度队列
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
	return NewPriorityQueue(lessFn, opts...)
}

// 配置 PriorityQueue
type Option func(*priorityQueueOptions)

// 创建一个 PriorityQueue 对象
func NewPriorityQueue(
	lessFn framework.LessFunc,
	opts ...Option,
) *PriorityQueue {
  ......

  comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}
  ......

  pq := &PriorityQueue{
		PodNominator:              options.podNominator,
		clock:                     options.clock,
		stop:                      make(chan struct{}),
		podInitialBackoffDuration: options.podInitialBackoffDuration,
		podMaxBackoffDuration:     options.podMaxBackoffDuration,
		activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
		moveRequestCycle:          -1,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

	return pq
}
  1. 从上面的初始化过程可以看出来 PriorityQueue 这个优先级队列实现了 SchedulingQueue 接口,所以真正的实现还需要去查看这个优先级队列:
// pkg/scheduler/internal/queue/scheduling_queue.go

// PriorityQueue 实现了调度队列 SchedulingQueue
// PriorityQueue 的头部元素是优先级最高的 pending Pod,该结构有三个子队列:
// 一个子队列包含正在考虑进行调度的 Pod,称为 activeQ,是一个堆
// 另一个队列包含已尝试并且确定为不可调度的 Pod,称为 unschedulableQ
// 第三个队列包含从 unschedulableQ 队列移出的 Pod,并在 backoff 完成后将其移到 activeQ 队列
type PriorityQueue struct {
	framework.PodNominator

	stop  chan struct{}
	clock util.Clock

	// pod 初始 backoff 的时间
	podInitialBackoffDuration time.Duration
	// pod 最大 backoff 的时间
	podMaxBackoffDuration time.Duration

	lock sync.RWMutex
	cond sync.Cond  // condition

  // activeQ 是调度程序主动查看以查找要调度 pod 的堆结构,堆头部是优先级最高的 Pod
	activeQ *heap.Heap
  // backoff 队列
	podBackoffQ *heap.Heap
	// unschedulableQ 不可调度队列
	unschedulableQ *UnschedulablePodsMap
  // 调度周期的递增序号,当 pop 的时候会增加
	schedulingCycle int64
  // moveRequestCycle 会缓存 schedulingCycle 的值
  // 当未调度的 Pod 重新被添加到 activeQ 中会保存 schedulingCycle 到 moveRequestCycle 中
	moveRequestCycle int64

	// 表明队列已经被关闭
	closed bool
}
  1. 这里使用的是一个 PriorityQueue 优先级队列来存储带调度的 Pod,这个也很好理解,普通队列是一个 FIFO 数据结构,根据元素进入队列的顺序依次出队,而对于调度的这个场景,优先级队列显然更合适,可以根据某些优先级策略,优先对某个 Pod 进行调度。
  2. PriorityQueue 的头部元素是优先级最高的带调度的 Pod,该结构有三个子队列:
    • 活动队列(activeQ):用来存放等待调度的 Pod 队列。
    • 不可调度队列(unschedulableQ):当 Pod 不能满足被调度的条件的时候就会被加入到这个不可调度的队列中来,等待后续继续进行尝试调度。
    • 回退队列(podBackOffQ):如果任务反复执行还是失败,则会按尝试次数增加等待调度时间,降低重试效率,从而避免反复失败浪费调度资源。对于调度失败的 Pod 会优先存储在 backoff 队列中,等待后续进行重试,可以认为就是重试的队列,只是后续再调度的等待时间会越来越长。
  3. 这里我们需要来弄清楚这几个队列是如何实现的。

2.2活动队列

  1. **活动队列(activeQ)是存储当前系统中所有在等待调度的 Pod 队列,**在上面实例化优先级队列里面可以看到 activeQ 队列的初始化是通过调用 heap.NewWithRecorder() 函数实现的。
// pkg/scheduler/internal/heap/heap.go

// NewWithRecorder 就是 Heap 基础上包装了 metrics 数据
func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
	return &Heap{
		data: &data{
			items:    map[string]*heapItem{},
			queue:    []string{},
			keyFunc:  keyFn,
			lessFunc: lessFn,
		},
		metricRecorder: metricRecorder,
	}
}

// lessFunc 接收两个元素,对列表进行排序时,将第一个元素放在第二个元素之前,则返回true。
type lessFunc = func(item1, item2 interface{}) bool
  1. 其中的 data 数据结构是 Golang 中的一个标准 heap 堆(只需要实现 heap.Interface 接口即可),然后 Heap 是在 data 基础上新增了一个用于记录 metrics 数据的堆,这里最重要的就是用比较元素优先级的 lessFunc 函数的实现,在初始化优先级队列的时候我们传入了一个 comp 的参数,这个参数就是 activeQ 这个堆里面的 lessFunc 函数的实现:
comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}
  1. 最终是调用的创建 Scheduler 对象的时候传入的 lessFn 参数:
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
  1. 从这里可以看到比较元素优先级是通过调度框架的 QueueSortFunc() 函数来实现的,对应的实现如下所示:
// pkg/scheduler/framework/runtime/framework.go

// QueueSortFunc 返回用于对调度队列中的 Pod 进行排序的函数
func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
	if f == nil {
		// 如果 frameworkImpl 为nil,则只需保持其顺序不变
		// NOTE: 主要用于测试
		return func(_, _ *framework.QueuedPodInfo) bool { return false }
	}
  // 如果没有 queuesort 插件
	if len(f.queueSortPlugins) == 0 {
		panic("No QueueSort plugin is registered in the frameworkImpl.")
	}

	// 只有一个 QueueSort 插件有效
	return f.queueSortPlugins[0].Less
}
  1. 最终真正用于优先级队列元素优先级比较的函数是通过 QueueSort 插件来实现的,默认启用的 QueueSort 插件是 PrioritySort:
// pkg/scheduler/algorithmprovider/registry.go

func getDefaultConfig() *schedulerapi.Plugins {
	return &schedulerapi.Plugins{
		QueueSort: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: queuesort.Name},
			},
		},
    ......
  1. PrioritySort 这个插件的核心实现就是其 Less 函数了:
// pkg/scheduler/framework/plugins/queuesort/priority_sort.go

// Less 是 activeQ 队列用于对 Pod 进行排序的函数。
// 它根据 Pod 的优先级对 Pod 进行排序,
// 当优先级相同时,它使用 PodQueueInfo.timestamp 进行比较
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1 := pod.GetPodPriority(pInfo1.Pod)
	p2 := pod.GetPodPriority(pInfo2.Pod)
  // 先根据优先级的高低进行比较,然后根据 Pod 的创建时间
  // 越高优先级的 Pod 越被优先调度,越早创建的pod越优先
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

// pkg/api/v1/pod/util.go

// GetPodPriority 获取指定 Pod 的优先级
func GetPodPriority(pod *v1.Pod) int32 {
	if pod.Spec.Priority != nil {
		return *pod.Spec.Priority
	}
	return 0
}
  1. 到这里就真相大白了,对于 activeQ 活动队列中的 Pod 是依靠** PrioritySort 插件来进行优先级比较的**,每个 Pod 在被创建后都会有一个 priority 属性来标记 Pod 优先级,我们也可以通过全局的 ProrityClass 对象来进行定义,然后在调度 Pod 的时候会先根据 Pod 优先级的高低进行比较,如果优先级相同,则回根据 Pod 的创建时间进行比较,越高优先级的 Pod 越被优先调度,越早创建的Pod 越优先。
  2. 那么 Pod 是在什么时候加入到 activeQ 活动队列的呢?还记得前面我们在创建 Scheduler 对象的时候有一个 addAllEventHandlers 函数吗?其中就有对未调度 Pod 的事件监听处理操作。
// pkg/scheduler/eventhandlers.go

// unscheduled pod queue
podInformer.Informer().AddEventHandler(
	cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			switch t := obj.(type) {
			case *v1.Pod:
				return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
			case cache.DeletedFinalStateUnknown:
				if pod, ok := t.Obj.(*v1.Pod); ok {
					return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
				}
				utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
				return false
			default:
				utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
				return false
			}
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.addPodToSchedulingQueue,
			UpdateFunc: sched.updatePodInSchedulingQueue,
			DeleteFunc: sched.deletePodFromSchedulingQueue,
		},
	},
)
  1. 当 Pod 有事件变化后,首先回通过 FilterFunc 函数进行过滤,如果 Pod 没有绑定到节点(未调度)并且使用的是指定的调度器才进入下面的 Handler 进行处理,比如当创建 Pod 以后就会有 onAdd 的添加事件,这里调用的就是 sched.addPodToSchedulingQueue 函数:
// pkg/scheduler/eventhandlers.go

// 添加未调度的 Pod 到优先级队列
func(sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
   pod := obj.(*v1.Pod)
   klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
   if err := sched.SchedulingQueue.Add(pod); err != nil {
      utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
   }
}
  1. 可以看到这里当 Pod 被创建后,会将 Pod 通过调度队列 SchedulingQueue 的 Add 函数添加到优先级队列中去:
// pkg/scheduler/internal/queue/scheduling_queue.go

// Add 添加 Pod 到 activeQ 活动队列,仅当添加了新的 Pod 时才应调用它
// 这样 Pod 就不会已经处于 active/unschedulable/backoff 队列中了
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newQueuedPodInfo(pod)
  // 添加到 activeQ 队列
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
		return err
	}
  // 如果在 unschedulableQ 队列中,则从改队列移除
	if p.unschedulableQ.get(pod) != nil {
		klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
		p.unschedulableQ.delete(pod)
	}
	// 从 backoff 队列删除
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
	}
  // 记录metrics
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.PodNominator.AddNominatedPod(pod, "")
  // 通知其他地方进行处理
	p.cond.Broadcast()

	return nil
}
  1. 这就是 activeQ 活动队列添加元素的过程。

2.3 调度 Pod

  1. 当我们把新创建的 Pod 添加到 activeQ 活动队列过后,就可以在另外的协程中从这个队列中弹出堆顶的元素来进行具体的调度处理了。这里就要回头本文开头部分调度器启动后执行的一个调度操作了 sched.scheduleOne ,对单个 Pod 进行调度的基本流程如下所示:
    • 通过优先级队列弹出需要调度的 Pod
    • 在某些场景下跳过调度
    • 执行 Schedule 调度函数进行真正的调度,找到 Pod 合适的节点
    • 如果上面调度失败则会尝试抢占机制
    • 如果调度成功则将该 Pod 和选定的节点进行假性绑定(临时绑定),存入到调度器 cache 中,而不需要等待绑定操作的发生,方便后续操作
    • 异步执行真正的绑定操作,将节点名称添加到 Pod 的 spec.nodeName 属性中进行绑定。
      scheduleOne 函数如下所示:
// pkg/scheduler/scheduler.go

// scheduleOne 为单个 Pod 完成整个调度工作流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  // 从调度器中获取下一个要调度的 Pod
	podInfo := sched.NextPod()
	......
}
  1. scheduleOne 函数在最开始调用 sched.NextPod() 函数来获取现在需要调度的 Pod,其实就是上面 activeQ 活动队列中 Pop 出来的元素,当实例化 Scheduler 对象的时候就指定了 NextPod 函数:internalqueue.MakeNextPodFunc(podQueue)
// pkg/scheduler/internal/queue/scheduling_queue.go

// MakeNextPodFunc 返回一个函数,用于从指定的调度队列中获取下一个 Pod 进行调度
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		......
		return nil
	}
}
  1. 很明显这里就是调用的优先级队列的 Pop() 函数来弹出队列中的 Pod 进行调度处理。
// pkg/scheduler/internal/queue/scheduling_queue.go

// Pop 删除 activeQ 活动队列的头部元素并返回它。
// 如果 activeQ 为空,它将阻塞,并等待直到新元素添加到队列中。
// 当 Pod 弹出后会增加调度周期参数的值。
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
    // 当队列为空时,将阻塞Pop()的调用,直到新元素入队。
    // 调用Close()时,将设置p.closed并广播condition,这将导致此循环继续并从Pop()返回。
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
  // 从 activeQ 队列弹出堆顶元素
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
  // 增加调度周期次数
	p.schedulingCycle++
	return pInfo, err
}
  1. Pop() 函数很简单,就是从 activeQ 队列中弹出堆顶的元素返回即可。拿到了要调度的 Pod,接下来就是去真正执行调度逻辑了。
  2. 接着通过 Pod 指定的调度器获取对应的 profile 用于后续处理,其中包含有当前调度器对应的调度框架对象:
// pkg/scheduler/scheduler.go

// scheduleOne 为单个 Pod 完成整个调度工作流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  // 从调度器中获取下一个要调度的 Pod
	podInfo := sched.NextPod()
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
  // 根据 Pod 获取指定的 profile
	prof, err := sched.profileForPod(pod)
	if err != nil {
		klog.Error(err)
		return
	}
  // 某些特定情况下跳过调度 Pod
	if sched.skipPodSchedule(prof, pod) {
		return
	}
  ......
}

// 某些特定情况下跳过调度 Pod 则返回 true
func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
	// Case 1: Pod 标记为删除
	if pod.DeletionTimestamp != nil {
		prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		return true
	}

	// Case 2: Pod 已经临时绑定可能会被跳过 Pod 更新
  // 如果临时绑定的 Pod 在其之前的调度周期中,但在被临时绑定之前有更新事件,则可以将其再次添加到调度队列中。
	if sched.skipPodUpdate(pod) {
		return true
	}
	return false
}
  1. 在真正开始调度之前,在某些场景下可能需要跳过 Pod 的调度,比如 Pod 被标记为删除了,还有一些忽略的场景通过 skipPodUpdate 函数来确定:
// pkg/scheduler/eventhandlers.go

// skipPodUpdate 检查指定的 Pod 更新是否应该被忽略
// 该函数将返回 true,如果:
//   - Pod 已经被暂时绑定, 并且
//   - Pod 只有 ResourceVersion,Spec.NodeName,Annotations,
//     ManagedFields, Finalizers 和/或 Conditions 更新了.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
	// 没有暂时绑定的 Pod 不能被跳过更新
	isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
		return false
	}
	if !isAssumed {
		return false
	}

	// 从 cache 中获取临时绑定的 Pod
	assumedPod, err := sched.SchedulerCache.GetPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
		return false
	}
	
  // 比较临时绑定的 Pod 和更新的 Pod
  // 如果它们相等(排除某些字段),则将跳过此 pod 更新
	f := func(pod *v1.Pod) *v1.Pod {
		p := pod.DeepCopy()
    // 必须排除 ResourceVersion 字段,因为每个对象更新将具有新的资源版本
		p.ResourceVersion = ""
    // Spec.NodeName 必须排除在外,因为在缓存中预期就临时为 Pod 分配了节点
		p.Spec.NodeName = ""
		// Annotations 必须排除在外,因为 <https://github.com/kubernetes/kubernetes/issues/52914>.
		p.Annotations = nil
		p.ManagedFields = nil
		// 外部控制器可能会更改以下内容,但它们不会影响调度决策
		p.Finalizers = nil
		p.Status.Conditions = nil
		return p
	}
  // 排除上面的这些字段外,如果一致则可以跳过更新
	assumedPodCopy, podCopy := f(assumedPod), f(pod)
	if !reflect.DeepEqual(assumedPodCopy, podCopy) {
		return false
	}
	klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
	return true
}
  1. 经过上面的忽略 Pod 调度过程后,就会去同步尝试调度 Pod 找到适合的节点。整个调度过程和之前的调度框架流程图是一致的:
    在这里插入图片描述
  2. 代码如下所示:
// pkg/scheduler/scheduler.go

// scheduleOne 为单个 Pod 完成整个调度工作流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  ......
  // 同步尝试调度 Pod 找到适合的节点
	start := time.Now()
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()
  // 执行 Schedule 调度函数进行真正的调度
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	if err != nil {
    // Schedule() 函数执行可能会失败,因为 Pod 无法在任何主机上运行
    // 因此我们尝试进行抢占,并期望下一次尝试Pod进行调度时,因为抢占而可以正常调度
    // 但也有可能不同的 Pod 会调度到被抢占的资源中,但这没有什么影响
		nominatedNode := ""
		if fitError, ok := err.(*core.FitError); ok {
			if !prof.HasPostFilterPlugins() {
        // 没有注册 PostFilter 插件
				klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
			} else {
				// 运行 PostFilter 插件,尝试在未来的调度周期中使 Pod 可调度
        result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
				......
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName  // 被提名的节点名
				}
			}
		}
    // recordSchedulingFailure 为 pod记录一个事件,表示 pod 调度失败
    // 另外,如果设置了pod condition 和 nominated 提名节点名称,也要更新
    // 这里最重要的是要将调度失败的 Pod 加入到不可调度 Pod 的队列中去
    sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
	}
	
  // 告诉 cache 暂时绑定的 Pod 现在正在指定节点上运行,即使尚未绑定
	// 这样我们就可以保持调度,而不必等待真正的绑定发生
	assumedPodInfo := podInfo.DeepCopy()  // 拷贝现在调度的 Pod 信息
	assumedPod := assumedPodInfo.Pod
  // assume 是通过设置 NodeName=scheduleResult.SuggestedHost 来修改 `assumedPod` 的
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		......
	}
	
	// 运行 reserve 插件的 Reserve 方法(预留)
	if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		// 触发 un-reserve 插件以清理与 reserve 的 Pod 相关联的状态
    prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    // 从缓存中移除临时绑定的 Pod
		if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
		......
	}
  
  // 运行 "permit" 插件
	runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
    // 当 permit 插件结果不是 Wait 并且没有执行成功的时候,进行错误处理
		var reason string
		if runPermitStatus.IsUnschedulable() {
			reason = v1.PodReasonUnschedulable
		} else {
			reason = SchedulerError
		}
    // 其中一个插件返回的状态不等于 success 或 wait 的状态
    // 触发 un-reserve 插件
		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		// 从缓存中移除临时绑定的 Pod
    if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
    // 记录调度失败事件,主要我们关心的是如何加入到另外两个不可调度队列中去
    sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
	}
	......
}
  1. 从上面函数可以看出就是通过 sched.Algorithm.Schedule() 函数执行真正的调度选择合适的节点,如果调度出现了错误,比如没有得到任何合适的节点,这个时候如果注册了 PostFilter 插件,则会执行该插件,抢占操作就是在该插件中执行的,然后会将这个调度失败的 Pod 加入到 unschedulableQ 或者 podBackoffQ 队列中去,后续我们再仔细分析。
  2. 如果调度成功,得到了合适的节点,则先将该 Pod 和选定的节点进行假性绑定(不用等待去执行真正的绑定操作),存入调度器的 cache 中去,然后执行 Reserve 预留插件,去预留节点上 Pod 需要的资源。
  3. 接着就是调度的最后阶段去执行 Permit 允许插件,用于阻止或者延迟 Pod 与节点的绑定。
  4. 上面调度的过程都完成过后,最后就是去异步和节点进行真正的绑定操作了。
// pkg/scheduler/scheduler.go

func (sched *Scheduler) scheduleOne(ctx context.Context) {
	......
	// 异步绑定 Pod 到选定的节点 (we can do this b/c of the assumption step above).
	go func() {
		......
		// 首先调用 WaitOnPermit 扩展,与前面的 Permit 扩展点配合使用实现延时调度功能
		waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			......
			// 触发 un-reserve 插件清理 reserver pod 关联的状态
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
      // 从缓存中移除临时绑定的 Pod
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			return
		}

		// 执行 "prebind" 插件.
		preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
			return
		}
		// 调用 bind 函数进行真正的绑定
		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			// 绑定失败进行清理操作
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", err)
			}
		} else {
			// 最后绑定成功后,调用 "postbind" 插件.
			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()
}
  1. 执行真正的绑定操作是在一个单独的 goroutine 里面操作的,由于前面调度最后可能启用了 Permit 允许插件,所以这里首先需要调用内置的 WaitOnPermit 插件配合 Permit 插件,如果 Pod 是 waiting 状态,则 WaitOnPermit 将会一直阻塞,直到 approve 或 deny 该 Pod。
  2. 然后就和调度框架里面的流程一样,依次调用 prebind、bind、postbind 插件完成真正的绑定操作。到这里我们就真正的完成了一个 Pod 的调度流程。

2.4失败与重试处理

  1. podBackOffQ,平时我们在创建 Pod 后如果执行失败了,可能会有一些 backoff 这样的 events 信息,这个 backoff 是什么意思呢?
    • backoff(回退)机制是并发编程中非常常见的一种机制,即如果任务反复执行依旧失败,则会按次增加等待调度时间,降低重试的效率,从而避免反复失败浪费调度资源。
    • 针对调度失败的 Pod 会优先存储在 backoff 队列中,等待后续重试。podBackOffQ 主要存储那些在多个 schedulingCyle 中依旧调度失败的情况,则会通过 backoff 机制,延迟等待调度时间。
    • backoffQ 也是一个优先级队列,其初始化也是在 Scheduler 初始化优先级队列的时候传入的,其中最重要的比较队列中元素优先级的函数为 pq.podsCompareBackoffCompleted
// pkg/scheduler/internal/queue/scheduling_queue.go

func NewPriorityQueue(
	lessFn framework.LessFunc,
	opts ...Option,
) *PriorityQueue {
	......
	pq := &PriorityQueue{
		......
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
	return pq
}

// 比较 BackOffQ 队列中元素的优先级,谁的回退时间短谁的优先级高
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.QueuedPodInfo)
	pInfo2 := podInfo2.(*framework.QueuedPodInfo)
	bo1 := p.getBackoffTime(pInfo1)
	bo2 := p.getBackoffTime(pInfo2)
	return bo1.Before(bo2)
}

// getBackoffTime 获取 podInfo 完成回退的时间
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}

// calculateBackoffDuration 是一个 helper 函数用于计算 backoffDuration
// 基于 pod 已经计算的 attempts 次数
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration  // 默认1s
	for i := 1; i < podInfo.Attempts; i++ {  // 调度成功之前的尝试次数
		duration = duration * 2
		if duration > p.podMaxBackoffDuration {  // 最大10s
			return p.podMaxBackoffDuration
		}
	}
	return duration
}
  1. 不可调度队列unschedulableQ 就是字面意思不可调度队列,该队列中的 Pod 是已经尝试被确定为不可调度的 Pod,虽说是个队列,实际的数据结构是一个 map 类型。
// pkg/scheduler/internal/queue/scheduling_queue.go

// UnschedulablePodsMap 保存了不可调度的 Pod。
// 这个数据结构用于实现 unschedulableQ
type UnschedulablePodsMap struct {
	// podInfoMap 是由 Pod 的全名(podname_namespace)构成的 map key,值是指向 QueuedPodInfo 的指针。
  podInfoMap map[string]*framework.QueuedPodInfo
	keyFunc    func(*v1.Pod) string
	metricRecorder metrics.MetricRecorder
}
  1. 错误处理。在上面 scheduleOne 函数中当我们去真正执行调度操作后,如果出现了错误,然后会调用 recordSchedulingFailure 函数记录调度失败的事件,该函数中我们最关心的是调用 Scheduler 的 Error 回调函数,这个回调函数是在初始化调度器的时候传入的,是通过 MakeDefaultErrorFunc 函数得到的一个回调函数,在这个函数中会把当前调度失败的 Pod 加入到 unschedulableQ 不可调度队列或者 podBackoffQ 队列中去:
// pkg/scheduler/scheduler.go

// 记录调度失败的事件
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
	sched.Error(podInfo, err)
	......
}

// pkg/scheduler/factory.go

// 创建 Scheduler 对象
func (c *Configurator) create() (*Scheduler, error) {
	......
	return &Scheduler{
		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
		......
	}, nil
}

// MakeDefaultErrorFunc 构造一个函数用来处理 Pod 调度错误
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
	return func(podInfo *framework.QueuedPodInfo, err error) {
		pod := podInfo.Pod
		......
		// 从 informer 缓存中获取 Pod
		cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
		if err != nil {
			klog.Warningf("Pod %v/%v doesn't exist in informer cache: %v", pod.Namespace, pod.Name, err)
			return
		}
		// 添加到 unschedulableQ 队列中去
		podInfo.Pod = cachedPod.DeepCopy()
		if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
			klog.Error(err)
		}
	}
}
  1. 真正是加入到队列是通过调用函数 podQueue.AddUnschedulableIfNotPresent() 来完成的:
// pkg/scheduler/internal/queue/scheduling_queue.go

// AddUnschedulableIfNotPresent 将一个不可调用的 Pod 插入到队列中
// 如果已经在队列中了,则忽略
// 一般情况优先级队列会把不可调度的 Pod 加入到 `unschedulableQ` 队列中
// 但如果最近有 move request,则会将 Pod 加入到 `podBackoffQ` 队列中
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pod := pInfo.Pod
  // 查看是否已经在 unschedulableQ 队列中了
	if p.unschedulableQ.get(pod) != nil {
		return fmt.Errorf("pod: %v is already present in unschedulable queue", nsNameForPod(pod))
	}

	// 刷新 Pod 被重新添加后的时间戳
	pInfo.Timestamp = p.clock.Now()
  // 检查是否在 activeQ 活动队列中
	if _, exists, _ := p.activeQ.Get(pInfo); exists {
		return fmt.Errorf("pod: %v is already present in the active queue", nsNameForPod(pod))
	}
  // 检查是否在 podBackoffQ 队列中
	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
		return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod))
	}

  // 如果已经收到了 move request,则将其加入到 BackoffQ 队列中,否则加入到 unschedulableQ 不可调度队列
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
	}

	p.PodNominator.AddNominatedPod(pod, "")
	return nil
}
  1. 在 Pod 调度失败时,会调用AddUnschedulableIfNotPresent函数,其中有一个逻辑:
    • 如果 moveRequestCycle 大于等于当前的 podSchedulingCycle,则当前应该对之前已经失败的 Pod 进行重试,也就是放进 backoffQ 队列里
    • 如果不满足,则放进 unscheduleableQ 不可调度队列里
  2. 对于 moveRequestCycle 这个属性只有集群资源发生过变更(在资源的事件监听处理器里面都会去设置 moveRequestCycle=podSchedulingCycle)才会等于podSchedulingCycle。理论上来说在 Pod 调度失败时,没有后续任何操作,会被放进 unscheduleableQ 不可调度队列,但是有可能 Pod 刚刚调度失败,在错误处理之前,忽然发生了资源变更,这个时候,由于在这个错误处理的间隙,集群的资源状态已经发生了变化,所以可以认为这个 Pod 有了被调度成功的可能性,所以就被放进了backoffQ重试队列中,等待快速重试。
  3. 那么 PriorityQueue 队列里面包含的3个子队列之间的数据是如何流转的呢?还是要从调度启动的函数入手分析:
// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

其中的 sched.SchedulingQueue.Run() 函数就是运行 PriorityQueue 队列的 Run() 函数:

// pkg/scheduler/internal/queue/scheduling_queue.go

// Run 启动协程,把 podBackoffQ 队列数据放到 activeQ 活动队列中
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

// flushBackoffQCompleted 将 backoffQ 队列中已完成 backoff Pod 移动到 activeQ 队列中
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
    // 获取heap头元素(不删除)
		rawPodInfo := p.podBackoffQ.Peek() 
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
    // 如果该 Pod 回退完成的时间还没到,则忽略
		boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
		if boTime.After(p.clock.Now()) {
			return
		}
    // 完成了则弹出heap头元素
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
			return
		}
    // 加入到 activeQ 活动队列
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
    // 广播
		defer p.cond.Broadcast()
	}
}

// flushUnschedulableQLeftover 将停留在 unschedulableQ 中时间长于 1min 的 Pod 移动 activeQ 中
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.QueuedPodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
    // 最后调度的时间
		lastScheduleTime := pInfo.Timestamp
    // 如果 Pod 在 unschedulableQ 队列中停留的时间超过1min
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
    // 移动到活动队列或者 Backoff 队列
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

// 移动Pod到活动队列或者Backoff队列
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
	for _, pInfo := range podInfoList {
		pod := pInfo.Pod
    // 如果还在 backoff 时间内
		if p.isPodBackingoff(pInfo) {
      // 添加到 podBackOffQ 队列
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
			} else {
        // 从 unschedulableQ 队列删除
				metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		} else {
      // 添加到 activeQ 队列
			if err := p.activeQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
			} else {
        // 从 unschedulableQ 队列删除
				metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		}
	}
  // 将 moveRequestCycle 设置为当前的 schedulingCycle
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}
  1. 加上上面的 sched.scheduleOne 函数,3个子队列整体的工作流程就是:
    • 每隔1秒,检测 backoffQ 里是否有 Pod 可以被放进 activeQ 里
    • 每隔30秒,检测 unscheduleodQ 里是否有 Pod 可以被放进 activeQ 里(默认条件是等待时间超过60秒)
    • 不停的调用 scheduleOne 方法,从 activeQ 里弹出 Pod 进行调度
  2. 如果一个 Pod 调度失败了,正常就是不可调度的,应该放入 unscheduleableQ 队列。如果集群内的资源状态一直不发生变化,这种情况,每隔60s这些 Pod 还是会被重新尝试调度一次。
  3. 但是一旦资源的状态发生了变化,这些不可调度的 Pod 就很可能可以被调度了,也就是 unscheduleableQ 中的 Pod 应该放进 backoffQ 里面去了。等待安排重新调度,backoffQ 里的 Pod 会根据重试的次数设定等待重试的时间,重试的次数越少,等待重新调度的时间也就越少。backOffQ 里的 Pod 调度的速度会比 unscheduleableQ 里的 Pod 快得多。
    在这里插入图片描述
  4. 到这里我们就完成了调度单个 Pod 的完整工作流的分析,接下来就是真正调度过程中的核心调度算法的实现过程。
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐