使用KubeSphere的应用商店在k8s中部署应用过程代码解读

1.kubesphere应用商店工作原理图

在这里插入图片描述

以根据应用市场的应用模板部署实例为例:
1.用户通过KubeSphere API发起部署实例请求;
2.由KuveSphere API路由请求到Openpitrix API,发起创建Cluster请求;
3.Openpitrix中的Cluster Service收到请求后,向Job Service发起创建Cluster的Job;
4.Job Controler监听到新的Job被创建,解析Job类型,构建相关信息,向Task Service发起创建Cluster的请求;
5.Task Controller监听到新的Task被创建,解析到为创建Cluster的任务,构建请求信息,向Openpitrix中的Helm Proxy service发起创建Cluster请求
6.Helm Proxy Service转发请求到Helm Service,发起创建实例请求。
最终由Job Service维护Cluster的创建状态,若Task执行失败,则修改Cluster状态为Failed,将transition status修改为空。

2.KubeSphere API启动流程

2.1 根据启动参数构建apiserver实例

cmd/ks-apiserver包的main方法中,构建启动命令行,代码如下:

func main() {

	cmd := app.NewAPIServerCommand()   // 构建的启动参数

	if err := cmd.Execute(); err != nil {
		log.Fatalln(err)
	}
}

有代码可以看到是通过app包中的NewAPIServerCommand方法构建的启动参数,具体代码如下:

func NewAPIServerCommand() *cobra.Command {
	s := options.NewServerRunOptions()  // 构建Server启动可以识别的命令行参数

	// Load configuration from file
	conf, err := apiserverconfig.TryLoadFromDisk() // 从磁盘加载配置文件内容
    ...
    ...
	cmd := &cobra.Command{
		Use: "ks-apiserver",
		Long: `The KubeSphere API server validates and configures data for the API objects. 
The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
		RunE: func(cmd *cobra.Command, args []string) error {
			if errs := s.Validate(); len(errs) != 0 {  // 校验命令行参数或配置文件配置项
				return utilerrors.NewAggregate(errs)
			}

			return Run(s, signals.SetupSignalHandler()) 
		},
		SilenceUsage: true,
	}    // 构建命令行启动的匿名方法

	fs := cmd.Flags()
	namedFlagSets := s.Flags()
	for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}

	usageFmt := "Usage:\n  %s\n"
	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
		cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
	})
	return cmd
}

在上述方法中,通过构建server启动参数,返回了一个cobra.Command的实例cmd,通过在main方法中执行cmd.Execute方法,可调用cmdRun或者RunE方法,通过上述代码可以看到,在cmd实例的RUNE方法中构建了一个匿名函数,并同时返回了下述的Run方法

func Run(s *options.ServerRunOptions, stopCh <-chan struct{}) error {

	initializeServicemeshConfig(s)   // 初始化服务网格配置

	apiserver, err := s.NewAPIServer(stopCh)   // 构建apiserver实例
	......
	err = apiserver.PrepareRun(stopCh)    // 准备apiserver运行所需的资源,包括注册API路由等
	......

	return apiserver.Run(stopCh)   // 启动ApiServer
}

此处不概述初始化服务网格相关的配置的过程,有兴趣的可自行在ks-apiserver/app包中的server.go中查看该方法的实现流程;接下来看构建apiserver实例的过程,此方法在ks-apiserver/app/options包中,代码如下:

// NewAPIServer creates an APIServer instance using given options
func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) {
	apiServer := &apiserver.APIServer{   // 初始化APIServer实例
		Config: s.Config,
	}
	......   // 省略部分为加载其他相关服务的客户端过程
	......
	if s.OpenPitrixOptions != nil && !s.OpenPitrixOptions.IsEmpty() {  // 加载应用商店客户端信息
		opClient, err := openpitrix.NewClient(s.OpenPitrixOptions)
		if err != nil {
			return nil, fmt.Errorf("failed to connect to openpitrix, please check openpitrix status, error: %v", err)
		}
		apiServer.OpenpitrixClient = opClient
	}

	server := &http.Server{
		Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),
	}

	if s.GenericServerRunOptions.SecurePort != 0 {
		certificate, err := tls.LoadX509KeyPair(s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey)
		if err != nil {
			return nil, err
		}
		server.TLSConfig.Certificates = []tls.Certificate{certificate}
	}   // 构建使用证书的安全端口

	apiServer.Server = server

	return apiServer, nil
}

2.3 准备启动资源,注册路由

从上述过程中可以看到构建完APIServer的实例apiserver后,紧接着执行apiserver.PrepareRun方法,准备启动资源并注册路由信息,代码在kubesphere/pkg/apiserver包,具体如下:

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {

	s.container = restful.NewContainer()
	s.container.Filter(logRequestAndResponse)  // 构建请求和响应记录方法
	s.container.Router(restful.CurlyRouter{})
	s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(panicReason, httpWriter)
	})   // 注册错误记录方法

	s.installKubeSphereAPIs()  // 注册路由信息
	......

	s.buildHandlerChain(stopCh)  // 构建准入控制方法

	return nil
}

进一步查看注册路由的方法installKubeSphereAPIs():

// Install all KubeSphere api groups
// Installation happens before all informers start to cache objects, so
//   any attempt to list objects using listers will get empty results.
func (s *APIServer) installKubeSphereAPIs() {
	......
	urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient))
	......
}

在此方法中通过调用openpitrixv1.AddToContainer方法注册应用商店openpitrix相关的路由信息,此方法在pkg/kapis/openpitrix包中,代码如下:

const (
	GroupName = "openpitrix.io"
)

var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}

func AddToContainer(c *restful.Container, factory informers.InformerFactory, op op.Client) error {
	if op == nil {
		return nil
	}
	mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson}
	webservice := runtime.NewWebService(GroupVersion)
	handler := newOpenpitrixHandler(factory, op)
	...... // 注册其他路由信息
        webservice.Route(webservice.POST("/workspaces/{workspace}/clusters/{cluster}/namespaces/{namespace}/applications").
		To(handler.CreateApplication).
		Doc("Deploy a new application").
		Metadata(restfulspec.KeyOpenAPITags, []string{constants.NamespaceResourcesTag}).
		Reads(openpitrix2.CreateClusterRequest{}).
		Returns(http.StatusOK, api.StatusOK, errors.Error{}).
		Param(webservice.PathParameter("cluster", "the name of the cluster.").Required(true)).
		Param(webservice.PathParameter("namespace", "the name of the project").Required(true)))  // 注册创建应用的路由
	......
	...... // 注册其他路由信息

	c.Add(webservice)

	return nil
}

由此可见,当访问路径为/kapis/openpitrix.io/v1//workspaces/{workspace}/clusters/{cluster}/namespaces/{namespace}/applicationsPOST请求时会被路由到handler.CreateApplication方法。

2.3 启动APIServer

通过执行apiserver.Run方法启动apiserver:

func (s *APIServer) Run(stopCh <-chan struct{}) (err error) {

	err = s.waitForResourceSync(stopCh)  // 缓存资源
	.....

	klog.V(0).Infof("Start listening on %s", s.Server.Addr)
	if s.Server.TLSConfig != nil {
		err = s.Server.ListenAndServeTLS("", "")  // 监听端口
	} else {
		err = s.Server.ListenAndServe()
	}

	return err
}

先缓存下需要监听的资源,再启动apiserver进程,监听指定端口。

3.根据应用模板创建应用实例流程

3.1 KubeSphere API

以下代码为KubeSphere API接收到创建应用实例请求后所做的工作:

func (h *openpitrixHandler) CreateApplication(req *restful.Request, resp *restful.Response) {
	clusterName := req.PathParameter("cluster")  // 获取请求信息
	namespace := req.PathParameter("namespace")  
	var createClusterRequest openpitrix.CreateClusterRequest  
	err := req.ReadEntity(&createClusterRequest)  // 构建向openpitrix发起请求的请求体
	......  // 异常处理
	createClusterRequest.Username = req.HeaderParameter(constants.UserNameHeader)

	err = h.openpitrix.CreateApplication(clusterName, namespace, createClusterRequest) // 通过openpitirx客户端发起创建应用实例请求
	......  // 异常处理
	resp.WriteEntity(errors.None)
}

工作内容主要包括:

1.获取请求信息
2.构建向openpitrix发起请求的请求体
3.通过openpitirx客户端发起创建应用实例请求

h.openpitrix.CreateApplication方法定义在pkg/models/openpitrix包中,有兴趣的可自行查看,此方法只是调用openpitrix客户端的CreateCluster方法

3.2 Openpitrix Cluster API

func (p *Server) CreateCluster(ctx context.Context, req *pb.CreateClusterRequest) (*pb.CreateClusterResponse, error) {
	return p.createCluster(ctx, req, false)   // 调用createCluster方法
}

Openpitrix Cluster接收到请求后,会直接调用自身的createCluster方法,内容如下:

func (p *Server) createCluster(ctx context.Context, req *pb.CreateClusterRequest, debug bool) (*pb.CreateClusterResponse, error) {
	s := ctxutil.GetSender(ctx)
	...... 
    ...... // 解析请求数据,获取创建Job所需的信息

	newJob := models.NewJob(
		constants.PlaceHolder,
		clusterId,
		appId,
		versionId,
		constants.ActionCreateCluster,
		directive,
		runtime.Runtime.Provider,
		s.GetOwnerPath(),
		runtimeId,
	) // 创建Job实例
 
	jobId, err := jobclient.SendJob(ctx, newJob)  // 发送创建Job请求
	...... // 异常处理
	res := &pb.CreateClusterResponse{
		ClusterId: pbutil.ToProtoString(clusterId),
		JobId:     pbutil.ToProtoString(jobId),
	}
	return res, nil
}

Cluster服务接收都创建应用集群实例后,所做如下工作:

1.解析请求数据,获取创建Job所需的信息
2.创建Job实例
3.发送创建Job请求

3.3 Job Server

Job Server服务中创建Job代码如下:

func (p *Server) CreateJob(ctx context.Context, req *pb.CreateJobRequest) (*pb.CreateJobResponse, error) {
	s := ctxutil.GetSender(ctx)
	newJob := models.NewJob(
		"",
		req.GetClusterId().GetValue(),
		req.GetAppId().GetValue(),
		req.GetVersionId().GetValue(),
		req.GetJobAction().GetValue(),
		req.GetDirective().GetValue(),
		req.GetProvider().GetValue(),
		s.GetOwnerPath(),
		req.GetRuntimeId().GetValue(),
	)  // 初始化Job数据

	_, err := pi.Global().DB(ctx).
		InsertInto(constants.TableJob).
		Record(newJob).
		Exec()  // 将Job信息插入数据库job表中
	...... // 异常处理

	err = p.controller.queue.Enqueue(newJob.JobId)  // 将Job添加到Job controller所监听的对列中
	...... // 异常处理

	res := &pb.CreateJobResponse{
		JobId:     pbutil.ToProtoString(newJob.JobId),
		ClusterId: pbutil.ToProtoString(newJob.ClusterId),
		AppId:     pbutil.ToProtoString(newJob.AppId),
		VersionId: pbutil.ToProtoString(newJob.VersionId),
	}  // 构建创建结果
	return res, nil
}

Job Server所做的内容如下:

1.初始化Job数据
2.将Job信息插入数据库job表中
3.将Job添加到Job controller所监听的对列中
4.构建创建结果并返回

3.3 Job Controller

func (c *Controller) HandleJob(ctx context.Context, jobId string, cb func()) error {
	ctx = ctxutil.AddMessageId(ctx, jobId)

	defer cb()

	job := &models.Job{
		JobId:  jobId,
		Status: constants.StatusWorking,
	}   // 初始化Job信息
	......
	......
	err = func() error {
		......  // 数据库查询Job信息,并做Job处理之前的预先处理

		providerClient, err := providerclient.NewRuntimeProviderManagerClient()  // 获取应用商店的类型,默认为Helm
		if err != nil {
			return gerr.NewWithDetail(ctx, gerr.Internal, err, gerr.ErrorInternalError)
		}
		response, err := providerClient.SplitJobIntoTasks(ctx, &pb.SplitJobIntoTasksRequest{
			RuntimeId: pbutil.ToProtoString(job.RuntimeId),
			Job:       models.JobToPb(job),
		})   // 通过Helm Proxy Server将Job拆分为Task
		...... // 获取Task Service客户端
		successful := true
		module.WalkTree(func(parent *models.TaskLayer, current *models.TaskLayer) {
			...... // 处理子任务

			if current != nil {   // 处理当前任务
				for _, currentTask := range current.Tasks {
					if !successful {
						currentTask.Status = constants.StatusFailed
					}
					currentTask.TaskId, err = taskClient.SendTask(ctx, currentTask)  // 创建Task
				...... 
				if current.IsLeaf() {
					for _, currentTask := range current.Tasks {
						err = taskClient.WaitTask(ctx, currentTask.TaskId, currentTask.GetTimeout(constants.MaxTaskTimeout), constants.WaitTaskInterval) // 等待Task处理完成
                        ......
                        ......

		processor.Job.Status = constants.StatusSuccessful
		return processor.Post(ctx)  // Task完成后需要做的部分
	}()
	...... //根据处理结果定义Job的状态

	err = c.updateJobAttributes(ctx, jobId, map[string]interface{}{
		constants.ColumnStatus:     status,
		constants.ColumnStatusTime: time.Now(),
	})  // 修改Job状态
	if err != nil {
		logger.Error(ctx, "Failed to update job: %+v", err)
	}

	return err
}

Job Controller所做的工作如下:

1.初始化Job信息
2.数据库查询Job信息,并做Job处理之前的预先处理
3.获取应用商店的类型,默认为Helm
4.通过Helm Proxy Server将Job拆分为Task
5.获取Task Service客户端
6.创建Task
7.等待Task处理完成

3.4 Task Contoller

task controller监听task创建的queue,当监听到有新的task创建后,会在HandleTask方法中进行相关的处理,处理流程如下:

func (c *Controller) HandleTask(ctx context.Context, taskId string, cb func()) error {
	......
    ...... // 根据参数中的TaskId从数据库查询对应的Task记录,并更新状态为working
	err = func() error {
		processor := NewProcessor(task)  // 初始化Task的处理实例
		err = processor.Pre(ctx)  // 做预先处理,创建cluster时不做任何处理
		if err != nil {
			logger.Error(ctx, "Executing task pre processor failed: %+v", err)
			return err
		}

		pilotClient, err := pilotclient.NewClient()
		if err != nil {
			logger.Error(ctx, "Connect to pilot service failed: %+v", err)
			return err
		}

		if task.Target == constants.TargetPilot {
			......
            ...... //创建cluster时Target为default
		} else {
			providerClient, err := providerclient.NewRuntimeProviderManagerClient() // 获取Helm Proxy Service客户端
			handleResponse, err := providerClient.HandleSubtask(withTimeoutCtx, &pb.HandleSubtaskRequest{
				RuntimeId: pbutil.ToProtoString(task.Target),
				Task:      models.TaskToPb(task),
			})  // 调用Helm Proxy Servcie处理任务
            ...... // 异常处理
			waitResponse, err := providerClient.WaitSubtask(withTimeoutCtx, &pb.WaitSubtaskRequest{
				RuntimeId: handleResponse.Task.Target,
				Task:      handleResponse.Task,
			})  // 等待task处理完成
			......
            ......
	}()
    ......
    ......// 修改任务状态
}

Task Controller所做内容如下:

1.根据参数中的TaskId从数据库查询对应的Task记录,并更新状态为working
2.初始化Task的Processor处理实例
3.做预先处理,创建cluster时不做任何处理
4.获取Helm Proxy Service客户端
5.调用Helm Proxy Servcie的`HandleTask`方法开始处理任务
6.调用Helm Proxy Servcie的`WaitSubtask`方法等待task处理完成
7.修改Task的状态

3.5 Helm Proxy Server

Task Controller调用HandleTask后,Helm Proxy servcie开始处理task,代码定义在openpitrix/pkg/service/helm/handler.go中,处理流程如下:

func (s *Server) HandleSubtask(ctx context.Context, req *pb.HandleSubtaskRequest) (*pb.HandleSubtaskResponse, error) {
	task := models.PbToTask(req.GetTask())  // 从请求信息中序列化task信息
	directive, err := decodeTaskDirective(task.Directive)
	......
    ......
	proxy := NewProxy(ctx, directive.RuntimeId)  // 获取helm客户端
	cfg, _ := proxy.GetHelmConfig(directive.Namespace)

	switch task.TaskAction {
	case constants.ActionCreateCluster:
		c, _, err := getChartAndAppId(ctx, directive.VersionId) // 获取chart信息和appId
		if err != nil {
			return nil, err
		}

		rawVals := make(map[string]interface{})
		err = jsonutil.Decode([]byte(directive.Values), &rawVals)
		if err != nil {
			return nil, err
		}
		rawVals = trimStringInMap(rawVals)
		logger.Debug(ctx, "Install helm release with name [%+v], namespace [%+v], values [%s]", directive.ClusterName, directive.Namespace, rawVals)

		err = proxy.InstallReleaseFromChart(cfg, c, rawVals, directive.ClusterName, directive.Namespace)  // 通过Helm安装应用
		if err != nil {
			return nil, err
		}
	...... // 处理其他动作的任务
	return &pb.HandleSubtaskResponse{
		Task: models.TaskToPb(task),
	}, nil  // 返回处理结果
}

HandleTask所做的内容如下:

1.从请求信息中序列化task信息
2.获取helm客户端
3.获取chart信息和appId
4.通过Helm安装应用
5.返回处理结果

Task Controller调用HandleTask后,Helm Proxy Servcie会开始通过helm安装应用实例,并返回安装接口的调用结果,接下来会通过WaitTask监听任务的执行情况,在该方法中由于是创建应用cluster,因此主要内容便是根据所创建的容器达到Ready状态的数量与预期数量是否一直判断任务是否完成。

以上便是根据KubeSphere的应用市场在k8s部署实例的整个过程的具体实现。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐