Using Alluxio to Ensure Data Locality for Spark on k8s
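This post continues the earlier discussion of Spark data locality on k8s. The previous article, "Data Locality of Spark on k8s", covers the background; this one moves on to the hands-on setup.
The setup mainly follows the official documentation. The points below are the ones that need attention: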
- Enable the short-circuit read feature (enabled by default). With short-circuit reads, when the requested data exists on the worker's local node, the client reads it directly instead of going through the worker, which gives a good performance improvement.
    # In the user-defined config.yaml, set the following in the worker section
    properties:
      alluxio.worker.data.server.domain.socket.address: /opt/domain
      alluxio.worker.data.server.domain.socket.as.uuid: true
    domainHostPath: "/tmp/alluxio-domain"
    worker:
      shortCircuit:
        enabled: true
      shortCircuitPolicy: "uuid"
      domainHostPath: "/tmp/alluxio-domain"
- Make the pods running the Alluxio worker and Alluxio FUSE use the same IP as their host node.
    # Set in the worker section
    worker:
      hostNetwork: true
    fuse:
      hostNetwork: true
- In the Spark driver and executors, mount the domain socket directory at the same path, and make the executor pods use the same IP as their host node.
    # Set in the spark-operator spec definition
    spec:
      executor:
        # In the Spark source code, when computing the NODE_LOCAL level, Spark checks whether
        # the host IP of the executor running the task matches the host IP of the data location.
        # If it does, the locality level is NODE_LOCAL. See my other article for details.
        hostNetwork: true
      sparkConf:
        spark.kubernetes.executor.volumes.hostPath.alluxio-domain.mount.path: "/opt/domain"
        spark.kubernetes.executor.volumes.hostPath.alluxio-domain.mount.readOnly: "true"
        spark.kubernetes.executor.volumes.hostPath.alluxio-domain.options.path: "/tmp/alluxio-domain"
        spark.kubernetes.executor.volumes.hostPath.alluxio-domain.options.type: "Directory"
        spark.driver.extraJavaOptions: "-Dalluxio.master.rpc.addresses=alluxio-master-0:19998,alluxio-master-1:19998,alluxio-master-2:19998"
        spark.executor.extraJavaOptions: "-Dalluxio.master.rpc.addresses=alluxio-master-0:19998,alluxio-master-1:19998,alluxio-master-2:19998"
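The NODE_LOCAL check described in the comment above can be pictured with a small Python sketch. This is only a simplified illustration of the idea, not Spark's actual scheduler code: the function name `locality_level` and the sample IP addresses are hypothetical, and real Spark has more locality levels and rules than shown here.

```python
from typing import Iterable


def locality_level(executor_host: str, block_hosts: Iterable[str]) -> str:
    """Return "NODE_LOCAL" when the executor host holds the data, else "ANY".

    Simplified illustration only; Spark's scheduler also has levels such as
    PROCESS_LOCAL and RACK_LOCAL, which are ignored here.
    """
    return "NODE_LOCAL" if executor_host in set(block_hosts) else "ANY"


# Hypothetical addresses: the Alluxio worker runs with hostNetwork, so the
# block location is reported as the node IP 10.0.0.11.
block_hosts = ["10.0.0.11"]

# An executor with hostNetwork: true reports the node IP, so the hosts match.
print(locality_level("10.0.0.11", block_hosts))

# An executor on the pod network reports a pod IP, so the comparison fails.
print(locality_level("172.16.3.27", block_hosts))
```

Under these assumptions, the first call yields NODE_LOCAL and the second falls back to ANY, which is why the executor pods are configured with `hostNetwork: true`.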
- Use a node selector to pick a subset of the k8s nodes for deploying Alluxio.
    # In the Alluxio user-defined config.yaml, set nodeSelector for master, worker, fuse, and fuse-client.
    # In the Helm charts, the Alluxio master, worker, and fuse fall back to the top-level configuration
    # in addition to their own nodeSelector, so nodeSelector is set directly at the top level here.
    # Also modify templates/fuse/client-daemonset.yaml to add a nodeSelector for fuse-client separately.

    # First, label the k8s nodes
    kubectl label node node1 alluxio=true
    ...

    # Modify config.yaml
    nodeSelector: {"alluxio": "true"}

    # Modify templates/fuse/client-daemonset.yaml
    spec:
      template:
        spec:
          # fuse and fuse-client use the same nodeSelector, mainly because they serve the same purpose. 20200702
          nodeSelector:
          {{- if .Values.fuse.nodeSelector }}
    {{ toYaml .Values.fuse.nodeSelector | trim | indent 8 }}
          {{- else if .Values.nodeSelector }}
    {{ toYaml .Values.nodeSelector | trim | indent 8 }}
          {{- end }}
- Modify templates/config/alluxio-conf.yaml to fix the bug where Alluxio on k8s cannot define multiple tiers when using tiered storage. The main change is adding -Dalluxio.worker.tieredstore.level{{ .level }}.alias={{ .mediumtype }} to ALLUXIO_WORKER_JAVA_OPTS.
    ALLUXIO_WORKER_JAVA_OPTS: |-
      -Dalluxio.worker.bind.host=0.0.0.0 {{ .Values.worker.jvmOptions }}
      {{- if eq .Values.worker.shortCircuitPolicy "uuid" }}
      -Dalluxio.worker.data.server.domain.socket.address=/opt/domain
      -Dalluxio.worker.data.server.domain.socket.as.uuid=true
      {{- end}}
      {{- if .Values.worker.resources }}
      {{- if .Values.worker.resources.requests }}
      {{- if .Values.worker.resources.requests.memory }}
      -Dalluxio.worker.memory.size={{ .Values.worker.resources.requests.memory }}
      {{- end}}
      {{- end}}
      {{- end}}
      -Dalluxio.worker.rpc.port={{ .Values.worker.resources.ports.rpc }}
      -Dalluxio.worker.web.port={{ .Values.worker.resources.ports.web }}
      {{- range $key, $val := .Values.worker.properties }}
      -D{{ $key }}={{ $val }}
      {{- end}}
      -Dalluxio.worker.hostname=${ALLUXIO_WORKER_HOSTNAME}
      {{- if .Values.tieredstore }}
      -Dalluxio.worker.tieredstore.levels={{ len .Values.tieredstore.levels }}
      {{- range .Values.tieredstore.levels }}
      -Dalluxio.worker.tieredstore.level{{ .level }}.alias={{ .mediumtype }}
      -Dalluxio.worker.tieredstore.level{{ .level }}.dirs.mediumtype={{ .mediumtype }}
      {{- if .path }}
      -Dalluxio.worker.tieredstore.level{{ .level }}.dirs.path={{ .path }}
      {{- end}}
      {{- if .quota }}
      -Dalluxio.worker.tieredstore.level{{ .level }}.dirs.quota={{ .quota }}
      {{- end}}
      {{- if .high }}
      -Dalluxio.worker.tieredstore.level{{ .level }}.watermark.high.ratio={{ .high }}
      {{- end}}
      {{- if .low }}
      -Dalluxio.worker.tieredstore.level{{ .level }}.watermark.low.ratio={{ .low }}
      {{- end}}
      {{- end}}
      {{ end }}
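To see what the tiered-storage part of this template is expected to render, the range loop can be mirrored in a few lines of Python. This is a rough sketch rather than part of the chart: the helper name `tieredstore_opts` and the two-tier sample values (MEM on /dev/shm, SSD on /mnt/ssd) are assumptions made purely for illustration.

```python
from typing import Dict, List


def tieredstore_opts(levels: List[Dict[str, object]]) -> List[str]:
    """Build the -D options that the template loop emits for each storage tier."""
    prefix = "-Dalluxio.worker.tieredstore"
    opts = [f"{prefix}.levels={len(levels)}"]
    for tier in levels:
        base = f"{prefix}.level{tier['level']}"
        # The alias option is the one added by the fix described above.
        opts.append(f"{base}.alias={tier['mediumtype']}")
        opts.append(f"{base}.dirs.mediumtype={tier['mediumtype']}")
        # Optional keys are emitted only when set, like the {{- if }} guards.
        for key, suffix in (
            ("path", "dirs.path"),
            ("quota", "dirs.quota"),
            ("high", "watermark.high.ratio"),
            ("low", "watermark.low.ratio"),
        ):
            if tier.get(key):
                opts.append(f"{base}.{suffix}={tier[key]}")
    return opts


# Hypothetical two-tier layout; the values are illustrative only.
levels = [
    {"level": 0, "mediumtype": "MEM", "path": "/dev/shm", "quota": "1G"},
    {"level": 1, "mediumtype": "SSD", "path": "/mnt/ssd", "quota": "10G"},
]
for opt in tieredstore_opts(levels):
    print(opt)
```

With the alias line in place, every tier gets its own `levelN.alias` entry, so a second tier such as SSD is declared explicitly rather than left without an alias.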
- Modify templates/fuse/daemonset.yaml to fix the problem where, on k8s, the alluxio-fuse pod cannot restart successfully after a failure. The mount point now uses a PV (through a PVC) instead of the previous hostPath.
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: alluxio-fuse-mount-pvc
      namespace: spark
    spec:
      storageClassName: standard-alluxio
      accessModes: ["ReadWriteMany"]
      resources:
        requests:
          storage: 20Gi
    ---
    {{ if .Values.fuse.enabled -}}
    apiVersion: apps/v1
    kind: DaemonSet
    metadata:
      name: {{ template "alluxio.fullname" . }}-fuse
      labels:
        app: {{ template "alluxio.name" . }}
        chart: {{ template "alluxio.chart" . }}
        release: {{ .Release.Name }}
        heritage: {{ .Release.Service }}
        role: alluxio-fuse
    spec:
      selector:
        matchLabels:
          app: {{ template "alluxio.name" . }}
          chart: {{ template "alluxio.chart" . }}
          release: {{ .Release.Name }}
          heritage: {{ .Release.Service }}
          role: alluxio-fuse
      template:
        metadata:
          labels:
            app: {{ template "alluxio.name" . }}
            chart: {{ template "alluxio.chart" . }}
            release: {{ .Release.Name }}
            heritage: {{ .Release.Service }}
            role: alluxio-fuse
        spec:
          hostNetwork: {{ .Values.fuse.hostNetwork }}
          dnsPolicy: {{ .Values.fuse.dnsPolicy }}
          nodeSelector:
          {{- if .Values.fuse.nodeSelector }}
    {{ toYaml .Values.fuse.nodeSelector | trim | indent 8 }}
          {{- else if .Values.nodeSelector }}
    {{ toYaml .Values.nodeSelector | trim | indent 8 }}
          {{- end }}
          securityContext:
            runAsUser: {{ .Values.fuse.user }}
            runAsGroup: {{ .Values.fuse.group }}
            fsGroup: {{ .Values.fuse.fsGroup }}
          affinity:
            podAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                  - key: app
                    operator: In
                    values:
                    - {{ template "alluxio.name" . }}
                  - key: role
                    operator: In
                    values:
                    - alluxio-worker
                # Kubernetes requires a topologyKey for pod affinity terms;
                # kubernetes.io/hostname (same node as the worker) is assumed here.
                topologyKey: kubernetes.io/hostname
          containers:
            - name: alluxio-fuse
              image: {{ .Values.fuse.image }}:{{ .Values.fuse.imageTag }}
              imagePullPolicy: {{ .Values.fuse.imagePullPolicy }}
              {{- if .Values.fuse.resources }}
              resources:
                {{- if .Values.fuse.resources.limits }}
                limits:
                  cpu: {{ .Values.fuse.resources.limits.cpu }}
                  memory: {{ .Values.fuse.resources.limits.memory }}
                {{- end }}
                {{- if .Values.fuse.resources.requests }}
                requests:
                  cpu: {{ .Values.fuse.resources.requests.cpu }}
                  memory: {{ .Values.fuse.resources.requests.memory }}
                {{- end }}
              {{- end }}
              command: ["/entrypoint.sh"]
              {{- if .Values.fuse.args }}
              args:
    {{ toYaml .Values.fuse.args | trim | indent 12 }}
              {{- end }}
              env:
              - name: ALLUXIO_CLIENT_HOSTNAME
                valueFrom:
                  fieldRef:
                    fieldPath: status.hostIP
              - name: ALLUXIO_CLIENT_JAVA_OPTS
                value: " -Dalluxio.user.hostname=$(ALLUXIO_CLIENT_HOSTNAME) "
              securityContext:
                privileged: true
                capabilities:
                  add:
                    - SYS_ADMIN
              # lifecycle:
              #   preStop:
              #     exec:
              #       command: ["/opt/alluxio/integration/fuse/bin/alluxio-fuse", "unmount", "/alluxio-fuse"]
              envFrom:
              - configMapRef:
                  name: {{ template "alluxio.fullname" . }}-config
              volumeMounts:
                - name: alluxio-fuse-device
                  mountPath: /dev/fuse
                - name: alluxio-fuse-mount
                  mountPath: /alluxio-fuse
                  mountPropagation: Bidirectional
                {{- if eq .Values.worker.shortCircuitPolicy "uuid" }}
                - name: alluxio-domain
                  mountPath: /opt/domain
                {{- end }}
                {{- if eq .Values.worker.shortCircuitPolicy "local" }}
                {{- if .Values.tieredstore }}
                {{- if .Values.tieredstore.levels }}
                {{- range .Values.tieredstore.levels }}
                {{- if .mediumtype }}
                {{- if contains "," .mediumtype }}
                {{- $type := .type }}
                {{- $path := .path }}
                {{- $split := split "," .mediumtype }}
                {{- range $key, $val := $split }}
                {{- if eq $type "hostPath"}}
                - mountPath: {{ index ($path | split ",") $key }}
                  name: {{ $val | lower }}-{{ $key | replace "_" "" }}
                {{- end}}
                {{- end}}
                {{- else}}
                {{- if eq .type "hostPath"}}
                - name: {{ .mediumtype | replace "," "-" | lower }}
                  mountPath: {{ .path }}
                {{- else }}
                # report error and exit {{ .name }} with {{ .type }} is not supported in shortCircuitPolicy local
                {{- end }}
                {{- end }}
                {{- end}}
                {{- end }}
                {{- end }}
                {{- end }}
                {{- end }}
          restartPolicy: Always
          volumes:
            - name: alluxio-fuse-device
              hostPath:
                path: /dev/fuse
                type: File
            - name: alluxio-fuse-mount
              persistentVolumeClaim:
                claimName: alluxio-fuse-mount-pvc
              # hostPath:
              #   path: /alluxio-fuse
              #   type: DirectoryOrCreate
            {{- if eq .Values.worker.shortCircuitPolicy "uuid" }}
            - name: alluxio-domain
              hostPath:
                path: {{ .Values.domainHostPath }}
                type: "Directory"
            {{- end }}
            {{- if eq .Values.worker.shortCircuitPolicy "local" }}
            {{- if .Values.tieredstore }}
            {{- if .Values.tieredstore.levels }}
            {{- range .Values.tieredstore.levels }}
            {{- if .mediumtype }}
            {{- if contains "," .mediumtype }}
            {{- $split := split "," .mediumtype }}
            {{- $type := .type }}
            {{- $path := .path }}
            {{- range $key, $val := $split }}
            {{- if eq $type "hostPath"}}
            - hostPath:
                path: {{ index ($path | split ",") $key }}
                type: DirectoryOrCreate
              name: {{ $val | lower }}-{{ $key | replace "_" "" }}
            {{- else }}
            - name: {{ $val | lower }}-{{ $key | replace "_" "" }}
              emptyDir:
                medium: "Memory"
                {{- if .quota }}
                sizeLimit: {{ .quota }}
                {{- end}}
            {{- end}}
            {{- end}}
            {{- else}}
            {{- if eq .type "hostPath"}}
            - hostPath:
                path: {{ .path }}
                type: DirectoryOrCreate
              name: {{ .mediumtype | replace "," "-" | lower }}
            {{- end }}
            {{- end }}
            {{- end}}
            {{- end }}
            {{- end }}
            {{- end }}
            {{- end }}
    {{- end }}
- Change the dynamically provisioned PVs used by the master to statically provisioned PVs.