apiserver

k8s中最重要的一个通信节点就是apiserver,是一个中心节点连接着每一环,是kubelet,kube-proxy和control-manager的交互的中心点,提供基于API服务来管理每一步的流程,后端采用高可用的etcd等组件作为数据库来提供数据的高可用。从介绍来看,apiserver的整个架构也基于上基于传统的http服务端来实现,这对外可提供友好的接口进行二次开发。

apiserver流程
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	rand.Seed(time.Now().UTC().UnixNano())

	s := app.NewAPIServer()   	// 生成服务实例
	s.AddFlags(pflag.CommandLine)

	util.InitFlags()
	util.InitLogs()
	defer util.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := s.Run(pflag.CommandLine.Args()); err != nil {  // 服务端实例运行
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

一切都从Run函数开始运行。

// Run runs the specified APIServer.  This should never exit.
func (s *APIServer) Run(_ []string) error {
	s.verifyClusterIPFlags()

	// If advertise-address is not specified, use bind-address. If bind-address
	// is not usable (unset, 0.0.0.0, or loopback), setDefaults() in
	// pkg/master/master.go will do the right thing and use the host's default
	// interface.
	if s.AdvertiseAddress == nil || s.AdvertiseAddress.IsUnspecified() {
		s.AdvertiseAddress = s.BindAddress
	}

	if (s.EtcdConfigFile != "" && len(s.EtcdServerList) != 0) || (s.EtcdConfigFile == "" && len(s.EtcdServerList) == 0) {
		glog.Fatalf("specify either --etcd-servers or --etcd-config")
	}   // 检查etcd配置

	capabilities.Initialize(capabilities.Capabilities{
		AllowPrivileged: s.AllowPrivileged,
		// TODO(vmarmol): Implement support for HostNetworkSources.
		PrivilegedSources: capabilities.PrivilegedSources{
			HostNetworkSources: []string{},
			HostPIDSources:     []string{},
			HostIPCSources:     []string{},
		},
		PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
	})

	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) // 初始化云服务
	if err != nil {
		glog.Fatalf("Cloud provider could not be initialized: %v", err)
	}

	// Setup tunneler if needed
	var tunneler master.Tunneler
	var proxyDialerFn apiserver.ProxyDialerFunc
	if len(s.SSHUser) > 0 {  // 判断是否是SSH方式
		// Get ssh key distribution func, if supported
		var installSSH master.InstallSSHKey
		if cloud != nil {
			if instances, supported := cloud.Instances(); supported {
				installSSH = instances.AddSSHKeyToAllInstances
			}
		}

		// Set up the tunneler
		tunneler = master.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, installSSH)

		// Use the tunneler's dialer to connect to the kubelet
		s.KubeletConfig.Dial = tunneler.Dial
		// Use the tunneler's dialer when proxying to pods, services, and nodes
		proxyDialerFn = tunneler.Dial
	}

	// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
	proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}

	kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig) // 生成kubelet连接客户端
	if err != nil {
		glog.Fatalf("Failure to start kubelet client: %v", err)
	}

	apiGroupVersionOverrides, err := s.parseRuntimeConfig()  // 生成api组信息
	if err != nil {
		glog.Fatalf("error in parsing runtime-config: %s", err)
	}

	clientConfig := &client.Config{
		Host:    net.JoinHostPort(s.InsecureBindAddress.String(), strconv.Itoa(s.InsecurePort)),
		Version: s.DeprecatedStorageVersion,
	}
	client, err := client.New(clientConfig)  
	if err != nil {
		glog.Fatalf("Invalid server address: %v", err)
	}

	legacyV1Group, err := latest.Group("")  
	if err != nil {
		return err
	}

	storageDestinations := master.NewStorageDestinations()  // 生成保存的实例

	storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions)
	if _, found := storageVersions[legacyV1Group.Group]; !found {
		glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.Group, storageVersions)
	}    // 保存的规则
	etcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.Group], s.EtcdPathPrefix) // 获取基于etcd的保存方式
	if err != nil {
		glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
	}
	storageDestinations.AddAPIGroup("", etcdStorage)  // 将当前组添加到通过etcd存储

	if !apiGroupVersionOverrides["extensions/v1beta1"].Disable {
		expGroup, err := latest.Group("extensions")
		if err != nil {
			glog.Fatalf("Extensions API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
		}
		if _, found := storageVersions[expGroup.Group]; !found {
			glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.Group, storageVersions)
		}
		expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix)
		if err != nil {
			glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
		}
		storageDestinations.AddAPIGroup("extensions", expEtcdStorage)
	}

	updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, &storageDestinations, newEtcd)  // 查看是否有更新的配置信息

	n := s.ServiceClusterIPRange

	// Default to the private server key for service account token signing
	if s.ServiceAccountKeyFile == "" && s.TLSPrivateKeyFile != "" {
		if apiserver.IsValidServiceAccountKeyFile(s.TLSPrivateKeyFile) {
			s.ServiceAccountKeyFile = s.TLSPrivateKeyFile
		} else {
			glog.Warning("no RSA key provided, service account token authentication disabled")
		}
	}
	authenticator, err := apiserver.NewAuthenticator(apiserver.AuthenticatorConfig{
		BasicAuthFile:         s.BasicAuthFile,
		ClientCAFile:          s.ClientCAFile,
		TokenAuthFile:         s.TokenAuthFile,
		OIDCIssuerURL:         s.OIDCIssuerURL,
		OIDCClientID:          s.OIDCClientID,
		OIDCCAFile:            s.OIDCCAFile,
		OIDCUsernameClaim:     s.OIDCUsernameClaim,
		ServiceAccountKeyFile: s.ServiceAccountKeyFile,
		ServiceAccountLookup:  s.ServiceAccountLookup,
		Storage:               etcdStorage,
		KeystoneURL:           s.KeystoneURL,
	})  // 认证方式

	if err != nil {
		glog.Fatalf("Invalid Authentication Config: %v", err)
	}

	authorizationModeNames := strings.Split(s.AuthorizationMode, ",")
	authorizer, err := apiserver.NewAuthorizerFromAuthorizationConfig(authorizationModeNames, s.AuthorizationPolicyFile)
	if err != nil {
		glog.Fatalf("Invalid Authorization Config: %v", err)
	}

	admissionControlPluginNames := strings.Split(s.AdmissionControl, ",")
	admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile)

	if len(s.ExternalHost) == 0 {
		// TODO: extend for other providers
		if s.CloudProvider == "gce" {   // 判断是否是gce平台的cloud
			instances, supported := cloud.Instances()
			if !supported {
				glog.Fatalf("gce cloud provider has no instances.  this shouldn't happen. exiting.")
			}
			name, err := os.Hostname()
			if err != nil {
				glog.Fatalf("failed to get hostname: %v", err)
			}
			addrs, err := instances.NodeAddresses(name)
			if err != nil {
				glog.Warningf("unable to obtain external host address from cloud provider: %v", err)
			} else {
				for _, addr := range addrs {
					if addr.Type == api.NodeExternalIP {
						s.ExternalHost = addr.Address
					}
				}
			}
		}
	}

	config := &master.Config{
		StorageDestinations:      storageDestinations,
		StorageVersions:          storageVersions,
		EventTTL:                 s.EventTTL,
		KubeletClient:            kubeletClient,
		ServiceClusterIPRange:    &n,
		EnableCoreControllers:    true,
		EnableLogsSupport:        s.EnableLogsSupport,
		EnableUISupport:          true,
		EnableSwaggerSupport:     true,
		EnableProfiling:          s.EnableProfiling,
		EnableWatchCache:         s.EnableWatchCache,
		EnableIndex:              true,
		APIPrefix:                s.APIPrefix,
		APIGroupPrefix:           s.APIGroupPrefix,
		CorsAllowedOriginList:    s.CorsAllowedOriginList,
		ReadWritePort:            s.SecurePort,
		PublicAddress:            s.AdvertiseAddress,
		Authenticator:            authenticator,
		SupportsBasicAuth:        len(s.BasicAuthFile) > 0,
		Authorizer:               authorizer,
		AdmissionControl:         admissionController,
		APIGroupVersionOverrides: apiGroupVersionOverrides,
		MasterServiceNamespace:   s.MasterServiceNamespace,
		ClusterName:              s.ClusterName,
		ExternalHost:             s.ExternalHost,
		MinRequestTimeout:        s.MinRequestTimeout,
		ProxyDialer:              proxyDialerFn,
		ProxyTLSClientConfig:     proxyTLSClientConfig,
		Tunneler:                 tunneler,
		ServiceNodePortRange:     s.ServiceNodePortRange,
	}   // 生成master配置
	m := master.New(config)  // 生成一个Master实例

	// We serve on 2 ports.  See docs/accessing_the_api.md
	secureLocation := ""
	if s.SecurePort != 0 {
		secureLocation = net.JoinHostPort(s.BindAddress.String(), strconv.Itoa(s.SecurePort))
	}
	insecureLocation := net.JoinHostPort(s.InsecureBindAddress.String(), strconv.Itoa(s.InsecurePort))

	// See the flag commentary to understand our assumptions when opening the read-only and read-write ports.

	var sem chan bool
	if s.MaxRequestsInFlight > 0 {
		sem = make(chan bool, s.MaxRequestsInFlight)
	}

	longRunningRE := regexp.MustCompile(s.LongRunningRequestRE)
	longRunningTimeout := func(req *http.Request) (<-chan time.Time, string) {
		// TODO unify this with apiserver.MaxInFlightLimit
		if longRunningRE.MatchString(req.URL.Path) || req.URL.Query().Get("watch") == "true" {
			return nil, ""
		}
		return time.After(time.Minute), ""
	}

	if secureLocation != "" {
		handler := apiserver.TimeoutHandler(m.Handler, longRunningTimeout)  // 设置超时handler安全的监听流程
		secureServer := &http.Server{
			Addr:           secureLocation,
			Handler:        apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(handler)),
			MaxHeaderBytes: 1 << 20,
			TLSConfig: &tls.Config{
				// Change default from SSLv3 to TLSv1.0 (because of POODLE vulnerability)
				MinVersion: tls.VersionTLS10,
			},
		}

		if len(s.ClientCAFile) > 0 {
			clientCAs, err := util.CertPoolFromFile(s.ClientCAFile)
			if err != nil {
				glog.Fatalf("unable to load client CA file: %v", err)
			}
			// Populate PeerCertificates in requests, but don't reject connections without certificates
			// This allows certificates to be validated by authenticators, while still allowing other auth types
			secureServer.TLSConfig.ClientAuth = tls.RequestClientCert
			// Specify allowed CAs for client certificates
			secureServer.TLSConfig.ClientCAs = clientCAs
		}

		glog.Infof("Serving securely on %s", secureLocation)
		if s.TLSCertFile == "" && s.TLSPrivateKeyFile == "" {
			s.TLSCertFile = path.Join(s.CertDirectory, "apiserver.crt")
			s.TLSPrivateKeyFile = path.Join(s.CertDirectory, "apiserver.key")
			// TODO (cjcullen): Is PublicAddress the right address to sign a cert with?
			alternateIPs := []net.IP{config.ServiceReadWriteIP}
			alternateDNS := []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}
			// It would be nice to set a fqdn subject alt name, but only the kubelets know, the apiserver is clueless
			// alternateDNS = append(alternateDNS, "kubernetes.default.svc.CLUSTER.DNS.NAME")
			if err := util.GenerateSelfSignedCert(config.PublicAddress.String(), s.TLSCertFile, s.TLSPrivateKeyFile, alternateIPs, alternateDNS); err != nil {
				glog.Errorf("Unable to generate self signed cert: %v", err)
			} else {
				glog.Infof("Using self-signed cert (%s, %s)", s.TLSCertFile, s.TLSPrivateKeyFile)
			}
		}

		go func() {
			defer util.HandleCrash()
			for {
				// err == systemd.SdNotifyNoSocket when not running on a systemd system
				if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {
					glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
				}
				if err := secureServer.ListenAndServeTLS(s.TLSCertFile, s.TLSPrivateKeyFile); err != nil {
					glog.Errorf("Unable to listen for secure (%v); will try again.", err)
				}
				time.Sleep(15 * time.Second)
			}
		}()
	}
	handler := apiserver.TimeoutHandler(m.InsecureHandler, longRunningTimeout) // 添加handler
	http := &http.Server{
		Addr:           insecureLocation,
		Handler:        apiserver.RecoverPanics(handler),
		MaxHeaderBytes: 1 << 20,
	}
	if secureLocation == "" {
		// err == systemd.SdNotifyNoSocket when not running on a systemd system
		if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {
			glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
		}
	}
	glog.Infof("Serving insecurely on %s", insecureLocation)
	glog.Fatal(http.ListenAndServe())  // 运行server
	return nil
}

最主要的逻辑流程就是通过输入的参数来初始化(例如监听端口,有关服务生成的端口范围等等),生成对应服务的API接口信息,并初始化对应API接口访问数据后端的存储实例(通过etcd来存储),一切就绪后开始启动服务,有关API接口的详细信息都在master.New的函数中去实现的。

func New(c *Config) *Master {
	setDefaults(c)
	if c.KubeletClient == nil {
		glog.Fatalf("master.New() called with config.KubeletClient == nil")
	}

	m := &Master{
		serviceClusterIPRange:    c.ServiceClusterIPRange,  // 服务的IP范围
		serviceNodePortRange:     c.ServiceNodePortRange,   //Node的Port的范围
		rootWebService:           new(restful.WebService),
		enableCoreControllers:    c.EnableCoreControllers,  
		enableLogsSupport:        c.EnableLogsSupport,
		enableUISupport:          c.EnableUISupport,
		enableSwaggerSupport:     c.EnableSwaggerSupport,
		enableProfiling:          c.EnableProfiling,
		enableWatchCache:         c.EnableWatchCache,
		apiPrefix:                c.APIPrefix,       // API前缀
		apiGroupPrefix:           c.APIGroupPrefix,  // 分组前缀
		corsAllowedOriginList:    c.CorsAllowedOriginList,
		authenticator:            c.Authenticator,
		authorizer:               c.Authorizer,
		admissionControl:         c.AdmissionControl,
		apiGroupVersionOverrides: c.APIGroupVersionOverrides,
		requestContextMapper:     c.RequestContextMapper,

		cacheTimeout:      c.CacheTimeout,   // 缓存超时时间
		minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,

		masterCount:         c.MasterCount,
		externalHost:        c.ExternalHost,
		clusterIP:           c.PublicAddress,
		publicReadWritePort: c.ReadWritePort,
		serviceReadWriteIP:  c.ServiceReadWriteIP,
		// TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443
		serviceReadWritePort: 443,

		tunneler: c.Tunneler,
	}

	var handlerContainer *restful.Container
	if c.RestfulContainer != nil {
		m.mux = c.RestfulContainer.ServeMux
		handlerContainer = c.RestfulContainer
	} else {
		mux := http.NewServeMux()
		m.mux = mux
		handlerContainer = NewHandlerContainer(mux)
	}
	m.handlerContainer = handlerContainer
	// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
	m.handlerContainer.Router(restful.CurlyRouter{})  // 生成服务的实现信息 后续只需要添加到该container种
	m.muxHelper = &apiserver.MuxHelper{Mux: m.mux, RegisteredPaths: []string{}}

	m.init(c)  // 初始化所有的API信息并设置存储信息

	return m
}

...

// init initializes master.
func (m *Master) init(c *Config) {

	if c.ProxyDialer != nil || c.ProxyTLSClientConfig != nil {
		m.proxyTransport = util.SetTransportDefaults(&http.Transport{
			Dial:            c.ProxyDialer,
			TLSClientConfig: c.ProxyTLSClientConfig,
		})
	}

	healthzChecks := []healthz.HealthzChecker{}  // 健康检查列表

	dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
	podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)   // 生成pods的etcd保存者

	podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))  // 生成模板的保存者

	eventStorage := eventetcd.NewREST(dbClient("events"), uint64(c.EventTTL.Seconds())) // 生成事件的保存者
	limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"))

	resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"))
	secretStorage := secretetcd.NewREST(dbClient("secrets"))
	serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"))  // 生成serviceAccounts的保存者
	persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"))  // 生成一致容器卷的保存者
	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"))

	namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces")) //生成命名空间的保存者
	m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)

	endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache)
	m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) // 生成endpoint的保存者

	nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
	m.nodeRegistry = node.NewRegistry(nodeStorage) // 生成node的保存者

	serviceStorage := serviceetcd.NewREST(dbClient("services"))
	m.serviceRegistry = service.NewRegistry(serviceStorage)  // 生成services的保存者

	var serviceClusterIPRegistry service.RangeRegistry
	serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
		mem := allocator.NewAllocationMap(max, rangeSpec)
		etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", dbClient("services"))
		serviceClusterIPRegistry = etcd
		return etcd
	})
	m.serviceClusterIPAllocator = serviceClusterIPRegistry

	var serviceNodePortRegistry service.RangeRegistry
	serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.serviceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
		mem := allocator.NewAllocationMap(max, rangeSpec)
		etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", dbClient("services"))
		serviceNodePortRegistry = etcd
		return etcd
	})
	m.serviceNodePortAllocator = serviceNodePortRegistry

	controllerStorage, controllerStatusStorage := controlleretcd.NewREST(dbClient("replicationControllers"))  // 生成replicationControllers的保存者

	// TODO: Factor out the core API registration
	m.storage = map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.Binding,

		"podTemplates": podTemplateStorage,

		"replicationControllers":        controllerStorage,
		"replicationControllers/status": controllerStatusStorage,
		"services":                      service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.proxyTransport),
		"endpoints":                     endpointsStorage,
		"nodes":                         nodeStorage,
		"nodes/status":                  nodeStatusStorage,
		"events":                        eventStorage,

		"limitRanges":                   limitRangeStorage,
		"resourceQuotas":                resourceQuotaStorage,
		"resourceQuotas/status":         resourceQuotaStatusStorage,
		"namespaces":                    namespaceStorage,
		"namespaces/status":             namespaceStatusStorage,
		"namespaces/finalize":           namespaceFinalizeStorage,
		"secrets":                       secretStorage,
		"serviceAccounts":               serviceAccountStorage,
		"persistentVolumes":             persistentVolumeStorage,
		"persistentVolumes/status":      persistentVolumeStatusStorage,
		"persistentVolumeClaims":        persistentVolumeClaimStorage,
		"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,

		"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
	} // 路由匹配数组

	if m.tunneler != nil {
		m.tunneler.Run(m.getNodeAddresses)
		healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "apiserver_proxy_tunnel_sync_latency_secs",
			Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
		}, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
	}

	apiVersions := []string{}  // 获取API版本信息
	// Install v1 unless disabled.
	if !m.apiGroupVersionOverrides["api/v1"].Disable {
		if err := m.api_v1().InstallREST(m.handlerContainer); err != nil {
			glog.Fatalf("Unable to setup API v1: %v", err)
		}
		apiVersions = append(apiVersions, "v1")
	}

	apiserver.InstallSupport(m.muxHelper, m.rootWebService, c.EnableProfiling, healthzChecks...)
	apiserver.AddApiWebService(m.handlerContainer, c.APIPrefix, apiVersions) // 注册处理信息
	defaultVersion := m.defaultAPIGroupVersion()
	requestInfoResolver := &apiserver.APIRequestInfoResolver{APIPrefixes: sets.NewString(strings.TrimPrefix(defaultVersion.Root, "/")), RestMapper: defaultVersion.Mapper}
	apiserver.InstallServiceErrorHandler(m.handlerContainer, requestInfoResolver, apiVersions)

	// allGroups records all supported groups at /apis
	allGroups := []unversioned.APIGroup{} // 生成所有的apis的接口信息
	// Install extensions unless disabled.
	if !m.apiGroupVersionOverrides["extensions/v1beta1"].Disable {
		m.thirdPartyStorage = c.StorageDestinations.APIGroups["extensions"].Default
		m.thirdPartyResources = map[string]*thirdpartyresourcedataetcd.REST{}

		expVersion := m.experimental(c)

		if err := expVersion.InstallREST(m.handlerContainer); err != nil {
			glog.Fatalf("Unable to setup experimental api: %v", err)
		}
		g, err := latest.Group("extensions")
		if err != nil {
			glog.Fatalf("Unable to setup experimental api: %v", err)
		}
		expAPIVersions := []unversioned.GroupVersion{
			{
				GroupVersion: expVersion.Version,
				Version:      apiutil.GetVersion(expVersion.Version),
			},
		}
		storageVersion, found := c.StorageVersions[g.Group]
		if !found {
			glog.Fatalf("Couldn't find storage version of group %v", g.Group)
		}
		group := unversioned.APIGroup{
			Name:             g.Group,
			Versions:         expAPIVersions,
			PreferredVersion: unversioned.GroupVersion{GroupVersion: storageVersion, Version: apiutil.GetVersion(storageVersion)},
		}
		apiserver.AddGroupWebService(m.handlerContainer, c.APIGroupPrefix+"/"+latest.GroupOrDie("extensions").Group, group)
		allGroups = append(allGroups, group)
		expRequestInfoResolver := &apiserver.APIRequestInfoResolver{APIPrefixes: sets.NewString(strings.TrimPrefix(expVersion.Root, "/")), RestMapper: expVersion.Mapper}
		apiserver.InstallServiceErrorHandler(m.handlerContainer, expRequestInfoResolver, []string{expVersion.Version})
	}

	// This should be done after all groups are registered
	// TODO: replace the hardcoded "apis".
	apiserver.AddApisWebService(m.handlerContainer, "/apis", allGroups)  // 注册到框架中处理

	// Register root handler.
	// We do not register this using restful Webservice since we do not want to surface this in api docs.
	// Allow master to be embedded in contexts which already have something registered at the root
	if c.EnableIndex {
		m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer, m.muxHelper))
	}

	if c.EnableLogsSupport {
		apiserver.InstallLogsSupport(m.muxHelper)
	}
	if c.EnableUISupport {
		ui.InstallSupport(m.muxHelper, m.enableSwaggerSupport)
	}

	if c.EnableProfiling {  // 是否启动调试接口
		m.mux.HandleFunc("/debug/pprof/", pprof.Index)
		m.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
		m.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	}

	handler := http.Handler(m.mux.(*http.ServeMux)) // 生成处理hanndler

	// TODO: handle CORS and auth using go-restful
	// See github.com/emicklei/go-restful/blob/master/examples/restful-CORS-filter.go, and
	// github.com/emicklei/go-restful/blob/master/examples/restful-basic-authentication.go

	if len(c.CorsAllowedOriginList) > 0 {
		allowedOriginRegexps, err := util.CompileRegexps(c.CorsAllowedOriginList)
		if err != nil {
			glog.Fatalf("Invalid CORS allowed origin, --cors-allowed-origins flag was set to %v - %v", strings.Join(c.CorsAllowedOriginList, ","), err)
		}
		handler = apiserver.CORS(handler, allowedOriginRegexps, nil, nil, "true")
	}

	m.InsecureHandler = handler  // 保存handler

	attributeGetter := apiserver.NewRequestAttributeGetter(m.requestContextMapper, latest.GroupOrDie("").RESTMapper, "api")
	handler = apiserver.WithAuthorizationCheck(handler, attributeGetter, m.authorizer)

	// Install Authenticator
	if c.Authenticator != nil {
		authenticatedHandler, err := handlers.NewRequestAuthenticator(m.requestContextMapper, c.Authenticator, handlers.Unauthorized(c.SupportsBasicAuth), handler)
		if err != nil {
			glog.Fatalf("Could not initialize authenticator: %v", err)
		}
		handler = authenticatedHandler
	}

	// Install root web services
	m.handlerContainer.Add(m.rootWebService)   // 注册

	// TODO: Make this optional?  Consumers of master depend on this currently.
	m.Handler = handler

	if m.enableSwaggerSupport {
		m.InstallSwaggerAPI()
	}

	// After all wrapping is done, put a context filter around both handlers
	if handler, err := api.NewRequestContextFilter(m.requestContextMapper, m.Handler); err != nil {
		glog.Fatalf("Could not initialize request context filter: %v", err)
	} else {
		m.Handler = handler
	}

	if handler, err := api.NewRequestContextFilter(m.requestContextMapper, m.InsecureHandler); err != nil {
		glog.Fatalf("Could not initialize request context filter: %v", err)
	} else {
		m.InsecureHandler = handler
	}

	// TODO: Attempt clean shutdown?
	if m.enableCoreControllers {
		m.NewBootstrapController().Start()
	}
}

从master的初始化过程中可知,针对node、service、endpoint等相关的操作都通过了一个storage来进行处理保存,并且在每一个生成的storage中通过服务的get、post等路由操作,直接操作到了基于etcd存储的相关操作中。相关的操作的代码都位于pkg/registry目录下面。

API的注册流程
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	installer := g.newInstaller()  // 生成一个注册者 
	ws := installer.NewWebService()   // 获取service
	apiResources, registrationErrors := installer.Install(ws)  // 注册路由信息
	// TODO: g.Version only contains "version" now, it will contain "group/version" in the near future.
	AddSupportedResourcesWebService(ws, g.Version, apiResources)
	container.Add(ws)   // 添加服务
	return errors.NewAggregate(registrationErrors)
}

主要的路由的写入工作都是由installer的Install流程。

// Installs handlers for API resources.
func (a *APIInstaller) Install(ws *restful.WebService) (apiResources []unversioned.APIResource, errors []error) {
	errors = make([]error, 0)

	proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info})  

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))  // 生成对应大小的列表
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws, proxyHandler)   // 注册路径信息
		if err != nil {
			errors = append(errors, err)
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, errors
}

...

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, proxyHandler http.Handler) (*unversioned.APIResource, error) {
	...

	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)  // 转换从storage对应过来的方法
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	deleter, isDeleter := storage.(rest.Deleter)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	_, isRedirector := storage.(rest.Redirector)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)
	if !isMetadata {
		storageMeta = defaultStorageMetadata{}
	}

	.. 
  
	var ctxFn ContextFunc  // 生成传递的上下文信息
	ctxFn = func(req *restful.Request) api.Context {
		if context == nil {
			return api.NewContext()
		}
		if ctx, ok := context.Get(req.Request); ok {
			return ctx
		}
		return api.NewContext()
	}

	allowWatchList := isWatcher && isLister // watching on lists is allowed only for kinds that support both watch and list.
	scope := mapping.Scope  // 获取对应的参数信息
	nameParam := ws.PathParameter("name", "name of the "+kind).DataType("string")
	pathParam := ws.PathParameter("path", "path to the resource").DataType("string")
	params := []*restful.Parameter{}
	actions := []action{}

	var apiResource unversioned.APIResource
	// Get the list of actions for the given scope.
	switch scope.Name() {
	case meta.RESTScopeNameRoot:  // 获取对应的路径名称 根据传入的路径来判断不同的资源操作
		// Handle non-namespace scoped resources like nodes.
		resourcePath := resource
		resourceParams := params
		itemPath := resourcePath + "/{name}"
		nameParams := append(params, nameParam)
		proxyParams := append(nameParams, pathParam)
		if hasSubresource {
			itemPath = itemPath + "/" + subresource
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = false
		namer := rootScopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath)}

		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		// Add actions at the resource path: /api/apiVersion/resource
		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer}, isLister)  // 检查是否是list操作
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer}, isCreater)  // 是否是POST操作
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer}, allowWatchList)

		// Add actions at the item path: /api/apiVersion/resource/{name}
		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer}, isGetter) // 是否get操作
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer}, isDeleter)
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer}, isWatcher)
		actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector)
		actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer}, isConnecter && connectSubpath)
		break
	case meta.RESTScopeNameNamespace:
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		namespaceParam := ws.PathParameter(scope.ArgumentName(), scope.ParamDescription()).DataType("string")
		namespacedPath := scope.ParamName() + "/{" + scope.ArgumentName() + "}/" + resource
		namespaceParams := []*restful.Parameter{namespaceParam}

		resourcePath := namespacedPath
		resourceParams := namespaceParams
		itemPath := namespacedPath + "/{name}"
		nameParams := append(namespaceParams, nameParam)
		proxyParams := append(nameParams, pathParam)
		if hasSubresource {
			itemPath = itemPath + "/" + subresource
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = true
		namer := scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), false}

		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer}, isCreater)
		// DEPRECATED
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer}, allowWatchList)

		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer}, isDeleter)
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer}, isWatcher)
		actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector)
		actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer}, isConnecter && connectSubpath)

		// list or post across namespace.
		// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
		// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
		if !hasSubresource {
			namer = scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), true}
			actions = appendIf(actions, action{"LIST", resource, params, namer}, isLister)
			actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer}, allowWatchList)
		}
		break
	default:
		return nil, fmt.Errorf("unsupported restscope: %s", scope.Name())
	}

	...
  
	for _, action := range actions {  // 根据判断的actions来依次执行获取结果
		reqScope.Namer = action.Namer
		m := monitorFilter(action.Verb, resource)
		namespaced := ""
		if strings.Contains(action.Path, scope.ArgumentName()) {
			namespaced = "Namespaced"
		}
		switch action.Verb {
		case "GET": // Get a resource.
			var handler restful.RouteFunction
			if isGetterWithOptions {
				handler = GetResourceWithOptions(getterWithOptions, reqScope, getOptionsKind, getSubpath, getSubpathKey)
			} else {
				handler = GetResource(getter, reqScope)
			}
			doc := "read the specified " + kind
			if hasSubresource {
				doc = "read " + subresource + " of the specified " + kind
			}
			route := ws.GET(action.Path).To(handler).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
				Returns(http.StatusOK, "OK", versionedObject).
				Writes(versionedObject)
			if isGetterWithOptions {
				if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
					return nil, err
				}
			}
			addParams(route, action.Params)
			ws.Route(route)
		case "LIST": // List all resources of a kind.
			doc := "list objects of kind " + kind
			if hasSubresource {
				doc = "list " + subresource + " of objects of kind " + kind
			}
			route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("list"+namespaced+kind+strings.Title(subresource)).
				Produces("application/json").
				Returns(http.StatusOK, "OK", versionedList).
				Writes(versionedList)
			if err := addObjectParams(ws, route, versionedListOptions); err != nil {
				return nil, err
			}
			switch {
			case isLister && isWatcher:
				doc := "list or watch objects of kind " + kind
				if hasSubresource {
					doc = "list or watch " + subresource + " of objects of kind " + kind
				}
				route.Doc(doc)
			case isWatcher:
				doc := "watch objects of kind " + kind
				if hasSubresource {
					doc = "watch " + subresource + "of objects of kind " + kind
				}
				route.Doc(doc)
			}
			addParams(route, action.Params)
			ws.Route(route)
		case "PUT": // Update a resource.
			doc := "replace the specified " + kind
			if hasSubresource {
				doc = "replace " + subresource + " of the specified " + kind
			}
			route := ws.PUT(action.Path).To(UpdateResource(updater, reqScope, a.group.Typer, admit)).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("replace"+namespaced+kind+strings.Title(subresource)).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
				Returns(http.StatusOK, "OK", versionedObject).
				Reads(versionedObject).
				Writes(versionedObject)
			addParams(route, action.Params)
			ws.Route(route)
		case "PATCH": // Partially update a resource
			doc := "partially update the specified " + kind
			if hasSubresource {
				doc = "partially update " + subresource + " of the specified " + kind
			}
			route := ws.PATCH(action.Path).To(PatchResource(patcher, reqScope, a.group.Typer, admit, mapping.ObjectConvertor)).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Consumes(string(api.JSONPatchType), string(api.MergePatchType), string(api.StrategicMergePatchType)).
				Operation("patch"+namespaced+kind+strings.Title(subresource)).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
				Returns(http.StatusOK, "OK", versionedObject).
				Reads(unversioned.Patch{}).
				Writes(versionedObject)
			addParams(route, action.Params)
			ws.Route(route)
		case "POST": // Create a resource.
			var handler restful.RouteFunction
			if isNamedCreater {
				handler = CreateNamedResource(namedCreater, reqScope, a.group.Typer, admit)
			} else {
				handler = CreateResource(creater, reqScope, a.group.Typer, admit)
			}
			doc := "create a " + kind
			if hasSubresource {
				doc = "create " + subresource + " of a " + kind
			}
			route := ws.POST(action.Path).To(handler).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("create"+namespaced+kind+strings.Title(subresource)).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
				Returns(http.StatusOK, "OK", versionedObject).
				Reads(versionedObject).
				Writes(versionedObject)
			addParams(route, action.Params)
			ws.Route(route)
		case "DELETE": // Delete a resource.
			doc := "delete a " + kind
			if hasSubresource {
				doc = "delete " + subresource + " of a " + kind
			}
			route := ws.DELETE(action.Path).To(DeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("delete"+namespaced+kind+strings.Title(subresource)).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
				Writes(versionedStatus).
				Returns(http.StatusOK, "OK", versionedStatus)
			if isGracefulDeleter {
				route.Reads(versionedDeleterObject)
			}
			addParams(route, action.Params)
			ws.Route(route)
		// TODO: deprecated
		case "WATCH": // Watch a resource.
			doc := "watch changes to an object of kind " + kind
			if hasSubresource {
				doc = "watch changes to " + subresource + " of an object of kind " + kind
			}
			route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("watch"+namespaced+kind+strings.Title(subresource)).
				Produces("application/json").
				Returns(http.StatusOK, "OK", watchjson.WatchEvent{}).
				Writes(watchjson.WatchEvent{})
			if err := addObjectParams(ws, route, versionedListOptions); err != nil {
				return nil, err
			}
			addParams(route, action.Params)
			ws.Route(route)
		// TODO: deprecated
		case "WATCHLIST": // Watch all resources of a kind.
			doc := "watch individual changes to a list of " + kind
			if hasSubresource {
				doc = "watch individual changes to a list of " + subresource + " of " + kind
			}
			route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)).
				Filter(m).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("watch"+namespaced+kind+strings.Title(subresource)+"List").
				Produces("application/json").
				Returns(http.StatusOK, "OK", watchjson.WatchEvent{}).
				Writes(watchjson.WatchEvent{})
			if err := addObjectParams(ws, route, versionedListOptions); err != nil {
				return nil, err
			}
			addParams(route, action.Params)
			ws.Route(route)
		case "PROXY": // Proxy requests to a resource.
			// Accept all methods as per http://issue.k8s.io/3996
			addProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
			addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
			addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
			addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
			addProxyRoute(ws, "HEAD", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
			addProxyRoute(ws, "OPTIONS", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
		case "CONNECT":
			for _, method := range connecter.ConnectMethods() {
				doc := "connect " + method + " requests to " + kind
				if hasSubresource {
					doc = "connect " + method + " requests to " + subresource + " of " + kind
				}
				route := ws.Method(method).Path(action.Path).
					To(ConnectResource(connecter, reqScope, admit, connectOptionsKind, path, connectSubpath, connectSubpathKey)).
					Filter(m).
					Doc(doc).
					Operation("connect" + strings.Title(strings.ToLower(method)) + namespaced + kind + strings.Title(subresource)).
					Produces("*/*").
					Consumes("*/*").
					Writes("string")
				if versionedConnectOptions != nil {
					if err := addObjectParams(ws, route, versionedConnectOptions); err != nil {
						return nil, err
					}
				}
				addParams(route, action.Params)
				ws.Route(route)
			}
		default:
			return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
		}
		// Note: update GetAttribs() when adding a custom handler.
	}
	return &apiResource, nil
}

从路由的注册流程来看,实现还是相对复杂,但是最终都是根据匹配后的路由信息,调用传入的storage的对应的创建,删除等方法。

etcd的操作流程

根据上文中传入的storage类型,分解的相关的方法,就是调用在初始化过程中生成的实例。例如像endpoint对应的storage代码如下。

// NewREST returns a RESTStorage object that will work against endpoints.
func NewREST(s storage.Interface, useCacher bool) *REST {
	prefix := "/services/endpoints"

	storageInterface := s
	if useCacher {
		config := storage.CacherConfig{
			CacheCapacity:  1000,
			Storage:        s,
			Type:           &api.Endpoints{},
			ResourcePrefix: prefix,
			KeyFunc: func(obj runtime.Object) (string, error) {
				return storage.NamespaceKeyFunc(prefix, obj)
			},
			NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
		}
		storageInterface = storage.NewCacher(config)
	}

	store := &etcdgeneric.Etcd{  // 生成etcd的对应的方法
		NewFunc:     func() runtime.Object { return &api.Endpoints{} },
		NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
		KeyRootFunc: func(ctx api.Context) string {
			return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
		},
		KeyFunc: func(ctx api.Context, name string) (string, error) {
			return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name)
		},
		ObjectNameFunc: func(obj runtime.Object) (string, error) {
			return obj.(*api.Endpoints).Name, nil
		},
		PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
			return endpoint.MatchEndpoints(label, field)
		},
		EndpointName: "endpoints",   // 设置endpoints前缀

		CreateStrategy: endpoint.Strategy,  // 设置更新策略和更新策略
		UpdateStrategy: endpoint.Strategy,

		Storage: storageInterface,   // 设置保存的接口
	}
	return &REST{store}
}



...


// Create inserts a new item according to the unique key from the object.
func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
	trace := util.NewTrace("Create " + reflect.TypeOf(obj).String())
	defer trace.LogIfLong(time.Second)
	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
		return nil, err
	}
	name, err := e.ObjectNameFunc(obj)  // 获取对象名称
	if err != nil {
		return nil, err
	}
	key, err := e.KeyFunc(ctx, name)   // 调用key的类型
	if err != nil {
		return nil, err
	}
	ttl, err := e.calculateTTL(obj, 0, false)
	if err != nil {
		return nil, err
	}
	trace.Step("About to create object")
	out := e.NewFunc()   	
	if err := e.Storage.Create(key, obj, out, ttl); err != nil {  // 创建数据
		err = etcderr.InterpretCreateError(err, e.EndpointName, name)
		err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
		return nil, err
	}
	trace.Step("Object created")
	if e.AfterCreate != nil {
		if err := e.AfterCreate(out); err != nil {  // 检查是否有回调函数如果有则执行
			return nil, err
		}
	}
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return out, nil
}

...

// Get retrieves the item from etcd.
func (e *Etcd) Get(ctx api.Context, name string) (runtime.Object, error) {
	obj := e.NewFunc()
	trace := util.NewTrace("Get " + reflect.TypeOf(obj).String())  
	defer trace.LogIfLong(time.Second)
	key, err := e.KeyFunc(ctx, name)    // 获取key的执行方法
	if err != nil {
		return nil, err
	}
	trace.Step("About to read object")
	if err := e.Storage.Get(key, obj, false); err != nil {  // 查询key的内容
		return nil, etcderr.InterpretGetError(err, e.EndpointName, name)
	}
	trace.Step("Object read")
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return obj, nil
}

通过路由到存储的过程中,大概的过程就是如上所示,整个的代码的流程相对比较繁琐,不过实现起来确实非常的优秀。

总结

通过前几篇的k8s的基础概念的学习,本文的apiserver的实现的主要逻辑相对比较简单,但是实现起来比较有扩展性,故代码的设计上面相对繁多,即最核心的流程就是将API上对应的操作逻辑映射到etcd中,并实现一定的业务逻辑。由于本人才疏学浅,如有错误请批评指正。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐