k8s源码分析-----kubelet启动流程分析
k8s源码分析-----kubelet启动流程分析
源码为k8s v1.3.0稳定版本
一、代码结构
(1) 代码起始位置
kubernetes\cmd\kubelet\kubelet.go
(2) 业务逻辑代码
二 、初始化启动过程
(1) main函数的处理
main函数的过程,基本属于k8s的统一风格,代码位于 kubernetes\cmd\kubelet\kubelet.go中
关注一下NewKubeletServer
// NewKubeletServer will create a new KubeletServer with default values. func NewKubeletServer() *KubeletServer { glog.Errorf("NewKubeletServer is called") config := componentconfig.KubeletConfiguration{} api.Scheme.Convert(&v1alpha1.KubeletConfiguration{}, &config) return &KubeletServer{ AuthPath: util.NewStringFlag("/var/lib/kubelet/kubernetes_auth"), // deprecated KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"), KubeletConfiguration: config, } }
// KubeletServer encapsulates all of the parameters necessary for starting up // a kubelet. These can either be set via command line or directly. type KubeletServer struct { componentconfig.KubeletConfiguration AuthPath util.StringFlag // Deprecated -- use KubeConfig instead KubeConfig util.StringFlag APIServerList []string RunOnce bool // Insert a probability of random errors during calls to the master. ChaosChance float64 // Crash immediately, rather than eating panics. ReallyCrashForTesting bool }
KubeletConfiguration的定义:
type KubeletConfiguration struct { unversioned.TypeMeta // config is the path to the config file or directory of files Config string `json:"config"` // syncFrequency is the max period between synchronizing running // containers and config SyncFrequency unversioned.Duration `json:"syncFrequency"` // fileCheckFrequency is the duration between checking config files for // new data FileCheckFrequency unversioned.Duration `json:"fileCheckFrequency"` // httpCheckFrequency is the duration between checking http for new data HTTPCheckFrequency unversioned.Duration `json:"httpCheckFrequency"` // manifestURL is the URL for accessing the container manifest ManifestURL string `json:"manifestURL"` // manifestURLHeader is the HTTP header to use when accessing the manifest // URL, with the key separated from the value with a ':', as in 'key:value' ManifestURLHeader string `json:"manifestURLHeader"` // enableServer enables the Kubelet's server EnableServer bool `json:"enableServer"` // address is the IP address for the Kubelet to serve on (set to 0.0.0.0 // for all interfaces) Address string `json:"address"` // port is the port for the Kubelet to serve on. Port int32 `json:"port"` // readOnlyPort is the read-only port for the Kubelet to serve on with // no authentication/authorization (set to 0 to disable) ReadOnlyPort int32 `json:"readOnlyPort"` // tlsCertFile is the file containing x509 Certificate for HTTPS. (CA cert, // if any, concatenated after server cert). If tlsCertFile and // tlsPrivateKeyFile are not provided, a self-signed certificate // and key are generated for the public address and saved to the directory // passed to certDir. TLSCertFile string `json:"tlsCertFile"` // tlsPrivateKeyFile is the ile containing x509 private key matching // tlsCertFile. TLSPrivateKeyFile string `json:"tlsPrivateKeyFile"` // certDirectory is the directory where the TLS certs are located (by // default /var/run/kubernetes). If tlsCertFile and tlsPrivateKeyFile // are provided, this flag will be ignored. CertDirectory string `json:"certDirectory"` // hostnameOverride is the hostname used to identify the kubelet instead // of the actual hostname. HostnameOverride string `json:"hostnameOverride"` // podInfraContainerImage is the image whose network/ipc namespaces // containers in each pod will use. PodInfraContainerImage string `json:"podInfraContainerImage"` // dockerEndpoint is the path to the docker endpoint to communicate with. DockerEndpoint string `json:"dockerEndpoint"` // rootDirectory is the directory path to place kubelet files (volume // mounts,etc). RootDirectory string `json:"rootDirectory"` // seccompProfileRoot is the directory path for seccomp profiles. SeccompProfileRoot string `json:"seccompProfileRoot"` // allowPrivileged enables containers to request privileged mode. // Defaults to false. AllowPrivileged bool `json:"allowPrivileged"` // hostNetworkSources is a comma-separated list of sources from which the // Kubelet allows pods to use of host network. Defaults to "*". Valid // options are "file", "http", "api", and "*" (all sources). HostNetworkSources []string `json:"hostNetworkSources"` // hostPIDSources is a comma-separated list of sources from which the // Kubelet allows pods to use the host pid namespace. Defaults to "*". HostPIDSources []string `json:"hostPIDSources"` // hostIPCSources is a comma-separated list of sources from which the // Kubelet allows pods to use the host ipc namespace. Defaults to "*". HostIPCSources []string `json:"hostIPCSources"` // registryPullQPS is the limit of registry pulls per second. If 0, // unlimited. Set to 0 for no limit. Defaults to 5.0. RegistryPullQPS int32 `json:"registryPullQPS"` // registryBurst is the maximum size of a bursty pulls, temporarily allows // pulls to burst to this number, while still not exceeding registryQps. // Only used if registryQPS > 0. RegistryBurst int32 `json:"registryBurst"` // eventRecordQPS is the maximum event creations per second. If 0, there // is no limit enforced. EventRecordQPS int32 `json:"eventRecordQPS"` // eventBurst is the maximum size of a bursty event records, temporarily // allows event records to burst to this number, while still not exceeding // event-qps. Only used if eventQps > 0 EventBurst int32 `json:"eventBurst"` // enableDebuggingHandlers enables server endpoints for log collection // and local running of containers and commands EnableDebuggingHandlers bool `json:"enableDebuggingHandlers"` // minimumGCAge is the minimum age for a finished container before it is // garbage collected. MinimumGCAge unversioned.Duration `json:"minimumGCAge"` // maxPerPodContainerCount is the maximum number of old instances to // retain per container. Each container takes up some disk space. MaxPerPodContainerCount int32 `json:"maxPerPodContainerCount"` // maxContainerCount is the maximum number of old instances of containers // to retain globally. Each container takes up some disk space. MaxContainerCount int32 `json:"maxContainerCount"` // cAdvisorPort is the port of the localhost cAdvisor endpoint CAdvisorPort int32 `json:"cAdvisorPort"` // healthzPort is the port of the localhost healthz endpoint HealthzPort int32 `json:"healthzPort"` // healthzBindAddress is the IP address for the healthz server to serve // on. HealthzBindAddress string `json:"healthzBindAddress"` // oomScoreAdj is The oom-score-adj value for kubelet process. Values // must be within the range [-1000, 1000]. OOMScoreAdj int32 `json:"oomScoreAdj"` // registerNode enables automatic registration with the apiserver. RegisterNode bool `json:"registerNode"` // clusterDomain is the DNS domain for this cluster. If set, kubelet will // configure all containers to search this domain in addition to the // host's search domains. ClusterDomain string `json:"clusterDomain"` // masterServiceNamespace is The namespace from which the kubernetes // master services should be injected into pods. MasterServiceNamespace string `json:"masterServiceNamespace"` // clusterDNS is the IP address for a cluster DNS server. If set, kubelet // will configure all containers to use this for DNS resolution in // addition to the host's DNS servers ClusterDNS string `json:"clusterDNS"` // streamingConnectionIdleTimeout is the maximum time a streaming connection // can be idle before the connection is automatically closed. StreamingConnectionIdleTimeout unversioned.Duration `json:"streamingConnectionIdleTimeout"` // nodeStatusUpdateFrequency is the frequency that kubelet posts node // status to master. Note: be cautious when changing the constant, it // must work with nodeMonitorGracePeriod in nodecontroller. NodeStatusUpdateFrequency unversioned.Duration `json:"nodeStatusUpdateFrequency"` // imageMinimumGCAge is the minimum age for a unused image before it is // garbage collected. ImageMinimumGCAge unversioned.Duration `json:"imageMinimumGCAge"` // imageGCHighThresholdPercent is the percent of disk usage after which // image garbage collection is always run. ImageGCHighThresholdPercent int32 `json:"imageGCHighThresholdPercent"` // imageGCLowThresholdPercent is the percent of disk usage before which // image garbage collection is never run. Lowest disk usage to garbage // collect to. ImageGCLowThresholdPercent int32 `json:"imageGCLowThresholdPercent"` // lowDiskSpaceThresholdMB is the absolute free disk space, in MB, to // maintain. When disk space falls below this threshold, new pods would // be rejected. LowDiskSpaceThresholdMB int32 `json:"lowDiskSpaceThresholdMB"` // How frequently to calculate and cache volume disk usage for all pods VolumeStatsAggPeriod unversioned.Duration `json:"volumeStatsAggPeriod"` // networkPluginName is the name of the network plugin to be invoked for // various events in kubelet/pod lifecycle NetworkPluginName string `json:"networkPluginName"` // networkPluginDir is the full path of the directory in which to search // for network plugins NetworkPluginDir string `json:"networkPluginDir"` // volumePluginDir is the full path of the directory in which to search // for additional third party volume plugins VolumePluginDir string `json:"volumePluginDir"` // cloudProvider is the provider for cloud services. CloudProvider string `json:"cloudProvider,omitempty"` // cloudConfigFile is the path to the cloud provider configuration file. CloudConfigFile string `json:"cloudConfigFile,omitempty"` // KubeletCgroups is the absolute name of cgroups to isolate the kubelet in. KubeletCgroups string `json:"kubeletCgroups,omitempty"` // Enable QoS based Cgroup hierarchy: top level cgroups for QoS Classes // And all Burstable and BestEffort pods are brought up under their // specific top level QoS cgroup. CgroupsPerQOS bool `json:"CgroupsPerQOS,omitempty"` // Cgroups that container runtime is expected to be isolated in. RuntimeCgroups string `json:"runtimeCgroups,omitempty"` // SystemCgroups is absolute name of cgroups in which to place // all non-kernel processes that are not already in a container. Empty // for no container. Rolling back the flag requires a reboot. SystemCgroups string `json:"systemCgroups,omitempty"` // CgroupRoot is the root cgroup to use for pods. // If CgroupsPerQOS is enabled, this is the root of the QoS cgroup hierarchy. CgroupRoot string `json:"cgroupRoot,omitempty"` // containerRuntime is the container runtime to use. ContainerRuntime string `json:"containerRuntime"` // runtimeRequestTimeout is the timeout for all runtime requests except long running // requests - pull, logs, exec and attach. RuntimeRequestTimeout unversioned.Duration `json:"runtimeRequestTimeout,omitempty"` // rktPath is the path of rkt binary. Leave empty to use the first rkt in // $PATH. RktPath string `json:"rktPath,omitempty"` // rktApiEndpoint is the endpoint of the rkt API service to communicate with. RktAPIEndpoint string `json:"rktAPIEndpoint,omitempty"` // rktStage1Image is the image to use as stage1. Local paths and // http/https URLs are supported. RktStage1Image string `json:"rktStage1Image,omitempty"` // lockFilePath is the path that kubelet will use to as a lock file. // It uses this file as a lock to synchronize with other kubelet processes // that may be running. LockFilePath string `json:"lockFilePath"` // ExitOnLockContention is a flag that signifies to the kubelet that it is running // in "bootstrap" mode. This requires that 'LockFilePath' has been set. // This will cause the kubelet to listen to inotify events on the lock file, // releasing it and exiting when another process tries to open that file. ExitOnLockContention bool `json:"exitOnLockContention"` // configureCBR0 enables the kublet to configure cbr0 based on // Node.Spec.PodCIDR. ConfigureCBR0 bool `json:"configureCbr0"` // How should the kubelet configure the container bridge for hairpin packets. // Setting this flag allows endpoints in a Service to loadbalance back to // themselves if they should try to access their own Service. Values: // "promiscuous-bridge": make the container bridge promiscuous. // "hairpin-veth": set the hairpin flag on container veth interfaces. // "none": do nothing. // Setting --configure-cbr0 to false implies that to achieve hairpin NAT // one must set --hairpin-mode=veth-flag, because bridge assumes the // existence of a container bridge named cbr0. HairpinMode string `json:"hairpinMode"` // The node has babysitter process monitoring docker and kubelet. BabysitDaemons bool `json:"babysitDaemons"` // maxPods is the number of pods that can run on this Kubelet. MaxPods int32 `json:"maxPods"` // nvidiaGPUs is the number of NVIDIA GPU devices on this node. NvidiaGPUs int32 `json:"nvidiaGPUs"` // dockerExecHandlerName is the handler to use when executing a command // in a container. Valid values are 'native' and 'nsenter'. Defaults to // 'native'. DockerExecHandlerName string `json:"dockerExecHandlerName"` // The CIDR to use for pod IP addresses, only used in standalone mode. // In cluster mode, this is obtained from the master. PodCIDR string `json:"podCIDR"` // ResolverConfig is the resolver configuration file used as the basis // for the container DNS resolution configuration."), [] ResolverConfig string `json:"resolvConf"` // cpuCFSQuota is Enable CPU CFS quota enforcement for containers that // specify CPU limits CPUCFSQuota bool `json:"cpuCFSQuota"` // containerized should be set to true if kubelet is running in a container. Containerized bool `json:"containerized"` // maxOpenFiles is Number of files that can be opened by Kubelet process. MaxOpenFiles int64 `json:"maxOpenFiles"` // reconcileCIDR is Reconcile node CIDR with the CIDR specified by the // API server. No-op if register-node or configure-cbr0 is false. ReconcileCIDR bool `json:"reconcileCIDR"` // registerSchedulable tells the kubelet to register the node as // schedulable. No-op if register-node is false. RegisterSchedulable bool `json:"registerSchedulable"` // contentType is contentType of requests sent to apiserver. ContentType string `json:"contentType"` // kubeAPIQPS is the QPS to use while talking with kubernetes apiserver KubeAPIQPS int32 `json:"kubeAPIQPS"` // kubeAPIBurst is the burst to allow while talking with kubernetes // apiserver KubeAPIBurst int32 `json:"kubeAPIBurst"` // serializeImagePulls when enabled, tells the Kubelet to pull images one // at a time. We recommend *not* changing the default value on nodes that // run docker daemon with version < 1.9 or an Aufs storage backend. // Issue #10959 has more details. SerializeImagePulls bool `json:"serializeImagePulls"` // experimentalFlannelOverlay enables experimental support for starting the // kubelet with the default overlay network (flannel). Assumes flanneld // is already running in client mode. ExperimentalFlannelOverlay bool `json:"experimentalFlannelOverlay"` // outOfDiskTransitionFrequency is duration for which the kubelet has to // wait before transitioning out of out-of-disk node condition status. OutOfDiskTransitionFrequency unversioned.Duration `json:"outOfDiskTransitionFrequency,omitempty"` // nodeIP is IP address of the node. If set, kubelet will use this IP // address for the node. NodeIP string `json:"nodeIP,omitempty"` // nodeLabels to add when registering the node in the cluster. NodeLabels map[string]string `json:"nodeLabels"` // nonMasqueradeCIDR configures masquerading: traffic to IPs outside this range will use IP masquerade. NonMasqueradeCIDR string `json:"nonMasqueradeCIDR"` // enable gathering custom metrics. EnableCustomMetrics bool `json:"enableCustomMetrics"` // Comma-delimited list of hard eviction expressions. For example, 'memory.available<300Mi'. EvictionHard string `json:"evictionHard,omitempty"` // Comma-delimited list of soft eviction expressions. For example, 'memory.available<300Mi'. EvictionSoft string `json:"evictionSoft,omitempty"` // Comma-delimeted list of grace periods for each soft eviction signal. For example, 'memory.available=30s'. EvictionSoftGracePeriod string `json:"evictionSoftGracePeriod,omitempty"` // Duration for which the kubelet has to wait before transitioning out of an eviction pressure condition. EvictionPressureTransitionPeriod unversioned.Duration `json:"evictionPressureTransitionPeriod,omitempty"` // Maximum allowed grace period (in seconds) to use when terminating pods in response to a soft eviction threshold being met. EvictionMaxPodGracePeriod int32 `json:"evictionMaxPodGracePeriod,omitempty"` // Maximum number of pods per core. Cannot exceed MaxPods PodsPerCore int32 `json:"podsPerCore"` // enableControllerAttachDetach enables the Attach/Detach controller to // manage attachment/detachment of volumes scheduled to this node, and // disables kubelet from executing any attach/detach operations EnableControllerAttachDetach bool `json:"enableControllerAttachDetach"` // A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs // that describe resources reserved for non-kubernetes components. // Currently only cpu and memory are supported. [default=none] // See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.md for more detail. SystemReserved utilconfig.ConfigurationMap `json:"systemReserved"` // A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs // that describe resources reserved for kubernetes system components. // Currently only cpu and memory are supported. [default=none] // See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.md for more detail. KubeReserved utilconfig.ConfigurationMap `json:"kubeReserved"` }
(2) 进入到Run函数中:
// Run runs the specified KubeletServer for the given KubeletConfig. This should never exit. // The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer. // Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults // will be ignored. func Run(s *options.KubeletServer, kcfg *KubeletConfig) error { glog.Errorf(" begin to KubeletServer Run.") err := run(s, kcfg) if err != nil { glog.Errorf("Failed running kubelet: %v", err) }else { glog.Errorf(" running kubelet succeed.") } return err }
func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { if s.ExitOnLockContention && s.LockFilePath == "" { return errors.New("cannot exit on lock file contention: no lock file specified") } done := make(chan struct{}) if s.LockFilePath != "" { glog.Infof("aquiring lock on %q", s.LockFilePath) if err := flock.Acquire(s.LockFilePath); err != nil { return fmt.Errorf("unable to aquire file lock on %q: %v", s.LockFilePath, err) } if s.ExitOnLockContention { glog.Infof("watching for inotify events for: %v", s.LockFilePath) if err := watchForLockfileContention(s.LockFilePath, done); err != nil { return err } } } if c, err := configz.New("componentconfig"); err == nil { c.Set(s.KubeletConfiguration) } else { glog.Errorf("unable to register configz: %s", err) } if kcfg == nil { cfg, err := UnsecuredKubeletConfig(s) if err != nil { return err } kcfg = cfg clientConfig, err := CreateAPIServerClientConfig(s) if err == nil { kcfg.KubeClient, err = clientset.NewForConfig(clientConfig) // make a separate client for events eventClientConfig := *clientConfig eventClientConfig.QPS = float32(s.EventRecordQPS) eventClientConfig.Burst = int(s.EventBurst) kcfg.EventClient, err = clientset.NewForConfig(&eventClientConfig) } if err != nil && len(s.APIServerList) > 0 { glog.Warningf("No API client: %v", err) } if s.CloudProvider == kubeExternal.AutoDetectCloudProvider { kcfg.AutoDetectCloudProvider = true } else { cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { return err } glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) kcfg.Cloud = cloud } } if kcfg.CAdvisorInterface == nil { kcfg.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), kcfg.ContainerRuntime) if err != nil { return err } } if kcfg.ContainerManager == nil { if kcfg.SystemCgroups != "" && kcfg.CgroupRoot == "" { return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified") } kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface, cm.NodeConfig{ RuntimeCgroupsName: kcfg.RuntimeCgroups, SystemCgroupsName: kcfg.SystemCgroups, KubeletCgroupsName: kcfg.KubeletCgroups, ContainerRuntime: kcfg.ContainerRuntime, CgroupsPerQOS: kcfg.CgroupsPerQOS, CgroupRoot: kcfg.CgroupRoot, }) if err != nil { return err } } runtime.ReallyCrash = s.ReallyCrashForTesting rand.Seed(time.Now().UTC().UnixNano()) // TODO(vmarmol): Do this through container config. oomAdjuster := kcfg.OOMAdjuster if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil { glog.Warning(err) } if err := RunKubelet(kcfg); err != nil { return err } if s.HealthzPort > 0 { healthz.DefaultHealthz() go wait.Until(func() { err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } }, 5*time.Second, wait.NeverStop) } if s.RunOnce { return nil } <-done return nil }
run函数的主要步骤有:
step1: 创建一个done的channel,实现主Goroutine的运行
step2:KubeletConfiguration的转换
step3: UnsecuredKubeletConfig调用,里面创建KubeletConfig(重点)
step4:CreateAPIServerClientConfig
step5: CloudProvider的初始化
step6: cadvisor对象的初始化
step7: NewContainerManager
step8: oomAdjuster对象的初始化
step9: RunKubelet (下一级的执行函数)
step10: healthz.DefaultHealthz (这里启动了一个协程)
(3) UnsecuredKubeletConfig分析
// UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup // is not valid. It will not start any background processes, and does not include authentication/authorization func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) { hostNetworkSources, err := kubetypes.GetValidatedSources(s.HostNetworkSources) if err != nil { return nil, err } hostPIDSources, err := kubetypes.GetValidatedSources(s.HostPIDSources) if err != nil { return nil, err } hostIPCSources, err := kubetypes.GetValidatedSources(s.HostIPCSources) if err != nil { return nil, err } mounter := mount.New() var writer io.Writer = &io.StdWriter{} if s.Containerized { glog.V(2).Info("Running kubelet in containerized mode (experimental)") mounter = mount.NewNsenterMounter() writer = &io.NsenterWriter{} } tlsOptions, err := InitializeTLS(s) if err != nil { return nil, err } var dockerExecHandler dockertools.ExecHandler switch s.DockerExecHandlerName { case "native": dockerExecHandler = &dockertools.NativeExecHandler{} case "nsenter": dockerExecHandler = &dockertools.NsenterExecHandler{} default: glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) dockerExecHandler = &dockertools.NativeExecHandler{} } imageGCPolicy := kubelet.ImageGCPolicy{ MinAge: s.ImageMinimumGCAge.Duration, HighThresholdPercent: int(s.ImageGCHighThresholdPercent), LowThresholdPercent: int(s.ImageGCLowThresholdPercent), } diskSpacePolicy := kubelet.DiskSpacePolicy{ DockerFreeDiskMB: int(s.LowDiskSpaceThresholdMB), RootFreeDiskMB: int(s.LowDiskSpaceThresholdMB), } manifestURLHeader := make(http.Header) if s.ManifestURLHeader != "" { pieces := strings.Split(s.ManifestURLHeader, ":") if len(pieces) != 2 { return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) } manifestURLHeader.Set(pieces[0], pieces[1]) } reservation, err := parseReservation(s.KubeReserved, s.SystemReserved) if err != nil { return nil, err } thresholds, err := eviction.ParseThresholdConfig(s.EvictionHard, s.EvictionSoft, s.EvictionSoftGracePeriod) if err != nil { return nil, err } evictionConfig := eviction.Config{ PressureTransitionPeriod: s.EvictionPressureTransitionPeriod.Duration, MaxPodGracePeriodSeconds: int64(s.EvictionMaxPodGracePeriod), Thresholds: thresholds, } return &KubeletConfig{ Address: net.ParseIP(s.Address), AllowPrivileged: s.AllowPrivileged, Auth: nil, // default does not enforce auth[nz] CAdvisorInterface: nil, // launches background processes, not set here VolumeStatsAggPeriod: s.VolumeStatsAggPeriod.Duration, CgroupRoot: s.CgroupRoot, Cloud: nil, // cloud provider might start background processes ClusterDNS: net.ParseIP(s.ClusterDNS), ClusterDomain: s.ClusterDomain, ConfigFile: s.Config, ConfigureCBR0: s.ConfigureCBR0, ContainerManager: nil, ContainerRuntime: s.ContainerRuntime, RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration, CPUCFSQuota: s.CPUCFSQuota, DiskSpacePolicy: diskSpacePolicy, DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt. RuntimeCgroups: s.RuntimeCgroups, DockerExecHandler: dockerExecHandler, EnableControllerAttachDetach: s.EnableControllerAttachDetach, EnableCustomMetrics: s.EnableCustomMetrics, EnableDebuggingHandlers: s.EnableDebuggingHandlers, CgroupsPerQOS: s.CgroupsPerQOS, EnableServer: s.EnableServer, EventBurst: int(s.EventBurst), EventRecordQPS: float32(s.EventRecordQPS), FileCheckFrequency: s.FileCheckFrequency.Duration, HostnameOverride: s.HostnameOverride, HostNetworkSources: hostNetworkSources, HostPIDSources: hostPIDSources, HostIPCSources: hostIPCSources, HTTPCheckFrequency: s.HTTPCheckFrequency.Duration, ImageGCPolicy: imageGCPolicy, KubeClient: nil, ManifestURL: s.ManifestURL, ManifestURLHeader: manifestURLHeader, MasterServiceNamespace: s.MasterServiceNamespace, MaxContainerCount: int(s.MaxContainerCount), MaxOpenFiles: uint64(s.MaxOpenFiles), MaxPerPodContainerCount: int(s.MaxPerPodContainerCount), MaxPods: int(s.MaxPods), NvidiaGPUs: int(s.NvidiaGPUs), MinimumGCAge: s.MinimumGCAge.Duration, Mounter: mounter, NetworkPluginName: s.NetworkPluginName, NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir), NodeLabels: s.NodeLabels, NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency.Duration, NonMasqueradeCIDR: s.NonMasqueradeCIDR, OOMAdjuster: oom.NewOOMAdjuster(), OSInterface: kubecontainer.RealOS{}, PodCIDR: s.PodCIDR, ReconcileCIDR: s.ReconcileCIDR, PodInfraContainerImage: s.PodInfraContainerImage, Port: uint(s.Port), ReadOnlyPort: uint(s.ReadOnlyPort), RegisterNode: s.RegisterNode, RegisterSchedulable: s.RegisterSchedulable, RegistryBurst: int(s.RegistryBurst), RegistryPullQPS: float64(s.RegistryPullQPS), ResolverConfig: s.ResolverConfig, Reservation: *reservation, KubeletCgroups: s.KubeletCgroups, RktPath: s.RktPath, RktAPIEndpoint: s.RktAPIEndpoint, RktStage1Image: s.RktStage1Image, RootDirectory: s.RootDirectory, SeccompProfileRoot: s.SeccompProfileRoot, Runonce: s.RunOnce, SerializeImagePulls: s.SerializeImagePulls, StandaloneMode: (len(s.APIServerList) == 0), StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout.Duration, SyncFrequency: s.SyncFrequency.Duration, SystemCgroups: s.SystemCgroups, TLSOptions: tlsOptions, Writer: writer, VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir), OutOfDiskTransitionFrequency: s.OutOfDiskTransitionFrequency.Duration, HairpinMode: s.HairpinMode, BabysitDaemons: s.BabysitDaemons, ExperimentalFlannelOverlay: s.ExperimentalFlannelOverlay, NodeIP: net.ParseIP(s.NodeIP), EvictionConfig: evictionConfig, PodsPerCore: int(s.PodsPerCore), }, nil }
step1: 根据配置创建hostNetworkSources,hostPIDSources,hostIPCSources对象,该函数后面会用到
step2: mounter,writer,tlsOptions对象的创建
step3: dockerExecHandler的初始化
step4:imageGCPolicy初始化
step5: diskSpacePolicy初始化
step6: manifestURLHeader,reservation,thresholds,evictionConfig对象的初始化
step7: 构建KubeletConfig对象,在这个构建函数中,同时构建了几个关键的对象;
例如:
dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration)
ProbeNetworkPlugins(s.NetworkPluginDir)
ProbeVolumePlugins(s.VolumePluginDir)
oom.NewOOMAdjuster()
(4) ConnectToDockerOrDie分析
// ConnectToDockerOrDie creates docker client connecting to docker daemon. // If the endpoint passed in is "fake://", a fake docker client // will be returned. The program exits if error occurs. The requestTimeout // is the timeout for docker requests. If timeout is exceeded, the request // will be cancelled and throw out an error. If requestTimeout is 0, a default // value will be applied. func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface { if dockerEndpoint == "fake://" { return NewFakeDockerClient() } client, err := getDockerClient(dockerEndpoint) if err != nil { glog.Fatalf("Couldn't connect to docker: %v", err) } glog.Infof("Start docker client with request timeout=%v", requestTimeout) return newKubeDockerClient(client, requestTimeout) }
// newKubeDockerClient creates an kubeDockerClient from an existing docker client. If requestTimeout is 0, // defaultTimeout will be applied. func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface { if requestTimeout == 0 { requestTimeout = defaultTimeout } return &kubeDockerClient{ client: dockerClient, timeout: requestTimeout, } }
// DockerInterface is an abstract interface for testability. It abstracts the interface of docker client. type DockerInterface interface { ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) InspectContainer(id string) (*dockertypes.ContainerJSON, error) CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) StartContainer(id string) error StopContainer(id string, timeout int) error RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error InspectImage(image string) (*dockertypes.ImageInspect, error) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) ImageHistory(id string) ([]dockertypes.ImageHistory, error) Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error Version() (*dockertypes.Version, error) Info() (*dockertypes.Info, error) CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error InspectExec(id string) (*dockertypes.ContainerExecInspect, error) AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error ResizeContainerTTY(id string, height, width int) error ResizeExecTTY(id string, height, width int) error }
该函数中初始化了一个kubeDockerClient对象,该对象包括一组DockerInterface。后续处理中,关于DockerInterface的操作,实际的执行体为该对象。
(5) RunKubelet 函数分析
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications: // 1 Integration tests // 2 Kubelet binary // 3 Standalone 'kubernetes' binary // Eventually, #2 will be replaced with instances of #3 func RunKubelet(kcfg *KubeletConfig) error { kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride) if len(kcfg.NodeName) == 0 { // Query the cloud provider for our node name, default to Hostname nodeName := kcfg.Hostname if kcfg.Cloud != nil { var err error instances, ok := kcfg.Cloud.Instances() if !ok { return fmt.Errorf("failed to get instances from cloud provider") } nodeName, err = instances.CurrentNodeName(kcfg.Hostname) if err != nil { return fmt.Errorf("error fetching current instance name from cloud provider: %v", err) } glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName) } kcfg.NodeName = nodeName } eventBroadcaster := record.NewBroadcaster() kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName}) eventBroadcaster.StartLogging(glog.V(3).Infof) if kcfg.EventClient != nil { glog.V(4).Infof("Sending events to api server.") eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kcfg.EventClient.Events("")}) } else { glog.Warning("No api server defined - no events will be sent to API server.") } privilegedSources := capabilities.PrivilegedSources{ HostNetworkSources: kcfg.HostNetworkSources, HostPIDSources: kcfg.HostPIDSources, HostIPCSources: kcfg.HostIPCSources, } capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0) credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory) glog.V(2).Infof("Using root directory: %v", kcfg.RootDirectory) builder := kcfg.Builder if builder == nil { builder = CreateAndInitKubelet } if kcfg.OSInterface == nil { kcfg.OSInterface = kubecontainer.RealOS{} } k, podCfg, err := builder(kcfg) if err != nil { return fmt.Errorf("failed to create kubelet: %v", err) } util.ApplyRLimitForSelf(kcfg.MaxOpenFiles) // TODO(dawnchen): remove this once we deprecated old debian containervm images. // This is a workaround for issue: https://github.com/opencontainers/runc/issues/726 // The current chosen number is consistent with most of other os dist. const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys" const minKeys uint64 = 1000000 key, err := ioutil.ReadFile(maxkeysPath) if err != nil { glog.Errorf("Cannot read keys quota in %s", maxkeysPath) } else { fields := strings.Fields(string(key)) nkey, _ := strconv.ParseUint(fields[0], 10, 64) if nkey < minKeys { glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys) err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644) if err != nil { glog.Warningf("Failed to update %s: %v", maxkeysPath, err) } } } const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes" const minBytes uint64 = 25000000 bytes, err := ioutil.ReadFile(maxbytesPath) if err != nil { glog.Errorf("Cannot read keys bytes in %s", maxbytesPath) } else { fields := strings.Fields(string(bytes)) nbyte, _ := strconv.ParseUint(fields[0], 10, 64) if nbyte < minBytes { glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes) err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644) if err != nil { glog.Warningf("Failed to update %s: %v", maxbytesPath, err) } } } // process pods and exit. if kcfg.Runonce { if _, err := k.RunOnce(podCfg.Updates()); err != nil { return fmt.Errorf("runonce failed: %v", err) } glog.Infof("Started kubelet %s as runonce", version.Get().String()) } else { startKubelet(k, podCfg, kcfg) glog.Errorf("Started kubelet %s", version.Get().String()) } return nil }
step1: KubeletConfig中NodeName的设置
step2: NewBroadcaster与API Server交互(待验证)
step3: privilegedSources capabilities初始化
step4: SetPreferredDockercfgPath设置
step5: CreateAndInitKubelet,该函数的作用从名字中就可以看出,创建并初始化kubelet(重要)
step6: ApplyRLimitForSelf
step7: 中间的代码,好像是是修改了系统的某些参数,解决一个debug
step8: startKubelet进入下一级的初始化过程
(6) CreateAndInitKubelet函数分析
CreateAndInitKubelet主要做了两件事情,创建了kubecontainer.ContainerGCPolicy,然后进行到NewMainKubelet中进行kubelet大量的对象创建工作func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) { // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations // TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing // a nil pointer to it when what we really want is a nil interface. var kubeClient clientset.Interface if kc.KubeClient != nil { kubeClient = kc.KubeClient // TODO: remove this when we've refactored kubelet to only use clientset. } gcPolicy := kubecontainer.ContainerGCPolicy{ MinAge: kc.MinimumGCAge, MaxPerPodContainer: kc.MaxPerPodContainerCount, MaxContainers: kc.MaxContainerCount, } daemonEndpoints := &api.NodeDaemonEndpoints{ KubeletEndpoint: api.DaemonEndpoint{Port: int32(kc.Port)}, } pc = kc.PodConfig if pc == nil { pc = makePodSourceConfig(kc) } k, err = kubelet.NewMainKubelet( kc.Hostname, kc.NodeName, kc.DockerClient, kubeClient, kc.RootDirectory, kc.SeccompProfileRoot, kc.PodInfraContainerImage, kc.SyncFrequency, float32(kc.RegistryPullQPS), kc.RegistryBurst, kc.EventRecordQPS, kc.EventBurst, gcPolicy, pc.SeenAllSources, kc.RegisterNode, kc.RegisterSchedulable, kc.StandaloneMode, kc.ClusterDomain, kc.ClusterDNS, kc.MasterServiceNamespace, kc.VolumePlugins, kc.NetworkPlugins, kc.NetworkPluginName, kc.StreamingConnectionIdleTimeout, kc.Recorder, kc.CAdvisorInterface, kc.ImageGCPolicy, kc.DiskSpacePolicy, kc.Cloud, kc.AutoDetectCloudProvider, kc.NodeLabels, kc.NodeStatusUpdateFrequency, kc.OSInterface, kc.CgroupsPerQOS, kc.CgroupRoot, kc.ContainerRuntime, kc.RuntimeRequestTimeout, kc.RktPath, kc.RktAPIEndpoint, kc.RktStage1Image, kc.Mounter, kc.Writer, kc.ConfigureCBR0, kc.NonMasqueradeCIDR, kc.PodCIDR, kc.ReconcileCIDR, kc.MaxPods, kc.PodsPerCore, kc.NvidiaGPUs, kc.DockerExecHandler, kc.ResolverConfig, kc.CPUCFSQuota, daemonEndpoints, kc.OOMAdjuster, kc.SerializeImagePulls, kc.ContainerManager, kc.OutOfDiskTransitionFrequency, kc.ExperimentalFlannelOverlay, kc.NodeIP, kc.Reservation, kc.EnableCustomMetrics, kc.VolumeStatsAggPeriod, kc.ContainerRuntimeOptions, kc.HairpinMode, kc.BabysitDaemons, kc.EvictionConfig, kc.Options, kc.EnableControllerAttachDetach, ) if err != nil { return nil, nil, err } k.BirthCry() k.StartGarbageCollection() return k, pc, nil }
step1: kubecontainer.ContainerGCPolicy对象创建
step2: api.NodeDaemonEndpoints赋值
step3:NewMainKubelet函数调用,并返回一个Kubelet对象 (重要,该函数类创建了大多数kubelet需要用到的对象)
step4:BirthCry,发送一个事件,通知kubelet已经启动
C:\Go\src\k8s.io\kubernetes\pkg\kubelet\kubelet.go:3556
step5:启动GarbageCollect(垃圾桶收集)的协程
step6: Kubelet对象返回C:
未完\Go\src\k8s.io\kubernetes\pk\kubelet\kubelet.go
未完待续
参考大牛的博文:k8s源码分析-----kubelet(1)主要流程
http://blog.csdn.net/screscent/article/details/51086684
更多推荐
所有评论(0)