PySpark课程笔记

  1. 基于Pycharm实施入门案例

1.1 从HDFS上读取文件并实现排序

从HDFS中读取数据, 并对数据进行排序, 最后写入到HDFS上

# 演示: pySpark入门案例:  WordCount
# 需求: 从HDFS中读取数据, 对数据进行统计分析(WordCount). 最后将结果根据单词数量进行倒序排序, 并将结果写出HDFS上
from pyspark import SparkContext, SparkConf
import os

# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("WordCount案例: 从HDFS读取数据")

    # 1- 创建SparkContext对象:
    conf = SparkConf().setMaster('local[*]').setAppName('wd')
    sc = SparkContext(conf=conf)

    # 2- 读取HDFS上文件数据
    rdd_init = sc.textFile('hdfs://node1:8020/pyspark_data/words.txt')

    # 3- 对数据执行切割: 每一行都有可能产生多个单词, 所以这里切割, 是一个 1对多操作 采用flatMap()
    rdd_flatMap = rdd_init.flatMap(lambda line: line.split())

    # 4- 将每一个单词转换为 单词,1    1对 1转换
    rdd_map = rdd_flatMap.map(lambda word: (word,1))

    # 5- 根据 key 对value数据进行聚合统计
    rdd_res = rdd_map.reduceByKey(lambda agg,curr: agg + curr)

    # 6 - 对结果数据进行排序
    #rdd_sort = rdd_res.sortBy(lambda wd_tup: wd_tup[1],ascending=False)
    # 此种没有任何意义, 仅仅为了演示这几个API
    #rdd_res = rdd_res.map(lambda res_tup: (res_tup[1],res_tup[0]))
    #rdd_sort = rdd_res.sortByKey(ascending=False)
    #rdd_sort = rdd_sort.map(lambda res_tup: (res_tup[1], res_tup[0]))

    # 7- 写出到HDFS中
    print(rdd_res.top(10,lambda wd_tup: wd_tup[1])) # top只能进行降序排序,直接触发执行
    #rdd_sort.saveAsTextFile('hdfs://node1:8020/pyspark_data/output')

    # 8- 释放资源
    sc.stop()

1.2 基于Spark-Submit方式运行

基础格式: 
	./spark-submit --master ...   文件

说明: 
	./spark-submit: 用于将spark程序提交到指定的平台来运行, 同时可以设置各种相关配置参数

示例: 
./spark-submit --master local[*] /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py 
  1. Spark On Yarn 环境搭建

2.1 Spark On Yarn的本质

Spark On Yarn的本质: 指的将Spark程序提交到Yarn集群中, 通过yarn进行统一的调度运行操作

这种操作, 将会是以后主要的提交上线部署的方式

2.2 配置 Spark On Yarn

整个配置操作, 大家可参考<<spark的部署文档>>即可
文档私信获取
在配置的时候, 一定要细心, 多校验

2.3 提交应用测试

  • 测试1: 测试一下spark自带的一些测试脚本: pi
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/server/spark/examples/src/main/python/pi.py \
10
  • 测试2: 测试一下我们自己编写的WordCount案例
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py 


注意: 
	在python脚本中, 需要将setMaster参数设置为yarn 或者 直接删除此参数配置

说明:

Spark程序运行主要由两部分组成: 
	Driver程序(JVM程序): Spark驱动程序(任务的管家) 或者 类似于MR中applicationMaster
		主要作用: 资源的申请, 任务的分配, 任务的监控管理
	Executor程序(JVM程序):执行器, 可以理解为是一个线程池, 内部运行多个线程(Task) 
		主要作用: 通过内部多个线程来执行具体的任务

2.4 两种部署方式说明

指的在将Spark任务提交到集群(YARN, Spark集群为主)的时候,提供两种提交部署方案: client模式 , cluster模式

	本质区别:  Spark程序中Driver程序运行在什么位置上
	
	client模式: 客户端, Driver程序运行在执行spark-submit所在节点上  默认就是client模式
		好处: 由于Driver是运行在客户端, 当执行完成后, 需要查看结果, 此时executor会将结果返回给Driver, Driver在客户端, 直接答应, 我们直接在客户端看到执行结果 (方便测试)
			在客户端模式下, 不存在Driver的日志, 因为日志是直接输出客户端
		弊端: 由于Driver和executor有可能不在同一个环境中,会导致中间网络传输效率比较低, 从而影响整体的效率
		
		此种方式一般在生产环境中不使用, 主要使用在测试环境
	
	cluster模式: 集群模式, Driver程序运行在提交集群所在的某一个节点上
		好处: Driver程序和executor都在同一个集群环境中, 在进行传输数据的时候, 可以更大利用内部网络带宽优势, 提升效率
		弊端: 不方便测试, Driver运行在集群环境中,所有的内容全部都会记录到日志文件中, 无法会给提交的客户端, 所以客户端想要查看结果, 需要看日志
		
		此种方式一般用于生产环境

如何使用这两种方式呢?

cd /export/server/spark/bin
./spark-submit \
--master yarn \
--deploy-mode client | cluster \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py 


演示cluster:  
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py 

  1. Spark 程序与PySpark交互流程

在这里插入图片描述
Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Client

1- 启动Driver程序
2- 向Master申请资源
3- Master根据要申请的资源, 返回对应资源列表    
	executor1 : node1 1核 1GB    
	executor2:  node3 1核 1GB
4- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
5- Driver开始执行Main函数:

	5.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理

	5.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)

	5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理

	5.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了

	5.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源

Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Cluster

1- 首先会先将任务信息提交到Master主节点上
2- 当Master收到任务信息后, 首先会根据Driver的资源信息, 随机找一台worker节点用于启动Driver程序,并将任务信息交给Driver

3- 当对应worker节点收到请求后, 开启启动Driver, 启动后会和Master保持心跳机制,告知Master启动成功了, 启动后立即开始申请资源(executor)

4- Master根据要申请的资源, 返回对应资源列表    
	executor1 : node1 1核 1GB    
	executor2:  node3 1核 1GB
5- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
6- Driver开始执行Main函数:

	6.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理

	6.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)

	6.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理

	6.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了

	6.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源

Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Client
在这里插入图片描述

1- 启动Driver程序
2- 连接Yarn的主节点(resourceManager),向主节点提交一个资源的任务(目标: 启动executor)
3- Yarn的resourceManager接收到任务后, 开始随机在某一个nodemanager节点上启动appMaster, AappMaster启动后会和resourceManager建立心跳机制, 报告已经启动完成了

4- AppMaster根据资源信息要求,向resourceManager申请资源, 通过心跳的方式将申请资源信息发送到RM,然后不断的询问RM是否已经准备好了

5- 当AppMaster一旦检测到RM中已经将资源准备好了,那么就会立即将资源分配结果信息获取到, 根据分配资源信息在对应的nodemanager节点上启动进程(executor)

6- 当executor启动完成后, 通知appMaster 同时反向注册到Driver端(通知)

5- Driver收到各个executor的注册信息后, 开始执行Main函数:

	5.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理

	5.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)

	5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理

	5.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了, 同时会执行完成的状态通知appMaster, AppMaster收到全部的节点都结果完成后, 通知RM 任务已经执行完成, RM通知appMaster可以退出(自杀过程),并且回收yarn的资源

	5.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭

Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Cluster在这里插入图片描述

Yarn的client模式和集群模式的区别:
	client模式下: 
		Driver程序和 appMaster程序都是独立的两个程序 
			Driver程序负责任务的分配, 任务的监控的工作,与任务相关的管理操作
			appMaster程序负责申请资源, 启动executor
		
	cluster模式下:
		Driver程序和appMaster程序合二为一: 共同负责 资源申请, 任务的分配 任务的监控等各项工作
  1. Spark-Submit相关参数说明

    spark-submit 这个命令 是我们spark提供的一个专门用于提交spark程序的客户端, 可以将spark程序提交到各种资源调度平台上: 比如说 local(本地), spark集群,yarn集群, 云上调度平台(k8s …)

    spark-submit在提交的过程中, 设置非常多参数, 调整任务相关信息

  • 基本参数设置
    在这里插入图片描述

  • Driver的资源配置参数
    在这里插入图片描述

  • executor的资源配置参数
    在这里插入图片描述

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐