k8s Event Distribution
Key functions
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		m.distribute(event)
	}
	m.closeAll()
	m.distributing.Done()
}
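loop runs in its own goroutine and keeps running, blocking until data arrives on the incoming channel. Events are written to incoming when the Broadcaster's Action method is called, so Action is the interface through which the Broadcaster publishes messages: every event passed to Action is distributed to the watchers, whose handlers then process it.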
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
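The w.result channel is read in the sink-side function: when an event shows up on result, it is handed to the client, which performs the corresponding HTTP operation (create, put, post, and so on).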