GoogleCloud Spark Operator Chart (to be continued)
Spark Operator Chart
The GoogleCloud Spark Operator is built on Spark's official Kubernetes resource manager and makes it possible to submit Spark applications through the Kubernetes API.
Below are the definitions of the core YAML files in the official Operator chart. They may differ somewhat from the latest release, so please update them yourself as needed; feedback and discussion are welcome.
sparkapps-crd.yaml
The CustomResourceDefinition used when submitting Spark applications through the Operator. It defines the key fields of every resource object of kind SparkApplication; each field must conform to the validation rules of the given OpenAPI/JSON schema, for example the value of the cores field must be greater than 0. A minimal SparkApplication that satisfies these rules is shown after the CRD.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
creationTimestamp: null
generation: 1
name: sparkapplications.sparkoperator.k8s.io
spec:
group: sparkoperator.k8s.io
names:
kind: SparkApplication
listKind: SparkApplicationList
plural: sparkapplications
shortNames:
- sparkapp
singular: sparkapplication
scope: Namespaced
validation:
openAPIV3Schema:
properties:
metadata:
properties:
name:
maxLength: 63
minLength: 1
type: string
spec:
properties:
deps:
properties:
downloadTimeout:
minimum: 1
type: integer
maxSimultaneousDownloads:
minimum: 1
type: integer
driver:
properties:
cores:
exclusiveMinimum: true
minimum: 0
type: number
podName:
pattern: '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*'
executor:
properties:
cores:
exclusiveMinimum: true
minimum: 0
type: number
instances:
minimum: 1
type: integer
mode:
enum:
- cluster
- client
monitoring:
properties:
prometheus:
properties:
port:
maximum: 49151
minimum: 1024
type: integer
pythonVersion:
enum:
- "2"
- "3"
restartPolicy:
properties:
onFailureRetries:
minimum: 0
type: integer
onFailureRetryInterval:
minimum: 1
type: integer
onSubmissionFailureRetries:
minimum: 0
type: integer
onSubmissionFailureRetryInterval:
minimum: 1
type: integer
type:
enum:
- Never
- OnFailure
- Always
type:
enum:
- Java
- Scala
- Python
- R
version: v1beta2
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
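For reference, here is a minimal SparkApplication that satisfies the validation rules above (type within the enum, cores strictly greater than 0, at least one executor instance). It is adapted from the upstream spark-pi example; the image, jar path, and service account name are only illustrative and should be replaced with your own.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi                  # 1-63 characters, matching the name pattern
  namespace: default
spec:
  type: Scala                     # one of Java / Scala / Python / R
  mode: cluster                   # cluster or client
  image: gcr.io/spark-operator/spark:v2.4.4   # illustrative image
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never                   # Never / OnFailure / Always
  driver:
    cores: 1                      # must be > 0 (exclusiveMinimum)
    memory: 512m
    serviceAccount: spark         # an account bound by spark-rbac.yaml below
  executor:
    cores: 1
    instances: 2                  # minimum 1
    memory: 512m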
spark-operator-rbac.yaml
Defines the RBAC rules for the Operator service, i.e. which APIs it is allowed to call, such as creating, deleting, updating, and getting Pod resources.
{{- if .Values.rbac.create }}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "sparkoperator.fullname" . }}-cr
labels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
helm.sh/chart: {{ include "sparkoperator.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["*"]
- apiGroups: [""]
resources: ["services", "configmaps", "secrets"]
verbs: ["create", "get", "delete", "update"]
- apiGroups: ["extensions"]
resources: ["ingresses"]
verbs: ["create", "get", "delete"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "update", "patch"]
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["create", "get", "update", "delete"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["mutatingwebhookconfigurations"]
verbs: ["create", "get", "update", "delete"]
- apiGroups: ["sparkoperator.k8s.io"]
resources: ["sparkapplications", "scheduledsparkapplications"]
verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ include "sparkoperator.fullname" . }}-crb
labels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
helm.sh/chart: {{ include "sparkoperator.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
subjects:
- kind: ServiceAccount
name: {{ include "sparkoperator.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
roleRef:
kind: ClusterRole
name: {{ include "sparkoperator.fullname" . }}-cr
apiGroup: rbac.authorization.k8s.io
{{- end }}
spark-rbac.yaml
Defines which APIs a Spark application (kind: SparkApplication) is allowed to access. Full permissions on Pods and Services are enough here, because a Spark application essentially boils down to two kinds of containers, the Driver and the Executors, and the Driver calls the Pod API to manage all of the Executors.
{{- if and (.Values.rbac.create) (ne .Values.sparkJobNamespace "") }}
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: {{ .Values.sparkJobNamespace }}
name: spark-role
labels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
helm.sh/chart: {{ include "sparkoperator.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
rules:
- apiGroups:
- "" # "" indicates the core API group
resources:
- "pods"
verbs:
- "*"
- apiGroups:
- "" # "" indicates the core API group
resources:
- "services"
verbs:
- "*"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-role-binding
namespace: {{ .Values.sparkJobNamespace }}
labels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
helm.sh/chart: {{ include "sparkoperator.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
subjects:
- kind: ServiceAccount
name: {{ include "spark.serviceAccountName" . }}
namespace: {{ .Values.sparkJobNamespace }}
roleRef:
kind: Role
name: spark-role
apiGroup: rbac.authorization.k8s.io
{{- end }}
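Both RBAC templates above are driven by a handful of chart values. The excerpt below is only a sketch of what they might look like; the exact keys (especially the serviceAccounts section) depend on the chart version you are using.
# values.yaml (excerpt) -- illustrative settings for the RBAC templates
rbac:
  create: true                  # render the ClusterRole/Role and bindings above
sparkJobNamespace: spark-jobs   # namespace that spark-role / spark-role-binding are scoped to
serviceAccounts:
  sparkoperator:
    create: true
    name: ""                    # empty: derive the name from the release
  spark:
    create: true
    name: spark                 # the account that Spark driver pods run as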
webhook-init-job.yaml
As of v2.4.4-v1beta2, the upstream Operator only supports registering the webhook with the Kubernetes cluster through a Service. I therefore modified the relevant script and code to also support registering it via an external domain name or an Ingress, selectable through a parameter. An illustrative values excerpt follows the template below.
{{ if .Values.enableWebhook }}
apiVersion: batch/v1
kind: Job
metadata:
name: {{ include "sparkoperator.fullname" . }}-init
namespace: {{ .Release.Namespace }}
annotations:
"helm.sh/hook": post-install, post-upgrade
"helm.sh/hook-delete-policy": hook-succeeded
labels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
helm.sh/chart: {{ include "sparkoperator.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:
template:
spec:
serviceAccountName: {{ include "sparkoperator.serviceAccountName" . }}
restartPolicy: OnFailure
containers:
- name: main
image: {{ .Values.operatorImageName }}:{{ .Values.operatorVersion }}
imagePullPolicy: {{ .Values.imagePullPolicy }}
{{- if .Values.environments }}
env:
- name: WEBHOOK_INGRESS_NAME
value: {{ include "sparkoperator.webhookIngressName" . }}
{{- range $key, $val := .Values.environments }}
- name: {{ $key }}
value: {{ $val | quote }}
{{- end }}
{{- end }}
## create a secret bound to a Service (the original upstream definition):
#command: ["/bin/bash", "-l", "-c", "/usr/bin/gencerts.sh -n {{ .Release.Namespace }} -s {{ .Release.Name }}-webhook -p"]
## create a secret bound to an Ingress instead
## gencerts.sh usage:
## -h | --help Display help information.
## -n | --namespace <namespace> The namespace where the Spark operator is installed.
## -s | --service <service> The name of the webhook service.
## -i | --ingress <ingress> The name of the webhook ingress when using url mode.
## -p | --in-pod Whether the script is running inside a pod or not.
command: ["/bin/bash", "-l", "-c", "sudo sed -i 's/kubernetes.default.svc/xx-k8s-master-domain.com/g' /usr/bin/gencerts.sh; /usr/bin/gencerts.sh -n {{ .Release.Namespace }} -i ${WEBHOOK_INGRESS_NAME} -p"]
{{ end }}
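The values consumed by this job (and by the -webhook-mode / -webhook-url flags further down) could look roughly like the following. webhookMode and webhookUrl are my own additions, and the concrete values here are placeholders only.
# values.yaml (excerpt) -- hypothetical webhook settings for the modified chart
enableWebhook: true
webhookPort: 8080
webhookMode: url                                # custom flag: "service" (upstream behavior) or "url"
webhookUrl: https://spark-webhook.example.com   # placeholder external address / Ingress host
environments:                                   # extra env vars injected into the init job and operator
  TZ: Asia/Shanghai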
spark-operator-deployment.yaml
The Deployment of the Spark Operator service itself. It specifies how the Controller and Webhook processes are started, and all of the other components revolve around this Deployment. As of v2.4.4-v1beta2 it supports Scala, Python, and R applications. An illustrative values excerpt follows the template.
# If the admission webhook is enabled, then a post-install step is required
# to generate and install the secret in the operator namespace.
# In the post-install hook, the token corresponding to the operator service account
# is used to authenticate with the Kubernetes API server to install the secret bundle.
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "sparkoperator.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
helm.sh/chart: {{ include "sparkoperator.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
app.kubernetes.io/version: {{ .Values.operatorVersion }}
strategy:
type: Recreate
template:
metadata:
{{- if .Values.enableMetrics }}
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "{{ .Values.metricsPort }}"
prometheus.io/path: {{ .Values.metricsEndpoint }}
{{- end }}
labels:
app.kubernetes.io/name: {{ include "sparkoperator.name" . }}
app.kubernetes.io/version: {{ .Values.operatorVersion }}
initializers:
pending: []
spec:
serviceAccountName: {{ include "sparkoperator.serviceAccountName" . }}
volumes:
- name: hdfs-conf
configMap:
name: demo-hdfs-conf
- name: spark-conf
configMap:
name: demo-spark-operator-2.4.3
- name: hive-conf
configMap:
name: demo-hive-conf
{{- if .Values.enableWebhook }}
- name: webhook-certs
secret:
secretName: {{ include "sparkoperator.webhookSecretName" . }}
{{- end }}
containers:
- name: sparkoperator
image: {{ .Values.operatorImageName }}:{{ .Values.operatorVersion }}
imagePullPolicy: {{ .Values.imagePullPolicy }}
{{- if .Values.environments }}
env:
{{- range $key, $val := .Values.environments }}
- name: {{ $key }}
value: {{ $val | quote }}
{{- end }}
{{- end }}
volumeMounts:
- name: hdfs-conf
mountPath: /etc/conf/hdfs
- name: spark-conf
mountPath: /etc/conf/spark
- name: hive-conf
mountPath: /etc/conf/hive
{{- if .Values.enableWebhook }}
- name: webhook-certs
mountPath: /etc/webhook-certs
{{- end }}
{{- if .Values.enableMetrics }}
ports:
- containerPort: {{ .Values.metricsPort }}
{{ end }}
args:
- -v={{ .Values.logLevel }}
- -namespace={{ .Values.sparkJobNamespace }}
- -ingress-url-format={{ .Values.ingressUrlFormat }}
- -controller-threads={{ .Values.controllerThreads }}
- -resync-interval={{ .Values.resyncInterval }}
- -logtostderr
{{- if .Values.enableMetrics }}
- -enable-metrics=true
- -metrics-labels=app_type
- -metrics-port={{ .Values.metricsPort }}
- -metrics-endpoint={{ .Values.metricsEndpoint }}
- -metrics-prefix={{ .Values.metricsPrefix }}
{{- end }}
{{- if .Values.enableWebhook }}
- -enable-webhook=true
- -webhook-svc-namespace={{ .Release.Namespace }}
- -webhook-port={{ .Values.webhookPort }}
- -webhook-svc-name={{ include "sparkoperator.webhookServiceName" . }}
- -webhook-config-name={{ include "sparkoperator.fullname" . }}-webhook-config
## two custom flags added to support URL-based webhook registration
- -webhook-mode={{ .Values.webhookMode }}
- -webhook-url={{ .Values.webhookUrl }}
{{- end }}
{{- if .Values.nodeSelector }}
nodeSelector:
{{ toYaml .Values.nodeSelector | indent 8 }}
{{- end }}
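For completeness, the values referenced by this Deployment template might be set along these lines; this is only a sketch, and the image name, version tag, and metrics settings are whatever your environment actually uses.
# values.yaml (excerpt) -- illustrative operator settings
operatorImageName: gcr.io/spark-operator/spark-operator   # or a copy in your private registry
operatorVersion: v2.4.4-v1beta2
imagePullPolicy: IfNotPresent
logLevel: 2
controllerThreads: 10
resyncInterval: 30
ingressUrlFormat: ""            # e.g. a per-application domain template; empty to disable
enableMetrics: true
metricsPort: 10254
metricsEndpoint: /metrics
metricsPrefix: ""
nodeSelector: {}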
Spark Application Chart
Defines the resources a Spark application needs, such as the number of CPU cores for the Driver container, its memory size, and whether volumes are mounted; the Executor container is defined in the same way. An example values.yaml is given after the template.
sparkapp-template.yaml
apiVersion: {{ required "You must specify the active api version!" .Values.apiVersion | quote }}
kind: SparkApplication
metadata:
name: {{ include "sparkapp.fullname" . }}
namespace: {{ .Values.namespace | default "mlp" }}
annotations:
business_domain: {{ .Values.businessDomain }}
spec:
type: {{ .Values.type | default "Scala" }}
mode: {{ .Values.mode | default "cluster" }}
image: {{ .Values.image }}
{{- if eq .Values.type "Python" }}
pythonVersion: {{ .Values.pythonVersion | default 3 | quote }}
mainApplicationFile: {{ required "The entrypoint Python file must be given!" .Values.mainApplicationFile | quote }}
{{- end }}
{{- if eq .Values.type "Scala" }}
mainClass: {{ required "While submitting a Scala application, you should specify the main class!" .Values.mainClass | quote }}
mainApplicationFile: {{ required "While submitting a Scala application, a jar file should be supplied!" .Values.mainApplicationFile | quote }}
{{- end }}
{{- if .Values.arguments }}
arguments:
{{- range $val := .Values.arguments }}
- {{ $val | quote }}
{{- end }}
{{- end }}
{{- with .Values.deps }}
deps:
{{- if .jars }}
jars:
{{- range .jars }}
- {{ . }}
{{- end }}
{{- end }}
{{- if .files }}
files:
{{- range .files }}
- {{ . }}
{{- end }}
{{- end }}
{{- if .pyFiles }}
pyFiles:
{{- range .pyFiles }}
- {{ . }}
{{- end }}
{{- end }}
{{- end }}
{{- if .Values.sparkConf }}
sparkConf:
{{- range $key, $val := .Values.sparkConf }}
{{ $key }}: {{ quote $val }}
{{- end }}
{{- include "sparkapp.envs" . | indent 4 }}
{{- end }}
{{- if .Values.sparkConfigMap }}
sparkConfigMap: {{ .Values.sparkConfigMap }}
{{- end }}
sparkVersion: {{ .Values.sparkVersion }}
{{- include "sparkapp.volumes" . }}
{{- include "sparkapp.driver" . }}
{{- include "sparkapp.executor" . }}
{{- if .Values.nodeSelector }}
nodeSelector:
{{ toYaml .Values.nodeSelector | indent 4 }}
{{- end }}
{{- if .Values.restartPolicy }}
restartPolicy:
{{ toYaml .Values.restartPolicy | indent 4 }}
{{- end }}
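A values.yaml for this application chart could look roughly like the following; every name, image, and path below is only an example and must be adapted to your own job. Note that the configMaps and volumes keys are consumed by the helpers defined in _helpers.tpl in the next section.
# values.yaml (excerpt) for the application chart -- all names are illustrative
apiVersion: sparkoperator.k8s.io/v1beta2
namespace: mlp
businessDomain: demo
appname: wordcount
type: Scala
mode: cluster
image: registry.example.com/spark/spark:2.4.4
sparkVersion: "2.4.4"
mainClass: com.example.WordCount
mainApplicationFile: local:///opt/app/wordcount.jar
arguments:
  - hdfs:///tmp/wordcount/input
sparkConf:
  spark.eventLog.enabled: "true"
configMaps:
  hdfsconf: demo-hdfs-conf
  hiveconf: demo-hive-conf
volumes:
  hostPath:
    sparkLog: /data/spark/log
    sparkWork: /data/spark/work
    sparkLocalData: /data/spark/data
serviceAccount: spark
driver:
  cores: 1
  coreLimit: 1
  memory: 1g
  volumeMounts:
    - name: spark-log
      mountPath: /var/log/spark
executor:
  cores: 2
  instances: 4
  memory: 2g
  volumeMounts:
    - name: spark-log
      mountPath: /var/log/spark
restartPolicy:
  type: Never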
_helpers.tpl
Files ending in .tpl define template objects (named templates). The definitions in this file can be referenced directly from the other YAML files; see the Helm template syntax documentation for the detailed rules.
I defined the Volume, Driver, and Executor objects in this file. It is not necessarily the best place for them, so treat it as a reference only; an example of how sparkapp.envs expands is given after the file.
{{/* Common labels & annotations shared by driver and executor */}}
{{- define "sparkapp.labels" }}
annotations:
business_domain: {{ .Values.businessDomain }}
labels:
version: {{ .Values.sparkVersion }}
falcon: spark-operator
vip.com/helm-reference: {{ .Release.Name }}
{{- end }}
{{- define "sparkapp.driverlabels" -}}
{{ include "sparkapp.labels" . }}
{{- range $key, $value := .Values.driver.labels }}
{{ $key }}: {{ $value }}
{{- end }}
{{- end -}}
{{- define "sparkapp.executorlabels" -}}
{{ include "sparkapp.labels" . }}
{{- range $key, $value := .Values.executor.labels }}
{{ $key }}: {{ $value }}
{{- end }}
{{- end -}}
{{/* Define driver & executor environments */}}
{{- define "sparkapp.envs" -}}
{{- range $kv := .Values.envs }}
{{ printf "%s.%s" "spark.kubernetes.driverEnv" $kv.name | quote }}: {{ $kv.value | quote }}
{{ printf "%s.%s" "spark.executorEnv" $kv.name | quote }}: {{ $kv.value | quote }}
{{- end }}
{{- end -}}
{{- define "sparkapp.fullname" -}}
{{- $name := default "spark-app" .Values.appname -}}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- define "sparkapp.volumes" }}
volumes:
{{- with .Values.configMaps }}
- name: hdfs-conf
configMap:
name: {{ .hdfsconf }}
- name: hive-conf
configMap:
name: {{ .hiveconf }}
{{- end }}
- name: spark-log
hostPath:
# spark logs location on host
path: {{ .Values.volumes.hostPath.sparkLog | default "/tmp/operator/log" }}
# optional
type: DirectoryOrCreate
- name: spark-work-dir
hostPath:
# spark worker location on host
path: {{ .Values.volumes.hostPath.sparkWork | default "/tmp/operator/worker" }}
# optional
type: DirectoryOrCreate
- name: spark-hard-disk
hostPath:
# spark local data location on host
path: {{ .Values.volumes.hostPath.sparkLocalData | default "/tmp/operator/data" }}
# optional
type: DirectoryOrCreate
{{- end }}
{{/* Spark driver defines. */}}
{{- define "sparkapp.driver" }}
driver:
cores: {{ .Values.driver.cores | default 1 }}
coreLimit: {{ .Values.driver.coreLimit | default 1 | quote }}
memory: {{ .Values.driver.memory | default "512m" }}
{{- if .Values.driver.configMaps }}
configMaps:
{{- range $kv := .Values.driver.configMaps }}
- name: {{ $kv.name }}
path: {{ $kv.path }}
{{- end }}
{{- end }}
{{- if .Values.driver.volumeMounts }}
volumeMounts:
{{- range $kv := .Values.driver.volumeMounts }}
- name: {{ $kv.name }}
mountPath: {{ $kv.mountPath }}
{{- end }}
{{- end }}
{{- with .Values.driver.gpu }}
gpu:
name: {{ .name | default "nvidia.com/gpu" }}
quantity: {{ .quantity | default 0 }}
{{- end }}
{{- include "sparkapp.driverlabels" . }}
serviceAccount: {{ .Values.serviceAccount | default "default" }}
{{- end }}
{{/* Spark executor defines. */}}
{{- define "sparkapp.executor" }}
executor:
{{- include "sparkapp.executorlabels" . }}
cores: {{ .Values.executor.cores | default 1 }}
instances: {{ .Values.executor.instances | default 1 }}
memory: {{ .Values.executor.memory | default "512m" }}
{{- if .Values.executor.volumeMounts }}
volumeMounts:
{{- range $kv := .Values.executor.volumeMounts }}
- name: {{ $kv.name }}
mountPath: {{ $kv.mountPath }}
{{- end }}
{{- end }}
{{- with .Values.executor.gpu }}
gpu:
name: {{ .name | default "nvidia.com/gpu" }}
quantity: {{ .quantity | default 0 }}
{{- end }}
{{- end }}
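As a concrete example of the sparkapp.envs helper above: with an envs list like the one below, the two printf lines expand into a driver-side and an executor-side entry under sparkConf (the rendered result is shown as comments).
# values.yaml (excerpt)
envs:
  - name: TZ
    value: Asia/Shanghai
# which "sparkapp.envs" renders under sparkConf as:
#   "spark.kubernetes.driverEnv.TZ": "Asia/Shanghai"
#   "spark.executorEnv.TZ": "Asia/Shanghai"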