源码版本 v1.3.0-alpha.5


1.3.0 版本相对1.2.4版本最明显的区别的是将各k8s 部件的二进制可执行文件合到一个文件hyperkube中。通过命令行参数,启动不同的k8s部件。

这里按照代码执行顺序简单的走读kubelet源码。

程序入口: cmd/kubelet/kubelet.go 文件中main()  函数实例化NewKubeletServer,命令行参数处理,日志初始化,调用cmd/kubelet/app/server.go文件中Run函数。(如果使用hyperkube启动入口也在这)

func run(s *options.KubeletServer, kcfg *KubeletConfig) (errerror). 

在run函数中主要做的事是:

kcfg.KubeClient, err = clientset.NewForConfig(clientConfig)//创建API 客户端

cloud, err :=cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)//云provider初始化,一般本地执行不涉及云

kcfg.CAdvisorInterface, err = cadvisor.New(s.CAdvisorPort,kcfg.ContainerRuntime)

kcfg.ContainerManager, err =cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface,cm.NodeConfig{  //容器管理器

RunKubelet(kcfg)  //运行kubket

http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress,strconv.Itoa(int(s.HealthzPort))), nil)//启动health web服务

 

对于RunKubelet(kcfg),主要完成:

eventBroadcaster := record.NewBroadcaster() 
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component:"kubelet", Host: kcfg.NodeName}) 

builder := kcfg.Builder
if builder == nil {
   builder = CreateAndInitKubelet
}
if kcfg.OSInterface == nil {
   kcfg.OSInterface =kubecontainer.RealOS{}
}
k, podCfg, err := builder(kcfg)

 

其中CreateAndInitKubelet调用makePodSourceConfig,makePodSourceConfig主要设置pod信息的来源,pod信息可以来源ConfigFile,ManifestURL,KubeClient。一般默认信息来源是通过KubeClient定时轮询kubernets Apiserver 获取,获取信息发送至通道里。(早期版本通过watchetcd 获取,后来因为安全问题,变为api轮询)。

在pkg\kubelets\config\apiserver.go中

func newSourceApiserverFromLW(lw cache.ListerWatcher,updates chan<- interface{}) {
   send := func(objs []interface{}) {
      var pods []*api.Pod
      for _, o := range objs {
         pods = append(pods,o.(*api.Pod))
      }
      updates <-kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source:kubetypes.ApiserverSource}  //将pod更新信息发送至updates channel。kebelet
 会处理来自updates channel的更新

   }
   cache.NewReflector(lw, &api.Pod{},cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()  

 

在pkg/client/cache/Reflector.go中

func (r *Reflector) Run() {
   glog.V(3).Infof("Startingreflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
   go wait.Until(func() {
      if err :=r.ListAndWatch(wait.NeverStop); err != nil {
        utilruntime.HandleError(err)    
      }
   }, r.period, wait.NeverStop)
}

 

回到RunKubelet(kcfg)函数,该函数调用startKubelet(k, podCfg, kcfg),

go wait.Until(func() { k.Run(podCfg.Updates()) }, 0,wait.NeverStop)// 开始kubelet处理函数

在该函数中开始pkg\kuberlet\kubelet.go 的func(kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate)

     k.Run(podCfg.Updates())来自type KubeletBootstrap interface    {Run(<-chan kubetypes.PodUpdate)}

            来自pkg\kuberlet\kubelet.go的 func(kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate)

              initializeModules()

                    //Step1: Promethues metrics.
                   metrics.Register(kl.runtimeCache)

                    //Step 2: Setup filesystem directories.

                    setupDataDirs()

                    //Step 3: If the container logs directory does not exist, create it.

                    os.Stat(containerLogsDir)

                    //Step 4: Start the image manager.

                    imageManager.Start()

                    //Step 5: Start container manager.

                    containerManager.Start()

                    //Step 6: Start out of memory watcher

                    oomWatcher.Start(kl.nodeRef)

                    //Step 7: Start resource analyzer

                go wait.Until(kl.syncNodeStatus,kl.nodeStatusUpdateFrequency, wait.NeverStop)

                go wait.Until(kl.syncNetworkStatus,30*time.Second, wait.NeverStop)

                go wait.Until(kl.updateRuntimeUp,5*time.Second, wait.NeverStop)

                //Start component sync loops.

               kl.statusManager.Start()

                kl.probeManager.Start()

                //Start the pod lifecycle event generator.
                kl.pleg.Start()
                kl.syncLoop(updates, kl)

                    kl.syncLoopIteration(updates,handler, syncTicker.C, housekeepingTicker.C, plegCh)

                       死循环

                       kl.syncLoopIteration(updates, handler, syncTicker.C,housekeepingTicker.C, plegCh)

                           case u, open := <-updates:

        开始kurblet对外服务

        开协程 go wait.Until(func() {k.ListenAndServe(kc.Address, kc.Port,kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)}, 0, wait.NeverStop)

                调用server.ListenAndServeKubeletServer(kl,kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers,kl.containerRuntime)

/pkg/kubelet/server/server.go的 ListenAndServeKubeletServer

        开协程go wait.Until(func(){   k.ListenAndServeReadOnly(kc.Address,kc.ReadOnlyPort)}, 0, wait.NeverStop)

               调用server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer,address, port, kl.containerRuntime)

ListenAndServeKubeletReadOnlyServer

ListenAndServeKubeletServer 和ListenAndServeKubeletReadOnlyServer 都调用NewServer

NewServer 通过server.InstallDefaultHandlers() 安装handler

        在InstallDefaultHandlers()中注册/pods和/spec/的uri和处理程序

 

可以在kuberlet运行的节点执行下面操作获取kuberlet的web服务

curl http://127.0.0.1:10248/healthz
curl http://127.0.0.1:10255/pods
curl http://127.0.0.1:10255/spec/

 

 



Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐