Kubernetes的Pod有状态迁移过程自动化
我们基于kubernetes实现了pod的有状态迁移,但同时也发现整个迁移过程会比较繁琐,因此本文将对这一过程编写自动化脚本,实现一键迁移的功能,即输入pod的名称和目标节点,系统自己完成整个迁移过程。再一次说明,本文的底层技术和方案均来自k8s社区,本文的目的在于将这一过程封装起来,为今后在此基础上扩展其它实验提供便利。
0. 前言
上一篇中我们基于kubernetes实现了pod的有状态迁移,但同时也发现整个迁移过程会比较繁琐,因此本文将对这一过程编写自动化脚本,实现一键迁移的功能,即输入pod的名称和目标节点,系统自己完成整个迁移过程。
再一次说明,本文的底层技术和方案均来自k8s社区,本文的目的在于将这一过程封装起来,为今后在此基础上扩展其它实验提供便利。
1. 迁移流程代码解释
全部流程如下:
整个流程一共包含两个python文件,即master.py与node.py,分别运行在master节点和其它所有node节点上,master.py负责流程控制,node.py在工作节点上负责具体的迁移工作实施。
下面将依据上图依次展示代码实现。
0. 程序所需的依赖包:
kubernetes==27.2.0
PyYAML==6.0.1
Requests==2.31.0(pyhon自带的未列出,见完整代码)
1. 查询pod信息(master.py)
在主函数中输入pod名称和目标节点名称,查询pod信息并展示,方法为get_pod_info,如下:
# 查找待迁移pod的基本信息,输入pod名称,输出pod所在的节点IP、pod命名空间、容器名称
def get_pod_info(pod_name):
try:
# 使用默认的Kubeconfig配置
config.load_kube_config()
# 创建一个Kubernetes API客户端
api = client.CoreV1Api()
# 在pod列表里搜索指定Pod
pod_list = api.list_pod_for_all_namespaces(watch=False)
target_pod = None
for pod in pod_list.items:
if pod.metadata.name == pod_name:
target_pod = pod
break
if target_pod:
# 获取Pod所在节点的IP
node_name = target_pod.spec.node_name
pod_namespace = target_pod.metadata.namespace
# 获取Pod内所有容器的名称
container_names = [container.name for container in target_pod.spec.containers]
return {
"Node IP": node_name,
"Namespace": pod_namespace,
"Container Names": container_names
}
else:
return "Pod not found."
except Exception as e:
return f"Error: {str(e)}"
2. 发送pod检查点存档指令(master.py)
# 下发检查点存档指令,输入存档请求需要的所有参数,执行存档请求,返回存档文件名称
def send_checkpoint(source_node_ip, pod_namespace, pod_name, container_name):
url = f"https://{source_node_ip}:10250/checkpoint/{pod_namespace}/{pod_name}/{container_name}"
# 发送POST请求
response = requests.post(url, cert=cert, verify=False) # 设置verify为False来忽略SSL验证
# 检查响应
if response.status_code == 200:
print("dump请求成功,响应内容:")
data = json.loads(response.text)
tar_path = data['items'][0]
tar_name = os.path.basename(tar_path)
print(data)
return tar_name
else:
print(f"请求失败,状态码: {response.status_code}")
检查点存档请求成功后将记录存档文件的文件名,格式为.tar,并作为结果返回。
3. 对待迁移pod实行检查点存档的实际操作,这一步由kubelet、criu工具来完成,不需要自己写代码。
4. 发送存档传输指令(master.py)
# 下发存档传输指令,输入源节点IP、待传输文档名称、目标节点IP,返回传输状态
def send_tar_transport(source_node_ip, tar_name, target_node_ip):
'''
创建一个 XML-RPC 服务器代理对象。该代理对象用于与指定的地址和端口上运行的 XML-RPC
服务器进行通信。使用该代理对象来调用服务器上公开的 XML-RPC 方法。
'''
# 由于存档文件是保存在源节点上,所以需要源节点传输存档文件到目标节点
# 这里创建源节点(待迁移pod所在节点)的代理对象,调用源节点上的tar_transport方法
proxy = xmlrpc.client.ServerProxy(f"http://{source_node_ip}:8000/")
# 发送存档传输命令
result = proxy.tar_transport(tar_name, target_node_ip)
print(f"存档传输执行状态: {result} 存档文件已传输至目标节点")
return result
5. 运行RPC服务(node.py)
根据第4步的思路,节点之间存在p2p的文件传输,每个node节点既是服务端又是客户端,所以每个节点都需要开启RPC服务,并一直保持服务:
if __name__ == "__main__":
server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
print("等待RPC请求...")
server.serve_forever()
同时,为了能让master节点调用tar_transport方法,需要在服务中注册该方法:
if __name__ == "__main__":
server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
server.register_function(tar_transport)
print("等待RPC请求...")
server.serve_forever()
6. 在node.py中实现文件传输方法(node.py)
# 输入存档文件名称、目标节点名称,发送存档文件到目标节点
def tar_transport(filename, target_node_ip):
local_file_path = f"/var/lib/kubelet/checkpoints/{filename}"
try:
# 读取.tar文件数据
with open(local_file_path, 'rb') as file:
file_data = xmlrpc.client.Binary(file.read())
# 连接服务器端
proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")
# 发送存档文件到目标节点
result = proxy.receive_file(filename, file_data)
if result:
return "successful"
else:
return "failing,文件在目标节点上保存失败"
except Exception as e:
return f"文件传输失败: {str(e)}"
同时,目标节点对存档文件进行保存:
def receive_file(filename, file_data):
emote_file_path = f"/check/{filename}"
# 保存接收到的文件数据
with open(emote_file_path, 'wb') as file:
file.write(file_data.data)
return True
7. 发送镜像创建指令(master.py)
# 下发镜像创建指令,输入目标节点IP、pod名称、pod内容器名称,存档文件名称,返回镜像创建结果
def send_buildah_operations(target_node_ip, pod_name, container_name, tar_name):
proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")
# 发送镜像创建命令
result = proxy.buildah_operations(pod_name, container_name, tar_name)
print(f"镜像建立执行状态: {result} 已创建待恢复镜像")
return result
代码思路与第4步一致,不再过多解释。
8. 在node.py中实现镜像创建方法(node.py)
# 输入pod名称、容器名称、存档文件名称,创建待恢复pod的镜像,返回镜像名称
def buildah_operations(pod_name, container_name, tar_name):
try:
# 创建新容器
new_container = subprocess.check_output(["buildah", "from", "scratch"]).decode().strip()
# 指定文件的路径和名称
source_path = f"/check/{tar_name}"
# 将文件添加到容器
subprocess.run(["buildah", "add", new_container, source_path, "/"])
# 配置注释
container_name_annotation = f"io.kubernetes.cri-o.annotations.checkpoint.name={container_name}"
subprocess.run(["buildah", "config", "--annotation", container_name_annotation, new_container])
# 提交镜像
subprocess.run(["buildah", "commit", new_container, f"{pod_name}:dump"]
# 删除容器
subprocess.run(["buildah", "rm", new_container])
return "successful"
except Exception as e:
return f"Buildah操作出错:{str(e)}"
本质上是使用buildah工具创建镜像并将这一过程打包。
更新node.py的方法注册信息:
if __name__ == "__main__":
server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
server.register_function(tar_transport)
server.register_function(receive_file)
server.register_function(buildah_operations)
print("等待RPC请求...")
server.serve_forever()
9. 接下来做一些辅助工作后恢复pod,具体为获取pod的配置信息并修改,修改内容包括所使用的镜像、运行的节点等(master.py)
# 读取pod配置信息
def get_pod_config(namespace, pod_name):
try:
# 执行 kubectl 命令并捕获输出
command = f"kubectl get -n {namespace} pod {pod_name} -oyaml"
result = subprocess.check_output(command, shell=True, text=True)
# 解析 YAML
pod_data = yaml.safe_load(result)
# 提取 last-applied-configuration 字段
last_applied_config = pod_data.get('metadata', {}).get('annotations', {}).get(
'kubectl.kubernetes.io/last-applied-configuration')
if last_applied_config:
# 将 last-applied-configuration 字段解析为 YAML
pod_config = yaml.safe_load(last_applied_config)
return pod_config
else:
return None
except Exception as e:
return str(e)
# 修改pod配置信息,镜像改为待恢复镜像
def modify_pod_yaml(pod_data, new_image, node_name):
if pod_data:
# 修改镜像
containers = pod_data['spec']['containers']
for container in containers:
container['image'] = new_image
# 增加节点选择字段
pod_data['spec']['nodeName'] = node_name
return pod_data
2. 代码运行效果验证
1. 运行待迁移pod
命令为kubectl apply -f counters.yaml,counters.yaml文件内容如下:
apiVersion: v1
kind: Pod
metadata:
name: counters
spec:
containers:
- name: counter
image: quay.io/adrianreber/counter:blog
查看pod所在节点和使用的镜像:
pod所在节点为mu36,使用镜像与counters.yaml文件的配置一致。
2. 迁移pod
将pod迁移至mu26,迁移过程的输出信息如下:
此时查看pod所在节点和使用的镜像:
可以看到pod已经迁移到mu26,并且使用的是存档文件所建立的镜像。
3. 完整代码
1. master.py
import os
import json
import yaml
import xmlrpc.client
import requests
import subprocess
from kubernetes import client, config
cert = ("/etc/kubernetes/pki/apiserver-kubelet-client.crt", "/etc/kubernetes/pki/apiserver-kubelet-client.key")
tar_name = ""
# 查找待迁移pod的基本信息,输入pod名称,输出pod所在的节点IP、pod命名空间、容器名称
def get_pod_info(pod_name):
try:
# 使用默认的Kubeconfig配置
config.load_kube_config()
# 创建一个Kubernetes API客户端
api = client.CoreV1Api()
# 搜索指定Pod
pod_list = api.list_pod_for_all_namespaces(watch=False)
target_pod = None
for pod in pod_list.items:
if pod.metadata.name == pod_name:
target_pod = pod
break
if target_pod:
# 获取Pod所在节点的IP
node_name = target_pod.spec.node_name
pod_namespace = target_pod.metadata.namespace
# 获取Pod内所有容器的名称
container_names = [container.name for container in target_pod.spec.containers]
return {
"Node IP": node_name,
"Namespace": pod_namespace,
"Container Names": container_names
}
else:
return "Pod not found."
except Exception as e:
return f"Error: {str(e)}"
# 下发检查点存档指令,输入存档请求需要的所有参数,执行存档请求,返回存档文件名称
def send_checkpoint(source_node_ip, pod_namespace, pod_name, container_name):
url = f"https://{source_node_ip}:10250/checkpoint/{pod_namespace}/{pod_name}/{container_name}"
# 发送POST请求
response = requests.post(url, cert=cert, verify=False) # 设置verify为False来忽略SSL验证
# 检查响应
if response.status_code == 200:
print("dump请求成功,响应内容:")
data = json.loads(response.text)
tar_path = data['items'][0]
tar_name = os.path.basename(tar_path)
print(data)
return tar_name
else:
print(f"请求失败,状态码: {response.status_code}")
# 下发存档传输指令,输入源节点IP、待传输文档名称、目标节点IP,返回传输状态
def send_tar_transport(source_node_ip, tar_name, target_node_ip):
proxy = xmlrpc.client.ServerProxy(f"http://{source_node_ip}:8000/")
# 发送存档传输命令
result = proxy.tar_transport(tar_name, target_node_ip)
print(f"存档传输执行状态: {result} 存档文件已传输至目标节点")
return result
# 下发镜像创建指令,输入目标节点IP、pod名称、pod内容器名称,存档文件名称,返回镜像创建结果
def send_buildah_operations(target_node_ip, pod_name, container_name, tar_name):
proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")
# 发送镜像创建命令
result = proxy.buildah_operations(pod_name, container_name, tar_name)
print(f"镜像建立执行状态: {result} 已创建待恢复镜像")
return result
# Pod有状态迁移中间过程总控制
def pod_transport(source_node_ip, target_node_ip, pod_namespace, pod_name, container_name):
tar_name = send_checkpoint(source_node_ip, pod_namespace, pod_name, container_name)
if tar_name:
result_tar_transport = send_tar_transport(source_node_ip, tar_name, target_node_ip)
if result_tar_transport == "successful":
result_buildah_operations = send_buildah_operations(target_node_ip, pod_name, container_name, tar_name)
if result_buildah_operations == "successful":
return result_buildah_operations
else:
print(result_buildah_operations)
else:
print(result_tar_transport)
else:
print("pod存档失败")
# 读取pod配置信息
def get_pod_config(namespace, pod_name):
try:
# 执行 kubectl 命令并捕获输出
command = f"kubectl get -n {namespace} pod {pod_name} -oyaml"
result = subprocess.check_output(command, shell=True, text=True)
# 解析 YAML
pod_data = yaml.safe_load(result)
# 提取 last-applied-configuration 字段
last_applied_config = pod_data.get('metadata', {}).get('annotations', {}).get(
'kubectl.kubernetes.io/last-applied-configuration')
if last_applied_config:
# 将 last-applied-configuration 字段解析为 YAML
pod_config = yaml.safe_load(last_applied_config)
return pod_config
else:
return None
except Exception as e:
return str(e)
# 修改pod配置信息,镜像改为待恢复镜像
def modify_pod_yaml(pod_data, new_image, node_name):
if pod_data:
# 修改镜像
containers = pod_data['spec']['containers']
for container in containers:
container['image'] = new_image
# 增加节点选择字段
pod_data['spec']['nodeName'] = node_name
return pod_data
# pod恢复节点总控
def restore_pod(namespace, pod_name, new_image, node_name):
pod_config = get_pod_config(namespace, pod_name)
modify_pod_yaml(pod_config, new_image, node_name)
if pod_config:
# 保存配置到文件
with open(f"/check/{pod_name}_restore.yaml", 'w') as f:
yaml.dump(pod_config, f, default_flow_style=False)
try:
subprocess.run(["kubectl", "delete", "pod", "-n", namespace, pod_name])
subprocess.run(["kubectl", "apply", "-f", f"/check/{pod_name}_restore.yaml"])
print(f"Pod已迁移至{node_name}")
print(f"Pod配置信息已修改并保存到 /check/{pod_name}_restore.yaml 文件。")
except Exception as e:
return f"Pod恢复操作出错:{str(e)}"
else:
print("未找到指定的Pod或配置信息。")
if __name__ == "__main__":
print("-----------------------------------------")
pod_name = input("请输入待迁移pod名称:")
target_node_ip = input("请输入目标节点名称: ")
pod_info = get_pod_info(pod_name)
print("-----------------------------------------")
print(f"pod所在节点:{pod_info['Node IP']}")
print(f"pod所在命名空间:{pod_info['Namespace']}")
print(f"pod内部容器:{pod_info['Container Names'][0]}")
print("-----------------------------------------")
print("开始执行迁移操作...")
result_image = pod_transport(pod_info["Node IP"], target_node_ip, pod_info["Namespace"], pod_name, pod_info["Container Names"][0])
if result_image:
restore_pod(pod_info["Namespace"], pod_name, f"localhost/{pod_name}:dump", target_node_ip)
2. node.py
import subprocess
import xmlrpc.server
import xmlrpc.client
def receive_file(filename, file_data):
# save_path = '/check/' # 指定保存文件的文件夹路径
# file_path = os.path.join(save_path, filename)
emote_file_path = f"/check/{filename}"
# 保存接收到的文件数据
with open(emote_file_path, 'wb') as file:
file.write(file_data.data)
return True
# 输入存档文件名称、目标节点名称,发送存档文件到目标节点
def tar_transport(filename, target_node_ip):
local_file_path = f"/var/lib/kubelet/checkpoints/{filename}"
try:
# 读取.tar文件数据
with open(local_file_path, 'rb') as file:
file_data = xmlrpc.client.Binary(file.read())
# 连接服务器端
proxy = xmlrpc.client.ServerProxy(f"http://{target_node_ip}:8000/")
# 发送存档文件到目标节点
result = proxy.receive_file(filename, file_data)
if result:
return "successful"
else:
return "failing,文件在目标节点上保存失败"
except Exception as e:
return f"文件传输失败: {str(e)}"
# 输入pod名称、容器名称、存档文件名称,创建待恢复pod的镜像,返回镜像名称
def buildah_operations(pod_name, container_name, tar_name):
try:
# 创建新容器
new_container = subprocess.check_output(["buildah", "from", "scratch"]).decode().strip()
# 指定文件的路径和名称
source_path = f"/check/{tar_name}"
# source_path = f"/var/lib/kubelet/checkpoints/checkpoint-counters_default-counter-2023-10-16T23:26:59+08:00.tar"
# 将文件添加到容器
subprocess.run(["buildah", "add", new_container, source_path, "/"])
# 配置注释
# container_name_annotation = f"io.kubernetes.cri-o.annotations.checkpoint.name=counter"
container_name_annotation = f"io.kubernetes.cri-o.annotations.checkpoint.name={container_name}"
subprocess.run(["buildah", "config", "--annotation", container_name_annotation, new_container])
# 提交镜像
subprocess.run(["buildah", "commit", new_container, f"{pod_name}:dump"])
# 删除容器
subprocess.run(["buildah", "rm", new_container])
return "successful"
except Exception as e:
return f"Buildah操作出错:{str(e)}"
if __name__ == "__main__":
server = xmlrpc.server.SimpleXMLRPCServer(("0.0.0.0", 8000))
server.register_function(tar_transport)
server.register_function(receive_file)
server.register_function(buildah_operations)
print("等待RPC请求...")
server.serve_forever()
4. 结尾
本文实现了pod的一键迁移功能,将繁琐的迁移细节给封装了起来。之后可以在此基础上进行各种迁移相关的实验。
但从系统的角度来说这样还是不太够的,比如集群中有那么多node节点,需要一个一个的去部署吗,所以接下来可以将程序容器化,再使用DaemonSet去部署。
(补充):
上篇文章忘了提醒Linux环境的cgroup得使用第一版,使用第二版将无法恢复pod。这里给出解决方法
查看cgroup版本:
# 查看Linux cgroup版本:
stat -fc %T /sys/fs/cgroup/
# 输出 cgroup2fs,为cgroup v2。
# 输出 tmpfs,为cgroup v1。
cgroup v2 -> v1:
sudo cp /etc/default/grub /etc/default/grub.bak
sudo vi /etc/default/grub
# 修改内容
GRUB_CMDLINE_LINUX="systemd.unified_cgroup_hierarchy=0"
sudo update-grub
sudo reboot
更多推荐
所有评论(0)