【kubernetes/k8s源码分析】kube-apiserver 启动
kubernetes v1.12一. 序言主要实现了功能一个是请求的路由和处理,简单说就是监听一个端口,把接收到的请求正确地转到相应的处理逻辑上,另一个功能就是认证及权限控制集群管理资源配额控制集群安全机制kube-apiserver --admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,Defa...
kubernetes v1.12
一. 序言
主要实现了功能
- 一个是请求的路由和处理,简单说就是监听一个端口,把接收到的请求正确地转到相应的处理逻辑上,另一个功能就是认证及权限控制
- 集群管理
- 资源配额控制
- 集群安全机制
kube-apiserver
--admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,ResourceQuota
--advertise-address=10.12.40.82
--bind-address=0.0.0.0
--insecure-bind-address=0.0.0.0
--authorization-mode=Node,RBAC
--runtime-config=rbac.authorization.k8s.io/v1alpha1
--kubelet-https=true
--experimental-bootstrap-token-auth
--token-auth-file=/etc/kubernetes/token.csv
--service-cluster-ip-range=10.254.0.0/16
--service-node-port-range=30000-32766
--tls-cert-file=/etc/kubernetes/ssl/kubernetes.pem
--tls-private-key-file=/etc/kubernetes/ssl/kubernetes-key.pem
--client-ca-file=/etc/kubernetes/ssl/ca.pem
--service-account-key-file=/etc/kubernetes/ssl/ca-key.pem
--etcd-cafile=/etc/kubernetes/ssl/ca.pem
--etcd-certfile=/etc/kubernetes/ssl/kubernetes.pem
--etcd-keyfile=/etc/kubernetes/ssl/kubernetes-key.pem
--etcd-servers=https://10.12.40.82:2379
--enable-swagger-ui=true --allow-privileged=true
--apiserver-count=2
--audit-log-maxage=30
--audit-log-maxbackup=3
--audit-log-maxsize=100
--audit-log-path=/var/lib/audit.log
--event-ttl=1h --logtostderr=false
--v=8
--log-dir=/var/log/kubernetes/kube-apiserver
--insecure-port=8090
查看支持的资源对象的种类:
# curl localhost:8080/api/v1集群中的Pod列表、Service列表、RC列表
# curl localhost:8080/api/v1/pods
# curl localhost:8080/api/v1/services
# curl localhost:8080/api/v1/replicationcontrollers
Proxy API 接口:
代理REST请求,把收到的REST请求转发到某个Node上kubelet守护进程的REST端口上,由该Kubelet进程负责响应。
1.1 go-restful
Route: 快速路由CurlyRouter支持正则表达式和动态参数,Route的设定包含:请求方法(http Method),请求路径(URL Path),输入输出类型(JSON/YAML)以及对应的回掉函数restful.RouteFunction,响应内容类型(Accept)等。
apiserver是增删改查watch的http/restful式服务端,数据最终存储在etc。server是无状态的,提供对数据访问的认证鉴权、缓存、版本适配转换等功能。
二. 关键数据结构
2.1 ServerRunOptions
位于 cmd/kube-apiserver/app/options/options.go,主要是运行apiserver的参数选项。
// ServerRunOptions runs a kubernetes api server.
type ServerRunOptions struct {
//服务器通用的参数选项
GenericServerRunOptions *genericoptions.ServerRunOptions
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptions
InsecureServing *kubeoptions.InsecureServingOptions
Audit *genericoptions.AuditLogOptions
Features *genericoptions.FeatureOptions
Authentication *kubeoptions.BuiltInAuthenticationOptions
Authorization *kubeoptions.BuiltInAuthorizationOptions
CloudProvider *kubeoptions.CloudProviderOptions
StorageSerialization *kubeoptions.StorageSerializationOptions
APIEnablement *kubeoptions.APIEnablementOptions
// 是否允许Pod中容器拥有超级权限
AllowPrivileged bool
// 事件可以保存事件,默认为1小时
EventTTL time.Duration
// kubelet配置
KubeletConfig kubeletclient.KubeletClientConfig
// 服务的节点端口
KubernetesServiceNodePort int
MasterCount int
// 每秒最大连接数
MaxConnectionBytesPerSec int64
// 服务集群IP范围,节点端口范围
ServiceClusterIPRange net.IPNet // TODO: make this a list
ServiceNodePortRange utilnet.PortRange
// 如果设置,可以使用SSH用户名和私钥对Node访问
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
2.2 ServerRunOptions
位于 k8s.io/apiserver/pkg/server/options/server_run_options.go,主要是运行apiserver的通用参数选项。
// ServerRunOptions contains the options while running a generic api server.
type ServerRunOptions struct {
// 准入控制, AlwaysAdmint / LimitRanger / ResourceQuota
AdmissionControl string
// 准入控制配置文件
AdmissionControlConfigFile string
// 暴露给集群中成员自己的IP地址,默认使用--bind-address
AdvertiseAddress net.IP
// Cors 跨域资源共享
CorsAllowedOriginList []string
// master 对外的地址
ExternalHost string
// 同时处理的最大数,默认400。仅用于长请求
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
// 最小请求超时时间,默认1800s,仅用于wantch
MinRequestTimeout int
TargetRAMMB int
// 设置资源对象wantch缓存大小列表,以逗号分割
WatchCacheSizes []string
}
2.3 kubeletClientConfig
位于 pkg/kubelet/client。Port运行的kubelet默认10250,
type KubeletClientConfig struct {
// Default port - used if no information about Kubelet port can be found in Node.NodeStatus.DaemonEndpoints.
Port uint
ReadOnlyPort uint
EnableHttps bool
// PreferredAddressTypes - used to select an address from Node.NodeStatus.Addresses
PreferredAddressTypes []string
// TLSClientConfig contains settings to enable transport layer security
restclient.TLSClientConfig
// Server requires Bearer authentication
BearerToken string
// HTTPTimeout is used by the client to timeout http requests to Kubelet.
HTTPTimeout time.Duration
// Dial is a custom dialer used for the client
Dial utilnet.DialFunc
}
三. apiserver启动-main入口函数
位于 cmd/kube-apiserver/apiserver.go
rand.Seed设置随机数种子,加上这行代码,可以保证每次随机都是随机的
func main() {
rand.Seed(time.Now().UTC().UnixNano())
command := app.NewAPIServerCommand(server.SetupSignalHandler())
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}
3.1 NewAPIServerCommand函数
使用了comand cobra包,NewServerRunOptions设置默认参数,实例化serverRunOptions对象
Complete函数设置默认的参数(Should be called after kube-apiserver flags parsed)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command {
s := options.NewServerRunOptions()
cmd := &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags())
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
return Run(completedOptions, stopCh)
},
}
fs := cmd.Flags()
namedFlagSets := s.Flags()
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
return cmd
}
3.1.1 NewServerRunOptions函数
NewServerRunOptions函数创建一个serverRunOptions结构体,其中包括初始化kube-apiserver运行参数,etcd参数,kubelet客户端参数配置等
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
SecureServing: kubeoptions.NewSecureServingOptions(),
InsecureServing: kubeoptions.NewInsecureServingOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
Admission: kubeoptions.NewAdmissionOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),
Authorization: kubeoptions.NewBuiltInAuthorizationOptions(),
CloudProvider: kubeoptions.NewCloudProviderOptions(),
StorageSerialization: kubeoptions.NewStorageSerializationOptions(),
APIEnablement: genericoptions.NewAPIEnablementOptions(),
EnableLogsHandler: true,
EventTTL: 1 * time.Hour,
MasterCount: 1,
EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType),
KubeletConfig: kubeletclient.KubeletClientConfig{
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
PreferredAddressTypes: []string{
// --override-hostname
string(api.NodeHostName),
// internal, preferring DNS if reported
string(api.NodeInternalDNS),
string(api.NodeInternalIP),
// external, preferring DNS if reported
string(api.NodeExternalDNS),
string(api.NodeExternalIP),
},
EnableHttps: true,
HTTPTimeout: time.Duration(5) * time.Second,
},
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
}
s.ServiceClusterIPRange = kubeoptions.DefaultServiceIPCIDR
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
return &s
}
四. 启动初始配置
4.1 NewServerRunOptions 函数
NewServerRunOptions初始化apiserver的基本配置,主要在NewConfig函数中
func NewServerRunOptions() *ServerRunOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
return &ServerRunOptions{
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
MinRequestTimeout: defaults.MinRequestTimeout,
}
}
// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
return &Config{
Serializer: codecs,
ReadWritePort: 443,
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainFunc: DefaultBuildHandlerChain,
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
RequestTimeout: time.Duration(60) * time.Second,
MinRequestTimeout: 1800,
EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression),
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
}
}
4.2 NewEtcdOptions 函数
NewEtcdOptions初始化etcd后端配置,etcd默认路径前缀为/registry,默认存储为json类型
func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
return &EtcdOptions{
StorageConfig: *backendConfig,
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
EnableWatchCache: true,
DefaultWatchCacheSize: 100,
}
}
4.3 NewSecureServingOptions 函数
NewSecureServingOptions绑定网卡地址,默认为0.0.0.0,默认端口为6443
func NewSecureServingOptions() *genericoptions.SecureServingOptions {
return &genericoptions.SecureServingOptions{
BindAddress: net.ParseIP("0.0.0.0"),
BindPort: 6443,
ServerCert: genericoptions.GeneratableKeyCert{
PairName: "apiserver",
CertDirectory: "/var/run/kubernetes",
},
}
}
4.4 NewInSecureServingOptions 函数
NewInSecureServingOptions,主要默认绑定127.0.0.1,使用8080端口,没有验证,不安全端口
// NewInsecureServingOptions is for creating an unauthenticated, unauthorized, insecure port.
// No one should be using these anymore.
func NewInsecureServingOptions() *InsecureServingOptions {
return &InsecureServingOptions{
BindAddress: net.ParseIP("127.0.0.1"),
BindPort: 8080,
}
}
4.5 NewFeatureOptions 函数
NewFeatureOptions函数,主要调用NewConfig函数生成默认得配置
func NewFeatureOptions() *FeatureOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
return &FeatureOptions{
EnableProfiling: defaults.EnableProfiling,
EnableContentionProfiling: defaults.EnableContentionProfiling,
EnableSwaggerUI: defaults.EnableSwaggerUI,
}
}
4.5.1 NewConfig函数
NewConfig生成默认配置,包括序列化,读写端口443,服务限流控制(MaxRequestsInFlight)等,请求超时时间等
// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
return &Config{
Serializer: codecs,
ReadWritePort: 443,
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainFunc: DefaultBuildHandlerChain,
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
RequestTimeout: time.Duration(60) * time.Second,
MinRequestTimeout: 1800,
EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression),
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
}
}
4.6 NewAdmissionOptions 函数
NewAdmissionOptions函数注册权限控制
func NewAdmissionOptions() *AdmissionOptions {
options := &AdmissionOptions{
Plugins: &admission.Plugins{},
PluginNames: []string{},
}
server.RegisterAllAdmissionPlugins(options.Plugins)
return options
}
kube-apiServer的运行参数初始化步骤讲解大部分,s.AddFlags(pflag.CommandLine)获取命令行的信息,对初始化参数进行更新
五. run函数启动
Run函数位于cmd/kube-apiserver/app/server.go,启动实例不退出的运行
// Run runs the specified APIServer. This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
return server.PrepareRun().Run(stopCh)
}
5.1 CreateServerChain 函数
CreateNodeDialer: 创建到节点拨号连接。设置网络隧道,如果在云平台中,则需要安装本机的SSH Key到Kubernetes集群中所有节点上,可通过用户名和私钥,SSH到node节点
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
// Setup nodeTunneler if needed
var nodeTunneler tunneler.Tunneler
var proxyDialerFn utilnet.DialFunc
if len(s.SSHUser) > 0 {
。。。。。。。。。
}
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
Dial: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
return nodeTunneler, proxyTransport, nil
}
5.2 CreateKubeAPIServerConfig 函数
CreateKubeAPIServerConfig函数创建运行API Server的配置,设置默认的advertise address,service Ip range,storage,etcd等设置
5.2.1 CreateKubeServer 函数
CreateKubeServer函数比较核心,作用:
- 1. 对
kubeAPIServerConfig
配置信息的检查,如果有空缺的字段则填补- 2. install
/api/v1
开头的的REST API,函数在pkg/master/master.go
文件中的InstallLegacyAPI()
方法
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete(versionedInformers).New(delegateAPIServer)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
sharedInformers.Start(context.StopCh)
return nil
})
return kubeAPIServer, nil
}
5.2.2 shareInformers 缓存器
如kube-scheduler、kubelet等跟apiserver都是通过缓存间接通信的,SharedInformer
一方面收集客户端和其它组件的请求,一方面负责通知该事件的关注者,还有它会把缓存中的数据同步到ETCD中。
/ SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface
Apps() apps.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Core() core.Interface
Extensions() extensions.Interface
Networking() networking.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Settings() settings.Interface
Storage() storage.Interface
}
前面巴拉巴拉整了一大队初始化配置等,直接略过,直奔主要的解析
六. NonBlockingRun函数启动
6.1 NonBlockingRun 函数
主要函数serveSecurely创建安全的HTTP server,在指定端口监听请求
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
if s.SecureServingInfo != nil && s.Handler != nil {
if err := s.serveSecurely(internalStopCh); err != nil {
close(internalStopCh)
return err
}
}
。。。。。。。
}
s.Handler是所有REST API对应的handler,读取配置中提供的证书、CA、私钥等,注入到Server对象中,最后调用RunServer在指定端口serve
func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
secureServer := &http.Server{
Addr: s.SecureServingInfo.BindAddress,
Handler: s.Handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: &tls.Config{
NameToCertificate: s.SecureServingInfo.SNICerts,
// Can't use SSLv3 because of POODLE and BEAST
// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
// Can't use TLSv1.1 because of RC4 cipher usage
MinVersion: tls.VersionTLS12,
// enable HTTP2 for go's 1.7 HTTP Server
NextProtos: []string{"h2", "http/1.1"},
},
}
if s.SecureServingInfo.MinTLSVersion > 0 {
secureServer.TLSConfig.MinVersion = s.SecureServingInfo.MinTLSVersion
}
if len(s.SecureServingInfo.CipherSuites) > 0 {
secureServer.TLSConfig.CipherSuites = s.SecureServingInfo.CipherSuites
}
if s.SecureServingInfo.Cert != nil {
secureServer.TLSConfig.Certificates = []tls.Certificate{*s.SecureServingInfo.Cert}
}
// append all named certs. Otherwise, the go tls stack will think no SNI processing
// is necessary because there is only one cert anyway.
// Moreover, if ServerCert.CertFile/ServerCert.KeyFile are not set, the first SNI
// cert will become the default cert. That's what we expect anyway.
for _, c := range s.SecureServingInfo.SNICerts {
secureServer.TLSConfig.Certificates = append(secureServer.TLSConfig.Certificates, *c)
}
if s.SecureServingInfo.ClientCA != nil {
// Populate PeerCertificates in requests, but don't reject connections without certificates
// This allows certificates to be validated by authenticators, while still allowing other auth types
secureServer.TLSConfig.ClientAuth = tls.RequestClientCert
// Specify allowed CAs for client certificates
secureServer.TLSConfig.ClientCAs = s.SecureServingInfo.ClientCA
}
glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
var err error
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh)
return err
}
七. API 重要结构体
7.1 APIGroupVersion 结构体
APIGroupVersion 对API资源的组织:
- Storage是etcd的接口,map类型,每一种资源都与etcd建立一个连接
- GroupVersion属于哪个Group version
- Serializer用于序列化,反序列化
- Convertor提供不同版本转化的接口
- Mapper实现了RESTMapper接口
// APIGroupVersion is a helper for exposing rest.Storage objects as http.Handlers via go-restful
// It handles URLs of the form:
// /${storage_key}[/${object_name}]
// Where 'storage_key' points to a rest.Storage object stored in storage.
// This object should contain all parameterization necessary for running a particular API version
type APIGroupVersion struct {
Storage map[string]rest.Storage
Root string
// GroupVersion is the external group version
GroupVersion schema.GroupVersion
// OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
// empty, defaults to GroupVersion.
OptionsExternalVersion *schema.GroupVersion
// MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
// common API implementations like ListOptions. Future changes will allow this to vary by group
// version (for when the inevitable meta/v2 group emerges).
MetaGroupVersion *schema.GroupVersion
Mapper meta.RESTMapper
// Serializer is used to determine how to convert responses from API methods into bytes to send over
// the wire.
Serializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Copier runtime.ObjectCopier
Defaulter runtime.ObjectDefaulter
Linker runtime.SelfLinker
UnsafeConvertor runtime.ObjectConvertor
Admit admission.Interface
Context request.RequestContextMapper
MinRequestTimeout time.Duration
// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool
}
创建接口newAPIGroupVersion,位于staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
7.2 APIGroupInfo 结构体
APIGroupInfo 结构体位于staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
GroupMeta: 元信息
VersionedResourceStrorageMap: 不同版本的storage
// Info about an API group.
type APIGroupInfo struct {
GroupMeta apimachinery.GroupMeta
// Info about the resources in this group. Its a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage
// OptionsExternalVersion controls the APIVersion used for common objects in the
// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
// If nil, defaults to groupMeta.GroupVersion.
// TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
OptionsExternalVersion *schema.GroupVersion
// MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
// common API implementations like ListOptions. Future changes will allow this to vary by group
// version (for when the inevitable meta/v2 group emerges).
MetaGroupVersion *schema.GroupVersion
// Scheme includes all of the types used by this group and how to convert between them (or
// to convert objects from outside of this group that are accepted in this API).
// TODO: replace with interfaces
Scheme *runtime.Scheme
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
}
7.3 Scheme 结构体
Scheme结构体用于API资源之间的序列化、反序列化、版本转换。
// Schemes are not expected to change at runtime and are only threadsafe after
// registration is complete.
type Scheme struct {
// versionMap allows one to figure out the go type of an object with
// the given version and name.
gvkToType map[schema.GroupVersionKind]reflect.Type
// typeToGroupVersion allows one to find metadata for a given go object.
// The reflect.Type we index by should *not* be a pointer.
typeToGVK map[reflect.Type][]schema.GroupVersionKind
// unversionedTypes are transformed without conversion in ConvertToVersion.
unversionedTypes map[reflect.Type]schema.GroupVersionKind
// unversionedKinds are the names of kinds that can be created in the context of any group
// or version
// TODO: resolve the status of unversioned types.
unversionedKinds map[string]reflect.Type
// Map from version and resource to the corresponding func to convert
// resource field labels in that version to internal version.
fieldLabelConversionFuncs map[string]map[string]FieldLabelConversionFunc
// defaulterFuncs is an array of interfaces to be called with an object to provide defaulting
// the provided object must be a pointer.
defaulterFuncs map[reflect.Type]func(interface{})
// converter stores all registered conversion functions. It also has
// default coverting behavior.
converter *conversion.Converter
// cloner stores all registered copy functions. It also has default
// deep copy behavior.
cloner *conversion.Cloner
}
八. API 资源注册
当API资源初始化完成以后,需要将这些API资源注册为restful api,用来接收用户的请求。
kube-apiServer使用了go-restful框架,主要包括三种对象:
- Container: 一个Container包含多个WebService
- WebService: 一个WebService包含多条route
- Route: 一条route包含一个method(GET、POST、DELETE等),一条具体的path(URL)以及一个响应的handler function
API注册的入口函数有两个: m.InstallAPIs 和 m.InstallLegacyAPI。分别注册/api和/apis的API,这些接口都是在config.Complete().New()函数调用:路径pkg/master/master.go
8.1 New 函数
New函数路径:/pkg/master/master.go
New函数根据配置参数创建一个master实例,InstallLegacyAPI进行 /api安装
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
// install legacy rest storage
if c.ExtraConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
}
m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
}
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
return m, nil
}
8.2 InstallLegacyAPI 函数
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
glog.Fatalf("Error building core storage: %v", err)
}
if c.ExtraConfig.EnableCoreControllers {
controllerName := "bootstrap-controller"
coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
}
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
glog.Fatalf("Error in registering group versions: %v", err)
}
}
8.3 NewRestLegacyStorage 函数
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
..........................
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
8.4 InstallLegacyAPIGroup 函数
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
}
if err := s.installAPIResources(apiPrefix, apiGroupInfo); err != nil {
return err
}
// setup discovery
apiVersions := []string{}
for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
apiVersions = append(apiVersions, groupVersion.Version)
}
// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix, apiVersions, s.requestContextMapper).WebService())
return nil
}
8.5 installAPIResource 函数
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
continue
}
apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
if apiGroupInfo.OptionsExternalVersion != nil {
apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
}
if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
return fmt.Errorf("Unable to setup API %v: %v", apiGroupInfo, err)
}
}
return nil
}
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) {
apiGroupsInfo := []genericapiserver.APIGroupInfo{}
for _, restStorageBuilder := range restStorageProviders {
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}
for i := range apiGroupsInfo {
if err := m.GenericAPIServer.InstallAPIGroup(&apiGroupsInfo[i]); err != nil {
glog.Fatalf("Error in registering group versions: %v", err)
}
}
}
更多推荐
所有评论(0)