Learning Kubernetes Operators in Practice

Skills a Kubernetes developer should master

  • The conventions and basic concepts of the core Kubernetes API, plus some commonly used foundational tools and libraries
  • Familiarity with the Kubernetes sub-repositories such as api, apimachinery and client-go
  • The development frameworks around the Kubernetes extension APIs
  1. The definition of the Kubernetes API model
  2. The operations available on the API model
  3. The classification of resource models
  4. A brief introduction to Labels, Selectors and Annotations
  5. A brief introduction to controllers

1. Composition of the resource model

scheme (TypeMeta) contains group, version and kind; the group is usually a domain name
metadata (ObjectMeta) carries the information specific to a concrete instance of the scheme
spec defines the desired state along with basic information
status holds the real-time state (usually updated by the various controllers)
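
As a concrete illustration, a resource type in Go is assembled from exactly these four parts. A minimal sketch, where the CronJob names are hypothetical and only the metav1 types come from apimachinery:

package demo

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// CronJobSpec and CronJobStatus are hypothetical placeholders.
type CronJobSpec struct {
	Schedule string `json:"schedule"` // desired behavior
}

type CronJobStatus struct {
	LastRunTime *metav1.Time `json:"lastRunTime,omitempty"` // observed state
}

// CronJob shows the standard four-part layout of a resource model.
type CronJob struct {
	metav1.TypeMeta   `json:",inline"`            // scheme: group/version/kind
	metav1.ObjectMeta `json:"metadata,omitempty"` // per-instance metadata

	Spec   CronJobSpec   `json:"spec,omitempty"`   // desired state
	Status CronJobStatus `json:"status,omitempty"` // real-time state, written by controllers
}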

scheme (Group, Version, Kind)

Kind is the resource type, e.g. Pod, Service
Version is the resource version, e.g. v1, v1beta1
Group is the resource group name, which plays a role similar to a Go package: it disambiguates resources that share a name and makes resources easier to categorize
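
In apimachinery these three coordinates are modeled directly as a GroupVersionKind; a small sketch:

package demo

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func gvkDemo() {
	// The core group has an empty Group string; other groups use a
	// package-like name, and extension groups are usually domain names.
	podGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
	deployGVK := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}
	fmt.Println(podGVK.String(), deployGVK.String())
}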

Notes on versions:
Alpha may break backward compatibility by changing field names, defaults or behavior, and may no longer be supported in subsequent releases
Beta is largely stable, though some features may still be incomplete or undocumented
GA is the stable version and can be used in production

2. Operations on the resource model

Create
Update
Patch
Delete
Get
List
Watch
DeleteCollection

Create, Update, Patch and Delete are used to modify objects. Update replaces the object with the provided content, while Patch selectively updates individual fields.

Get, List and Watch are used to fetch a specific resource by name, list all resources matching a label selector, or continuously watch for updates.
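
The three read verbs side by side, as a minimal sketch against the core Pods API; the pod name my-pod and the selector app=web are made up, and a recent client-go (v0.18+, whose typed clients take a context) is assumed:

package demo

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func readPods(client kubernetes.Interface, ns string) error {
	// Get: fetch one resource by name.
	pod, err := client.CoreV1().Pods(ns).Get(context.TODO(), "my-pod", metav1.GetOptions{})
	if err != nil {
		return err
	}
	fmt.Println("got:", pod.Name)

	// List: fetch every resource matching a label selector.
	pods, err := client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: "app=web"})
	if err != nil {
		return err
	}
	fmt.Println("listed:", len(pods.Items))

	// Watch: stream change events until the server closes the connection.
	w, err := client.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	defer w.Stop()
	for ev := range w.ResultChan() {
		fmt.Println("event:", ev.Type)
	}
	return nil
}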

Notes:

  1. When calling the update API, always query the resource first, modify the object returned by the query, and then submit that object as the update; otherwise many attributes of the original data will be lost (see the sketch after this list).
  2. A long-lived call to the Watch API will eventually time out, and you have to re-establish the connection manually (if your code calls watch directly, it should implement a timeout-and-reconnect mechanism).
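
A minimal sketch of the read-modify-write pattern from note 1; addLabel and the touched label are made up, and retry.RetryOnConflict (from client-go) simply re-runs the closure whenever the server rejects the update because of a concurrent change:

package demo

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

func addLabel(client kubernetes.Interface, ns, name string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Read the current object first so no existing fields are lost.
		pod, err := client.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		// Modify the fetched copy, then write the whole object back.
		if pod.Labels == nil {
			pod.Labels = map[string]string{}
		}
		pod.Labels["touched"] = "true"
		_, err = client.CoreV1().Pods(ns).Update(context.TODO(), pod, metav1.UpdateOptions{})
		return err
	})
}

For note 2, the informer machinery shown in section 5 re-establishes its watches automatically, so a hand-written reconnect loop is usually only needed when calling Watch directly.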

3. Namespaced and cluster-scoped resources
Most resource models are namespaced, e.g. Pod, Service and so on; there are also non-namespaced resources that are cluster-scoped, e.g. PV, Node.
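
The difference is visible directly in the typed clients; a small sketch reusing the kubernetes.Interface from the examples above:

package demo

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func scopes(client kubernetes.Interface) {
	// Namespaced: the pod client is keyed by a namespace.
	pods, _ := client.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	// Cluster-scoped: nodes (like PVs) take no namespace at all.
	nodes, _ := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	fmt.Println(len(pods.Items), len(nodes.Items))
}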

4. About Labels, Selectors and Annotations

Labels and Annotations exist only in ObjectMeta.

Labels are key-value pairs that can be queried to find matching objects, and are used to tie objects together within a Kubernetes cluster. For example, a Service uses labels to determine which Pods to direct traffic to, and a Deployment uses labels (along with OwnerReferences) to identify the Pods it created.
Annotations allow arbitrary data to be attached to a resource even when it does not fit the resource's schema but end users or tooling still need it. Annotations can be used to define new extension fields on a resource without modifying the object's schema, which lets users define their own private schema extensions for the existing core Kubernetes resources.
Selectors are how related resources typically filter for the resources they match; for example, a Service uses its selector to pick out the group of Pods whose labels match, and routes traffic to them.
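
The label matching a Service performs can be sketched with apimachinery's labels package; the app=web selector and the pod label sets are made up:

package demo

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func selectorDemo() {
	// The selector a Service might declare.
	sel := labels.SelectorFromSet(labels.Set{"app": "web"})

	// Labels carried by two candidate pods.
	frontend := labels.Set{"app": "web", "tier": "frontend"}
	batch := labels.Set{"app": "batch"}

	fmt.Println(sel.Matches(frontend)) // true: this pod would receive traffic
	fmt.Println(sel.Matches(batch))    // false
}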

5. A brief introduction to controllers

A Kubernetes controller is really a reconciliation process: it typically watches both the globally declared desired state of some object and its actual state, then calls the apiserver's interfaces to update the actual state until it matches what the desired state describes.
This usually runs as a periodic loop, roughly like the following.

for {
	desired := getDesiredState()  // e.g. the object's spec
	current := getCurrentState()  // e.g. its status, or the real world
	makeChanges(desired, current) // drive current toward desired
}

A simple controller looks roughly like this:

// Imports assumed by this example (client-go and apimachinery):
import (
	"fmt"
	"time"

	"github.com/golang/glog"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	informers "k8s.io/client-go/informers/core/v1"
	listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type Controller struct {
	// pods gives cached access to pods; the lister serves reads from the
	// informer's local cache, which is kept filled by its list/watch loop.
	pods listers.PodLister
	// podsSynced reports whether the lister has finished its initial
	// sync of data from the API server.
	podsSynced cache.InformerSynced

	// queue is where incoming work is placed to de-dup and to allow "easy"
	// rate limited requeues on errors; keys for objects seen by the
	// informer are put onto this queue.
	queue workqueue.RateLimitingInterface

	// syncHandler holds the per-key business logic; it must be set by the
	// code that constructs the Controller.
	syncHandler func(key string) error
}

func NewController(pods informers.PodInformer) *Controller {
	c := &Controller{
		pods: pods.Lister(),
		podsSynced: pods.Informer().HasSynced,
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller-name"),
	}
	
	// register event handlers to fill the queue with pod creations, updates and deletions
	// the informer watches the API server; each add/update/delete below
	// turns the object into a key and places it on the queue
	pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				c.queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
	})
	
	return c
}
// Run starts the controller: workers pull keys from the queue and run the
// corresponding processing logic on each one.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	// don't let panics crash the process
	defer utilruntime.HandleCrash()
	// make sure the work queue is shutdown which will trigger workers to end
	defer c.queue.ShutDown()

	glog.Infof("Starting <NAME> controller")

	// wait for your secondary caches to fill before starting your work
	if !cache.WaitForCacheSync(stopCh, c.podsSynced) {
		return
	}

	// start up your worker threads based on threadiness.  Some controllers
	// have multiple kinds of workers
	for i := 0; i < threadiness; i++ {
		// runWorker will loop until "something bad" happens.  The .Until will
		// then rekick the worker after one second
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	// wait until we're told to stop
	<-stopCh
	glog.Infof("Shutting down <NAME> controller")
}

func (c *Controller) runWorker() {
	// hot loop until we're told to stop.  processNextWorkItem will
	// automatically wait until there's work available, so we don't worry
	// about secondary waits
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue.  It returns false
// when it's time to quit.
func (c *Controller) processNextWorkItem() bool {
	// pull the next work item from queue.  It should be a key we use to lookup
	// something in a cache
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// you always have to indicate to the queue that you've completed a piece of
	// work
	defer c.queue.Done(key)

	// do your work on the key.  This method contains your "do stuff" logic
	err := c.syncHandler(key.(string))
	if err == nil {
		// if you had no error, tell the queue to stop tracking history for your
		// key. This will reset things like failure counts for per-item rate
		// limiting
		c.queue.Forget(key)
		return true
	}

	// there was a failure so be sure to report it.  This method allows for
	// pluggable error handling which can be used for things like
	// cluster-monitoring
	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))

	// since we failed, we should requeue the item to work on later.  This
	// method will add a backoff to avoid hotlooping on particular items
	// (they're probably still not going to work right away) and overall
	// controller protection (everything I've done is broken, this controller
	// needs to calm down or it can starve other useful work) cases.
	c.queue.AddRateLimited(key)

	return true
}
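
To actually run it, the controller needs a clientset and a shared informer factory wired together. A minimal sketch, assuming it sits in the same package as the controller above; the kubeconfig path and the no-op syncHandler are placeholders:

import (
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a clientset from a local kubeconfig (path is an assumption).
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// One shared informer factory for the process; resync every 30s.
	factory := kubeinformers.NewSharedInformerFactory(client, 30*time.Second)

	c := NewController(factory.Core().V1().Pods())
	c.syncHandler = func(key string) error {
		// placeholder business logic
		return nil
	}

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh) // kicks off the underlying list/watch machinery
	c.Run(2, stopCh)      // two worker goroutines
}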