Go之Operator(一)
学习链接operator实战k8s开发人应该掌握的技能k8s基础api的规范和基本概念,以及一些常用的基础工具和库k8s子库api,apimachinery,client-go等基础库的熟悉k8s扩展api相关的开发框架聊聊k8s的API模型的定义对应API模型的操作资源模型的分类关于Labels,Selectors和Annotations的简单介绍关于controller的简单介绍1. 资源模型
学习链接operator实战
k8s开发人应该掌握的技能
- k8s基础api的规范和基本概念,以及一些常用的基础工具和库
- k8s子库api,apimachinery,client-go等基础库的熟悉
- k8s扩展api相关的开发框架
- 聊聊k8s的API模型的定义
- 对应API模型的操作
- 资源模型的分类
- 关于Labels,Selectors和Annotations的简单介绍
- 关于controller的简单介绍
1. 资源模型组成
scheme(TypeMeta) 包含group version kind , group通常是域名
metadata(ObjectMeta) scheme中的具体实例特有的信息
spec 定义期望状态以及基础信息
status 时时的状态(通常是被各种controller更新)
scheme (Group, Version, Kind)
Kind 资源类型 例如: pod ,service等
Version 资源版本 例如: v1 ,v1beta1等
Group 资源组名 类似golang中的pkg的作用. 起到消除同名资源的作用,方便归类资源
关于版本的注意点
Alpha 可能会通过更改字段名称,默认值或行为来破坏向后兼容性。后续的版本更新中可能不在支持
Beta 基本趋于稳定的版本,可能缺少文档版本的功能
GA 稳定版本,可以用宇生产
2. 对应资源模型的相关操作
Create
Update
Patch
Delete
Get
List
Watch
Deletecollection
Create,Update,Patch和Delete可用于修改对象。Update用提供的内容替换对象,Patch选择性地更新字段。
getet,list和watch可用于按名称获取特定资源,列出与标签匹配的所有资源,或持续监视更新。
注意点:
- 在调用更新接口的时候, 一定要先查询对应的资源模型, 然后修改查询得到的资源模型数据, 然后通过改数据去更新. 如果不这样,会导致原来数据很多属性丢失.
- Watch Api在调用时时间长了会超时,需要手动的去重新链接(如果代码中手动的调用watch,应该做超时重连机制)
3. 空间资源和集群资源
大部分资源模型是namespaced,比如: pod,service等等, 也有非空间资源,属于集群资源的, 比如pv,node等
4. 关于 Labels, Selectors and Annotations
Labels和nnotations只存在ObjectMeta
Labels 可以查询以查找匹配对象的键值对。用于在Kubernetes集群中将对象连接在一起。例如,服务使用标签来确定将流量定向到哪些Pod,而Deployments使用标签(以及OwnersReferences)来标识他们创建的Pod。
Annotations 允许将任意数据写入可能不适合资源模式的资源,但最终用户或工具可能需要这些资源,注释可用于在资源上定义新的扩展字段,而无需修改对象的模式。这允许用户为现有的核心Kubernetes资源定义自己的私有模式扩展。
Selectors 通常是有关联关系的资源用来筛选匹配资源的作用, 比如service通过selector选择符合selector标签的一组pod来实现路由转发.
5. 关于controller的简单介绍
k8s的controller实际上是一个调和进程, 它通常watch某个对对象的全局的期望描述状态和实际的状态,然后通过调用apiserver的接口更新对应的实际状态直到与描述状态期望的结果一致.
通常是一个周期的姓的循环. 像如下一样.
describe := get describe status
current := get current status
通过当前状态 更新状态 最终达到和describe状态一致
一个简单的控制器大概如下:
type Controller struct {
// pods gives cached access to pods.
//pod lister 周期性的list数据把数据传输过来会用到这个
pods informers.PodLister
//判断lister是不是从api中同步数据
podsSynced cache.InformerSynced
// queue is where incoming work is placed to de-dup and to allow "easy"
// rate limited requeues on errors
queue workqueue.RateLimitingInterface
//把lister过来的数据放在queue中
}
func NewController(pods informers.PodInformer) *Controller {
c := &Controller{
pods: pods.Lister(),
podsSynced: pods.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller-name"),
}
// register event handlers to fill the queue with pod creations, updates and deletions
//会watch k8s 如果有资源 他会拿过来然后放在queue中
pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
c.queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
},
},)
return c
}
//会把这个run起来 去对应的queue中拿到数据 然后做对应的逻辑处理
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
// don't let panics crash the process
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
glog.Infof("Starting <NAME> controller")
// wait for your secondary caches to fill before starting your work
if !cache.WaitForCacheSync(stopCh, c.podsSynced) {
return
}
// start up your worker threads based on threadiness. Some controllers
// have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
go wait.Until(c.runWorker, time.Second, stopCh)
}
// wait until we're told to stop
<-stopCh
glog.Infof("Shutting down <NAME> controller")
}
func (c *Controller) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will
// automatically wait until there's work available, so we don't worry
// about secondary waits
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false
// when it's time to quit.
func (c *Controller) processNextWorkItem() bool {
// pull the next work item from queue. It should be a key we use to lookup
// something in a cache
key, quit := c.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of
// work
defer c.queue.Done(key)
// do your work on the key. This method will contains your "do stuff" logic
err := c.syncHandler(key.(string))
if err == nil {
// if you had no error, tell the queue to stop tracking history for your
// key. This will reset things like failure counts for per-item rate
// limiting
c.queue.Forget(key)
return true
}
// there was a failure so be sure to report it. This method allows for
// pluggable error handling which can be used for things like
// cluster-monitoring
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
// since we failed, we should requeue the item to work on later. This
// method will add a backoff to avoid hotlooping on particular items
// (they're probably still not going to work right away) and overall
// controller protection (everything I've done is broken, this controller
// needs to calm down or it can starve other useful work) cases.
c.queue.AddRateLimited(key)
return true
}
更多推荐
所有评论(0)