1、Kube-APIServer 启动

APIServer 启动采用 Cobra 命令行,解析相关 flags 参数,经过 Complete(填充默认值)->Validate(校验) 逻辑后,通过 Run 启动服务。

在 Run 函数中,按序分别初始化 APIServer 链(APIExtensionsServer、KubeAPIServer、AggregatorServer),分别服务于 CRD(用户自定义资源)、K8s API(内置资源)、API Service(扩展外部的APIServer) 对应的资源请求,因此在处理API对象时,当API对象在Aggregator中找不到时,会去KubeAPIServer中找,再找不到则会去APIExtensions中找,三者通过 Aggregator –> KubeAPIServer –> APIExtensions 这样的方式顺序串联起来,这就是所谓的Chain的方式,或者Delegation的方式,实现了APIServer的扩展机制。

之后,经过非阻塞(NonBlockingRun) 方式启动 SecureServingInfo.Serve,并配置 HTTP2(默认开启) 相关传输选项,最后启动 Serve 监听客户端请求。

111

2、kube-apiserver get api资源流程

https://mp.weixin.qq.com/s/KCb8mbRg40hB6d8okgrMGA 该文章以内置资源的 handler 注册过程为线索介绍了 APiServer 的启动过程和 handler 注册过程。对APIServer的访问会先经过 filter,再路由给具体的 handler。filter 在 DefaultBuildHandlerChain 中定义,主要对请求做超时处理,认证,鉴权等操作。handler 的注册则是初始化 APIGoupInfo 并设置其 VersionedResourcesStorageMap 后作为入参,调用 GenericAPIServer.InstallAPIGroups即可完成 handler 的注册。k8s.io/apiserver/pkg/endpoints/handlers包中的代码则是对用户请求做编解码,对象版本转换,协议处理等操作,最后在交给rest.Storage 具体实现的接口进行处理。

3、server 层

Server模块对外提供服务器能力。主要包括调用调用Endpoints中APIInstaller完成路由注册,同时为apiserver的扩展做好服务器层面的支撑(主要是APIService这种形式扩展)

// 注册所有 apiGroupVersion 的处理函数 到restful.Container 中
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
    for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
        apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
        if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
            ...
        }
    }
    return nil
}

除了路由注册到服务器的核心内容外,server模块还提供了如下内容:

  1. 路由层级的日志记录(在httplog模块)

  1. 健康检查的路由(healthz模块)

  1. 服务器级别的过滤器(filters模块),如,cors,请求数,压缩,超时等过滤器,

  1. server级别的路由(routes模块),如监控,swagger,openapi,监控等。

4、endpoint 层

位于 k8s.io/apiserver/pkg/endpoints 包下。根据Registry层返回的路径与存储逻辑的关联关系,完成服务器上路由的注册 <path,handler> ==> route ==> webservice。

// k8s.io/apiserver/pkg/endpoints/installer.go
typeAPIInstallerstruct {
    group             *APIGroupVersion
    prefix            string// Path prefix where API resources are to be registered.
    minRequestTimeouttime.Duration
}
// 一个Resource 下的 所有处理函数 都注册到 restful.WebService 中了
func (a*APIInstaller) registerResourceHandlers(pathstring, storagerest.Storage, ws*restful.WebService) (*metav1.APIResource, error) {
    // 遍历所有操作,完成路由注册
    for_, action :=rangeactions {
        ...
        switchaction.Verb {
            case"GET": // Get a resource.
                handler=restfulGetResource(getter, exporter, reqScope)
                ...
                route :=ws.GET(action.Path).To(handler).
                    Doc(doc).
                    Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                    Returns(http.StatusOK, "OK", producedObject).
                    Writes(producedObject)
                ...
                routes=append(routes, route)
            case...
        }
        ...
        for_, route :=rangeroutes {
            ...
            ws.Route(route)
        }
    }
}
funcrestfulGetResource(rrest.Getter, erest.Exporter, scopehandlers.RequestScope) restful.RouteFunction {
    returnfunc(req*restful.Request, res*restful.Response) {
        handlers.GetResource(r, e, &scope)(res.ResponseWriter, req.Request)
    }
}
func (a*APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
    ws :=a.newWebService()
    ...
    for_, path :=rangepaths {
        apiResource, err :=a.registerResourceHandlers(path, a.group.Storage[path], ws)
        apiResources=append(apiResources, *apiResource)
    }
    returnapiResources, ws, errors
}
typeAPIGroupVersionstruct {
    Storagemap[string]rest.Storage// 对应上文的restStorageMap 
    Rootstring
}
// 一个APIGroupVersion 下的所有Resource处理函数 都注册到 restful.Container 中了
func (g*APIGroupVersion) InstallREST(container*restful.Container) error {
    prefix :=path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
    installer :=&APIInstaller{
        group:             g,
        prefix:            prefix,
        minRequestTimeout: g.MinRequestTimeout,
    }
    apiResources, ws, registrationErrors :=installer.Install()
    ...
    container.Add(ws)
    returnutilerrors.NewAggregate(registrationErrors)
}

同时在Endpoints还负责路径级别的操作:比如:到指定类型的认证授权,路径的调用统计,路径上的操作审计等。这部分内容通常在endpoints模块下的fileters内实现,这就是一层在http.Handler外做了一层装饰器,便于对请求进行拦截处理。

5、registry 层

实现各种资源对象的存储逻辑

  1. kubernetes/pkg/registry负责k8s内置的资源对象存储逻辑实现

  1. k8s.io/apiextensions-apiserver/pkg/registry负责crd和cr资源对象存储逻辑实现

k8s.io/apiserver/pkg/registry
    /generic
        /regisry
            /store.go       // 对storage 层封装,定义 Store struct
k8s.io/kubernetes/pkg/registry/core
    /pod
        /storage
            /storage.go     // 定义了 PodStorage struct,使用了Store struct
    /service
    /node
    /rest
        /storage_core.go

registry这一层比较分散,k8s在不同的目录下按照k8s的api组的管理方式完成各自资源对象存储逻辑的编写,主要就是定义各自的结构体,然后和Store结构体进行一次组合。

typePodStoragestruct {
    Pod                 *REST
    Log                 *podrest.LogREST
    Exec                *podrest.ExecREST
    ...
}
typeRESTstruct {
    *genericregistry.Store
    proxyTransporthttp.RoundTripper
}
func (cLegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGettergeneric.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    ...
    // 关联路径 与各资源对象的关系
    restStorageMap :=map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.LegacyBinding,
    }
}

6、存储层

位于 k8s.io/apiserver/pkg/storage 下,封装了对etcd 的操作,还提供了一个cacher 以减少对etcd 的访问压力(kube-apiserver本身对etcd实现了list-watch机制,将所有对象的最新状态和最近的事件存放到cacher里,所有外部组件对资源的访问都经过cacher),在Storage这一层,并不能感知到k8s资源对象之类的内容,纯粹的存储逻辑。

typeCacherstruct {
  incomingchanwatchCacheEvent  // incoming 事件管道, 会被分发给所有的watchers
  storagestorage.Interface  //storage 的底层实现
  objectTypereflect.Type   // 对象类型
  watchCache*watchCache   // watchCache 滑动窗口,维护了当前kind的所有的资源,和一个基于滑动窗口的最近的事件数组 
  reflector  *cache.Reflector  // reflector list并watch etcd 并将事件和资源存到watchCache中 
  watchersBuffer []*cacheWatcher   // watchersBuffer 代表着所有client-go客户端跟apiserver的连接
  ....
}

7、kube-apiserver 启动代码梳理

step01 apiserver启动 cmd/kube-apiserver/apiserver.go:34

step02 通过cobra启动 cmd/kube-apiserver/app/server.go:129

step03 创建 apiservers 包括APIExtensionsServer、KubeAPIServer、AggregatorServer) cmd/kube-apiserver/app/server.go:164

  • step03-01 初始化创建 KubeAPIServer 所需要的配置(apiServer启动参数配置、注册指标、ca配置、初始化认证授权配置等 cmd/kube-apiserver/app/server.go:182

  • step03-02 创建 apiExtensionsServer 实例,服务于 CRD(用户自定义资源) cmd/kube-apiserver/app/server.go:197

  • step03-02-01 注册APIGroup vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go:166

  • step03-02-02 step03-06 registerResourceHandlers vendor/k8s.io/apiserver/pkg/endpoints/installer.go:191

  • step03-02-03 为每个类型的请求添加对应的 handler vendor/k8s.io/apiserver/pkg/endpoints/installer.go:693

  • step03-03 创建 kubeAPIServer 实例, 用于K8s API(内置资源) cmd/kube-apiserver/app/server.go:203

  • step03-03-01 InstallLegacyAPI pkg/controlplane/instance.go:383

  • step03-03-02 installAPIResources 为每一个 API resource 调用 apiGroupVersion.InstallREST 添加路由 vendor/k8s.io/apiserver/pkg/server/genericapiserver.go:712

  • step03-03-03 InstallREST 将 restful.WebService 对象添加到 container 中 vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go:109

  • step03-03-04 installer.Install 返回最终的 restful.WebService 对象

  • step03-03-05 registerResourceHandlers 通过go-restful框架实现路由和对应handler处理逻辑的绑定 vendor/k8s.io/apiserver/pkg/endpoints/installer.go:192

  • step03-03-06 为每个类型的请求添加对应的handler vendor/k8s.io/apiserver/pkg/endpoints/installer.go:693

  • step03-04 创建AggregatorServer,用于API Service(API 扩展资源) 对应的资源请求 cmd/kube-apiserver/app/server.go:214

step04 运行前准备(健康检查、存活检查和OpenAPI路由的注册) cmd/kube-apiserver/app/server.go:170

step05 启动安全的http server提供服务 cmd/kube-apiserver/app/server.go:176

  • step05-01 启动http服务 staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:459

  • step05-02 非阻塞式运行启动 SecureServingInfo.Serve,并配置 HTTP2(默认开启) 相关传输选项,最后启动 Serve 监听客户端请求。 staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:535

step06 接受get请求

  • step06-01 get请求访问入口层,处理get请求的hanler vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go:86

  • step06-02 对请求数据进行处理层 vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:743

  • step06-03 apiserver的cache层 vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:547

  • step06-04 直接访问etcd层 vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:125

参考文章:

资源API的注册 https://cloud.tencent.com/developer/article/2013498

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐