k8s 实现自定义控制器-原理篇

容器系列文章

容器系列视频
在这里插入图片描述

机制介绍

k8s 中pod,deployment是内置的资源,而k8s允许我们自定义资源类型,而如何对这些自定义的资源类型进行业务逻辑控制呢,这就需要一个控制器来完成这种操作,这个控制器被k8s称作operator。

而client-go k8s的客户端包为我们提供了informer机制来方便的去编写自己的控制器。k8s里也内置了许多控制器,例如NodeController、Deployment、DaemonSet和ServiceController等,它们打包成单个二进制程序kube-controller-manager并统一运行在同一个守护进程中。

informer 中文是告密者的意思,想想也符合语义,informer 通过与 api server 通信,将各种资源的信息告诉给开发者。

手写一个最简单的控制器

接下来看看如何用client-go 手写一个k8s控制器。

package main

import (
	"fmt"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"
	"time"
)

func main() {
	// 自定义与kube-apiserver通信的config配置
	master := "192.168.1.10" // apiserver url
	kubeconfig := "/.kube/config"
	config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
	if err != nil {
		klog.Fatalf("Failed to create config: %v", err)
	}
	// 初始化与apiserver通信的clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}
	// 初始化shared informer factory以及pod informer
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods()
	informer := podInformer.Informer()
	
	// 注册informer的自定义ResourceEventHandler
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// todo
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// todo
		},
		DeleteFunc: func(obj interface{}) {
			// todo
		},
	})
	// 启动shared informer factory,开始informer的list & watch操作
	stopper := make(chan struct{})
	go factory.Start(stopper) // factory start 会将其包含的所有informer都调用run方法

	// 等待informer 从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中
	// 或者调用factory.WaitForCacheSync(stopper)
	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

}

首先用kubeconfig 初始化client用于和api server进行通信。

然后初始化了一个informer的factory,通过factory创建了一个informer。

接着向informar添加了一个事件处理函数,这个事件处理函数就可以写上自己的逻辑。

然后调用factory 的start方法,factory start方法内部调用它拥有的informer的 run方法,自此,informer正式启动。

上述例子用的clientset 初始化了factory,这样只能创建对内置资源的informer,除此以外还能通过其他方式创建支持自定义资源监控的informer,叫做dynamic informer,不论那种informer,他们底层都是构建了一个SharedIndexInformer对象。

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {

informer 工作机制

结合下面的图和接下来的源码来分析下informer的工作机制以及各个组件间的联系。
源自k8s源码分析
结合上面这张图,大致看下informer的工作机制,informer内部会有个Reflector去进行list watch操作,可以将list watch 操作其实是 客户端与api server建立了长连接,然后不断监听来自 api server传来的事件信息。而建立长连接的机制,则是通过http chunked传输实现的。

reflector将监听到的信息传给了一个队列,然后informer内部的controller将会消费这个队列消息,执行HandleDeltas方法,然后将消息存入indexer,indexer可以认为是客户端的本地存储,用于保存收到的消息,可用于消费消息失败后重新消费消息。 最后HandleDeltas 又会调用distribute方法,将消息发给监听者。

监听者是何时初始化的呢,还记得在上面手写一个最简单的控制器种的informer.AddEventHandler 方法吗,这个方法实现逻辑即创建一个监听者,然后将传入的方法作为监听者的处理函数。

接下来看看实际的代码片段。

factory 调用start方法会将内部拥有的informer全部执行起来。
image.png

informer的run方法内部会创建一个controller结构 执行真正的控制逻辑。
image.png
controller 的run方法内部,又会创建一个reflector并执行其Run方法,去真正执行list watch 逻辑
image.png

再来结合下informer工作机制那张图,应该可以猜到控制器的processLoop 就是不断循环处理来自Delta FIFO 先进先出队列的信息,而reflctor 的run方法内部则实现监听,并将监听的信息放到Delta FIFO 队列里。而事实上的确是这样的。

完整的自定义控制器实现

文章最开头实现的控制器虽然也能实现控制器功能,但并不是官方推荐的编程模式,现在来看下官方推荐的自定义控制器的编程模式。

image.png
在之前直接写informer.AddEventHandler不同,这里引入了一个工作队列,EventHandler里的逻辑就是把收到的消息发送到工作队列中,然后有一段代码不断从工作队列中消费消息,而这段代码就被称作control loop 控制循环。

client 包提供了现成的工作队列

	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

informer.AddEventHandler

cache.ResourceEventHandlerFuncs{
		// 响应新增资源事件的方法,可以按照业务需求来定制,
		// 这里的做法比较常见:写入工作队列
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		// 响应修改资源事件的方法,可以按照业务需求来定制,
		// 这里的做法比较常见:写入工作队列
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		// 响应修改资源事件的方法,可以按照业务需求来定制,
		// 这里的做法比较常见:写入工作队列,注意删除的时候生成key的方法和新增修改不一样
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},

再实现一段逻辑不断读取工作队列消息

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func (c *Controller) processNextItem() bool {
	key, quit := c.queue.Get()
	// 处理消息...
	}

这种编程模式目前已经有工具帮我们生成框架代码,kubebuilder,operator sdk都可以帮我们生成框架代码,后续将会出这些系列文章,尽情期待。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐