专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。留言请关注《数据云技术社区》公众号。

1 CRD资源扩展

  • CRD 即CustomResourceDefinition,是 kubernetes 极力推荐的资源扩展方式。
  • 基于 CRD 技术,用户能将自定义资源注册到 kubernetes 系统,并像使用原生资源(如 pod、statefulset )一样对自定义资源进行创建、查看、修改、删除等操作,实现了类似于插件式的功能增强。

2 CRD开发步骤

1.根据go.mod设定,建立目录$GOPATH/src/k8s.io 
   module k8s.io/sample-controller
2.设置代理
    export GOPROXY=https://goproxy.io
3.加载依赖包,自动归档到vendor目录
    go mod vendor -v
4.设置update-codegen.sh
      "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
      k8s.io/sample-controller/pkg/client k8s.io/sample-controller/pkg/apis \
      samplecontroller:v1alpha1 \
      --output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \
      --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
 5.自定义控制器及main.go
 6.编译sample-controller生成可执行程序sample-controller
    go build -o sample-controller .
 7.运行sample-controller
    ./sample-controller -kubeconfig=$HOME/.kube/config
 8.注册 CRD 资源 
    crd.yaml
    
    apiVersion: apiextensions.k8s.io/v1beta1
    kind: CustomResourceDefinition
    metadata:
      name: foos.samplecontroller.k8s.io
    spec:
      group: samplecontroller.k8s.io
      version: v1alpha1
      names:
        kind: Foo
        plural: foos
      scope: Namespaced
 
    kubectl create -f artifacts/examples/crd.yaml
9.创建 CRD 资源 
    example-foo.yaml
    
    apiVersion: samplecontroller.k8s.io/v1alpha1
    kind: Foo
    metadata:
      name: example-foo
    spec:
      deploymentName: example-foo
      replicas: 1
    
    kubectl create -f artifacts/examples/example-foo.yaml

 10.查看 Foo 的部署情况
    kubectl get deployments
复制代码

3 CRD项目自定义控制器开发(以sample-controller为例)

3.1 main.go开发

  • 读取 kubeconfig 配置
  • 初始化kubeClient,监听普通事件
  • 初始化exampleClient, 监听Foo 事件
  • 初始化kubeInformerFactory和exampleInformerFactory
  • 初始化自定义 Controller
  • 开启 Controller
// main wires the sample controller together: it parses flags, builds the
// client configuration, creates one clientset for built-in resources and one
// for Foo resources, sets up the shared informer factories, and then runs the
// controller until the stop channel is closed.
func main() {
	flag.Parse()

	// Install the signal handler first so that the first shutdown signal is
	// handled gracefully through stopCh.
	stopCh := signals.SetupSignalHandler()

	// Build the client configuration from masterURL and kubeconfig.
	restConfig, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	// Clientset for the built-in Kubernetes resources (e.g. Deployments).
	coreClient, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	// Clientset for the custom Foo resources.
	fooClient, err := clientset.NewForConfig(restConfig)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	// Both informer factories are created with a 30 second resync period.
	const resyncPeriod = time.Second * 30
	coreInformers := kubeinformers.NewSharedInformerFactory(coreClient, resyncPeriod)
	fooInformers := informers.NewSharedInformerFactory(fooClient, resyncPeriod)

	// The controller gets both clientsets plus the Deployment and Foo
	// informers, so it can watch changes to both resource types.
	ctrl := NewController(
		coreClient,
		fooClient,
		coreInformers.Apps().V1().Deployments(),
		fooInformers.Samplecontroller().V1alpha1().Foos(),
	)

	// Start is non-blocking: it runs every registered informer in its own
	// goroutine, so there is no need to wrap these calls in `go`.
	coreInformers.Start(stopCh)
	fooInformers.Start(stopCh)

	// Run the controller with two workers.
	if runErr := ctrl.Run(2, stopCh); runErr != nil {
		klog.Fatalf("Error running controller: %s", runErr.Error())
	}
}
复制代码

3.2 Controller处理事件逻辑

  • Controller 的关键成员是两个资源的 Lister(appslisters.DeploymentLister、listers.FooLister),这两个成员由 main 函数传入的 Informer 进行初始化。
  • 为了缓冲事件处理,这里使用队列暂存事件,相关成员即为 workqueue.RateLimitingInterface;record.EventRecorder 则用于记录事件。
// Controller is the controller implementation for Foo resources. It holds the
// clientsets, listers, cache-sync checks, work queue and event recorder that
// the controller's methods operate on.
type Controller struct {
	// kubeclientset is the clientset for built-in Kubernetes resources; the
	// sync logic uses it to create and update Deployments.
	kubeclientset kubernetes.Interface
	// sampleclientset is the clientset for the Foo API group.
	// NOTE(review): presumably used to write Foo status — confirm against
	// updateFooStatus, which is not part of this excerpt.
	sampleclientset clientset.Interface
 
	// deploymentsLister reads Deployments from the informer cache.
	deploymentsLister appslisters.DeploymentLister
	// deploymentsSynced reports whether the Deployment informer has synced.
	deploymentsSynced cache.InformerSynced
	// foosLister reads Foo resources from the informer cache.
	foosLister        listers.FooLister
	// foosSynced reports whether the Foo informer has synced.
	foosSynced        cache.InformerSynced
 
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue workqueue.RateLimitingInterface
	// recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	recorder record.EventRecorder
}

// Excerpt from controller.go: construct the Controller from the clientsets
// and informers it is given.
// NOTE(review): `recorder` is defined outside this excerpt — presumably an
// event recorder created just before this literal; confirm in the full file.
controller := &Controller{
	kubeclientset:     kubeclientset,
	sampleclientset:   sampleclientset,
	// Listers and HasSynced checks both come from the corresponding informer.
	deploymentsLister: deploymentInformer.Lister(),
	deploymentsSynced: deploymentInformer.Informer().HasSynced,
	foosLister:        fooInformer.Lister(),
	foosSynced:        fooInformer.Informer().HasSynced,
	// Named, rate-limited queue ("Foos") using the default controller rate limiter.
	workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
	recorder:          recorder,
}

//main.go初始化入参
func NewController(
	kubeclientset kubernetes.Interface,
	sampleclientset clientset.Interface,
	deploymentInformer appsinformers.DeploymentInformer,
	fooInformer informers.FooInformer) *Controller
复制代码

3.3 自定义Controller事件处理函数

  • 设置对 Foo 资源变化的事件处理函数(Add、Update 均通过 enqueueFoo 处理)
  • 设置对 Deployment 资源变化的事件处理函数(Add、Update、Delete 均通过 handleObject 处理)
  • 引出enqueueFoo 以及 handleObject 的实现

   // Foo events: every added or updated Foo is put on the work queue.
	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueFoo,
		UpdateFunc: func(_, updated interface{}) {
			controller.enqueueFoo(updated)
		},
	})

	// Deployment events: handleObject looks up the owner of the Deployment
	// and, when that owner is a Foo, enqueues the Foo for processing. This
	// means no separate reconcile logic is needed for Deployments themselves.
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.handleObject,
		UpdateFunc: func(prev, curr interface{}) {
			currDepl := curr.(*appsv1.Deployment)
			prevDepl := prev.(*appsv1.Deployment)
			// A periodic resync delivers update events for every known
			// Deployment. Two different versions of the same Deployment
			// always have different ResourceVersions, so only a changed
			// ResourceVersion is worth handling.
			if currDepl.ResourceVersion != prevDepl.ResourceVersion {
				controller.handleObject(curr)
			}
		},
		DeleteFunc: controller.handleObject,
	})
复制代码

3.4 enqueueFoo 以及 handleObject 实现

  • enqueueFoo 就是解析 Foo 资源为namespace/name 形式的字符串,然后入队
  • handleObject 可处理任何实现了 metav1.Object 接口的资源,但只挑出 owner 是 Foo 的对象,将其所属的 Foo 解析为 namespace/name 入队;owner 不是 Foo 的对象不做处理。
// enqueueFoo derives the namespace/name key of the given Foo and adds that
// key to the work queue through the rate limiter. If no key can be derived,
// the error is reported via utilruntime.HandleError and nothing is queued.
// Only Foo resources should be passed to this method.
func (c *Controller) enqueueFoo(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}
 
// handleObject accepts any resource implementing metav1.Object (or a
// DeletedFinalStateUnknown tombstone wrapping one) and tries to find the Foo
// that controls it via its controller OwnerReference. When such a Foo exists
// in the lister it is enqueued; objects without a controller reference, with
// a controller of another kind, or whose Foo cannot be fetched are skipped.
func (c *Controller) handleObject(obj interface{}) {
	var object metav1.Object
	switch typed := obj.(type) {
	case metav1.Object:
		object = typed
	case cache.DeletedFinalStateUnknown:
		// Delete events may deliver a tombstone instead of the object.
		recovered, ok := typed.Obj.(metav1.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
			return
		}
		object = recovered
		klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
	default:
		utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
		return
	}
	klog.V(4).Infof("Processing object: %s", object.GetName())

	// Only objects controlled by a Foo are of interest.
	ownerRef := metav1.GetControllerOf(object)
	if ownerRef == nil || ownerRef.Kind != "Foo" {
		return
	}

	foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
	if err != nil {
		klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
		return
	}

	c.enqueueFoo(foo)
}
复制代码

3.4 Controller核心run处理函数

  • Run 由 main.go 调用:先等待 Informer 缓存同步完成,再并发启动 runWorker 处理队列内事件。
  • runWorker引出processNextWorkItem,processNextWorkItem引出syncHandler。
// Run waits for the Deployment and Foo informer caches to sync, starts
// `threadiness` worker goroutines, and then blocks until stopCh is closed.
// It returns an error if the caches fail to sync and nil otherwise. The
// workqueue is shut down by a deferred call when Run returns.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	klog.Info("Starting Foo controller")

	// Workers must not start before the informer caches are populated.
	klog.Info("Waiting for informer caches to sync")
	if !cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced) {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch the requested number of workers; each one is re-invoked by
	// wait.Until with a one second period until stopCh is closed.
	for remaining := threadiness; remaining > 0; remaining-- {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	klog.Info("Started workers")

	// Block here until shutdown is requested.
	<-stopCh
	klog.Info("Shutting down workers")

	return nil
}

// runWorker keeps calling processNextWorkItem to drain the workqueue and
// returns as soon as processNextWorkItem reports false.
func (c *Controller) runWorker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}

// processNextWorkItem takes one item off the workqueue and hands it to
// syncHandler. It returns false only when the queue reports shutdown; in
// every other case, including sync failures, it returns true so the worker
// keeps going.
func (c *Controller) processNextWorkItem() bool {
	item, quit := c.workqueue.Get()
	if quit {
		return false
	}

	// The work is wrapped in a closure so that Done can be deferred per item.
	process := func(item interface{}) error {
		// Done tells the workqueue that processing of this item has finished.
		// Forget must additionally be called when the item should not be
		// retried; it is deliberately not called on a sync error, so the
		// item is attempted again after a back-off period.
		defer c.workqueue.Done(item)

		// Items are expected to be namespace/name strings. Keys are queued
		// rather than objects because the informer cache may hold a more
		// recent version by the time the item is processed.
		key, isString := item.(string)
		if !isString {
			// The item is invalid; Forget it so it is not retried forever.
			c.workqueue.Forget(item)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
			return nil
		}

		// Sync the Foo identified by the key.
		syncErr := c.syncHandler(key)
		if syncErr == nil {
			// Success: Forget the item so it is only queued again when
			// another change happens.
			c.workqueue.Forget(item)
			klog.Infof("Successfully synced '%s'", key)
			return nil
		}

		// Failure: put the key back on the queue to retry transient errors.
		c.workqueue.AddRateLimited(key)
		return fmt.Errorf("error syncing '%s': %s, requeuing", key, syncErr.Error())
	}

	if err := process(item); err != nil {
		utilruntime.HandleError(err)
	}
	return true
}
复制代码

3.5 Controller核心逻辑syncHandler自定义处理函数

  • syncHandler 的处理逻辑大体如下:
  • 根据 namespace/name 获取 foo 资源
  • 根据 foo,获取其 Deployment 名称,进而获取 deployment 资源(没有就为其创建)
  • 根据 foo 的 Replicas 更新 deployment 的 Replicas(如果不匹配)
  • 更新 foo 资源的状态为最新 deployment 的状态(其实就是 AvailableReplicas)
  • 由此,可知 foo 的实现实体其实就是 deployment
// syncHandler reconciles the Foo identified by key (a namespace/name string)
// with its Deployment, then updates the Foo's status.
//
// Steps: split the key, fetch the Foo from the lister, fetch the Deployment
// named in Foo.Spec.DeploymentName (creating it if it does not exist), verify
// the Deployment is controlled by this Foo, update the Deployment when the
// replica counts differ, and finally call updateFooStatus.
//
// A nil return means the item needs no retry (including the absorbed cases:
// malformed key, Foo no longer exists, empty deployment name). A non-nil
// return is a failure the caller is expected to requeue.
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// A malformed key will never become valid, so do not requeue it.
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the Foo resource with this namespace/name
	foo, err := c.foosLister.Foos(namespace).Get(name)
	if err != nil {
		// The Foo resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
			return nil
		}

		return err
	}

	deploymentName := foo.Spec.DeploymentName
	if deploymentName == "" {
		// We choose to absorb the error here as the worker would requeue the
		// resource otherwise. Instead, the next time the resource is updated
		// the resource will be queued again.
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
		return nil
	}

	// Get the deployment with the name specified in Foo.spec
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	// If the resource doesn't exist, we'll create it
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
	}

	// If an error occurs during Get/Create, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// If the Deployment is not controlled by this Foo resource, we should log
	// a warning to the event recorder and return an error.
	if !metav1.IsControlledBy(deployment, foo) {
		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
		// msg is already fully formatted and contains the deployment name;
		// pass it as an argument, never as the format string, so that a '%'
		// in it cannot be misinterpreted as a formatting verb.
		return fmt.Errorf("%s", msg)
	}

	// If this number of the replicas on the Foo resource is specified, and the
	// number does not equal the current desired replicas on the Deployment, we
	// should update the Deployment resource.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
		klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
	}

	// If an error occurs during Update, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		return err
	}

	c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}
复制代码

4 Controller核心逻辑注册

  • pkg/apis/samplecontroller/v1alpha1/register.go(将类型注册到 Scheme)
// SchemeGroupVersion is the group/version used to register these objects:
// the group is samplecontroller.GroupName and the version is "v1alpha1".
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}
 
// Kind qualifies a bare kind name with this package's API group and returns
// the resulting GroupKind.
func Kind(kind string) schema.GroupKind {
	qualified := SchemeGroupVersion.WithKind(kind)
	return qualified.GroupKind()
}
 
// Resource qualifies a bare resource name with this package's API group and
// returns the resulting GroupResource.
func Resource(resource string) schema.GroupResource {
	qualified := SchemeGroupVersion.WithResource(resource)
	return qualified.GroupResource()
}
 
var (
	// SchemeBuilder is the scheme builder for this package; it is created
	// with addKnownTypes as its registration function.
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	// AddToScheme is SchemeBuilder's AddToScheme method, exposed at package
	// level for callers that need to register this package's types.
	AddToScheme   = SchemeBuilder.AddToScheme
)
 
// addKnownTypes registers Foo and FooList with the given scheme under
// SchemeGroupVersion, then adds the group version to the scheme via
// metav1.AddToGroupVersion. It always returns nil.
func addKnownTypes(scheme *runtime.Scheme) error {
	knownTypes := []runtime.Object{&Foo{}, &FooList{}}
	scheme.AddKnownTypes(SchemeGroupVersion, knownTypes...)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
复制代码

5 总结

整个开发遇到各种问题,比如:包结构和相对路径,GO代理,以及K8s CRD 官方提供函数等,通过反复的实践,终于掌握整套Kubernetes CRD开发模式,以此进行留记,方便查阅。

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。留言请关注《数据云技术社区》公众号。

转载于:https://juejin.im/post/5d57f750f265da03b94ff11a

Logo

开源、云原生的融合云平台

更多推荐