Studying the K8S Node Heartbeat Mechanism
K8S node heartbeat mechanism
kubelet reporting
On the master node you can view the status information reported by each node.
In version 1.13 the kubelet has two ways of reporting heartbeats:
- NodeStatus
- NodeLease (we attempted to enable node lease)
- node_status/NodeLease
- Source file: kubernetes/pkg/kubelet/kubelet_node_status.go
- Started as a goroutine from the kubelet.go#Run method
- syncNodeStatus is the function that reports the node information
- nodeStatusUpdateFrequency is the reporting interval; the default is 10s, and we currently set it to 3s (a periodic-sync sketch follows the Run snippet below)
- NodeLease is the new reporting mode available from 1.13 onward; it is disabled by default. I tried to enable it on our current version and it produced the error shown above.
```go
// KubeClient clientset.Interface  // the clientset interface used to report this node's status
// Node-status reporting code inside the Run function
if kl.kubeClient != nil {
	// Start syncing node status with the configured parameters
	go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)

	// On kubelet startup, update the pod CIDR, runtime status and node status once
	go kl.fastStatusUpdateOnce()

	// Report kubelet liveness through a lease
	if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
}
```
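To make the periodic reporting concrete, here is a minimal, self-contained sketch (not kubelet code) of how wait.Until drives a heartbeat function every 3s, the interval we set via nodeStatusUpdateFrequency; the syncNodeStatus closure is a stand-in for the real method:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Stand-in for kl.syncNodeStatus; the real function patches the Node object.
	syncNodeStatus := func() {
		fmt.Println("heartbeat: node status synced at", time.Now().Format(time.RFC3339))
	}

	stopCh := make(chan struct{})
	// Run immediately, then every 3s (our nodeStatusUpdateFrequency), until stopCh is closed.
	go wait.Until(syncNodeStatus, 3*time.Second, stopCh)

	time.Sleep(10 * time.Second)
	close(stopCh)
}
```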
- The call then reaches kubernetes/pkg/kubelet/kubelet.go#fastStatusUpdateOnce
```go
// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
// is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
// a runtime update and a node status update. Function returns after one successful node status update.
// Function is executed only during Kubelet start which improves latency to ready node by updating
// pod CIDR, runtime status and node statuses ASAP.
func (kl *Kubelet) fastStatusUpdateOnce() {
	for {
		time.Sleep(100 * time.Millisecond) // sleep 100ms between polls
		node, err := kl.GetNode()
		if err != nil {
			klog.Errorf(err.Error())
			continue
		}
		if node.Spec.PodCIDR != "" {
			if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
				klog.Errorf("Pod CIDR update failed %v", err)
				continue
			}
			kl.updateRuntimeUp()
			kl.syncNodeStatus()
			return
		}
	}
}
```
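The same wait-for-the-CIDR polling can also be expressed with the wait helpers. A small sketch under that assumption; getPodCIDR is a hypothetical stand-in for reading node.Spec.PodCIDR from the node cache:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// getPodCIDR is a hypothetical stand-in for reading node.Spec.PodCIDR from the node cache.
func getPodCIDR() string {
	return "10.244.1.0/24"
}

func main() {
	// Poll every 100ms (like fastStatusUpdateOnce) until the CIDR is non-empty, for at most 1 minute.
	err := wait.PollImmediate(100*time.Millisecond, time.Minute, func() (bool, error) {
		return getPodCIDR() != "", nil
	})
	if err != nil {
		fmt.Println("pod CIDR never showed up:", err)
		return
	}
	fmt.Println("pod CIDR applied; now update runtime and node status once")
}
```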
- GetNode
```go
// GetNode returns the node info for the configured node name of this Kubelet.
func (kl *Kubelet) GetNode() (*v1.Node, error) {
	if kl.kubeClient == nil {
		return kl.initialNode()
	}
	return kl.nodeInfo.GetNodeInfo(string(kl.nodeName))
}
```
- Next, the syncNodeStatus() function in kubernetes/pkg/kubelet/kubelet_node_status.go is called
```go
// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
// This lock is used by Kublet.syncNodeStatus function and shouldn't be used anywhere else.
syncNodeStatusMux sync.Mutex
```
```go
// NodeStatus reporting
func (kl *Kubelet) syncNodeStatus() {
	kl.syncNodeStatusMux.Lock() // guard this path with syncNodeStatusMux
	defer kl.syncNodeStatusMux.Unlock()

	// First make sure the kube client and the heartbeat client are not nil
	if kl.kubeClient == nil || kl.heartbeatClient == nil {
		return
	}
	// If this node needs to register, call registerWithAPIServer.
	// Initial understanding: registration means the cluster has no record of this node yet,
	// so it must be created first; analyzed further below.
	if kl.registerNode {
		// This will exit immediately if it doesn't need to do anything.
		kl.registerWithAPIServer()
	}
	// Then update the node information via updateNodeStatus
	if err := kl.updateNodeStatus(); err != nil {
		klog.Errorf("Unable to update node status: %v", err)
	}
}
```
```go
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
```
```go
// Node registration function
// registerWithAPIServer registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithAPIServer() {
	if kl.registrationCompleted {
		return
	}
	step := 100 * time.Millisecond
	// Retry forever with an exponential backoff: start at 100ms, cap at 7s,
	// and keep trying to build and register the node
	for {
		time.Sleep(step)
		step = step * 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}

		// Build the initial node object to report via initialNode
		node, err := kl.initialNode()
		if err != nil {
			klog.Errorf("Unable to construct v1.Node object for kubelet: %v", err)
			continue
		}

		klog.Infof("Attempting to register node %s", node.Name)
		// Send the node to the API server via tryRegisterWithAPIServer
		registered := kl.tryRegisterWithAPIServer(node)
		if registered {
			klog.Infof("Successfully registered node %s", node.Name)
			kl.registrationCompleted = true
			return
		}
	}
}
```
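The retry cadence above is a capped exponential backoff. This small standalone sketch (not kubelet code) prints the delays that doubling with a 7s ceiling produces: 100ms, 200ms, 400ms and so on up to 7s:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	step := 100 * time.Millisecond
	// Print the first few retry delays produced by the doubling-with-cap scheme
	// used in registerWithAPIServer.
	for i := 0; i < 10; i++ {
		fmt.Printf("retry %d after %v\n", i+1, step)
		step = step * 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}
	}
}
```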
```go
// initialNode constructs the initial v1.Node for this Kubelet, incorporating node
// labels, information from the cloud provider, and Kubelet configuration.
func (kl *Kubelet) initialNode() (*v1.Node, error) {
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(kl.nodeName),
			Labels: map[string]string{
				kubeletapis.LabelHostname: kl.hostname,
				kubeletapis.LabelOS:       goruntime.GOOS,
				kubeletapis.LabelArch:     goruntime.GOARCH,
			},
		},
		Spec: v1.NodeSpec{
			Unschedulable: !kl.registerSchedulable,
		},
	}
	nodeTaints := make([]v1.Taint, 0)
	if len(kl.registerWithTaints) > 0 {
		taints := make([]v1.Taint, len(kl.registerWithTaints))
		for i := range kl.registerWithTaints {
			if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
				return nil, err
			}
		}
		nodeTaints = append(nodeTaints, taints...)
	}

	unschedulableTaint := v1.Taint{
		Key:    schedulerapi.TaintNodeUnschedulable,
		Effect: v1.TaintEffectNoSchedule,
	}

	// If TaintNodesByCondition enabled, taint node with TaintNodeUnschedulable when initializing
	// node to avoid race condition; refer to #63897 for more detail.
	if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
		if node.Spec.Unschedulable && !taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
			nodeTaints = append(nodeTaints, unschedulableTaint)
		}
	}

	if kl.externalCloudProvider {
		taint := v1.Taint{
			Key:    schedulerapi.TaintExternalCloudProvider,
			Value:  "true",
			Effect: v1.TaintEffectNoSchedule,
		}
		nodeTaints = append(nodeTaints, taint)
	}
	if len(nodeTaints) > 0 {
		node.Spec.Taints = nodeTaints
	}
	// Initially, set NodeNetworkUnavailable to true.
	if kl.providerRequiresNetworkingConfiguration() {
		node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
			Type:               v1.NodeNetworkUnavailable,
			Status:             v1.ConditionTrue,
			Reason:             "NoRouteCreated",
			Message:            "Node created without a route",
			LastTransitionTime: metav1.NewTime(kl.clock.Now()),
		})
	}

	if kl.enableControllerAttachDetach {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		klog.Infof("Setting node annotation to enable volume controller attach/detach")
		node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
	} else {
		klog.Infof("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
	}

	if kl.keepTerminatedPodVolumes {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		klog.Infof("Setting node annotation to keep pod volumes of terminated pods attached to the node")
		node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
	}

	// @question: should this be place after the call to the cloud provider? which also applies labels
	for k, v := range kl.nodeLabels {
		if cv, found := node.ObjectMeta.Labels[k]; found {
			klog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
		}
		node.ObjectMeta.Labels[k] = v
	}

	if kl.providerID != "" {
		node.Spec.ProviderID = kl.providerID
	}

	if kl.cloud != nil {
		instances, ok := kl.cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}

		// TODO: We can't assume that the node has credentials to talk to the
		// cloudprovider from arbitrary nodes. At most, we should talk to a
		// local metadata server here.
		var err error
		if node.Spec.ProviderID == "" {
			node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(context.TODO(), kl.cloud, kl.nodeName)
			if err != nil {
				return nil, err
			}
		}

		instanceType, err := instances.InstanceType(context.TODO(), kl.nodeName)
		if err != nil {
			return nil, err
		}
		if instanceType != "" {
			klog.Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelInstanceType, instanceType)
			node.ObjectMeta.Labels[kubeletapis.LabelInstanceType] = instanceType
		}
		// If the cloud has zone information, label the node with the zone information
		zones, ok := kl.cloud.Zones()
		if ok {
			zone, err := zones.GetZone(context.TODO())
			if err != nil {
				return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
			}
			if zone.FailureDomain != "" {
				klog.Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneFailureDomain, zone.FailureDomain)
				node.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] = zone.FailureDomain
			}
			if zone.Region != "" {
				klog.Infof("Adding node label from cloud provider: %s=%s", kubeletapis.LabelZoneRegion, zone.Region)
				node.ObjectMeta.Labels[kubeletapis.LabelZoneRegion] = zone.Region
			}
		}
	}
	kl.setNodeStatus(node)
	return node, nil
}
```
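As a simplified illustration of what initialNode builds, the sketch below constructs a bare v1.Node with the default hostname/OS/arch labels and an unschedulable NoSchedule taint. The label and taint keys are written out literally and the hostname is made up, so treat it as an approximation rather than the kubelet's exact output:

```go
package main

import (
	"fmt"
	goruntime "runtime"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	hostname := "worker-01" // assumed hostname, for illustration only

	// A stripped-down version of what initialNode builds: name, default labels,
	// and an unschedulable NoSchedule taint when registerSchedulable is false.
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: hostname,
			Labels: map[string]string{
				"kubernetes.io/hostname":  hostname,
				"beta.kubernetes.io/os":   goruntime.GOOS,
				"beta.kubernetes.io/arch": goruntime.GOARCH,
			},
		},
		Spec: v1.NodeSpec{
			Unschedulable: true,
			Taints: []v1.Taint{{
				Key:    "node.kubernetes.io/unschedulable",
				Effect: v1.TaintEffectNoSchedule,
			}},
		},
	}
	fmt.Printf("labels=%v taints=%v\n", node.ObjectMeta.Labels, node.Spec.Taints)
}
```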
```go
// tryRegisterWithAPIServer makes an attempt to register the given node with
// the API server, returning a boolean indicating whether the attempt was
// successful. If a node with the same name already exists, it reconciles the
// value of the annotation for controller-managed attach-detach of attachable
// persistent volumes for the node.
func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
	_, err := kl.kubeClient.CoreV1().Nodes().Create(node)
	if err == nil {
		return true
	}

	if !apierrors.IsAlreadyExists(err) {
		klog.Errorf("Unable to register node %q with API server: %v", kl.nodeName, err)
		return false
	}

	existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(string(kl.nodeName), metav1.GetOptions{})
	if err != nil {
		klog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err)
		return false
	}
	if existingNode == nil {
		klog.Errorf("Unable to register node %q with API server: no node instance returned", kl.nodeName)
		return false
	}

	originalNode := existingNode.DeepCopy()
	if originalNode == nil {
		klog.Errorf("Nil %q node object", kl.nodeName)
		return false
	}

	klog.Infof("Node %s was previously registered", kl.nodeName)

	// Edge case: the node was previously registered; reconcile
	// the value of the controller-managed attach-detach
	// annotation.
	requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
	if requiresUpdate {
		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
			klog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
			return false
		}
	}

	return true
}
```
```go
apierrors "k8s.io/apimachinery/pkg/api/errors"
```
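To illustrate the IsAlreadyExists branch in tryRegisterWithAPIServer, this standalone sketch fabricates the AlreadyExists error the API server would return for a duplicate Node and checks it with apierrors; the resource and node name are made up:

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Simulate the error the API server returns when the Node object already exists.
	err := apierrors.NewAlreadyExists(schema.GroupResource{Group: "", Resource: "nodes"}, "worker-01")

	if apierrors.IsAlreadyExists(err) {
		// tryRegisterWithAPIServer takes this branch and reconciles the existing Node
		// instead of treating registration as a failure.
		fmt.Println("node already registered, reconcile instead of re-creating:", err)
	}
}
```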
updateNodeStatus
```go
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
```
```go
// updateNodeStatus updates node status to master with retries if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) updateNodeStatus() error {
	klog.V(5).Infof("Updating node status")
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		// tryUpdateNodeStatus performs the actual update; the error we hit in our
		// cluster is returned from here, so keep reading below.
		if err := kl.tryUpdateNodeStatus(i); err != nil {
			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
				kl.onRepeatedHeartbeatFailure()
			}
			klog.Errorf("Error updating node status, will retry: %v", err)
		} else {
			return nil
		}
	}
	return fmt.Errorf("update node status exceeds retry count")
}
```
```go
// tryUpdateNodeStatus tries to update node status to master if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
	// In large clusters, GET and PUT operations on Node objects coming
	// from here are the majority of load on apiserver and etcd.
	// To reduce the load on etcd, we are serving GET operations from
	// apiserver cache (the data might be slightly delayed but it doesn't
	// seem to cause more conflict - the delays are pretty small).
	// If it result in a conflict, all retries are served directly from etcd.
	// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	opts := metav1.GetOptions{}
	if tryNumber == 0 {
		// FromApiserverCache sets opts.ResourceVersion = "0", which allows the GET
		// to be served from the apiserver's watch cache instead of etcd.
		util.FromApiserverCache(&opts)
	}
	// HeartbeatClient clientset.Interface
	// Fetch the node object by name via k8s.io/client-go/kubernetes/typed/core/v1/node.go#Get
	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(string(kl.nodeName), opts)
	if err != nil {
		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
	}

	originalNode := node.DeepCopy()
	if originalNode == nil {
		return fmt.Errorf("nil %q node object", kl.nodeName)
	}

	// Check whether the pod CIDR has changed
	podCIDRChanged := false
	if node.Spec.PodCIDR != "" {
		// Pod CIDR could have been updated before, so we cannot rely on
		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
		// actually changed.
		if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
			klog.Errorf(err.Error())
		}
	}

	// Fill in the latest node status
	kl.setNodeStatus(node)

	now := kl.clock.Now()
	// If the NodeLease reporting mode is enabled, skip the status patch when nothing
	// changed and the report interval has not elapsed yet
	if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
			return nil
		}
	}

	// Patch the current status on the API server
	// Report the latest node information to the apiserver
	// nodeutil "k8s.io/kubernetes/pkg/util/node"
	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
	if err != nil {
		return err
	}
	kl.lastStatusReportTime = now
	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
	// those volumes are already updated in the node's status
	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
	return nil
}
```
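Here is a sketch of the cache-friendly GET, using the client-go fake clientset and the 1.13-era two-argument Get signature (newer client-go adds a context parameter); the node name is made up:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Fake clientset pre-loaded with one Node, standing in for kl.heartbeatClient.
	client := fake.NewSimpleClientset(&v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "worker-01"},
	})

	// ResourceVersion "0" is what util.FromApiserverCache sets: it lets the GET be
	// answered from the apiserver's watch cache instead of going to etcd.
	opts := metav1.GetOptions{ResourceVersion: "0"}

	node, err := client.CoreV1().Nodes().Get("worker-01", opts)
	if err != nil {
		fmt.Println("error getting node:", err)
		return
	}
	fmt.Println("got node:", node.Name)
}
```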
```go
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
// TODO(madhusudancs): Simplify the logic for setting node conditions and
// refactor the node status condition code out to a different file.
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
	for i, f := range kl.setNodeStatusFuncs {
		klog.V(5).Infof("Setting node status at position %v", i)
		if err := f(node); err != nil {
			klog.Warningf("Failed to set some node status fields: %s", err)
		}
	}
}
```
```go
// handlers called during the tryUpdateNodeStatus cycle
setNodeStatusFuncs []func(*v1.Node) error

// Generating the status funcs should be the last thing we do,
// since this relies on the rest of the Kubelet having been constructed.
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
return klet, nil
```
```go
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
	var validateHostFunc func() error
	if kl.appArmorValidator != nil {
		validateHostFunc = kl.appArmorValidator.ValidateHost
	}
	var setters []func(n *v1.Node) error
	klog.V(5).Infof("Node_Address")
	// Setters that collect Address, MachineInfo, Capacity, VersionInfo and similar data
	setters = append(setters,
		nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),
	)
	if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
		setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
	}
	klog.V(5).Infof("Node_Memory")
	// Setters that fill in the node Conditions (memory/disk/PID pressure, Ready, ...)
	setters = append(setters,
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	)
	return setters
}
```
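The setter list is simply a slice of functions that each fill in one part of the Node. A minimal, self-contained sketch of the same pattern, with made-up setters standing in for nodestatus.NodeAddress, nodestatus.ReadyCondition and friends:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// nodeSetter mirrors the shape of the kubelet's setNodeStatusFuncs entries.
type nodeSetter func(*v1.Node) error

// addressSetter and conditionSetter are illustrative stand-ins for the real
// nodestatus setters; the values below are invented.
func addressSetter(node *v1.Node) error {
	node.Status.Addresses = []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "192.168.1.10"}}
	return nil
}

func conditionSetter(node *v1.Node) error {
	node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
		Type:   v1.NodeReady,
		Status: v1.ConditionTrue,
		Reason: "KubeletReady",
	})
	return nil
}

func main() {
	setters := []nodeSetter{addressSetter, conditionSetter}

	node := &v1.Node{}
	// setNodeStatus simply runs every setter in order, logging failures and continuing.
	for _, set := range setters {
		if err := set(node); err != nil {
			fmt.Println("failed to set some node status fields:", err)
		}
	}
	fmt.Printf("addresses=%v conditions=%d\n", node.Status.Addresses, len(node.Status.Conditions))
}
```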
```go
// PatchNodeStatus patches node status.
// v1core "k8s.io/client-go/kubernetes/typed/core/v1"
// "k8s.io/apimachinery/pkg/types"
func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
	patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
	if err != nil {
		return nil, nil, err
	}

	updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
	if err != nil {
		return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
	}
	return updatedNode, patchBytes, nil
}
```
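To make the patch step concrete, the sketch below builds a two-way strategic merge patch between an old and a new Node with the apimachinery helper; this is a rough stand-in for what preparePatchBytesforNodeStatus does, not its exact implementation:

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldNode := v1.Node{}
	newNode := oldNode
	newNode.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}

	oldJSON, _ := json.Marshal(oldNode)
	newJSON, _ := json.Marshal(newNode)

	// Compute only the fields that changed; this is the kind of body sent with
	// types.StrategicMergePatchType to the node's "status" subresource.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
	if err != nil {
		fmt.Println("failed to build patch:", err)
		return
	}
	fmt.Println(string(patch))
}
```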
```go
func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
	kl.lastObservedNodeAddressesMux.Lock()
	defer kl.lastObservedNodeAddressesMux.Unlock()
	kl.lastObservedNodeAddresses = addresses
}
```
Troubleshooting:
The kubelet log output was as follows. Analysis:
- The log message "error updating node status, will retry" points directly at the updateNodeStatus function: the error comes from its call to kl.tryUpdateNodeStatus(i).
- Inside tryUpdateNodeStatus, the log shows the "error getting node" failure, which leads us further down into kl.heartbeatClient.CoreV1().Nodes().Get().
- We then examined the kl.heartbeatClient.CoreV1().Nodes().Get() function itself.
- Comparing with newer versions, kl.heartbeatClient.CoreV1().Nodes().Get() there takes three parameters: context, name, and opts.
- Digging into context shows that the error value we are seeing is one that is only produced by the context package.
Conclusion: our failure is a "context deadline exceeded" error, i.e. the context's deadline was exceeded, even though in 1.13 the Get and Patch functions do not take a context parameter themselves. For an explanation of "context deadline exceeded" see http://xiaorui.cc/archives/6999
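"context deadline exceeded" is the error a Go context reports once its deadline passes; the kubelet's heartbeat client applies a per-request timeout internally, which is presumably where this deadline comes from even though the 1.13 Get/Patch signatures do not expose a context. A minimal reproduction:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// A context with a very short deadline, similar to a per-request timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	// Wait past the deadline, as a slow apiserver round-trip would.
	time.Sleep(50 * time.Millisecond)

	fmt.Println(ctx.Err()) // prints: context deadline exceeded
}
```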
The controller-manager's node-status determination mechanism