目录

Informer 机制

Informer 的实现机制

Informer 机制架构设计

开发背景 

开发流程

 代码结构

部署测试流程


Informer 机制

Kubernetes 中使用 http 进行通信,如何不依赖中间件的情况下保证消息的实时性,可靠性和顺序性等呢?答案就是利用了 Informer 机制。Informer 的机制,降低了 Kubernetes 各个组件跟 Etcd 与 Kubernetes API Server 的通信压力。

Informer 的实现机制

  1. 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。ListAndWatch通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象;然后,再通过 WATCH API 来“监听”所有这些 API 对象的变化;
  2. 注册相应的事件,之后如果监听到的事件变化就会调用事件对应的EventHandler,实现回调

Informer 机制架构设计

简单介绍一下其中几个组件及其作用: 

  • Reflector:用于监控指定的k8s资源,当资源发生变化时,触发相应的变更事件,如Added事件、Updated事件、Deleted事件,并将器资源对象放到本地DeltaFIFO Queue中;
  • DeltaFIFO:DeltaFIFO是一个先进先出的队列,可以保存资源对象的操作类型;
  • Indexer:用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer

开发背景 

由于 K8S 内置了许多的 Controller 来对各种各样的资源进行 List & Watch,因此也会产生各种不同的事件(Event),其中部分事件是需要我们作为告警来处理的,比如 ReadinessProbe Failed 这种事件,我们需要在其到达失败阈值之前获得通知并做及时处理。

然而事件诞生的机制是随机性的,定时巡检一是有滞后性,二是无疑会增加 api server 的压力。因此,我们同样可以利用 Informer 中的回调机制,当监听的资源对象产生变化时,触发我们预设好的 Handler 函数,然后把消息推送出来

开发流程

 代码结构

同样是先贴出代码结构

.
├── dockerfile
├── go.mod
├── go.sum
├── lib
│   └── client.go //用于生成K8S rest client
├── models
│   └── webHookSink.go //从启动参数中获取构造请求需要的元素
└── sink
│   └── webhook.go //请求的构造以及模版的渲染
├── handlers //各类处理K8S资源对象的handler
│   ├── cmhandler.go
│   └── eventHandler.go
├── main.go //构造informer

讲一下大致流程:

首先main中构建informer,指定监听的资源对象(Events),并加入指定的 Handler。

func main() {
	flag.Var(&sink.ArgSinks, "sink", "external sink(s) that receive events")

	client := lib.K8sRestClientInPod()

	factory := informers.NewSharedInformerFactory(client, 0)
	//eventsInformer := factory.Events().V1().Events()
	eventGVR := schema.GroupVersionResource{
		Group:    "",
		Version:  "v1",
		Resource: "events",
	}
	eventsInformer, _ := factory.ForResource(eventGVR)
	eventsInformer.Informer().AddEventHandler(&handlers.EventHandler{})
	factory.Start(wait.NeverStop)
	go startHTTPServer()
	select {}
}

这里的Handler是一个需要实现OnAdd,OnUpdate,OnDelete函数的接口,分别对应了资源的添加、更新、删除。我们可以在 OnAdd() 中实现我们想要的逻辑,比如处理什么事件,不处理什么事件,来抑制某些事件的告警。

我们通过flag.Var去接启动项参数,这里我们需要自己实现一个结构体,包括了 Set() 和 String() 方法,其中 String() 方法会对我们传入的启动项参数(本质是一串url字符串)进行解构和提取,获得我们构造推送到机器人所需要的请求信息。

type WebHookSink struct {
	uri          *url.URL
	HeaderMap    map[string]string
	Endpoint     string
	Method       string
	BodyTemplate string
}

// NewWebHookSink 构造函数
func NewWebHookSink() WebHookSink {
	return WebHookSink{
		HeaderMap:    map[string]string{"Content-Type": "application/json"},
		Method:       http.MethodPost,
		BodyTemplate: bodyTemplate,
		Endpoint:     "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=6400ebf4-4a4a-4b84-8667-933495f48c2f",
	}
}

func (w WebHookSink) String() string {
	return ""
}

func (w WebHookSink) Set(s string) error {
	e := os.ExpandEnv(s)
	uri, err := url.Parse(e)
	if err != nil {
		return err
	} else {
		w.uri = uri
	}

	if len(uri.Host) > 0 {
		w.Endpoint = uri.String()
	} else {
		klog.Errorf("uri host's length is 0 and pls check your uri: %v", uri)
	}

	opts := uri.Query()

	if len(opts["method"]) >= 1 {
		w.Method = opts["method"][0]
	}

	// set header of webHook
	w.HeaderMap = parseHeaders(opts["header"])

	return nil
}

当触发了 OnAdd Handler ,就会根据已经获取到的元素来构造请求并发送。

// Send send msg to generic webHook
func Send(event *v1.Event) (err error) {

	body, err := RenderBodyTemplate(event)
	fmt.Println(body)
	if err != nil {
		klog.Errorf("Failed to RenderBodyTemplate,because of %v", err)
		return err
	}

	bodyBuffer := bytes.NewBuffer([]byte(body))
	req, err := http.NewRequest(ArgSinks.Method, ArgSinks.Endpoint, bodyBuffer)

	// append header to http request
	if ArgSinks.HeaderMap != nil && len(ArgSinks.HeaderMap) != 0 {
		for k, v := range ArgSinks.HeaderMap {
			req.Header.Set(k, v)
		}
	}

	if err != nil {
		klog.Errorf("Failed to create request,because of %v", err)
		return err
	}

	resp, err := http.DefaultClient.Do(req)
	klog.Info(resp)
	if err != nil {
		klog.Errorf("Failed to send event to sink,because of %v", err)
		return err
	}
	defer resp.Body.Close()

	if resp != nil && resp.StatusCode != http.StatusOK {
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		err = fmt.Errorf("failed to send msg to sink, because the response code is %d, body is : %v", resp.StatusCode, string(body))
		klog.Errorln(err)
		return err
	}
	return nil
}

其中,因为钉/企微的 Webhook 机器人有固定的消息模版用于展示,我们需要在预先定义好的模版中填充关于事件的一些信息字段,这些字段可以表示成事件对象(结构体)的成员变量名。

func RenderBodyTemplate(event *v1.Event) (body string, err error) {
	var tpl bytes.Buffer
	tp, err := template.New("body").Parse(ArgSinks.BodyTemplate)
	if err != nil {
		klog.Errorf("Failed to parse template,because of %v", err)
		return "", err
	}
	// https://github.com/AliyunContainerService/kube-eventer/issues/165
	event.Message = strings.Replace(event.Message, `"`, ``, -1)
	if err := tp.Execute(&tpl, event); err != nil {
		klog.Errorf("Failed to renderTemplate,because of %v", err)
		return "", err
	}
	return tpl.String(), nil
}

 具体的表示方法,可以在template包中可以看到:

至此,编码阶段就完成了。

部署测试流程

因为其不对外对内提供服务,所以只需要部署deployment就行了;

又由于其是在体内访问api server,所以需要为其配置对应的权限:

每个pod在创建时默认会挂载所处命名空间下的 default ServiceAccount 中的 ca.crt 和 token 文件到容器内的 /var/run/secrets/kubernetes.io/serviceaccount 路径下面,我们需要为其创建 ServiceAccount,CluserRole 以及 ClusterRolebinding,并在 deployment 中指定 serviceaccount

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    name: event-informer
  name: event-informer
  namespace: ops
spec:
  replicas: 1
  selector:
    matchLabels:
      app: event-informer
  template:
    metadata:
      labels:
        app: event-informer
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      dnsPolicy: ClusterFirstWithHostNet
      serviceAccount: event-informer
      containers:
        - image: event-informer
          name: event-informer
          command:
            - "/app/httpserver"
            - --sink=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxx
          env:
          # If TZ is assigned, set the TZ value as the time zone
          - name: TZ
            value: "Asia/Shanghai"
          volumeMounts:
            - name: localtime
              mountPath: /etc/localtime
              readOnly: true
            - name: zoneinfo
              mountPath: /usr/share/zoneinfo
              readOnly: true
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
            limits:
              cpu: 500m
              memory: 250Mi
      volumes:
        - name: localtime
          hostPath:
            path: /etc/localtime
        - name: zoneinfo
          hostPath:
            path: /usr/share/zoneinfo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: event-informer
rules:
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - get
      - list
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: event-informer
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: event-informer
subjects:
  - kind: ServiceAccount
    name: event-informer
    namespace: ops
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: event-informer
  namespace: ops

当pod首次运行时,已经存在的 Events 会各自触发一次 OnAddHandler,我们也可以在钉/企微上观察到对应的事件告警消息:

Kubernetes二次开发系列文章:

Kubernetes 开发【1】——webhook 实现 API Server 请求拦截和修改

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐