edgecontroller源码阅读
edgecontroller源码阅读文章目录edgecontroller源码阅读边缘控制器概述边缘控制器执行的操作下游控制器:将添加/更新/删除事件同步到边缘上游控制器:同步监视并更新资源和事件的状态控制器管理员:创建管理器接口并实现ConfigmapManager,LocationCache和podManager1、入口函数2、结构定义、初始化3、启动方法4、上游控制器:接收来自edgecore
·
edgecontroller源码阅读
文章目录
边缘控制器概述
EdgeController是Kubernetes Api服务器和Edgecore之间的桥梁
边缘控制器执行的操作
以下是Edge控制器执行的功能:
–下游控制器:将添加/更新/删除事件从K8s Api服务器同步到Edgecore
-上游控制器:将监视和更新资源和事件(节点,pod和configmap)的状态同步到K8s-Api服务器,还订阅来自edgecore的消息-控制器管理器:创建管理器接口,该接口实现用于管理ConfigmapManager,LocationCache和podManager的事件
下游控制器:
将添加/更新/删除事件同步到边缘
- 下游控制器:监视K8S-Api服务器并通过cloudHub将更新发送到Edgecore
- 通过cloudHub将(pod,configmap,secret)添加/更新/删除事件同步到边缘
- 通过调用管理器界面创建相应的管理器(pod,configmap,secret)以处理事件
- 找到configmap和secret应该发送到哪个节点
上游控制器:
同步监视并更新资源和事件的状态
- UpstreamController接收来自edgecore的消息并将更新同步到K8S-Api服务器
- 创建停止通道以调度和停止事件以处理Pod,configMap,node和secret
- 创建消息通道以更新Nodestatus,Podstatus,Secret和configmap相关事件
- 获取Podcondition信息,例如Ready,Initialized,Podscheduled和Unscheduled详细信息
- 以下是PodCondition的信息
- Ready:PodReady表示该Pod能够处理请求,应将其添加到所有匹配服务的负载平衡池中
- PodScheduled:表示此Pod的调度过程的状态
- Unschedulable:这意味着现在无法调度安排pod ,可能是由于集群中的资源不足
- Initialized:这意味着容器中的所有Init容器已成功启动
- ContainersReady:指示容器中的所有容器是否已准备就绪
- 以下是PodStatus的信息
- PodPhase:容器的当前状态
- Conditions:详细说明pod为何处于这种状态
- HostIP:分配了pod的主机的IP地址
- PodIp:分配给Pod的IP地址
- QosClass:根据资源需求分配给Pod
控制器管理员:
创建管理器接口并实现ConfigmapManager,LocationCache和podManager
- Manager定义管理器的接口,ConfigManager,Podmanager,secretmanager实现
- 管理OnAdd,OnUpdate和OnDelete事件,这些事件将从K8s-Api服务器更新到相应的边缘节点
- 创建一个eventManager(configMaps,pod,secrets),它将为每个事件启动CommonResourceEventHandler,NewListWatch和newShared Informer,以通过cloudHub将事件(pod,configmap,secret)同步(添加/更新/删除)到edgecore
- 以下是控制器管理器创建的处理程序列表
- CommonResourceEventHandler:NewcommonResourceEventHandler创建用于Configmap和Pod Manager的CommonResourceEventHandler
- NewListWatch:从指定的客户端资源名称空间和字段选择器创建新的ListWatch
- NewSharedInformer:为Listwatcher创建一个新实例
1、入口函数
kubeedge\cloud\cmd\cloudcore\app\server.go
// registerModules register all the modules started in cloudcore
func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudhub.Register(c.Modules.CloudHub, c.KubeAPIConfig)
edgecontroller.Register(c.Modules.EdgeController, c.KubeAPIConfig, "", false)
devicecontroller.Register(c.Modules.DeviceController, c.KubeAPIConfig)
synccontroller.Register(c.Modules.SyncController, c.KubeAPIConfig)
cloudstream.Register(c.Modules.CloudStream)
}
2、结构定义、初始化
// EdgeController use beehive context message layer
type EdgeController struct {
enable bool
}
func newEdgeController(enable bool) *EdgeController {
return &EdgeController{
enable: enable,
}
}
3、启动方法
// Start controller
func (ec *EdgeController) Start() {
upstream, err := controller.NewUpstreamController()
if err != nil {
klog.Errorf("new upstream controller failed with error: %s", err)
os.Exit(1)
}
if err := upstream.Start(); err != nil {
klog.Fatalf("start upstream failed with error: %s", err)
}
downstream, err := controller.NewDownstreamController()
if err != nil {
klog.Fatalf("new downstream controller failed with error: %s", err)
}
if err := downstream.Start(); err != nil {
klog.Fatalf("start downstream failed with error: %s", err)
}
}
4、上游控制器:接收来自edgecore的消息并将更新同步到K8S-Api服务器
// NewUpstreamController create UpstreamController from config
func NewUpstreamController() (*UpstreamController, error) {
cli, err := utils.KubeClient()
if err != nil {
klog.Warningf("create kube client failed with error: %s", err)
return nil, err
}
uc := &UpstreamController{
kubeClient: cli,
messageLayer: messagelayer.NewContextMessageLayer(),
}
return uc, nil
}
// NewContextMessageLayer create a ContextMessageLayer
func NewContextMessageLayer() MessageLayer {
return &ContextMessageLayer{
SendModuleName: string(config.Config.Context.SendModule),
ReceiveModuleName: string(config.Config.Context.ReceiveModule),
ResponseModuleName: string(config.Config.Context.ResponseModule),
}
}
启动UpstreamController
// Start UpstreamController
func (uc *UpstreamController) Start() error {
klog.Info("start upstream controller")
uc.nodeStatusChan = make(chan model.Message, config.Config.Buffer.UpdateNodeStatus)
uc.podStatusChan = make(chan model.Message, config.Config.Buffer.UpdatePodStatus)
uc.configMapChan = make(chan model.Message, config.Config.Buffer.QueryConfigMap)
uc.secretChan = make(chan model.Message, config.Config.Buffer.QuerySecret)
uc.serviceChan = make(chan model.Message, config.Config.Buffer.QueryService)
uc.endpointsChan = make(chan model.Message, config.Config.Buffer.QueryEndpoints)
uc.persistentVolumeChan = make(chan model.Message, config.Config.Buffer.QueryPersistentVolume)
uc.persistentVolumeClaimChan = make(chan model.Message, config.Config.Buffer.QueryPersistentVolumeClaim)
uc.volumeAttachmentChan = make(chan model.Message, config.Config.Buffer.QueryVolumeAttachment)
uc.queryNodeChan = make(chan model.Message, config.Config.Buffer.QueryNode)
uc.updateNodeChan = make(chan model.Message, config.Config.Buffer.UpdateNode)
uc.podDeleteChan = make(chan model.Message, config.Config.Buffer.DeletePod)
go uc.dispatchMessage()
for i := 0; i < int(config.Config.Load.UpdateNodeStatusWorkers); i++ {
go uc.updateNodeStatus()
}
for i := 0; i < int(config.Config.Load.UpdatePodStatusWorkers); i++ {
go uc.updatePodStatus()
}
for i := 0; i < int(config.Config.Load.QueryConfigMapWorkers); i++ {
go uc.queryConfigMap()
}
for i := 0; i < int(config.Config.Load.QuerySecretWorkers); i++ {
go uc.querySecret()
}
for i := 0; i < int(config.Config.Load.QueryServiceWorkers); i++ {
go uc.queryService()
}
for i := 0; i < int(config.Config.Load.QueryEndpointsWorkers); i++ {
go uc.queryEndpoints()
}
for i := 0; i < int(config.Config.Load.QueryPersistentVolumeWorkers); i++ {
go uc.queryPersistentVolume()
}
for i := 0; i < int(config.Config.Load.QueryPersistentVolumeClaimWorkers); i++ {
go uc.queryPersistentVolumeClaim()
}
for i := 0; i < int(config.Config.Load.QueryVolumeAttachmentWorkers); i++ {
go uc.queryVolumeAttachment()
}
for i := 0; i < int(config.Config.Load.QueryNodeWorkers); i++ {
go uc.queryNode()
}
for i := 0; i < int(config.Config.Load.UpdateNodeWorkers); i++ {
go uc.updateNode()
}
for i := 0; i < int(config.Config.Load.DeletePodWorkers); i++ {
go uc.deletePod()
}
return nil
}
5、下游控制器:将添加/更新/删除事件从K8s Api服务器同步到Edgecore
// NewDownstreamController create a DownstreamController from config
func NewDownstreamController() (*DownstreamController, error) {
lc := &manager.LocationCache{}
cli, err := utils.KubeClient()
if err != nil {
klog.Warningf("create kube client failed with error: %s", err)
return nil, err
}
var nodeName = ""
if config.Config.EdgeSiteEnable {
if config.Config.NodeName == "" {
return nil, fmt.Errorf("kubeEdge node name is not provided in edgesite controller configuration")
}
nodeName = config.Config.NodeName
}
podManager, err := manager.NewPodManager(cli, v1.NamespaceAll, nodeName)
if err != nil {
klog.Warningf("create pod manager failed with error: %s", err)
return nil, err
}
configMapManager, err := manager.NewConfigMapManager(cli, v1.NamespaceAll)
if err != nil {
klog.Warningf("create configmap manager failed with error: %s", err)
return nil, err
}
secretManager, err := manager.NewSecretManager(cli, v1.NamespaceAll)
if err != nil {
klog.Warningf("create secret manager failed with error: %s", err)
return nil, err
}
nodesManager, err := manager.NewNodesManager(cli, v1.NamespaceAll)
if err != nil {
klog.Warningf("Create nodes manager failed with error: %s", err)
return nil, err
}
serviceManager, err := manager.NewServiceManager(cli, v1.NamespaceAll)
if err != nil {
klog.Warningf("Create service manager failed with error: %s", err)
return nil, err
}
endpointsManager, err := manager.NewEndpointsManager(cli, v1.NamespaceAll)
if err != nil {
klog.Warningf("Create endpoints manager failed with error: %s", err)
return nil, err
}
dc := &DownstreamController{
kubeClient: cli,
podManager: podManager,
configmapManager: configMapManager,
secretManager: secretManager,
nodeManager: nodesManager,
serviceManager: serviceManager,
endpointsManager: endpointsManager,
messageLayer: messagelayer.NewContextMessageLayer(),
lc: lc,
}
if err := dc.initLocating(); err != nil {
return nil, err
}
return dc, nil
}
odesManager,
serviceManager: serviceManager,
endpointsManager: endpointsManager,
messageLayer: messagelayer.NewContextMessageLayer(),
lc: lc,
}
if err := dc.initLocating(); err != nil {
return nil, err
}
return dc, nil
}
更多推荐
已为社区贡献1条内容
所有评论(0)