Kubelet 源码走读(1)
源码版本 v1.3.0-alpha.51.3.0 版本相对1.2.4版本最明显的区别的是将各k8s 部件的二进制可执行文件合到一个文件hyperkube中。通过命令行参数,启动不同的k8s部件。这里按照代码执行顺序简单的走读kubelet源码。程序入口: cmd/kubelet/kubelet.go 文件中main() 函数实例化NewKubeletServer,命令行参数
源码版本 v1.3.0-alpha.5
1.3.0 版本相对1.2.4版本最明显的区别的是将各k8s 部件的二进制可执行文件合到一个文件hyperkube中。通过命令行参数,启动不同的k8s部件。
这里按照代码执行顺序简单的走读kubelet源码。
程序入口: cmd/kubelet/kubelet.go 文件中main() 函数实例化NewKubeletServer,命令行参数处理,日志初始化,调用cmd/kubelet/app/server.go文件中Run函数。(如果使用hyperkube启动入口也在这)
func run(s *options.KubeletServer, kcfg *KubeletConfig) (errerror).
在run函数中主要做的事是:
kcfg.KubeClient, err = clientset.NewForConfig(clientConfig)//创建API 客户端
cloud, err :=cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)//云provider初始化,一般本地执行不涉及云
kcfg.CAdvisorInterface, err = cadvisor.New(s.CAdvisorPort,kcfg.ContainerRuntime)
kcfg.ContainerManager, err =cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface,cm.NodeConfig{ //容器管理器
RunKubelet(kcfg) //运行kubket
http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress,strconv.Itoa(int(s.HealthzPort))), nil)//启动health web服务
对于RunKubelet(kcfg),主要完成:
eventBroadcaster := record.NewBroadcaster()
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component:"kubelet", Host: kcfg.NodeName})
builder := kcfg.Builder
if builder == nil {
builder = CreateAndInitKubelet
}
if kcfg.OSInterface == nil {
kcfg.OSInterface =kubecontainer.RealOS{}
}
k, podCfg, err := builder(kcfg)
其中CreateAndInitKubelet调用makePodSourceConfig,makePodSourceConfig主要设置pod信息的来源,pod信息可以来源ConfigFile,ManifestURL,KubeClient。一般默认信息来源是通过KubeClient定时轮询kubernets Apiserver 获取,获取信息发送至通道里。(早期版本通过watchetcd 获取,后来因为安全问题,变为api轮询)。
在pkg\kubelets\config\apiserver.go中
func newSourceApiserverFromLW(lw cache.ListerWatcher,updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*api.Pod
for _, o := range objs {
pods = append(pods,o.(*api.Pod))
}
updates <-kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source:kubetypes.ApiserverSource} //将pod更新信息发送至updates channel。kebelet
会处理来自updates channel的更新
}
cache.NewReflector(lw, &api.Pod{},cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
在pkg/client/cache/Reflector.go中
func (r *Reflector) Run() {
glog.V(3).Infof("Startingreflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() {
if err :=r.ListAndWatch(wait.NeverStop); err != nil {
utilruntime.HandleError(err)
}
}, r.period, wait.NeverStop)
}
回到RunKubelet(kcfg)函数,该函数调用startKubelet(k, podCfg, kcfg),
go wait.Until(func() { k.Run(podCfg.Updates()) }, 0,wait.NeverStop)// 开始kubelet处理函数
在该函数中开始pkg\kuberlet\kubelet.go 的func(kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate)
k.Run(podCfg.Updates())来自type KubeletBootstrap interface {Run(<-chan kubetypes.PodUpdate)}
来自pkg\kuberlet\kubelet.go的 func(kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate)
initializeModules()
//Step1: Promethues metrics.
metrics.Register(kl.runtimeCache)
//Step 2: Setup filesystem directories.
setupDataDirs()
//Step 3: If the container logs directory does not exist, create it.
os.Stat(containerLogsDir)
//Step 4: Start the image manager.
imageManager.Start()
//Step 5: Start container manager.
containerManager.Start()
//Step 6: Start out of memory watcher
oomWatcher.Start(kl.nodeRef)
//Step 7: Start resource analyzer
go wait.Until(kl.syncNodeStatus,kl.nodeStatusUpdateFrequency, wait.NeverStop)
go wait.Until(kl.syncNetworkStatus,30*time.Second, wait.NeverStop)
go wait.Until(kl.updateRuntimeUp,5*time.Second, wait.NeverStop)
//Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()
//Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(updates, kl)
kl.syncLoopIteration(updates,handler, syncTicker.C, housekeepingTicker.C, plegCh)
死循环
kl.syncLoopIteration(updates, handler, syncTicker.C,housekeepingTicker.C, plegCh)
case u, open := <-updates:
开始kurblet对外服务
开协程 go wait.Until(func() {k.ListenAndServe(kc.Address, kc.Port,kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)}, 0, wait.NeverStop)
调用server.ListenAndServeKubeletServer(kl,kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers,kl.containerRuntime)
/pkg/kubelet/server/server.go的 ListenAndServeKubeletServer
开协程go wait.Until(func(){ k.ListenAndServeReadOnly(kc.Address,kc.ReadOnlyPort)}, 0, wait.NeverStop)
调用server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer,address, port, kl.containerRuntime)
ListenAndServeKubeletReadOnlyServer
ListenAndServeKubeletServer 和ListenAndServeKubeletReadOnlyServer 都调用NewServer
NewServer 通过server.InstallDefaultHandlers() 安装handler
在InstallDefaultHandlers()中注册/pods和/spec/的uri和处理程序
可以在kuberlet运行的节点执行下面操作获取kuberlet的web服务
curl http://127.0.0.1:10248/healthz
curl http://127.0.0.1:10255/pods
curl http://127.0.0.1:10255/spec/
更多推荐
所有评论(0)