The Final Destination of an Event

This section pulls the handling of events out for separate analysis; previously we mainly followed an event from being reported to being processed inside each watcher. k8s does not simply hand events to kube-apiserver: there is also compression (aggregation) logic along the way.
First, a quick review of the three registration methods:
StartRecordingToSink()
StartLogging()
StartStructuredLogging()

func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	return e.StartEventWatcher(
		func(e *v1.Event) {
			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
		})
}

// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
	return e.StartEventWatcher(
		func(e *v1.Event) {
			klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
		})
}
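
All three are thin wrappers over StartEventWatcher, which starts a goroutine that pulls events off the broadcaster's watch channel and hands each one to the given handler. For context, quoted from the same event.go (minor details may differ across versions):

// StartEventWatcher starts sending events received from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}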

StartLogging and StartStructuredLogging both just print the event as a log line. The interesting one is StartRecordingToSink:

func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	eventCorrelator := NewEventCorrelatorWithOptions(e.options)
	return e.StartEventWatcher(
		func(event *v1.Event) {
			recordToSink(sink, event, eventCorrelator, e.sleepDuration)
		})
}

First, look at the sink parameter. EventSink is defined in k8s.io\client-go@v0.19.2\tools\record\event.go:

type EventSink interface {
	Create(event *v1.Event) (*v1.Event, error)
	Update(event *v1.Event) (*v1.Event, error)
	Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}

Its implementation is EventSinkImpl in k8s.io\client-go@v0.19.2\kubernetes\typed\core\v1\event_expansion.go. That struct in turn embeds the EventInterface interface from k8s.io\client-go@v0.19.2\kubernetes\typed\core\v1\event.go, whose implementation is the events struct in the same file:

type events struct {
	client rest.Interface // the RESTClient used to talk to kube-apiserver
	ns     string         // namespace
}

So, plainly, EventSink is what submits event creates, updates, and patches to kube-apiserver.
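
To make this concrete, here is a minimal sketch of how a component typically wires a broadcaster to this sink; the clientset variable and the component name "my-controller" are assumptions for illustration:

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
)

func newRecorder(clientset kubernetes.Interface) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	// mirror events into the log as well
	broadcaster.StartLogging(klog.Infof)
	// EventSinkImpl adapts the typed Events client to the EventSink interface
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: clientset.CoreV1().Events(""), // "" = record into each event's own namespace
	})
	// the recorder stamps events with this source before broadcasting them
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"})
}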

NewEventCorrelatorWithOptions returns an EventCorrelator. As the name hints, it correlates events, and its main job is aggregation: a pod produces many events over its lifetime (for example, a failed image pull is retried again and again, generating a stream of similar events), and without some processing these would pile up as Event resources and put heavy pressure on etcd.
Next, the logic of recordToSink():

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	eventCopy := *event
	event = &eventCopy // copy the event
	result, err := eventCorrelator.EventCorrelate(event) // correlate/aggregate
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip { // skip recording this event
		return
	}
	tries := 0 // retry counter
	for {
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { // record the event; true means success or a non-retryable error, so stop
			break
		}
		tries++
		if tries >= maxTriesPerEvent { // give up after 12 attempts
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		// Randomize the first sleep so that various clients won't all be
		// synced up if the master goes down.
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) // the first interval is randomized: jitter, so that after an apiserver outage all clients don't retry in lockstep
		} else {
			time.Sleep(sleepDuration) // from the second retry on, use the normal interval
		}
	}
}

Two pieces of logic are key here:

  1. result, err := eventCorrelator.EventCorrelate(event)
  2. recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator)

First, look at eventCorrelator.EventCorrelate:

type EventCorrelator struct {
	// the function to filter the event
	filterFunc EventFilterFunc
	// the object that performs event aggregation
	aggregator *EventAggregator
	// the object that observes events as they come through; it decides the final form of the event
	logger *eventLogger
}

EventCorrelator holds one event-filtering function plus two structs, EventAggregator and eventLogger:

type EventAggregator struct {
	sync.RWMutex

	// The cache that manages aggregation state
	cache *lru.Cache

	// The function that groups events for aggregation; it returns an aggregateKey
	// (identifying a group of similar events) and a localKey (the event message)
	keyFunc EventAggregatorKeyFunc

	// The function that generates a message for an aggregate event
	messageFunc EventAggregatorMessageFunc

	// The maximum number of events in the specified interval before aggregation occurs
	maxEvents uint // only beyond maxEvents similar events is aggregation worthwhile

	// The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
	maxIntervalInSeconds uint // past this interval, an event is treated as unique again and not aggregated

	// clock is used to allow for testing over a time interval
	clock clock.Clock
}

type eventLogger struct {
	sync.RWMutex
	cache *lru.Cache // cache
	clock clock.Clock
}
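
For reference when reading the code below, the values stored in those two caches are aggregateRecord (keyed by aggregateKey) and eventLog (keyed by the event or aggregate key); paraphrased from events_cache.go:

// the value cached by EventAggregator
type aggregateRecord struct {
	// the distinct messages seen for this group of similar events
	localKeys sets.String
	// when the last event in this group was observed
	lastTimestamp metav1.Time
}

// the value cached by eventLogger
type eventLog struct {
	// the number of times the event has occurred
	count uint
	// the time at which the event was first recorded
	firstTimestamp metav1.Time
	// the name and resourceVersion of the last recorded occurrence, needed to build patches
	name            string
	resourceVersion string
}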

Now look at the initializer, NewEventCorrelatorWithOptions():

func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
	optionsWithDefaults := populateDefaults(options)
	spamFilter := NewEventSourceObjectSpamFilter(optionsWithDefaults.LRUCacheSize,
		optionsWithDefaults.BurstSize, optionsWithDefaults.QPS, optionsWithDefaults.Clock)
	return &EventCorrelator{
		filterFunc: spamFilter.Filter,
		aggregator: NewEventAggregator(
			optionsWithDefaults.LRUCacheSize,
			optionsWithDefaults.KeyFunc,
			optionsWithDefaults.MessageFunc,
			optionsWithDefaults.MaxEvents,
			optionsWithDefaults.MaxIntervalInSeconds,
			optionsWithDefaults.Clock),
		logger: newEventLogger(optionsWithDefaults.LRUCacheSize, optionsWithDefaults.Clock),
	}
}

The overall logic is straightforward: populateDefaults fills in default values used to initialize the EventCorrelator, and any of those defaults can be overridden through the options argument.
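
For example, a broadcaster with a tuned correlator can be built via NewBroadcasterWithCorrelatorOptions; a minimal sketch, where the numbers are illustrative rather than recommendations:

import "k8s.io/client-go/tools/record"

func newTunedBroadcaster() record.EventBroadcaster {
	return record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
		MaxEvents:            20,  // default 10: allow more distinct messages before aggregating
		MaxIntervalInSeconds: 300, // default 600: shrink the aggregation window to 5 minutes
		BurstSize:            50,  // default 25: burst of the spam filter's token bucket
		QPS:                  0.1, // default 1/300: refill rate of the spam filter's token bucket
	})
}
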
Now for the logic of eventCorrelator.EventCorrelate, which is where events are actually aggregated.

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)            // aggregate the event
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) // observe the event
	if c.filterFunc(observedEvent) {                                         // filter the event
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
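
Note the order: aggregate first, then observe, then filter. The default filterFunc comes from EventSourceObjectSpamFilter, which enforces a per-source-and-object token-bucket rate limit (the BurstSize and QPS above) and is what can set Skip to true. The key it buckets by looks roughly like this (paraphrased from events_cache.go):

// getSpamKey builds a unique event key based on source and involvedObject
func getSpamKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
	}, "")
}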

The aggregation logic is as follows:

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now()) // current time
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent) // built from source, involvedObject, reason, message; identifies a unique event
	// aggregateKey is for the aggregate event, if one is needed.
	// keyFunc is how EventAggregator derives a key representing a group of events.
	// Compared with eventKey above, aggregateKey drops event.InvolvedObject.FieldPath and
	// event.Message, and adds event.ReportingController and event.ReportingInstance.
	// localKey = event.Message.
	// aggregateKey is what decides whether two events count as "similar".
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey) // look for similar events
	if found {
		// the cache maps a key built from parts of the event to an aggregateRecord
		record = value.(aggregateRecord)
	}
	// at this point record is either empty or a value already present in the cache

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second // default 10 minutes
	interval := now.Time.Sub(record.lastTimestamp.Time)                // time since the last similar event
	if interval > maxInterval {                                        // more than 10 min have passed
		// start a fresh record for this aggregateKey, discarding the old value: events more than
		// 10 min apart are no longer worth aggregating, so counting restarts from this event
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	record.localKeys.Insert(localKey) // record this message; if the interval expired above, only the current message survives
	record.lastTimestamp = now        // refresh lastTimestamp to now
	e.cache.Add(aggregateKey, record) // update (similar events already seen) or add a new cache entry

	// If we are not yet over the threshold for unique events, don't correlate them
	if uint(record.localKeys.Len()) < e.maxEvents { // fewer than 10 similar events: no need to aggregate
		return newEvent, eventKey // return the event unchanged, with its unique-event key
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny() // at the threshold of 10: drop one key arbitrarily, so the next similar event triggers aggregation again (9 + 1 = 10) and the cache stays bounded

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{ // the rewritten aggregate event; Count starts at 1 here, and eventObserve later builds on the cached count
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()), // this is where the <name>.<hex timestamp> event names come from
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey // return the rewritten event, with aggregateKey as its cache key
}

Roughly: for each incoming event, the aggregator looks it up in the cache; if the event meets the aggregation conditions it is rewritten into an aggregate event and handed downstream as an update; otherwise it is returned as-is and eventually stored in etcd unchanged.
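
For reference, the default keyFunc and messageFunc used above are EventAggregatorByReasonFunc and EventAggregatorByReasonMessageFunc (paraphrased from events_cache.go):

// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject,
// event.Type, event.Reason, event.ReportingController and event.ReportingInstance
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
		event.ReportingController,
		event.ReportingInstance,
	}, ""), event.Message
}

// EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
	return "(combined from similar events): " + event.Message
}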

I drew a flow chart of this aggregation process, along with the structure of the *lru.Cache inside EventAggregator:

[Figure: the aggregation flow and the *lru.Cache structure inside EventAggregator]

Event observation: c.logger.eventObserve(aggregateEvent, ckey)
EventCorrelateResult has three parts, the event, the patch, and whether to skip further processing:

type EventCorrelateResult struct {
	// the event after correlation
	Event *v1.Event
	// if provided, perform a strategic patch when updating the record on the server
	Patch []byte
	// if true, do no further processing of the event
	Skip bool
}

Here is the lru.Cache structure inside eventLogger:

[Figure: the lru.Cache structure inside eventLogger]

A key may identify either an identical event or a group of similar events; eventObserve matches the event's key against the cache and works out the corresponding way to compress the event.

// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy // copy the event

	e.Lock()
	defer e.Unlock()

	// Check if there is an existing event we should update
	lastObservation := e.lastEventObservationFromCache(key) // fetch from the cache; zero value if absent

	// If we found a result, prepare a patch
	if lastObservation.count > 0 { // a similar event was seen before; for a brand-new key the cached count is 0
		// update the event based on the last observation so patch will work as desired
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1 // compress into one event by bumping count

		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event) // the patch describing what must change on the server
	}

	// record our new observation
	e.cache.Add( // write the observation back into the cache
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}
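
To see what that patch looks like, here is a standalone sketch that mimics eventObserve's zero-out-then-diff trick; the event name and values are made up for illustration:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	// the event as eventObserve would submit it
	updated := v1.Event{
		ObjectMeta:    metav1.ObjectMeta{Name: "mypod.175e0f9a", Namespace: "default"},
		Count:         5,
		LastTimestamp: metav1.Now(),
		Message:       "Back-off pulling image \"nginx\"",
	}
	// zero out the fields that must always appear in the diff, the same trick eventObserve uses
	base := updated
	base.Count = 0
	base.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
	base.Message = ""

	oldData, _ := json.Marshal(base)
	newData, _ := json.Marshal(updated)
	patch, _ := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Event{})
	fmt.Println(string(patch))
	// prints something like:
	// {"count":5,"lastTimestamp":"2024-05-01T10:00:00Z","message":"Back-off pulling image \"nginx\""}
}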

Finally, recordEvent, which recordToSink calls in its retry loop to actually write the event:

func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
	var newEvent *v1.Event
	var err error
	if updateExistingEvent {
		newEvent, err = sink.Patch(event, patch)
	}
	// Update can fail because the event may have been removed and it no longer exists.
	if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
		// Making sure that ResourceVersion is empty on creation
		event.ResourceVersion = ""
		newEvent, err = sink.Create(event)
	}
	if err == nil {
		// we need to update our event correlator with the server returned state to handle name/resourceversion
		eventCorrelator.UpdateState(newEvent)
		return true
	}

	// If we can't contact the server, then hold everything while we keep trying.
	// Otherwise, something about the event is malformed and we should abandon it.
	switch err.(type) {
	case *restclient.RequestConstructionError:
		// We will construct the request the same next time, so don't keep trying.
		klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
		return true
	case *errors.StatusError:
		if errors.IsAlreadyExists(err) {
			klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		} else {
			klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		}
		return true
	case *errors.UnexpectedObjectError:
		// We don't expect this; it implies the server's response didn't match a
		// known pattern. Go ahead and retry.
	default:
		// This case includes actual http transport errors. Go ahead and retry.
	}
	klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
	return false
}

This part is simple: decide whether to patch (update) the existing event or create a new one, then handle the various error cases. Returning true means the write succeeded or the error is not worth retrying; returning false makes recordToSink retry.
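
One detail worth noting: on success, recordEvent calls eventCorrelator.UpdateState(newEvent), which writes the server-returned name and resourceVersion back into the eventLogger cache so that the next patch for this key targets the right object (paraphrased from events_cache.go):

// UpdateState based on the latest observed state from server
func (c *EventCorrelator) UpdateState(event *v1.Event) {
	c.logger.updateState(event)
}

// updateState updates internal tracking based on the latest server state
func (e *eventLogger) updateState(event *v1.Event) {
	key := getEventKey(event)
	e.Lock()
	defer e.Unlock()
	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
}
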
Summary: at this point the whole life of an event has been traced end to end. With the code in hand, it is also easy to spot the places you can customize when recording events, such as the filter conditions and the compression (aggregation) strategy.
