k8s apiserver之启动执行流程总览二

启动执行流程总览二

本文主要梳理 Kubernetes 启动 kube-apiserver 时的执行流程,暂不作深入分析,细节将在后续文章中展开。

server.go

  • 函数
// BuildAuthorizer builds the authorizer and the rule resolver for the API
// server.
//
// The authorization options held in s are converted into an authorizer config
// using versionedInformers. When EgressSelector is non-nil, the dialer looked
// up for the control-plane network context is installed as the config's
// CustomDial before the config is instantiated; a lookup failure is returned
// as-is with nil authorizer and resolver.
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
	cfg := s.Authorization.ToAuthorizationConfig(versionedInformers)

	// Without an egress selector there is nothing to customize.
	if EgressSelector == nil {
		return cfg.New()
	}

	// The network context is hard-wired to ControlPlane here.
	dialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
	if err != nil {
		return nil, nil, err
	}
	cfg.CustomDial = dialer

	return cfg.New()
}

// BuildPriorityAndFairness constructs the flow-control (API priority and
// fairness) interface via utilflowcontrol.New.
//
// The total concurrency handed to flow control is MaxRequestsInFlight plus
// MaxMutatingRequestsInFlight; an error is returned when that sum is not
// positive. The request wait limit is one quarter of RequestTimeout.
func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) {
	generic := s.GenericServerRunOptions

	totalInFlight := generic.MaxRequestsInFlight + generic.MaxMutatingRequestsInFlight
	if totalInFlight <= 0 {
		return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", generic.MaxRequestsInFlight, generic.MaxMutatingRequestsInFlight)
	}

	// A quarter of the request timeout is used as the wait limit.
	requestWaitLimit := generic.RequestTimeout / 4

	return utilflowcontrol.New(
		versionedInformer,
		extclient.FlowcontrolV1beta2(),
		totalInFlight,
		requestWaitLimit,
	), nil
}

// Complete fills in the options that were left unset and returns them wrapped
// in completedServerRunOptions. It must be called after the flags have been
// parsed.
//
// s is mutated in place: the service IP ranges, the API server service IP, the
// external host, the service-account key files / token generator, the watch
// cache sizes and the runtime-config map are all updated on s itself. On any
// error the zero-valued completedServerRunOptions is returned together with
// the error.
func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
	var options completedServerRunOptions

	// Default the advertise address based on the secure-serving options.
	if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
		return options, err
	}

	// Derive the API server's service IP plus the primary and secondary
	// service CIDRs from the configured service cluster IP ranges.
	apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
	if err != nil {
		return options, err
	}
	s.PrimaryServiceClusterIPRange = primaryServiceIPRange
	s.SecondaryServiceClusterIPRange = secondaryServiceIPRange
	s.APIServerServiceIP = apiServerServiceIP

	// Self-signed serving certificates are only "maybe" generated; the
	// advertise address, the in-cluster DNS names and the service IP are
	// passed as the identities the certificate should cover.
	// NOTE(review): presumably nothing is generated when a cert/key pair is
	// already configured; confirm in MaybeDefaultWithSelfSignedCerts.
	if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
		return options, fmt.Errorf("error creating self-signed certificates: %w", err)
	}

	// When no external host was given, fall back to the advertise address
	// and, failing that, to the OS hostname.
	if len(s.GenericServerRunOptions.ExternalHost) == 0 {
		if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 {
			s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String()
		} else {
			hostname, err := os.Hostname()
			if err != nil {
				return options, fmt.Errorf("error finding host name: %w", err)
			}
			s.GenericServerRunOptions.ExternalHost = hostname
		}
		klog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
	}

	// Let the authentication options adjust themselves to the authorization
	// options.
	s.Authentication.ApplyAuthorization(s.Authorization)

	// Without a dedicated signing key, and with no service-account key files
	// configured, reuse the TLS serving key file as the service-account key
	// file when it passes validation.
	if s.ServiceAccountSigningKeyFile == "" {
		if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
			// NOTE(review): the file checked here is the TLS private key;
			// confirm IsValidServiceAccountKeyFile accepts private keys.
			if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
				s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
			} else {
				klog.Warning("No TLS key provided, service account token authentication disabled")
			}
		}
	}

	// A token generator is only built when both a signing key file and a
	// non-empty first issuer are configured.
	if s.ServiceAccountSigningKeyFile != "" && len(s.Authentication.ServiceAccounts.Issuers) != 0 && s.Authentication.ServiceAccounts.Issuers[0] != "" {
		sk, err := keyutil.PrivateKeyFromFile(s.ServiceAccountSigningKeyFile)
		if err != nil {
			// The file parsed here is the signing key file, so the message
			// names that option.
			return options, fmt.Errorf("failed to parse service-account-signing-key-file: %w", err)
		}

		// A zero MaxExpiration means no limit was configured, so the range
		// checks are skipped.
		if s.Authentication.ServiceAccounts.MaxExpiration != 0 {
			lowBound := time.Hour
			upBound := time.Duration(1<<32) * time.Second
			if s.Authentication.ServiceAccounts.MaxExpiration < lowBound ||
				s.Authentication.ServiceAccounts.MaxExpiration > upBound {
				return options, fmt.Errorf("the service-account-max-token-expiration must be between 1 hour and 2^32 seconds")
			}

			// With extended expiration enabled, a max expiration below the
			// thresholds only produces warnings; it is not an error.
			if s.Authentication.ServiceAccounts.ExtendExpiration {
				if s.Authentication.ServiceAccounts.MaxExpiration < serviceaccount.WarnOnlyBoundTokenExpirationSeconds*time.Second {
					klog.Warningf("service-account-extend-token-expiration is true, in order to correctly trigger safe transition logic, service-account-max-token-expiration must be set longer than %d seconds (currently %s)", serviceaccount.WarnOnlyBoundTokenExpirationSeconds, s.Authentication.ServiceAccounts.MaxExpiration)
				}
				if s.Authentication.ServiceAccounts.MaxExpiration < serviceaccount.ExpirationExtensionSeconds*time.Second {
					// MaxExpiration is a time.Duration and prints its own
					// unit, so no "seconds" suffix follows it.
					klog.Warningf("service-account-extend-token-expiration is true, enabling tokens valid up to %d seconds, which is longer than service-account-max-token-expiration set to %s", serviceaccount.ExpirationExtensionSeconds, s.Authentication.ServiceAccounts.MaxExpiration)
				}
			}
		}

		// Build the JWT token generator from the first issuer and the
		// signing key.
		s.ServiceAccountIssuer, err = serviceaccount.JWTTokenGenerator(s.Authentication.ServiceAccounts.Issuers[0], sk)
		if err != nil {
			return options, fmt.Errorf("failed to build token generator: %w", err)
		}
		s.ServiceAccountTokenMaxExpiration = s.Authentication.ServiceAccounts.MaxExpiration
	}

	if s.Etcd.EnableWatchCache {
		// Start from the built-in default sizes, overlay the user-specified
		// ones, then write the merged result back in flag form.
		sizes := kubeapiserver.DefaultWatchCacheSizes()
		// Ensure that overrides parse correctly.
		userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes)
		if err != nil {
			return options, err
		}
		for resource, size := range userSpecified {
			sizes[resource] = size
		}
		s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
		if err != nil {
			return options, err
		}
	}

	// Normalize the runtime-config keys: every spelling of the core group
	// ("v1", "v1/...", "api/v1", "api/v1/...") is replaced by the single key
	// "/v1", and "api/legacy" is dropped.
	//
	// The map is modified while it is being ranged over, which Go permits.
	// The inserted "/v1" entry may or may not be visited later, but it matches
	// none of the conditions below, so visiting it is a no-op. If several
	// spellings are present, the value stored under "/v1" is that of whichever
	// one is visited last.
	for key, value := range s.APIEnablement.RuntimeConfig {
		if key == "v1" || strings.HasPrefix(key, "v1/") ||
			key == "api/v1" || strings.HasPrefix(key, "api/v1/") {
			delete(s.APIEnablement.RuntimeConfig, key)
			s.APIEnablement.RuntimeConfig["/v1"] = value
		}
		if key == "api/legacy" {
			delete(s.APIEnablement.RuntimeConfig, key)
		}
	}

	options.ServerRunOptions = s
	return options, nil
}

  • 结构体
// completedServerRunOptions wraps *options.ServerRunOptions. It is the type
// returned by Complete, so code that requires this type forces Complete() to
// be called before Run.
type completedServerRunOptions struct {
   *options.ServerRunOptions
}
  • 方法
// buildServiceResolver returns the webhook.ServiceResolver to use, chosen by
// enabledAggregatorRouting.
//
// With aggregator routing enabled, the resolver is built from both the
// Services and the Endpoints listers; otherwise it is built from the Services
// lister alone (cluster-IP based). If hostname parses as a URL, the chosen
// resolver is additionally wrapped in a loopback resolver targeting that URL;
// a parse failure is ignored and the unwrapped resolver is returned.
func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
	services := informer.Core().V1().Services().Lister()

	var resolver webhook.ServiceResolver
	if !enabledAggregatorRouting {
		// Resolve through the service's cluster IP.
		resolver = aggregatorapiserver.NewClusterIPServiceResolver(services)
	} else {
		// Resolve through the service's endpoints.
		endpoints := informer.Core().V1().Endpoints().Lister()
		resolver = aggregatorapiserver.NewEndpointServiceResolver(services, endpoints)
	}

	// Best effort: only wrap with the loopback resolver when hostname is a
	// parseable URL.
	localHost, err := url.Parse(hostname)
	if err != nil {
		return resolver
	}
	return aggregatorapiserver.NewLoopbackServiceResolver(resolver, localHost)
}

// getServiceIPAndRanges parses the comma-separated serviceClusterIPRanges
// value and returns, in order: the API server's service IP, the primary
// service CIDR, the secondary service CIDR and an error.
//
// An empty input passes a zero net.IPNet to controlplane.ServiceIPRange and
// returns its result with a zero secondary range. Otherwise the first entry
// is parsed as the primary CIDR, and a second entry, if present, as the
// secondary CIDR. Entries beyond the second are ignored here.
// NOTE(review): presumably the flag validation limits the list to two
// entries; confirm against the options validation.
func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, net.IPNet, error) {
	var cidrs []string
	if serviceClusterIPRanges != "" {
		cidrs = strings.Split(serviceClusterIPRanges, ",")
	}

	// Nothing provided by the user: let ServiceIPRange decide from a zero
	// CIDR (primary only).
	if len(cidrs) == 0 {
		primary, serviceIP, err := controlplane.ServiceIPRange(net.IPNet{})
		if err != nil {
			return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges: %v", err)
		}
		return serviceIP, primary, net.IPNet{}, nil
	}

	// Primary range: first entry of the list.
	_, primaryCIDR, err := netutils.ParseCIDRSloppy(cidrs[0])
	if err != nil {
		return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr")
	}
	primary, serviceIP, err := controlplane.ServiceIPRange(*primaryCIDR)
	if err != nil {
		return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err)
	}

	// Secondary range: second entry, only when the user supplied one.
	var secondary net.IPNet
	if len(cidrs) > 1 {
		_, secondaryCIDR, err := netutils.ParseCIDRSloppy(cidrs[1])
		if err != nil {
			return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[1] is not an ip net")
		}
		secondary = *secondaryCIDR
	}

	return serviceIP, primary, secondary, nil
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐