k8s apiserver之启动执行流程总览一
apiserver之启动执行流程总览启动执行流程总览apiserver之启动执行流程总览启动执行流程总览server.go启动执行流程总览本文主要分析kubernetes在启动kube-apiserver的执行流程,本文不作深入分析,后续会展开server.go函数// 启动命令行的实现func NewAPIServerCommand() *cobra.Command {// 设置默认的apise
·
k8s apiserver之启动执行流程总览一
启动执行流程总览一
本文主要分析kubernetes在启动kube-apiserver的执行流程,本文不作深入分析,后续会展开
server.go
- 函数
// 启动命令行的实现
func NewAPIServerCommand() *cobra.Command {
// 设置默认的apiserver启动选项
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
SilenceUsage: true,
PersistentPreRunE: func(*cobra.Command, []string) error {
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
if err := s.Logs.ValidateAndApply(utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)
// 完成默认为初始化的配置。该方法在解析flags后必须被调用
completedOptions, err := Complete(s)
if err != nil {
return err
}
// 1.对completedOptions中有些参数根据命令行的设置是否需要override,进行重写 2.验证completedOptions 中的选项是否正确
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
// 设置flag标识
fs := cmd.Flags()
namedFlagSets := s.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
return cmd
}
// Run 运行指定的 APIServer。
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// 为了帮助调试,立即记录版本 -- 注意:此版本不是k8s的version
klog.Infof("Version: %+v", version.Get())
// 日志记录go相关的设置
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// 通过delegation(代理)创建server(其实是代理聚合后的server)
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
// 在启动apiserver 之前做的准备工作 -- 包括:通过设置 OpenAPI 规范并调用通用apiserver PrepareRun 来准备运行聚合服务。
prepared, err := server.PrepareRun()
if err != nil {
return err
}
// 启动聚合服务
return prepared.Run(stopCh)
}
// 通过delegation(代理)创建server(其实是代理聚合后的server)
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// 创建运行 API 服务器的所有资源,但不运行任何资源
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err
}
// 构建apiextensionsapiserver.Config扩展配置 -- 其实就是包装了通用apiserver配置和其他额外的配置
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
// 返回一个 HTTP 处理程序,该处理程序旨在在委托链的末尾执行。它检查是否在服务器安装所有已知的 HTTP 路径之前发出了请求。在这种情况下,它返回 503 响应,否则返回 404
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
// 1.完成apiextensionsConfig的完全配置 2.创建一个包含通用apiserver(暴露group为"apiextensions.k8s.io"的api,支持crd等操作)的扩展服务
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
// 创建kube-apiserver服务
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// 创建聚合配置
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 通过聚合配置创建聚合服务
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
return nil, err
}
return aggregatorServer, nil
}
// 创建并连接一个可行的 kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
return kubeAPIServer, nil
}
// 创建拨号器基础结构(隧道和传输层)以连接到节点。
func CreateProxyTransport() *http.Transport {
var proxyDialerFn utilnet.DialFunc
// 代理到 pod 和基于 IP 的services......但是不能够验证hostname主机名
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
// 生成传输层
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
DialContext: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
return proxyTransport
}
// 创建运行 API 服务器的所有资源,但不运行任何资源
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
// 创建拨号器基础结构(隧道和传输层)以连接到节点。
proxyTransport := CreateProxyTransport()
// BuildGenericConfig 采用ServerRunOptions并生成与之关联的 genericapiserver.Config(kube-apiserver的通用配置)
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
if err != nil {
return nil, nil, nil, err
}
capabilities.Initialize(capabilities.Capabilities{
// 是否允许特权
AllowPrivileged: s.AllowPrivileged,
PrivilegedSources: capabilities.PrivilegedSources{
HostNetworkSources: []string{},
HostPIDSources: []string{},
HostIPCSources: []string{},
},
// 限制每个连接的吞吐量(目前只用于proxy、exec、attach方式)
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
})
// 应用指标选项 -- 执行些预置操作,比如启用 禁用指标
s.Metrics.Apply()
// 注册指标收集器
serviceaccount.RegisterMetrics()
// 构造控制平面controlplane的配置
config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EventTTL: s.EventTTL,
KubeletClientConfig: s.KubeletConfig,
EnableLogsSupport: s.EnableLogsHandler,
ProxyTransport: proxyTransport,
ServiceIPRange: s.PrimaryServiceClusterIPRange,
APIServerServiceIP: s.APIServerServiceIP,
SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
APIServerServicePort: 443,
ServiceNodePortRange: s.ServiceNodePortRange,
KubernetesServiceNodePort: s.KubernetesServiceNodePort,
EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
MasterCount: s.MasterCount,
ServiceAccountIssuer: s.ServiceAccountIssuer,
ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,
VersionedInformers: versionedInformers,
IdentityLeaseDurationSeconds: s.IdentityLeaseDurationSeconds,
IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds,
},
}
// 获取 用来获取并验证证书内容的提供器
clientCAProvider, err := s.Authentication.ClientCert.GetClientCAContentProvider()
if err != nil {
return nil, nil, nil, err
}
// 设置controlplane配置的验证证书内容的提供器
config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
// 用来设置controlplane配置的请求头信息(包括认证证书等)
requestHeaderConfig, err := s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
if err != nil {
return nil, nil, nil, err
}
if requestHeaderConfig != nil {
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
}
// 添加PostStartHook钩子函数
if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
return nil, nil, nil, err
}
// 分发流量 -- controlplane cluster etcd等
if config.GenericConfig.EgressSelector != nil {
// 使用config.GenericConfig.EgressSelector loop查找以找到连接到 kubelet 的拨号器
config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
// 使用 config.GenericConfig.EgressSelector 查找作为“代理”子资源使用的传输
networkContext := egressselector.Cluster.AsNetworkContext()
dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)
if err != nil {
return nil, nil, nil, err
}
c := proxyTransport.Clone()
// 这里会替换拨号器
c.DialContext = dialer
config.ExtraConfig.ProxyTransport = c
}
// 加载公钥
var pubKeys []interface{}
// 遍历serviceaccount的认证公私钥文件
for _, f := range s.Authentication.ServiceAccounts.KeyFiles {
// 从公私钥文件中获取公钥
keys, err := keyutil.PublicKeysFromFile(f)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)
}
// 追加到公钥数组中
pubKeys = append(pubKeys, keys...)
}
// 设置serviceaccount的证书标识及公钥
config.ExtraConfig.ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuers[0]
config.ExtraConfig.ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
return config, serviceResolver, pluginInitializers, nil
}
// 采用ServerRunOptions并生成与之关联的 genericapiserver.Config(kube-apiserver的通用配置)
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
pluginInitializers []admission.PluginInitializer,
admissionPostStartHook genericapiserver.PostStartHookFunc,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
// 使用apiserver 生成apiserver genericConfig
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
// 获取默认所有的gv(enable Disable)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
// 把s.GenericServerRunOptions相关配置应用到genericConfig
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
// 把s.SecureServing相关配置应用到genericConfig.SecureServing 并配置LoopbackClientConfig(具体看实现是否覆盖参数genericConfig.LoopbackClientConfig)
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
// 配置是否开启debug pprof和 争用debug pprof功能
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
// 将给定的 defaultAPIResourceConfig 与给定的 resourceConfigOverrides 合并。(合并原则是以defaultAPIResourceConfig为基础,以resourceConfigOverrides中的设置为目标)
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
// 将出口选择器设置EgressSelectorOptions中的设置项添加到服务器配置中
if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
return
}
// 如果APIServerTracing对应在FeatureGate中设置为true(开启)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
// 使用设置的追踪选项TracingOptions配置apiserver的跟踪配置
if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
return
}
}
// 包装定义以恢复禁用功能的任何更改 用来生成k8s的open api(类似于swagger)
getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
// 使用DefaultOpenAPIConfig设置为OpenAPIConfig的默认值
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
// 设置OpenAPIConfig的标题为Kubernetes
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
// 如果默认开启了OpenAPIV3功能
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
// 使用DefaultOpenAPIV3Config设置为OpenAPIV3Config的默认值
genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
}
// 判断是否是长运行请求(就是保持长期会话,比如watch动作,就需要持续监听)的方法
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// 版本信息(这里主要是git和go的版本信息)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
// 构建一个设置了必要资源覆盖的StorageFactoryConfig。
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
// 设置APIResourceConfig,用来指示启用了哪个 groupVersion 及其资源是否启用
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
// 使用提供的 etcdOptions配置StorageFactoryConfig,返回包装了StorageFactoryConfig的completedStorageFactoryConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
// 根据completedStorageFactoryConfig创建DefaultStorageFactory -- 用来生成对应GroupResource的存储后端
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
return
}
// 分发流量
if genericConfig.EgressSelector != nil {
// 配置EgressLookup -- 包含controlplane etcd cluster模式
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
// 配置链路追踪
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && genericConfig.TracerProvider != nil {
storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
}
// 向c中添加etcd的健康检查并修改c中用于获取gr对应的RESTOptions的RESTOptionsGetter,覆盖s中StorageConfig.StorageObjectCountTracker属性
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// 使用 protobufs 进行自我通信。由于不是每个通用 apiserver 都必须支持 protobufs,所以我们不能在通用 apiserver 中默认使用它,需要在 kube-apiserver 中显式设置它。
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// 因为我们将在一个快速的本地网络上,所以这里会禁用自我通信的压缩功能
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
// 获取所有对象的rest client
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
// 通过client-go获取SharedInformerFactory -- 用于reconcile
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// s.Authentication中的一些配置信息应用到各个参数对应的信息中
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
// 构建授权器和授权规则解析器
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
// 如果授权mode中没有RABC模式
if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
// 将"rbac/bootstrap-roles"插入到在不可用的PostStartHook集合中
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
// 应用Audit日志审计功能到genericConfig
lastErr = s.Audit.ApplyTo(genericConfig)
if lastErr != nil {
return
}
// 初始化准入插件所需的配置
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
// 根据enabledAggregatorRouting构建service解析器
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
// 获取准入所需的插件和开始挂钩
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
// 将准入链选项添加到服务器配置中
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
return
}
// 如果APIPriorityAndFairness功能开启且 s.GenericServerRunOptions.EnablePriorityAndFairness = true(默认为true)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
// 构建 API 优先级和公平性过滤器的核心
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
return
}
更多推荐
已为社区贡献15条内容
所有评论(0)