0. 前言 

上一篇中我们基于kubernetes实现了pod的有状态迁移,但同时也发现整个迁移过程会比较繁琐,因此本文将对这一过程编写自动化脚本,实现一键迁移的功能,即输入pod的名称和目标节点,系统自己完成整个迁移过程。

再一次说明,本文的底层技术和方案均来自k8s社区,本文的目的在于将这一过程封装起来,为今后在此基础上扩展其它实验提供便利。

1. 迁移流程代码解释

全部流程如下:

整个流程一共包含两个python文件,即master.py与node.py,分别运行在master节点和其它所有node节点上,master.py负责流程控制,node.py在工作节点上负责具体的迁移工作实施。

下面将依据上图依次展示代码实现。

0. 程序所需的依赖包:

kubernetes==27.2.0
PyYAML==6.0.1
Requests==2.31.0

(pyhon自带的未列出,见完整代码)

1. 查询pod信息(master.py)

在主函数中输入pod名称和目标节点名称,查询pod信息并展示,方法为get_pod_info,如下:

# 查找待迁移pod的基本信息,输入pod名称,输出pod所在的节点IP、pod命名空间、容器名称
def get_pod_info(pod_name):
    try:
        # 使用默认的Kubeconfig配置
        config.load_kube_config()
        # 创建一个Kubernetes API客户端
        api = client.CoreV1Api()
        # 在pod列表里搜索指定Pod
        pod_list = api.list_pod_for_all_namespaces(watch=False)
        target_pod = None
        for pod in pod_list.items:
            if pod.metadata.name == pod_name:
                target_pod = pod
                break

        if target_pod:
            # 获取Pod所在节点的IP
            node_name = target_pod.spec.node_name
            pod_namespace = target_pod.metadata.namespace

            # 获取Pod内所有容器的名称
            container_names = [container.name for container in target_pod.spec.containers]

            return {
                "Node IP": node_name,
                "Namespace": pod_namespace,
                "Container Names": container_names
            }
        else:
            return "Pod not found."

    except Exception as e:
        return f"Error: {str(e)}"
2. 发送pod检查点存档指令(master.py)
# 下发检查点存档指令,输入存档请求需要的所有参数,执行存档请求,返回存档文件名称
def send_checkpoint(source_node_ip, pod_namespace, pod_name, container_name):
    url = f"https://{source_node_ip}:10250/checkpoint/{pod_namespace}/{pod_name}/{container_name}"

    # 发送POST请求
    response = requests.post(url, cert=cert, verify=False)  # 设置verify为False来忽略SSL验证

    # 检查响应
    if response.status_code == 200:
        print("dump请求成功,响应内容:")
        data = json.loads(response.text)
        tar_path = data['items'][0]
        tar_name = os.path.basename(tar_path)
        print(data)

        return tar_name

    else:
        print(f"请求失败,状态码: {response.status_code}")

检查点存档请求成功后将记录存档文件的文件名,格式为.tar,并作为结果返回。

3. 对待迁移pod实行检查点存档的实际操作,这一步由kubelet、criu工具来完成,不需要自己写代码。
4. 发送存档传输指令(master.py)
# 下发存档传输指令,输入源节点IP、待传输文档名称、目标节点IP,返回传输状态
def send_tar_transport(source_node_ip, tar_name, target_node_ip):

    '''
    创建一个 XML-RPC 服务器代理对象。该代理对象用于与指定的地址和端口上运行的 XML-RPC 
    服务器进行通信。使用该代理对象来调用服务器上公开的 XML-RPC 方法。
    '''
    # 由于存档文件是保存在源节点上,所以需要源节点传输存档文件到目标节点
    # 这里创建源节点(待迁移pod所在节点)的代理对象,调用源节点上的tar_transport方法
    proxy = xmlrpc.client.ServerProxy(f"http://{source_node_ip}:8000/")

    # 发送存档传输命令
    result = proxy.tar_transport(tar_name, target_node_ip)
    print(f"存档传输执行状态: {result}  存档文件已传输至目标节点")

    return result
5. 运行RPC服务(node.py)

根据第4步的思路,节点之间存在p2p的文件传输,每个node节点既是服务端又是客户端,所以每个节点都需要开启RPC服务,并一直保持服务:

if __name__ == "__main__":
    server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
    
    print("等待RPC请求...")
    server.serve_forever()

同时,为了能让master节点调用tar_transport方法,需要在服务中注册该方法:

if __name__ == "__main__":
    server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
    server.register_function(tar_transport)
    print("等待RPC请求...")
    server.serve_forever()
6. 在node.py中实现文件传输方法(node.py)
# 输入存档文件名称、目标节点名称,发送存档文件到目标节点
def tar_transport(filename, target_node_ip):
    local_file_path = f"/var/lib/kubelet/checkpoints/{filename}"

    try:
        # 读取.tar文件数据
        with open(local_file_path, 'rb') as file:
            file_data = xmlrpc.client.Binary(file.read())

        # 连接服务器端
        proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")

        # 发送存档文件到目标节点
        result = proxy.receive_file(filename, file_data)

        if result:
            return "successful"
        else:
            return "failing,文件在目标节点上保存失败"
    except Exception as e:
        return f"文件传输失败: {str(e)}"

同时,目标节点对存档文件进行保存:

def receive_file(filename, file_data):
    emote_file_path = f"/check/{filename}"
    # 保存接收到的文件数据
    with open(emote_file_path, 'wb') as file:
        file.write(file_data.data)

    return True
7. 发送镜像创建指令(master.py)
# 下发镜像创建指令,输入目标节点IP、pod名称、pod内容器名称,存档文件名称,返回镜像创建结果
def send_buildah_operations(target_node_ip, pod_name, container_name, tar_name):
    proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")

    # 发送镜像创建命令
    result = proxy.buildah_operations(pod_name, container_name, tar_name)
    print(f"镜像建立执行状态: {result}  已创建待恢复镜像")

    return result

代码思路与第4步一致,不再过多解释。

8. 在node.py中实现镜像创建方法(node.py)
# 输入pod名称、容器名称、存档文件名称,创建待恢复pod的镜像,返回镜像名称
def buildah_operations(pod_name, container_name, tar_name):
    try:
        # 创建新容器
        new_container = subprocess.check_output(["buildah", "from", "scratch"]).decode().strip()
        # 指定文件的路径和名称
        source_path = f"/check/{tar_name}"
        # 将文件添加到容器
        subprocess.run(["buildah", "add", new_container, source_path, "/"])
        # 配置注释
        container_name_annotation = f"io.kubernetes.cri-o.annotations.checkpoint.name={container_name}"
        subprocess.run(["buildah", "config", "--annotation", container_name_annotation, new_container])
        # 提交镜像
        subprocess.run(["buildah", "commit", new_container, f"{pod_name}:dump"]
        # 删除容器
        subprocess.run(["buildah", "rm", new_container])

        return "successful"
    except Exception as e:
        return f"Buildah操作出错:{str(e)}"

本质上是使用buildah工具创建镜像并将这一过程打包。

更新node.py的方法注册信息:

if __name__ == "__main__":
    server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
    server.register_function(tar_transport)
    server.register_function(receive_file)
    server.register_function(buildah_operations)
    print("等待RPC请求...")
    server.serve_forever()
9. 接下来做一些辅助工作后恢复pod,具体为获取pod的配置信息并修改,修改内容包括所使用的镜像、运行的节点等(master.py)
# 读取pod配置信息
def get_pod_config(namespace, pod_name):
    try:
        # 执行 kubectl 命令并捕获输出
        command = f"kubectl get -n {namespace} pod {pod_name} -oyaml"
        result = subprocess.check_output(command, shell=True, text=True)

        # 解析 YAML
        pod_data = yaml.safe_load(result)

        # 提取 last-applied-configuration 字段
        last_applied_config = pod_data.get('metadata', {}).get('annotations', {}).get(
            'kubectl.kubernetes.io/last-applied-configuration')

        if last_applied_config:
            # 将 last-applied-configuration 字段解析为 YAML
            pod_config = yaml.safe_load(last_applied_config)

            return pod_config
        else:
            return None

    except Exception as e:
        return str(e)


# 修改pod配置信息,镜像改为待恢复镜像
def modify_pod_yaml(pod_data, new_image, node_name):
    if pod_data:
        # 修改镜像
        containers = pod_data['spec']['containers']
        for container in containers:
            container['image'] = new_image

        # 增加节点选择字段
        pod_data['spec']['nodeName'] = node_name

    return pod_data

2. 代码运行效果验证

 1. 运行待迁移pod

命令为kubectl apply -f counters.yaml,counters.yaml文件内容如下:

apiVersion: v1
kind: Pod
metadata:
  name: counters
spec:
  containers:
  - name: counter
    image: quay.io/adrianreber/counter:blog

查看pod所在节点和使用的镜像:

pod所在节点为mu36,使用镜像与counters.yaml文件的配置一致。

2. 迁移pod

 将pod迁移至mu26,迁移过程的输出信息如下:

此时查看pod所在节点和使用的镜像:

 可以看到pod已经迁移到mu26,并且使用的是存档文件所建立的镜像。

3. 完整代码

1. master.py
import os
import json
import yaml
import xmlrpc.client
import requests
import subprocess
from kubernetes import client, config

cert = ("/etc/kubernetes/pki/apiserver-kubelet-client.crt", "/etc/kubernetes/pki/apiserver-kubelet-client.key")
tar_name = ""


# 查找待迁移pod的基本信息,输入pod名称,输出pod所在的节点IP、pod命名空间、容器名称
def get_pod_info(pod_name):
    try:
        # 使用默认的Kubeconfig配置
        config.load_kube_config()

        # 创建一个Kubernetes API客户端
        api = client.CoreV1Api()

        # 搜索指定Pod
        pod_list = api.list_pod_for_all_namespaces(watch=False)
        target_pod = None
        for pod in pod_list.items:
            if pod.metadata.name == pod_name:
                target_pod = pod
                break

        if target_pod:
            # 获取Pod所在节点的IP
            node_name = target_pod.spec.node_name
            pod_namespace = target_pod.metadata.namespace

            # 获取Pod内所有容器的名称
            container_names = [container.name for container in target_pod.spec.containers]

            return {
                "Node IP": node_name,
                "Namespace": pod_namespace,
                "Container Names": container_names
            }
        else:
            return "Pod not found."

    except Exception as e:
        return f"Error: {str(e)}"


# 下发检查点存档指令,输入存档请求需要的所有参数,执行存档请求,返回存档文件名称
def send_checkpoint(source_node_ip, pod_namespace, pod_name, container_name):
    url = f"https://{source_node_ip}:10250/checkpoint/{pod_namespace}/{pod_name}/{container_name}"

    # 发送POST请求
    response = requests.post(url, cert=cert, verify=False)  # 设置verify为False来忽略SSL验证

    # 检查响应
    if response.status_code == 200:
        print("dump请求成功,响应内容:")
        data = json.loads(response.text)
        tar_path = data['items'][0]
        tar_name = os.path.basename(tar_path)
        print(data)

        return tar_name

    else:
        print(f"请求失败,状态码: {response.status_code}")


# 下发存档传输指令,输入源节点IP、待传输文档名称、目标节点IP,返回传输状态
def send_tar_transport(source_node_ip, tar_name, target_node_ip):
    proxy = xmlrpc.client.ServerProxy(f"http://{source_node_ip}:8000/")

    # 发送存档传输命令
    result = proxy.tar_transport(tar_name, target_node_ip)
    print(f"存档传输执行状态: {result}  存档文件已传输至目标节点")

    return result


# 下发镜像创建指令,输入目标节点IP、pod名称、pod内容器名称,存档文件名称,返回镜像创建结果
def send_buildah_operations(target_node_ip, pod_name, container_name, tar_name):
    proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")

    # 发送镜像创建命令
    result = proxy.buildah_operations(pod_name, container_name, tar_name)
    print(f"镜像建立执行状态: {result}  已创建待恢复镜像")

    return result


# Pod有状态迁移中间过程总控制
def pod_transport(source_node_ip, target_node_ip, pod_namespace, pod_name, container_name):
    tar_name = send_checkpoint(source_node_ip, pod_namespace, pod_name, container_name)

    if tar_name:
        result_tar_transport = send_tar_transport(source_node_ip, tar_name, target_node_ip)

        if result_tar_transport == "successful":
            result_buildah_operations = send_buildah_operations(target_node_ip, pod_name, container_name, tar_name)

            if result_buildah_operations == "successful":
                return result_buildah_operations
            else:
                print(result_buildah_operations)
        else:
            print(result_tar_transport)
    else:
        print("pod存档失败")


# 读取pod配置信息
def get_pod_config(namespace, pod_name):
    try:
        # 执行 kubectl 命令并捕获输出
        command = f"kubectl get -n {namespace} pod {pod_name} -oyaml"
        result = subprocess.check_output(command, shell=True, text=True)

        # 解析 YAML
        pod_data = yaml.safe_load(result)

        # 提取 last-applied-configuration 字段
        last_applied_config = pod_data.get('metadata', {}).get('annotations', {}).get(
            'kubectl.kubernetes.io/last-applied-configuration')

        if last_applied_config:
            # 将 last-applied-configuration 字段解析为 YAML
            pod_config = yaml.safe_load(last_applied_config)

            return pod_config
        else:
            return None

    except Exception as e:
        return str(e)


# 修改pod配置信息,镜像改为待恢复镜像
def modify_pod_yaml(pod_data, new_image, node_name):
    if pod_data:
        # 修改镜像
        containers = pod_data['spec']['containers']
        for container in containers:
            container['image'] = new_image

        # 增加节点选择字段
        pod_data['spec']['nodeName'] = node_name

    return pod_data

# pod恢复节点总控
def restore_pod(namespace, pod_name, new_image, node_name):
    pod_config = get_pod_config(namespace, pod_name)

    modify_pod_yaml(pod_config, new_image, node_name)

    if pod_config:
        # 保存配置到文件
        with open(f"/check/{pod_name}_restore.yaml", 'w') as f:
            yaml.dump(pod_config, f, default_flow_style=False)

        try:
            subprocess.run(["kubectl", "delete", "pod", "-n", namespace, pod_name])
            subprocess.run(["kubectl", "apply", "-f", f"/check/{pod_name}_restore.yaml"])
            print(f"Pod已迁移至{node_name}")
            print(f"Pod配置信息已修改并保存到 /check/{pod_name}_restore.yaml 文件。")

        except Exception as e:
            return f"Pod恢复操作出错:{str(e)}"

    else:
        print("未找到指定的Pod或配置信息。")


if __name__ == "__main__":

    print("-----------------------------------------")
    pod_name = input("请输入待迁移pod名称:")
    target_node_ip = input("请输入目标节点名称: ")

    pod_info = get_pod_info(pod_name)

    print("-----------------------------------------")
    print(f"pod所在节点:{pod_info['Node IP']}")
    print(f"pod所在命名空间:{pod_info['Namespace']}")
    print(f"pod内部容器:{pod_info['Container Names'][0]}")
    print("-----------------------------------------")
    print("开始执行迁移操作...")

    result_image = pod_transport(pod_info["Node IP"], target_node_ip, pod_info["Namespace"], pod_name, pod_info["Container Names"][0])

    if result_image:
        restore_pod(pod_info["Namespace"], pod_name, f"localhost/{pod_name}:dump", target_node_ip)

2. node.py 
import subprocess
import xmlrpc.server
import xmlrpc.client


def receive_file(filename, file_data):
    # save_path = '/check/'  # 指定保存文件的文件夹路径
    # file_path = os.path.join(save_path, filename)
    emote_file_path = f"/check/{filename}"

    # 保存接收到的文件数据
    with open(emote_file_path, 'wb') as file:
        file.write(file_data.data)

    return True


# 输入存档文件名称、目标节点名称,发送存档文件到目标节点
def tar_transport(filename, target_node_ip):
    local_file_path = f"/var/lib/kubelet/checkpoints/{filename}"

    try:
        # 读取.tar文件数据
        with open(local_file_path, 'rb') as file:
            file_data = xmlrpc.client.Binary(file.read())

        # 连接服务器端
        proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")

        # 发送存档文件到目标节点
        result = proxy.receive_file(filename, file_data)

        if result:
            return "successful"
        else:
            return "failing,文件在目标节点上保存失败"
    except Exception as e:
        return f"文件传输失败: {str(e)}"


# 输入pod名称、容器名称、存档文件名称,创建待恢复pod的镜像,返回镜像名称
def buildah_operations(pod_name, container_name, tar_name):
    try:
        # 创建新容器
        new_container = subprocess.check_output(["buildah", "from", "scratch"]).decode().strip()

        # 指定文件的路径和名称
        source_path = f"/check/{tar_name}"
        # source_path = f"/var/lib/kubelet/checkpoints/checkpoint-counters_default-counter-2023-10-16T23:26:59+08:00.tar"

        # 将文件添加到容器
        subprocess.run(["buildah", "add", new_container, source_path, "/"])

        # 配置注释
        # container_name_annotation = f"io.kubernetes.cri-o.annotations.checkpoint.name=counter"
        container_name_annotation = f"io.kubernetes.cri-o.annotations.checkpoint.name={container_name}"
        subprocess.run(["buildah", "config", "--annotation", container_name_annotation, new_container])

        # 提交镜像
        subprocess.run(["buildah", "commit", new_container, f"{pod_name}:dump"])

        # 删除容器
        subprocess.run(["buildah", "rm", new_container])

        return "successful"
    except Exception as e:
        return f"Buildah操作出错:{str(e)}"


if __name__ == "__main__":
    server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
    server.register_function(tar_transport)
    server.register_function(receive_file)
    server.register_function(buildah_operations)
    print("等待RPC请求...")
    server.serve_forever()

 4. 结尾

本文实现了pod的一键迁移功能,将繁琐的迁移细节给封装了起来。之后可以在此基础上进行各种迁移相关的实验。

但从系统的角度来说这样还是不太够的,比如集群中有那么多node节点,需要一个一个的去部署吗,所以接下来可以将程序容器化,再使用DaemonSet去部署。

 (补充):

上篇文章忘了提醒Linux环境的cgroup得使用第一版,使用第二版将无法恢复pod。这里给出解决方法

查看cgroup版本:

# 查看Linux cgroup版本:
stat -fc %T /sys/fs/cgroup/
# 输出 cgroup2fs,为cgroup v2。
# 输出 tmpfs,为cgroup v1。

 cgroup v2 -> v1:

sudo cp /etc/default/grub /etc/default/grub.bak
sudo vi /etc/default/grub
# 修改内容
GRUB_CMDLINE_LINUX="systemd.unified_cgroup_hierarchy=0"

sudo update-grub
sudo reboot
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐