k8s kubelet device-plugins

场景:

对于云的用户来说,在 GPU 的支持上,他们最基本的诉求其实非常简单:我只要在 Pod 的 YAML 里面,声明某容器需要的 GPU 个数,那么 Kubernetes 为我创建的容器里就应该出现对应的 GPU 设备,以及它对应的驱动目录。以 NVIDIA 的 GPU 设备为例,上面的需求就意味着当用户的容器被创建之后,这个容器里必须出现如下两部分设备和目录:
GPU 设备,比如 /dev/nvidia0;
GPU 驱动目录,比如 /usr/local/nvidia/*。

apiVersion: v1
kind: Pod
metadata:
  name: cuda-vector-add
spec:
  restartPolicy: OnFailure
  containers:
    - name: cuda-vector-add
      image: "k8s.gcr.io/cuda-vector-add:v0.1"
      resources:
        limits:
          nvidia.com/gpu: 1

在 kube-scheduler 里面,它其实并不关心nvidia.com/gpu的具体含义,只会在计算的时候,一律将调度器里保存的该类型资源的可用量,直接减去 Pod 声明的数值即可。为了能够让调度器知道这个自定义类型的资源在每台宿主机上的可用量,宿主机节点本身,就必须能够向 API Server 汇报该类型资源的可用数量。在 Kubernetes 里,各种类型的资源可用量,其实是 Node 对象 Status 字段的内容。为了能够在上述 Status 字段里添加自定义资源的数据,你就必须使用 PATCH API 来对该 Node 对象进行更新,加上你的自定义资源的数量。这个 PATCH 操作,可以简单地使用 curl 命令来发起

 curl --header "Content-Type: application/json-patch+json" \--request PATCH \--data '[{"op": "add", "path": "/status/capacity/nvidia.com/gpu", "value": "1"}]' \http://localhost:8001/api/v1/nodes//status
apiVersion: v1
kind: Node
...
Status:
  Capacity:
   cpu:  2
   memory:  2049008Ki
   nvidia.com/gpu: 1

在这里插入图片描述

Device Plugin工作原理

1、对于每一种硬件设备,都需要有它所对应的 Device Plugin 进行管理,这些 Device Plugin,都通过 gRPC 的方式同 kubelet 连接起来。以 NVIDIA GPU 为例,它对应的插件叫作NVIDIA GPU device plugin。DevicePlugin 注册一个socket 文件到 /var/lib/kubelet/device-plugins/ 目录下,Kubelet 通过这个目录下的socket 文件向对应的 DevicePlugin 发送gRPC 请求。PS: 通过目录做服务发现。

2、Device Plugin 会通过一个叫作 ListAndWatch 的 API,定期向 kubelet 汇报该 Node 上 GPU 的列表。比如,在上图的例子里,一共有三个 GPU(GPU0、GPU1 和 GPU2)。这样,kubelet 在拿到这个列表之后,就可以直接在它向 APIServer 发送的心跳里,以 Extended Resource 的方式,加上这些 GPU 的数量,比如nvidia.com/gpu=3。

3、当 kubelet 发现这个 Pod 的容器请求一个 GPU 的时候,kubelet 就会从自己持有的 GPU 列表里,为这个容器分配一个 GPU。此时,kubelet 就会向本机的 Device Plugin 发起一个 Allocate() 请求。这个请求携带的参数,正是即将分配给该容器的设备 ID 列表。

4、当 Device Plugin 收到 Allocate 请求之后,它就会根据 kubelet 传递过来的设备 ID,从 Device Plugin 里找到这些设备对应的设备路径和驱动目录。比如,在 NVIDIA Device Plugin 的实现里,它会定期访问 nvidia-docker 插件,从而获取到本机的 GPU 信息。而被分配 GPU 对应的设备路径和驱动目录信息被返回给 kubelet 之后,kubelet 就完成了为一个容器分配 GPU 的操作。接下来,kubelet 会把这些信息追加在创建该容器所对应的 CRI 请求当中。这样,当这个 CRI 请求发给 Docker 之后,Docker 为你创建出来的容器里,就会出现这个 GPU 设备,并把它所需要的驱动目录挂载进去。

service DevicePlugin {
    // ListAndWatch returns a stream of List of Devices
    // Whenever a Device state change or a Device disappears, ListAndWatch
    // returns the new list
    rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
    // Allocate is called during container creation so that the Device
    // Plugin can run device specific operations and instruct Kubelet
    // of the steps to make the Device available in the container
    rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
}

Device Plugin的不足:

目前 Kubernetes 本身的 Device Plugin 的设计,实际上能覆盖的场景是非常单一的,属于“可用”但是“不好用”的状态。一旦你的设备是异构的、不能简单地用“数目”去描述具体使用需求的时候,比如,“我的 Pod 想要运行在计算能力最强的那个 GPU 上”,Device Plugin 就完全不能处理了。在很多场景下,我们其实希望在调度器进行调度的时候,就可以根据整个集群里的某种硬件设备的全局分布,做出一个最佳的调度选择。

注意:

1、调度器扮演的角色,仅仅是为 Pod 寻找到可用的、支持这种硬件设备的节点
2、GPU 等硬件设备的调度工作,实际上是由 kubelet 完成的。即,kubelet 会负责从它所持有的硬件设备列表中,为容器挑选一个硬件设备,然后调用 Device Plugin 的 Allocate API 来完成这个分配操作。
3、不支持多个pod共享一个gpu

Nvidia Docker

最前面提到,在容器中使用GPU设备需要设备目录和驱动目录,而yaml文件中并没有定义此类字段,只说自己想要使用一块GPU,实际上本机GPU设备的设备目录设备号都是通过另一个插件nvidia-docker获得的。nvidia-docker实际上是宿主机上的一个守护进程,Device Plugin周期性的通过nvidia-docker获取本机的GPU信息。Device-Plugin将获取的设备路径和驱动目录信息返回给kubelet之后,kubelet将此信息加入到创建该容器所对应的CRI请求中,这样创建的容器就可以使用该GPU设备。

Device 插件原理

使用 Device 插件之前,首先要开启 DevicePlugins 功能,即配置 --feature-gates=DevicePlugins=true(默认是关闭的)。

Device 插件实际上是一个 gPRC 接口,需要实现 ListAndWatch() 和 Allocate() 等方法,并监听 gRPC Server 的 Unix Socket 在 /var/lib/kubelet/device-plugins/ 目录中,如 /var/lib/kubelet/device-plugins/nvidiaGPU.sock。

在实现 Device 插件时需要注意:

1、插件启动时,需要通过 /var/lib/kubelet/device-plugins/kubelet.sock 向 Kubelet 注册,同时提供插件的 Unix Socket 名称、API 的版本号和插件名称(格式为 vendor-domain/resource,如 nvidia.com/gpu)。Kubelet 会将这些设备暴露到 Node 状态中,方便后续调度器使用

2、插件启动后向 Kubelet 发送插件列表、按需分配设备并持续监控设备的实时状态

3、插件启动后要持续监控 Kubelet 的状态,并在 Kubelet 重启后重新注册自己。比如,Kubelet 刚启动后会清空 /var/lib/kubelet/device-plugins/ 目录,所以插件作者可以监控自己监听的 unix socket 是否被删除了,并根据此事件重新注册自己

device-plugins

Starting in version 1.8, Kubernetes provides a device plugin framework for vendors to advertise their resources to the kubelet without changing Kubernetes core code. Instead of writing custom Kubernetes code, vendors can implement a device plugin that can be deployed manually or as a DaemonSet. The targeted devices include GPUs, High-performance NICs, FPGAs, InfiniBand, and other similar computing resources that may require vendor specific initialization and setup.

译:

从kubernetes1.8版本开始,提供了设备插件框架,设备厂商无需修改kubernetes核心代码就可以将自己生产的设备的资源(kubernetes可管理的资源包括CPU、内存和存储资源)可以让kubelet使用(这一点与操作系统一样,所有设备厂商自己实现驱动)。设备厂商可以自己人工或者以DaemonSet方式部署,而不是定制kubernetes代码。目标设备包括GPU、高性能NIC(网络接口卡)、FPGA、InfiniBand以及其他类似的需要厂商指定初始化和安装的计算资源。

在这里插入图片描述

如果我作为kubernetes开发者,思路是由kubelet汇总所有的资源,然后在汇总到管理端,kubernetes也就是apiserver。当创建Pod时,请求会发送给scheduler,scheduler根据节点状态选择一个最优的节点,最后由最优节点的kubelet创建这个Pod。嗯,这个思路应该没什么大毛病,至少我开发的一个分布式计算系统采用的就是这个方式,没问题!好,我们先假设这个想法是就是kubernetes的设计方案,此处我们不讲内存、CPU、存储这些资源是kubelet是怎么获取的,因为本文的重点是device-plugins,我们只说GPU这个kubelet是怎么获取的。

上面说到了,kubernetes有设备插件框架,那这个框架又是什么样的呢?说白了也很简单,就是kubernetes定义了一套机制和接口,各设备厂商按照协议开发就可以了,这个和Linux驱动原理是一样的,只是实现方式不一样而已。

我们来看看kubernetes是怎么实现的,首先我们先说说机制:

1、厂商自行实现一个管理设备资源的程序,部署到相应的节点上,我们称之为插件;

2、插件需要向kubelet注册,注册内容要包含自己的endpoint
  (endpoint就是一个用于通信的地址)以及一些其他信息;
      
3、kubelet连接插件的endpoint,就此kubelet和插件就建立了联系;

4、kubelet监听/var/lib/kubelet/device-plugins/kubelet.sock(unix sockets)这个地址,
   插件监听的也是类似的地址,只是地址变成了/var/lib/kubelet/device-plugins/xx.sock

比如:gpu-plugin:
在这里插入图片描述

grpc接口分析

插件和kubelet之间的通信接口了,kubelet与插件采用grpc通信,通信接口定义在kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto(不了解protobuf和grpc的同学请自行学习)中。文件中定义了两个用于通信的接口:

service Registration {
 rpc Register(RegisterRequest) returns (Empty) {}
}
service DevicePlugin {
 rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
 rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
 rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
 rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

第一个接口用于插件向kubelet注册,kubelet是服务端,插件是客户端;第二个接口是kubelet向插件索要支持,kubelet是客户端,插件是服务端,

看看插件注册的时候需要提供哪些信息?

message RegisterRequest {
 string version = 1; // 版本信息
 string endpoint = 2; // 插件的endpoint
 string resource_name = 3; // 资源名称
 DevicePluginOptions options = 4; // 插件选项
}
// 那插件选项又包含什么呢?
message DevicePluginOptions {
 bool pre_start_required = 1; // 启动容器前是否调用DevicePlugin.PreStartContainer()
}

version、endpoint、resource_name都比较好理解,pre_start_required 不是很明确,其实就是启动容器前先通知插件做一下准备,多一个机制扩展性要好一些。

接下来看看插件能为kubelet提供什么样的服务,亦或说kubelet需要插件提供什么样的功能。

1.rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {},这个和注册提供的信息是一样的,只是变成了kubelet可以再获取;

2.rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {},这个就是kubelet启动容器前调用的,其中PreStartContainerRequest,PreStartContainerResponse定义如下,非常简单:

message PreStartContainerRequest

{    

repeated string devicesIDs = 1; // 需要使用设备的所有ID,数组形式}
message PreStartContainerResponse { // 什么也没有

}

3.rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {},kubelet监听设备变化,一旦有设备更新,插件就会通知kubelet,注意此处返回是stream类型。我们再来看看插件返回的结果都有什么?

 message ListAndWatchResponse {
 repeated Device devices = 1; //设备数组
}
message Device {
 string ID = 1; // 设备唯一ID
 string health = 2; // 设备健康情况,就是是好的还是坏的
}
// 内容非常少,其实内容越少弹性越大,内容越多约束越多。

4.rpc Allocate(AllocateRequest) returns (AllocateResponse) {},这个就是kubelet向插件申请资源的接口了,申请资源需要提供如下信息:

message AllocateRequest {message AllocateRequest {    
repeated ContainerAllocateRequest container_requests = 1;
}
message ContainerAllocateRequest {    
repeated string devicesIDs = 1; // 设备ID数组
} 
从上面的类型可以看出申请资源接口可以同时为多个容器申请资源,AllocateRequest .container_requests代表的是多个容器对于资源的需求,ContainerAllocateRequest.devicesIDs是一个容器对于资源的需求。我们再来看看插件返回给kubelet什么信息?

 message AllocateResponse {
 repeated ContainerAllocateResponse container_responses = 1;
}
message ContainerAllocateResponse {
 map<string, string> envs = 1; // 环境变量,需要为容器添加这些环境变量
 repeated Mount mounts = 2; // 挂载信息
 repeated DeviceSpec devices = 3; // 设备信息
 map<string, string> annotations = 4; // 需要加入到容器的annotations字段
}
message Mount {
 string container_path = 1; // 设备在容器中的路径
 string host_path = 2; // 设备在宿主机上的路径
 bool read_only = 3; // 是否只读
}
message DeviceSpec {
 string container_path = 1; // 设备在容器中的路径
 string host_path = 2; // 设备在宿主机上的路径
 string permissions = 3; // 访问设备需要的权限
}

和AllocateRequest一样,返回的申请结果也是多容器的。

源码分析:创建pod时如何为pod分配所需要的device resource

从创建pod开始

func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    container := spec.container
    ...
    ...
    // 为pod生成配置
    containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
    if cleanupAction != nil {
        defer cleanupAction()
    }
    ...
    ...
}

// generateContainerConfig generates container config for kubelet runtime v1.
func (m *kubeGenericRuntimeManager) generateContainerConfig(container *v1.Container, pod *v1.Pod, restartCount int, podIP, imageRef string, podIPs []string, nsTarget *kubecontainer.ContainerID) (*runtimeapi.ContainerConfig, func(), error) {
    // 生成容器运行参数
    opts, cleanupAction, err := m.runtimeHelper.GenerateRunContainerOptions(pod, container, podIP, podIPs)
    if err != nil {
        return nil, nil, err
    }

    uid, username, err := m.getImageUser(container.Image)
    if err != nil {
        return nil, cleanupAction, err
    }

    // Verify RunAsNonRoot. Non-root verification only supports numeric user.
    if err := verifyRunAsNonRoot(pod, container, uid, username); err != nil {
        return nil, cleanupAction, err
    }

    command, args := kubecontainer.ExpandContainerCommandAndArgs(container, opts.Envs)
    logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
    err = m.osInterface.MkdirAll(logDir, 0755)
    if err != nil {
        return nil, cleanupAction, fmt.Errorf("create container log directory for container %s failed: %v", container.Name, err)
    }
    containerLogsPath := buildContainerLogsPath(container.Name, restartCount)
    restartCountUint32 := uint32(restartCount)
    // 生成容器配置
    config := &runtimeapi.ContainerConfig{
        Metadata: &runtimeapi.ContainerMetadata{
            Name:    container.Name,
            Attempt: restartCountUint32,
        },
        Image:       &runtimeapi.ImageSpec{Image: imageRef},
        Command:     command,
        Args:        args,
        WorkingDir:  container.WorkingDir,
        Labels:      newContainerLabels(container, pod),
        Annotations: newContainerAnnotations(container, pod, restartCount, opts),
        // 容器挂载设备
        Devices:     makeDevices(opts),
        // 容器挂载目录
        Mounts:      m.makeMounts(opts, container),
        LogPath:     containerLogsPath,
        Stdin:       container.Stdin,
        StdinOnce:   container.StdinOnce,
        Tty:         container.TTY,
    }
    ...
    ...
}

// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string, podIPs []string) (*kubecontainer.RunContainerOptions, func(), error) {
    // 根据pod的resource种类,去获取相应的资源,比如:gpu等
    opts, err := kl.containerManager.GetResources(pod, container)
    if err != nil {
        return nil, nil, err
    }
    // The value of hostname is the short host name and it is sent to makeMounts to create /etc/hosts file.
    hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
    if err != nil {
        return nil, nil, err
    }
    // nodename will be equals to hostname if SetHostnameAsFQDN is nil or false. If SetHostnameFQDN
    // is true and hostDomainName is defined, nodename will be the FQDN (hostname.hostDomainName)
    nodename, err := util.GetNodenameForKernel(hostname, hostDomainName, pod.Spec.SetHostnameAsFQDN)
    if err != nil {
        return nil, nil, err
    }
    opts.Hostname = nodename
    podName := volumeutil.GetUniquePodName(pod)
    volumes := kl.volumeManager.GetMountedVolumesForPod(podName)

    blkutil := volumepathhandler.NewBlockVolumePathHandler()
    blkVolumes, err := kl.makeBlockVolumes(pod, container, volumes, blkutil)
    if err != nil {
        return nil, nil, err
    }
    opts.Devices = append(opts.Devices, blkVolumes...)

    envs, err := kl.makeEnvironmentVariables(pod, container, podIP, podIPs)
    if err != nil {
        return nil, nil, err
    }
    opts.Envs = append(opts.Envs, envs...)

    // only podIPs is sent to makeMounts, as podIPs is populated even if dual-stack feature flag is not enabled.
    mounts, cleanupAction, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIPs, volumes, kl.hostutil, kl.subpather, opts.Envs)
    if err != nil {
        return nil, cleanupAction, err
    }
    opts.Mounts = append(opts.Mounts, mounts...)

    // adding TerminationMessagePath on Windows is only allowed if ContainerD is used. Individual files cannot
    // be mounted as volumes using Docker for Windows.
    if len(container.TerminationMessagePath) != 0 {
        p := kl.getPodContainerDir(pod.UID, container.Name)
        if err := os.MkdirAll(p, 0750); err != nil {
            klog.ErrorS(err, "Error on creating dir", "path", p)
        } else {
            opts.PodContainerDir = p
        }
    }

    // only do this check if the experimental behavior is enabled, otherwise allow it to default to false
    if kl.experimentalHostUserNamespaceDefaulting {
        opts.EnableHostUserNamespace = kl.enableHostUserNamespace(pod)
    }

    return opts, cleanupAction, nil
}

// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
    opts := &kubecontainer.RunContainerOptions{}
    // Allocate should already be called during predicateAdmitHandler.Admit(),
    // just try to fetch device runtime information from cached state here
    // 获取容器申请的device
    devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
    if err != nil {
        return nil, err
    } else if devOpts == nil {
        return opts, nil
    }
    // 设置容器运行时的device相关参数,设备路径,设备挂载点,设备信息通过Env传入等等
    opts.Devices = append(opts.Devices, devOpts.Devices...)
    opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
    opts.Envs = append(opts.Envs, devOpts.Envs...)
    opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
    return opts, nil
}

func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
    podUID := string(pod.UID)
    contName := container.Name
    needsReAllocate := false
    // 根据容器的Resources.Limits中的device种类,依次向其对应的插件发起Allocate
    for k, v := range container.Resources.Limits {
        resource := string(k)
        if !m.isDevicePluginResource(resource) || v.Value() == 0 {
            continue
        }
        err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
        if err != nil {
            return nil, err
        }

        if !m.checkPodActive(pod) {
            klog.ErrorS(nil, "pod deleted from activePods, skip to reAllocate", "podUID", podUID)
            continue
        }

        // This is a device plugin resource yet we don't have cached
        // resource state. This is likely due to a race during node
        // restart. We re-issue allocate request to cover this race.
        if m.podDevices.containerDevices(podUID, contName, resource) == nil {
            needsReAllocate = true
        }
    }
    // 向device插件发起allocate
    if needsReAllocate {
        klog.V(2).InfoS("Needs to re-allocate device plugin resources for pod", "pod", klog.KObj(pod), "containerName", container.Name)
        if err := m.Allocate(pod, container); err != nil {
            return nil, err
        }
    }
    // 设置容器运行时如何来使用容器申请的device
    return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}

// 向对应设备申请资源
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
    // The pod is during the admission phase. We need to save the pod to avoid it
    // being cleaned before the admission ended
    m.setPodPendingAdmission(pod)

    if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
        m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
    }
    // If pod entries to m.devicesToReuse other than the current pod exist, delete them.
    for podUID := range m.devicesToReuse {
        if podUID != string(pod.UID) {
            delete(m.devicesToReuse, podUID)
        }
    }
    // Allocate resources for init containers first as we know the caller always loops
    // through init containers before looping through app containers. Should the caller
    // ever change those semantics, this logic will need to be amended.
    for _, initContainer := range pod.Spec.InitContainers {
        if container.Name == initContainer.Name {
            if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
                return err
            }
            m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
            return nil
        }
    }
    if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
        return err
    }
    m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
    return nil
}

func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
    podUID := string(pod.UID)
    contName := container.Name
    allocatedDevicesUpdated := false
    needsUpdateCheckpoint := false
    // Extended resources are not allowed to be overcommitted.
    // Since device plugin advertises extended resources,
    // therefore Requests must be equal to Limits and iterating
    // over the Limits should be sufficient.
    for k, v := range container.Resources.Limits {
        resource := string(k)
        needed := int(v.Value())
        klog.V(3).InfoS("Looking for needed resources", "needed", needed, "resourceName", resource)
        if !m.isDevicePluginResource(resource) {
            continue
        }
        // Updates allocatedDevices to garbage collect any stranded resources
        // before doing the device plugin allocation.
        if !allocatedDevicesUpdated {
            m.UpdateAllocatedDevices()
            allocatedDevicesUpdated = true
        }
        allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
        if err != nil {
            return err
        }
        if allocDevices == nil || len(allocDevices) <= 0 {
            continue
        }

        needsUpdateCheckpoint = true

        startRPCTime := time.Now()
        // Manager.Allocate involves RPC calls to device plugin, which
        // could be heavy-weight. Therefore we want to perform this operation outside
        // mutex lock. Note if Allocate call fails, we may leave container resources
        // partially allocated for the failed container. We rely on UpdateAllocatedDevices()
        // to garbage collect these resources later. Another side effect is that if
        // we have X resource A and Y resource B in total, and two containers, container1
        // and container2 both require X resource A and Y resource B. Both allocation
        // requests may fail if we serve them in mixed order.
        // TODO: may revisit this part later if we see inefficient resource allocation
        // in real use as the result of this. Should also consider to parallelize device
        // plugin Allocate grpc calls if it becomes common that a container may require
        // resources from multiple device plugins.
        m.mutex.Lock()
        // 获取device对应的device plugin实例,他们是启动的时候会通过grpc向kubelet注册
        eI, ok := m.endpoints[resource]
        m.mutex.Unlock()
        if !ok {
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return fmt.Errorf("unknown Device Plugin %s", resource)
        }

        devs := allocDevices.UnsortedList()
        // TODO: refactor this part of code to just append a ContainerAllocationRequest
        // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod.
        klog.V(3).InfoS("Making allocation request for device plugin", "devices", devs, "resourceName", resource)
        // 向设备对应的endpoint,也就是device plugin发起allocate的请求
        resp, err := eI.e.allocate(devs)
        metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
        if err != nil {
            // In case of allocation failure, we want to restore m.allocatedDevices
            // to the actual allocated state from m.podDevices.
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return err
        }

        if len(resp.ContainerResponses) == 0 {
            return fmt.Errorf("no containers return in allocation response %v", resp)
        }

        allocDevicesWithNUMA := checkpoint.NewDevicesPerNUMA()
        // Update internal cached podDevices state.
        m.mutex.Lock()
        for dev := range allocDevices {
            if m.allDevices[resource][dev].Topology == nil || len(m.allDevices[resource][dev].Topology.Nodes) == 0 {
                allocDevicesWithNUMA[nodeWithoutTopology] = append(allocDevicesWithNUMA[nodeWithoutTopology], dev)
                continue
            }
            for idx := range m.allDevices[resource][dev].Topology.Nodes {
                node := m.allDevices[resource][dev].Topology.Nodes[idx]
                allocDevicesWithNUMA[node.ID] = append(allocDevicesWithNUMA[node.ID], dev)
            }
        }
        m.mutex.Unlock()
        m.podDevices.insert(podUID, contName, resource, allocDevicesWithNUMA, resp.ContainerResponses[0])
    }

    if needsUpdateCheckpoint {
        return m.writeCheckpoint()
    }

    return nil
}

device-plugins整体流程分析

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

注意分配逻辑的维护

kubelet只会从你提供的devices列表中,随机抽出一些序号传过来,并不能手动指定,然后你收到序号后将设备分配出去,然后你需要更新你自己的设备分配情况,下次通知的时候把你分配出去的过滤掉。然后如果pod删除了,回收掉,没人用的话,你应该有设备发现机制,发现了后重新加回到你的设备列表

kubelet device-plugins example:

https://github.com/huyuan1999/kubelet-device-plugin-example
https://github.com/everpeace/k8s-host-device-plugin
https://github.com/dtaniwaki/k8s-virtual-device-plugin
https://github.com/everpeace/k8s-dumb-device-plugin/blob/master/server.go
https://github.com/kubernetes/kubernetes/blob/v1.25.0/test/images/sample-device-plugin/sampledeviceplugin.go
https://github.com/squat/generic-device-plugin/blob/main/deviceplugin/generic.go
https://github.com/squat/generic-device-plugin/blob/main/manifests/generic-device-plugin.yaml
https://github.com/squat/generic-device-plugin

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐