scheduler开发实现(四):源码笔记

如果觉得文章对你有帮助,可以点赞、好评、转发

这是本系列的终章,记录了k8sv1.23.3版本源码流程,建议全屏阅读,因为有些行比较长
(因为不是所有公司都直接在生产上使用最新版本的k8s,不过大致流程都是一样的,只是具体实现稍有不同。)

v1.23.3

X:scheduler启动流程

X:kubeschedulerconfiguration文件加载流程

X:插件加载流程

X:具体调度过程

X:scheme加载

X:k8s options

//scheduler启动流程
app.NewSchedulerCommand
	options.NewOptions()  //生成部分默认配置
	options.Options.Flags //设置应用程序接受哪些参数
		options.XXXXOptions.AddFlags  //不同的option实现了不同的AddFlags方法,AddFlags方法中设置具体的参数名
	app.runCommand
		app.Setup #创建scheduler
			options.Options.Config #生成配置、informer这些东西,scheduler本质上也是一个controller
				options.Optinos.ApplyTo //!!!加载配置文件
					options.loadConfigFromFile
				options.createClients //创建clientset
				options.makeLeaderElectionConfig //创建选举配置文件
				scheduler.NewInformerFactory //创建一个pod informer
					scheduler.newPodInformer
						coreinformers.NewFilteredPodInformer //创建一个会监听除了pod.Status.Phase!=Successed和pod.Status.Phase!=Failed的pod信息的informer
							cache.NewSharedIndexInformer
                dynamic.NewForConfigOrDie
				dynamicinformer.NewFilteredDynamicSharedInformerFactory
			scheduler.New //创建scheduler对应的configuration对象(创建一个默认的,然后合并我们配置的,然后得到最终的配置),并据此创建scheduler
				scheduler.Configuration.create //创建schduler
				scheduler.addAllEventHandlers //把scheduler的podQueue和前面创建的informer联系起来,这样informer就会把相关pod信息放到scheduler的三种调度队列中
		app.Run
        	informers.SharedInformerFactory.Start //启动informer
				go cache.sharedIndexInformer.Run  
			informers.SharedInformerFactory.WaitForCacheSync //等待缓存同步
			......waitingForLeader......//如果开启了选举则等待成为leader(获取到指定的lease资源)
			scheduler.Scheduler.Run //运行调度器
				internalqueue.SchedulingQueue.Run //会开启两个新的调度队列线程,会把待调度的pod放到对应的队列,实际为queue.PriorityQueue.Run,priorityqueue是阻塞优先队列
								                  //一共三个队列unschedulable/backoff/active
									              //无法调度(如资源不足导致找不到合适的node)->unschedulable;启动失败->backoff;可以再次尝试调度->activeQ
					go wait.Until  //一个死循环,1s运行一次
						......    //省略号表示中间略去了一系列的函数调用
						    PriorityQueue.flushBackoffQCompleted  //尝试把backoff队列中本次backoff等待时间到期的pod移动到active 队列
					go wait.Until    //一个死循环,30s运行一次
                        ......
                            PriorityQueue.flushUnschedulableQLeftover //把unschedulable队列中本次unschedulable等待时间到期的pod移动到backoff队列或者active队列
																	  //也就是说一个处于不可调度的pod在经历过一段时候后还是会尝试重新调度,默认等待时间是60s
								PriorityQueue.movePodsToActiveOrBackoffQueue //如果pod需要等待backoff超时则会放入backoff队列,如果不需要则直接放入active队列
				wait.UntilWithContext //一个死循环,每隔一段时间就会运行一次Scheduler.scheduleOne,默认是0,即有pod就调度
					......				//省略号表示中间略去了一系列的函数调用
						scheduler.Scheduler.scheduleOne //进行一次调度
				internalqueue.SchedulingQueue.Close //走到这里表明上面已经退出了死循环,即程序即将结束,这里关闭队列
//配置文件加载流程: 加载kubeSchedulerConfiguration文件并转换成内部版本
app.NewSchedulerCommand
  app.runCommand
	app.Setup //创建scheduler配置对象:加载配置文件、创建clientset、创建leaderelect对象
	  options.Options.Config
	    options.Options.ApplyTo
	      options.loadConfigFromFile //读取文件
	        options.loadConfig //把二进制数据解析成kubeschedulerconfiguration对象并转换成__internal版本
	          serializer.CodeFactory.UniversalDecoder //创建一个解码器
	            ......
	              versioning.NewCodec
                                              //codec是根据scheme构造的的,我们通过addKnowTypes注册结构体和gvk之间的映射关系,然后通过schemebuilder.AddToScheme
                                              //把注册好的映射关系添加到指定的scheme中
					return &codec{  
					  encoder:   encoder,
					  decoder:   recongnizer.decoder, //json/yaml解析器
					  convertor: runtime.unsafeObjectConvertor, //用来转换版本
					  creater:   runtime.Scheme, //用来根据gvk创建obj
					  typer:     runtime.Scheme, //获取对象gvk 
					  defaulter: defaulter,
					  encodeVersion: encodeVersion,
					  decodeVersion: decodeVersion,
					  identifier: identifier(encodeVersion, encoder),
					  originalSchemeName: originalSchemeName,
					}
	          versioning.codec.Decode //把二进制数据解析成kubeschedulerconfiguration对象并转换成__internal版本
	            recongnizer.decoder.Decode //decoder是一个解码器数组,循环调用每一个解码器来尝试解码(json解码器、probuff解码器、yaml解码器)
	              json.Serializer.Decode
	                yaml.YAMLToJSON //把二进制数据转换成json对象后再把json对象序列化成二进制数据
	                  yaml.Unmarshal //解析二进制数据成map[interface]interface
	                  yaml.convertToJSONableObject  //把map[interface]interface转换成jsonable对象,
	                  yaml.Marshal//把json对象序列化成二进制数据([]byte)
	                json.SimpleMetaFactory.Interpret //获取对象的gvk
	                  json.Unmarshal//从反序列化后的json对象中获取对象的apiVersion字段和kind字段
	                  schema.ParseGroupVersion //从对象的apiVersion字段中解析出对象的Group和version
	                runtime.UseOrCreateObject //创建一个gvk对应类型的空对象
	                  runtime.Scheme.New  
	                    //runtime.Scheme.New函数就是根据gvk来获取对象的type,然后通过反射包来创建对象
	                    //s是Serializer,s.gvkToType是一个map,key=对象的gvk,value=对象的Typer
					    if t, exists := s.gvkToType[kind]; exists {
					      return reflect.New(t).Interface().(Object), nil
				        }
	                json.Serializer.unmarshal //从由json对象序列化后的二进制数据中的值填充到刚创建的对象中
	            v1beta2.KubeSchedulerConfiguration.DecodeNestedObjects //解析plugin参数,目前k8s 1.23版本的kubeschedulerconfiguration用的是v1beta2版本
																	   //即把kubeschedulerconfiguration.yaml文件中profiles.pluginConfig解析成指定参数类型即解析成DynamicArgs
                  v1beta2.PluginConfig.decodeNestedObjects
					gvk := SchemeGroupVersion.WithKind(c.Name + "Args") //!!!如果我们插件的Name()返回的名字叫做Dynamic,那么对应的参数结构体的名字必须叫做DynamicArgs
                                                                        //否则就无法把kubeschedulerconfiguration中profiles.plugconfig.args解析成结构体
																	    //从而在调用NewDynamicScheduler函数创建plugin时,无法把传进来的runtime.Object对象强转换成DynamicArgs对象
																	    //DynamicArgs的GV和kubeschedulerconfiguration保持一致
					runtime.WithoutVersionDecoder.Decode
                      recognizer.decoder.Decode                //把profiles.pluginConfig.args解析成DynamicArgs
                runtime.Scheme.Default                         //对kubeschedulerconfiguration对象进行处理
                                                               //对于profile部分,他会把我们enable和disable的插件和他默认开启的插件进行合并,合并后profile中只含有要开启的插件
                                                               //也就是说经过这一步之后,凡是在profile对象的enable中的插件都会被开启,凡是不再enable中的插件都不会开启,disable保持为nil
                  v1beta2.SetObjectDefaults_KubeSchedulerConfiguration
                    v1beta2.SetDefaults_KubeSchedulerConfiguration
                      obj.Parallelism = pointer.Int32Ptr(16) //默认并行度为16,举例:比如在给node打分时,会开16个goroute去并行打分
					  v1beta2.setDefaults_KubeSchedulerProfile //合并插件,对于默认的插件还会设置默认的参数
                        v1beta2.getDefaultPlugins //获取默认开启的插件
                        v1beta2.mergePlugins   //进行合并,合并后,最终的profile对象中会包含需要开启的所有插件,不在profile对象中的插件都不会被开启
                        v1beta2.GetPluginArgConversionScheme   //获取一个scheme对象,该scheme中注册了config包下面的所有的struct,以便我们可以解析配置文件和自定义参数对象
                        runtime.Scheme.Default                 //对于每一个参数对象,如果有defaulter函数,则应用该对象对应的defaulter函数。
                                                               //注意:defaulter一般是在对象初始化之后调用,即我们从配置文件读取了值并已经赋值给了对应的对象,
                                                               //defaulter相当于一个postInit操作,类似于调度器插件函数postBind运行在bind之后
                          SetObjectDefaults_DynamicArgs        //defaulter函数都是叫SetObjectDefaults_XXXX,XXX代表struct名字,可以有defaulter-gen工具自动生成
	            runtime.unsafeObjectConvertor.ConvertToVersion //把对象从外部版本转换成内部版本即__interval版本
	              runtime.Scheme.UnsafeConvertToVersion
	                runtime.Scheme.convertToVersion
	                   conversion.Converter.Convert //根据对象的gvk获取对应的转换函数,然后把对象转换成内部版本
	                     v1beta2.Convert_v1beta2_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration //把kubeschedulerconfiguration转换成__internal版本
                            v1beta2.Convert_v1beta2_DynamicArgs_To_config_DynamicArgs //把kubeschedulerconfiguration的profiles.pluginconfig.args转换成__internal版本
	   options.CreateKubeConfig
	   options.createClients
	   ......
//调度器创建流程:插件加载流程
scheduler.New //创建默认的配置、合并kubeschedulerconfiguration文件中的配置、创建scheduler、把scheduler和informer联系起来
  frameworkplugins.NewInTreeRegistry          //创建默认的调度器插件集合,registry是一个map,即type Registry map[string]PluginFactory,
                                              //key=plugin的名字,对应Dynamic,value=pluginFactory即创建一个plugin的方法,对应NewDynamicScheduler
  runtime.Registry.Merge                      //对于一个调度器,他把默认的调度器插件集合和我们在app.NewSchedulerCommand中通过app.WithPlugin注册的插件合并
											  //注意,registry相当于一个插件工厂,包含了所有已知的插件和对应的构造函数,然后经过Default处理后的profile指定了要开启的插件
                                              //创建调度器的时候,根据profile中的启用的插件的名字去registry中查找对应的构造函数,然后构造插件对象,然后再把插件对象放到调度器中
  configurator:=&Configurator{...省略...}     //创建配置对象
  scheduler.Configurator.create()            //创建调度器
    internalqueue.NewPodNominator            //抢占相关(调度失败以后才会尝试抢占)
    profile.NewMap                           //根据kubeschedulerconfiguration中的profiles字段创建调度器集合,profiles可以包含多个profile,一个profile对应一个调度器的配置文件
                                             //调度器集合用map保存,key=schedulerName,value=Framework(一个Framework代表一个调度器),Framework内部包含了profile中启用的插件
      profile.newProfile   
        runtime.NewFramework  
          runtime.frameworkImpl.pluginsNeed  //只有在profile对象中enable的插件才是调度器所需要开启的插件 
          ...根据plugin名字从registry中获取对应的构造函数,然后创建插件...
          runtime.updatePluginList           //把创建好的插件对象放到调度器对象中
    internalqueue.NewSchedulingQueue         //创建调度器的podQueue
    scheduler.NewGenericScheduler            //创建调度器算法对象,这个对象是调度器的一部分,这个对象实现了Schedule方法,调度器算法对象对应的结构体类型为scheduler.genericScheduler
                                             //调度器在scheduler.Scheduler.scheduleOne方法中通过此对象的Schedule方法来从所有node中选出一个合适的node供pod调度
										     //也就是说我们的调度插件是在scheduler.genericScheduler.Schedule方法中调用
    return &Scheduler{...省略...}            //最终的调度器对象,scheduler.Scheduler
  scheduler.addAllEventHandlers              //把调度器的podqueue和之前创建的informer联系起来
//leader-elect流程
app.NewSchedulerCommand
  app.runCommand
	app.Setup 
	  options.Options.Config  
        options.makeLeaderElectionConfig     //创建leaderElection配置对象
          resourcelock.NewFromKubeconfig     //从kubeschedulerconfiguration文件创建resourcelock。需要配置ResourceLock为leases(一般都是用leases锁)
                                             //ResouceName为leader-elect过程中所竞争的锁,如果没有配置则默认竞争名为" kube-scheduler"的锁,
                                             //而系统一般已经有默认的调度器在运行了,所以此时如果开启选举,我们的调度器一般会一直阻塞于获取锁的状态
                                             //ResouceNamespace:leases对象所在的命名空间
	app.Run                                  //如果没有开启选举则直接运行scheduler.Scheduler.Run,否则就先要获取锁
      leaderelection.NewLeaderElector        //创建leaderElector对象,会把scheduler.Scheduler.Run作为回调函数,在获取到对应的leases锁后运行该函数
      leaderelection.LeaderElector.Run       //
        leaderelection.LeaderElector.acquire //尝试获取锁,一直阻塞,直到获取锁或者出错
          wait.JitterUtil                    //死循环,不断尝试获取锁
            ......
              leaderelection.LeaderElector.tryAcquireOrRenew             //尝试获取锁
                resourcelock.Get  
                  v1.leases.Get
                    rest.RESTClient.Get                                 //查询apiserver
                1:锁不存在,则创建,并返回成功
                  resourcelock.Create                                   //如果锁不存在,说明此时不会有人获取到该锁,所以他创建锁并获取领导者资格
                2:锁存在,如果自己不是leader且锁没有过时,则返回失败
                3:锁存在,如果自己是leader或者锁已经过时,则更新锁对象,并返回成功
                  resourcelock.Update
        go leaderelection.LeaderCallbacks.OnStartedLeading              //调用回调函数,即我们前面设置的scheduler.Scheduler.Run,即新开一个线程去运行调度主程序
        leaderelection.LeaderElector.renew                              //内部一个死循环不断更新锁资源对象,如果更新失败,则关闭scheduler.Scheduler.Run线程,然后退出运行
          wait.Until                                                     //死循环
            ...... 
              leaderelection.LeaderElector.tryAcquireOrRenew  
              1:更新成功,开始下一次更新周期
              2:如果更新失败,则退出运行
                cancel()                   //leaderelector和scheduler.Scheduler.Run使用的是同一个ctx,所以在leaderelector中调用该ctx的cancel函数会通知scheduler.Scheduler.Run关闭
//具体调度过程:
scheduler.Scheduler.scheduleOne
  scheduler.Scheduler.NextPod //NextPod函数是在创建调度器的时候外部传入的,queue.MakeNextPodFunc对队列pop的一个封装,增加了日志功能
    queue.PriorityQueue.Pop
  scheduler.Scheduler.frameworkForPod //通过pod的schedulerName字段获取pod对应的调度器
  scheduler.Scheduler.skipPodSchedule //如果调度时pod已经被删除了或者pod在之前就已经调度完了但是此时还没有绑定完成状态还没有更新,就跳过调度
  scheduler.genericScheduler.Schedule //为pod挑选一个合适的node
    scheduler.genericScheduler.snapshot //获取当前node最新信息的一个快照
    scheduler.genericScheduler.findNodesThatFitPod //通过运行过滤插件来过滤Node,来找到可调度的node集合
       runtime.frameworkImpl.RunPreFilterPlugins //进行一些初始化操作,把数据写到framework.CycleState变量中,然后在后续阶段从此变量中读取数据,
                                                 //在该pod的调度周期,该插件的prefilter函数只会运行一次
       scheduler.genericScheduler.evaluateNominatedNode //如果该pod在之前的调度中已经提名了,则首先尝试提名的node,失败再去迭代所有node
         scheduler.genericScheduler.findNodesThatPassFilters //运行过滤插件来过滤node,如果没有过滤插件,则默认都可以通过过滤
           runtime.frameworkImpl.RunFilterPluginsWithNominatedPods//会过滤两次,第一次过滤时会假定提名的pod抢占成功即给node已使用的资源加上该提名pod的信息,
                                                                  //第二次则假定提名的pod抢占失败,即不考虑提名pod的资源
             runtime.frameworkImpl.RunFilterPlugins //过滤一个node,默认有十几个过滤器,包括了节点亲和性这些,v1beta2.getDefaultPlugins获取所有默认的插件
       scheduler.genericScheduler.findNodesThatPassFilters //如果提名的node没有通过过滤器检测,则再次尝试过滤所有node
    scheduler.prioritizeNodes //打分
      runtime.frameworkImpl.RunPreScorePlugins //有许多插件,如节点亲和性、拓扑逻辑等,暂不了解
      runtime.frameworkImpl.RunScorePlugins  //打分,默认也有一大堆的打分插件
        1:运行所有打分插件
        2:每一个节点的最终分=sum(插件分*插件权重)
  if 没有找到合适node:
    runtime.frameworkImpl.RunPostFilterPlugins  //默认只有一个抢占插件,该抢占逻辑就是优先级低的pod可以被抢占(可能需要抢占多个才能),然后挑选每一个node上可以被抢占的pod集合,然后先挑选node,然后再挑选最好的node
      runtime.frameworkImpl.runPostFilterPlugin
        defaultpreemption.DefaultPreemption.PostFilter
          preemption.Evaluator.Preempt
            preemption.Evaluator.findCandidates //挑选合适的node
              preemption.Evaluator.DryRunPreemption //可以抢占低优先级的pod,挑选被抢占的pod
              preemption.Evaluator.SelectCandidate //挑选一个最合适的node
              preemption.Evaluator.prepareCandidate //通过clientset向apiserver执行删除操作,此处无需更新node的资源使用情况,此时此pod为不可调度状态,
                                                    //本次调度即不会调度pod,但会提名pod,就是标记一下这个pod在等待这个node,等到此pod再次被调度时,会优先尝试调度到这个node上
                                                    //当低优先级pod被驱逐后,因为k8s在scheduler.addAllEventHandlers中注册了Add/Update/Delete函数,所以是在此处更新node的资源使用情况                                   
       scheduler.Scheduler.recordSchedulingFailure //本次调度到此结束,此处会把pod放到不可调度队列,等待下一次调度
  scheduler.Scheduler.assume //把pod添加到调度器内部的cache,该cache保存了所有假定已经调度完毕的pod(即挑选了node,但是还没有绑定完成,下次调度会被跳过)
    cache.schedulerCache.AssumePod //添加到cache内部的map
                                   //此处操作的pod结构体是副本,所以系统中的pod结构体的status字段仍然是未绑定,所以该pod当发生updatePod事件时该pod仍然可能再次被添加到调度队列
      cache.schedulerCache.addPod
        framework.NodeInfo.AddPod
          framework.NodeInfo.AddPodInfo  //这一步会把pod的cpu/memory limits(不是requests)加到node对象上,也就是假定绑定成功,假定这些资源被消耗了
            framework.NodeInfo.calculateResource //这一步会计算出一个pod所需的资源:所有容器的limits之和
            ...更新nodeinfo以及时间戳,时间戳在更新nodesnap的时候判断一个node信息是否是最新的...
  runtime.frameworkImpl.RunReservePluginsReserve //执行预留资源操作,实际只是更新本地资源对象,并没有更新apiserver?
    volumebinding.VolumeBinding.Reserve //默认reserve阶段只有这个插件,绑定pvc/pv
  runtime.frameworkImpl.RunPermitPlugins //默认没有批准插件
  go func{  //异步绑定,在此之前已经假定pod成功绑定到了该node上,即更新了相关对象
      runtime.frameworkImpl.WaitOnPermit //等待批准,默认无批准插件,所以无需等待
      runtime.frameworkImpl.RunPreBindPlugins
      scheduler.Scheduler.bind           //执行绑定操作,实际就是发一个请求到apiserver更新apiserver上对象的状态
        runtime.frameworkImpl.RunBindPlugins
          defaultbinder.DefaultBinder.Bind
            v1.pods.Bind  //post请求到apiserver更新对象状态
      if err!=nil
        runtime.frameworkImpl.RunReservePluginsUnreserve //如果有错误则取消预留的资源(默认只有volume可能需要处理)
      runtime.frameworkImpl.RunPostBindPlugins  //默认没有postbind插件,但是我们自己给他添加了一个dynamic插件
  }







//scheme加载:
//k8s.io\kubernetes\pkg\scheduler\apis\config\scheme.go: 这是k8s内建的scheduler的scheme
package scheme
import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	config "k8s.io/kubernetes/pkg/scheduler/apis/config"
	configv1beta2 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta2"
	configv1beta3 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta3"
)
var (
	// Scheme is the runtime.Scheme to which all kubescheduler api types are registered.
	Scheme = runtime.NewScheme()                                  //这是k8s内建的scheme,如果我们想要调度器成功加载我们自己的对象,那么就必须把我们的pair(gvk,struct)注册到这个scheme里

	// Codecs provides access to encoding and decoding for the scheme.
	Codecs = serializer.NewCodecFactory(Scheme, serializer.EnableStrict) //编码/解码器,用来二进制数据中解码出对应的struct,我们在启动的时候在options.loadConfig里调用这个Codecs
)
func init() {                    //init函数会在这个文件加载的时候自动被调用,这是go的机制
	AddToScheme(Scheme)   
}
func AddToScheme(scheme *runtime.Scheme) {
	utilruntime.Must(config.AddToScheme(scheme))  //把config包中通过addKnowTypes方法注册的pair(gvk,struct)注册到整个scheme中,下面以config包为例
	utilruntime.Must(configv1beta2.AddToScheme(scheme))
	utilruntime.Must(configv1beta3.AddToScheme(scheme))
	utilruntime.Must(scheme.SetVersionPriority(
		configv1beta3.SchemeGroupVersion,
		configv1beta2.SchemeGroupVersion))
}




//k8s.io/kubernetes/pkg/scheduler/apis/config.go
package config
import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)
const GroupName = "kubescheduler.config.k8s.io"

var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme = SchemeBuilder.AddToScheme   //在上面k8s.io\kubernetes\pkg\scheduler\apis\config\scheme的AddToScheme中被调用,即把本包内的对象注册到了全局scheduler使用的scheme中
)

func addKnownTypes(scheme *runtime.Scheme) error {
                                                 //注册gvk(g:group,v:version,k:kind)和struct,实际就是一个map[gvk]struct,我们读取一个二进制文件,首先解析成map[interface]interface
                                                 //然后读取apiVersion和Kind,然后从apiversion中再解析出Group和Version,这样就获取了GVK,就知道二进制文件对应的结构体类型
                                                 //就可以通过反射来创建一个初始对象,就知道有哪些字段了,然后再从map[interface]interface读取对应的字段填充到初始对象
                                                 //这样就完成了从二进制文件创建对象的功能了
	scheme.AddKnownTypes(SchemeGroupVersion,   
		&KubeSchedulerConfiguration{},
		&DefaultPreemptionArgs{},
		&InterPodAffinityArgs{},
		&NodeResourcesFitArgs{},
		&PodTopologySpreadArgs{},
		&VolumeBindingArgs{},
		&NodeResourcesBalancedAllocationArgs{},
		&NodeAffinityArgs{},
	)
	return nil
}





//k8s.io\apimachinery\pkg\runtime.go
func (s *Scheme) AddKnownTypes(gv schema.GroupVersion, types ...Object) {
	s.addObservedVersion(gv)
	for _, obj := range types {
		t := reflect.TypeOf(obj)
		if t.Kind() != reflect.Ptr {
			panic("All types must be pointers to structs.")
		}
		t = t.Elem()
		s.AddKnownTypeWithName(gv.WithKind(t.Name()), obj) //把gv,k组合成gvk,然后把gvk和obj关联起来
	}
}




//crane\pkg\plugins\apis\config\register.go 这是我们自己的配置文件
package config
import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/klog/v2"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
var SchemeGroupVersion = schema.GroupVersion{Group: kubeschedulerconfig.GroupName, Version: runtime.APIVersionInternal}
var (
	localSchemeBuilder = &kubeschedulerconfig.SchemeBuilder   //因为需要k8s能够从文件识别、加载、创建我们自己的对象(如DynamicArgs等),
                                                              //所以需要把我们的pair(gvk,dynamicArgs)注册到全局的scheduler的scheme
                                                              //而config.SchemeBuilder中的pair(gvk,struct)会被注册到全局scheme,
                                                              //所以我们把我们的pair添加到config.SchemeBuilder也是可行的
	AddToScheme = localSchemeBuilder.AddToScheme
)
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&DynamicArgs{},
		&NodeResourceTopologyMatchArgs{},
	)
	return nil
}
func init() {
	localSchemeBuilder.Register(addKnownTypes) //把我们的pair添加到config.SchemeBuilder中
	localSchemeBuilder.Register(RegisterDefaults)
}
/* k8s option机制:
option本质就是一个函数,不同包下面的option是接受不同参数的函数。创建一个option相当于创建了一个容器,这个容器内不保存了一个值(类似于闭包)
然后当我们传递一个对象给这个容器的时候,option就会把内部保存的这个值保存到这个对象里面
举例:app.WithPlugin函数返回一个app.Option对象,这个option对象内部保存了两个值(name,factory),
当我们传递一个resitry对象给这个option的时候,这个option就会把(name,factory)保存到registery对象内部
*/
cmd := app.NewSchedulerCommand(
		app.WithPlugin(dynamic.Name, dynamic.NewDynamicScheduler),
		app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
	)
type Option func(runtime.Registry) error
func WithPlugin(name string, factory runtime.PluginFactory) Option {
	return func(registry runtime.Registry) error {
		return registry.Register(name, factory)
	}
}
func Setup(){
	.......
	outOfTreeRegistry := make(runtime.Registry)         //我们创建一个叫做outOfTreeRegistry的registry(因为k8s有个默认的叫做inTree,所以叫做outOfTree,后面会合并这两个registery)
	for _, option := range outOfTreeRegistryOptions {   //option即我们用app.WithPlugin添加的外部插件
		if err := option(outOfTreeRegistry); err != nil {  //option(outOfTreeRegistry)这一步会把option里面的插件登记到这个outOfTreeRegistry中
			return nil, nil, err
		}
	}
    ......
}

附注
v1beta2.getDefaultPlugins获取所有默认的插件
调度器的Extenders字段一直为空,即默认调度器是没有扩展的,可以忽略

别人写的k8s 调度器源码注释,我觉得还挺好,贴出来和大家分享一下:

https://github.com/jindezgm/k8s-src-analysis/blob/master/kube-scheduler/Plugin.md

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐