DataBrick Spark 认证助理开发者指南(一)
欢迎您加入 Apache Spark Python 认证的综合指南,这是为有志于通过 Databricks 获得认证的开发者准备的。在本书中,《使用 Python 的 Apache Spark 认证 Databricks 认证助理开发者》,我将多年的专业知识和实践经验提炼成一本全面指南,以帮助您导航数据科学、人工智能和云计算技术的复杂性,并帮助您为 Spark 认证做准备。通过有洞察力的轶事、可操
原文:
zh.annas-archive.org/md5/5e217f5bf66327f4a93816a8c91503e2
译者:飞龙
前言
欢迎您加入 Apache Spark Python 认证的综合指南,这是为有志于通过 Databricks 获得认证的开发者准备的。
在本书中,《使用 Python 的 Apache Spark 认证 Databricks 认证助理开发者》,我将多年的专业知识和实践经验提炼成一本全面指南,以帮助您导航数据科学、人工智能和云计算技术的复杂性,并帮助您为 Spark 认证做准备。通过有洞察力的轶事、可操作的见解和经过验证的策略,我将为您提供在不断发展的大数据和人工智能技术领域中茁壮成长的工具和知识。
Apache Spark 已成为处理大规模数据的首选框架,使组织能够提取有价值的见解并推动明智的决策制定。凭借其强大的功能和多功能性,Spark 已成为全球数据工程师、分析师和科学家工具箱中的基石。本书旨在成为您掌握 Apache Spark 的全面伴侣,通过结构化的方法来理解核心概念、高级技术和利用 Spark 全部潜力的最佳实践。
本书精心制作,旨在指导您走上成为认证 Apache Spark 开发者的旅程。专注于认证准备,我提供了一种结构化的方法来掌握使用 Python 的 Apache Spark,确保您为通过认证考试和验证您的专业知识做好了充分准备。
本书面向的对象
本书专为有志于成为使用 Python 进行 Apache Spark 认证的开发者而定制。无论您是一位经验丰富的数据专业人士,希望验证您的专业知识,还是一位渴望深入了解大数据分析的新手,本指南都能满足所有技能水平的需求。从寻求在 Spark 中建立坚实基础的新手到希望提高技能并为认证做准备的经验丰富的从业者,本书为任何热衷于利用 Apache Spark 力量的个人提供了宝贵的资源。
无论您是希望提升职业前景、验证您的技能,还是在数据工程领域寻求新的机会,本指南都针对您的认证目标量身定制。专注于考试准备,我们提供针对性的资源和实用见解,以确保您在认证旅程中的成功。
本书提供了指导性的建议和相关方法,帮助您在拥有 Spark 工作知识的基础上在大数据领域留下足迹,并帮助您通过 Spark 认证考试。本书期望您具备 Python 的工作知识,但不需要任何先前的 Spark 知识,尽管拥有 PySpark 的工作知识将非常有帮助。
本书涵盖的内容
在接下来的章节中,我们将涵盖以下主题。
第一章,认证指南和考试概述,介绍了 PySpark 认证考试的基础知识以及如何准备考试。
第二章,理解 Apache Spark 及其应用,深入探讨了 Apache Spark 的基本原理,探讨了其核心功能、生态系统和实际应用。它介绍了 Spark 在处理各种数据处理任务中的多功能性,如批量处理、实时分析、机器学习和图处理。实际示例说明了 Spark 在各个行业中的应用以及其在现代数据架构中的演变角色。
第三章,Spark 架构和转换,深入剖析了 Apache Spark 的架构,阐述了 RDD(弹性分布式数据集)抽象、Spark 的执行模型以及转换和操作的重要性。它探讨了窄转换和宽转换的概念,它们对性能的影响,以及 Spark 的执行计划如何优化分布式计算。实际示例有助于更好地理解这些概念。
第四章,Spark DataFrame 及其操作,专注于 Spark 的 DataFrame API,并探讨了它在结构化数据处理和分析中的作用。它涵盖了 DataFrame 的创建、操作以及各种操作,如过滤、聚合、连接和分组。示例说明了 DataFrame API 在处理结构化数据中的易用性和优势。
第五章,Spark 的高级操作和优化,在扩展你的基础知识的基础上,深入探讨了 Spark 的高级操作,包括广播变量、累加器、自定义分区以及与外部库协同工作。它探讨了处理复杂数据类型、优化内存使用以及利用 Spark 的可扩展性进行高级数据处理任务的技术。
本章还探讨了 Spark 的性能优化策略,强调了自适应查询执行的重要性。它探讨了优化 Spark 作业动态技术,包括运行时查询规划、自适应连接和数据倾斜处理。提供了实用技巧和最佳实践,以微调 Spark 作业以增强性能。
第六章,Spark 中的 SQL 查询,专注于 Spark 的 SQL 模块,并探讨了 Spark 中类似 SQL 的查询能力。它涵盖了 DataFrame API 与 SQL 的互操作性,使用户能够在分布式数据集上运行 SQL 查询。示例展示了如何使用 SQL 查询在 Spark 中表达复杂的数据操作和分析。
第七章,Spark 中的结构化流,专注于实时数据处理,并介绍了 Spark 处理连续数据流的 API——结构化流。它涵盖了事件时间处理、水印、触发器和输出模式等概念。实际示例演示了如何使用结构化流构建和部署流应用程序。
本章不包括在 Spark 认证考试中,但了解流式处理概念是有益的,因为它们是现代数据工程世界中的核心概念。
第八章,使用 Spark ML 进行机器学习,探讨了 Spark 的机器学习库 Spark ML,深入监督和无监督机器学习技术。它涵盖了各种算法的模型构建、评估和超参数调整。实际示例说明了 Spark ML 在现实世界机器学习任务中的应用。
本章不包括在 Spark 认证考试中,但了解 Spark 中的机器学习概念是有益的,因为它们是现代数据科学世界中的核心概念。
第九章,模拟测试 1,为你提供了第一个模拟测试,以备实际认证考试之用。
第十章,模拟测试 2,为你提供了第二个模拟测试,以备实际认证考试之用。
要充分利用本书
在深入研究章节之前,对 Python 编程的基本理解以及熟悉基本数据处理概念是必不可少的。此外,掌握分布式计算原理以及数据操作和分析经验将大有裨益。全书假设您具备 Python 的工作知识以及数据工程和数据分析的基础概念。具备这些先决条件后,您将准备好开始您的 Apache Spark 认证开发者之旅。
本书涵盖的软件/硬件 | 操作系统要求 |
---|---|
Python | Windows、macOS 或 Linux |
Spark |
代码在您注册 Databricks 社区版并导入您的账户中的 python 文件时将运行最佳。
如果您使用的是本书的数字版,我们建议您亲自输入代码或从本书的 GitHub 仓库(下一节中提供链接)获取代码。这样做将有助于避免与代码复制和粘贴相关的任何潜在错误。
下载示例代码文件
您可以从 GitHub 下载本书的示例代码文件 github.com/PacktPublishing/Databricks-Certified-Associate-Developer-for-Apache-Spark-Using-Python
。如果代码有更新,它将在 GitHub 仓库中更新。
我们还有其他来自我们丰富的书籍和视频目录的代码包可供使用,请访问 github.com/PacktPublishing/
。查看它们!
使用的约定
本书中使用了多种文本约定。
文本中的代码
: 表示文本中的代码单词、数据库表名、文件夹名、文件名、文件扩展名、路径名、虚拟 URL、用户输入和 Twitter 昵称。以下是一个示例:“createOrReplaceTempView()
方法允许我们将处理后的数据作为视图保存在 Spark SQL 中。”
代码块设置如下:
# Perform an aggregation to calculate the average salary
average_salary = spark.sql("SELECT AVG(Salary) AS average_salary FROM employees")
粗体: 表示新术语、重要单词或您在屏幕上看到的单词。例如,菜单或对话框中的单词以粗体显示。以下是一个示例:“考试由60 个问题组成。您尝试这些问题的时间是120 分钟。”
小贴士或重要提示
看起来像这样。
联系我们
读者反馈始终欢迎。
一般反馈: 如果您对本书的任何方面有疑问,请通过电子邮件发送至 customercare@packtpub.com,并在邮件主题中提及书名。
勘误: 尽管我们已经尽最大努力确保内容的准确性,但错误仍然可能发生。如果您在这本书中发现了错误,如果您能向我们报告,我们将不胜感激。请访问 www.packtpub.com/support/errata 并填写表格。
盗版: 如果您在互联网上以任何形式发现我们作品的非法副本,如果您能提供位置地址或网站名称,我们将不胜感激。请通过电子邮件发送至 copyright@packt.com 并附上材料的链接。
如果您有兴趣成为作者: 如果您在某个领域有专业知识,并且您有兴趣撰写或为书籍做出贡献,请访问 authors.packtpub.com。
分享您的想法
现在您已经完成了 Databricks 认证的 Apache Spark 使用 Python 的副开发人员,我们很乐意听听您的想法!如果您从亚马逊购买了这本书,请点击此处直接进入本书的亚马逊评论页面并分享您的反馈或留下评论。
您的评论对我们和科技社区都至关重要,并将帮助我们确保我们提供高质量的内容。
下载本书的免费 PDF 副本
感谢您购买本书!
您喜欢在路上阅读,但无法携带您的印刷书籍到处走吗?
您的电子书购买是否与您选择的设备不兼容?
请放心,现在每购买一本 Packt 书籍,您都可以免费获得该书的 DRM 免费 PDF 版本。
在任何地方、任何地点、任何设备上阅读。直接从您最喜欢的技术书籍中搜索、复制和粘贴代码到您的应用程序中。
优惠不仅限于此,您还可以获得独家折扣、新闻通讯和每天收件箱中的优质免费内容。
按照以下简单步骤获取福利:
- 扫描二维码或访问以下链接
packt.link/free-ebook/9781804619780
-
提交您的购买证明
-
就这样!我们将直接将您的免费 PDF 和其他福利发送到您的邮箱
第一部分:考试概述
这一部分将展示 PySpark 认证考试的基础知识和需要牢记的规则。它将展示考试中提出的不同类型的问题以及如何准备这些问题。
这一部分包含以下章节:
- 第一章,认证指南和考试概述
第一章:认证指南和考试的概述
准备任何任务最初涉及彻底理解手头的问题,然后制定一个应对挑战的策略。在这个规划阶段,为应对挑战的每个方面创建一个逐步的方法是有效的方法。这种方法使得可以单独处理较小的任务,有助于系统地通过挑战,而无需感到不知所措。
本章旨在展示通过 Spark 认证考试逐步方法。在本章中,我们将涵盖以下主题:
-
认证考试的概述
-
考试中可能遇到的不同类型的问题
-
本书其余章节的概述
我们首先概述一下认证考试。
认证考试的概述
考试由 60 个问题 组成。你被给予的时间来尝试这些问题是 120 分钟。这给你大约 每题 2 分钟。
要通过考试,你需要得到 70% 的分数,这意味着你需要在 60 个问题中正确回答 42 个才能通过。
如果你准备充分,这段时间应该足够你回答问题,并在时间结束前进行复习。
接下来,我们将看到这些问题如何在整个考试中分布。
问题的分布
考试问题被分为以下几个广泛的类别。以下表格根据不同类别提供了问题的细分:
主题 | 考试百分比 | 问题数量 |
---|---|---|
Spark 架构:概念理解 | 17% | 10 |
Spark 架构:应用理解 | 11% | 7 |
Spark DataFrame API 应用 | 72% | 43 |
表 1.1:考试细分
通过观察这个分布,你可能会想要在考试准备中更加关注 Spark DataFrame API,因为这个部分涵盖了大约 72% 的考试(大约 43 个问题)。如果你能正确回答这些问题,通过考试就会变得更容易。
但这并不意味着你不应该关注 Spark 架构领域。Spark 架构问题难度各异,有时可能会令人困惑。同时,它们允许你轻松得分,因为架构问题通常很简单。
让我们看看一些其他可用的资源,这些资源可以帮助你为这次考试做准备。
准备考试的资源
当你开始计划参加认证考试时,你必须做的第一件事是掌握 Spark 概念。这本书将帮助你理解这些概念。一旦你完成了这些,进行模拟考试将很有用。这本书中提供了两个模拟考试供你利用。
此外,Databricks 提供了模拟考试,这对考试准备非常有用。您可以在以下位置找到它:files.training.databricks.com/assessments/practice-exams/PracticeExam-DCADAS3-Python.pdf
.
考试期间可用的资源
在考试期间,您将能够访问 Spark 文档。这是通过 Webassessor 实现的,其界面与您在互联网上找到的常规 Spark 文档略有不同。熟悉这个界面会很好。您可以在 www.webassessor.com/zz/DATABRICKS/Python_v2.html
找到该界面。我建议您浏览它,并尝试通过此文档找到 Spark 的不同包和函数,以便在考试期间更舒适地导航。
接下来,我们将查看如何注册考试。
注册您的考试
Databricks 是准备这些考试和认证的公司。您可以在此处注册考试:www.databricks.com/learn/certification/apache-spark-developer-associate
.
接下来,我们将查看考试的一些先决条件。
考试先决条件
在您参加考试之前,需要一些先决条件,以便您能够成功通过认证。以下是一些主要的先决条件:
-
掌握 Spark 架构的基本原理,包括自适应查询执行的原则。
-
熟练使用 Spark DataFrame API 进行各种数据操作任务,如下所示:
-
执行列操作,例如选择、重命名和操作
-
执行行操作,包括过滤、删除、排序和聚合数据
-
执行与 DataFrame 相关的任务,例如连接、读取、写入和实现分区策略
-
展示使用 用户定义函数(UDFs)和 Spark SQL 函数的熟练程度
-
-
虽然没有明确测试,但预期您对 Python 或 Scala 有功能性的理解。考试可用两种编程语言进行。
希望到这本书的结尾,您能够完全掌握所有这些概念,并且已经足够练习,以便对考试充满信心。
现在,让我们讨论一下在线监考考试期间可以期待什么。
在线监考考试
Spark 认证考试是一个在线监考考试。这意味着您将在家中参加考试,但有人将在网上监考。我鼓励您提前了解监考考试的程序和规则。这将为您在考试时节省很多麻烦和焦虑。
为了给你一个概述,在整个考试过程中,以下程序将生效:
-
Webassessor 监考员将进行网络摄像头监控,以确保考试诚信。
-
你需要出示带有照片的有效身份证明。
-
你需要独自进行考试。
-
你的桌子需要整理干净,并且除了你用于考试的笔记本电脑外,房间里不应有其他电子设备。
-
房间的墙上不应有任何可能帮助你考试的海报或图表。
-
监考员在考试期间也会监听你,所以你想要确保你坐在一个安静舒适的环境中。
-
建议不要使用你的工作笔记本电脑参加这次考试,因为它需要安装软件,并且需要禁用你的防病毒软件和防火墙。
监考员的职责如下:
-
监督你的考试过程以保持考试诚信。
-
解决与考试交付过程相关的任何疑问。
-
如有必要,提供技术支持。
-
需要注意的是,监考员不会就考试内容提供任何形式的帮助。
我建议你在考试前有足够的时间设置你将进行考试的环境。这将确保一个顺畅的在线考试流程,让你可以专注于问题,不必担心其他任何事情。
现在,让我们谈谈考试中可能出现的不同类型的题目。
题目类型。
考试中你会遇到不同类别的题目。它们可以大致分为理论题和代码题。在本节中,我们将探讨这两个类别及其各自的子类别。
理论题。
理论题是那些会要求你对某些主题的概念性理解的问题。理论题可以进一步细分为不同的类别。让我们看看这些类别,以及从之前的考试中选取的属于这些类别的示例问题。
解释题。
解释题是需要定义和解释某事的问题。它也可以包括某物是如何工作的以及它做什么。让我们看看一个例子。
以下哪项描述了一个工作节点?
-
工作节点是集群中执行计算的节点。
-
工作节点与执行器是同义词。
-
工作节点总是与执行器保持一对一的关系。
-
工作节点是 Spark 执行层次结构中最细粒度的执行级别。
-
工作节点是 Spark 执行层次结构中最粗粒度的执行级别。
连接题。
连接题是需要定义不同事物之间是如何相互关联的,或者它们是如何相互区别的问题。让我们通过一个例子来展示这一点。
以下哪项描述了工作节点与执行器之间的关系?
-
执行器是在工作节点上运行的 Java 虚拟机(JVM)。
-
工作节点是在执行器上运行的 JVM。
-
总是存在比执行器更多的工作节点。
-
总是存在相同数量的执行器和工作节点。
-
执行器和工作节点之间没有关系。
场景问题
场景问题涉及定义在不同 if-else 场景中事物是如何工作的——例如,“如果 ______ 发生,那么 _____ 就会发生。”此外,它还包括关于场景的陈述不正确的问题。让我们通过一个例子来展示这一点。
如果 Spark 以集群模式运行,以下关于节点的说法中哪一个是错误的?
-
有一个包含 Spark 驱动程序和执行器的工作节点。
-
Spark 驱动程序在其自己的非工作节点上运行,没有任何执行器。
-
每个执行器都是工作节点内部运行的 JVM。
-
总是存在多个节点。
-
可能会有比总节点更多的执行器,或者比执行器更多的总节点。
分类问题
分类问题是这样的问题,你需要描述某个事物所属的类别。让我们通过一个例子来展示这一点。
以下哪个说法准确地描述了阶段?
-
阶段内的任务可以由多台机器同时执行。
-
作业中的各个阶段可以并发运行。
-
阶段包括一个或多个作业。
-
阶段在提交之前暂时存储事务。
配置问题
配置问题是这样的问题,你需要根据不同的集群配置概述事物的行为。让我们通过一个例子来展示这一点。
以下哪个说法准确地描述了 Spark 的集群执行模式?
-
集群模式在网关节点上运行执行器进程。
-
集群模式涉及驱动程序托管在网关机器上。
-
在集群模式下,Spark 驱动程序和集群管理器不在同一位置。
-
集群模式下的驱动程序位于工作节点上。
接下来,我们将探讨基于代码的问题及其子类别。
基于代码的问题
下一个类别是基于代码的问题。大量基于 Spark API 的问题属于这个类别。基于代码的问题是你会得到一个代码片段,然后你会被问及关于它的问题。基于代码的问题可以进一步细分为不同的类别。让我们看看这些类别,以及从之前的考试中选取的属于这些不同子类别的示例问题。
函数识别问题
函数识别问题是这样的问题,你需要定义某个事物是由哪个函数执行的。了解 Spark 中用于数据操作的不同函数及其语法非常重要。让我们通过一个例子来展示这一点。
以下哪个代码块返回了 df
DataFrame 的副本,其中 column
的 salary
已重命名为 employeeSalary
?
-
df.withColumn(["salary", "employeeSalary"])
-
df.withColumnRenamed("salary").alias("employeeSalary ")
-
df.withColumnRenamed("salary", "
employeeSalary ")
-
df.withColumn("salary", "
employeeSalary ")
填空题
填空题是这样的问题,您需要通过填写空白来完成代码块。让我们通过一个示例来演示这一点。
以下代码块应返回一个 DataFrame,其中包含transactionsDf
DataFrame 中的employeeId
、salary
、bonus
和department
列。请选择正确填充空白的答案来完成此操作。
df.__1__(__2__)
-
-
drop
-
"employeeId", "salary", "``bonus", "department"
-
-
-
filter
-
"employeeId, salary,
bonus, department"
-
-
-
select
-
["employeeId", "salary", "``bonus", "department"]
-
-
-
select
-
col(["employeeId", "salary", "``bonus", "department"])
-
代码行顺序问题
代码行顺序问题是这样的问题,您需要将代码行按特定顺序排列,以便正确执行操作。让我们通过一个示例来演示这一点。
以下哪个代码块创建了一个 DataFrame,该 DataFrame 显示了基于department
和state
列的salary
列的平均值,其中age
大于 35?
-
salaryDf.filter(col("age") >
35)
-
.``filter(col("employeeID")
-
.``filter(col("employeeID").isNotNull())
-
.``groupBy("department")
-
.``groupBy("department", "state")
-
.``agg(avg("salary").alias("mean_salary"))
-
.``agg(average("salary").alias("mean_salary"))
-
i, ii, v, vi
-
i, iii, v, vi
-
i, iii, vi, vii
-
i, ii, iv, vi
摘要
本章提供了认证考试的概述。到目前为止,您已经知道考试中可以期待什么,以及如何最好地准备它。为此,我们介绍了您将遇到的不同类型的问题。
今后,本书的每一章都将为您提供实用的知识和动手实践示例,以便您能够利用 Apache Spark 进行各种数据处理和分析任务。
第二部分:介绍 Spark
本部分将为您提供对 Spark 功能和操作原理的全面了解。它将涵盖 Spark 是什么,为什么它很重要,以及 Spark 最有用的应用领域。它将介绍可以从 Spark 中受益的不同类型的用户。它还将涵盖 Spark 的基本架构以及如何在 Spark 中导航应用程序。它将详细说明窄和宽 Spark 转换,并讨论 Spark 中的懒加载评估。这种理解很重要,因为 Spark 的工作方式与其他传统框架不同。
本部分包含以下章节:
-
第二章*,理解 Apache Spark 及其应用*
-
第三章*,Spark 架构和转换*
第二章:理解 Apache Spark 及其应用
随着机器学习和数据科学的兴起,世界正在经历一个范式转变。每秒钟都在收集大量的数据,而计算能力难以跟上这种快速数据增长的速度。为了利用所有这些数据,Spark 已经成为大数据处理的事实标准。将数据处理迁移到 Spark 不仅是一个节省资源的问题,让你能专注于你的业务;它也是一种现代化你的工作负载,利用 Spark 的能力和现代技术堆栈来创造新的商业机会的手段。
在本章中,我们将涵盖以下主题:
-
什么是 Apache Spark?
-
为什么选择 Apache Spark?
-
Spark 的不同组件
-
Spark 有哪些用例?
-
Spark 的用户是谁?
什么是 Apache Spark?
Apache Spark 是一个开源的大数据框架,用于多个大数据应用。Spark 的强大之处在于其卓越的并行处理能力,使其在其领域成为领导者。
根据其网站(spark.apache.org/
),“最广泛使用的可扩展计算引擎”
Apache Spark 的历史
Apache Spark 始于 2009 年在加州大学伯克利分校 AMPLab 的一个研究项目,并于 2010 年转为开源许可。后来,在 2013 年,它成为 Apache 软件基金会的一部分(spark.apache.org/
)。它在 2013 年之后获得了流行,如今,它已成为众多财富 500 强公司大数据产品的支柱,有成千上万的开发者正在积极为其工作。
Spark 的出现是由于 Hadoop MapReduce 框架的限制。MapReduce 的主要前提是从磁盘读取数据,将数据分发以进行并行处理,对数据应用 map 函数,然后将这些函数减少并保存回磁盘。这种来回读取和保存到磁盘的过程很快就会变得耗时且成本高昂。
为了克服这一限制,Spark 引入了内存计算的概念。除此之外,Spark 还拥有由不同研究倡议带来的几个能力。你将在下一节中了解更多关于它们的信息。
理解 Spark 的不同之处
Spark 的基础在于其主要能力,如内存计算、延迟评估、容错性和支持 Python、SQL、Scala 和 R 等多种语言。我们将在下一节中详细讨论每一个。
让我们从内存计算开始。
内存计算
Spark 基础构建的第一个主要区分性技术是它利用内存计算。记得我们讨论 Hadoop MapReduce 技术时吗?它的一个主要限制是在每个步骤都将数据写回磁盘。Spark 将其视为改进的机会,并引入了内存计算的概念。主要思想是数据在处理过程中始终保持在内存中。如果我们能够处理一次性存储在内存中的数据量,我们就可以消除在每个步骤中写入磁盘的需要。因此,如果我们能够处理所有计算的数据量,整个计算周期就可以在内存中完成。现在,需要注意的是,随着大数据的出现,很难将所有数据都包含在内存中。即使我们在云计算世界的重型服务器和集群中看,内存仍然是有限的。这就是 Spark 并行处理内部框架发挥作用的地方。Spark 框架以最有效的方式利用底层硬件资源。它将计算分布在多个核心上,并充分利用硬件能力。
这极大地减少了计算时间,因为只要数据可以适应 Spark 计算内存,写入磁盘和读取回磁盘的开销就会最小化。
延迟评估
通常,当我们使用编程框架时,后端编译器会查看每个语句并执行它。虽然这对于编程范式来说效果很好,但在大数据和并行处理的情况下,我们需要转向一种前瞻性的模型。Spark 因其并行处理能力而闻名。为了实现更好的性能,Spark 不会在读取代码时执行代码,而是在代码存在并且我们提交 Spark 语句以执行时,第一步是 Spark 构建查询的逻辑映射。一旦这个映射建立,然后它会规划最佳的执行路径。你将在 Spark 架构章节中了解更多关于其复杂性的内容。一旦计划确定,执行才会开始。即使执行开始,Spark 也会推迟执行所有语句,直到遇到一个“操作”语句。Spark 中有两种类型的语句:
-
转换
-
操作
你将在第三章中详细了解 Spark 语句的不同类型,其中我们讨论了 Spark 架构。以下是延迟评估的一些优点:
-
效率
-
代码可管理性
-
查询和资源优化
-
简化复杂性
弹性数据集/容错性
Spark 的基础是建立在弹性分布式数据集(RDDs)之上的。它是一个不可变的分布式对象集合,代表了一组记录。RDDs 分布在多个服务器上,它们在多个集群节点上并行计算。RDDs 可以通过代码生成。当我们从外部存储位置读取数据到 Spark 中时,RDDs 会保存这些数据。这些数据可以在多个集群之间共享,并且可以并行计算,从而为 Spark 提供了非常高效的在 RDD 数据上运行计算的方法。RDDs 被加载到内存中进行处理;因此,与 Hadoop 不同,不需要将数据加载到和从内存中进行计算。
RDDs 具有容错性。这意味着如果出现故障,RDDs 有自我恢复的能力。Spark 通过将这些 RDD 分布到不同的工作节点上,同时考虑到每个工作节点执行的任务,来实现这一点。这种对工作节点的处理由 Spark 驱动器完成。我们将在后续章节中详细讨论这一点。
RDDs 在弹性和容错性方面赋予了 Spark 很大的能力。这种能力,结合其他特性,使得 Spark 成为任何生产级应用的工具选择。
多语言支持
Spark 支持多种开发语言,如 Java、R、Scala 和 Python。这使用户能够灵活地使用任何选择的编程语言在 Spark 中构建应用程序。
Spark 的组件
让我们来谈谈 Spark 的不同组件。正如你在图 1.1中可以看到的,Spark Core 是 Spark 操作的核心,横跨 Spark 的所有其他组件。本节我们将讨论的其他组件包括 Spark SQL、Spark Streaming、Spark MLlib 和 GraphX。
图 2.1:Spark 组件
让我们来看看 Spark 的第一个组件。
Spark Core
Spark Core 是 Spark 所有其他组件的核心。它为所有不同的组件提供功能和核心特性。Spark SQL、Spark Streaming、Spark MLlib 和 GraphX 都使用 Spark Core 作为其基础。Spark 的所有功能和特性都由 Spark Core 控制。它提供了内存计算能力以提供速度,一个通用的执行模型以支持广泛的各类应用,以及 Java、Scala 和 Python API 以简化开发。
在所有这些不同的组件中,你可以使用支持的语言编写查询。然后 Spark 将这些查询转换为有向无环图(DAGs),而 Spark Core 负责执行它们。
Spark Core 的关键职责如下:
-
与存储系统交互
-
内存管理
-
任务分配
-
任务调度
-
任务监控
-
内存计算
-
容错性
-
优化
Spark Core 包含一个用于 RDDs 的 API,RDDs 是 Spark 的组成部分。它还提供了不同的 API 来交互和工作与 RDDs。Spark 的所有组件都使用底层的 RDDs 进行数据处理。RDDs 使得 Spark 能够拥有数据的历史记录,因为它们是不可变的。这意味着每次对 RDD 执行需要更改的操作时,Spark 都会为它创建一个新的 RDD。因此,它维护 RDD 及其对应操作的历史记录信息。
Spark SQL
SQL 是数据库和数据仓库应用中最流行的语言。分析师使用这种语言进行所有基于关系数据库和传统数据仓库的探索性数据分析。Spark SQL 将这一优势添加到 Spark 生态系统中。Spark SQL 用于使用 DataFrame API 以 SQL 查询结构化数据。
如其名称所示,Spark SQL 为 Spark 提供了 SQL 支持。这意味着我们可以使用 DataFrame API 查询 RDD 和其他外部源中的数据。这是 Spark 的一个强大功能,因为它为开发者提供了在 RDD 和其他文件格式之上使用关系表结构的灵活性,并允许在上面编写 SQL 查询。这也增加了在必要时使用 SQL 的能力,并将其与分析应用程序和用例统一,从而提供了平台的统一。
使用 Spark SQL,开发者可以轻松完成以下操作:
-
他们可以从不同的文件格式和不同的来源读取数据到 RDDs 和 DataFrames 中
-
他们可以在 DataFrame 中的数据上运行 SQL 查询,从而为开发者提供使用编程语言或 SQL 处理数据的灵活性
-
一旦完成数据处理,他们就有能力将 RDDs 和 DataFrames 写入外部源
Spark SQL 包含一个基于成本的优化器,它优化查询,同时考虑资源;它还具有生成这些优化代码的能力,这使得这些查询非常快速和高效。为了支持更快的查询时间,它可以在 Spark Core 的帮助下扩展到多个节点,并提供诸如容错和弹性等特性。这被称为 Catalyst 优化器。我们将在第五章中了解更多关于它。
Spark SQL 最显著的特点如下:
-
它提供了一个高级结构化 API 的引擎
-
读取/写入到和从大量文件格式,如 Avro、Delta、逗号分隔值(CSV)和 Parquet
-
提供了开放数据库连接(ODBC)和Java 数据库连接(JDBC)连接器,用于连接商业智能(BI)工具,如 PowerBI 和 Tableau,以及流行的关系数据库(RDBMs)
-
提供了一种将文件中的结构化数据查询为表和视图的方法
-
它支持符合 ANSI SQL:2003 命令和 HiveQL
既然我们已经涵盖了 SparkSQL,让我们来讨论 Spark Streaming 组件。
Spark Streaming
我们已经讨论了当今时代数据的快速增长。如果我们将这些数据分组,实际上有两种数据集类型,批处理和流式处理:
-
批处理数据是指存在一块数据,你必须一次性摄取并转换。想象一下,当你想要获取一个月内所有销售的报告时。你将拥有作为批处理的月度数据,并一次性处理它。
-
流数据是指你需要实时数据的输出。为了满足这一需求,你必须实时摄取和处理这些数据。这意味着每个数据点都可以作为一个单独的数据元素摄取,我们不会等待收集到数据块后再进行摄取。想象一下,自动驾驶汽车需要根据收集到的数据实时做出决策。所有数据都需要实时摄取和处理,以便汽车在特定时刻做出有效的决策。
有许多行业正在生成流数据。为了利用这些数据,你需要实时摄取、处理和管理这些数据。对于组织来说,使用流数据作为实时分析和其他用例已经成为一项基本要求。这使他们比竞争对手具有优势,因为这使他们能够实时做出决策。
Spark Streaming 使组织能够利用流数据。Spark Streaming 最重要的因素之一是它易于使用,并且可以与批处理数据一起使用。你可以在一个框架内结合批处理和流数据,并使用它来增强你的分析应用程序。Spark Streaming 还继承了 Spark Core 的弹性和容错特性,使其在行业中占据主导地位。它集成了大量流数据源,如 HDFS、Kafka 和 Flume。
Spark Streaming 的美丽之处在于,批处理数据可以作为流进行处理,以利用流数据的内置范式和回溯能力。当我们处理实时数据时,需要考虑某些因素。当我们处理实时数据流时,可能会因为系统故障或完全失败而错过一些数据。Spark Streaming 以无缝的方式处理这个问题。为了满足这些需求,它有一个内置机制,称为检查点。这些检查点的目的是跟踪传入的数据,了解下游处理了什么数据,以及在下一次周期中还有哪些数据需要处理。我们将在详细讨论 Spark Streaming 的第七章中了解更多关于这一点。
这使得 Spark 对故障具有弹性。如果有任何故障,您需要做最少的工作来重新处理旧数据。您还可以定义缺失数据或延迟处理数据的机制和算法。这为数据管道提供了很大的灵活性,并使它们在大规模生产环境中更容易维护。
Spark MLlib
Spark 提供了一个用于分布式和可扩展机器学习的框架。它将计算分布在不同的节点上,从而在模型训练方面实现更好的性能。它还分布了超参数调整。您将在第八章中了解更多关于超参数调整的内容,我们将讨论机器学习。因为 Spark 可以扩展到大型数据集,所以它是机器学习生产管道的首选框架。当您构建产品时,执行和计算速度非常重要。Spark 让您能够处理大量数据,并构建运行非常高效的先进机器学习模型。与需要数天训练的模型相比,Spark 将时间缩短到数小时。此外,处理更多数据在大多数情况下会导致性能更好的模型。
大多数常用的机器学习算法都是 Spark 库的一部分。Spark 中有两个机器学习包可用:
-
Spark MLlib
-
Spark ML
这两个之间的主要区别是它们处理的数据类型。Spark MLlib 建立在 RDD 之上,而 Spark ML 与 DataFrame 一起工作。Spark MLlib 是较旧的库,现在已进入维护模式。更先进的库是 Spark ML。您还应注意,Spark ML 不是库本身的官方名称,但它通常用于指代 Spark 中基于 DataFrame 的 API。官方名称仍然是 Spark MLlib。然而,了解这些差异是很重要的。
Spark MLlib 包含了最常用的机器学习库,用于分类、回归、聚类和推荐系统。它还支持一些频繁模式挖掘算法。
当需要将这些模型服务于数百万甚至数十亿用户时,Spark 也非常有帮助。您可以使用 Spark 分发和并行化数据处理(提取、转换、加载(ETL))和模型评分。
GraphX
GraphX 是 Spark 的图和图并行计算的 API。GraphX 扩展了 Spark 的 RDD 以支持图,并允许您使用图对象运行并行计算。这显著提高了计算速度。
这里有一个表示图外观的网络图。
图 2.2:一个网络图
图是一个具有顶点和边的对象。属性附加到每个顶点和边上。Spark 支持一些主要的图操作,例如subgraph
和joinVertices
。
主要前提是你可以使用 GraphX 进行探索性分析和 ETL,并使用 RDD 高效地转换和连接图。有两种类型的操作符——Graph
和GraphOps
。在此基础上,还有图聚合操作符。Spark 还包括许多在常见用例中使用的图算法。以下是一些最受欢迎的算法:
-
PageRank
-
连通分量
-
标签传播
-
SVD++
-
强连通分量
-
三角形计数
现在,让我们讨论为什么我们想在应用中使用 Spark 以及它提供的一些特性。
为什么选择 Apache Spark?
在本节中,我们将讨论 Apache Spark 的应用及其特性,例如速度、可重用性、内存计算以及 Spark 是如何成为一个统一平台的。
速度
Apache Spark 是目前可用的最快数据处理框架之一。它比 Hadoop MapReduce 快得多。主要原因在于其内存计算能力和延迟评估。我们将在下一章讨论 Spark 架构时了解更多关于这一点。
可重用性
可重用性对于使用现代平台的大型组织来说是一个非常重要的考虑因素。Spark 可以无缝地连接批处理和流数据。此外,你可以通过添加历史数据来增强数据集,以更好地满足你的用例。这为运行查询或构建现代分析系统提供了大量历史数据视图。
内存计算
使用内存计算,消除了读取和写入磁盘的所有开销。数据被缓存,在每一步中,所需的数据已经存在于内存中。在处理结束时,结果被汇总并发送回驱动程序以进行后续步骤。
所有这些都得益于 Spark 固有的 DAG 创建过程。在执行之前,Spark 创建必要的步骤的 DAG 并根据其内部算法对其进行优先排序。我们将在下一章中了解更多关于这一点。这些功能支持内存计算,从而实现快速处理速度。
统一平台
Spark 提供了一个统一的数据工程、数据科学、机器学习、分析、流处理和图处理平台。所有这些组件都与 Spark Core 集成。核心引擎非常高速,并概括了其其他组件所需的一些常用任务。这使得 Spark 在与其他平台相比时具有优势,因为其不同组件的统一。这些组件可以协同工作,为软件应用提供统一的体验。在现代应用中,这种统一使得使用变得容易,并且应用的不同部分可以充分利用这些组件的核心功能,而不会牺牲功能。
现在你已经了解了使用 Spark 的好处,让我们来谈谈 Spark 在行业中的不同用例。
Spark 有哪些用例?
在本节中,我们将了解 Spark 在行业中的应用。目前 Spark 有各种用例,包括大数据处理、机器学习应用、近实时和实时流处理,以及使用图分析。
大数据处理
Spark 最流行的用例之一是大数据处理。你可能想知道什么是大数据,那么让我们来看看标记数据为大数据的组成部分。
大数据的第一个组成部分是数据量。按数据量来说,数据非常大,在某些情况下,数据量可能达到太字节、拍字节甚至更多。多年来,组织收集了大量的数据。这些数据可以用于分析。然而,在这个活动的第一步是处理这些大量的数据。此外,只有最近,计算能力才增长到能够处理如此庞大的数据量。
大数据的第二个组成部分是数据速度。数据速度指的是数据生成、摄取和分布的速度。这意味着近年来数据生成的速度已经大幅增加。以你的智能设备为例,它每秒向服务器发送数据。在这个过程中,服务器还需要跟上数据的摄取,然后可能需要将数据分布到不同的来源。
大数据的第三个组成部分是数据多样性。数据多样性指的是生成数据的不同来源。它还指的是生成的不同类型的数据。那些只有结构化格式可以保存为数据库中的表的数据生成时代已经过去了。目前,数据可以是结构化的、半结构化的或非结构化的。现在的系统必须处理所有这些不同的数据类型,工具应该能够操作这些不同的数据类型。想想需要处理的照片或可以使用高级分析进行分析的音频和视频文件。
还可以将其他一些组件添加到原始的三个 V 中,例如真实性和价值。然而,这些组件超出了我们讨论的范围。
大数据太大,常规机器无法处理。这就是为什么它被称为大数据。高容量、高速度、高多样性的大数据需要使用像 Spark 这样的高级分析工具进行处理,Spark 可以在不同的机器或集群之间分配工作负载,并并行处理以利用机器上的所有可用资源。因此,Spark 使我们能够将数据分成不同的部分,并在不同的机器上并行处理。这极大地加快了整个过程,并利用了所有可用资源。
由于上述所有原因,Spark 是使用最广泛的的大数据处理技术之一。大型组织利用 Spark 来分析和操作他们的大数据堆栈。Spark 在复杂分析用例中作为大数据处理的基础。
以下是一些大数据用例的例子:
-
报告和仪表板业务智能
-
复杂应用的数据仓库
-
应用监控的操作分析
这里需要特别注意的是,与 Spark 合作需要从单节点处理模式转变为大数据处理模式。你现在必须开始思考如何最好地利用和优化大型集群进行处理,以及并行处理的一些最佳实践。
机器学习应用
随着数据的增长,对机器学习模型利用更多数据的需要也在增加。目前在机器学习社区中普遍认为,提供给模型的数据越多,模型就越好。这导致了需要大量数据提供给模型进行预测分析的需求。当我们处理大量数据时,训练机器学习模型的挑战比数据处理更加复杂。原因是机器学习模型通过压缩数据并运行统计估计来达到最小误差点。为了达到这个最小误差,模型必须执行复杂的数学运算,如矩阵乘法。这些计算需要在内存中提供大量数据,并在其上运行计算。这为机器学习中的并行处理提供了案例。
机器学习为产品增加了预测元素。我们不再只是对已经发生的变化做出反应,而是可以根据历史数据和趋势主动寻找改进我们产品和服务的途径。组织的每个方面都可以利用机器学习进行预测分析。机器学习可以应用于许多行业,从医院到零售店再到制造组织。我们在互联网上完成任务时,如在线买卖、浏览和搜索网站、使用社交媒体平台,都遇到过某种机器学习算法。不知不觉中,机器学习已经成为我们生活的重要组成部分。
尽管在机器学习方面,组织可以利用大量用例,但我在这里只突出强调几个:
-
个性化购物
-
网站搜索和排名
-
银行和保险业的欺诈检测
-
客户情感分析
-
客户细分
-
推荐引擎
-
价格优化
-
预测性维护和支持
-
文本和视频分析
-
客户/患者 360 度
让我们继续讨论实时流处理。
实时流处理
实时流是 Spark 真正发光的用例之一。提供与 Spark Streaming 相同灵活性的竞争框架非常少。
Spark Streaming 提供了一种机制,可以从多个流数据源(如 Kafka 和 Amazon Kinesis)中摄取数据。一旦数据被摄取,就可以使用非常高效的 Spark Streaming 处理实时处理。
有许多实时用例可以利用 Spark Streaming。以下是一些例子:
-
自动驾驶汽车
-
实时报告和分析
-
提供股市数据的更新
-
物联网(IoT)数据摄取和处理
-
实时新闻数据处理
-
实时分析以优化库存和运营
-
信用卡实时欺诈检测系统
-
实时事件检测
-
实时推荐
大型全球组织利用 Spark Streaming 实时处理数十亿甚至数万亿的数据行。我们在日常生活中也能看到一些这样的应用。例如,当你外出购物时,你的信用卡阻止了一笔交易,这就是实时欺诈检测的一个例子。Netflix 和 YouTube 使用实时交互,视频平台推荐用户观看下一部视频。
随着我们进入一个每个设备都将数据发送回其服务器进行分析的世界,对流和实时分析的需求增加。使用 Spark Streaming 进行此类数据的主要优势之一是其内置的回溯和延迟处理数据的能力。我们之前也讨论了这种方法的实用性,并且由于这些能力,大量手动管道处理工作被移除。当我们讨论 Spark Streaming 时,我们将了解更多关于这一点,第七章 将会涉及。
图分析
图分析通过分析不同实体之间的关系,提供了一种独特的数据观察方式。图中的顶点代表实体,图的边代表两个实体之间的关系。以你在 Facebook 或 Instagram 上的社交网络为例。你代表一个实体,而你连接的人代表另一个实体。你和你朋友之间的联系(连接)就是边。同样,你在社交媒体上的兴趣可能都是不同的边。然后,可以有一个位置类别,属于同一位置的所有人都会与该位置有一个边(关系),依此类推。因此,可以与任何不同类型的实体建立联系。你连接得越多,你与志同道合的人或兴趣相连接的可能性就越高。这是衡量不同实体之间关系的一种方法。这些类型的图有几种用途。Spark 的美丽之处在于,它可以通过分布式处理这些图来快速找到这些关系。对于数十亿个实体,可能会有数百万甚至数十亿个连接。Spark 具有分配这些工作负载和快速计算复杂算法的能力。
以下是一些图分析的应用场景:
-
社交网络分析
-
欺诈检测
-
基于相关性的页面排名
-
天气预测
-
搜索引擎优化
-
供应链分析
-
在社交媒体上寻找影响者
-
洗钱和欺诈检测
随着图分析用例数量的不断增长,这证明在当今行业中,我们需要分析实体之间关系的网络,这是一个关键的应用场景。
在下一节中,我们将讨论 Spark 用户是谁以及他们在组织中的典型角色。
Spark 用户是谁?
随着世界向数据驱动的决策方法转变,数据和能够利用它做出关键业务决策的不同类型用户的作用变得至关重要。在数据中存在不同类型的用户,他们可以利用 Spark 实现不同的目的。在本节中,我将介绍其中一些不同的用户。这不是一个详尽的列表,但它应该能给你一个关于今天数据驱动组织中存在的不同角色的概念。然而,随着行业的发展,许多新的角色正在出现,它们与以下章节中提到的角色相似,尽管每个角色可能都有其独特的职责。
我们将从数据分析师的角色开始。
数据分析师
在当今的数据领域,更传统的角色是数据分析师。数据分析师通常是数据的第一层级角色。这意味着数据分析师是组织决策的核心。这个角色跨越组织的不同业务部门,并且通常,数据分析师需要与多个业务利益相关者互动,以传达他们的需求。这需要了解业务领域及其流程。当分析师对业务及其目标有了解时,他们才能最好地履行他们的职责。此外,很多时候,需求是使当前流程更有效率,这最终会为业务带来更好的底线。因此,不仅需要了解业务目标,还需要了解它们是如何协同工作的,这是这个角色的一项主要要求。
数据分析师的一个典型工作角色可能如下所示:
-
当数据分析师在一个组织中接到一个项目时,项目的第一步是从多个利益相关者那里收集需求。让我们用一个例子来说明。假设你加入了一个组织作为数据分析师。这个组织生产和销售计算机硬件。你被分配的任务是报告过去 10 年中每个月的收入。对你来说,第一步就是收集所有需求。可能有些利益相关者想知道每个月销售了多少个特定产品的单位,而其他人可能想知道收入是否持续增长。记住,你的报告的最终用户可能工作在组织的不同业务部门。
-
一旦你收集了所有相关利益相关者的需求,接下来就是进行下一步,也就是寻找相关的数据源来回答你所负责的问题。你可能需要与组织或平台架构师中的数据库管理员交谈,以了解不同数据源的位置,这些数据源中包含对你提取信息有用的相关信息。
-
一旦你有了所有相关的来源,那么你想要以编程方式(在大多数情况下)与这些来源连接,清理并合并一些数据,以满足你的要求,从而得出相关的统计数据。这就是 Spark 可以帮助你连接到这些不同的数据来源,并高效地读取和操作数据的地方。你还需要根据业务需求对数据进行切割和细分。一旦数据清理完成并生成了统计数据,你想要基于这些统计数据生成一些报告。市场上有很多生成报告的工具,如 Qlik 和 Tableau,你可以使用它们。一旦生成了报告,你可能想要与利益相关者分享你的结果。你可以向他们展示你的结果,或者与他们分享报告,具体取决于首选的媒介。这将帮助利益相关者做出基于数据的、信息化的、业务关键性的决策。
不同角色之间的协作对于数据分析师来说也起着重要的作用。由于组织已经收集数据很长时间了,最重要的事情是与多年来收集的所有数据进行工作,并从中找到意义,帮助企业在关键决策中做出贡献。帮助进行数据驱动的决策是成为一名成功的数据分析师的关键。
这是前几段讨论的项目中采取的步骤的总结:
-
从利益相关者那里收集需求。
-
确定相关的数据来源。
-
与主题专家(SMEs)合作。
-
切割和细分数据。
-
生成报告。
-
分享结果。
接下来让我们看看数据工程师。这个角色在当今行业中获得了很大的关注。
数据工程师
在行业中越来越普遍的新角色是数据工程师。这是一个相对较新的角色,但近年来获得了巨大的流行度。原因是数据增长速度极快。现在每秒钟产生的数据比几年前一个月产生的数据还要多。处理所有这些数据需要专业的技能。这些数据已经无法被大多数计算机的适度内存所容纳,因此我们必须利用云计算的巨大规模来满足这一需求。随着数据需求变得更加复杂,我们需要复杂的架构来处理和使用这些数据以进行商业决策。这就是数据工程师角色发挥作用的地方。数据工程师的主要工作是准备数据以供不同目的的摄取。利用这些准备好的数据的下游系统可能是基于这些数据运行报告的仪表板,也可能是与高级机器学习算法合作进行预测分析的解决方案,以便根据数据做出主动决策。
更广泛地说,数据工程师负责创建、维护、优化和监控为组织中的不同用例提供服务的管道。这些管道通常被称为提取、转换、加载(ETL)管道。主要区别在于数据工程师必须处理的数据规模之大。当有下游需求用于 BI 报告、高级分析以及/或机器学习时,这就是数据管道在大项目中发挥作用的地方。
组织中数据工程师的一个典型工作角色可能如下。当数据工程师被分配创建一个项目数据管道的任务时,他们首先需要考虑的是应用程序的整体架构。在一些组织中可能有数据架构师来帮助处理一些架构需求,但这并不总是如此。因此,数据工程师会提出如下问题:
-
数据的不同来源有哪些?
-
数据的大小是多少?
-
数据目前存储在哪里?
-
我们是否需要在不同的工具之间迁移数据?
-
我们如何连接到数据?
-
需要什么样的数据转换?
-
数据多久更新一次?
-
我们是否应该预期新数据会有模式变更?
-
如果出现故障,我们如何监控管道?
-
我们是否需要创建一个用于故障的通知系统?
-
我们是否需要为故障添加重试机制?
-
故障的超时策略是什么?
-
如果出现故障,我们如何运行过期的管道?
-
我们如何处理不良数据?
-
我们应该遵循什么策略——ETL 还是 ELT?
-
我们如何节省计算成本?
一旦他们回答了这些问题,他们就开始着手构建一个具有弹性的架构来构建数据管道。一旦这些管道运行并经过测试,下一步就是维护这些管道,使处理更加高效和易于故障检测。目标是构建这些管道,以便一旦运行完毕,数据的最终状态对于不同的下游用例是一致的。过于频繁的是,数据工程师必须与数据分析师和数据科学家合作,根据所需用例制定正确的数据转换要求。
让我们谈谈数据科学家,这是一个在多个论坛上被宣传为“21 世纪最性感的工作”的职位。
数据科学家
传统上,数据一直被用于基于过去发生的事情进行决策。这意味着组织是基于数据做出反应的。现在,在高级和预测分析方面已经发生了范式转变。这意味着组织在决策上可以变得主动,而不是被动。他们通过现在组织可用的所有数据来实现这一点。为了有效地利用这些数据,数据科学家扮演着重要的角色。他们将分析提升到下一个层次,而不是仅仅查看过去发生的事情,他们拥有复杂的机器学习算法来预测未来可能发生的事情。所有这些都是基于他们可用的海量数据。
在一个组织中,数据科学家的典型工作角色可能如下所示。
数据科学家被分配了一个需要解决的问题或一个需要回答的问题。首要任务是查看他们可以用来回答这个问题的数据类型。他们会基于给定的数据提出一些假设进行测试。如果结果积极,并且数据能够回答一些问题陈述,那么他们就会继续通过实验来处理数据,并寻求更有效地回答手头问题的方法。为此,他们会将不同的数据集合并在一起,并且也会转换数据,使其适合某些机器学习算法使用。在这个阶段,他们还需要决定他们旨在解决的机器学习问题类型。
他们可以使用三种主要的机器学习技术:
-
回归
-
分类
-
聚类
根据决定的技术和数据转换,他们接下来会使用几个机器学习算法进行原型设计,以创建一个基线模型。基线模型是一个非常基本的模型,用于回答原始问题。基于这个基线模型,可以创建其他模型,这些模型能够更好地回答问题。在某些情况下,一些预定义的规则也可以作为基线模型。这意味着企业可能已经在一些预定义的规则上运行,这些规则可以作为基准来比较机器学习模型。一旦完成初步的原型设计,数据科学家就会继续进行更高级的模型优化。他们可以与模型的不同的超参数一起工作,或者尝试不同的数据转换和样本大小。所有这些都可以在 Spark 或其他工具和语言中完成,具体取决于他们的偏好。Spark 具有并行运行这些算法的优势,使整个过程非常高效。一旦数据科学家根据不同的指标对模型结果感到满意,他们就会将模型移动到生产环境,在这些环境中,这些模型可以服务于客户解决特定问题。在这个阶段,他们会将这些模型交给机器学习工程师,开始将它们集成到管道中。
以下是前一段讨论的项目中采取的步骤总结:
-
创建并测试一个假设。
-
转换数据。
-
我们决定使用哪种机器学习算法?
-
使用不同的机器学习模型进行原型设计。
-
创建一个基线模型。
-
调整模型。
-
调整数据。
-
将模型过渡到生产环境。
接下来,让我们讨论机器学习工程师的角色。
机器学习工程师
与数据工程师一样,机器学习工程师也构建管道,但这些管道主要是为机器学习模型部署而构建的。机器学习工程师通常使用数据科学家创建的原型模型,并围绕它们构建机器学习管道。我们将讨论机器学习管道是什么,以及构建这些管道需要回答的一些问题。
机器学习模型是为了解决复杂问题并提供高级分析方法以服务于企业而构建的。在原型设计之后,这些模型需要在组织的生产环境中运行并部署以服务于客户。对于部署,需要考虑以下几个因素:
-
用于模型训练的数据有多少?
-
我们计划同时服务于多少客户?
-
我们需要多久重新训练一次模型?
-
我们预计数据多久会变化一次?
-
我们如何根据需求进行管道的扩展和缩减?
-
我们如何监控模型训练中的失败?
-
我们是否需要为失败发送通知?
-
我们是否需要为失败添加重试机制?
-
失败的超时策略是什么?
-
我们如何在生产中衡量模型性能?
-
我们如何应对数据漂移?
-
我们如何应对模型漂移?
一旦这些问题得到解答,下一步就是围绕这些模型构建一个管道。管道的主要目的是,当新数据到来时,预训练的模型能够根据新的数据集回答问题。
让我们用一个例子来更好地理解这些管道。我们将继续使用一个组织销售计算机硬件的第一个例子:
-
假设该组织希望在网站上构建一个推荐系统,向用户推荐购买哪些产品。
-
数据科学家已经构建了一个与测试数据表现良好的原型模型。现在,他们想将其部署到生产环境中。
-
为了部署这个模型,机器学习工程师需要看看他们如何将这个模型整合到网站上。
-
他们将从从网站上获取数据开始,以获取用户信息。
-
一旦他们有了信息,他们就会将其通过数据管道进行清洗和合并数据。
-
他们还可能想将一些预计算的特征添加到模型中,例如一年中的时间,以更好地了解是否是假日季节以及是否有一些特别优惠正在进行。
-
然后,他们需要一个 REST API 端点来获取网站上每个用户的最新推荐。
-
之后,网站需要连接到 REST 端点以服务实际客户。
-
一旦这些模型部署到实际系统中(在我们的例子中是网站),就需要有一个监控系统来监控模型或数据中的任何错误和变化。这分别被称为模型漂移和数据漂移。
数据漂移
数据可能会随时间变化。在我们的例子中,人们的偏好可能会随着时间的推移或季节性而变化,数据可能不同。例如,在假日季节,人们的偏好可能会略有变化,因为他们正在寻找为朋友和家人购买礼物,因此根据这些偏好推荐相关产品对业务至关重要。监控这些趋势和数据变化将随着时间的推移产生更好的模型,并最终有利于业务。
模型漂移
与数据漂移类似,我们还有模型漂移的概念。这意味着模型会随时间变化,最初构建的旧模型在向网站访客推荐项目方面不再是表现最佳的。随着数据的变化,模型也需要不时地进行更新。为了了解何时需要更新模型,我们需要对模型进行监控。这种监控会持续比较旧模型的结果与新数据,看模型性能是否下降。如果是这样,那就需要更新模型了。
这个模型部署的全生命周期通常是机器学习工程师的责任。请注意,对于不同的问题,这个过程会有所不同,但总体思路保持不变。
摘要
在本章中,我们学习了 Apache Spark 的基础知识以及为什么 Spark 在工业界的大数据应用中变得越来越普遍。我们还学习了 Spark 的不同组件以及这些组件在应用开发方面的帮助。然后,我们讨论了当前行业中存在的不同角色以及谁可以利用 Spark 的能力。最后,我们讨论了 Spark 在不同行业用例中的现代应用。
样题
虽然这些问题不是 Spark 认证的一部分,但回答这些问题以评估你对 Spark 基础知识的理解是很好的:
-
Spark 的核心组件有哪些?
-
我们在什么时候想使用 Spark Streaming?
-
Spark Streaming 中的回溯机制是什么?
-
Spark 有哪些好的用例?
-
在组织中哪些角色应该使用 Spark?
第三章:Spark 架构和转换
Spark 在数据处理方面与传统工具和技术不同。为了理解 Spark 的独特方法,我们必须了解其基本架构。深入了解 Spark 的架构及其组件将给你一个关于 Spark 如何实现其在大数据分析中突破性处理速度的思路。
在本章中,你将了解以下更广泛的话题:
-
Spark 架构和执行层次结构
-
Spark 的不同组件
-
Spark 驱动程序和 Spark 执行器的角色
-
Spark 的不同部署模式
-
作为 Spark 操作,转换和动作
到本章结束时,你将对 Spark 的内部工作原理有宝贵的见解,并知道如何有效地应用这些知识以通过你的认证考试。
Spark 架构
在前面的章节中,我们讨论了 Apache Spark 是一个开源的分布式计算框架,旨在处理大数据分析和处理。其架构旨在高效地处理各种工作负载,提供速度、可扩展性和容错性。理解 Spark 的架构对于理解其处理大量数据的能力至关重要。
Spark 架构的组成部分协同工作以高效处理数据。以下是一些主要组件:
-
Spark 驱动程序
-
SparkContext
-
集群管理器
-
工作节点
-
Spark 执行器
-
任务
在我们讨论这些组件之前,了解它们的执行层次结构对于知道每个组件在 Spark 程序启动时如何交互非常重要。
执行层次结构
让我们借助图 3.1中的架构来查看 Spark 应用程序的执行流程:
图 3.1:Spark 架构
这些步骤概述了从提交 Spark 作业到作业完成后释放资源的流程:
-
Spark 执行开始于用户向 Spark 引擎提交
spark-submit
请求。这将创建一个 Spark 应用程序。一旦执行了操作,就会创建一个作业。 -
此请求将启动与集群管理器的通信。作为回应,集群管理器初始化 Spark 驱动程序以执行 Spark 应用程序的
main()
方法。为了执行此方法,将创建SparkSession
。 -
驱动程序开始与集群管理器通信,并请求资源以开始规划执行。
-
集群管理器随后启动执行器,它们可以直接与驱动程序通信。
-
驱动程序创建一个逻辑计划,称为有向无环图(DAG),以及基于需要执行的任务总数的执行计划。
-
驱动程序还将数据分配给每个执行器运行,包括任务。
-
每个任务运行完成后,驱动程序会获取结果。
-
当程序运行完成后,
main()
方法退出,Spark 释放所有 executor 和驱动器资源。
现在你已经了解了执行层次结构,让我们详细讨论 Spark 的每个组件。
Spark 组件
让我们深入了解每个 Spark 组件的内部工作原理,以了解它们如何在每个组件中发挥关键作用,从而实现高效的分布式数据处理。
Spark 驱动器
Spark 驱动器是 Spark 中智能和高效计算的核心。Spark 遵循一种在网络拓扑中通常被称为主从架构的架构。将 Spark 驱动器视为主节点,将 Spark executors 视为从节点。驱动器在任何给定时间都控制并了解所有 executor。知道有多少 executor 存在以及是否有 executor 失败是驱动器的责任,以便它可以回退到其替代方案。Spark 驱动器还始终与 executor 保持通信。驱动器在机器或集群的主节点上运行。当 Spark 应用程序开始运行时,驱动器会跟踪所有成功运行应用程序所需的信息。
如图 3.1所示,驱动器节点包含SparkSession
,这是 Spark 应用程序的入口点。以前,这被称为SparkContext
对象,但在 Spark 2.0 中,SparkSession
处理所有上下文以启动执行。应用程序的主方法在驱动器上运行以协调整个应用程序。它在自己的Java 虚拟机(JVM)上运行。Spark 驱动器可以作为一个独立进程运行,也可以根据架构在工作者节点之一上运行。
Spark 驱动器负责将应用程序划分为更小的执行实体。这些实体被称为任务。你将在本章接下来的部分中了解更多关于任务的内容。Spark 驱动器还决定 executor 将处理哪些数据以及哪些任务将在哪个 executor 上运行。这些任务将在集群管理器的帮助下在 executor 节点上调度运行。由驱动器驱动的这些信息使得容错成为可能。由于驱动器拥有所有关于可用工作者数量以及每个工作者上运行的任务的信息,以及数据,以防工作者失败,因此可以将该任务重新分配到不同的集群。即使一个任务运行时间过长,如果另一个 executor 空闲,它也可以被分配到另一个 executor。在这种情况下,哪个 executor 先返回任务,哪个就占上风。Spark 驱动器还维护关于弹性分布式数据集(RDD)及其分区的元数据。
设计完整的执行图是 Spark 驱动程序的责任。它确定哪些任务在哪些执行器上运行,以及数据如何在这些执行器之间分布。这是通过内部创建 RDDs 来实现的。基于这种数据分布,确定所需的操作,例如程序中定义的转换和动作。基于这些决策创建一个 DAG。Spark 驱动程序优化逻辑计划(DAG),并为 DAG 寻找最佳执行策略,除了确定特定任务执行的最优位置。这些执行是并行进行的。执行器只是遵循这些命令,而不会在其端进行任何优化。
考虑性能因素,Spark 驱动程序靠近执行器工作是最优的。这大大减少了延迟。这意味着在进程的响应时间上会有更少的延迟。这里要注意的另一点是,这也适用于数据。读取数据的执行器靠近它会有比其他情况下更好的性能。理想情况下,驱动程序和工作节点应该在同一个 局域网(LAN)中运行以获得最佳性能。
Spark 驱动程序还会为执行细节创建一个 Web UI。这个 UI 在确定应用程序性能方面非常有帮助。在需要故障排除并需要在 Spark 过程中识别瓶颈的情况下,这个 UI 非常有用。
SparkSession
SparkSession
是与 Spark 交互的主要入口点。如前所述,在 Spark 的早期版本中,SparkContext
扮演着这个角色,但在 Spark 2.0 中,可以为此创建 SparkSession
。Spark 驱动程序创建一个 SparkSession
对象来与集群管理器交互,并通过它获取资源分配。
在应用程序的生命周期中,SparkSession
也用于与所有底层 Spark API 交互。我们曾在 第二章 中讨论了不同的 Spark API,即 SparkSQL、Spark Streaming、MLlib 和 GraphX。所有这些 API 都从其核心使用 SparkSession
来与 Spark 应用程序交互。
SparkSession
会跟踪整个应用程序执行过程中的 Spark 执行器。
集群管理器
Spark 是一个分布式框架,它需要访问计算资源。这种访问由称为集群管理器的过程进行管理和控制。当应用程序开始执行时,集群管理器的责任是为 Spark 应用程序分配计算资源。这些资源在应用程序主节点的请求下变得可用。在 Apache Spark 生态系统中,应用程序主节点在管理和协调分布式集群环境中 Spark 应用程序的执行中起着至关重要的作用。它是一个基本组件,负责协商资源、调度任务和监控应用程序的执行。
一旦资源可用,驱动程序就会知道这些资源。根据 Spark 应用程序需要执行的任务,管理这些资源是驱动程序的责任。一旦应用程序完成执行,这些资源就会释放回集群管理器。
应用程序有自己的专用执行程序进程,这些进程并行运行任务。其优势是每个应用程序都是独立的,并且按照自己的时间表运行。数据对于这些应用程序中的每一个也是独立的,因此数据共享只能通过将数据写入磁盘来实现,以便可以在应用程序之间共享。
集群模式
集群模式定义了 Spark 应用程序如何利用集群资源、管理任务执行以及与集群管理器进行资源分配的交互。
如果集群上有多个用户共享资源,无论是 Spark 应用程序还是需要集群资源的其他应用程序,它们必须根据不同的模式进行管理。集群管理器提供了两种模式类型——独立客户端模式和集群模式。以下表格突出了这两种模式之间的一些差异:
客户端模式 | 集群模式 |
---|---|
在客户端模式中,驱动程序程序在提交 Spark 应用程序的机器上运行。 | 在集群模式中,驱动程序程序在集群内部运行,在工作节点之一上。 |
驱动程序程序负责协调 Spark 应用程序的执行,包括创建 SparkContext 和协调任务。 |
集群管理器负责启动驱动程序程序并为执行分配资源。 |
客户端机器直接与集群管理器交互,请求资源并在工作节点上启动执行程序。 | 一旦启动驱动程序程序,它就会与集群管理器协调,请求资源并将任务分配给工作节点。 |
它可能不适合大规模应用程序的生产部署。 | 它通常用于生产部署,因为它允许更好的资源利用和可伸缩性。它还确保了容错性。 |
表 3.1:客户端模式与集群模式
现在,我们将讨论不同的部署模式和 Spark 中相应的管理器:
-
内置独立模式(Spark 的原生管理器):Spark 随带的一个简单集群管理器,适用于无需外部依赖的小到中等规模部署。
-
Apache YARN(Hadoop 的资源管理器):与 Spark 集成,YARN 使 Spark 应用程序能够高效地共享 Hadoop 的集群资源。
-
Apache Mesos(资源共享平台):Mesos 提供了跨多个应用程序的高效资源共享,允许 Spark 与其他框架并行运行。
我们将在本章后面更详细地讨论部署模式。
Spark 执行器
Spark 执行器是在工作节点上运行的进程,执行由驱动程序发送的任务。数据主要存储在内存中,但也可以写入它们最近的磁盘存储。驱动程序根据 Spark 为其执行生成的 DAG 启动执行器。一旦任务执行完成,执行器将结果发送回驱动程序。
由于驱动程序是 Spark 应用的主要控制器,如果执行器失败或执行任务花费时间过长,驱动程序可以选择将任务发送到其他可用的执行器。这确保了 Spark 的可靠性和容错性。我们将在本章后面了解更多关于这一点。
执行器的责任是从外部读取运行任务所需的数据。它还可以根据需要将其分区数据写入磁盘。一个任务的所有处理都由执行器完成。
执行器的主要功能如下:
-
任务执行:执行器运行 Spark 应用程序分配的任务,处理存储在 RDD 或 DataFrame 中的数据
-
资源分配:每个 Spark 应用程序都有一组由集群管理器分配的执行器,用于管理资源,如 CPU 内核和内存
在 Apache Spark 中,作业、阶段和任务的概念构成了其分布式计算框架的基本构建块。理解这些组件对于掌握 Spark 并行处理和任务执行的核心工作至关重要。见图 3.2* 了解这些概念之间的关系,同时我们详细讨论它们:
图 3.2:作业、阶段和任务之间的交互
让我们更仔细地看看:
-
collect
)。我们将在后面了解更多关于动作的内容。当在数据集上调用动作(如collect
或count
)时,它将触发一个或多个作业的执行。一个作业由几个阶段组成,每个阶段包含执行数据分区上一系列转换的任务。
-
阶段:每个作业被划分为可能依赖于其他阶段的阶段。阶段充当转换边界 - 它们在需要跨分区进行数据洗牌的宽转换的边界处创建。如果一个阶段依赖于前一个阶段的输出,那么这个阶段将不会开始执行,直到依赖的前一个阶段完成执行。
每个阶段被划分为一组在集群节点上执行的任务,以并行方式处理数据。
-
任务:在 Spark 中,任务是最小的执行单元。它是 Spark 编译和运行以执行一组操作的最小对象。它在 Spark 执行器上执行。任务本质上是一系列操作,如过滤、groupBy 等。
任务在执行器上并行运行。它们可以在多个节点上运行,并且彼此独立。这是通过槽位来实现的。每个任务处理数据分区的一部分。偶尔,一组任务需要完成执行才能开始下一个任务的执行。
现在我们已经理解了这些概念,让我们看看为什么它们在 Spark 中很重要:
-
并行处理:执行器、作业、阶段和任务协作以实现计算的并行执行,通过利用分布式计算优化性能。
-
任务粒度和效率:任务将计算分解成更小的单元,便于在集群节点间实现高效的资源利用和并行处理。
接下来,我们将讨论一个增强计算效率的重要概念。
Spark 中的分区
在 Apache Spark 中,分区是一个关键概念,用于在集群的多个节点上划分数据以实现并行处理。分区提高了数据局部性,增强了性能,并通过以结构化的方式分配数据来实现高效的计算。Spark 支持静态和动态分区策略来组织集群节点上的数据:
-
资源的静态分区:静态分区在所有集群管理器上可用。使用静态分区时,每个应用程序都分配了最大资源,并且这些资源在其生命周期内保持专用。
-
动态资源共享:动态分区仅在 Mesos 上可用。当动态共享资源时,Spark 应用程序获得固定的独立内存分配,类似于静态分区。主要区别在于,当任务没有被应用程序运行时,这些核心也可以被其他应用程序使用。
让我们讨论一下为什么分区很重要:
-
性能优化:有效的分区策略,无论是静态还是动态,都能通过提高数据局部性和减少数据洗牌来显著影响 Spark 的性能。
-
适应性和灵活性:动态分区能够在无需人工干预的情况下适应变化的数据大小或分布模式。
-
控制和可预测性:静态分区提供了对数据分布的控制和可预测性,这在某些用例中可能是有利的。
总结来说,在 Spark 中,无论是静态还是动态的分区策略,在优化跨集群节点数据分布、提高性能和确保数据高效并行处理方面都发挥着至关重要的作用。
Apache Spark 提供了不同的集群和部署模式,以在分布式计算环境中运行应用程序。我们将在下一节中探讨它们。
部署模式
Spark 中有几种不同的部署模式。这些部署模式定义了 Spark 应用程序如何在不同的计算基础设施中启动、执行和管理。基于这些不同的部署模式,将决定 Spark 驱动程序、执行器和集群管理器将在哪里运行。
Spark 中可用的不同部署模式如下:
-
本地:在本地模式下,Spark 驱动程序和执行器在单个 JVM 上运行,集群管理器在驱动程序和执行器相同的宿主机上运行。
-
独立:在独立模式下,驱动程序可以在集群的任何节点上运行,执行器将启动自己的独立 JVM。集群管理器可以保留在集群中的任何主机上。
-
YARN (客户端):在此模式下,Spark 驱动程序在客户端运行,YARN 的资源管理器在 NodeManagers 上为执行器分配容器。
-
YARN (集群):在此模式下,Spark 驱动程序与 YARN 应用程序主控一起运行,而 YARN 的资源管理器在 NodeManagers 上为执行器分配容器。
-
Kubernetes:在此模式下,驱动程序在 Kubernetes 容器中运行。执行器有自己的容器。
让我们看看关于不同部署模式的一些重要点:
-
资源利用率:不同的部署模式通过确定驱动程序程序运行的位置以及客户端和集群之间如何分配资源来优化资源利用率。
-
可访问性和控制:客户端模式提供对驱动程序日志和输出的轻松访问,便于开发和调试,而集群模式则更有效地利用集群资源来处理生产工作负载。
-
与容器编排集成:Kubernetes 部署模式允许与容器化环境无缝集成,利用 Kubernetes 的编排能力进行高效的资源管理。
选择部署模式时有一些考虑事项需要注意:
-
开发和生产:客户端模式适合开发和调试,而集群模式则适用于生产工作负载。
-
资源管理:根据应用程序的需求评估客户端和集群节点之间资源的分配
-
容器化需求:考虑在容器化环境中使用 Kubernetes 部署,利用 Kubernetes 的功能进行高效的容器管理
总结来说,Apache Spark 的部署模式提供了灵活性,以适应不同的开发、生产和容器化部署场景。
接下来,我们将探讨 RDD,它是 Apache Spark 中的基础数据抽象,使分布式处理、容错性和处理大规模数据操作时的灵活性成为可能。虽然 RDD 仍然是一个基本概念,但 Spark 的 DataFrame 和 Dataset API 在结构化数据处理和性能优化方面提供了进步。
RDD
Apache Spark 的 RDD 作为基础抽象,支撑了 Spark 框架内的分布式计算能力。RDD 是 Spark 中的核心数据结构,它使对大规模分布式数据集进行容错性和并行操作成为可能,并且它们是不可变的。这意味着它们不能随时间改变。对于任何操作,都必须从现有的 RDD 生成一个新的 RDD。当一个新 RDD 从原始 RDD 起源时,新 RDD 有一个指向其起源的 RDD 的指针。这是 Spark 记录 RDD 上所有变换谱系的方式。这种谱系使得 Spark 中的延迟评估成为可能,它为不同的操作生成 DAGs。
这种不可变性和谱系赋予了 Spark 在失败情况下重新生成任何 DataFrame 的能力,并使其设计上具有容错性。由于 RDD 是 Spark 中抽象层次最低的,因此所有建立在 RDD 之上的其他数据集都共享这些属性。高级 DataFrame API 也是建立在低级 RDD API 之上的,因此 DataFrame 也共享相同的属性。
RDD 也被 Spark 分区,并且每个分区被分配到集群中的多个节点。
这里是 Spark RDD 的一些关键特性:
-
不可变性质:RDD 是不可变的,确保一旦创建,就不能被修改,从而允许有变换的谱系。
-
通过谱系实现弹性:RDD 存储谱系信息,在发生故障时能够重建丢失的分区。Spark 被设计成具有容错性。因此,如果一个工作节点上的执行器在计算 RDD 时失败,可以使用 Spark 创建的谱系由另一个执行器重新计算该 RDD。
-
分区数据:RDD 将数据划分为分区,这些分区分布在集群中的多个节点上以实现并行处理。
-
并行执行:Spark 在分布式分区上并行执行 RDD 上的操作,从而提高性能。
让我们详细讨论一些其他特性。
懒计算
RDDs 支持惰性评估,将转换的执行延迟到动作被调用。Spark 在处理和容错方面的效率是通过惰性评估实现的。Spark 中的代码执行是延迟的。除非调用一个动作操作,否则 Spark 不会开始代码执行。这有助于 Spark 实现优化。对于所有转换和动作,Spark 通过为这些操作创建一个 DAG 来跟踪代码中需要执行的步骤。因为 Spark 在执行之前创建查询计划,所以它可以就执行层次结构做出明智的决策。为了实现这一点,Spark 使用的一个功能称为谓词下推。
谓词下推意味着 Spark 可以优先执行操作,使其最有效。一个例子可以是一个过滤操作。如果过滤操作可以在其他转换之前应用,那么过滤操作通常会减少后续操作需要处理的数据量。这正是 Spark 的操作方式。它会在流程中尽可能早地执行过滤操作,从而使后续操作更高效。
这也意味着 Spark 作业只有在执行时才会失败。由于 Spark 使用惰性评估,直到调用动作之前,代码不会执行,某些错误可能会被忽略。为了捕获这些错误,Spark 代码需要有一个动作用于执行和错误处理。
转换
转换通过将函数应用于现有的 RDDs(例如,map
、filter
和 reduce
)来创建新的 RDDs。转换是那些不会导致任何代码执行的操作。这些语句会导致 Spark 为执行创建一个 DAG。一旦创建了 DAG,Spark 就需要在最后运行代码时需要一个动作操作。由于这个原因,当某些开发者尝试测量 Spark 中的代码时间时,他们会看到某些操作的运行时间非常快。可能的原因是,代码直到那个点只包含转换。由于没有动作,代码不会运行。为了准确测量每个操作的运行时间,必须调用动作来强制 Spark 执行这些语句。
以下是一些可以被归类为转换的操作:
-
orderBy()
-
groupBy()
-
filter()
-
select()
-
join()
当这些命令执行时,它们是惰性评估的。这意味着所有这些对 DataFrames 的操作都会产生一个新的 DataFrame,但它们不会执行,直到一个动作跟随它们。当触发动作时,这将返回一个 DataFrame 或 RDD。
动作和计算执行
行动(例如,collect
、count
和 saveAsTextFile
)会触发 RDD 上的变换执行。执行仅由行动触发,而不是由变换触发。当调用行动时,这是 Spark 开始在代码分析阶段创建的 DAG 上执行的时候。有了创建的 DAG,Spark 会根据其内部优化创建多个查询计划。然后,它执行最有效和成本效益最高的计划。我们将在本书的后面讨论查询计划。
这里有一些可以归类为行动的操作:
-
show()
-
take()
-
count()
-
collect()
-
save()
-
foreach()
-
first()
所有这些操作都会导致 Spark 触发代码执行,因此操作会运行。
让我们看一下以下代码,以更好地理解这些概念:
# Python
>>> df = spark.read.text("{path_to_data_file}")
>>> names_df = df.select(col("firstname"),col("lastname"))
>>> names_df.show()
在前面的代码中,直到第 2 行,什么都不会执行。在第 3 行,触发了一个行动,因此触发了整个代码的执行。因此,如果你在第 1 行提供了错误的数据路径或在第 2 行提供了错误的列名,Spark 不会在执行到第 3 行之前检测到这一点。这与大多数其他编程范式不同。这就是我们所说的 Spark 中的懒加载。
行动会导致计算并收集结果以发送给驱动程序。
既然我们已经了解了 Spark 中变换和行动的基础知识,让我们继续了解它提供的两种变换类型。
变换类型
Apache Spark 的变换大致分为窄变换和宽变换,每种变换在分布式数据处理环境中都服务于不同的目的。
窄变换
窄变换,也称为本地变换,在数据单个分区上操作,不涉及在分区之间洗牌或重新分配数据。这些变换使 Spark 能够独立地在单个分区内处理数据。在窄变换中,Spark 将与单个输入分区和单个输出分区一起工作。这意味着这些类型的变换会导致可以在单个分区上执行的操作。数据不必从多个分区中取出或写回到多个分区。这导致不需要洗牌的操作。
这里是它们的一些特征:
-
分区级操作:窄变换在分区级别处理数据,在每个分区内执行计算。
-
独立性和本地处理:它们不需要数据在分区之间移动或通信,允许在分区内并行执行。
-
map
、filter
和flatMap
是窄变换的典型例子。
现在,让我们看看它们的重要性:
-
效率和速度:窄变换是高效的,因为它们涉及分区内的本地处理,减少了通信开销。
-
并行性:它们通过独立地对分区进行操作,实现最大并行性,从而优化性能
宽转换
宽转换,也称为全局或洗牌相关转换,涉及需要跨分区进行数据洗牌和重新分配的操作。这些转换涉及分区之间的依赖关系,需要数据交换。使用宽转换时,Spark 将使用多个分区上的数据,并且也可能将结果写回到多个分区。这些转换将强制进行洗牌操作,因此它们也被称为洗牌转换。
宽转换是复杂的操作。如果需要,它们需要在操作之间写入结果,并且在某些情况下还必须跨不同机器聚合数据。
这里是一些它们的特征:
-
数据洗牌:宽转换通过重新排列或聚合来自多个分区的数据来跨分区重新组织数据
-
对多个分区的依赖性:它们依赖于来自各个分区的数据,导致在集群中跨分区进行数据交换和重组
-
groupBy
、join
和sortByKey
是宽转换的典型例子
现在,让我们看看它们的显著性:
-
网络和磁盘开销:宽转换由于数据洗牌而引入网络和磁盘开销,影响性能
-
阶段边界创建:它们在 Spark 作业中定义阶段边界,导致在作业执行期间出现不同的阶段
以下是比较窄转换和宽转换之间的差异:
-
数据移动:窄转换在分区内部本地处理数据,最小化数据移动,而宽转换涉及数据洗牌和跨分区的数据移动
-
性能影响:窄转换通常由于数据移动减少而提供更高的性能,而宽转换由于数据洗牌而涉及额外的开销
-
并行性范围:窄转换在分区内实现最大并行性,而宽转换可能会由于对多个分区的依赖而限制并行性
在 Apache Spark 中,理解窄转换和宽转换之间的区别至关重要。窄转换在分区内的本地处理中表现出色,优化性能,而宽转换,尽管对于某些操作是必要的,但由于数据洗牌和跨分区的全局重组而引入了开销。
让我们看看 Spark RDDs 的重要性:
-
分布式数据处理:RDDs 允许在机器集群上分布式处理大规模数据,促进并行性和可伸缩性
-
容错性和可靠性:它们的不可变性和基于血缘的恢复确保了在分布式环境中的容错性和可靠性
-
操作的灵活性:RDDs 支持广泛的转换和操作,允许多样化的数据处理和操作。
进化与替代
虽然 RDDs 仍然是基础性的,但 Spark 的 DataFrame 和 Dataset API 提供了优化、高级别的抽象,适合结构化数据处理和优化。
Spark RDDs 是 Apache Spark 框架内分布式数据处理的基础,提供不可变性、容错性和在分布式数据集上执行并行操作的基础结构。尽管 RDDs 是基础性的,但 Spark 的 DataFrame 和 Dataset API 提供了性能和结构化数据处理方面的进步,满足 Spark 生态系统中的各种用例和偏好。
摘要
在本章中,我们学习了 Spark 的架构及其内部工作原理。这次对 Spark 分布式计算景观的探索涵盖了不同的 Spark 组件,如 Spark 驱动程序和 SparkSession
。我们还讨论了 Spark 中可用的不同类型的集群管理器。然后,我们简要介绍了 Spark 及其部署模式的不同分区类型。
接下来,我们讨论了 Spark 执行器、作业、阶段和任务,在学习 RDDs 及其转换类型之前,强调了它们之间的区别,并更多地了解了窄转换和宽转换。
这些概念构成了利用 Spark 在分布式数据处理和分析中巨大能力的基础。
在下一章中,我们将讨论 Spark DataFrame 及其相应的操作。
样题
问题 1:
关于 Spark 的执行层次结构,以下哪项是正确的?
-
在 Spark 的执行层次结构中,一个作业可能会达到多个阶段边界。
-
在 Spark 的执行层次结构中,作业描述文件位于作业之上的一层。
-
在 Spark 的执行层次结构中,一个阶段包含多个作业。
-
在 Spark 的执行层次结构中,执行器是最小的单元。
-
在 Spark 的执行层次结构中,任务位于槽位之上的一层。
问题 2:
执行器的作用是什么?
-
执行器在每个工作节点上托管 Spark 驱动程序。
-
执行器负责执行由驱动程序分配给它们的工作。
-
Spark 应用程序启动后,每个任务都会启动执行器。
-
执行器位于工作节点内的槽位中。
-
执行器的存储是短暂的,因此它将直接缓存数据的任务推迟到工作节点线程。
答案
-
A
-
B
第三部分:Spark 操作
在本部分,我们将涵盖 Spark DataFrame 及其操作,强调它们在结构化数据处理和分析中的作用。这包括 DataFrame 的创建、操作以及各种操作,如过滤、聚合、连接和分组,通过示例进行演示。然后,我们将讨论高级操作和优化技术,包括广播变量、累加器和自定义分区。本部分还将讨论性能优化策略,强调自适应查询执行的重要性,并提供提高 Spark 作业性能的实用技巧。此外,我们将探索 Spark 中的 SQL 查询,重点关注其类似 SQL 的查询能力和与 DataFrame API 的互操作性。示例将通过 Spark 中的 SQL 查询展示复杂的数据操作和分析。
本部分包含以下章节:
-
第四章, Spark DataFrame 及其操作
-
第五章, Spark 中的高级操作和优化
-
第六章, Spark 中的 SQL 查询
第四章:Spark DataFrame 及其操作
在本章中,我们将学习 Spark 中的一些不同 API,并讨论它们的特性。我们还将开始学习 Spark 的 DataFrame 操作,并查看不同的数据查看和操作技术,例如过滤、添加、重命名和删除 Spark 中可用的列。
我们将在以下主题下介绍这些概念:
-
Spark DataFrame API
-
创建 DataFrame
-
查看 DataFrame
-
操作 DataFrame
-
聚合 DataFrame
到本章结束时,你将了解如何使用 PySpark DataFrame。你还将发现各种数据操作技术,并了解在操作数据后如何查看数据。
PySpark 入门
在前面的章节中,我们讨论了 Spark 主要使用四种语言,即 Scala、Python、R 和 SQL。当使用这些语言中的任何一种时,底层的执行引擎是相同的。这为我们之前在第二章中提到的必要统一性提供了支持。这意味着开发者可以使用他们选择的任何语言,也可以在应用程序中在不同 API 之间切换。
对于本书的上下文,我们将重点关注 Python 作为主要语言。与 Python 一起使用的 Spark 被称为PySpark。
让我们开始安装 Spark。
安装 Spark
要开始使用 Spark,你首先需要在你的电脑上安装它。安装 Spark 有几种方法。在本节中,我们将只关注其中一种。
PySpark 提供了从PyPI的pip安装。你可以按照以下方式安装它:
pip install pyspark
一旦 Spark 安装完成,你需要创建一个 Spark 会话。
创建 Spark 会话
一旦你在系统中安装了 Spark,你就可以开始创建 Spark 会话。Spark 会话是任何 Spark 应用程序的入口点。要创建 Spark 会话,你需要按照以下方式初始化它:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
当你在 Spark shell 中运行代码时,Spark 会话会自动为你创建,因此你不需要手动执行此代码来创建 Spark 会话。这个会话通常在一个名为spark
的变量中创建。
重要的是要注意,在任何给定时间,我们只能创建一个 Spark 会话。在 Spark 中无法复制 Spark 会话。
现在,让我们看看 Spark DataFrames 中的不同数据 API。
Dataset API
Dataset 是添加到Spark 1.6的新接口。它是一个数据分布式集合。Dataset API 在 Java 和 Scala 中可用,但在 Python 和 R 中不可用。Dataset API 使用Resilient Distributed Datasets(RDDs),因此提供了 RDD 的附加功能,如固定类型。它还使用 Spark SQL 的优化引擎以实现更快的查询。
由于许多数据工程和数据科学社区已经熟悉 Python 并在生产中的数据架构中广泛使用它,PySpark 也为此提供了等效的 DataFrame API。让我们在下一节中看看它。
DataFrame API
Spark DataFrame 的动机来自 Python 中的 Pandas DataFrame。DataFrame 实质上是一组行和列。你可以将其想象成一个表格,其中表格头作为列名,在这些头下面是相应排列的数据。这种类似表格的格式已经在诸如关系数据库和逗号分隔文件等工具的计算中存在很长时间了。
Spark 的 DataFrame API 是建立在 RDD 之上的。存储数据的底层结构仍然是 RDD,但 DataFrame 在 RDD 上创建了一个抽象,以隐藏其复杂性。正如 RDD 是惰性评估且不可变的一样,DataFrame 也是惰性评估且不可变的。如果你还记得前面的章节,惰性评估通过仅在需要时运行计算为 Spark 带来了性能提升和优化。这也使得 Spark 在其 DataFrame 中通过规划如何最佳地计算操作而拥有大量的优化。计算是在对 DataFrame 调用动作时开始的。有无数种不同的方法可以创建 Spark DataFrame。我们将在本章中学习其中的一些。
让我们看看在 Spark 中 DataFrame 是什么。
创建 DataFrame 操作
正如我们已经讨论过的,DataFrame 是 Spark 数据的主要构建块。它们由行和列数据结构组成。
PySpark 中的 DataFrame 是通过 pyspark.sql.SparkSession.createDataFrame
函数创建的。你可以使用列表、列表的列表、元组、字典、Pandas DataFrame、RDD 和 pyspark.sql.Rows
来创建 DataFrame。
Spark DataFrame 也有一个名为 schema 的参数,用于指定 DataFrame 的模式。你可以选择显式指定模式,或者让 Spark 从 DataFrame 本身推断模式。如果你在代码中没有指定此参数,Spark 将会自行推断模式。
在 Spark 中创建 DataFrame 有不同的方法。其中一些将在以下章节中解释。
使用数据行列表
我们看到创建 DataFrame 的第一种方法是通过使用数据行。你可以将数据行想象成列表。它们将为列表中的每个值共享共同的头值。
创建新的 DataFrame 时使用数据行列表的代码如下:
import pandas as pd
from datetime import datetime, date
from pyspark.sql import Row
data_df = spark.createDataFrame([
Row(col_1=100, col_2=200., col_3='string_test_1', col_4=date(2023, 1, 1), col_5=datetime(2023, 1, 1, 12, 0)),
Row(col_1=200, col_2=300., col_3='string_test_2', col_4=date(2023, 2, 1), col_5=datetime(2023, 1, 2, 12, 0)),
Row(col_1=400, col_2=500., col_3='string_test_3', col_4=date(2023, 3, 1), col_5=datetime(2023, 1, 3, 12, 0))
])
因此,你会看到具有我们指定的列及其数据类型的 DataFrame:
DataFrame[col_1: bigint, col_2: double, col_3: string, col_4: date, col_5: timestamp]
现在,我们将看到如何可以显式指定 Spark DataFrame 的模式。
使用具有模式的行列表
DataFrame 的模式定义了 DataFrame 的每一行和每一列中可能存在的不同数据类型。显式定义模式有助于我们想要强制某些数据类型到数据集的情况。
现在,我们将明确告诉 Spark 我们创建的 DataFrame 应使用哪种模式。请注意,大部分代码保持不变——我们只是在创建 DataFrame 的代码中添加了一个名为schema
的另一个参数,以明确指定哪些列将有什么样的数据类型:
import pandas as pd
from datetime import datetime, date
from pyspark.sql import Row
data_df = spark.createDataFrame([
Row(col_1=100, col_2=200., col_3='string_test_1', col_4=date(2023, 1, 1), col_5=datetime(2023, 1, 1, 12, 0)),
Row(col_1=200, col_2=300., col_3='string_test_2', col_4=date(2023, 2, 1), col_5=datetime(2023, 1, 2, 12, 0)),
Row(col_1=400, col_2=500., col_3='string_test_3', col_4=date(2023, 3, 1), col_5=datetime(2023, 1, 3, 12, 0))
], schema=' col_1 long, col_2 double, col_3 string, col_4 date, col_5 timestamp')
因此,您将看到包含我们指定列及其数据类型的 DataFrame:
data_df
DataFrame[col_1: bigint, col_2: double, col_3: string, col_4: date, col_5: timestamp]
现在,让我们看看如何使用 Pandas DataFrame 创建 DataFrame。
使用 Pandas DataFrame
DataFrame 也可以使用 Pandas DataFrame 创建。为此,您首先需要在 Pandas 中创建一个 DataFrame。一旦创建,然后将其转换为 PySpark DataFrame。以下代码演示了此过程:
from datetime import datetime, date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize([
(100, 200., 'string_test_1', date(2023, 1, 1), datetime(2023, 1, 1, 12, 0)),
(200, 300., 'string_test_2', date(2023, 2, 1), datetime(2023, 1, 2, 12, 0)),
(300, 400., 'string_test_3', date(2023, 3, 1), datetime(2023, 1, 3, 12, 0))
])
data_df = spark.createDataFrame(rdd, schema=['col_1', 'col_2', 'col_3', 'col_4', 'col_5'])
因此,您将看到包含我们指定列及其数据类型的 DataFrame:
DataFrame[col_1: bigint, col_2: double, col_3: string, col_4: date, col_5: timestamp]
现在,让我们看看如何使用元组创建 DataFrame。
使用元组
创建 DataFrame 的另一种方式是通过元组。这意味着我们可以创建一个元组作为一行,并将每个元组作为 DataFrame 中的单独一行添加。每个元组将包含 DataFrame 中每列的数据。以下代码演示了这一点:
import pandas as pd
from datetime import datetime, date
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
(100, 200., 'string_test_1', date(2023, 1, 1), datetime(2023, 1, 1, 12, 0)),
(200, 300., 'string_test_2', date(2023, 2, 1), datetime(2023, 1, 2, 12, 0)),
(300, 400., 'string_test_3', date(2023, 3, 1), datetime(2023, 1, 3, 12, 0))
])
data_df = spark.createDataFrame(rdd, schema=['col_1', 'col_2', 'col_3', 'col_4', 'col_5'])
因此,您将看到包含我们指定列及其数据类型的 DataFrame:
DataFrame[col_1: bigint, col_2: double, col_3: string, col_4: date, col_5: timestamp]
现在,让我们看看在 Spark 中我们可以以不同的方式查看 DataFrame,并查看我们刚刚创建的 DataFrame 的结果。
如何查看 DataFrame
Spark 中有不同的语句用于查看数据。我们在上一节中通过不同方法创建的 DataFrame,其结果都与 DataFrame 相同。让我们看看几种不同的查看 DataFrame 的方法。
查看 DataFrame
显示 DataFrame 的第一种方式是通过DataFrame.show()
语句。以下是一个示例:
data_df.show()
因此,您将看到包含我们指定列及其数据类型的 DataFrame:
+-----+-----+-------------+----------+-------------------+
|col_1|col_2| col_3 | col_4 | col_5 |
+-----+-----+-------------+----------+-------------------+
| 100|200.0|string_test_1|2023-01-01|2023-01-01 12:00:00|
| 200|300.0|string_test_2|2023-02-01|2023-01-02 12:00:00|
| 300|400.0|string_test_3|2023-03-01|2023-01-03 12:00:00|
+-----+-----+-------------+----------+-------------------+
我们还可以在单个语句中选择要查看的总行数。让我们在下一个主题中看看如何做到这一点。
查看前 n 行
我们也可以在单个语句中指定要查看的行数。我们可以通过DataFrame.show()
中的参数来控制这一点。以下是一个仅查看 DataFrame 前两行的示例。
如果您指定n为特定的数字,则只会显示那些行集。以下是一个示例:
data_df.show(2)
因此,您将看到包含其前两行的 DataFrame:
+-----+-----+-------------+----------+-------------------+
|col_1|col_2| col_3 | col_4 | col_5 |
+------+------+-----------+----------+-------------------+
| 100|200.0|string_test_1|2023-01-01|2023-01-01 12:00:00|
| 200|300.0|string_test_2|2023-02-01|2023-01-02 12:00:00|
------+-----+-------------+----------+-------------------+
only showing top 2 rows.
查看 DataFrame 模式
我们还可以选择使用printSchema()
函数查看 DataFrame 的模式:
data_df.printSchema()
因此,您将看到 DataFrame 的模式,包括我们指定的列及其数据类型:
root
|-- col_1: long (nullable = true)
|-- col_2: double (nullable = true)
|-- col_3: string (nullable = true)
|-- col_4: date (nullable = true)
|-- col_5: timestamp (nullable = true)
垂直查看数据
当数据太长而无法适应屏幕时,以垂直格式查看数据而不是水平表格视图有时很有用。以下是一个示例,说明您如何以垂直格式查看数据:
data_df.show(1, vertical=True)
因此,您将看到包含我们指定列及其数据但以垂直格式的 DataFrame:
-RECORD 0------------------
col_1 | 100
col_2 | 200.0
col_3 | string_test_1
col_4 | 2023-01-01
col_5 | 2023-01-01 12:00:00
only showing top 1 row
查看数据列
当我们只需要查看 DataFrame 中存在的列时,我们会使用以下方法:
data_df.columns
因此,您将看到一个 DataFrame 中列的列表:
['col_1', 'col_2', 'col_3', 'col_4', 'col_5']
查看摘要统计信息
现在,让我们看看我们如何查看 DataFrame 的摘要统计信息:
Show the summary of the DataFrame
data_df.select('col_1', 'col_2', 'col_3').describe().show()
因此,您将看到一个 DataFrame,其中定义了每列的摘要统计信息:
+-------+-------+-------+-------------+
|summary| col_1 | col_2 | col_3 |
+-------+-------+-------+-------------+
| count| 3 | 3 | 3|
| mean| 200.0 | 300.0 | null|
| stddev| 100.0 | 100.0 | null|
| min| 100 | 200.0 |string_test_1|
| max| 300 | 400.0 |string_test_3|
+-------+-------+-------+-------------+
现在,让我们看一下 collect 语句。
收集数据
当我们想要将不同集群中正在处理的所有数据收集回驱动器时,会使用 collect 语句。在使用 collect 语句时,我们需要确保驱动器有足够的内存来存储处理后的数据。如果驱动器没有足够的内存来存储数据,我们将遇到内存不足错误。
这就是显示 collect 语句的方法:
data_df.collect()
此语句将显示如下结果:
[Row(col_1=100, col_2=200.0, col_3='string_test_1', col_4=datetime.date(2023, 1, 1), col_5=datetime.datetime(2023, 1, 1, 12, 0)),
Row(col_1=200, col_2=300.0, col_3='string_test_2', col_4=datetime.date(2023, 2, 1), col_5=datetime.datetime(2023, 1, 2, 12, 0)),
Row(col_1=300, col_2=400.0, col_3='string_test_3', col_4=datetime.date(2023, 3, 1), col_5=datetime.datetime(2023, 1, 3, 12, 0))]
有几种方法可以避免内存不足错误。我们将探讨一些避免内存不足错误的方法,例如 take、tail 和 head 语句。这些语句仅返回数据的一个子集,而不是 DataFrame 中的所有数据,因此,它们在无需将所有数据加载到驱动器内存中时非常有用。
现在,让我们看一下 take 语句。
使用 take
take 语句接受一个参数,用于从 DataFrame 顶部返回元素的数量。我们将在下面的代码示例中看到它是如何使用的:
data_df.take(1)
因此,您将看到一个包含其顶部行的 DataFrame:
[Row(col_1=100, col_2=200.0, col_3='string_test_1', col_4=datetime.date(2023, 1, 1), col_5=datetime.datetime(2023, 1, 1, 12, 0))]
在这个例子中,我们通过将1
作为take()
函数的参数值来仅返回 DataFrame 的第一个元素。因此,结果中只返回了一行。
现在,让我们看一下 tail 语句。
使用 tail
tail 语句接受一个参数,用于从 DataFrame 底部返回元素的数量。我们将在下面的代码示例中看到它是如何使用的:
data_df.tail(1)
因此,您将看到一个包含其最后一行数据的 DataFrame:
[Row(col_1=300, col_2=400.0, col_3='string_test_3', col_4=datetime.date(2023, 3, 1), col_5=datetime.datetime(2023, 1, 3, 12, 0))]
在这个例子中,我们通过将1
作为tail()
函数的参数值来仅返回 DataFrame 的最后一个元素。因此,结果中只返回了一行。
现在,让我们看一下 head 语句。
使用 head
head 语句接受一个参数,用于从 DataFrame 顶部返回元素的数量。我们将在下面的代码示例中看到它是如何使用的:
data_df.head(1)
因此,您将看到一个包含其数据顶部行的 DataFrame:
[Row(col_1=100, col_2=200.0, col_3='string_test_1', col_4=datetime.date(2023, 1, 1), col_5=datetime.datetime(2023, 1, 1, 12, 0))]
在这个例子中,我们通过将1
作为head()
函数的参数值来仅返回 DataFrame 的第一个元素。因此,结果中只返回了一行。
现在,让我们看看我们如何计算 DataFrame 中的行数。
计算数据行数
当我们只需要计算 DataFrame 中的行数时,我们会使用以下方法:
data_df.count()
因此,您将看到一个 DataFrame 中的总行数:
3
在 PySpark 中,有几种方法可以用于从 DataFrame 或 RDD 中检索数据,每种方法都有其自身的特性和用例。以下是我们在本节早期用于数据检索的 take
、collect
、show
、head
和 tail
之间主要差异的总结。
take(n)
此函数返回一个包含 DataFrame 或 RDD 的前 n 个元素的数组
-
它用于快速检查数据的小子集
-
它执行惰性评估,这意味着它只计算所需数量的元素
collect()
此函数从 DataFrame 或 RDD 中检索所有元素,并将它们作为列表返回
-
应谨慎使用,因为它会将所有数据带到驱动节点,这可能导致大型数据集出现内存不足错误
-
它适用于小型数据集或处理适合内存的聚合结果时
show(n)
此函数以表格格式显示 DataFrame 的前 n 行
-
它主要用于在 探索性数据分析 (EDA) 或调试期间对数据进行视觉检查
-
它以具有列标题和格式的用户友好的方式显示数据
head(n)
此函数返回 DataFrame 的前 n 行,作为 Row
对象的列表
-
它与
take(n)
类似,但返回Row
对象而不是简单值 -
当处理结构化数据时,需要访问特定列值时经常使用
tail(n)
此函数返回 DataFrame 的最后 n 行
-
它在检查数据集的末尾时很有用,尤其是在数据按降序排序的情况下
-
与
head(n)
相比,它执行的操作更昂贵,因为它可能涉及扫描整个数据集
总结来说,take
和 collect
用于检索数据元素,其中 take
更适合小子集,而 collect
用于检索所有数据(需谨慎)。show
用于视觉检查,head
检索前几行作为 Row
对象,而 tail
检索数据集的最后几行。每种方法都有不同的用途,应根据数据分析任务的具体要求进行选择。
当在 PySpark 中处理数据时,有时您需要在 DataFrames 上使用一些 Python 函数。为了实现这一点,您必须将 PySpark DataFrames 转换为 Pandas DataFrames。现在,让我们看看如何将 PySpark DataFrame 转换为 Pandas DataFrame。
将 PySpark DataFrame 转换为 Pandas DataFrame
在您的工作流程的各个阶段,您可能希望从 PySpark DataFrame 切换到 Pandas DataFrame。有选项可以将 PySpark DataFrame 转换为 Pandas DataFrame。此选项是 toPandas()
。
这里需要注意的是,Python 本身不是分布式的。因此,当 PySpark DataFrame 转换为 Pandas 时,驱动程序需要收集其内存中的所有数据。我们需要确保驱动程序的内存能够收集自身的数据。如果数据无法适应驱动程序的内存,将导致内存不足错误。
以下是一个示例,说明如何将 PySpark DataFrame 转换为 Pandas DataFrame:
data_df.toPandas()
因此,您将看到具有我们指定的列及其数据类型的 DataFrame:
col_1 |
col_2 |
col_3 |
col_4 |
col_5 |
|
---|---|---|---|---|---|
0 |
100 |
200.0 |
String_test_1 |
2023-01-01 |
2023-01-01 12:00:00 |
1 |
200 |
300.0 |
String_test_2 |
2023-02-01 |
2023-01-02 12:00:00 |
2 |
300 |
400.0 |
String_test_3 |
2023-03-01 |
2023-01-03 12:00:00 |
表 4.1:我们指定的列和数据类型的 DataFrame
在下一节中,我们将了解不同的数据操作技术。您将需要根据不同的目的根据不同的标准对数据进行筛选、切片和切块。因此,数据操作在处理数据时是必不可少的。
如何在行和列上操作数据
在本节中,我们将学习如何在 Spark DataFrame 的行和列上执行不同的数据操作。
我们将首先看看如何在 Spark DataFrame 中选择列。
选择列
我们可以在 Spark DataFrame 中使用列函数在列级别进行数据操作。要选择 DataFrame 中的列,我们将使用 select()
函数如下:
from pyspark.sql import Column
data_df.select(data_df.col_3).show()
因此,您将只看到 DataFrame 中的一个列及其数据:
+-------------+
| col_3 |
+-------------+
|string_test_1|
|string_test_2|
|string_test_3|
+-------------+
The important thing to note here is that the resulting DataFrame with one column is a new DataFrame. Recalling what we discussed in *Chapter 3*, RDDs are immutable. The underlying data structure for DataFrames is RDDs, therefore, DataFrames are also immutable. This means every time you make a change to a DataFrame, a new DataFrame would be created out of it. You would either have to assign the resultant DataFrame to a new DataFrame or overwrite the original DataFrame.
在 PySpark 中也有其他一些方法可以达到相同的结果。其中一些在这里进行了演示:
data_df.select('col_3').show()
data_df.select(data_df['col_3']).show()
一旦我们选择了所需的列,就可能出现需要向 DataFrame 中添加新列的情况。现在我们将看看如何在 Spark DataFrame 中创建列。
创建列
我们可以使用 withColumn()
函数在 DataFrame 中创建新列。要创建新列,我们需要传递列名和列值以填充该列。在以下示例中,我们创建了一个名为 col_6
的新列,并在该列中放置了一个常量字面量 A
:
from pyspark.sql import functions as F
data_df = data_df.withColumn("col_6", F.lit("A"))
data_df.show()
因此,您将看到具有一个名为 col_6
的附加列,该列填充了多个 A
实例:
lit()
函数用于在列中填充常量值。
您还可以删除 DataFrame 中不再需要的列。现在我们将看看如何在 Spark DataFrame 中删除列。
删除列
如果我们需要从 Spark DataFrame 中删除列,我们将使用 drop()
函数。我们需要提供要删除的列的名称。以下是如何使用此函数的示例:
data_df = data_df.drop("col_5")
data_df.show()
因此,您将看到 DataFrame 中已删除 col_5
:
+-----+-----+-------------+----------+------+
|col_1|col_2| col_3 | col_4 | col_6|
+-----+-----+-------------+----------+------+
| 100|200.0|string_test_1|2023-01-01| A|
| 200|300.0|string_test_2|2023-02-01| A|
| 300|400.0|string_test_3|2023-03-01| A|
+-----+-----+-------------+----------+------+
我们已成功从该 DataFrame 中删除了 col_5
。
你也可以在同一个删除语句中删除多个列:
data_df = data_df.drop("col_4", "col_5")
还要注意,如果我们删除 DataFrame 中不存在的列,不会产生任何错误。结果 DataFrame 将保持原样。
就像删除列一样,你还可以在 Spark DataFrame 中更新列。现在,我们将看看如何在 Spark DataFrame 中更新列。
更新列
更新列也可以通过 Spark 中的 withColumn()
函数来完成。我们需要提供要更新的列的名称以及更新的值。注意,我们还可以使用此函数为列计算一些新值。以下是一个示例:
data_df.withColumn("col_2", F.col("col_2") / 100).show()
这将给我们以下更新的框架:
+-----+-----+-------------+----------+------+
|col_1|col_2| col_3 | col_4 | col_6|
+-----+-----+-------------+----------+------+
| 100|200.0|string_test_1|2023-01-01| A|
| 200|300.0|string_test_2|2023-02-01| A|
| 300|400.0|string_test_3|2023-03-01| A|
+-----+-----+-------------+----------+------+
这里需要注意的是,在更新列时使用 col
函数。此函数用于列操作。如果我们不使用此函数,我们的代码将返回错误。
如果你只需要重命名列,而不需要更新 DataFrame 中的列,你不必总是更新列。现在,我们将看看如何在 Spark DataFrame 中重命名列。
重命名列
为了更改列名,我们会在 Spark 中使用 withColumnRenamed()
函数。我们需要提供需要更改的列名以及新的列名。以下是说明这一点的代码示例:
data_df = data_df.withColumnRenamed("col_3", "string_col")
data_df.show()
因此,我们将看到以下变化:
+-----+-----+-------------+----------+------+
|col_1|col_2| string_col | col_4 | col_6|
+-----+-----+-------------+----------+------+
| 100|200.0|string_test_1|2023-01-01| A |
| 200|300.0|string_test_2|2023-02-01| A |
| 300|400.0|string_test_3|2023-03-01| A |
+-----+-----+-------------+----------+------+
注意,更改后 col_3
现在被称为 string_col
。
现在,让我们将注意力转向一些 Spark DataFrame 中的数据处理技术。你可以在 Spark DataFrame 中拥有类似搜索的功能,用于查找列中的不同值。现在,让我们看看如何在 Spark DataFrame 的列中查找唯一值。
在列中查找唯一值
查找唯一值是一个非常实用的函数,它将给我们列中的不同值。为此,我们可以使用 Spark DataFrame 的 distinct()
函数,如下所示:
data_df.select("col_6").distinct().show()
这里是结果:
+------+
|col_6 |
+------+
| A |
+------+
我们在 col_6
上应用了 distinct
函数,以获取该列中所有唯一的值。在我们的例子中,该列只有一个不同的值,即 A
,所以显示了它。
我们还可以使用它来查找给定列中不同值的数量。以下是如何使用此函数的示例:
data_df.select(F.countDistinct("col_6").alias("Total_Unique")).show()
这里是结果:
+------+
|col_6 |
+------+
| 1 |
+------+
在这个例子中,我们可以看到 col_6
中不同值的总数。目前,它是这个列中唯一的不同值类型,因此返回了 1
。
Spark 数据操作中另一个有用的函数是更改列的大小写。现在,让我们看看如何在 Spark DataFrame 中更改列的大小写。
更改列的大小写
Spark 中也存在一个用于更改列大小写的函数。我们不需要分别指定列的每个值来使用此函数。一旦应用,整个列的值都会更改大小写。以下是一个这样的例子:
from pyspark.sql.functions import upper
data_df.withColumn('upper_string_col', upper(data_df.string_col)).show()
这里是结果:
在这个例子中,我们将 string_col
的字母大小写改为全部大写。我们需要将这个结果分配给一个新列,因此,我们创建了一个名为 upper_string_col
的列来存储这些大写值。同时,请注意,这个列没有被添加到原始的 data_df
中,因为我们没有将结果保存回 data_df
。
在数据处理中,很多时候我们需要函数来过滤 DataFrame。现在,我们将看看如何在 Spark DataFrame 中过滤数据。
过滤 DataFrame
过滤 DataFrame 意味着我们可以从 DataFrame 中获取行或列的子集。有几种不同的方法可以过滤 Spark DataFrame。在这里,我们将看看一个例子:
data_df.filter(data_df.col_1 == 100).show()
这里是结果:
+-----+-----+-------------+----------+------+
|col_1|col_2| string_col | col_4 |col_6 |
+-----+-----+-------------+----------+------+
| 100|200.0|string_test_1|2023-01-01| A |
+-----+-----+-------------+----------+------+
在这个例子中,我们过滤 data_df
以仅包括 col_1
的列值等于 100
的行。只有一行满足这个标准,因此,结果 DataFrame 中只返回一行。
您可以使用此函数根据要求以多种方式对数据进行切片和切块。
由于我们正在讨论数据过滤,我们也可以根据逻辑运算符来过滤数据。现在,我们将看看如何在 DataFrames 中使用逻辑运算符来过滤数据。
DataFrame 中的逻辑运算符
我们还可以将逻辑运算符与 DataFrame 中的过滤操作结合使用,这是一组重要的运算符。这些包括 AND 和 OR 运算符等。它们用于根据复杂条件过滤 DataFrame。让我们看看如何使用 AND 运算符:
data_df.filter((data_df.col_1 == 100)
& (data_df.col_6 == 'A')).show()
这里是结果:
+-----+------+-------------+----------+------+
|col_1| col_2| string_col | col_4 |col_6 |
+-----+------+-------------+----------+------+
| 100| 200.0|string_test_1|2023-01-01| A |
+-----+------+-------------+----------+------+
在这个例子中,我们试图获取 col_1
的值为 100
且 col_6
的值为 A
的行。目前,只有一行满足这个条件,因此,只有一行作为结果返回。
现在,让我们看看如何使用 OR 运算符来组合条件:
data_df.filter((data_df.col_1 == 100)
| (data_df.col_2 == 300.00)).show()
此语句将给出以下结果:
+-----+------+-------------+----------+------+
|col_1| col_2| string_col | col_4 | col_6|
+-----+------+-------------+----------+------+
| 100| 200.0|string_test_1|2023-01-01| A |
| 200| 300.0|string_test_2|2023-02-01| A |
+-----+------+-------------+----------+------+
在这个例子中,我们试图获取 col_1
的值为 100
或 col_2
的值为 300.0
的行。目前,有两行满足这个条件,因此,它们被作为结果返回。
在数据过滤中,还有一个重要的函数用于在列表中查找值。现在,我们将看看如何在 PySpark 中使用 isin()
函数。
使用 isin()
isin()
函数用于在 DataFrame 列中查找存在于列表中的值。为此,我们需要创建一个包含一些值的列表。一旦我们有了这个列表,我们就会使用 isin()
函数来查看列表中的某些值是否存在于 DataFrame 中。以下是一个演示此功能的例子:
list = [100, 200]
data_df.filter(data_df.col_1.isin(list)).show()
这里是结果:
+-----+-----+-------------+----------+------+
|col_1|col_2| string_col | col_4 |col_6 |
+-----+-----+-------------+----------+------+
| 100|200.0|string_test_1|2023-01-01| A |
| 200|300.0|string_test_2|2023-02-01| A |
+-----+-----+-------------+----------+------+
在这个例子中,我们可以看到值 100
和 200
出现在 data_df
DataFrame 的两行中,因此,这两行都被返回。
我们还可以在 Spark DataFrames 的不同列中转换数据类型。现在,让我们看看如何在 Spark DataFrames 中转换不同的数据类型。
数据类型转换
在本节中,我们将看到在 Spark DataFrame 列中转换数据类型的不同方法。
我们将首先使用 Spark 中的 cast
函数。以下代码说明了这一点:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType
data_df_2 = data_df.withColumn("col_4",col("col_4").cast(StringType())) \
.withColumn("col_1",col("col_1").cast(IntegerType()))
data_df_2.printSchema()
data_df.show()
这里是结果:
root
|-- col_1: integer (nullable = true)
|-- col_2: double (nullable = true)
|-- string_col: string (nullable = true)
|-- col_4: string (nullable = true)
|-- col_6: string (nullable = false)
+-----+-----+-------------+----------+-----+
|col_1|col_2| string_col| col_4|col_6|
+-----+-----+-------------+----------+-----+
| 100|200.0|string_test_1|2023-01-01| A|
| 200|300.0|string_test_2|2023-02-01| A|
| 300|400.0|string_test_3|2023-03-01| A|
+-----+-----+-------------+----------+-----+
在前面的代码中,我们看到我们正在更改两个列的数据类型,即 col_4
和 col_1
。首先,我们将 col_4
改为字符串类型。这个列之前是日期列。然后,我们将 col_1
从 long
类型改为整数类型。
这里是 data_df
的模式,仅供参考:
root
|-- col_1: long (nullable = true)
|-- col_2: double (nullable = true)
|-- string_col: string (nullable = true)
|-- col_4: date (nullable = true)
|-- col_5: timestamp (nullable = true)
|-- col_6: string (nullable = false)
我们看到 col_1
和 col_4
是不同的数据类型。
下一个示例是通过使用 selectExpr()
函数来更改列的数据类型。以下代码说明了这一点:
data_df_3 = data_df_2.selectExpr("cast(col_4 as date) col_4",
"cast(col_1 as long) col_1")
data_df_3.printSchema()
data_df_3.show(truncate=False)
这里是结果:
root
|-- col_4: date (nullable = true)
|-- col_1: long (nullable = true)
+----------+-----+
| col_4 |col_1|
+----------+-----+
|2023-01-01| 100|
|2023-02-01| 200|
|2023-03-01| 300|
+----------+-----+
在前面的代码中,我们看到我们正在更改两个列的数据类型,即 col_4
和 col_1
。首先,我们将 col_4
改回 date
类型。然后,我们将 col_1
改为 long
类型。
下一个示例是通过使用 SQL 来更改列的数据类型。以下代码说明了这一点:
data_df_3.createOrReplaceTempView("CastExample")
data_df_4 = spark.sql("SELECT DOUBLE(col_1), DATE(col_4) from CastExample")
data_df_4.printSchema()
data_df_4.show(truncate=False)
这里是结果:
root
|-- col_1: double (nullable = true)
|-- col_4: date (nullable = true)
+-----+----------+
|col_1| col_4 |
+-----+----------+
|100.0|2023-01-01|
|200.0|2023-02-01|
|300.0|2023-03-01|
+-----+----------+
在前面的代码中,我们看到我们正在更改两个列的数据类型,即 col_4
和 col_1
。首先,我们使用 createOrReplaceTempView()
创建一个名为 CastExample
的表。然后,我们使用这个表将 col_4
改回 date
类型。然后,我们将 col_1
改为 double
类型。
在数据分析领域,处理空值非常有价值。现在,让我们看看我们如何从 DataFrame 中删除空值。
从 DataFrame 中删除空值
有时,在数据中存在可能导致清洁数据变得混乱的空值。删除空值是许多数据分析师和数据工程师需要做的基本练习。PySpark 提供了相关的函数来完成这项工作。
让我们创建另一个名为 salary_data
的 DataFrame 来展示一些后续操作:
salary_data = [("John", "Field-eng", 3500),
("Michael", "Field-eng", 4500),
("Robert", None, 4000),
("Maria", "Finance", 3500),
("John", "Sales", 3000),
("Kelly", "Finance", 3500),
("Kate", "Finance", 3000),
("Martin", None, 3500),
("Kiran", "Sales", 2200),
("Michael", "Field-eng", 4500)
]
columns= ["Employee", "Department", "Salary"]
salary_data = spark.createDataFrame(data = salary_data, schema = columns)
salary_data.printSchema()
salary_data.show()
这里是结果:
root
|-- Employee: string (nullable = true)
|-- Department: string (nullable = true)
|-- Salary: long (nullable = true)
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| John| Field-eng| 3500 |
| Michael| Field-eng| 4500 |
| Robert| null| 4000 |
| Maria| Finance| 3500 |
| John| Sales| 3000 |
| Kelly| Finance| 3500 |
| Kate| Finance| 3000 |
| Martin| null| 3500 |
| Kiran| Sales| 2200 |
| Michael| Field-eng| 4500 |
+--------+----------+------+
现在,让我们看看 dropna()
函数;这将帮助我们删除 DataFrame 中的空值:
salary_data.dropna().show()
这里是结果:
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| John | Field-eng| 3500 |
| Michael| Field-eng| 4500 |
| Maria | Finance| 3500 |
| John | Sales| 3000 |
| Kelly | Finance| 3500 |
| Kate | Finance| 3000 |
| Kiran | Sales| 2200 |
| Michael| Field-eng| 4500 |
+--------+----------+------+
在结果 DataFrame 中,我们看到当使用 dropna()
函数时,带有 Robert
和 Martin
的行从新的 DataFrame 中被删除。
去重数据是数据分析任务中经常需要的一种有用技术。现在,让我们看看我们如何从 DataFrame 中删除重复值。
从 DataFrame 中删除重复项
有时,在数据中存在冗余值,这会使清洁数据变得混乱。删除这些值可能在许多用例中是必要的。PySpark 提供了 dropDuplicates()
函数来执行此操作。以下是说明此操作的代码:
new_salary_data = salary_data.dropDuplicates().show()
这里是结果:
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| John| Field-eng| 3500|
| Michael| Field-eng| 4500|
| Maria| Finance| 3500|
| John| Sales| 3000|
| Kelly| Finance| 3500|
| Kate| Finance| 3000|
| Kiran| Sales| 2200|
+--------+----------+------+
在这个例子中,我们看到在将 dropDuplicates()
函数应用于原始 DataFrame 后,名为 Michael 的员工在结果 DataFrame 中只显示一次。这个名称及其对应值在原始 DataFrame 中出现了两次。
现在我们已经了解了不同的数据过滤技术,接下来我们将看看如何在 PySpark DataFrame 中进行数据聚合。
在 DataFrame 中使用聚合
Spark 中用于聚合数据的一些方法如下:
-
agg
-
avg
-
count
-
max
-
mean
-
min
-
pivot
-
sum
我们将在下面的代码示例中看到一些实际操作。
平均值(avg)
在下面的示例中,我们看到如何在 Spark 中使用聚合函数。我们将从计算列中所有值的平均值开始:
from pyspark.sql.functions import countDistinct, avg
salary_data.select(avg('Salary')).show()
这里是结果:
+-----------+
|avg(Salary)|
+-----------+
| 3520.0|
+-----------+
这个示例计算了 salary_data
DataFrame 中薪水列的平均值。我们将 Salary
列传递给了 avg
函数,它为我们计算了该列的平均值。
现在,让我们看看如何在 PySpark DataFrame 中计数不同的元素。
计数
在下面的代码示例中,我们可以看到如何在 Spark 中使用聚合函数:
salary_data.agg({'Salary':'count'}).show()
这里是结果:
+-------------+
|count(Salary)|
+-------------+
| 10|
+-------------+
这个示例计算了 salary_data
DataFrame 中 Salary
列的值的总数。我们将 Salary
列传递给了 agg
函数,并将 count
作为其另一个参数,它为我们计算了该列的计数。
现在,让我们看看如何在 PySpark DataFrame 中计数不同的元素。
计数独立值
在下面的示例中,我们将查看如何在 PySpark DataFrame 中计数不同的元素:
salary_data.select(countDistinct("Salary").alias("Distinct Salary")).show()
这里是结果:
+---------------+
|Distinct Salary|
+---------------+
| 5|
+---------------+
这个示例计算了 salary_data
DataFrame 中薪水列的总独立值。我们将 Salary
列传递给了 countDistinct
函数,它为我们计算了该列的计数。
现在,让我们看看如何在 PySpark DataFrame 中查找最大值。
查找最大值(max)
在下面的代码示例中,我们将查看如何在 PySpark DataFrame 的列中查找最大值:
salary_data.agg({'Salary':'max'}).show()
这里是结果:
+-----------+
|max(Salary)|
+-----------+
| 4500|
+-----------+
这个示例计算了 salary_data
DataFrame 中 Salary
列的所有值的最大值。我们将 Salary
列传递给了 agg
函数,并将 max
作为其另一个参数,它为我们计算了该列的最大值。
现在,让我们看看如何在 PySpark DataFrame 中获取所有元素的总和。
总和
在下面的代码示例中,我们将查看如何在 PySpark DataFrame 中求和所有值:
salary_data.agg({'Salary':'sum'}).show()
这里是结果:
+-----------+
|sum(Salary)|
+-----------+
| 35200|
+-----------+
这个示例计算了 salary_data
DataFrame 中 Salary
列所有值的总和。我们将 Salary
列传递给了 agg
函数,并将 sum
作为其另一个参数,它为我们计算了该列的总和。
现在,让我们看看如何在 PySpark DataFrame 中排序数据。
使用 OrderBy 排序数据
在下面的代码示例中,我们将查看如何在 PySpark DataFrame 中按升序排序数据:
salary_data.orderBy("Salary").show()
这里是结果:
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| Kiran | Sales | 2200 |
| Kate | Finance | 3000 |
| John | Sales | 3000 |
| John | Field-eng| 3500 |
| Martin | null | 3500 |
| Kelly | Finance | 3500 |
| Maria | Finance | 3500 |
| Robert | null | 4000 |
| Michael| Field-eng| 4500 |
| Michael| Field-eng| 4500 |
+--------+----------+------+
本例根据salary_data
DataFrame 中Salary
列的值对整个 DataFrame 进行排序。我们将Salary
列传递给orderBy
函数,它根据此列对 DataFrame 进行了排序。
我们还可以通过向原始orderBy
函数中添加另一个函数desc()
来按降序格式对数据进行排序。以下示例说明了这一点:
salary_data.orderBy(salary_data["Salary"].desc()).show()
这里是结果:
+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| Michael| Field-eng| 4500 |
| Michael| Field-eng| 4500 |
| Robert | null | 4000 |
| Martin | null | 3500 |
| Kelly | Finance | 3500 |
| Maria | Finance | 3500 |
| John | Field-eng| 3500 |
| John | Sales | 3000 |
| Kate | Finance | 3000 |
| Kiran | Sales | 2200 |
+--------+----------+------+
本例中,根据salary_data
DataFrame 中Salary
列的值按降序对整个 DataFrame 进行排序。我们将Salary
列传递给orderBy
函数,并附加了desc()
函数调用,它根据此列对 DataFrame 进行了降序排序。
摘要
在本章中,我们学习了如何在 Spark DataFrame 中操作数据。
我们讨论了 Spark DataFrame API 以及 Spark 中的不同数据类型。我们还学习了如何在 Spark 中创建 DataFrame 以及如何创建后查看这些 DataFrame。最后,我们学习了不同的数据操作和数据聚合函数。
在下一章中,我们将介绍 Spark 中与数据处理相关的一些高级操作。
样题
-
以下哪个操作会触发评估?
-
DataFrame.filter()
-
DataFrame.distinct()
-
DataFrame.intersect()
-
DataFrame.join()
-
DataFrame.count()
答案
- E
更多推荐
所有评论(0)