在YARN上运行Spark的常用配置参数讲解
本文针对在YARN上运行Spark的常用配置参数进行讲解1. 在yarn上启动spark application确保HADOOP_CONF_DIR或YARN_CONF_DIR指向包含Hadoop集群(客户端)配置文件的目录。这些configs用于写入HDFS并连接YARN ResourceManager。这个目录中包含的配置将被分发到YARN集群中,以便应用程序使用的所有容器使用相同的配置。如果配
本文针对在YARN上运行Spark的常用配置参数进行讲解
1. 在yarn上启动spark application
确保HADOOP_CONF_DIR或YARN_CONF_DIR指向包含Hadoop集群(客户端)配置文件的目录。
这些configs用于写入HDFS并连接YARN ResourceManager。这个目录中包含的配置将被分发到YARN集群中,以便应用程序使用的所有容器使用相同的配置。如果配置引用的Java系统属性或环境变量不是由YARN管理的,它们也应该在Spark应用程序的配置(driver, executor和AM)中设置
如在HDP 3.1.5中,部署spark 2.4, spark-env.sh的配置如下:
#!/usr/bin/env bash
export SPARK_CLASSPATH=/usr/hdp/current/tez-client:/usr/hdp/current/tez-client/lib:/usr/hdp/current/hive-client/lib:$SPARK_CLASSPATH
HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_MASTER_WEBUI_PORT=9090
export JAVA_HOME=/usr/java/jdk1.8.0_60
export HIVE_HOME=/usr/hdp/current/hive-client/bin
export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://hdp01:8020/spark2.4-history"
export HDP_VERSION=3.1.5.0
SPARK_JAVA_OPTS= -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
在YARN上启动Spark应用程序有两种部署模式:
-
cluster模式下,Spark driver运行在集群中YARN管理的application master进程中,启动应用后客户端退出。
-
client模式下,driver运行在客户端进程中,application master程序仅用于向YARN请求资源。
不同于Spark支持的其他集群管理器(如Standalone, Mesos),在--master参数中指定了master的地址,在YARN模式下,ResourceManager的地址从Hadoop配置中获取。因此,--master参数是yarn。
如启动一个计算pi的Spark程序,命令如下:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10
2. spark-submit提交任务参数
2.1. 参数说明
蓝色字段是常用的参数
参数名称 | 参数说明 | 运行模式 |
---|---|---|
--master | master 的地址,提交任务到哪里执行,支持5种: spark://host:port, mesos://host:port, yarn, k8s://https://host:port local (Default: local[*]) | all |
--deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client | all |
--class | 应用程序的主类,仅针对 java 或 scala 应用 | |
--name | 应用程序名称 | all |
--jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 | all |
--packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 | all |
--exclude-packages | 为了避免冲突 而指定不包含的 package | all |
--repositories | 远程 repository | all |
--file | 用逗号分隔的文件,这些文件被放在每个executor的工作目录,并通过SparkFiles.get(fileName)访问 | all |
--conf PROP=VALUE | 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" | all |
--properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf | all |
--driver-memory | Driver内存,默认 1024M | all |
--driver-java-options | 传给 driver 的额外的 Java 选项 | all |
--driver-library-path | 传给 driver 的额外的库路径 | all |
--driver-class-path | 传给 driver 的额外的类路径 | all |
--executor-memory | 每个 executor 的内存,默认是1G | all |
--driver-cores | driver使用的cpu核数,默认为1 | yarn/standalone |
--total-executor-cores | 所有 executor 总共的核数。 | mesos/standalone |
--executor-core | 每个 executor 的核数。 | yarn/standalone |
--num-executors | 启动的 executor 数量。默认为2 | yarn |
--queue | 提交应用程序给哪个YARN的队列,默认是default队列 | yarn |
--archives | 被每个executor提取到工作目录的档案列表,用逗号隔开 | yarn |
2.2. 命令示例
spark-shell --packages mysql:mysql-connector-java:5.1.27 --repositories http://maven.aliyun.com/nexus/content/groups/public/
# yarn cluster提交,使用spark.driver.extraJavaOptions
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 1g --executor-cores 1 --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-driver.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-executor.xml" --files=/opt/test/conf/log4j-driver.xml,/opt/test/conf/log4j-executor.xml --jars "/opt/test/slf4j-api-1.7.25.jar" spark-examples_2.11-2.4.7.jar 10
3. spark application配置参数
3.1. heap内存模型
Spark 为了更高效的使用配置的堆内存,对堆内存进行了细分,下图(备注:此图源于互联网)对基于spark2(1.6+)对堆内存分配比例进行了描述
其中:
Reserved Memory 保留内存,系统默认值为300,一般无需改动,不用关心此部分内存。 但如果Executor分配的内存小于 1.5 * 300 = 450M时,Executor将无法执行。
Storage Memory 存储内存,用于存放广播数据及RDD缓存数据。由下图可知,Spark 2+中,初始状态下,Storage及Execution Memory均约占系统总内存的30%(1 * 0.6 * 0.5 = 0.3)。
在UnifiedMemory管理中,这两部分内存可以相互借用,为了方便描述,使用storageRegionSize来表示“spark.storage.storageFraction”。
当计算内存不足时,可以改造storageRegionSize中未使用部分,且StorageMemory需要存储内存时也不可被抢占; 若实际StorageMemory使用量超过storageRegionSize,那么当计算内存不足时,可以改造(StorageMemory – storageRegionSize)部分,而storageRegionSize部分不可被抢占。
详见参见:http://spark.apache.org/docs/2.4.0/configuration.html#memory-management
3.2. 参数说明
内存相关的配置项如下,默认值是指spark 2.4.x的默认值
配置项 | 说明 | 默认值 |
spark.executor.memory | 指JVM最大分配的堆内存【"-xmx" heap space - 300MB】中用于执行和存储的部分。这个值越低,溢出和缓存数据逐出(spills and cached data eviction)发生的频率就越高。此配置的目的是为内部元数据、用户数据结构预留内存,以及在稀疏、异常大的记录情况下不精确的大小估计 | 0.6 |
spark.memory.storageFraction | 指不会被逐出的存储内存数量,表示为spark.memory.fraction预留区域大小的一部分。这个值越高,可用来执行的工作内存就越少,任务可能会更频繁地溢出到磁盘上 | 0.5 |
spark.memory.useLegacyMode | 是否开启Spark 1.5及以前使用的遗留内存管理模式。遗留模式将堆空间严格地划分为固定大小的区域,如果没有对应用程序进行调优,可能会导致过度溢出。除非启用,否则不会读取以下已弃用的内存片段配置: spark.shuffle.memoryFraction spark.storage.memoryFraction spark.storage.unrollFraction | false |
spark.shuffle.memoryFraction | (在spark 2.4中已弃用) 仅在spark.memory.useLegacyMode被启用时读取。在shuffle期间用于聚合和cogroups 的Java堆的一部分。在任何给定时间,用于shuffle的所有内存映射的集合大小都受此限制的限制,超过此限制,内容将开始溢出到磁盘。如果经常发生溢出,可以考虑以牺牲spark.storage.memoryFraction为代价增加这个值 | 0.2 |
spark.storage.memoryFraction | (在spark 2.4中已弃用)仅在spark.memory.useLegacyMode被启用时读取。用于Spark内存缓存的Java堆的一部分。这个值不应该大于JVM中对象的老生代(默认情况下为堆的0.6),但是如果您配置自己的老代大小,可以增加它。 | 0.6 |
spark.storage.unrollFraction | (已弃用)仅在spark.memory.useLegacyMode被启用时读取。用于在内存中展开块的部分。这是通过在没有足够的空闲存储空间来展开整个新块时删除现有块来动态分配的。 | 0.2 |
spark.executor.memoryOverhead | 为每个执行程序分配的堆外内存(单位是MB). 用于 VM overheads, interned strings, other native overheads,等的内存。这倾向于随着executor 的数量而增长(通常是6-10%)。这个选项目前在YARN和Kubernetes上支持。
| executorMemory * 0.10, with minimum of 384 |
spark.driver.memoryOverhead | 在集群模式下,每个driver分配的堆外内存的数量(单位是MB). 用于 VM overheads, interned strings, other native overheads,等的内存。它会随着container 数量而增长(通常是6-10%)。这个选项目前在YARN和Kubernetes上支持。 | driverMemory * 0.10, with minimum of 384 |
spark.yarn.am.memoryOverhead | spark.driver一样。memoryOverhead,但是对于客户端模式下的YARN应用Master。 | AM memory * 0.10, with minimum of 384 |
以下参数已经弃用:
配置项 | 说明 | 默认值 |
| 如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,也就是开启优化后的HashShuffleManager。 作用:有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能 目前使用SortShuffleManager | true |
spark.yarn.executor.memoryOverhead | executor执行的时候,用的内存可能会超过executor-memoy,所以会为executor额外预留一部分内存。spark.yarn.executor.memoryOverhead代表了这部分内存。这个参数如果没有设置,会有一个自动计算公式(位于ClientArguments.scala中),请使用spark.executor.memoryOverhead
| executorMemory * 0.10, with minimum of 384 |
3.2. 使用示例
#代码
spark = (
SparkSession.builder
.master('yarn')
.appName('StackOverflow')
.config('spark.driver.memory', '35g')
.config('spark.executor.cores', 5)
.config('spark.executor.memory', '35g')
.config('spark.dynamicAllocation.enabled', True)
.config('spark.dynamicAllocation.maxExecutors', 25)
.config('spark.yarn.executor.memoryOverhead', '4096')
.getOrCreate()
)
sc = spark.sparkContext
#命令行
spark-submit --class com.suntek.algorithm.process.main \
--master yarn \
--driver-cores 1 \
--num-executors 3 \
--executor-cores 7 \
--conf "spark.driver.memory=1g" \
--conf "spark.executor.memory=3g" \
--conf "spark.memory.fraction=0.8" \
--conf "spark.memory.storageFraction=0.3" \
--conf "spark.shuffle.memoryFraction=0.7" \
--conf "spark.executor.memoryOverhead=3g" \
--conf "spark.task.cpus=2" \
--conf "spark.port.maxRetries=500" \
参考
更多推荐
所有评论(0)