kubelet pod startup source code analysis (part 2)
The previous post explained how pod updates are fetched from the apiserver and fed into syncLoopIteration. But the pods that need to change do not only come from the three sources covered there: the state of local pods can also change on its own, for example when a local container exits. So how does an exited local container get restarted? The answer lies in another input source, the plegCh channel.
Let's look at the input sources of syncLoopIteration:
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool
configCh carries the updates introduced in the previous post. syncCh and housekeepingCh are two timer channels, firing every one second and every two seconds respectively. plegCh is used to synchronize local changes: whenever the actual state of a local pod diverges from the state cached by the kubelet, an event describing the change is sent to this channel. The relevant code lives in pkg/kubelet/pleg/generic.go.
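To see how these channels are wired together, here is a minimal sketch of the outer syncLoop driving syncLoopIteration. The function name syncLoopSketch and the omitted details are my own simplification; the real syncLoop in pkg/kubelet/kubelet.go also performs runtime health checks and error backoff.

// Minimal sketch, not the exact kubelet code: syncTicker fires every second,
// housekeepingTicker every two seconds, and plegCh carries the pod lifecycle
// events produced by the PLEG relist described below.
func (kl *Kubelet) syncLoopSketch(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(2 * time.Second)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    for {
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
    }
}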
First, a struct used to record pod state:
type podRecord struct {
    old     *kubecontainer.Pod
    current *kubecontainer.Pod
}
Simple enough: it just holds two fields, the pod's current state and the previously recorded state.
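podRecords is simply a map from pod UID to such a record. A simplified sketch of how the map is maintained, following pkg/kubelet/pleg/generic.go (the real update path goes through an extra internal helper and handles a few more edge cases), might look like this:

type podRecords map[types.UID]*podRecord

// setCurrent overwrites the "current" side of every record with the pods just
// listed from the runtime; pods that have disappeared keep current == nil.
func (pr podRecords) setCurrent(pods kubecontainer.Pods) {
    for i := range pr {
        pr[i].current = nil
    }
    for _, pod := range pods {
        if r, ok := pr[pod.ID]; ok {
            r.current = pod
        } else {
            pr[pod.ID] = &podRecord{current: pod}
        }
    }
}

// update promotes "current" to "old" once the pod's events have been handled,
// or drops the record entirely if the pod no longer exists.
func (pr podRecords) update(id types.UID) {
    r, ok := pr[id]
    if !ok {
        return
    }
    if r.current == nil {
        delete(pr, id)
        return
    }
    r.old = r.current
    r.current = nil
}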
func (g *GenericPLEG) relist() {
    // Get the actual state of local pods from the container runtime.
    podList, err := g.runtime.GetPods(true)
    ...
    // Record the freshly listed pods as the "current" state.
    pods := kubecontainer.Pods(podList)
    g.podRecords.setCurrent(pods)

    // Compare the old state with the new one and generate events through
    // computeEvents: if the old and new states are identical it returns nil;
    // if the new state is running it returns a ContainerStarted event; if the
    // new state is exited it returns ContainerDied (see generateEvents for the
    // details). These events are what end up in the plegCh channel.
    for _, container := range allContainers {
        events := computeEvents(oldPod, pod, &container.ID)
        for _, e := range events {
            updateEvents(eventsByPodID, e)
        }
    }
    ...
    // Update the local cache and promote the record's current state to old.
    g.updateCache(pod, pid)
    g.podRecords.update(pid)
    ...
}
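The comments above mention generateEvents; a simplified sketch of its state-transition logic (some states, such as unknown and no-longer-existing containers, are folded into the default case here) could look like this:

// Simplified sketch: emit an event only when the container state changed.
// The real generateEvents also distinguishes unknown and removed containers.
func generateEventsSketch(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
    if newState == oldState {
        return nil
    }
    switch newState {
    case plegContainerRunning:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
    case plegContainerExited:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
    default:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
    }
}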
That covers where syncLoopIteration's events come from. Now let's look at how syncLoopIteration itself runs:
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }
        switch u.Op {
        case kubetypes.ADD:
            glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        ...
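Besides configCh, the same select also has a branch for plegCh, and that branch is what closes the loop for locally exited containers: a ContainerDied event produced by relist ends up re-syncing the affected pod. The sketch below is abbreviated; the real branch also filters events and cleans up containers of dead pods.

case e := <-plegCh:
    // A pod lifecycle event produced by the PLEG relist described earlier.
    // Re-sync the pod so that, for example, an exited container is restarted
    // according to the pod's restart policy.
    if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
        glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
        handler.HandlePodSyncs([]*v1.Pod{pod})
    }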
Essentially, syncLoopIteration dispatches to a different handler method depending on the event type. Take pod creation as an example: handler.HandlePodAdditions(u.Pods). The code is still in pkg/kubelet/kubelet.go:
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        kl.podManager.AddPod(pod)
        // Mirror pods are handled separately; skip the normal path.
        if kubepod.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }
        ...
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        kl.probeManager.AddPod(pod)
    }
}
The dispatchWork call above is responsible for distributing the work to the pod's dedicated worker; through that dispatch, the syncPod method in kubelet.go is eventually invoked.
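The exact UpdatePod signature differs between Kubernetes versions, so the sketch below only illustrates the idea: dispatchWork forwards the pod to its per-pod worker goroutine (kl.podWorkers), which serializes updates for that pod and invokes the sync function registered when the pod workers were created. The options-struct form of UpdatePod shown here is an assumption.

// Illustrative sketch of dispatchWork: hand the pod to its per-pod worker.
// The real code first filters out terminated pods and passes a completion
// callback; those details are elided here.
func (kl *Kubelet) dispatchWorkSketch(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
    })
}

The worker goroutine then calls the syncPod method in kubelet.go: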
func (kl *Kubelet) syncPod(o syncPodOptions) error {
    ...
    // Generate the pod status as seen by the apiserver and report it to the
    // status manager.
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    // Pod container manager, used for the pod-level cgroup.
    pcm := kl.containerManager.NewPodContainerManager()
    // For static pods, make sure a mirror pod exists (details elided).
    if kubepod.IsStaticPod(pod) ...
    // Create the pod's data directories and wait for volumes to attach/mount.
    kl.makePodDataDirs(pod)
    kl.volumeManager.WaitForAttachAndMount(pod)
    // Fetch image pull secrets, then hand everything to the container runtime.
    kl.getPullSecretsForPod(pod)
    kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
}
In the end it is the runtime's SyncPod that starts the containers; the code is in pkg/kubelet/kuberuntime/kuberuntime_manager.go:
func (m *kubeGenericRuntimeManager) SyncPod
It proceeds in six steps (a simplified sketch follows the list):
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
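A pseudocode-level sketch of how those six steps hang together; helper and field names follow pkg/kubelet/kuberuntime/kuberuntime_manager.go loosely and may differ between Kubernetes versions, and error handling and result bookkeeping are omitted.

// Pseudocode-level sketch of SyncPod's six steps; not the verbatim source.
func (m *kubeGenericRuntimeManager) syncPodSketch(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
    // 1. Diff the pod spec against the runtime status to decide what to do.
    changes := m.computePodActions(pod, podStatus)

    // 2. Kill the whole pod sandbox if it has to be recreated
    //    (for example, the sandbox died or its network changed).
    if changes.KillPod {
        // kill the sandbox and all running containers (elided)
    }

    // 3. Kill individual containers that should no longer be running.
    for range changes.ContainersToKill {
        // killContainer(...) (elided)
    }

    // 4. Create a new sandbox if necessary.
    if changes.CreateSandbox {
        // createPodSandbox(...) (elided)
    }

    // 5. Start the next init container and wait for it to complete before
    //    the normal containers are started (elided).

    // 6. Start the normal containers via startContainer, shown next.
    for range changes.ContainersToStart {
        // startContainer(...) (elided)
    }
}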
Only at step 6 does the code start the application containers; that code lives in pkg/kubelet/kuberuntime/kuberuntime_container.go:
func (m *kubeGenericRuntimeManager) startContainer
It is also divided into the following steps (sketched after the list):
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
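A correspondingly abbreviated sketch of startContainer; error handling, event recording and restart counting are omitted, and the exact helper signatures vary by Kubernetes version.

// Abbreviated sketch of startContainer; not the verbatim source.
func (m *kubeGenericRuntimeManager) startContainerSketch(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig,
    container *v1.Container, pod *v1.Pod, pullSecrets []v1.Secret, podIP string) error {
    // 1. Pull the image (honoring the container's imagePullPolicy).
    imageRef, _, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
    if err != nil {
        return err
    }
    // 2. Generate the CRI ContainerConfig and create the container.
    containerConfig, err := m.generateContainerConfig(container, pod, 0, podIP, imageRef)
    if err != nil {
        return err
    }
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    if err != nil {
        return err
    }
    // 3. Start the container through the CRI runtime service.
    if err := m.runtimeService.StartContainer(containerID); err != nil {
        return err
    }
    // 4. Run the postStart lifecycle hook, if the container defines one
    //    (executed via the lifecycle handler runner; elided here).
    return nil
}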
At this point the container startup code has been covered, but many details remain, such as volume mounting and network allocation; those will be explained gradually in later posts!