【kubernetes/k8s源码分析】kubelet源码分析-statusManager与probeManager
简介在 kubelet 初始化的时候,会NewMainKubelet函数中创建 statusManager 和 probeManager。statusManager 负责维护状态信息,并把 pod 状态更新到 apiserver,但是不负责监控 pod 状态的变化,而是提供接口供其他组件调用,比如 probeManager。probeManager 定时去监控 pod ...
简介
在 kubelet 初始化的时候,会NewMainKubelet函数中创建 statusManager 和 probeManager。
statusManager 负责维护状态信息,并把 pod 状态更新到 apiserver,但是不负责监控 pod 状态的变化,而是提供接口供其他组件调用,比如 probeManager。
probeManager 定时去监控 pod 中容器状况,一旦发现状态变化调用 statusManager 提供的方法更新 pod 状态。
readinessProbe 检测容器是否可以接受请求,如果检测结果失败,则将其从 service 的 endpoints 中移除,后续的请求也就不会发送给这个容器;livenessProbe 检测容器是否存活,如果检测结果失败,kubelet 会杀死这个容器,并重启一个新容器(除非 RestartPolicy 设置成了 Never)。
klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager)
初始化probeManager,看到readiness liveness是不是特别熟悉
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.runner,
containerRefManager,
kubeDeps.Recorder)
StatusManager
statusManager 对应的代码在 pkg/kubelet/status/status_manager.go
文件中,主要接口如下:
type PodStatusProvider interface {
// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
// was a cache hit.
GetPodStatus(uid types.UID) (v1.PodStatus, bool)
}
PodStatusprovider主要为其他组件提供接口
type Manager interface {
PodStatusProvider
// Start the API server status sync loop.
Start()
// SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
SetPodStatus(pod *v1.Pod, status v1.PodStatus)
// SetContainerReadiness updates the cached container status with the given readiness, and
// triggers a status update.
SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
// TerminatePod resets the container status for the provided pod to terminated and triggers
// a status update.
TerminatePod(pod *v1.Pod)
// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
// the provided podUIDs.
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
这个接口的方法:获取 pod 的状态、goroutine 同步工作、修改 pod 状态。修改状态的方法有多个,每个都有不同的用途:
- SetPodStatus:如 pod 状态发生变化会调用这个方法,把新状态更新到 apiserver,一般在 kubelet 维护 pod 生命周期的时候会调用
- SetContainerReadiness:如果健康检查发现 pod 中容器的状态变化会调用这个方法,修改 pod 的健康状态
- TerminatePod:删除 pod 时候调用这个方法,把 pod 中所有的容器置为 terminated
- RemoveOrphanedStatuses:删除孤儿 pod,直接把对应的状态数据从缓存中删除即可
kl.statusManager.Start()
kl.probeManager.Start()
1. Start函数
路径: pkg/kubelet/status/status_manager.go
Start()
方法是在 kubelet 运行的时候调用的,它会启动一个 goroutine 执行更新操作:
func (m *manager) Start() {
// Don't start the status manager if we don't have a client. This will happen
// on the master, where the kubelet is responsible for bootstrapping the pods
// of the master components.
if m.kubeClient == nil {
glog.Infof("Kubernetes client is nil, not starting status manager.")
return
}
glog.Info("Starting to sync pod status with apiserver")
syncTicker := time.Tick(syncPeriod)
// syncPod and syncBatch share the same go routine to avoid sync races.
go wait.Forever(func() {
select {
case syncRequest := <-m.podStatusChannel:
glog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
m.syncBatch()
}
}, 0)
}
从两个 channel 监听处理:syncTicker
是个定时器,也就是说它会定时保证 apiserver 和自己缓存的最新 pod 状态保持一致;podStatusChannel
是所有 pod 状态更新发送到的地方,调用方不会直接操作这个 channel,而是通过调用上面提到的修改状态的各种方法,这些方法内部会往这个 channel 写数据。
1.2 syncBatch函数
syncBatch定期的和apiserver同步pod状态:清除掉孤立的版本。最终调用syncPod进行更新状态
func (m *manager) syncBatch() {
var updatedStatuses []podStatusSyncRequest
podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
func() { // Critical section
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
// Clean up orphaned versions.
for uid := range m.apiStatusVersions {
_, hasPod := m.podStatuses[uid]
_, hasMirror := mirrorToPod[uid]
if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
}
}
for uid, status := range m.podStatuses {
syncedUID := uid
if mirrorUID, ok := podToMirror[uid]; ok {
if mirrorUID == "" {
glog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
continue
}
syncedUID = mirrorUID
}
if m.needsUpdate(syncedUID, status) {
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
} else if m.needsReconcile(uid, status.status) {
// Delete the apiStatusVersions here to force an update on the pod status
// In most cases the deleted apiStatusVersions here should be filled
// soon after the following syncPod() [If the syncPod() sync an update
// successfully].
delete(m.apiStatusVersions, syncedUID)
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
}
}
}()
for _, update := range updatedStatuses {
glog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
m.syncPod(update.podUID, update.status)
}
}
1. 2 syncPod函数
syncPod
根据参数中的 pod 和它的状态信息对 apiserver 中的数据进行更新(调用API CoreV1().Pods(namespace).Patch),如果发现 pod 已经被删除也会把它从内部数据结构中删除。
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if !m.needsUpdate(uid, status) {
glog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
return
}
// TODO: make me easier to express from client code
pod, err := m.kubeClient.Core().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
glog.V(3).Infof("Pod %q (%s) does not exist on the server", status.podName, uid)
// If the Pod is deleted the status will be cleared in
// RemoveOrphanedStatuses, so we just ignore the update here.
return
}
if err != nil {
glog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
return
}
translatedUID := m.podManager.TranslatePodUID(pod.UID)
if len(translatedUID) > 0 && translatedUID != uid {
glog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
m.deletePodStatus(uid)
return
}
pod.Status = status.status
if err := podutil.SetInitContainersStatusesAnnotations(pod); err != nil {
glog.Error(err)
}
// TODO: handle conflict as a retry, make that easier too.
newPod, err := m.kubeClient.Core().Pods(pod.Namespace).UpdateStatus(pod)
if err != nil {
glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
return
}
pod = newPod
glog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
m.apiStatusVersions[pod.UID] = status.version
// We don't handle graceful deletion of mirror pods.
if m.canBeDeleted(pod, status.status) {
deleteOptions := metav1.NewDeleteOptions(0)
// Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
err = m.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
if err != nil {
glog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
return
}
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
m.deletePodStatus(uid)
}
}
2. ProbeManager
2.1 Manager接口
路径pkg/kubelet/prober/prober_manager.go
type Manager interface {
// AddPod creates new probe workers for every container probe. This should be called for every
// pod created.
AddPod(pod *v1.Pod)
// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
// deleting cached results.
RemovePod(pod *v1.Pod)
// CleanupPods handles cleaning up pods which should no longer be running.
// It takes a list of "active pods" which should not be cleaned up.
CleanupPods(activePods []*v1.Pod)
// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
UpdatePodStatus(types.UID, *v1.PodStatus)
// Start starts the Manager sync loops.
Start()
}
probeManager 检测 pod 中容器的健康状态,目前有两种 probe:readiness 和 liveness。
const (
liveness probeType = iota
readiness
)
readinessProbe 检测容器是否可以接受请求,如果检测结果失败,则将其从 service 的 endpoints 中移除,后续的请求也就不会发送给这个容器;livenessProbe 检测容器是否存活,如果检测结果失败,kubelet 会杀死这个容器,并重启一个新容器(除非 RestartPolicy 设置成了 Never)。
并不是所有的 pod 中的容器都有健康检查的探针,如果没有则不进行检测则认为容器是正常的。
数据流:
Run ->
syncLoop(kubelet) ->
syncLoopIteration ->
HandlePodAdditions
在每次创建新 pod 的时候,kubelet 都会调用 probeManager.AddPod(pod)
方法,调用的位置为:
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not
// terminated.
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}
它对应的实现在 pkg/kubelet/prober/prober_manager.go
文件中:
func (m *manager) AddPod(pod *v1.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if c.ReadinessProbe != nil {
key.probeType = readiness
if _, ok := m.workers[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}
if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
glog.Errorf("Liveness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}
}
}
遍历 pod 中的容器,如定义了 readiness 或者 liveness,就创建一个 worker一个 goroutine 在后台运行这个 worker,run()函数定期探测容器,创建两个定时器。
func (w *worker) run() {
probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
probeTicker := time.NewTicker(probeTickerPeriod)
defer func() {
// Clean up.
probeTicker.Stop()
if !w.containerID.IsEmpty() {
w.resultsManager.Remove(w.containerID)
}
w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
}()
// If kubelet restarted the probes could be started in rapid succession.
// Let the worker wait for a random portion of tickerPeriod before probing.
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
probeLoop:
for w.doProbe() {
// Wait for next probe tick.
select {
case <-w.stopCh:
break probeLoop
case <-probeTicker.C:
// continue
}
}
}
2.2 doProbe探测容器一次并报告结果,该函数返回值是否worker继续运行
func (w *worker) doProbe() (keepGoing bool) {
defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
if !ok {
// Either the pod has not been created yet, or it was already deleted.
glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
return true
}
// Worker should terminate if pod is terminated.
if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
glog.V(3).Infof("Pod %v %v, exiting probe worker",
format.Pod(w.pod), status.Phase)
return false
}
c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
if !ok || len(c.ContainerID) == 0 {
// Either the container has not been created yet, or it was deleted.
glog.V(3).Infof("Probe target container not found: %v - %v",
format.Pod(w.pod), w.container.Name)
return true // Wait for more information.
}
if w.containerID.String() != c.ContainerID {
if !w.containerID.IsEmpty() {
w.resultsManager.Remove(w.containerID)
}
w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
// We've got a new container; resume probing.
w.onHold = false
}
if w.onHold {
// Worker is on hold until there is a new container.
return true
}
if c.State.Running == nil {
glog.V(3).Infof("Non-running container probed: %v - %v",
format.Pod(w.pod), w.container.Name)
if !w.containerID.IsEmpty() {
w.resultsManager.Set(w.containerID, results.Failure, w.pod)
}
// Abort if the container will not be restarted.
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
}
if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
return true
}
// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
// the full container environment here, OR we must make a call to the CRI in order to get those environment
// values from the running container.
result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
if err != nil {
// Prober error, throw away the result.
return true
}
if w.lastResult == result {
w.resultRun++
} else {
w.lastResult = result
w.resultRun = 1
}
if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
// Success or failure is below threshold - leave the probe state unchanged.
return true
}
w.resultsManager.Set(w.containerID, result, w.pod)
if w.probeType == liveness && result == results.Failure {
// The container fails a liveness check, it will need to be restarted.
// Stop probing until we see a new container ID. This is to reduce the
// chance of hitting #21751, where running `docker exec` when a
// container is being stopped may lead to corrupted container state.
w.onHold = true
w.resultRun = 1
}
return true
}
2.3 runProbe函数
最主要函数probe,调用最终函数执行的命令exec,HTTP, TCP三种方式,代码如下:
func (pb *prober) runProbe(p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.Exec != nil {
glog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
}
if p.HTTPGet != nil {
scheme := strings.ToLower(string(p.HTTPGet.Scheme))
host := p.HTTPGet.Host
if host == "" {
host = status.PodIP
}
port, err := extractPort(p.HTTPGet.Port, container)
if err != nil {
return probe.Unknown, "", err
}
path := p.HTTPGet.Path
glog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
url := formatURL(scheme, host, port, path)
headers := buildHeader(p.HTTPGet.HTTPHeaders)
glog.V(4).Infof("HTTP-Probe Headers: %v", headers)
return pb.http.Probe(url, headers, timeout)
}
if p.TCPSocket != nil {
port, err := extractPort(p.TCPSocket.Port, container)
if err != nil {
return probe.Unknown, "", err
}
host := p.TCPSocket.Host
if host == "" {
host = status.PodIP
}
glog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
}
glog.Warningf("Failed to find probe builder for container: %v", container)
return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
}
2.3.1 Probe函数 exec方式
路径: pkg/probe/exec/exec.go
執行的爲cmd命令
type ExecProber interface {
Probe(e exec.Cmd) (probe.Result, string, error)
}
type execProber struct{}
func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
data, err := e.CombinedOutput()
glog.V(4).Infof("Exec probe response: %q", string(data))
if err != nil {
exit, ok := err.(exec.ExitError)
if ok {
if exit.ExitStatus() == 0 {
return probe.Success, string(data), nil
} else {
return probe.Failure, string(data), nil
}
}
return probe.Unknown, "", err
}
return probe.Success, string(data), nil
}
2.3.2 probe函数 HTTPGET方式
路径: pkg/probe/http/http.go
HTTP请求response code 在200到400之间为成功
/ DoHTTPProbe checks if a GET request to the url succeeds.
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success.
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure.
// This is exported because some other packages may want to do direct HTTP probes.
func DoHTTPProbe(url *url.URL, headers http.Header, client HTTPGetInterface) (probe.Result, string, error) {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
// Convert errors into failures to catch timeouts.
return probe.Failure, err.Error(), nil
}
if _, ok := headers["User-Agent"]; !ok {
if headers == nil {
headers = http.Header{}
}
// explicitly set User-Agent so it's not set to default Go value
v := version.Get()
headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor))
}
req.Header = headers
if headers.Get("Host") != "" {
req.Host = headers.Get("Host")
}
res, err := client.Do(req)
if err != nil {
// Convert errors into failures to catch timeouts.
return probe.Failure, err.Error(), nil
}
defer res.Body.Close()
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return probe.Failure, "", err
}
body := string(b)
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
glog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
return probe.Success, body, nil
}
glog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body)
return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
}
2.3.3 probe函数 TCPSocket方式
路径: pkg/probe/tcp/tcp.go
可以建立TCP连接为成功
// DoTCPProbe checks that a TCP socket to the address can be opened.
// If the socket can be opened, it returns Success
// If the socket fails to open, it returns Failure.
// This is exported because some other packages may want to do direct TCP probes.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
conn, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
// Convert errors to failures to handle timeouts.
return probe.Failure, err.Error(), nil
}
err = conn.Close()
if err != nil {
glog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
}
return probe.Success, "", nil
}
w.resultsManager.Set(w.containerID, result, w.pod)
来保存检测结果。结果保存在缓存中,并发送到 m.updates
管道。对于 liveness 来说,它的管道消费者是 kubelet,syncLoopIteration监听channel:
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
liveness 关系pod 的存亡,所以需要 kubelet 的处理。而 readiness 失败也不会重建 pod,它的处理逻辑是不同的,只需要调用一次即可:
func (m *manager) Start() {
// Start syncing readiness.
go wait.Forever(m.updateReadiness, 0)
}
func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
proberManager
启动运行一个 goroutine 定时读取 readinessManager channel中的数据,并调用 statusManager
去更新 apiserver 中 pod 的状态信息。负责 Service 逻辑的组件获取到了这个状态,就能根据不同的值来决定是否需要更新 endpoints 的内容,也就是 service 的请求是否发送到这个 pod。
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
glog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
return
}
oldStatus, found := m.podStatuses[pod.UID]
if !found {
glog.Warningf("Container readiness changed before pod has synced: %q - %q",
format.Pod(pod), containerID.String())
return
}
// Find the container to update.
containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
if !ok {
glog.Warningf("Container readiness changed for unknown container: %q - %q",
format.Pod(pod), containerID.String())
return
}
if containerStatus.Ready == ready {
glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
format.Pod(pod), containerID.String())
return
}
// Make sure we're not updating the cached version.
status, err := copyStatus(&oldStatus.status)
if err != nil {
return
}
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
containerStatus.Ready = ready
// Update pod condition.
readyConditionIndex := -1
for i, condition := range status.Conditions {
if condition.Type == v1.PodReady {
readyConditionIndex = i
break
}
}
readyCondition := GeneratePodReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)
if readyConditionIndex != -1 {
status.Conditions[readyConditionIndex] = readyCondition
} else {
glog.Warningf("PodStatus missing PodReady condition: %+v", status)
status.Conditions = append(status.Conditions, readyCondition)
}
m.updateStatusInternal(pod, status, false)
}
SetContainerReadiness更新cache容器状态,会触发更新状态,首先检测是否需要更新(条件检查);
更多推荐
所有评论(0)