SparkCore 之旅
SparkCore 之旅Spark概述Spark是什么Spark and HadoopSpark or HadoopSpark 核心模块Spark快速上手增加Scala插件增加依赖关系WordCountSpark运行环境Local模式解压缩文件启动Local环境命令行工具退出本地模式提交应用Standalone模式Yarn模式K8S & Mesos模式Windows模式部署模式对比端口号S
Spark概述
Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎
- 官网地址:http://spark.apache.org/
- 文档查看地址:https://spark.apache.org/docs/3.0.0/
- 下载地址:https://spark.apache.org/downloads.html , https://archive.apache.org/dist/spark/
历史
Hadoop 历史
2011 年 发布 1.x 版本
- NameNode 不能高可用
- MR 框架将资源调度和任务调度耦合在一起
- MR 框架基于磁盘计算,性能比较低
2013 年 发布 2.x 版本
- NameNode 高可用
- 将资源调度和任务调度解耦
- 计算框架可插拔
Spark 历史
Hadoop :
2013 年 2.x 版本 ( YARN )
Spark :
2013 年 成为 Apache 孵化项目, 将资源和任务调度分开
对比
Hadoop 的 MR 框架和 Spark 框架都是数据处理框架
MR
- Hadoop是由 Java 语言编写
- MR 是一种编程模型
- MR 采用创建新的进程
- 多个 MR 作业之间的数据交互依赖于磁盘交互
- 多个作业之间数据通信是基于磁盘
- 内存资源不够 , 选择 MR
- MR 不能处理循环迭代式数据流处理
Spark
- Spark 由 Scala 语言开发
- Spark 用于数据计算
- Spark 采用 fork 线程的方式
- Spark 在 shuffle 时将数据写入磁盘
- Spark 多个作业之间数据通信是基于内存
- Spark 的缓存机制比 HDFS 的缓存机制高效
- Spark SQL 操作结构化数据
- Spark Streaming 对实时数据进行流式计算
- 支持复杂的数据挖掘算法和图形计算算法
- 支持 迭代式计算,图形计算
- 支持 机器学习中ALS、凸优化梯度下降
- Spark 将计算单元缩小到更适合并行计算和重复使用的 RDD 计算模型
Spark 核心模块
Spark Core
提供 Spark 的基本功能,如 : 任务调度、内存管理、错误恢复、与存储系统交互、弹性分布式数据集(Resilient Distributed DataSet,RDD),扩展了:Spark SQL,Spark Streaming,GraphX , MLlib
Spark SQL
Spark 操作结构化数据的组件。可以使用 SQL 或 HQL 来查询数据 , 支持多种数据源,如 : Hive表、Parquet 以 JSON
Spark Streaming
对实时数据进行流式计算的组件
Spark MLlib
提供的一个机器学习算法库。如 : 分类、回归、聚类、协同过滤等、模型评估、数据导入
Spark GraphX
面向图计算提供的框架与算法库
集群管理器
可以在一个计算节点到数千个计算节点之间伸缩计算 , 支持在各种集群管理器(Cluster Manager)上运行,如 : Hadoop YARN、Apache Mesos,Spark 独立调度器
Spark快速上手
在生产环境中,通常会在 IDEA 中编制程序,然后打成 Jar 包,然后提交到集群
增加Scala插件
Spark 由 Scala 语言开发的,当前使用的 Spark 版本为3.0.0,默认采用的Scala 编译版本为 2.12,IDEA 开发工具中含有 Scala 开发插件
增加依赖关系
修改 Maven 项目中的 POM 文件,增加 Spark 框架的依赖关系
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
右击模块 , 点击 Add Framework Support
WordCount
流程图 :
实现一个 WordCount
package com.cpucode.spark.wc.reduceByKey
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author : cpucode
* @date : 2022/2/9 19:52
* @github : https://github.com/CPU-Code
* @csdn : https://blog.csdn.net/qq_44226094
*/
object ReduceByKey {
def main(args: Array[String]): Unit = {
// Spark是一个计算【框架】
// 1. 能找到他 :增加依赖
// 2. 获取Spark的连接(环境)
val conf = new SparkConf().setMaster("local").setAppName("wordCount")
//2.创建SparkContext,该对象是提交Spark App的入口
val context = new SparkContext(conf)
// 读取文件
val lines = context.textFile("wordCount/src/main/resources/word.txt")
// 将文件中的数据进行了分词
val word = lines.flatMap(_.split(" "))
// word => (word, 1)
val wordToOne = word.map((_, 1))
// reduceByKey : 按照key分组, 对相同的key的value进行reduce
// (word, 1)(word, 1)(word, 1)(word, 1)(word, 1)
// reduce(1,1,1,1,1)
// 框架的核心就是封装
val wordCount = wordToOne.reduceByKey(_ + _)
// 将统计结果打印在控制台上
wordCount.collect().foreach(println)
//8.关闭连接
context.stop()
}
}
执行过程中,会产生的执行日志,在项目的 resources 目录中创建 log4j.properties 文件,并添加日志配置信息:
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
链接 :
https://github.com/CPU-Code/spark
Spark运行环境
Spark 作为一个数据处理的 计算引擎,可以运行多个环境下
部署模式 :
- Local 模式:在本地部署单个 Spark 服务
- Standalone 模式:Spark 自带的任务调度模式(国内常用)
- YARN 模式:Spark 使用 Hadoop 的 YARN 组件进行资源与任务调度(国内常用)
- Mesos 模式:Spark 使用 Mesos 平台进行资源与任务的调度
- K8S 模式:
- Windows 模式:
Local模式
Local 模式运行在一台计算机上的模式,常用于在本机上练手和测试
解压缩文件
将 spark-3.0.0-bin-hadoop3.2.tgz
文件上传到 Linux 并解压缩,放置在指定位置,路径中不要包含中文或空格
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
cd /opt/module
重命名
mv spark-3.0.0-bin-hadoop3.2/ spark-3.0.0-local/
启动Local环境
./bin/spark-shell
启动成功后,可以输入网址进行 Web UI 监控页面访问
http://cpucode101:4040
命令行工具
data 目录中,添加 word.txt 文件
vim word.txt
cpu text code
cpucode cpu
code
sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
退出本地模式
按键
Ctrl+C
输入Scala指令
:quit
提交应用
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
--class
: 执行程序的主类--master local[2]
: 部署模式,默认 : 本地模式,数字 : 分配的虚拟CPU核数量local
: 不指定线程数,所有计算运行在一个线程当中,无并行计算local[*]
:默认模式。根据 CPU 核来设置线程数。如 : 8 核,自动设置 8 个线程spark-examples_2.12-3.0.0.jar
: 运行的应用类所在的 jar 包- 数字10 : 设定当前应用的任务数量
Standalone 模式
Spark 的 独立部署(Standalone)模式体现了经典的 master-slave 模式
集群规划 :
cpu101 | cpu102 | cpu103 | ||
---|---|---|---|---|
Spark | Master | √ | ||
Spark | Worker | √ | √ | √ |
Master & Worker 关系 :
Driver & Executor 关系 :
部署
https://blog.csdn.net/qq_44226094/article/details/123851417
运行流程
Spark 两种模式 , 区别:Driver程序的运行节点 :
- standalone-client
- standalone-cluster
standalone-client
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cpu101:7077,cpu102:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
--deploy-mode client
: Driver 程序运行在本地客户端
standalone-cluster
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cpu101:7077,cpu102:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
--deploy-mode cluster
: Driver程序运行在集群
Yarn 模式
Spark 主要是计算框架,而不是资源调度框架,主流 : Yarn
部署
https://blog.csdn.net/qq_44226094/article/details/123851080
K8S & Mesos模式
Mesos 是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核
容器化部署是目前业界很流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对应用进行管理和运维。
容器管理工具中最为流行的就是 Kubernetes(k8s),而 Spark 也在最近的版本中支持了k8s部署模式
https://spark.apache.org/docs/latest/running-on-kubernetes.html
Windows模式
Spark 提供了可以在 windows 系统下启动本地集群的方式
解压缩文件
将文件 spark-3.0.0-bin-hadoop3.2.tgz
解压缩到无中文无空格的路径中
启动本地环境
执行解压缩文件路径下 bin 目录中的 spark-shell.cmd
文件,启动 Spark 本地环境
命令行提交应用
在DOS命令行窗口中执行提交指令
spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10
部署模式对比
模式 | Spark安装机器数 | 需启动的进程 | 所属者 | 应用场景 |
---|---|---|---|---|
Local | 1 | 无 | Spark | 测试 |
Standalone | 3 | Master 及 Worker | Spark | 单独部署 |
Yarn | 1 | Yarn 及 HDFS | Hadoop | 混合部署 |
端口号
- Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)
- Spark Master 内部通信服务端口号:7077
- Standalone 模式下,Spark Master Web 端口号:8080(资源)
- Spark 历史服务器端口号:18080
- Hadoop YARN 任务运行情况查看端口号:8088
- Hadoop 历史服务器端口号:19888
Spark运行架构
运行架构
Spark : 一个计算引擎,采用标准 master-slave 的结构
Spark 执行结构 :
- Driver : master,负责管理整个集群中的作业任务调度
- Executor : slave,负责实际执行任务
核心组件
Spark 两个核心组件:
- Driver
- Executor
Driver
Driver : 驱使整个应用运行起来的程序 ( Driver 类 )
Spark 驱动器节点 : 执行 Spark 任务中的 main 方法,负责实际代码的执行工作
Driver 执行时主要负责:
- 将用户程序转化为作业( job )
- 在 Executor 之间调度任务( task )
- 跟踪 Executor 的执行情况
- 通过 UI 展示查询运行情况
Executor
Spark Executor 是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立
Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在
如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行
Executor 有两个核心功能:
- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算
Master & Worker
Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能
环境中还有其他两个核心组件:
- Master
- Worker
Master 是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM
Worker 也是进程,一个 Worker 运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn环境中NM
ApplicationMaster
Hadoop 用户向 YARN 集群提交应用程序时 , 提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况
ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster
Executor与Core(核)
Spark Executor 是集群中运行在工作节点(Worker)中的一个JVM进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源
资源 : 工作节点 Executor 的内存大小和 使用的虚拟CPU核(Core)数量
应用程序相关启动参数如下:
名称 | 说明 |
---|---|
–num-executors | 配置 Executor 的数量 |
–executor-memory | 配置每个 Executor 的内存大小 |
–executor-cores | 配置每个 Executor 的虚拟 CPU core 数量 |
并行度(Parallelism)
在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行
将整个集群并行执行任务的数量称之为并行度
一个作业到底并行度取决于框架的默认配置 , 应用程序也可以在运行过程中动态修改
有向无环图(DAG)
大数据计算引擎框架 :
- 第一代计算引擎 : Hadoop 所承载的 MapReduce,它将计算分为两个阶段,分别为 Map阶段 和 Reduce阶段。上层应用设法去拆分算法,要在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 DAG 框架克服了弊端
- 第二代计算引擎 : 支持 DAG 的框架。如 Tez 以及更上层的 Oozie , 是批处理的任务
- 第三代计算引擎 : Spark 是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算
有向无环图 : 由 Spark 程序直接映射成的数据流的高级抽象模型。就是将整个程序计算的执行过程用图形表示出来 , 用于表示程序的拓扑结构
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环
提交流程
提交流程 : 应用程序通过 Spark 客户端提交给 Spark 运行环境执行计算的流程
提交流程是基于 Yarn 环境
Spark应用程序提交到 Yarn 环境中执行的两种部署执行的方式:
- Client
- Cluster
两种模式主要区别在于:Driver程序的运行节点位置
Spark 两种模式 , 区别:Driver程序的运行节点 :
- yarn-client : Driver 程序运行在客户端,适用于交互、调试
- yarn-cluster : Driver 程序运行在由 ResourceManager 启动 , 适用于生成
Yarn Client模式
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
Client 模式将用于监控和调度的 Driver 模块在客户端执行,不在 Yarn中,一般用于测试
- Driver 在任务提交的本地机器上运行
- Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster
- ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
- ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
- Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数
- 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行
Yarn Cluster模式
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn集群资源中执行。一般应用于实际生产环境
- 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster
- 随后ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster ,此时的 ApplicationMaster 就是 Driver
- Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动Executor 进程
- Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main函数
- 之后执行到 Action 算子时,触发一个Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行
更多推荐
所有评论(0)