【kubernetes/k8s源码分析】kubernetes event源码分析
描述 使用方式eventBroadcaster := record.NewBroadcaster()eventBroadcaster.StartLogging(glog.Infof)eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1(...
描述
使用方式
// Create the broadcaster that fans recorded events out to its watchers.
eventBroadcaster := record.NewBroadcaster()
// Register a watcher that writes every event through glog.Infof.
eventBroadcaster.StartLogging(glog.Infof)
// Register a watcher that hands every event to a sink backed by the core/v1 events client.
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
// Obtain a recorder whose events carry "statefulset-controller" as their source component.
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
源码实现路径:client-go/tools/record/event.go
Event结构体
// Event is a report of an event somewhere in the cluster.
// TODO: Decide whether to store these separately or with the object they apply to.
type Event struct {
	metav1.TypeMeta
	// +optional
	metav1.ObjectMeta

	// Required. The object that this event is about. Mapped to events.Event.regarding
	// NOTE(review): documented as required yet tagged +optional; confirm which is intended.
	// +optional
	InvolvedObject ObjectReference

	// Optional; this should be a short, machine understandable string that gives the reason
	// for this event being generated. For example, if the event is reporting that a container
	// can't start, the Reason might be "ImageNotFound".
	// TODO: provide exact specification for format.
	// +optional
	Reason string

	// Optional. A human-readable description of the status of this operation.
	// TODO: decide on maximum length. Mapped to events.Event.note
	// +optional
	Message string

	// Optional. The component reporting this event. Should be a short machine understandable string.
	// +optional
	Source EventSource

	// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
	// +optional
	FirstTimestamp metav1.Time

	// The time at which the most recent occurrence of this event was recorded.
	// +optional
	LastTimestamp metav1.Time

	// The number of times this event has occurred.
	// +optional
	Count int32

	// Type of this event (Normal, Warning); new types could be added in the future.
	// +optional
	Type string

	// Time when this Event was first observed.
	// +optional
	EventTime metav1.MicroTime

	// Data about the Event series this event represents, or nil if it's a singleton Event.
	// +optional
	Series *EventSeries

	// What action was taken/failed with respect to the Regarding object.
	// +optional
	Action string

	// Optional secondary object for more complex actions.
	// +optional
	Related *ObjectReference

	// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
	// +optional
	ReportingController string

	// ID of the controller instance, e.g. `kubelet-xyzf`.
	// +optional
	ReportingInstance string
}
EventRecorder接口
记录事件用的,Eventf 封装了类似 Printf 打印机制,也会调用 Event,而 PastEventf 允许传入自定义的时间戳,可以设置事件产生的时间
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference -- or you may also
	// pass a reference to the object directly.
	// 'eventtype' is the type of this event, and can be one of Normal, Warning. New types could
	// be added in future.
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). 'reason' will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
	PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like Eventf, but with annotations attached.
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
EventBroadcaster接口
事件广播器,StartLogging (记录到日志)和 StartRecordingToSink(发送到apiserver) 两个不同的事件处理函数
StartEventWatcher,在后台启动一个 goroutine,不断从 EventBroadcaster 提供的管道中接收事件,然后调用 eventHandler 处理函数对事件进行处理
NewRecorder 新建一个 EventRecorder,它会把事件发送给 EventBroadcaster
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
	// StartEventWatcher starts sending events received from this EventBroadcaster to the given
	// event handler function. The return value can be ignored or used to stop recording, if
	// desired.
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface

	// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
	// sink. The return value can be ignored or used to stop recording, if desired.
	StartRecordingToSink(sink EventSink) watch.Interface

	// StartLogging starts sending events received from this EventBroadcaster to the given logging
	// function. The return value can be ignored or used to stop recording, if desired.
	StartLogging(logf func(format string, args ...interface{})) watch.Interface

	// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
	// with the event source set to the given event source.
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
}
1. NewBroadcaster函数
路径: client-go/tools/record/event.go
创建一个event broadcaster
// NewBroadcaster creates a new event broadcaster. It wraps a watch.Broadcaster
// constructed with maxQueuedEvents and the watch.DropIfChannelFull behavior,
// and uses defaultSleepDuration as its sleep duration.
func NewBroadcaster() EventBroadcaster {
	broadcaster := watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
	return &eventBroadcasterImpl{
		Broadcaster:   broadcaster,
		sleepDuration: defaultSleepDuration,
	}
}
1.1 NewBroadcaster函数
路径 apimachinery/pkg/watch/mux.go
最大队列queueLength为1000,如果队列满的情况drop的行为,主要逻辑在m.loop
// NewBroadcaster creates a new Broadcaster and starts its distribution loop in
// a background goroutine.
//
// queueLength is stored as the per-watcher queue length; the shared incoming
// channel is buffered with incomingQueueLength. fullChannelBehavior is stored
// and consulted when events are distributed.
//
// It is guaranteed that events will be distributed in the order in which they
// occur, but the order in which a single event is distributed among all of the
// watchers is unspecified.
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	b := &Broadcaster{}
	b.watchers = make(map[int64]*broadcasterWatcher)
	b.incoming = make(chan Event, incomingQueueLength)
	b.watchQueueLength = queueLength
	b.fullChannelBehavior = fullChannelBehavior

	// Register the loop before launching it; loop calls Done when it exits.
	b.distributing.Add(1)
	go b.loop()
	return b
}
1.2 loop函数
从incoming队列中取出数据,调用distribute函数分发给各个watchers
// loop receives from m.incoming until that channel is closed. Entries marked
// with internalRunFunctionMarker carry a function that is invoked in place;
// every other entry is passed to distribute. Once the channel is drained, the
// loop calls closeAll and signals completion on m.distributing.
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
	for ev := range m.incoming {
		if ev.Type != internalRunFunctionMarker {
			m.distribute(ev)
			continue
		}
		// The marker entry wraps a function to run on this goroutine.
		ev.Object.(functionFakeRuntimeObject)()
	}
	m.closeAll()
	m.distributing.Done()
}
1.3 distribute函数
遍历各个watchers并分发event:当fullChannelBehavior为DropIfChannelFull时,如果某个watcher的channel已满,则直接跳过该watcher,不会阻塞;否则会一直阻塞,直到event写入成功或该watcher已停止
// distribute offers event to every registered watcher while holding m.lock.
//
// When fullChannelBehavior is DropIfChannelFull the send is non-blocking: a
// watcher whose result channel cannot accept the event right now is skipped.
// Otherwise the send blocks until the watcher either accepts the event or has
// been stopped.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()

	dropWhenFull := m.fullChannelBehavior == DropIfChannelFull
	for _, watcher := range m.watchers {
		if dropWhenFull {
			select {
			case watcher.result <- event:
			case <-watcher.stopped:
			default: // Don't block if the event can't be queued.
			}
			continue
		}
		select {
		case watcher.result <- event:
		case <-watcher.stopped:
		}
	}
}
2. StartLogging函数
启动一个goroutine循环监听channel,并用传入的logf函数把每个event写入日志
// StartLogging registers an event watcher that formats each received event and
// passes it to logf. The returned watch.Interface can be ignored or used to
// stop logging, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	logEvent := func(e *v1.Event) {
		logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
	}
	return eventBroadcaster.StartEventWatcher(logEvent)
}
// StartEventWatcher obtains a watcher from the broadcaster and starts a
// goroutine that calls eventHandler for every *v1.Event read from the watcher's
// result channel. The goroutine ends when that channel is closed. The returned
// watch.Interface can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := eventBroadcaster.Watch()

	consume := func() {
		defer utilruntime.HandleCrash()
		for item := range watcher.ResultChan() {
			// This is all local, so anything other than *v1.Event should
			// never show up; such entries are silently skipped.
			if event, ok := item.Object.(*v1.Event); ok {
				eventHandler(event)
			}
		}
	}
	go consume()

	return watcher
}
3. StartRecordingToSink函数
把event结构化,并通过sink将其最终落地(发送到apiserver)
参数sink是个EventSink接口,根据调用者传参,
&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}
EventSinkImpl实现了EventSink接口,而且使用的是event API调用
// StartRecordingToSink registers an event watcher that passes every received
// event to recordToSink together with the given sink. The returned
// watch.Interface can be ignored or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	// Each call gets its own time-seeded *rand.Rand and its own correlator;
	// both are used only by the handler registered below.
	seed := time.Now().UnixNano()
	randGen := rand.New(rand.NewSource(seed))
	eventCorrelator := NewEventCorrelator(clock.RealClock{})

	writeToSink := func(event *v1.Event) {
		recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
	}
	return eventBroadcaster.StartEventWatcher(writeToSink)
}
3.1 recordToSink函数
调用recordEvent,再调用event API接口
// recordToSink correlates a private copy of event and then tries to write the
// correlated result through sink, retrying until recordEvent reports success
// or maxTriesPerEvent attempts have been made.
//
// The wait after the first failed attempt is a random fraction of
// sleepDuration; every later wait is the full sleepDuration.
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	private := *event
	event = &private

	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		// NOTE(review): result is still dereferenced below after an error;
		// presumably EventCorrelate returns a usable result here — confirm.
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}

	for attempt := 1; ; attempt++ {
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			return
		}
		if attempt >= maxTriesPerEvent {
			glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			return
		}

		// Randomize the first sleep so that various clients won't all be
		// synced up if the master goes down.
		delay := sleepDuration
		if attempt == 1 {
			delay = time.Duration(float64(sleepDuration) * randGen.Float64())
		}
		time.Sleep(delay)
	}
}
eventBroadcaster实现NewRecorder函数
recorderImpl实现了EventRecorder接口
// NewRecorder returns an EventRecorder that stamps events with the given
// source and publishes them through this broadcaster's embedded
// watch.Broadcaster. The recorder uses the real clock.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
	return &recorderImpl{
		scheme:      scheme,
		source:      source,
		Broadcaster: eventBroadcaster.Broadcaster,
		clock:       clock.RealClock{},
	}
}
// recorderImpl is the record built by NewRecorder and used as the receiver of
// the Event and generateEvent methods.
type recorderImpl struct {
	// scheme is passed to ref.GetReference when resolving the object an event is about.
	scheme *runtime.Scheme
	// source is copied into the Source field of every generated event.
	source v1.EventSource
	// Broadcaster is embedded so the recorder can call Action to publish events.
	*watch.Broadcaster
	// clock is set to clock.RealClock{} by NewRecorder.
	clock clock.Clock
}
4. Event函数
这一些接口基本调用的是generateEvent函数
// Event records an event about object with the given type, reason and message.
// It delegates to generateEvent with no annotations and the current time.
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	var noAnnotations map[string]string
	now := metav1.Now()
	recorder.generateEvent(object, noAnnotations, now, eventtype, reason, message)
}
4.1 generateEvent函数
- GetReference获得object信息
- validateEventType校验事件类型,合法的只有Normal和Warning
- 调用makeEvent生成一个event,名字是对象的名字.时间戳string值
- 异步的将event丢入队列,防止阻塞
// generateEvent builds an event for object and publishes it asynchronously.
//
// It resolves a reference to object, rejects unsupported event types, builds
// the event with makeEvent, sets the event's Source to the recorder's source,
// and publishes it as watch.Added from a new goroutine so that the caller
// never blocks. Failures are logged and the event is dropped.
//
// NOTE(review): timestamp is accepted but never read in this function.
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	objRef, refErr := ref.GetReference(recorder.scheme, object)
	switch {
	case refErr != nil:
		glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, refErr, eventtype, reason, message)
		return
	case !validateEventType(eventtype):
		glog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	event := recorder.makeEvent(objRef, annotations, eventtype, reason, message)
	event.Source = recorder.source

	publish := func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}
	go publish()
}
丢入incoming队列,消费者为Broadcaster,直接进入1.2章节消费处理
// Action wraps obj in an Event of the given type and sends it on the incoming
// channel, from which the broadcaster's loop distributes it among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	queued := Event{Type: action, Object: obj}
	m.incoming <- queued
}
更多推荐
所有评论(0)