Spark 程序与PySpark交互流程及Spark-Submit相关参数说明
spark-submit 这个命令 是我们spark提供的一个专门用于提交spark程序的客户端, 可以将spark程序提交到各种资源调度平台上: 比如说 local(本地), spark集群,yarn集群, 云上调度平台(k8s …指的在将Spark任务提交到集群(YARN, Spark集群为主)的时候,提供两种提交部署方案: client模式 , cluster模式。Spark On Yarn
PySpark课程笔记
- 基于Pycharm实施入门案例
1.1 从HDFS上读取文件并实现排序
从HDFS中读取数据, 并对数据进行排序, 最后写入到HDFS上
# 演示: pySpark入门案例: WordCount
# 需求: 从HDFS中读取数据, 对数据进行统计分析(WordCount). 最后将结果根据单词数量进行倒序排序, 并将结果写出HDFS上
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("WordCount案例: 从HDFS读取数据")
# 1- 创建SparkContext对象:
conf = SparkConf().setMaster('local[*]').setAppName('wd')
sc = SparkContext(conf=conf)
# 2- 读取HDFS上文件数据
rdd_init = sc.textFile('hdfs://node1:8020/pyspark_data/words.txt')
# 3- 对数据执行切割: 每一行都有可能产生多个单词, 所以这里切割, 是一个 1对多操作 采用flatMap()
rdd_flatMap = rdd_init.flatMap(lambda line: line.split())
# 4- 将每一个单词转换为 单词,1 1对 1转换
rdd_map = rdd_flatMap.map(lambda word: (word,1))
# 5- 根据 key 对value数据进行聚合统计
rdd_res = rdd_map.reduceByKey(lambda agg,curr: agg + curr)
# 6 - 对结果数据进行排序
#rdd_sort = rdd_res.sortBy(lambda wd_tup: wd_tup[1],ascending=False)
# 此种没有任何意义, 仅仅为了演示这几个API
#rdd_res = rdd_res.map(lambda res_tup: (res_tup[1],res_tup[0]))
#rdd_sort = rdd_res.sortByKey(ascending=False)
#rdd_sort = rdd_sort.map(lambda res_tup: (res_tup[1], res_tup[0]))
# 7- 写出到HDFS中
print(rdd_res.top(10,lambda wd_tup: wd_tup[1])) # top只能进行降序排序,直接触发执行
#rdd_sort.saveAsTextFile('hdfs://node1:8020/pyspark_data/output')
# 8- 释放资源
sc.stop()
1.2 基于Spark-Submit方式运行
基础格式:
./spark-submit --master ... 文件
说明:
./spark-submit: 用于将spark程序提交到指定的平台来运行, 同时可以设置各种相关配置参数
示例:
./spark-submit --master local[*] /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
- Spark On Yarn 环境搭建
2.1 Spark On Yarn的本质
Spark On Yarn的本质: 指的将Spark程序提交到Yarn集群中, 通过yarn进行统一的调度运行操作
这种操作, 将会是以后主要的提交上线部署的方式
2.2 配置 Spark On Yarn
整个配置操作, 大家可参考<<spark的部署文档>>即可
文档私信获取
在配置的时候, 一定要细心, 多校验
2.3 提交应用测试
- 测试1: 测试一下spark自带的一些测试脚本: pi
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/server/spark/examples/src/main/python/pi.py \
10
- 测试2: 测试一下我们自己编写的WordCount案例
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
注意:
在python脚本中, 需要将setMaster参数设置为yarn 或者 直接删除此参数配置
说明:
Spark程序运行主要由两部分组成:
Driver程序(JVM程序): Spark驱动程序(任务的管家) 或者 类似于MR中applicationMaster
主要作用: 资源的申请, 任务的分配, 任务的监控管理
Executor程序(JVM程序):执行器, 可以理解为是一个线程池, 内部运行多个线程(Task)
主要作用: 通过内部多个线程来执行具体的任务
2.4 两种部署方式说明
指的在将Spark任务提交到集群(YARN, Spark集群为主)的时候,提供两种提交部署方案: client模式 , cluster模式
本质区别: Spark程序中Driver程序运行在什么位置上
client模式: 客户端, Driver程序运行在执行spark-submit所在节点上 默认就是client模式
好处: 由于Driver是运行在客户端, 当执行完成后, 需要查看结果, 此时executor会将结果返回给Driver, Driver在客户端, 直接答应, 我们直接在客户端看到执行结果 (方便测试)
在客户端模式下, 不存在Driver的日志, 因为日志是直接输出客户端
弊端: 由于Driver和executor有可能不在同一个环境中,会导致中间网络传输效率比较低, 从而影响整体的效率
此种方式一般在生产环境中不使用, 主要使用在测试环境
cluster模式: 集群模式, Driver程序运行在提交集群所在的某一个节点上
好处: Driver程序和executor都在同一个集群环境中, 在进行传输数据的时候, 可以更大利用内部网络带宽优势, 提升效率
弊端: 不方便测试, Driver运行在集群环境中,所有的内容全部都会记录到日志文件中, 无法会给提交的客户端, 所以客户端想要查看结果, 需要看日志
此种方式一般用于生产环境
如何使用这两种方式呢?
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--deploy-mode client | cluster \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
演示cluster:
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
- Spark 程序与PySpark交互流程
Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Client
1- 启动Driver程序
2- 向Master申请资源
3- Master根据要申请的资源, 返回对应资源列表
executor1 : node1 1核 1GB
executor2: node3 1核 1GB
4- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
5- Driver开始执行Main函数:
5.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
5.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)
5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
5.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了
5.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源
Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Cluster
1- 首先会先将任务信息提交到Master主节点上
2- 当Master收到任务信息后, 首先会根据Driver的资源信息, 随机找一台worker节点用于启动Driver程序,并将任务信息交给Driver
3- 当对应worker节点收到请求后, 开启启动Driver, 启动后会和Master保持心跳机制,告知Master启动成功了, 启动后立即开始申请资源(executor)
4- Master根据要申请的资源, 返回对应资源列表
executor1 : node1 1核 1GB
executor2: node3 1核 1GB
5- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
6- Driver开始执行Main函数:
6.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
6.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)
6.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
6.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了
6.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源
Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Client
1- 启动Driver程序
2- 连接Yarn的主节点(resourceManager),向主节点提交一个资源的任务(目标: 启动executor)
3- Yarn的resourceManager接收到任务后, 开始随机在某一个nodemanager节点上启动appMaster, AappMaster启动后会和resourceManager建立心跳机制, 报告已经启动完成了
4- AppMaster根据资源信息要求,向resourceManager申请资源, 通过心跳的方式将申请资源信息发送到RM,然后不断的询问RM是否已经准备好了
5- 当AppMaster一旦检测到RM中已经将资源准备好了,那么就会立即将资源分配结果信息获取到, 根据分配资源信息在对应的nodemanager节点上启动进程(executor)
6- 当executor启动完成后, 通知appMaster 同时反向注册到Driver端(通知)
5- Driver收到各个executor的注册信息后, 开始执行Main函数:
5.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
5.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)
5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
5.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了, 同时会执行完成的状态通知appMaster, AppMaster收到全部的节点都结果完成后, 通知RM 任务已经执行完成, RM通知appMaster可以退出(自杀过程),并且回收yarn的资源
5.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭
Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Cluster
Yarn的client模式和集群模式的区别:
client模式下:
Driver程序和 appMaster程序都是独立的两个程序
Driver程序负责任务的分配, 任务的监控的工作,与任务相关的管理操作
appMaster程序负责申请资源, 启动executor
cluster模式下:
Driver程序和appMaster程序合二为一: 共同负责 资源申请, 任务的分配 任务的监控等各项工作
-
Spark-Submit相关参数说明
spark-submit 这个命令 是我们spark提供的一个专门用于提交spark程序的客户端, 可以将spark程序提交到各种资源调度平台上: 比如说 local(本地), spark集群,yarn集群, 云上调度平台(k8s …)
spark-submit在提交的过程中, 设置非常多参数, 调整任务相关信息
-
基本参数设置
-
Driver的资源配置参数
-
executor的资源配置参数
更多推荐
所有评论(0)