「技术直达」系列又回来啦!道客船长「技术直达」系列,关注国内外云原生领域的技术和前沿趋势,为开发者和企业提供最新的理论和实践干货。近期为大家带来 K8S API-Server 源码剖析,持续更新「理论+实践」的系列干货文章。

作者简介

6f2f70c40167f512527ada599411c5ce.png

周尧

DaoCloud 后端工程师,热衷于研究云原生技术,CKA/CKAD 资格认证,Kubernetes 社区成员

在上期「探索API」的文章中,我们了解到 K8S API 的大致内容和规范。其实这些 API 都是通过 API-Server 来进行注册的,所以本次内容主要讲 API-Server 中注册这些 API 的流程,或者说一个 API-Server 整个启动的流程。

在开始之前,我们要了解到,k8s 的 API 使用的是 Go-restful 这套机制来进行注册的,也就是会涉及到 `Webservice`,`Container`, `Route` 这些概念,所以在开篇之前我们会简单了解一下 Go-restfutl。

1. Go-restful

Go-restful 中有三个非常重要的概念:     1. Container。 表示一个服务器,由多个 WebService 和一个 http.ServeMux 组成,使用 RouteSelector 进行分发。       2. WebService。 表示一个服务,由多个路由(Route)组成,他们共享同一个Root Path。

      3. Route。表示一个路由,包含三部分:URL,HTTP method,回调处理函数RouteFunction。

图示如下:

7b2f0afef9f22c68dabcc9861d1ca9a0.png

可以看到上面在同一个 Container 中有两个 WebService,在第一个 WebService 中,Root Path 是 `/users`,那么我们看它的第二个路由 `GET(/{user-id})` 也就对应的请求就应该是:

GET http://localhost:/users/123。

当然,上图还没有表示出其回调处理函数 RouteFunction 。一般的方法就是在 Route 后面跟上 To(handlefunc)。然后就可以用 HandleFunc 来作为回调了。注意,这个 HandleFunc 一定是会实现 golang 中的 http.Handler 接口:

1type Handler interface {
2    ServeHTTP(ResponseWriter, *Request)
3}

顺带一提,很多时候可能实现这个接口的时候,函数名并不叫 `ServeHTTP`,那是因为它使用的 Golang 中函数类型的特性,说白了就是一个函数装饰器,这里就不细讲了。总之如果看到了传入参数是 `ResponseWriter` 和 `Request` 的函数,那么它就是在实现这个 ServeHTTP 接口。

总之, 在使用 Go-restful 框架中,总会遵循 Container → WebService → Route → HandleFunc 的流程来实现,在下面关于 API-Server 的介绍中,我们也会基本按照这个流程在找到其运作的方式。

2. 启动 API-Server

在一般的 K8S 集群中,API-Server 都是以静态 Pod 在 `/etc/kubernetes/manifests/` 中启动的,打开文件 `kube-apiserver.yaml` 可以看到:
1spec:
2  containers:
3  - command:
4    - kube-apiserver
5    - --advertise-address=10.6.192.7
6    - --allow-privileged=true
7    - --authorization-mode=Node,RBAC

其实就是将 API-Server 的容器中执行了这么一条带有很多参数的命令就启动了 API-Server 了。 这个命令就是使用 Go 的  Cobra 框架 ,启动的函数在:
 1// cmd/kube-apiserver/app/server.go
2
3func Run(completeOptions completedServerRunOptions, stopCh chan struct{}) error {
4    // To help debugging, immediately log version
5    klog.Infof("Version: %+v", version.Get())
6    server, err := CreateServerChain(completeOptions, stopCh)
7    if err != nil {
8        return err
9    }
10    prepared, err := server.PrepareRun()
11    if err != nil {
12        return err
13    }
14    return prepared.Run(stopCh)
整个启动流程从这个函数来看就只有三步:

    1. 创建 Server 端。

    2. 准备相关信息。

    3. 启动 Server 端。

其中 `CreateServerChain` 也就是创建了一个将原生的 API-Server 和扩展的 API-Server 的聚合 API-Server,本篇内容只讨论原生的 API-Server。

另外,在创建的过程中,有很多关于认证,授权,准入相关的配置,这一点我们会在下一节中讲到。而这里的第一步,创建 Server 端,就是主要在做注册 API 这么一件事,我们回到上一篇所介绍的三类 API:

e09f1262c87000aaf632f0894b91ca54.png   

     1. Core group API( 在 /api/v1 路径下 ),在代码中叫做 LegacyAPI

    2. Named groups API( 在对应的 /apis/$NAME/$VERSION 路径下 )

    3. System-wide API(比如 /metrics,/healthz )

这三类API的注册可以在 pkg/master/master.go 中的 New 函数 k8s.io/apiserver/pkg/server/config.go 中一个 New 的函数找到:

    1. InstallLegacyAPI 注册 Core group API

    2. InstallAPIs 注册 Name groups API 

    3. installAPI 注册 System-wide API

这几个函数的目的都是配置 API 接口并将其注册到 Server 的 Container 中去,其实现过程大同小异。

同样的,为了节省篇幅,我们不去探究所有的 API 资源的注册流程,这里我们还是以 Deployment 来探索它的 API 的注册流程。

在细究其实现细节之前,我们考虑两个问题:

    1. Deployment 这种 API 资源是如何定义的?

    2. Deployment 的路由有 /apis/apps/v1beta1/namespaces/default/deployments 和 /apis/apps/v1/namespaces/default/deployments 等等,这种多版本的请求路径是怎么来的?

我们会在后面解析这些问题。

3. Deployment 定义及路由注册

关于 D eployment 的资源定义在了 pkg/apis/apps/types.go 中:

f85682acbd6e28bc456974d989a9cfc6.png

我们可以使用 `kubectl get deployment/ -o yaml` 来得到一个完整的 Deployment 的描述:

其中 TypeMeta 就是 `kind` 和 `apiVersion` 字段;objectMeta 对应其 `metadata` 之下的字段;DeploymentSpec 对应`spec`之下的字段;DeploymentStatus 对一个其 `status` 之下的字段。

3.1 Deployment 多版本

那么这个定义在 Deployment 的 API 注册中是如何被用到的呢?

这里必须要提一下 API-Server 中的 REST-Storage 的概念,这也基本上是 API 注册流程中最重要的概念,在我的理解中,它就像转换成 WebService 之前的中间对象,每一个 Rest-Storage,对应一种 GroupKind,例如 “apps/deployment”。里面定义了很多关于这种资源的使用函数,在后面的 http Method(GET/POST/PUT/DELETE) 的路由回调函数中会使用到这些函数。

我们可以用下面这个图来表示这个架构:

f2132cc00ffb14de0ba80589822b6689.png

这个图里面提到了 Schema 这个概念,它其实是实现了 K8S 中多版本转换的功能,它的实现原理中大量得使用了 Golang 中的 Reflect 机制,这一点会在之后的文章中详细介绍。

所以我们就先来看看 Deployment 的 Rest-Storage:

1// pkg/registry/apps/deployment/storage/storage.go
2
3func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
4    store := &genericregistry.Store{
5        NewFunc:                  func() runtime.Object { return &apps.Deployment{} },
6        NewListFunc:              func() runtime.Object { return &apps.DeploymentList{} },
7

可以看到这里 NewFunc 和 NewListFunc 就是 上一节中所提到的 Deployment 的定义。除此之外 New() 方法也是返回的这个 apps.Deployment。注意这里的 Store 是一个 `genericregistry.Store`,它其实已经实现了所有的方法了,所以这个 Deployment 的 Rest-Storage 只需要改写一些方法就好。

而这个 NewRest 方法是如何被调用的呢,它的调用就会在 Group ("apps")下来进行多版本的调用,核代码如下:

 1// pkg/registry/apps/rest/storage_apps.go
2
3func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
4    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
5    if apiResourceConfigSource.VersionEnabled(appsapiv1beta1.SchemeGroupVersion) {
6        if storageMap, err := p.v1beta1Storage(apiResourceConfigSource, restOptionsGetter); err != nil {
7            return genericapiserver.APIGroupInfo{}, false, err
8        } else {
9            apiGroupInfo.VersionedResourcesStorageMap[appsapiv1beta1.SchemeGroupVersion.Version] = storageMap
10        }
11    }
12    if apiResourceConfigSource.VersionEnabled(appsapiv1beta2.SchemeGroupVersion) {
13        if storageMap, err := p.v1beta2Storage(apiResourceConfigSource, restOptionsGetter); err != nil {
14            return genericapiserver.APIGroupInfo{}, false, err
15        } else {
16            apiGroupInfo.VersionedResourcesStorageMap[appsapiv1beta2.SchemeGroupVersion.Version] = storageMap
17        }
18    }
19    if apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
20        if storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter); err != nil {
21            return genericapiserver.APIGroupInfo{}, false, err
22        } else {
23            apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
24        }
25    }
26
27    return apiGroupInfo, true, nil
28}

可以很明显得看到,这个 `VersionedResourcesStorageMap` 是一个 map 形如:

1VersionedResourcesStorageMap["v1beta1"] = v1beta1StorageMap
2
3VersionedResourcesStorageMap["v1beta2"] = v1beta2StorageMap
4
5VersionedResourcesStorageMap["v1"] = v1StorageMap

这个 StorageMap 也是一个 map,形如:

1storage["deployments"] = deploymentStorage.Deployment
2storage["deployments/status"] = deploymentStorage.Status
3storage["deployments/rollback"] = deploymentStorage.Rollback
4storage["deployments/scale"] = deploymentStorage.Scale

所以这样一来,就把上一篇所将的 Group,Version,Kind 给联系上了。那么这个这个 v1beta1 和 v1beta2 有什么区别呢?从代码中我们可以看出,在 v1beta2 之后,便废弃了 `deployment/status` 这个接口。

f0fe2e29158675aacfefdb9958d536e2.png

3.2 Deployment 方法与其策略

我们在以前提到过,这个 REST-Storage 会实现各种方法来最后提供给 HTTP 回调函数。而这个 Deployment 的 REST-Storage 是基于 genericregistry.Store 的,但是有些特殊的函数就需要改写,比如获得 Scope 的函数:

1// pkg/registry/apps/deployment/strategy.go
2
3func (deploymentStrategy) NamespaceScoped() bool {
4    return true
5}

这里就是返回 true,也就是之后在初始化路由的时候,会加上 “/namespaces/{namespaces}”,这样我们在访问 Deployment 资源的时候,就需要在请求中指明 namespace:

1// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
2
3namespaceParamName := "namespaces"
4namespacedPath := namespaceParamName + "/{namespace}/" + resource

当然,还有一些专属于 Deployment 的策略,比如我们知道在执行任何 Deployment 的变动时,其 Generation 字段都会加一,我们可以通过 `kubectl rollout history` 来查看,而实现这个加一的操作就是在 PrepareForUpdate 中实现的:

 1// pkg/registry/apps/deployment/strategy.go
2
3func (deploymentStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
4    newDeployment := obj.(*apps.Deployment)
5    oldDeployment := old.(*apps.Deployment)
6    newDeployment.Status = oldDeployment.Status
7
8    if !apiequality.Semantic.DeepEqual(newDeployment.Spec, oldDeployment.Spec) ||
9        !apiequality.Semantic.DeepEqual(newDeployment.Annotations, oldDeployment.Annotations) {
10        newDeployment.Generation = oldDeployment.Generation + 1
11    }
12}

3.3 Deployment 封装返回值

这可能是一个并不重要的点,这里提出来是因为我之前刚好修过一个 K8S 的 bug,问题刚好就出在这里(不过不是 Deployment,是 ComponentStatus)。查询 Deployment 时,API-Server 从 etcd 拿到数据返回给客户端的时候,并不是直接返回的,而是做了一层表格的封装。

比如我们在执行 `kubectl get deployments` 的时候,得到结果是:

1[root@demo-master-a-1 ~]# kubectl get deployments/nginx
2NAME    READY   UP-TO-DATE   AVAILABLE   AGE
3nginx   1/1     1            1           8h

这个表格的字段,`UP-TO-DATE` 和 `AVAILABLE` 等,都是由一个叫 `TableConvertor` 的东西来实现的,而这个 `TableConvertor` 的初始化,也是在初始化 Deployment 的 REST-Storage 时定义好的。

1TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}

在 pkg/printers/internalversion/printers.go 我们可以找到关于 Deployment 的表格定义:

 1deploymentColumnDefinitions := []metav1beta1.TableColumnDefinition{
2    {Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
3    {Name: "Ready", Type: "string", Description: "Number of the pod with ready state"},
4    {Name: "Up-to-date", Type: "string", Description: extensionsv1beta1.DeploymentStatus{}.SwaggerDoc()["updatedReplicas"]},
5    {Name: "Available", Type: "string", Description: extensionsv1beta1.DeploymentStatus{}.SwaggerDoc()["availableReplicas"]},
6    {Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
7    {Name: "Containers", Type: "string", Priority: 1, Description: "Names of each container in the template."},
8    {Name: "Images", Type: "string", Priority: 1, Description: "Images referenced by each container in the template."},
9    {Name: "Selector", Type: "string", Priority: 1, Description: extensionsv1beta1.DeploymentSpec{}.SwaggerDoc()["selector"]},
10}

3.4 Deployment 注册路由方法

回到了我们之前强调的 Container → WebService → Route,它最终的过程在 k8s.io/apiserver/pkg/endpoints/groupversion.go 中:

 1func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
2    prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
3    installer := &APIInstaller{
4        group:             g,
5        prefix:            prefix,
6        minRequestTimeout: g.MinRequestTimeout,
7    }
8
9    apiResources, ws, registrationErrors := installer.Install()
10    versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
11    versionDiscoveryHandler.AddToWebService(ws)
12    container.Add(ws)
13    return utilerrors.NewAggregate(registrationErrors)
14}

这个函数就是将 WebService 加入到了 Container 中,但是这个 WebService 已经是将 Route ( 路由 ) 给注册好了的,所以这个 `installer.Install()` 才是最关键的。

而这最关键的一步,注册路由,是在 k8s.io/apiserver/pkg/endpoints/installer.go 中的一个叫 `registerResourceHandlers` 中实现的,这个函数一共有 700 多行,可见其功能的重要性,不过简单来讲它也只做一件事情,就是将我们之前得到的 REST-Storage 给转换为一个个 Route,简化这个函数我们可以看到:

 1switch action.Verb {
2case "GET":
3case "LIST":
4case "PUT":
5case "PATCH":
6case "POST":
7case "DELETE":
8case "DELETECOLLECTION":
9case "WATCH":
10case "WATCHLIST":
11case "CONNECT":
12}

而这些方法调用就会最中调到etcd上的请求对数据库的操作。其本质上就是对etcd的增删查改(GET/POST/PUT/DELETE),代码的实现都在: k8s.io/apiserver/pkg/storage/etcd3/store.go 中。同样的,这部分关于 etcd 的讲解会在之后的 etcd 专题中涉及。

下期预告

上两节中,我们了解到 Kube-APIServer 中关于 API 的一些信息:

1. 有哪些 API 

2. 这个 API 是如何在代码层面实现的

但我们还未真正去追踪一个 API 请求,或者说去了解它的具体流程,在一个 API 请求进注册好的 API Handler 之后,直接将数据写进 etcd 吗?它还会经历什么呢?我们下期继续。 往期回顾

K8S API-Server 源码剖析(一)| 监听机制 List-Watch 剖析

K8S API-Server 源码剖析(二)| 探索 API

51313a4ea6a85717627ad15571728d32.png

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐