1 Background

离线计算的 Spark 任务中,会提供「最小副本数」作为离线计算任务的 Executor 数。也就是说,当 Spark Job 最大的资源数就是500个 Executor,这样会导致 task 数量较多的任务一直在等待资源,并发度其实是被限制的,当然,写成1000个,也可能会导致资源浪费的问题。task 数量多,并行处理数为 cores * executors = 2 * 500 = 1000,也就是说同时处理1000个 task。

on Yarn 很早就提供了 Dynamic Resource Allocation(DRA),Spark Executor 可以根据 workload(task 数量) 进行 scale。所以就不用填这个参数了,spark.executor.instances

Spark 2.2 on K8S 提供了一种 shuffle service 的实现,可以支持 DRA。

2 Practice

2.1 spark shuffle service

部署 Spark shuffle service 的 DaemonSet。

image_1ddievaa3fla1b771j7c2r7i52m.png-84kB

apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  labels:
    app: spark-shuffle-service
    spark-version: 2.2.0
  name: shuffle
spec:
  template:
    metadata:
      labels:
        app: spark-shuffle-service
        spark-version: 2.2.0
    spec:
      volumes:
        - name: temp-volume
          hostPath:
            # 本地 cache/shuffle 的地址
            path: '/tmp/spark-local' # change this path according to your cluster configuration.
      containers:
        - name: shuffle
          # This is an official image that is built
          # from the dockerfiles/shuffle directory
          # in the spark distribution.
          image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.4.0
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - mountPath: '/tmp/spark-local'
              name: temp-volume
              # more volumes can be mounted here.
              # The spark job must be configured to use these
              # mounts using the configuration:
              #   spark.kubernetes.shuffle.dir=<mount-1>,<mount-2>,...
          resources:
             requests:
               cpu: "1"
             limits:
               cpu: "1"

image_1ddif3hvp9s61fp8p3t142calr13.png-169.3kB

2.2 spark groupbytest

测试一个 shuffle 程序,测试脚本如下。

bin/spark-submit \
 --deploy-mode cluster \
 --class org.apache.spark.examples.GroupByTest \
 --master=k8s://https://kubernetes.default.svc      \
 --kubernetes-namespace dbyin \
 --conf spark.local.dir=/tmp/spark-local \
 --conf spark.app.name=group-by-test \
 --conf spark.dynamicAllocation.enabled=true \
 # shuffle 启动
 --conf spark.shuffle.service.enabled=true \
 --conf spark.kubernetes.shuffle.namespace=dbyin \
 --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \
 # spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
 --conf spark.dynamicAllocation.minExecutors=13 \
 # spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
 --conf spark.dynamicAllocation.maxExecutors=20 \
 # spark.dynamicAllocation.initialExecutors - Number of executors to start with
 --conf spark.dynamicAllocation.initialExecutors=13 \
 --conf spark.kubernetes.driver.docker.image=hub.oa.com/dbyin/spark-driver-runzhliu:0.0.2 \
 --conf spark.kubernetes.executor.docker.image=hub.oa.com/dbyin/spark-executor-runzhliu:0.0.2 \
 local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar 10 4000000 2

查看日志,Spark 启动的 Executor 按照最小值启动 13 个(默认10个)。

image_1ddifdqbkvgn1i0d19i61j8sfhj1g.png-248.4kB

随便调整一下 GroupByTest 的参数,让他的 task 更多。就会启动20个 Executor 了。

image_1ddifjubvju821k1l2a1f3q1ma01t.png-749.1kB

3 Summary

DRA 有个很合适的场景,就是用在流计算当中。当数据流有较大幅度的波动的时候,开启这个特性,可以让 Spark 根据自身的 workload 来调整 Executor 的数量,task 可以被调度到更多的 Executor 中。另外,在一些机器学习的场景中,以及 Spark SQL 中,这个特性都能带来资源的更好的利用率。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐