K8S node heartbeat mechanism

kubelet status reporting

From the master node you can inspect the status information that each node reports (a client-go sketch for doing this follows the list below).

In version 1.13 the kubelet has two ways of reporting its heartbeat:

  • NodeStatus

  • NodeLease (we attempted to enable NodeLease)
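
  Both channels can be checked from the master. The sketch below lists every node's Ready condition together with its LastHeartbeatTime, using the 1.13-style client-go API (no context arguments, matching the code quoted later); the kubeconfig path is an assumption, adjust it for your cluster. When NodeLease is enabled, the per-node renew time additionally lives in Lease objects in the kube-node-lease namespace.

    package main

    import (
       "fmt"

       v1 "k8s.io/api/core/v1"
       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
       "k8s.io/client-go/kubernetes"
       "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
       // path to an admin kubeconfig on the master; adjust as needed
       config, err := clientcmd.BuildConfigFromFlags("", "/etc/kubernetes/admin.conf")
       if err != nil {
          panic(err)
       }
       client, err := kubernetes.NewForConfig(config)
       if err != nil {
          panic(err)
       }
       // 1.13-style client-go: List takes no context argument
       nodes, err := client.CoreV1().Nodes().List(metav1.ListOptions{})
       if err != nil {
          panic(err)
       }
       for _, node := range nodes.Items {
          for _, cond := range node.Status.Conditions {
             if cond.Type == v1.NodeReady {
                fmt.Printf("%s Ready=%s lastHeartbeat=%s\n",
                   node.Name, cond.Status, cond.LastHeartbeatTime)
             }
          }
       }
    }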

  1. NodeStatus / NodeLease

    • Source file: kubernetes/pkg/kubelet/kubelet_node_status.go

    • Started as goroutines from the kubelet.go#Run method

    • syncNodeStatus is the function that reports node information

      • nodeStatusUpdateFrequency is the reporting interval; it defaults to 10s, and we have currently set it to 3s
      • NodeLease is a new reporting mode available starting with 1.13 and is disabled by default; we attempted to enable it on our current version and the kubelet reported an error
      // KubeClient clientset.Interface   (the clientset used to report this node's status to the API server)
      
      // code in the Run function that reports node status
      if kl.kubeClient != nil {
         // start syncing the node status periodically at the configured frequency
         go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
         // on kubelet start-up, update the pod CIDR, runtime status and node status once
         go kl.fastStatusUpdateOnce()
      
         // when the NodeLease feature gate is enabled, also report liveness through a Lease object
         if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
            go kl.nodeLeaseController.Run(wait.NeverStop)
         }
      }
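
      wait.Until is what turns syncNodeStatus into a periodic heartbeat: it calls the function once immediately and then again every period until the stop channel is closed. A minimal standalone sketch of that behavior, using the 3s interval mentioned above (the Println stands in for syncNodeStatus):

      package main

      import (
         "fmt"
         "time"

         "k8s.io/apimachinery/pkg/util/wait"
      )

      func main() {
         stopCh := make(chan struct{})
         go func() {
            // stop the loop after 10 seconds so the example terminates
            time.Sleep(10 * time.Second)
            close(stopCh)
         }()

         // mirrors: go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
         wait.Until(func() {
            fmt.Println("syncNodeStatus tick at", time.Now().Format(time.RFC3339))
         }, 3*time.Second, stopCh)
      }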
      
    • Calls kubernetes/pkg/kubelet/kubelet.go#fastStatusUpdateOnce

      // fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
      // is applied  and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
      // a runtime update and a node status update. Function returns after one successful node status update.
      // Function is executed only during Kubelet start which improves latency to ready node by updating
      // pod CIDR, runtime status and node statuses ASAP.
      func (kl *Kubelet) fastStatusUpdateOnce() {
         for {
            time.Sleep(100 * time.Millisecond) //sleep 100ms
            node, err := kl.GetNode()	
            if err != nil {
               klog.Errorf(err.Error())
               continue
            }
            if node.Spec.PodCIDR != "" {
               if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
                  klog.Errorf("Pod CIDR update failed %v", err)
                  continue
               }
               kl.updateRuntimeUp()
               kl.syncNodeStatus()
               return
            }
         }
      }
      
    • GetNode

      // GetNode returns the node info for the configured node name of this Kubelet.
      func (kl *Kubelet) GetNode() (*v1.Node, error) {
         if kl.kubeClient == nil {
            return kl.initialNode()
         }
         return kl.nodeInfo.GetNodeInfo(string(kl.nodeName))
      }
      
    • Calls the syncNodeStatus() function in kubernetes/pkg/kubelet/kubelet_node_status.go

      // syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
      // This lock is used by Kublet.syncNodeStatus function and shouldn't be used anywhere else.
      syncNodeStatusMux sync.Mutex
      
      // NodeStatus reporting
      func (kl *Kubelet) syncNodeStatus() {
         kl.syncNodeStatusMux.Lock()    // take the syncNodeStatusMux lock; this path is not thread-safe
         defer kl.syncNodeStatusMux.Unlock()
      
         // first make sure both the API client and the heartbeat client are set
         if kl.kubeClient == nil || kl.heartbeatClient == nil {
            return
         }
         // if this node still needs to register itself, call registerWithAPIServer;
         // a node that registers has no existing record in the cluster and is reporting for the first time (analyzed further below)
         if kl.registerNode {
            // This will exit immediately if it doesn't need to do anything.
            kl.registerWithAPIServer()
         }
         // then update the node information via updateNodeStatus
         if err := kl.updateNodeStatus(); err != nil {
            klog.Errorf("Unable to update node status: %v", err)
         }
      }
      
      // for internal book keeping; access only from within registerWithApiserver
      registrationCompleted bool
      
      // node registration function
      // registerWithAPIServer registers the node with the cluster master. It is safe
      // to call multiple times, but not concurrently (kl.registrationCompleted is
      // not locked).
      func (kl *Kubelet) registerWithAPIServer() {
         if kl.registrationCompleted {
            return
         }
         step := 100 * time.Millisecond  // retry back-off: start at 100ms, cap at 7s, loop forever until registration succeeds
      
         for {
            time.Sleep(step)
            step = step * 2
            if step >= 7*time.Second {
               step = 7 * time.Second
            }
      
            // build the initial node object to report via initialNode
            node, err := kl.initialNode()
            if err != nil {
               klog.Errorf("Unable to construct v1.Node object for kubelet: %v", err)
               continue
            }
            
            klog.Infof("Attempting to register node %s", node.Name)
            // register the node with the API server via tryRegisterWithAPIServer
            registered := kl.tryRegisterWithAPIServer(node)
            if registered {
               klog.Infof("Successfully registered node %s", node.Name)
               kl.registrationCompleted = true
               return
            }
         }
      }
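
      // Note on the retry schedule above: the loop sleeps 100ms on the first attempt
      // and doubles the interval each time (200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s),
      // capping it at 7s, and keeps retrying until tryRegisterWithAPIServer succeeds.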
      
      // initialNode constructs the initial v1.Node for this Kubelet, incorporating node
      // labels, information from the cloud provider, and Kubelet configuration.
      func (kl *Kubelet) initialNode() (*v1.Node, error) {
         node := &v1.Node{
            ObjectMeta: metav1.ObjectMeta{
               Name: string(kl.nodeName),
               Labels: map[string]string{
                  kubeletapis.LabelHostname: kl.hostname,
                  kubeletapis.LabelOS:       goruntime.GOOS,
                  kubeletapis.LabelArch:     goruntime.GOARCH,
               },
            },
            Spec: v1.NodeSpec{
               Unschedulable: !kl.registerSchedulable,
            },
         }
         nodeTaints := make([]v1.Taint, 0)
         if len(kl.registerWithTaints) > 0 {
            taints := make([]v1.Taint, len(kl.registerWithTaints))
            for i := range kl.registerWithTaints {
               if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
                  return nil, err
               }
            }
            nodeTaints = append(nodeTaints, taints...)
         }
      
         unschedulableTaint := v1.Taint{
            Key:    schedulerapi.TaintNodeUnschedulable,
            Effect: v1.TaintEffectNoSchedule,
         }
      
         // If TaintNodesByCondition enabled, taint node with TaintNodeUnschedulable when initializing
         // node to avoid race condition; refer to #63897 for more detail.
         if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
            if node.Spec.Unschedulable &&
               !taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
               nodeTaints = append(nodeTaints, unschedulableTaint)
            }
         }
      
         if kl.externalCloudProvider {
            taint := v1.Taint{
               Key:    schedulerapi.TaintExternalCloudProvider,
               Value:  "true",
               Effect: v1.TaintEffectNoSchedule,
            }
      
            nodeTaints = append(nodeTaints, taint)
         }
         if len(nodeTaints) > 0 {
            node.Spec.Taints = nodeTaints
         }
         // Initially, set NodeNetworkUnavailable to true.
         if kl.providerRequiresNetworkingConfiguration() {
            node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
               Type:               v1.NodeNetworkUnavailable,
               Status:             v1.ConditionTrue,
               Reason:             "NoRouteCreated",
               Message:            "Node created without a route",
               LastTransitionTime: metav1.NewTime(kl.clock.Now()),
            })
         }
      
         if kl.enableControllerAttachDetach {
            if node.Annotations == nil {
               node.Annotations = make(map[string]string)
            }
      
            klog.Infof("Setting node annotation to enable volume controller attach/detach")
            node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
         } else {
            klog.Infof("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
         }
      
         if kl.keepTerminatedPodVolumes {
            if node.Annotations == nil {
               node.Annotations = make(map[string]string)
            }
            klog.Infof("Setting node annotation to keep pod volumes of terminated pods attached to the node")
            node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
         }
      
         // @question: should this be place after the call to the cloud provider? which also applies labels
         for k, v := range kl.nodeLabels {
            if cv, found := node.ObjectMeta.Labels[k]; found {
               klog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
            }
            node.ObjectMeta.Labels[k] = v
         }
      
         if kl.providerID != "" {
            node.Spec.ProviderID = kl.providerID
         }
      
         if kl.cloud != nil {
            instances, ok := kl.cloud.Instances()
            if !ok {
               return nil, fmt.Errorf("failed to get instances from cloud provider")
            }
      
            // TODO: We can't assume that the node has credentials to talk to the
            // cloudprovider from arbitrary nodes. At most, we should talk to a
            // local metadata server here.
            var err error
            if node.Spec.ProviderID == "" {
               node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(context.TODO(), kl.cloud, kl.nodeName)
               if err != nil {
                  return nil, err
               }
            }
      
            instanceType, err := instances.InstanceType(context.TODO(), kl.nodeName)
            if err != nil {
               return nil, err
            }
            if instanceType != "" {
               klog.Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelInstanceType, instanceType)
               node.ObjectMeta.Labels[kubeletapis.LabelInstanceType] = instanceType
            }
            // If the cloud has zone information, label the node with the zone information
            zones, ok := kl.cloud.Zones()
            if ok {
               zone, err := zones.GetZone(context.TODO())
               if err != nil {
                  return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
               }
               if zone.FailureDomain != "" {
                  klog.Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneFailureDomain, zone.FailureDomain)
                  node.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] = zone.FailureDomain
               }
               if zone.Region != "" {
                  klog.Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneRegion, zone.Region)
                  node.ObjectMeta.Labels[kubeletapis.LabelZoneRegion] = zone.Region
               }
            }
         }
      
         kl.setNodeStatus(node)
      
         return node, nil
      }
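
      // At this point the initial node object carries the node name, the default kubelet
      // labels (hostname/OS/arch plus any configured node labels), the unschedulable and
      // cloud-provider taints when applicable, the attach/detach and terminated-pod-volume
      // annotations, cloud-provider labels such as instance type and zone, and finally the
      // full Status filled in by setNodeStatus.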
      
      // tryRegisterWithAPIServer makes an attempt to register the given node with
      // the API server, returning a boolean indicating whether the attempt was
      // successful.  If a node with the same name already exists, it reconciles the
      // value of the annotation for controller-managed attach-detach of attachable
      // persistent volumes for the node.
      func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
         _, err := kl.kubeClient.CoreV1().Nodes().Create(node)
         if err == nil {
            return true
         }
      
         if !apierrors.IsAlreadyExists(err) {
            klog.Errorf("Unable to register node %q with API server: %v", kl.nodeName, err)
            return false
         }
      
         existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(string(kl.nodeName), metav1.GetOptions{})
         if err != nil {
            klog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err)
            return false
         }
         if existingNode == nil {
            klog.Errorf("Unable to register node %q with API server: no node instance returned", kl.nodeName)
            return false
         }
      
         originalNode := existingNode.DeepCopy()
         if originalNode == nil {
            klog.Errorf("Nil %q node object", kl.nodeName)
            return false
         }
      
         klog.Infof("Node %s was previously registered", kl.nodeName)
      
         // Edge case: the node was previously registered; reconcile
         // the value of the controller-managed attach-detach
         // annotation.
         requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
         requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
         requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
         if requiresUpdate {
            if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
               klog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
               return false
            }
         }
      
         return true
      }
      
      apierrors "k8s.io/apimachinery/pkg/api/errors"
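
      apierrors.IsAlreadyExists (imported from k8s.io/apimachinery/pkg/api/errors as shown above) is what lets tryRegisterWithAPIServer treat an AlreadyExists response as "already registered" rather than a failure. A minimal standalone sketch of the same create-or-fetch pattern against the 1.13-style client-go API (no context argument; the helper name createOrGetNode is made up for illustration):

      // imports assumed:
      //   v1 "k8s.io/api/core/v1"
      //   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
      //   apierrors "k8s.io/apimachinery/pkg/api/errors"
      //   "k8s.io/client-go/kubernetes"
      func createOrGetNode(client kubernetes.Interface, node *v1.Node) (*v1.Node, error) {
         created, err := client.CoreV1().Nodes().Create(node)
         if err == nil {
            return created, nil
         }
         if !apierrors.IsAlreadyExists(err) {
            // a real failure talking to the API server
            return nil, err
         }
         // the node is already registered; fall back to reading the existing object
         return client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
      }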
      

      updateNodeStatus

      // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
      nodeStatusUpdateRetry = 5
      
      // updateNodeStatus updates node status to master with retries if there is any
      // change or enough time passed from the last sync.
      func (kl *Kubelet) updateNodeStatus() error {
         klog.V(5).Infof("Updating node status")
         for i := 0; i < nodeStatusUpdateRetry; i++ {
            // update the data via tryUpdateNodeStatus; the error we saw in our logs actually originates here, keep reading below
            if err := kl.tryUpdateNodeStatus(i); err != nil {
               if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
                  kl.onRepeatedHeartbeatFailure()
               }
               klog.Errorf("Error updating node status, will retry: %v", err)
            } else {
               return nil
            }
         }
         return fmt.Errorf("update node status exceeds retry count")
      }
      
      // tryUpdateNodeStatus tries to update node status to master if there is any
      // change or enough time passed from the last sync.
      func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
         // In large clusters, GET and PUT operations on Node objects coming
         // from here are the majority of load on apiserver and etcd.
         // To reduce the load on etcd, we are serving GET operations from
         // apiserver cache (the data might be slightly delayed but it doesn't
         // seem to cause more conflict - the delays are pretty small).
         // If it result in a conflict, all retries are served directly from etcd.
         
         //metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
         opts := metav1.GetOptions{}
         if tryNumber == 0 {
            // util.FromApiserverCache is defined in pkg/kubelet/util; see the snippet quoted after this function
            util.FromApiserverCache(&opts)
         }
         //HeartbeatClient         clientset.Interface
         // fetch the node object by name via k8s.io/client-go/kubernetes/typed/core/v1/node.go#Get
         node, err := kl.heartbeatClient.CoreV1().Nodes().Get(string(kl.nodeName), opts)
         if err != nil {
            return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
         }
      
         originalNode := node.DeepCopy()
         if originalNode == nil {
            return fmt.Errorf("nil %q node object", kl.nodeName)
         }
         // check whether the pod CIDR has changed
         podCIDRChanged := false
         if node.Spec.PodCIDR != "" {
            // Pod CIDR could have been updated before, so we cannot rely on
            // node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
            // actually changed.
            if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
               klog.Errorf(err.Error())
            }
         }
         // refresh the node status fields
         kl.setNodeStatus(node)
      
         now := kl.clock.Now()
         // when NodeLease is enabled and the report interval has not yet elapsed, skip the patch unless the pod CIDR or node status actually changed
         if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
            if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
               return nil
            }
         }
      
         // Patch the current status on the API server
         // report the latest node information to the apiserver
         //nodeutil "k8s.io/kubernetes/pkg/util/node"
         updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
         if err != nil {
            return err
         }
         kl.lastStatusReportTime = now
         kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
         // If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
         // those volumes are already updated in the node's status
         kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
         return nil
      }
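
      // util.FromApiserverCache, referenced at the top of tryUpdateNodeStatus, lives in
      // pkg/kubelet/util; it simply forces the first GET to be served from the apiserver
      // watch cache instead of etcd by setting ResourceVersion to "0" (roughly):
      func FromApiserverCache(opts *metav1.GetOptions) {
         opts.ResourceVersion = "0"
      }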
      
      // setNodeStatus fills in the Status fields of the given Node, overwriting
      // any fields that are currently set.
      // TODO(madhusudancs): Simplify the logic for setting node conditions and
      // refactor the node status condition code out to a different file.
      func (kl *Kubelet) setNodeStatus(node *v1.Node) {
         for i, f := range kl.setNodeStatusFuncs {
            klog.V(5).Infof("Setting node status at position %v", i)
            if err := f(node); err != nil {
               klog.Warningf("Failed to set some node status fields: %s", err)
            }
         }
      }
      
      // handlers called during the tryUpdateNodeStatus cycle
      setNodeStatusFuncs []func(*v1.Node) error
      
      // Generating the status funcs should be the last thing we do,
      // since this relies on the rest of the Kubelet having been constructed.
      klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
      return klet, nil
      
      // defaultNodeStatusFuncs is a factory that generates the default set of
      // setNodeStatus funcs
      func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
         // if cloud is not nil, we expect the cloud resource sync manager to exist
         var nodeAddressesFunc func() ([]v1.NodeAddress, error)
         if kl.cloud != nil {
            nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
         }
         var validateHostFunc func() error
         if kl.appArmorValidator != nil {
            validateHostFunc = kl.appArmorValidator.ValidateHost
         }
         var setters []func(n *v1.Node) error
         klog.V(5).Infof("Node_Address")
         // setters that fill in Address, MachineInfo, Capacity, VersionInfo and similar fields
         setters = append(setters,
            nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
            nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
               kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
            nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
            nodestatus.DaemonEndpoints(kl.daemonEndpoints),
            nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
            nodestatus.GoRuntime(),
         )
         if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
            setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
         }
         klog.V(5).Infof("Node_Memory")
         // setters that fill in the node Conditions and related fields
         setters = append(setters,
            nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
            nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
            nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
            nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
            nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
            // TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
            // and record state back to the Kubelet runtime object. In the future, I'd like to isolate
            // these side-effects by decoupling the decisions to send events and partial status recording
            // from the Node setters.
            kl.recordNodeSchedulableEvent,
         )
         return setters
      }
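
      // Each entry in setNodeStatusFuncs is just a func(*v1.Node) error; a hand-written
      // setter in the same style would look like the sketch below (the annotation key is
      // made up for illustration; time is the standard library package):
      func exampleSetter(now func() time.Time) func(*v1.Node) error {
         return func(node *v1.Node) error {
            if node.Annotations == nil {
               node.Annotations = map[string]string{}
            }
            // record when this kubelet last filled in the node status
            node.Annotations["example.io/last-status-fill"] = now().Format(time.RFC3339)
            return nil
         }
      }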
      
      // PatchNodeStatus patches node status.
      //v1core "k8s.io/client-go/kubernetes/typed/core/v1"
      //"k8s.io/apimachinery/pkg/types"
      func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
         patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
         if err != nil {
            return nil, nil, err
         }
      
         updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
         if err != nil {
            return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
         }
         return updatedNode, patchBytes, nil
      }
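
      // preparePatchBytesforNodeStatus (not quoted here) essentially marshals the old and
      // new node objects and builds a strategic-merge patch so only the changed fields are
      // sent; a simplified sketch (diffNodeStatus is a made-up name, json is "encoding/json",
      // strategicpatch is "k8s.io/apimachinery/pkg/util/strategicpatch"):
      func diffNodeStatus(oldNode, newNode *v1.Node) ([]byte, error) {
         oldData, err := json.Marshal(oldNode)
         if err != nil {
            return nil, err
         }
         newData, err := json.Marshal(newNode)
         if err != nil {
            return nil, err
         }
         return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
      }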
      
      func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
         kl.lastObservedNodeAddressesMux.Lock()
         defer kl.lastObservedNodeAddressesMux.Unlock()
         kl.lastObservedNodeAddresses = addresses
      }
      
      

      Troubleshooting:
      The kubelet log looks like this:

      [kubelet log screenshot: repeated "error updating node status, will retry" and "error getting node" messages]

      Analysis:

      • The message "error updating node status, will retry" points directly at updateNodeStatus, where the err comes from the call to kl.tryUpdateNodeStatus(i)

      • Inside tryUpdateNodeStatus the log then reports an "error getting node" failure, which leads further into kl.heartbeatClient.CoreV1().Nodes().Get()

      • In the 1.13 client-go, kl.heartbeatClient.CoreV1().Nodes().Get() takes only two parameters: the node name and a metav1.GetOptions value

      • Comparing with newer client-go versions, the same Get() call takes three parameters: a context, the name, and the options

      • Following the context package shows that the error value we hit is one that only the context machinery produces

      Conclusion: the failure is the context deadline being exceeded, even though the 1.13 Get and Patch functions do not expose a context parameter; the deadline is applied inside the client library. For an explanation of "context deadline exceeded" see http://xiaorui.cc/archives/6999
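
      For reference, "context deadline exceeded" is simply the error that ctx.Err() returns once a context's deadline has passed; a minimal reproduction:

      package main

      import (
         "context"
         "fmt"
         "time"
      )

      func main() {
         ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
         defer cancel()

         select {
         case <-time.After(time.Second): // stands in for a slow apiserver call
            fmt.Println("request finished")
         case <-ctx.Done():
            fmt.Println(ctx.Err()) // prints: context deadline exceeded
         }
      }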

How the controller-manager judges node health