k8s scheduler开发实现(四):源码笔记
k8s v1.23.3版本调度器源码阅读笔记。记录了scheduler启动流程、KubeSchedulerConfiguration.yaml配置文件的加载流程、插件加载流程、具体的调度过程、scheme加载,最后还简单介绍了K8s源码里常用的options机制
·
scheduler开发实现(四):源码笔记
如果觉得文章对你有帮助,可以点赞、好评、转发
这是本系列的终章,记录了k8sv1.23.3版本源码流程,建议全屏阅读,因为有些行比较长
(因为不是所有公司都直接在生产上使用最新版本的k8s,不过大致流程都是一样的,只是具体实现稍有不同。)
v1.23.3
X:scheduler启动流程
X:kubeschedulerconfiguration文件加载流程
X:插件加载流程
X:具体调度过程
X:scheme加载
X:k8s options
//scheduler启动流程
app.NewSchedulerCommand
options.NewOptions() //生成部分默认配置
options.Options.Flags //设置应用程序接受哪些参数
options.XXXXOptions.AddFlags //不同的option实现了不同的AddFlags方法,AddFlags方法中设置具体的参数名
app.runCommand
app.Setup #创建scheduler
options.Options.Config #生成配置、informer这些东西,scheduler本质上也是一个controller
options.Optinos.ApplyTo //!!!加载配置文件
options.loadConfigFromFile
options.createClients //创建clientset
options.makeLeaderElectionConfig //创建选举配置文件
scheduler.NewInformerFactory //创建一个pod informer
scheduler.newPodInformer
coreinformers.NewFilteredPodInformer //创建一个会监听除了pod.Status.Phase!=Successed和pod.Status.Phase!=Failed的pod信息的informer
cache.NewSharedIndexInformer
dynamic.NewForConfigOrDie
dynamicinformer.NewFilteredDynamicSharedInformerFactory
scheduler.New //创建scheduler对应的configuration对象(创建一个默认的,然后合并我们配置的,然后得到最终的配置),并据此创建scheduler
scheduler.Configuration.create //创建schduler
scheduler.addAllEventHandlers //把scheduler的podQueue和前面创建的informer联系起来,这样informer就会把相关pod信息放到scheduler的三种调度队列中
app.Run
informers.SharedInformerFactory.Start //启动informer
go cache.sharedIndexInformer.Run
informers.SharedInformerFactory.WaitForCacheSync //等待缓存同步
......waitingForLeader......//如果开启了选举则等待成为leader(获取到指定的lease资源)
scheduler.Scheduler.Run //运行调度器
internalqueue.SchedulingQueue.Run //会开启两个新的调度队列线程,会把待调度的pod放到对应的队列,实际为queue.PriorityQueue.Run,priorityqueue是阻塞优先队列
//一共三个队列unschedulable/backoff/active
//无法调度(如资源不足导致找不到合适的node)->unschedulable;启动失败->backoff;可以再次尝试调度->activeQ
go wait.Until //一个死循环,1s运行一次
...... //省略号表示中间略去了一系列的函数调用
PriorityQueue.flushBackoffQCompleted //尝试把backoff队列中本次backoff等待时间到期的pod移动到active 队列
go wait.Until //一个死循环,30s运行一次
......
PriorityQueue.flushUnschedulableQLeftover //把unschedulable队列中本次unschedulable等待时间到期的pod移动到backoff队列或者active队列
//也就是说一个处于不可调度的pod在经历过一段时候后还是会尝试重新调度,默认等待时间是60s
PriorityQueue.movePodsToActiveOrBackoffQueue //如果pod需要等待backoff超时则会放入backoff队列,如果不需要则直接放入active队列
wait.UntilWithContext //一个死循环,每隔一段时间就会运行一次Scheduler.scheduleOne,默认是0,即有pod就调度
...... //省略号表示中间略去了一系列的函数调用
scheduler.Scheduler.scheduleOne //进行一次调度
internalqueue.SchedulingQueue.Close //走到这里表明上面已经退出了死循环,即程序即将结束,这里关闭队列
//配置文件加载流程: 加载kubeSchedulerConfiguration文件并转换成内部版本
app.NewSchedulerCommand
app.runCommand
app.Setup //创建scheduler配置对象:加载配置文件、创建clientset、创建leaderelect对象
options.Options.Config
options.Options.ApplyTo
options.loadConfigFromFile //读取文件
options.loadConfig //把二进制数据解析成kubeschedulerconfiguration对象并转换成__internal版本
serializer.CodeFactory.UniversalDecoder //创建一个解码器
......
versioning.NewCodec
//codec是根据scheme构造的的,我们通过addKnowTypes注册结构体和gvk之间的映射关系,然后通过schemebuilder.AddToScheme
//把注册好的映射关系添加到指定的scheme中
return &codec{
encoder: encoder,
decoder: recongnizer.decoder, //json/yaml解析器
convertor: runtime.unsafeObjectConvertor, //用来转换版本
creater: runtime.Scheme, //用来根据gvk创建obj
typer: runtime.Scheme, //获取对象gvk
defaulter: defaulter,
encodeVersion: encodeVersion,
decodeVersion: decodeVersion,
identifier: identifier(encodeVersion, encoder),
originalSchemeName: originalSchemeName,
}
versioning.codec.Decode //把二进制数据解析成kubeschedulerconfiguration对象并转换成__internal版本
recongnizer.decoder.Decode //decoder是一个解码器数组,循环调用每一个解码器来尝试解码(json解码器、probuff解码器、yaml解码器)
json.Serializer.Decode
yaml.YAMLToJSON //把二进制数据转换成json对象后再把json对象序列化成二进制数据
yaml.Unmarshal //解析二进制数据成map[interface]interface
yaml.convertToJSONableObject //把map[interface]interface转换成jsonable对象,
yaml.Marshal//把json对象序列化成二进制数据([]byte)
json.SimpleMetaFactory.Interpret //获取对象的gvk
json.Unmarshal//从反序列化后的json对象中获取对象的apiVersion字段和kind字段
schema.ParseGroupVersion //从对象的apiVersion字段中解析出对象的Group和version
runtime.UseOrCreateObject //创建一个gvk对应类型的空对象
runtime.Scheme.New
//runtime.Scheme.New函数就是根据gvk来获取对象的type,然后通过反射包来创建对象
//s是Serializer,s.gvkToType是一个map,key=对象的gvk,value=对象的Typer
if t, exists := s.gvkToType[kind]; exists {
return reflect.New(t).Interface().(Object), nil
}
json.Serializer.unmarshal //从由json对象序列化后的二进制数据中的值填充到刚创建的对象中
v1beta2.KubeSchedulerConfiguration.DecodeNestedObjects //解析plugin参数,目前k8s 1.23版本的kubeschedulerconfiguration用的是v1beta2版本
//即把kubeschedulerconfiguration.yaml文件中profiles.pluginConfig解析成指定参数类型即解析成DynamicArgs
v1beta2.PluginConfig.decodeNestedObjects
gvk := SchemeGroupVersion.WithKind(c.Name + "Args") //!!!如果我们插件的Name()返回的名字叫做Dynamic,那么对应的参数结构体的名字必须叫做DynamicArgs
//否则就无法把kubeschedulerconfiguration中profiles.plugconfig.args解析成结构体
//从而在调用NewDynamicScheduler函数创建plugin时,无法把传进来的runtime.Object对象强转换成DynamicArgs对象
//DynamicArgs的GV和kubeschedulerconfiguration保持一致
runtime.WithoutVersionDecoder.Decode
recognizer.decoder.Decode //把profiles.pluginConfig.args解析成DynamicArgs
runtime.Scheme.Default //对kubeschedulerconfiguration对象进行处理
//对于profile部分,他会把我们enable和disable的插件和他默认开启的插件进行合并,合并后profile中只含有要开启的插件
//也就是说经过这一步之后,凡是在profile对象的enable中的插件都会被开启,凡是不再enable中的插件都不会开启,disable保持为nil
v1beta2.SetObjectDefaults_KubeSchedulerConfiguration
v1beta2.SetDefaults_KubeSchedulerConfiguration
obj.Parallelism = pointer.Int32Ptr(16) //默认并行度为16,举例:比如在给node打分时,会开16个goroute去并行打分
v1beta2.setDefaults_KubeSchedulerProfile //合并插件,对于默认的插件还会设置默认的参数
v1beta2.getDefaultPlugins //获取默认开启的插件
v1beta2.mergePlugins //进行合并,合并后,最终的profile对象中会包含需要开启的所有插件,不在profile对象中的插件都不会被开启
v1beta2.GetPluginArgConversionScheme //获取一个scheme对象,该scheme中注册了config包下面的所有的struct,以便我们可以解析配置文件和自定义参数对象
runtime.Scheme.Default //对于每一个参数对象,如果有defaulter函数,则应用该对象对应的defaulter函数。
//注意:defaulter一般是在对象初始化之后调用,即我们从配置文件读取了值并已经赋值给了对应的对象,
//defaulter相当于一个postInit操作,类似于调度器插件函数postBind运行在bind之后
SetObjectDefaults_DynamicArgs //defaulter函数都是叫SetObjectDefaults_XXXX,XXX代表struct名字,可以有defaulter-gen工具自动生成
runtime.unsafeObjectConvertor.ConvertToVersion //把对象从外部版本转换成内部版本即__interval版本
runtime.Scheme.UnsafeConvertToVersion
runtime.Scheme.convertToVersion
conversion.Converter.Convert //根据对象的gvk获取对应的转换函数,然后把对象转换成内部版本
v1beta2.Convert_v1beta2_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration //把kubeschedulerconfiguration转换成__internal版本
v1beta2.Convert_v1beta2_DynamicArgs_To_config_DynamicArgs //把kubeschedulerconfiguration的profiles.pluginconfig.args转换成__internal版本
options.CreateKubeConfig
options.createClients
......
//调度器创建流程:插件加载流程
scheduler.New //创建默认的配置、合并kubeschedulerconfiguration文件中的配置、创建scheduler、把scheduler和informer联系起来
frameworkplugins.NewInTreeRegistry //创建默认的调度器插件集合,registry是一个map,即type Registry map[string]PluginFactory,
//key=plugin的名字,对应Dynamic,value=pluginFactory即创建一个plugin的方法,对应NewDynamicScheduler
runtime.Registry.Merge //对于一个调度器,他把默认的调度器插件集合和我们在app.NewSchedulerCommand中通过app.WithPlugin注册的插件合并
//注意,registry相当于一个插件工厂,包含了所有已知的插件和对应的构造函数,然后经过Default处理后的profile指定了要开启的插件
//创建调度器的时候,根据profile中的启用的插件的名字去registry中查找对应的构造函数,然后构造插件对象,然后再把插件对象放到调度器中
configurator:=&Configurator{...省略...} //创建配置对象
scheduler.Configurator.create() //创建调度器
internalqueue.NewPodNominator //抢占相关(调度失败以后才会尝试抢占)
profile.NewMap //根据kubeschedulerconfiguration中的profiles字段创建调度器集合,profiles可以包含多个profile,一个profile对应一个调度器的配置文件
//调度器集合用map保存,key=schedulerName,value=Framework(一个Framework代表一个调度器),Framework内部包含了profile中启用的插件
profile.newProfile
runtime.NewFramework
runtime.frameworkImpl.pluginsNeed //只有在profile对象中enable的插件才是调度器所需要开启的插件
...根据plugin名字从registry中获取对应的构造函数,然后创建插件...
runtime.updatePluginList //把创建好的插件对象放到调度器对象中
internalqueue.NewSchedulingQueue //创建调度器的podQueue
scheduler.NewGenericScheduler //创建调度器算法对象,这个对象是调度器的一部分,这个对象实现了Schedule方法,调度器算法对象对应的结构体类型为scheduler.genericScheduler
//调度器在scheduler.Scheduler.scheduleOne方法中通过此对象的Schedule方法来从所有node中选出一个合适的node供pod调度
//也就是说我们的调度插件是在scheduler.genericScheduler.Schedule方法中调用
return &Scheduler{...省略...} //最终的调度器对象,scheduler.Scheduler
scheduler.addAllEventHandlers //把调度器的podqueue和之前创建的informer联系起来
//leader-elect流程
app.NewSchedulerCommand
app.runCommand
app.Setup
options.Options.Config
options.makeLeaderElectionConfig //创建leaderElection配置对象
resourcelock.NewFromKubeconfig //从kubeschedulerconfiguration文件创建resourcelock。需要配置ResourceLock为leases(一般都是用leases锁)
//ResouceName为leader-elect过程中所竞争的锁,如果没有配置则默认竞争名为" kube-scheduler"的锁,
//而系统一般已经有默认的调度器在运行了,所以此时如果开启选举,我们的调度器一般会一直阻塞于获取锁的状态
//ResouceNamespace:leases对象所在的命名空间
app.Run //如果没有开启选举则直接运行scheduler.Scheduler.Run,否则就先要获取锁
leaderelection.NewLeaderElector //创建leaderElector对象,会把scheduler.Scheduler.Run作为回调函数,在获取到对应的leases锁后运行该函数
leaderelection.LeaderElector.Run //
leaderelection.LeaderElector.acquire //尝试获取锁,一直阻塞,直到获取锁或者出错
wait.JitterUtil //死循环,不断尝试获取锁
......
leaderelection.LeaderElector.tryAcquireOrRenew //尝试获取锁
resourcelock.Get
v1.leases.Get
rest.RESTClient.Get //查询apiserver
1:锁不存在,则创建,并返回成功
resourcelock.Create //如果锁不存在,说明此时不会有人获取到该锁,所以他创建锁并获取领导者资格
2:锁存在,如果自己不是leader且锁没有过时,则返回失败
3:锁存在,如果自己是leader或者锁已经过时,则更新锁对象,并返回成功
resourcelock.Update
go leaderelection.LeaderCallbacks.OnStartedLeading //调用回调函数,即我们前面设置的scheduler.Scheduler.Run,即新开一个线程去运行调度主程序
leaderelection.LeaderElector.renew //内部一个死循环不断更新锁资源对象,如果更新失败,则关闭scheduler.Scheduler.Run线程,然后退出运行
wait.Until //死循环
......
leaderelection.LeaderElector.tryAcquireOrRenew
1:更新成功,开始下一次更新周期
2:如果更新失败,则退出运行
cancel() //leaderelector和scheduler.Scheduler.Run使用的是同一个ctx,所以在leaderelector中调用该ctx的cancel函数会通知scheduler.Scheduler.Run关闭
//具体调度过程:
scheduler.Scheduler.scheduleOne
scheduler.Scheduler.NextPod //NextPod函数是在创建调度器的时候外部传入的,queue.MakeNextPodFunc对队列pop的一个封装,增加了日志功能
queue.PriorityQueue.Pop
scheduler.Scheduler.frameworkForPod //通过pod的schedulerName字段获取pod对应的调度器
scheduler.Scheduler.skipPodSchedule //如果调度时pod已经被删除了或者pod在之前就已经调度完了但是此时还没有绑定完成状态还没有更新,就跳过调度
scheduler.genericScheduler.Schedule //为pod挑选一个合适的node
scheduler.genericScheduler.snapshot //获取当前node最新信息的一个快照
scheduler.genericScheduler.findNodesThatFitPod //通过运行过滤插件来过滤Node,来找到可调度的node集合
runtime.frameworkImpl.RunPreFilterPlugins //进行一些初始化操作,把数据写到framework.CycleState变量中,然后在后续阶段从此变量中读取数据,
//在该pod的调度周期,该插件的prefilter函数只会运行一次
scheduler.genericScheduler.evaluateNominatedNode //如果该pod在之前的调度中已经提名了,则首先尝试提名的node,失败再去迭代所有node
scheduler.genericScheduler.findNodesThatPassFilters //运行过滤插件来过滤node,如果没有过滤插件,则默认都可以通过过滤
runtime.frameworkImpl.RunFilterPluginsWithNominatedPods//会过滤两次,第一次过滤时会假定提名的pod抢占成功即给node已使用的资源加上该提名pod的信息,
//第二次则假定提名的pod抢占失败,即不考虑提名pod的资源
runtime.frameworkImpl.RunFilterPlugins //过滤一个node,默认有十几个过滤器,包括了节点亲和性这些,v1beta2.getDefaultPlugins获取所有默认的插件
scheduler.genericScheduler.findNodesThatPassFilters //如果提名的node没有通过过滤器检测,则再次尝试过滤所有node
scheduler.prioritizeNodes //打分
runtime.frameworkImpl.RunPreScorePlugins //有许多插件,如节点亲和性、拓扑逻辑等,暂不了解
runtime.frameworkImpl.RunScorePlugins //打分,默认也有一大堆的打分插件
1:运行所有打分插件
2:每一个节点的最终分=sum(插件分*插件权重)
if 没有找到合适node:
runtime.frameworkImpl.RunPostFilterPlugins //默认只有一个抢占插件,该抢占逻辑就是优先级低的pod可以被抢占(可能需要抢占多个才能),然后挑选每一个node上可以被抢占的pod集合,然后先挑选node,然后再挑选最好的node
runtime.frameworkImpl.runPostFilterPlugin
defaultpreemption.DefaultPreemption.PostFilter
preemption.Evaluator.Preempt
preemption.Evaluator.findCandidates //挑选合适的node
preemption.Evaluator.DryRunPreemption //可以抢占低优先级的pod,挑选被抢占的pod
preemption.Evaluator.SelectCandidate //挑选一个最合适的node
preemption.Evaluator.prepareCandidate //通过clientset向apiserver执行删除操作,此处无需更新node的资源使用情况,此时此pod为不可调度状态,
//本次调度即不会调度pod,但会提名pod,就是标记一下这个pod在等待这个node,等到此pod再次被调度时,会优先尝试调度到这个node上
//当低优先级pod被驱逐后,因为k8s在scheduler.addAllEventHandlers中注册了Add/Update/Delete函数,所以是在此处更新node的资源使用情况
scheduler.Scheduler.recordSchedulingFailure //本次调度到此结束,此处会把pod放到不可调度队列,等待下一次调度
scheduler.Scheduler.assume //把pod添加到调度器内部的cache,该cache保存了所有假定已经调度完毕的pod(即挑选了node,但是还没有绑定完成,下次调度会被跳过)
cache.schedulerCache.AssumePod //添加到cache内部的map
//此处操作的pod结构体是副本,所以系统中的pod结构体的status字段仍然是未绑定,所以该pod当发生updatePod事件时该pod仍然可能再次被添加到调度队列
cache.schedulerCache.addPod
framework.NodeInfo.AddPod
framework.NodeInfo.AddPodInfo //这一步会把pod的cpu/memory limits(不是requests)加到node对象上,也就是假定绑定成功,假定这些资源被消耗了
framework.NodeInfo.calculateResource //这一步会计算出一个pod所需的资源:所有容器的limits之和
...更新nodeinfo以及时间戳,时间戳在更新nodesnap的时候判断一个node信息是否是最新的...
runtime.frameworkImpl.RunReservePluginsReserve //执行预留资源操作,实际只是更新本地资源对象,并没有更新apiserver?
volumebinding.VolumeBinding.Reserve //默认reserve阶段只有这个插件,绑定pvc/pv
runtime.frameworkImpl.RunPermitPlugins //默认没有批准插件
go func{ //异步绑定,在此之前已经假定pod成功绑定到了该node上,即更新了相关对象
runtime.frameworkImpl.WaitOnPermit //等待批准,默认无批准插件,所以无需等待
runtime.frameworkImpl.RunPreBindPlugins
scheduler.Scheduler.bind //执行绑定操作,实际就是发一个请求到apiserver更新apiserver上对象的状态
runtime.frameworkImpl.RunBindPlugins
defaultbinder.DefaultBinder.Bind
v1.pods.Bind //post请求到apiserver更新对象状态
if err!=nil
runtime.frameworkImpl.RunReservePluginsUnreserve //如果有错误则取消预留的资源(默认只有volume可能需要处理)
runtime.frameworkImpl.RunPostBindPlugins //默认没有postbind插件,但是我们自己给他添加了一个dynamic插件
}
//scheme加载:
//k8s.io\kubernetes\pkg\scheduler\apis\config\scheme.go: 这是k8s内建的scheduler的scheme
package scheme
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
config "k8s.io/kubernetes/pkg/scheduler/apis/config"
configv1beta2 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta2"
configv1beta3 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta3"
)
var (
// Scheme is the runtime.Scheme to which all kubescheduler api types are registered.
Scheme = runtime.NewScheme() //这是k8s内建的scheme,如果我们想要调度器成功加载我们自己的对象,那么就必须把我们的pair(gvk,struct)注册到这个scheme里
// Codecs provides access to encoding and decoding for the scheme.
Codecs = serializer.NewCodecFactory(Scheme, serializer.EnableStrict) //编码/解码器,用来二进制数据中解码出对应的struct,我们在启动的时候在options.loadConfig里调用这个Codecs
)
func init() { //init函数会在这个文件加载的时候自动被调用,这是go的机制
AddToScheme(Scheme)
}
func AddToScheme(scheme *runtime.Scheme) {
utilruntime.Must(config.AddToScheme(scheme)) //把config包中通过addKnowTypes方法注册的pair(gvk,struct)注册到整个scheme中,下面以config包为例
utilruntime.Must(configv1beta2.AddToScheme(scheme))
utilruntime.Must(configv1beta3.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(
configv1beta3.SchemeGroupVersion,
configv1beta2.SchemeGroupVersion))
}
//k8s.io/kubernetes/pkg/scheduler/apis/config.go
package config
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const GroupName = "kubescheduler.config.k8s.io"
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme //在上面k8s.io\kubernetes\pkg\scheduler\apis\config\scheme的AddToScheme中被调用,即把本包内的对象注册到了全局scheduler使用的scheme中
)
func addKnownTypes(scheme *runtime.Scheme) error {
//注册gvk(g:group,v:version,k:kind)和struct,实际就是一个map[gvk]struct,我们读取一个二进制文件,首先解析成map[interface]interface
//然后读取apiVersion和Kind,然后从apiversion中再解析出Group和Version,这样就获取了GVK,就知道二进制文件对应的结构体类型
//就可以通过反射来创建一个初始对象,就知道有哪些字段了,然后再从map[interface]interface读取对应的字段填充到初始对象
//这样就完成了从二进制文件创建对象的功能了
scheme.AddKnownTypes(SchemeGroupVersion,
&KubeSchedulerConfiguration{},
&DefaultPreemptionArgs{},
&InterPodAffinityArgs{},
&NodeResourcesFitArgs{},
&PodTopologySpreadArgs{},
&VolumeBindingArgs{},
&NodeResourcesBalancedAllocationArgs{},
&NodeAffinityArgs{},
)
return nil
}
//k8s.io\apimachinery\pkg\runtime.go
func (s *Scheme) AddKnownTypes(gv schema.GroupVersion, types ...Object) {
s.addObservedVersion(gv)
for _, obj := range types {
t := reflect.TypeOf(obj)
if t.Kind() != reflect.Ptr {
panic("All types must be pointers to structs.")
}
t = t.Elem()
s.AddKnownTypeWithName(gv.WithKind(t.Name()), obj) //把gv,k组合成gvk,然后把gvk和obj关联起来
}
}
//crane\pkg\plugins\apis\config\register.go 这是我们自己的配置文件
package config
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
var SchemeGroupVersion = schema.GroupVersion{Group: kubeschedulerconfig.GroupName, Version: runtime.APIVersionInternal}
var (
localSchemeBuilder = &kubeschedulerconfig.SchemeBuilder //因为需要k8s能够从文件识别、加载、创建我们自己的对象(如DynamicArgs等),
//所以需要把我们的pair(gvk,dynamicArgs)注册到全局的scheduler的scheme
//而config.SchemeBuilder中的pair(gvk,struct)会被注册到全局scheme,
//所以我们把我们的pair添加到config.SchemeBuilder也是可行的
AddToScheme = localSchemeBuilder.AddToScheme
)
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&DynamicArgs{},
&NodeResourceTopologyMatchArgs{},
)
return nil
}
func init() {
localSchemeBuilder.Register(addKnownTypes) //把我们的pair添加到config.SchemeBuilder中
localSchemeBuilder.Register(RegisterDefaults)
}
/* k8s option机制:
option本质就是一个函数,不同包下面的option是接受不同参数的函数。创建一个option相当于创建了一个容器,这个容器内不保存了一个值(类似于闭包)
然后当我们传递一个对象给这个容器的时候,option就会把内部保存的这个值保存到这个对象里面
举例:app.WithPlugin函数返回一个app.Option对象,这个option对象内部保存了两个值(name,factory),
当我们传递一个resitry对象给这个option的时候,这个option就会把(name,factory)保存到registery对象内部
*/
cmd := app.NewSchedulerCommand(
app.WithPlugin(dynamic.Name, dynamic.NewDynamicScheduler),
app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
)
type Option func(runtime.Registry) error
func WithPlugin(name string, factory runtime.PluginFactory) Option {
return func(registry runtime.Registry) error {
return registry.Register(name, factory)
}
}
func Setup(){
.......
outOfTreeRegistry := make(runtime.Registry) //我们创建一个叫做outOfTreeRegistry的registry(因为k8s有个默认的叫做inTree,所以叫做outOfTree,后面会合并这两个registery)
for _, option := range outOfTreeRegistryOptions { //option即我们用app.WithPlugin添加的外部插件
if err := option(outOfTreeRegistry); err != nil { //option(outOfTreeRegistry)这一步会把option里面的插件登记到这个outOfTreeRegistry中
return nil, nil, err
}
}
......
}
附注
v1beta2.getDefaultPlugins获取所有默认的插件
调度器的Extenders字段一直为空,即默认调度器是没有扩展的,可以忽略
别人写的k8s 调度器源码注释,我觉得还挺好,贴出来和大家分享一下:
https://github.com/jindezgm/k8s-src-analysis/blob/master/kube-scheduler/Plugin.md
更多推荐
已为社区贡献6条内容
所有评论(0)