我在 Kubernetes 上的 Spark 之旅......在 Python 中 (3/3)
我们需要将 Kubernetes 作为 Python 客户端应用程序的一部分进行操作。因此,我们需要与 Kubernetes REST API 进行交互。幸运的是,我们不需要自己实现 API 调用和管理 HTTP 请求/响应:我们可以依赖Kubernetes Python 客户端以及其他官方支持的其他语言的 Kubernetes 客户端库,例如 Go、Java、.NET、 JavaScript 和
我们需要将 Kubernetes 作为 Python 客户端应用程序的一部分进行操作。因此,我们需要与 Kubernetes REST API 进行交互。幸运的是,我们不需要自己实现 API 调用和管理 HTTP 请求/响应:我们可以依赖Kubernetes Python 客户端以及其他官方支持的其他语言的 Kubernetes 客户端库,例如 Go、Java、.NET、 JavaScript 和 Haskell(还有很多社区维护的用于多种语言的客户端库)。
Kubernetes Python 客户端
Kubernetes Python 客户端与 Python 2.7 和 3.4+ 兼容。有关 Kubernetes 支持的版本,请参阅兼容性矩阵。
使用客户端库时,首先要加载认证和集群信息。
加载认证和集群信息
首先,您需要设置所需的服务帐户和角色。
k8s/python-client-sa-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: python-client-sa
namespace: spark-jobs
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: spark-jobs
name: python-client-role
rules:
- apiGroups: [""]
resources: ["configmaps", "pods", "pods/log", "pods/status", "services"]
verbs: ["*"]
- apiGroups: ["networking.k8s.io"]
resources: ["ingresses", "ingresses/status"]
verbs: ["*"]
- apiGroups: ["sparkoperator.k8s.io"]
resources: [sparkapplications]
verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: python-client-role-binding
namespace: spark-jobs
subjects:
- kind: ServiceAccount
name: python-client-sa
namespace: spark-jobs
roleRef:
kind: Role
name: python-client-role
apiGroup: rbac.authorization.k8s.io
--------
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: node-reader
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list"]
--------
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: python-client-cluster-role-binding
subjects:
- kind: ServiceAccount
name: python-client-sa
namespace: spark-jobs
roleRef:
kind: ClusterRole
name: node-reader
apiGroup: rbac.authorization.k8s.io
进入全屏模式 退出全屏模式
kubectl create -f k8s/python-client-sa-rbac.yaml
进入全屏模式 退出全屏模式
此命令创建一个名为python-client-sa
的新服务帐户,这是一个在spark-jobs
命名空间中具有所需权限的新角色,然后将新角色绑定到新创建的服务帐户。
警告:python-client-sa
是服务帐户,它将为我们的应用程序中的 Kubernetes Python 客户端提供身份。不要将此服务帐户与驱动程序 pod 的driver-sa
服务帐户混淆。
简单的方法
在这种方法中,我们可以使用辅助实用程序从kubeconfig
文件中加载身份验证和集群信息,并将它们存储在kubernetes.client.configuration
中。
from kubernetes import config, client
config.load_kube_config("path/to/kubeconfig_file")
v1 = client.CoreV1Api()
print("Listing pods with their IPs:")
ret = v1.list_namespaced_pod(namespace="spark-jobs")
for i in ret.items:
print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
进入全屏模式 退出全屏模式
但是我们不希望依赖默认的kubeconfig
文件,由环境变量KUBECONFIG
表示,或者,如果失败,则在~/.kube/config
中。这个kubeconfig
文件是你的,作为kubectl
命令的用户。具体来说,使用这个kubeconfig
文件,您有权在 Kubernetes 集群中以及所有命名空间中执行几乎所有操作。相反,我们将在脚本kubeconfig-gen.sh
的帮助下,专门为上面创建的服务帐户生成一个:
#!/usr/bin/env bash
# set -eux
# Reads the API server name from the default `kubeconfig` file.
# Here we suppose that the kubectl command-line tool is already configured to communicate with our cluster.
APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')
SERVICE_ACCOUNT_NAME=${1:-python-client-sa}
NAMESPACE=${2:-spark-jobs}
SECRET_NAME=$(kubectl get serviceaccount ${SERVICE_ACCOUNT_NAME} -n ${NAMESPACE} -o jsonpath='{.secrets[0].name}')
TOKEN=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath='{.data.token}' | base64 --decode)
CACERT=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath="{['data']['ca\.crt']}")
cat > kubeconfig-sa << EOF
apiVersion: v1
kind: Config
clusters:
- cluster:
certificate-authority-data: ${CACERT}
server: ${APISERVER}
name: default-cluster
contexts:
- context:
cluster: default-cluster
namespace: ${NAMESPACE}
user: ${SERVICE_ACCOUNT_NAME}
name: default-context
current-context: default-context
users:
- user:
token: ${TOKEN}
name: ${SERVICE_ACCOUNT_NAME}
EOF
进入全屏模式 退出全屏模式
这样创建的kubeconfig
文件为python-client-sa
服务帐户配置了对集群的访问权限,仅具有我们的客户端应用程序和单个命名空间spark-jobs
所需的权限(“最小权限原则”)。
艰难之路
获取凭证
在这里,我们将以尽可能编程的方式配置 Python 客户端。
首先,我们需要获取访问 Kubernetes 集群的凭据。我们将这些存储在 Python 环境变量中。
export APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')
SECRET_NAME=$(kubectl get serviceaccount python-client-sa -o jsonpath='{.secrets[0].name}')
export TOKEN=$(kubectl get secret ${SECRET_NAME} -o jsonpath='{.data.token}' | base64 --decode)
export CACERT=$(kubectl get secret ${SECRET_NAME} -o jsonpath="{['data']['ca\.crt']}")
进入全屏模式 退出全屏模式
请注意,第一次导入os
模块时会捕获环境变量,通常是在 IDE/Python 启动期间。在此时间之后对环境所做的更改不会反映在os.environ
中(直接修改 os.environ 所做的更改除外)。
Python示例使用
import base64
import os
from tempfile import NamedTemporaryFile
from kubernetes import client
api_server = os.environ["APISERVER"]
cacert = os.environ["CACERT"]
token = os.environ["TOKEN"]
# Set the configuration
configuration = client.Configuration()
with NamedTemporaryFile(delete=False) as cert:
cert.write(base64.b64decode(cacert))
configuration.ssl_ca_cert = cert.name
configuration.host = api_server
configuration.verify_ssl = True
configuration.debug = False
configuration.api_key = {"authorization": "Bearer " + token}
client.Configuration.set_default(configuration)
v1 = client.CoreV1Api()
print("Listing pods with their IPs:")
ret = v1.list_namespaced_pod(namespace="spark-jobs")
for i in ret.items:
print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
进入全屏模式 退出全屏模式
入门
Kubernetes对象管理
使用 Kubernetes Python 客户端,您可以以编程方式创建和管理 Kubernetes 对象。
在以下示例中(在GitHub 存储库中提供),我们使用AppsV1Api
创建、更新然后删除Deployment
:
"""
Creates, updates, and deletes a deployment using AppsV1Api.
"""
from kubernetes import client, config
DEPLOYMENT_NAME = "nginx-deployment"
def create_deployment_object():
# Configureate Pod template container
container = client.V1Container(
name="nginx",
image="nginx:1.15.4",
ports=[client.V1ContainerPort(container_port=80)],
resources=client.V1ResourceRequirements(
requests={"cpu": "100m", "memory": "200Mi"},
limits={"cpu": "500m", "memory": "500Mi"}
)
)
# Create and configurate a spec section
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
spec=client.V1PodSpec(containers=[container]))
# Create the specification of deployment
spec = client.V1DeploymentSpec(
replicas=3,
template=template,
selector={'matchLabels': {'app': 'nginx'}})
# Instantiate the deployment object
deployment = client.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=client.V1ObjectMeta(name=DEPLOYMENT_NAME),
spec=spec)
return deployment
def create_deployment(api_instance, deployment):
# Create deployement
api_response = api_instance.create_namespaced_deployment(
body=deployment,
namespace="default")
print("Deployment created. status='%s'" % str(api_response.status))
def update_deployment(api_instance, deployment):
# Update container image
deployment.spec.template.spec.containers[0].image = "nginx:1.16.0"
# Update the deployment
api_response = api_instance.patch_namespaced_deployment(
name=DEPLOYMENT_NAME,
namespace="default",
body=deployment)
print("Deployment updated. status='%s'" % str(api_response.status))
def delete_deployment(api_instance):
# Delete deployment
api_response = api_instance.delete_namespaced_deployment(
name=DEPLOYMENT_NAME,
namespace="default",
body=client.V1DeleteOptions(
propagation_policy='Foreground',
grace_period_seconds=5))
print("Deployment deleted. status='%s'" % str(api_response.status))
def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
config.load_kube_config("path/to/kubeconfig_file")
apps_v1 = client.AppsV1Api()
deployment = create_deployment_object()
create_deployment(apps_v1, deployment)
update_deployment(apps_v1, deployment)
delete_deployment(apps_v1)
if __name__ == '__main__':
main()
进入全屏模式 退出全屏模式
这很好,但这涉及到掌握客户端的 API,最重要的是我们必须强制配置我们的对象:我们在代表 Kubernetes 对象的 Python 对象上指定所需的操作(创建、替换等)。在这里,我们更喜欢以 declarative 方式管理我们的对象,并像我们通常使用kubectl
命令一样对对象配置文件(沿 Python 代码源存储在本地)进行操作。确实,Python代码应该只是一个简单的执行后端来触发Kubernetes的操作,而_business_逻辑,可以说应该集中在manifest文件中。
我们在上面创建的部署与nginx-deployment.yaml
中的相同:
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginx
spec:
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.15.4
ports:
- containerPort: 80
进入全屏模式 退出全屏模式
我们可以直接加载清单如下:
from os import path
import yaml
from kubernetes import client, config
config.load_kube_config("path/to/kubeconfig_file")
with open(path.join(path.dirname(__file__), "nginx-deployment.yaml")) as f:
dep = yaml.safe_load(f)
k8s_apps_v1 = client.AppsV1Api()
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="default")
print("Deployment created. status='%s'" % resp.metadata.name)
进入全屏模式 退出全屏模式
这相当于 Python 中的kubectl create -f nginx-deployment.yaml
。
如您所见,您必须调用create_namespaced_deployment
来创建 Deployment。同样的,你可以调用create_namespaced_pod
来创建一个 Pod,以此类推。这是因为 Python 客户端是按照 Kubernetes API 的OpenAPI
规范自动生成的。
不得不调用特定方法来创建特定类型的对象是一种耻辱,即使对象本身的类型已经在我们通过此方法加载的清单中指定。幸运的是,Kubernetes Python 客户端提供了一种实用方法,可以作为任何类型对象的输入中心。
import os
import yaml
from kubernetes import client, config, utils
config.load_kube_config("path/to/kubeconfig_file")
with open(os.path.join(os.path.dirname(__file__), "nginx-deployment.yaml")) as f:
dep = yaml.safe_load(f)
k8s_client = client.ApiClient()
resp = utils.create_from_dict(k8s_client, dep)
print("Deployment created. status='%s'" % resp[0].metadata.name)
进入全屏模式 退出全屏模式
utils.create_from_dict
是这里的魔术方法。它只需要一个Dict
持有有效的 kubernetes 对象。找到它是一件幸事,因为它很好地隐藏在客户端中,根本没有记录在案。
因此,要使用 spark-submit 启动 Spark 作业,您只需使用单个 YAML 文件调用上面的代码片段,该文件将所有需要的资源分组(在 YAML 中由 --- 分隔)。
但是 Spark Operator 呢?utils.create_from_dict
不支持自定义资源,这意味着不属于核心 Kubernetes API 的对象类型,即来自 Spark Operator 的SparkApplication
。要使用 Spark Operator 运行 Spark 作业,您别无选择,只能调用CustomObjectsApi
的create_namespaced_custom_object
函数:
import os
import yaml
from kubernetes import client, config, utils
config.load_kube_config("path/to/kubeconfig_file")
with open(os.path.join(os.path.dirname(__file__), "k8s/spark-operator/pyspark-pi.yaml")) as f:
dep = yaml.safe_load(f)
custom_object_api = client.CustomObjectsApi()
custom_object_api.create_namespaced_custom_object(
group="sparkoperator.k8s.io",
version="v1beta2",
namespace="spark-jobs",
plural="sparkapplications",
body=dep,
)
print("SparkApplication created")
进入全屏模式 退出全屏模式
关于 spark-submit,还有一个更直接的方法utils.create_from_yaml
,它从 YAML 文件中读取 Kubernetes 对象。但我们不能使用它,因为我们需要在将 YAML 文件提交给 Kubernetes Python 客户端之前“参数化”它们。
模板
如您所知,当您将清单文件应用到 Kubernetes(Kubernetes 可以理解的 YAML 格式的资源描述)时,您必须指定资源名称,该名称对于该类型的资源(并且在同一命名空间内)必须是唯一的,否则 Kubernetes 将抱怨资源已经存在。
例如,同一命名空间中只能有一个名为myapp-1234
的 Pod,但可以有一个 Pod 和一个 Deployment,每个命名为myapp-1234
。
由于我们希望同时运行多个 Spark 作业,并且这些 Spark 作业除了一些运行时参数之外大部分是相同的,因此我们需要参数化或模板化我们的 Kubernetes YAML 文件。
通常情况下,您不会这样做,至少 Kubernetes 的理念不是这样:Kubernetes 文件应该是无模板的,并且只应该通过Kustomize进行修补。Helm也有自己的模板系统。
您不能将此类工具与 Kubernetes Python 客户端一起使用。相反,我们将用相应的值替换对或
${VAR}
形式的变量的引用,就像envsubst一样,但是以编程方式。
让我们回到 Spark(原生)。我们之前看到了 YAML 文件,它定义了一个驱动程序 pod 来运行 Pi 示例程序。如您所见,我们有占位符来指定namespace
、priorityClassName
、
serviceAccountName
、nodeAffinity
和NAME_SUFFIX
使 pod 的名称独一无二。
现在,在 Python 代码中,我们在运行时将这些变量替换为创建 pod 之前所需的值:
import binascii
import os
from os import listdir
from pprint import pprint
import yaml
from kubernetes import client, config, utils
def create_k8s_object(yaml_file=None, env_subst=None):
with open(yaml_file) as f:
str = f.read()
if env_subst:
for env, value in env_subst.items():
str = str.replace(env, value)
return yaml.safe_load(str)
def main():
# Configs can be set in Configuration class directly or using helper utility
config.load_kube_config("path/to/kubeconfig_file")
name_suffix = "-" + binascii.b2a_hex(os.urandom(8))
priority_class_name = "routine"
env_subst = {"${NAMESPACE}": "spark-jobs",
"${SERVICE_ACCOUNT_NAME}": "driver-sa",
"${DRIVER_NODE_AFFINITIES}": "driver",
"${EXECUTOR_NODE_AFFINITIES}": "compute",
"${NAME_SUFFIX}": name_suffix,
"${PRIORITY_CLASS_NAME}": priority_class_name}
k8s_client = client.ApiClient()
verbose = True
# Create driver pod
k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native")
k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst)
pprint(k8s_object_dict)
k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)
# TODO: create the other resources
print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))
if __name__ == "__main__":
main()
进入全屏模式 退出全屏模式
将零件放在一起
我们现在有了通用机制,我们可以创建所需的所有资源。
请记住,驱动程序 pod 使用ConfigMap
来定义环境变量并在 Spark 容器中挂载配置文件(包括执行程序 pod 的模板)。我们还有一个Service
允许执行程序与驱动程序进行通信。最后,我们有另一个Service
,由Ingress
支持,用于公开 Spark UI。
我们只是遍历定义这些资源的 YAML 文件并调用相同的方法utils.create_from_dict
:
# List all YAML files in k8s/spark-native directory, except the driver pod definition file
other_resources = listdir(k8s_dir)
other_resources.remove("pyspark-pi-driver-pod.yaml")
for f in other_resources:
k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst)
pprint(k8s_object_dict)
utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)
print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))
进入全屏模式 退出全屏模式
现在我们已经启动了一个_full_Spark应用程序,让我们看看当我们杀死它时会发生什么😈或应用程序正常完成时会发生什么。
垃圾回收
被杀死的应用程序
Kubernetes 垃圾收集器的作用是删除某些曾经拥有所有者但不再拥有所有者的对象。目标是确保垃圾收集器在终止 Spark 应用程序时正确删除不再需要的资源。
当您要并行运行数十/数百个 Spark 应用程序时,释放 Kubernetes 集群的资源非常重要。
为此,可以将某些 Kubernetes 对象声明为其他对象的所有者。 “拥有”对象在所有者对象上被称为_dependent_。每个从属对象都有一个metadata.ownerReferences
字段,指向所属对象。删除所有者对象时,默认情况下也会自动删除所有依赖对象(cascading删除)。
由其驱动程序 pod 拥有的执行程序 pod 示例:
apiVersion: v1
kind: Pod
metadata:
labels:
spark-role: executor
ownerReferences:
- apiVersion: v1
controller: true
kind: Pod
name: pyspark-pi-routine-0245dc3d340cd533-driver
uid: 3b10fa97-c847-4fce-b3e1-71f779cffbef
...
进入全屏模式 退出全屏模式
Spark Operator 在不同级别自动设置ownerReference
的值:自定义SparkApplication
资源拥有驱动程序 pod,驱动程序 pod 拥有其执行程序。
对于以_natively_(没有 Spark Operator)提交的应用程序,最高级别的所有者对象是驱动程序 pod:执行程序 pod 自动设置ownerReference
字段,指向驱动程序 pod。但是我们必须自己管理其他ConfigMap
、Service
和Ingress
资源的所有权关系。
为此,我们必须检索新创建的驱动程序 pod 的自动生成的uid
并将其注入到依赖对象中:在 YAML 定义文件中手动设置uid
是不可能的,这只能在运行时通过代码完成(以及这就是为什么我们不能将所有资源放在一个 YAML 文件中)。
import binascii
import os
from os import listdir
from pprint import pprint
import yaml
from kubernetes import config, utils
from kubernetes.client import ApiClient
def create_k8s_object(yaml_file=None, env_subst=None):
with open(yaml_file) as f:
str = f.read()
if env_subst:
for env in env_subst:
str = str.replace(env, env_subst[env])
return yaml.safe_load(str)
def main():
# Configs can be set in Configuration class directly or using helper utility
config.load_kube_config("path/to/kubeconfig_file")
name_suffix = "-" + binascii.b2a_hex(os.urandom(8))
priority_class_name = "routine"
env_subst = {"${NAMESPACE}": "spark-jobs",
"${SERVICE_ACCOUNT_NAME}": "driver-sa",
"${DRIVER_NODE_AFFINITIES}": "driver",
"${EXECUTOR_NODE_AFFINITIES}": "compute",
"${NAME_SUFFIX}": name_suffix,
"${PRIORITY_CLASS_NAME}": priority_class_name}
k8s_client = ApiClient()
verbose = True
# Create driver pod
k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native")
k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst)
pprint(k8s_object_dict)
k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)
# Prepare ownership on dependent objects
owner_refs = [{"apiVersion": "v1",
"controller": True,
"kind": "Pod",
"name": k8s_objects[0].metadata.name,
"uid": k8s_objects[0].metadata.uid}]
# List all YAML files in k8s/spark-native directory, except the driver pod definition file
other_resources = listdir(k8s_dir)
other_resources.remove("pyspark-pi-driver-pod.yaml")
for f in other_resources:
k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst)
# Set ownership
k8s_object_dict["metadata"]["ownerReferences"] = owner_refs
pprint(k8s_object_dict)
utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)
print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))
if __name__ == "__main__":
main()
进入全屏模式 退出全屏模式
申请正常完成
当应用程序正常完成时,executor pod 会终止并被清理,但驱动程序 pod 会保留日志并在 Kubernetes API 中保持“已完成”状态,“直到最终被垃圾收集或手动清理”。
请注意,在完成状态下,驱动程序 pod 不使用任何计算或内存资源。
Spark Operator 通过名为.spec.timeToLiveSeconds
的可选字段对SparkApplications
提供 TTL 支持,如果设置该字段,则定义SparkAplication
终止后的生存时间 (TTL) 持续时间(以秒为单位)。如果当前时间大于自终止后的.spec.timeToLiveSeconds
,则SparkApplication
对象将被垃圾回收。下面的示例说明了如何使用该字段:
spec:
timeToLiveSeconds: 86400
进入全屏模式 退出全屏模式
在原生 Spark 方面,文档中没有任何内容指定驱动程序 pod 最终如何被删除。我们可以设置一个简单的 KubernetesCronJob
,它会定期运行以自动删除它们。
在写这篇文章的时候,Kubernetes 中有待处理的请求支持Pods
中的 TTL,就像在Jobs
中一样:"TTL 控制器暂时只处理 Jobs,可能会扩展以处理其他将完成执行的资源,例如 Pods 和自定义资源。”
后台级联删除
从 Python 代码中终止应用程序时,我们使用 Background 级联删除策略删除所有者对象。
在后台级联删除中,Kubernetes 立即删除所有者对象,然后垃圾收集器在后台删除依赖项。这很有用,以免延迟主执行线程。
要删除由spark-submit
启动的 Spark 作业:
from kubernetes import client
core_v1_api = client.CoreV1Api()
core_v1_api.delete_namespaced_pod("driver-pod-name", "spark-jobs", propagation_policy="Background")
进入全屏模式 退出全屏模式
要删除使用 Spark Operator 启动的 Spark 作业,我们必须删除封闭的SparkApplication
资源:
from kubernetes import client, config
custom_object_api = client.CustomObjectsApi()
custom_object_api.delete_namespaced_custom_object(
group="sparkoperator.k8s.io",
version="v1beta2",
namespace="spark-jobs",
plural="sparkapplications",
name="app_name",
propagation_policy="Background")
进入全屏模式 退出全屏模式
Pod优先级和抢占
无论是 Volcano 还是默认的kube-scheduler
,作业抢占依赖于作业优先级。对于两个作业,调度程序通过比较.spec.priorityClassName
(然后是createTime
)来决定谁的优先级更高。
优先级传播到驱动程序和执行程序 pod,无论是使用本机 spark-submit 还是使用 Spark Operator,并且与节点关联性无关。
如何知道哪些pod被抢占了
您可以检索有关集群中正在发生的事情的高级信息。要列出命名空间spark-jobs
中的所有事件,您可以使用:
# List Events sorted by timestamp
kubectl get events --sort-by=.metadata.creationTimestamp --namespace=spark-jobs
进入全屏模式 退出全屏模式
最后看到的类型原因对象消息
69s Normal Scheduled podgroup/podgroup-6083b4f9-f240-4a0e-95f2-06882aec2942 pod group 准备就绪
93s Normal Scheduled pod/pyspark-pi-driver-routine-bf20cae50b6a8253 成功分配 spark-jobs/pyspark-pi-driver-routine-bf20cae50b6a8253 到 gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp
92s 正常启动 pod/pyspark-pi-driver-routine-bf20cae50b6a8253 启动容器 pyspark-pi
92s Normal Pulled pod/pyspark-pi-driver-routine-bf20cae50b6a8253 容器映像“eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1”已经存在于机器上
92s 正常 创建 pod/pyspark-pi-driver-routine-bf20cae50b6a8253 创建容器 pyspark-pi
82s 正常调度 pod/pythonpi-34b3597593d246a1-exec-2 成功分配 spark-jobs/pythonpi-34b3597593d246a1-exec-2 到 gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl
82s Normal Scheduled pod/pythonpi-34b3597593d246a1-exec-1 成功分配 spark-jobs/pythonpi-34b3597593d246a1-exec-1 到 gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck
82s 正常 创建 pod/pythonpi-34b3597593d246a1-exec-1 创建容器 spark-kubernetes-executor
82s 正常启动 pod/pythonpi-34b3597593d246a1-exec-1 启动容器 spark-kubernetes-executor
82s Normal Pulled pod/pythonpi-34b3597593d246a1-exec-1 容器镜像“eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1”已经存在于机器上
81s 正常 创建 pod/pythonpi-34b3597593d246a1-exec-2 创建容器 spark-kubernetes-executor
81s Normal Pulled pod/pythonpi-34b3597593d246a1-exec-2 容器镜像“eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1”已经存在于机器上
80s 正常启动 pod/pythonpi-34b3597593d246a1-exec-2 启动容器 spark-kubernetes-executor
42s Normal Killing pod/pythonpi-34b3597593d246a1-exec-1 停止容器 spark-kubernetes-executor
42s Normal Killing pod/pythonpi-34b3597593d246a1-exec-2 停止容器 spark-kubernetes-executor
42 秒警告 FailedScheduling pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 所有节点都不可用:3 个节点资源匹配失败。
**42s Normal Killing pod/pyspark-pi-driver-routine-bf20cae50b6a8253 停止容器 pyspark-pi
42s 警告 Evict pod/pyspark-pi-driver-routine-bf20cae50b6a8253 Pod 被驱逐,因为抢占**
34s 警告 Unschedulable podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816 组中的 0/1 个任务不可调度:pod 组未准备好,1 个流水线,1 分钟可用。
41s 警告 FailedScheduling pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 1/1 团伙中的任务不可调度:pod 组未准备好,1 个流水线,1 分钟可用。
18s Normal Scheduled podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816 pod group 准备就绪
33s 正常调度 pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 成功分配 spark-jobs/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 到 gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp
32s 正常启动 pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 启动容器 pyspark-pi
32s 正常 创建 pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 创建容器 pyspark-pi
32s 正常 拉出 pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d 容器映像“eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1”已经存在于机器上
22s Normal Scheduled pod/pythonpi-f36cce7593d332f1-exec-1 成功分配 spark-jobs/pythonpi-f36cce7593d332f1-exec-1 到 gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck
22s Normal Scheduled pod/pythonpi-f36cce7593d332f1-exec-2 成功分配 spark-jobs/pythonpi-f36cce7593d332f1-exec-2 到 gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl
22s Normal Pulled pod/pythonpi-f36cce7593d332f1-exec-1 容器镜像“eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1”已经存在于机器上
21s 正常 启动 pod/pythonpi-f36cce7593d332f1-exec-1 启动容器 spark-kubernetes-executor
21s 正常 创建 pod/pythonpi-f36cce7593d332f1-exec-1 创建容器 spark-kubernetes-executor
21s Normal Pulled pod/pythonpi-f36cce7593d332f1-exec-2 容器镜像“eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1”已经存在于机器上
21s 正常 创建 pod/pythonpi-f36cce7593d332f1-exec-2 创建容器 spark-kubernetes-executor
21s 正常 启动 pod/pythonpi-f36cce7593d332f1-exec-2 启动容器 spark-kubernetes-executor
在上面的输出中,我们可以看到 podpyspark-pi-driver-routine-bf20cae50b6a8253
已被另一个具有“rush”优先级的作业“驱逐,因为抢占”。
未来工作
Kubernetes 为容器提供了生命周期钩子。这些钩子使容器能够了解其管理生命周期中的事件,并在执行相应的生命周期钩子时运行在处理程序中实现的代码。
特别是,可以在容器因抢占(以及其他事件)而终止之前立即调用PreStop
挂钩。
因此,我们可以考虑在抢占的情况下触发一个动作,无论它是什么。您需要做的就是为这个钩子实现和注册一个处理程序。
请参阅容器生命周期挂钩。
我们很快就要结束旅程了。在祝你晚安之前,让我们看一下如何通过 Python 代码监控 Spark 应用程序。
监控
获取 Spark 应用程序的状态
Kubernetes Python 客户端中的许多操作都可以_watched_。这允许我们的 Python 程序监视特定资源中的更改,直到您获得所需的结果或监视到期。
在这里,我们要监控 pod 驱动程序的生命周期,从Pending
阶段开始,如果 Spark 容器启动正常,则通过Running
,然后通过Succeeded
或Failed
阶段:
from kubernetes import client, watch
app_name = 'pyspark-pi-routine-bf20cae50b6a8253'
v1 = client.CoreV1Api()
count = 2
w = watch.Watch()
label_selector = 'app-name=%s,spark-role=driver' % app_name
for event in w.stream(v1.list_namespaced_pod, namespace='spark-jobs', label_selector=label_selector, timeout_seconds=60):
print('Event: %s' % event['object'].status.phase)
count -= 1
if not count:
w.stop()
进入全屏模式 退出全屏模式
在这里,驱动程序 pod(因此是 Spark 应用程序)预计将在 60 秒或更短的时间内完成,无论成功与否。在此期间其状态应该只改变两次:理想情况下是Pending
>Running
>Succeeded
。
获取日志
可以检索驱动程序 pod 的日志并将它们混合到主机应用程序的日志中。
获取日志同样简单:
from kubernetes import client, watch
from threading import Thread
v1 = client.CoreV1Api()
pod_name = 'pyspark-pi-routine-bf20cae50b6a8253-driver'
def logs(pod_name):
w = watch.Watch()
for event in w.stream(v1.read_namespaced_pod_log, pod_name=pod_name, namespace='spark-jobs', _request_timeout=300):
yield event
# We surely don't want to block the main thread while reading the logs
def consumer():
for log in logs(pod_name):
print(log)
t = Thread(target=consumer)
t.start()
进入全屏模式 退出全屏模式
获取入口 URI
一旦 Spark 应用程序启动,暴露 Spark UI 的入口(至少是其公共 IP 地址)可能需要一段时间才能使用。在这里,我们也可以监控 Ingress 资源,如下所示:
from kubernetes import client, watch
app_name = 'pyspark-pi-routine-bf20cae50b6a8253'
networking_v1_beta1_api = client.NetworkingV1beta1Api()
w = watch.Watch()
label_selector = 'app-name=%s' % app_name
for event in w.stream(networking_v1_beta1_api.list_namespaced_ingress, namespace=namespace,
label_selector=label_selector,
timeout_seconds=30):
ingress = event['object'].status.load_balancer.ingress
if ingress:
external_ip = ingress[0].ip
print('Event: The Spark Web UI is available at http://%s/%s' % (external_ip, app_name))
w.stop()
else:
print('Event: Ingress not yet available')
进入全屏模式 退出全屏模式
结论
真是个地狱般的旅程!
我们已经看到,用于启动或删除 Spark 应用程序的 Python 代码略有不同,具体取决于我们使用的是 Spark Operator 还是 spark-submit。但是由于我们在两者之间一致地命名和标记 Kubernetes 对象,并且当我们正确设置所有权关系时,我们可以监控我们的 Spark 应用程序并平等地管理它们的生命周期。
你想知道更多吗?上一篇文章中解释的 Python 脚本可在GitHub 存储库中找到。为自己服务。
更多推荐
所有评论(0)