k8s 实现自定义控制器-原理篇
文章最开头实现的控制器虽然也能实现控制器功能,但并不是官方推荐的编程模式,现在来看下官方推荐的自定义控制器的编程模式。在之前直接写informer.AddEventHandler不同,这里引入了一个工作队列,EventHandler里的逻辑就是把收到的消息发送到工作队列中,然后有一段代码不断从工作队列中消费消息,而这段代码就被称作control loop 控制循环。client 包提供了现成的工作
k8s 实现自定义控制器-原理篇
机制介绍
k8s 中pod,deployment是内置的资源,而k8s允许我们自定义资源类型,而如何对这些自定义的资源类型进行业务逻辑控制呢,这就需要一个控制器来完成这种操作,这个控制器被k8s称作operator。
而client-go k8s的客户端包为我们提供了informer机制来方便的去编写自己的控制器。k8s里也内置了许多控制器,例如NodeController、Deployment、DaemonSet和ServiceController等,它们打包成单个二进制程序kube-controller-manager并统一运行在同一个守护进程中。
informer 中文是告密者的意思,想想也符合语义,informer 通过与 api server 通信,将各种资源的信息告诉给开发者。
手写一个最简单的控制器
接下来看看如何用client-go 手写一个k8s控制器。
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"time"
)
func main() {
// 自定义与kube-apiserver通信的config配置
master := "192.168.1.10" // apiserver url
kubeconfig := "/.kube/config"
config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
if err != nil {
klog.Fatalf("Failed to create config: %v", err)
}
// 初始化与apiserver通信的clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create client: %v", err)
}
// 初始化shared informer factory以及pod informer
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()
// 注册informer的自定义ResourceEventHandler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// todo
},
UpdateFunc: func(oldObj, newObj interface{}) {
// todo
},
DeleteFunc: func(obj interface{}) {
// todo
},
})
// 启动shared informer factory,开始informer的list & watch操作
stopper := make(chan struct{})
go factory.Start(stopper) // factory start 会将其包含的所有informer都调用run方法
// 等待informer 从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中
// 或者调用factory.WaitForCacheSync(stopper)
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
}
首先用kubeconfig 初始化client用于和api server进行通信。
然后初始化了一个informer的factory,通过factory创建了一个informer。
接着向informar添加了一个事件处理函数,这个事件处理函数就可以写上自己的逻辑。
然后调用factory 的start方法,factory start方法内部调用它拥有的informer的 run方法,自此,informer正式启动。
上述例子用的clientset 初始化了factory,这样只能创建对内置资源的informer,除此以外还能通过其他方式创建支持自定义资源监控的informer,叫做dynamic informer,不论那种informer,他们底层都是构建了一个SharedIndexInformer对象。
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
informer 工作机制
结合下面的图和接下来的源码来分析下informer的工作机制以及各个组件间的联系。
结合上面这张图,大致看下informer的工作机制,informer内部会有个Reflector去进行list watch操作,可以将list watch 操作其实是 客户端与api server建立了长连接,然后不断监听来自 api server传来的事件信息。而建立长连接的机制,则是通过http chunked传输实现的。
reflector将监听到的信息传给了一个队列,然后informer内部的controller将会消费这个队列消息,执行HandleDeltas方法,然后将消息存入indexer,indexer可以认为是客户端的本地存储,用于保存收到的消息,可用于消费消息失败后重新消费消息。 最后HandleDeltas 又会调用distribute方法,将消息发给监听者。
监听者是何时初始化的呢,还记得在上面手写一个最简单的控制器种的informer.AddEventHandler 方法吗,这个方法实现逻辑即创建一个监听者,然后将传入的方法作为监听者的处理函数。
接下来看看实际的代码片段。
factory 调用start方法会将内部拥有的informer全部执行起来。
informer的run方法内部会创建一个controller结构 执行真正的控制逻辑。
controller 的run方法内部,又会创建一个reflector并执行其Run方法,去真正执行list watch 逻辑
再来结合下informer工作机制那张图,应该可以猜到控制器的processLoop 就是不断循环处理来自Delta FIFO 先进先出队列的信息,而reflctor 的run方法内部则实现监听,并将监听的信息放到Delta FIFO 队列里。而事实上的确是这样的。
完整的自定义控制器实现
文章最开头实现的控制器虽然也能实现控制器功能,但并不是官方推荐的编程模式,现在来看下官方推荐的自定义控制器的编程模式。
在之前直接写informer.AddEventHandler不同,这里引入了一个工作队列,EventHandler里的逻辑就是把收到的消息发送到工作队列中,然后有一段代码不断从工作队列中消费消息,而这段代码就被称作control loop 控制循环。
client 包提供了现成的工作队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
informer.AddEventHandler
cache.ResourceEventHandlerFuncs{
// 响应新增资源事件的方法,可以按照业务需求来定制,
// 这里的做法比较常见:写入工作队列
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
// 响应修改资源事件的方法,可以按照业务需求来定制,
// 这里的做法比较常见:写入工作队列
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
// 响应修改资源事件的方法,可以按照业务需求来定制,
// 这里的做法比较常见:写入工作队列,注意删除的时候生成key的方法和新增修改不一样
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
再实现一段逻辑不断读取工作队列消息
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
// 处理消息...
}
这种编程模式目前已经有工具帮我们生成框架代码,kubebuilder,operator sdk都可以帮我们生成框架代码,后续将会出这些系列文章,尽情期待。
更多推荐
所有评论(0)