关键函数

// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
   // Deliberately not catching crashes here. Yes, bring down the process if there's a
   // bug in watch.Broadcaster.
   for event := range m.incoming {
      if event.Type == internalRunFunctionMarker {
         event.Object.(functionFakeRuntimeObject)()
         continue
      }
      m.distribute(event)
   }
   m.closeAll()
   m.distributing.Done()
}

生成goroutine一直运行,等待income管道有数据进入,而income是boardcast通过调用action进行操作,即action就是boardcaster进行分发消息的接口,通过action操作进行将event分发给watcher进行响应handler处理事件。
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
   m.incoming <- Event{action, obj}
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
   m.lock.Lock()
   defer m.lock.Unlock()
   if m.fullChannelBehavior == DropIfChannelFull {
      for _, w := range m.watchers {
         select {
         case w.result <- event:
         case <-w.stopped:
         default: // Don't block if the event can't be queued.
         }
      }
   } else {
      for _, w := range m.watchers {
         select {
         case w.result <- event:
         case <-w.stopped:
         }
      }
   }
}

w.result 管道在sink的函数中调用,发现result有结果了就将event调用client进行http操作,create,put,post等

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐