edgecontroller源码阅读

边缘控制器概述

EdgeController是Kubernetes Api服务器和Edgecore之间的桥梁

边缘控制器执行的操作

以下是Edge控制器执行的功能:

–下游控制器:将添加/更新/删除事件从K8s Api服务器同步到Edgecore

-上游控制器:将监视和更新资源和事件(节点,pod和configmap)的状态同步到K8s-Api服务器,还订阅来自edgecore的消息-控制器管理器:创建管理器接口,该接口实现用于管理ConfigmapManager,LocationCache和podManager的事件

下游控制器:

将添加/更新/删除事件同步到边缘
  • 下游控制器:监视K8S-Api服务器并通过cloudHub将更新发送到Edgecore
  • 通过cloudHub将(pod,configmap,secret)添加/更新/删除事件同步到边缘
  • 通过调用管理器界面创建相应的管理器(pod,configmap,secret)以处理事件
  • 找到configmap和secret应该发送到哪个节点

在这里插入图片描述

上游控制器:

同步监视并更新资源和事件的状态
  • UpstreamController接收来自edgecore的消息并将更新同步到K8S-Api服务器
  • 创建停止通道以调度和停止事件以处理Pod,configMap,node和secret
  • 创建消息通道以更新Nodestatus,Podstatus,Secret和configmap相关事件
  • 获取Podcondition信息,例如Ready,Initialized,Podscheduled和Unscheduled详细信息
  • 以下是PodCondition的信息
    • Ready:PodReady表示该Pod能够处理请求,应将其添加到所有匹配服务的负载平衡池中
    • PodScheduled:表示此Pod的调度过程的状态
    • Unschedulable:这意味着现在无法调度安排pod ,可能是由于集群中的资源不足
    • Initialized:这意味着容器中的所有Init容器已成功启动
    • ContainersReady:指示容器中的所有容器是否已准备就绪
  • 以下是PodStatus的信息
    • PodPhase:容器的当前状态
    • Conditions:详细说明pod为何处于这种状态
    • HostIP:分配了pod的主机的IP地址
    • PodIp:分配给Pod的IP地址
    • QosClass:根据资源需求分配给Pod

在这里插入图片描述

控制器管理员:

创建管理器接口并实现ConfigmapManager,LocationCache和podManager
  • Manager定义管理器的接口,ConfigManager,Podmanager,secretmanager实现
  • 管理OnAdd,OnUpdate和OnDelete事件,这些事件将从K8s-Api服务器更新到相应的边缘节点
  • 创建一个eventManager(configMaps,pod,secrets),它将为每个事件启动CommonResourceEventHandler,NewListWatch和newShared Informer,以通过cloudHub将事件(pod,configmap,secret)同步(添加/更新/删除)到edgecore
  • 以下是控制器管理器创建的处理程序列表
    • CommonResourceEventHandler:NewcommonResourceEventHandler创建用于Configmap和Pod Manager的CommonResourceEventHandler
    • NewListWatch:从指定的客户端资源名称空间和字段选择器创建新的ListWatch
    • NewSharedInformer:为Listwatcher创建一个新实例

1、入口函数

kubeedge\cloud\cmd\cloudcore\app\server.go

// registerModules register all the modules started in cloudcore
func registerModules(c *v1alpha1.CloudCoreConfig) {
   cloudhub.Register(c.Modules.CloudHub, c.KubeAPIConfig)
   edgecontroller.Register(c.Modules.EdgeController, c.KubeAPIConfig, "", false)
   devicecontroller.Register(c.Modules.DeviceController, c.KubeAPIConfig)
   synccontroller.Register(c.Modules.SyncController, c.KubeAPIConfig)
   cloudstream.Register(c.Modules.CloudStream)
}

2、结构定义、初始化

// EdgeController use beehive context message layer
type EdgeController struct {
   enable bool
}

func newEdgeController(enable bool) *EdgeController {
   return &EdgeController{
      enable: enable,
   }
}

3、启动方法

// Start controller
func (ec *EdgeController) Start() {
   upstream, err := controller.NewUpstreamController()
   if err != nil {
      klog.Errorf("new upstream controller failed with error: %s", err)
      os.Exit(1)
   }

   if err := upstream.Start(); err != nil {
      klog.Fatalf("start upstream failed with error: %s", err)
   }

   downstream, err := controller.NewDownstreamController()
   if err != nil {
      klog.Fatalf("new downstream controller failed with error: %s", err)
   }

   if err := downstream.Start(); err != nil {
      klog.Fatalf("start downstream failed with error: %s", err)
   }
}

4、上游控制器:接收来自edgecore的消息并将更新同步到K8S-Api服务器

// NewUpstreamController create UpstreamController from config
func NewUpstreamController() (*UpstreamController, error) {
   cli, err := utils.KubeClient()
   if err != nil {
      klog.Warningf("create kube client failed with error: %s", err)
      return nil, err
   }
   uc := &UpstreamController{
      kubeClient:   cli,
      messageLayer: messagelayer.NewContextMessageLayer(),
   }
   return uc, nil
}
// NewContextMessageLayer create a ContextMessageLayer
func NewContextMessageLayer() MessageLayer {
   return &ContextMessageLayer{
      SendModuleName:     string(config.Config.Context.SendModule),
      ReceiveModuleName:  string(config.Config.Context.ReceiveModule),
      ResponseModuleName: string(config.Config.Context.ResponseModule),
   }
}

启动UpstreamController

// Start UpstreamController
func (uc *UpstreamController) Start() error {
   klog.Info("start upstream controller")

   uc.nodeStatusChan = make(chan model.Message, config.Config.Buffer.UpdateNodeStatus)
   uc.podStatusChan = make(chan model.Message, config.Config.Buffer.UpdatePodStatus)
   uc.configMapChan = make(chan model.Message, config.Config.Buffer.QueryConfigMap)
   uc.secretChan = make(chan model.Message, config.Config.Buffer.QuerySecret)
   uc.serviceChan = make(chan model.Message, config.Config.Buffer.QueryService)
   uc.endpointsChan = make(chan model.Message, config.Config.Buffer.QueryEndpoints)
   uc.persistentVolumeChan = make(chan model.Message, config.Config.Buffer.QueryPersistentVolume)
   uc.persistentVolumeClaimChan = make(chan model.Message, config.Config.Buffer.QueryPersistentVolumeClaim)
   uc.volumeAttachmentChan = make(chan model.Message, config.Config.Buffer.QueryVolumeAttachment)
   uc.queryNodeChan = make(chan model.Message, config.Config.Buffer.QueryNode)
   uc.updateNodeChan = make(chan model.Message, config.Config.Buffer.UpdateNode)
   uc.podDeleteChan = make(chan model.Message, config.Config.Buffer.DeletePod)

   go uc.dispatchMessage()

   for i := 0; i < int(config.Config.Load.UpdateNodeStatusWorkers); i++ {
      go uc.updateNodeStatus()
   }
   for i := 0; i < int(config.Config.Load.UpdatePodStatusWorkers); i++ {
      go uc.updatePodStatus()
   }
   for i := 0; i < int(config.Config.Load.QueryConfigMapWorkers); i++ {
      go uc.queryConfigMap()
   }
   for i := 0; i < int(config.Config.Load.QuerySecretWorkers); i++ {
      go uc.querySecret()
   }
   for i := 0; i < int(config.Config.Load.QueryServiceWorkers); i++ {
      go uc.queryService()
   }
   for i := 0; i < int(config.Config.Load.QueryEndpointsWorkers); i++ {
      go uc.queryEndpoints()
   }
   for i := 0; i < int(config.Config.Load.QueryPersistentVolumeWorkers); i++ {
      go uc.queryPersistentVolume()
   }
   for i := 0; i < int(config.Config.Load.QueryPersistentVolumeClaimWorkers); i++ {
      go uc.queryPersistentVolumeClaim()
   }
   for i := 0; i < int(config.Config.Load.QueryVolumeAttachmentWorkers); i++ {
      go uc.queryVolumeAttachment()
   }
   for i := 0; i < int(config.Config.Load.QueryNodeWorkers); i++ {
      go uc.queryNode()
   }
   for i := 0; i < int(config.Config.Load.UpdateNodeWorkers); i++ {
      go uc.updateNode()
   }
   for i := 0; i < int(config.Config.Load.DeletePodWorkers); i++ {
      go uc.deletePod()
   }
   return nil
}

5、下游控制器:将添加/更新/删除事件从K8s Api服务器同步到Edgecore

// NewDownstreamController create a DownstreamController from config
func NewDownstreamController() (*DownstreamController, error) {
   lc := &manager.LocationCache{}

   cli, err := utils.KubeClient()
   if err != nil {
      klog.Warningf("create kube client failed with error: %s", err)
      return nil, err
   }

   var nodeName = ""
   if config.Config.EdgeSiteEnable {
      if config.Config.NodeName == "" {
         return nil, fmt.Errorf("kubeEdge node name is not provided in edgesite controller configuration")
      }
      nodeName = config.Config.NodeName
   }

   podManager, err := manager.NewPodManager(cli, v1.NamespaceAll, nodeName)
   if err != nil {
      klog.Warningf("create pod manager failed with error: %s", err)
      return nil, err
   }

   configMapManager, err := manager.NewConfigMapManager(cli, v1.NamespaceAll)
   if err != nil {
      klog.Warningf("create configmap manager failed with error: %s", err)
      return nil, err
   }

   secretManager, err := manager.NewSecretManager(cli, v1.NamespaceAll)
   if err != nil {
      klog.Warningf("create secret manager failed with error: %s", err)
      return nil, err
   }

   nodesManager, err := manager.NewNodesManager(cli, v1.NamespaceAll)
   if err != nil {
      klog.Warningf("Create nodes manager failed with error: %s", err)
      return nil, err
   }

   serviceManager, err := manager.NewServiceManager(cli, v1.NamespaceAll)
   if err != nil {
      klog.Warningf("Create service manager failed with error: %s", err)
      return nil, err
   }

   endpointsManager, err := manager.NewEndpointsManager(cli, v1.NamespaceAll)
   if err != nil {
      klog.Warningf("Create endpoints manager failed with error: %s", err)
      return nil, err
   }

   dc := &DownstreamController{
      kubeClient:       cli,
      podManager:       podManager,
      configmapManager: configMapManager,
      secretManager:    secretManager,
      nodeManager:      nodesManager,
      serviceManager:   serviceManager,
      endpointsManager: endpointsManager,
      messageLayer:     messagelayer.NewContextMessageLayer(),
      lc:               lc,
   }
   if err := dc.initLocating(); err != nil {
      return nil, err
   }

   return dc, nil
}
odesManager,
      serviceManager:   serviceManager,
      endpointsManager: endpointsManager,
      messageLayer:     messagelayer.NewContextMessageLayer(),
      lc:               lc,
   }
   if err := dc.initLocating(); err != nil {
      return nil, err
   }

   return dc, nil
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐