【kubernetes/k8s源码分析】创建pod源码分析
1 创建pod流程具体的创建步骤包括:1、客户端发起请求,可API Server http请求,也可以kubectl命令行,数据类型包括JSON和YAML2、API Server处理请求,存储Pod到etcd3、Scheduler通过API Server查看未绑定的Pod,尝试为Pod分配节点4、调度预选:Scheduler利用规则过滤掉不符合要求节点。例...
1 创建pod流程
-
具体的创建步骤包括:
1、客户端发起请求,可以直接向 API Server 发送 HTTP 请求,也可以使用 kubectl 命令行,数据格式包括 JSON 和 YAML
2、API Server处理请求,存储Pod到etcd
3、Scheduler通过API Server查看未绑定的Pod,尝试为Pod分配节点
4、调度预选:Scheduler利用规则过滤掉不符合要求节点。例如Pod指定了资源量,可用资源比Pod需要资源量小的过滤掉
5、调度优选:Scheduler考虑一些整体优化,比如一个Replication Controller的副本分布到不同的节点,使用最低负载的节点等
6、选择主机:选择打分最高的主机,进行绑定操作,存储到etcd
7、kubelet创建操作: 绑定成功后,Scheduler会调用APIServer在etcd中创建一个boundpod对象,描述在一个工作节点上绑定运行的所有pod信息。运行在每个工作节点上的kubelet也会定期与etcd同步boundpod信息,一旦发现应该在该工作节点上运行的boundpod对象没有更新,则调用Docker API创建并启动pod内的容器。
2 syncLoop函数
-
syncLoop 是 kubelet 的主循环:它监听来自文件、URL 和 apiserver 的变化,当有新的变化发生时,调用对应的处理函数,保证 pod 处于期望的状态。即使 pod 没有变化,它也会定期保证所有的容器和最新的期望状态保持一致。
for 循环不断调用 syncLoopIteration 方法。两个定时器 syncTicker 和 housekeepingTicker 保证即使没有需要更新的 pod 配置,kubelet 也会定时去做同步和清理工作。每次循环开始时,如果 runtimeState 中记录了运行时错误,kubelet 会跳过本次同步,等待 5 秒钟后再继续循环。第二个参数是 SyncHandler 类型,它是一个 interface,定义了处理不同情况的方法
// syncLoop is the kubelet's main loop for processing pod changes. It
// repeatedly calls syncLoopIteration with the config update channel, the
// handler, a one-second sync tick, a housekeeping tick and the PLEG event
// channel, and stops as soon as syncLoopIteration returns false. While the
// runtime state reports errors, the iteration is skipped and the loop sleeps
// for five seconds before checking again.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")

	// The sync ticker wakes the kubelet up to check whether any pod workers
	// need to be sync'd. A one-second period is sufficient because the sync
	// interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()

	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()

	plegCh := kl.pleg.Watch()

	for {
		runtimeErrs := kl.runtimeState.runtimeErrors()
		if len(runtimeErrs) > 0 {
			glog.Infof("skipping pod synchronization - %v", runtimeErrs)
			time.Sleep(5 * time.Second)
			continue
		}

		// The monitor timestamp is stored both before and after each iteration.
		kl.syncLoopMonitor.Store(kl.clock.Now())
		keepRunning := kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
		if !keepRunning {
			return
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
3 syncLoopIteration函数
-
configCh:读取配置事件,通过文件、URL 和 apiserver 汇聚起来的事件
-
syncCh:定时器,定期同步最新保存的 pod 状态
-
houseKeepingCh:做 pod 清理工作
-
plegCh:如果 pod 的状态改变(被杀死,被暂停等)做处理
-
livenessManager.Updates():健康检查发现某个 pod 不可用,一般也要对它进行重启
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool
3.1 HandlePodAdditions函数
-
把所有的 pod 按照创建日期进行排序,保证最先创建的 pod 会最先被处理
-
加入到 podManager 中,因为 podManager 是 kubelet 的 source of truth,所有被管理的 pod 都要出现在里面。如果 podManager 中找不到某个 pod,就认为这个 pod 被删除了
如果是 mirror pod调用其单独的方法
验证 pod 能否在该节点运行,如果不能则拒绝该 pod
把 pod 分配 worker 做异步处理
在 probeManager 中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测
// HandlePodAdditions processes pods that were newly added to the kubelet.
//
// Pods are handled in creation-time order. Each pod is registered with the
// pod manager. Mirror pods are passed to handleMirrorPod and nothing else is
// done for them. A pod that is not terminated must pass canAdmitPod against
// the non-terminated pods that were known before it was added; otherwise it
// is rejected. Every remaining pod is dispatched as a SyncPodCreate and added
// to the probe manager.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))

	for _, newPod := range pods {
		// Snapshot the managed pods before registering newPod; the admission
		// check below runs against this snapshot (presumably so the pod is
		// not counted against itself).
		podsBeforeAdd := kl.podManager.GetPods()
		kl.podManager.AddPod(newPod)

		if kubepod.IsMirrorPod(newPod) {
			kl.handleMirrorPod(newPod, start)
			continue
		}

		if !kl.podIsTerminated(newPod) {
			running := kl.filterOutTerminatedPods(podsBeforeAdd)
			admitted, reason, message := kl.canAdmitPod(running, newPod)
			if !admitted {
				kl.rejectPod(newPod, reason, message)
				continue
			}
		}

		// The second return value of the mirror pod lookup is ignored.
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(newPod)
		kl.dispatchWork(newPod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(newPod)
	}
}
3.1.1 dispatchWork函数
-
dispatchWork 工作是把接收到的参数封装成 UpdatePodOptions结构体,调用 kl.podWorkers.UpdatePod 方法。
-
podWorkers 的代码在 pkg/kubelet/pod_workers.go 文件中,
// dispatchWork hands a pod to the pod workers so that it is synced
// asynchronously.
//
// A terminated pod is never dispatched; if it also carries a
// DeletionTimestamp, the status manager is asked to terminate it. For every
// other pod an UpdatePodOptions is built from the arguments and passed to
// podWorkers.UpdatePod. When syncType is SyncPodCreate, the number of
// containers in the pod spec is recorded as a metric.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	terminated := kl.podIsTerminated(pod)
	switch {
	case terminated && pod.DeletionTimestamp != nil:
		kl.statusManager.TerminatePod(pod)
		return
	case terminated:
		return
	}

	// NOTE(review): the worker latency is only observed when the sync
	// returned an error - confirm that this is the intended condition.
	onComplete := func(err error) {
		if err == nil {
			return
		}
		metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
	}

	// Run the sync in an async worker.
	workerOpts := &UpdatePodOptions{
		Pod:            pod,
		MirrorPod:      mirrorPod,
		UpdateType:     syncType,
		OnCompleteFunc: onComplete,
	}
	kl.podWorkers.UpdatePod(workerOpts)

	// Only newly created pods contribute to the containers-per-pod metric.
	if syncType != kubetypes.SyncPodCreate {
		return
	}
	metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
3.1.1.1 UpdatePod函数
-
UpdatePod 检查 podUpdates 字典是否存在对应的 pod
-
新建的 pod,会调用 p.managePodLoop() 方法作为 goroutine 运行更新工作。
-
它还会更新 podUpdates 和 isWorking,填入新 pod 信息:如果该 pod 当前没有正在进行的同步,就往 podUpdates 管道中发送接收到的 pod 选项信息;否则把它暂存到 lastUndeliveredWorkUpdate 中。
// UpdatePod queues an update for the pod identified by options.Pod.UID.
//
// The first update seen for a UID creates a one-slot channel and starts a
// goroutine running managePodLoop on it. If the pod is not marked as working,
// it is marked and the update is sent on the channel immediately. Otherwise
// the update is stored as the last undelivered update for the pod, unless the
// update already stored there is a SyncPodKill, which is never overwritten.
// The whole operation runs under podLock.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	uid := options.Pod.UID

	p.podLock.Lock()
	defer p.podLock.Unlock()

	updatesCh, known := p.podUpdates[uid]
	if !known {
		// First update for this pod: create its channel and start its worker.
		updatesCh = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = updatesCh
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(updatesCh)
		}()
	}

	if !p.isWorking[uid] {
		p.isWorking[uid] = true
		updatesCh <- *options
		return
	}

	// A sync is already in progress. If a request to kill the pod is pending,
	// do not let anything overwrite that request.
	pending, found := p.lastUndeliveredWorkUpdate[uid]
	if found && pending.UpdateType == kubetypes.SyncPodKill {
		return
	}
	p.lastUndeliveredWorkUpdate[uid] = *options
}
3.1.1.1 managePodLoop函数
-
managePodLoop 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是 kubelet 的 syncPod 方法
// managePodLoop consumes updates from podUpdates until the channel is closed.
//
// For each update it asks the pod cache for a status newer than the time of
// the last sync attempt and, if that succeeds, calls syncPodFn with the
// update's pod, mirror pod, status, kill options and update type. The
// resulting error (nil on success) is passed to the update's OnCompleteFunc
// when one is set, reported as a FailedSync warning event when non-nil, and
// finally handed to wrapUp together with the pod UID.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time

	for update := range podUpdates {
		status, err := p.podCache.GetNewerThan(update.Pod.UID, lastSyncTime)
		if err == nil {
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			// The timestamp advances after every sync attempt, whether or
			// not the sync itself returned an error.
			lastSyncTime = time.Now()
		}

		// Tell the callback, if any, whether the operation succeeded.
		if onComplete := update.OnCompleteFunc; onComplete != nil {
			onComplete(err)
		}
		if err != nil {
			p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "Error syncing pod")
		}
		p.wrapUp(update.Pod.UID, err)
	}
}
3.1.1.1.1 syncPod函数
// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
func (kl *Kubelet) syncPod(o syncPodOptions) error
a. 如删除 pod,执行并返回
// if we want to kill a pod, do it now!
if updateType == kubetypes.SyncPodKill {
killPodOptions := o.killPodOptions
if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
return fmt.Errorf("kill pod options are required if update type is kill")
}
apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we kill the pod with the specified grace period since this is a termination
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
return nil
}
b. 检查 pod ,主要是权限检查(使用主机网络模式,privileged 权限运行等)。如果没有权限,就删除本地旧的 pod 并返回错误信息
runnable := kl.canRunPod(pod)
if !runnable.Admit {
// Pod is not runnable; update the Pod and Container statuses to why.
apiPodStatus.Reason = runnable.Reason
apiPodStatus.Message = runnable.Message
// Waiting containers are not creating.
const waitingReason = "Blocked"
for _, cs := range apiPodStatus.InitContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
for _, cs := range apiPodStatus.ContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
}
// canRunPod reports whether the pod may run given the configured
// capabilities. It returns nil when the pod is allowed and a descriptive
// error otherwise.
//
// Checks are made in this order: privileged containers, privileged init
// containers (both only when privileged mode is not allowed), host
// networking, host PID and host IPC. The first failing check, or the first
// error returned by one of the allowHost* helpers, is returned.
func canRunPod(pod *v1.Pod) error {
	if !capabilities.Get().AllowPrivileged {
		for i := range pod.Spec.Containers {
			c := pod.Spec.Containers[i]
			if securitycontext.HasPrivilegedRequest(&c) {
				return fmt.Errorf("pod with UID %q specified privileged container, but is disallowed", pod.UID)
			}
		}
		for i := range pod.Spec.InitContainers {
			c := pod.Spec.InitContainers[i]
			if securitycontext.HasPrivilegedRequest(&c) {
				return fmt.Errorf("pod with UID %q specified privileged init container, but is disallowed", pod.UID)
			}
		}
	}

	if pod.Spec.HostNetwork {
		ok, err := allowHostNetwork(pod)
		switch {
		case err != nil:
			return err
		case !ok:
			return fmt.Errorf("pod with UID %q specified host networking, but is disallowed", pod.UID)
		}
	}

	if pod.Spec.HostPID {
		ok, err := allowHostPID(pod)
		switch {
		case err != nil:
			return err
		case !ok:
			return fmt.Errorf("pod with UID %q specified host PID, but is disallowed", pod.UID)
		}
	}

	if pod.Spec.HostIPC {
		ok, err := allowHostIPC(pod)
		switch {
		case err != nil:
			return err
		case !ok:
			return fmt.Errorf("pod with UID %q specified host ipc, but is disallowed", pod.UID)
		}
	}

	return nil
}
c. 如果是 static Pod,就创建或者更新对应的 mirrorPod
if kubepod.IsStaticPod(pod) {
podFullName := kubecontainer.GetPodFullName(pod)
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
} else {
deleted = true
}
}
}
if mirrorPod == nil || deleted {
glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
}
}
}
d. 创建 pod 的数据目录,存放 volume 和 plugin 信息
if err := kl.makePodDataDirs(pod); err != nil {
glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
return err
}
// makePodDataDirs creates the pod directory, the pod volumes directory and
// the pod plugins directory for the given pod, in that order, with mode 0750.
// An error for which os.IsExist reports true is ignored; any other error
// stops the sequence and is returned.
func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
	uid := pod.UID

	// ensureDir creates dir and all missing parents, treating an
	// "already exists" error as success.
	ensureDir := func(dir string) error {
		err := os.MkdirAll(dir, 0750)
		if err == nil || os.IsExist(err) {
			return nil
		}
		return err
	}

	if err := ensureDir(kl.getPodDir(uid)); err != nil {
		return err
	}
	if err := ensureDir(kl.getPodVolumesDir(uid)); err != nil {
		return err
	}
	return ensureDir(kl.getPodPluginsDir(uid))
}
e. 如果 pod 没有终止,等待它所有的 volume attach/mount 完成(由 volumeManager 在后台处理)
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
return err
}
}
f. 如果 pod 指定了 image pull secrets,去 apiserver 获取对应的 secrets 数据
pullSecrets := kl.getPullSecretsForPod(pod)
g. 调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑
kl.containerRuntime.SyncPod() 调 runtime 执行具体容器的创建
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
return err
}
4 SyncPod函数
-
container runtime 的 SyncPod 方法声明如下:
// Syncs the running pod into the desired pod.
SyncPod(pod *v1.Pod, apiPodStatus v1.PodStatus, podStatus *PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) PodSyncResult
2 syncLoop函数
-
具体的创建步骤包括:
pod phase 可能存在的值
- Pending: Pod 已经被 kubernetes 接收,但有一个或者多个容器镜像尚未创建。等待时间包括调度 Pod 的时间和通过网络下载镜像的时间,可能需要一些时间
- Running: 该 Pod 已经绑定到了一个节点上,Pod 中所有的容器都已被创建,至少有一个容器正在运行,或者正处于启动或重启状态
- Succeeded: Pod 中所有的容器都被成功终止,并且不会再重启
- Failed: Pod中所有容器都已终止,并且至少有一个容器是因为失败终止(容器以非0状态退出或者被系统终止)
- Unknown: 因为某些原因无法获取 Pod 状态,通常因为与 Pod 所在主机通信失败
更多推荐
所有评论(0)