原文:zh.annas-archive.org/md5/5e217f5bf66327f4a93816a8c91503e2

译者:飞龙

协议:CC BY-NC-SA 4.0

第八章:使用 Spark ML 进行机器学习

近年来,机器学习越来越受欢迎。在本章中,我们将全面探索 Apache Spark 上的强大框架 Spark 机器学习ML),它是一个用于可扩展机器学习的框架。我们将深入研究机器学习的基础概念以及 Spark ML 如何利用这些原则来实现高效和可扩展的数据驱动洞察。

我们将涵盖以下主题:

  • 机器学习的关键概念

  • 机器学习的不同类型

  • 使用 Spark 进行机器学习

  • 通过一个真实世界的例子考虑机器学习生命周期

  • 机器学习的不同案例研究

  • Spark ML 和分布式机器学习的未来趋势

机器学习包含针对不同数据场景的多种方法。我们将从学习机器学习的不同关键概念开始。

机器学习简介

机器学习是一个研究领域,专注于开发算法和模型,使计算机系统能够在没有被明确编程的情况下学习和做出预测或决策。它是人工智能AI)的一个子集,旨在为系统提供自动从数据和经验中学习和改进的能力。

在今天这个数据以前所未有的速度产生的大量数据的世界里,机器学习在提取有意义的洞察、做出准确预测和自动化决策过程中发挥着关键作用。随着数据的增长,机器可以更好地学习模式,从而更容易从这些数据中获得洞察。它在金融、医疗保健、营销、图像和语音识别、推荐系统等多个领域都有应用。

机器学习的核心概念

要理解机器学习,重要的是要掌握支撑其方法论的基本概念。

数据

数据是任何机器学习过程的基础。它可以是有结构的、半结构的或非结构的,并包括各种类型,如数值、分类、文本、图像等。机器学习算法需要高质量、相关和代表性的数据来学习模式和做出准确的预测。在处理机器学习问题时,拥有能够回答我们试图解决的问题的数据至关重要。在任何分析或模型构建过程中使用的数据质量将显著影响其结果和决策。不良或低质量的数据可能导致不准确、不可靠或误导性的结果,最终影响任何分析或模型的总体性能和可信度。

在不良数据上训练的机器学习模型可能会做出不准确或错误的预测或分类。例如,在不完整或具有偏见的数据上训练的模型可能会错误地将忠诚的客户识别为潜在的流失者,反之亦然。

依赖于从不良数据中得出的有缺陷或偏见的分析决策者可能会实施基于不准确洞察的策略。例如,由于有缺陷的流失预测而针对错误客户群体的营销活动可能导致资源浪费和错失机会。

因此,我们需要确保用于机器学习问题的数据能够代表我们想要构建模型的受众群体。另外一点需要注意的是,数据可能存在一些固有的偏差。我们有责任寻找这些偏差,并在使用这些数据构建机器学习模型时保持警觉。

特征

特征是机器学习算法用于做出预测或决策的数据的可测量属性或特征。它们是捕获数据中相关信息的变量或属性。在大量的数据中,我们希望了解哪些特征对解决特定问题是有用的。相关的特征会生成更好的模型。

特征工程,即选择、提取和转换特征的过程,在提高机器学习模型性能方面起着至关重要的作用。

标签和目标

标签或目标是指机器学习模型旨在预测或分类的期望输出或结果。在监督学习中,模型从标记数据中学习,标签代表与输入数据相关的正确答案或类别标签。在无监督学习中,模型在数据中识别模式或簇,而不需要任何明确的标签。

训练和测试

在机器学习中,模型使用可用数据的一个子集进行训练,这个子集被称为训练集。训练过程涉及将输入数据和相应的标签输入到模型中,模型从这些数据中学习以做出预测。一旦模型训练完成,其性能将使用另一个称为测试集的独立数据子集进行评估。这种评估有助于评估模型泛化能力和在未见数据上做出准确预测的能力。

算法和模型

机器学习算法是学习数据中的模式和关系的数学或统计过程,并做出预测或决策。它们可以分为各种类型,包括回归、分类、聚类、降维和强化学习。这些算法在数据上训练后,会生成捕获学习模式的模型,可用于对新未见数据做出预测。

深入讨论不同的机器学习算法超出了本书的范围。我们将在下一节讨论不同类型的机器学习问题。

机器学习的类型

机器学习问题可以大致分为两大类。在本节中,我们将探讨这两类。

监督学习

监督学习是一种机器学习方法,其中算法从标记的训练数据中学习以做出预测或决策。在监督学习中,训练数据包括输入特征和相应的输出标签或目标值。目标是学习一个映射函数,可以准确地预测新输入的输出。

监督学习的过程包括以下步骤:

  1. 数据准备:第一步是收集和预处理训练数据。这包括清理数据、处理缺失值以及将数据转换成适合学习算法的格式。数据应分为特征(输入变量)和标签(输出变量)。

  2. 模型训练:一旦数据已经准备好,监督学习算法就在标记的训练数据上训练。算法学习输入特征和相应输出标签之间的模式和关系。目标是找到一个可以很好地泛化到未见数据并做出准确预测的模型。

  3. 模型评估:在训练模型后,需要评估其性能。这是通过使用称为测试集或验证集的单独数据集来完成的。将模型的预测与测试集中的实际标签进行比较,并计算各种评估指标,如准确率、精确率、召回率或均方误差。

  4. 模型部署和预测:一旦模型经过训练和评估,就可以部署到对新数据做出预测。训练好的模型接受新数据的输入特征,并根据训练阶段学到的知识产生预测或决策。

监督学习算法的例子包括线性回归、逻辑回归、支持向量机SVM)、决策树、随机森林、梯度提升和神经网络。再次强调,深入探讨这些算法超出了本书的范围。你可以在这里了解更多关于它们的信息:spark.apache.org/docs/latest/ml-classification-regression.html

无监督学习

无监督学习是一种机器学习类型,其中算法在没有任何标记输出的情况下学习数据中的模式和关系。在无监督学习中,训练数据仅由输入特征组成,目标是发现数据中的隐藏模式、结构或聚类。

无监督学习的过程包括以下步骤:

  1. 数据准备:与监督学习类似,第一步是收集和预处理数据。然而,在无监督学习中,没有标记的输出值或目标变量。数据需要以适合特定无监督学习算法的方式转换和准备。

  2. 模型训练:在无监督学习中,算法在没有任何特定目标变量的输入特征上训练。算法通过统计属性或相似度度量探索数据,并识别模式或聚类。目标是提取数据中的有意义信息,而不需要任何预定义的标签。

  3. 模型评估(可选):与监督学习不同,无监督学习没有基于已知标签的直接评估指标。无监督学习中的评估通常是主观的,并取决于特定的任务或问题领域。它也比监督学习更手动。评估可能包括可视化发现的聚类、评估降维的质量或使用领域知识来验证结果。

  4. 模式发现和洞察:无监督学习的主要目标是发现数据中的隐藏模式、结构或聚类。无监督学习算法可以揭示关于数据的洞察,识别异常或离群值,执行降维或生成推荐。

无监督学习算法的例子包括 K-means 聚类、层次聚类、主成分分析(PCA)、关联规则挖掘和自组织映射(SOM)。

总结来说,监督学习和无监督学习是机器学习中的两种关键类型。监督学习依赖于标记数据来学习模式和进行预测,而无监督学习则在未标记数据中探索模式和结构。这两种类型都有它们自己的算法和技术,以及不同的选择。深入讨论无监督学习超出了本书的范围。

在下一节中,我们将探讨监督机器学习,这是人工智能和数据科学领域的一个基石,代表了构建预测模型和做出数据驱动决策的强大方法。

监督学习类型

如我们所知,监督学习是机器学习的一个分支,其中的算法从标记的训练数据中学习模式和关系。它涉及通过展示输入数据及其相应的输出标签来教授或监督模型,使算法能够学习输入和输出变量之间的映射。我们将探讨三种关键的监督学习类型——分类、回归和时间序列。

分类

分类是一种机器学习任务,其目标是根据其特征将数据分类或归类到预定义的类别或类别中。算法从标记的训练数据中学习,以构建一个可以预测新、未见数据实例类别标签的模型。

在分类中,输出是离散的,代表类别标签。用于分类任务的一些常见算法包括逻辑回归、决策树、随机森林、SVM 和朴素贝叶斯。

例如,考虑一个垃圾邮件分类任务,其目标是预测一封 incoming 邮件是否为垃圾邮件。算法在标记的邮件数据集上训练,其中每封邮件都与一个表示是否为垃圾邮件的类别标签相关联。训练好的模型可以根据其特征(如内容、主题或发件人)将新的邮件分类为垃圾邮件或非垃圾邮件。

回归

回归是另一种机器学习任务,它侧重于根据输入特征预测连续或数值值。在回归中,算法从标记的训练数据中学习,以构建一个模型,该模型可以估计或预测给定一组输入特征的目标变量的数值。

当输出是一个连续值时,例如预测房价、股票市场趋势或根据历史数据预测产品的销售,就会使用回归模型。一些常用的回归算法包括线性回归、决策树、随机森林、梯度提升和神经网络。

例如,考虑一个案例,你想要根据房屋的各种特征(如面积、卧室数量、位置等)来预测房价。在这种情况下,算法是在一个标记的房屋数据集上训练的,其中每座房屋都与相应的价格相关联。训练好的回归模型可以基于新房屋的特征来预测其价格。

时间序列

时间序列分析是机器学习的一个专门领域,它处理随时间收集的数据,其中观察的顺序很重要。在时间序列分析中,目标是理解和预测数据中的模式、趋势和依赖关系。

时间序列模型用于根据历史数据点预测未来值。它们在金融、股票市场预测、天气预报和需求预测等领域得到广泛应用。一些流行的时序算法包括自回归积分移动平均ARIMA)、指数平滑方法和长短期记忆LSTM)网络。

例如,假设你有一个特定公司的历史股票市场数据,包括日期和相应的股票价格。时间序列算法可以分析数据中的模式和趋势,并根据历史价格波动预测未来的股票价格。

总之,监督学习包括各种类型,如分类、回归和时间序列分析。每种类型都针对特定的学习任务,并需要不同的算法和技术。了解这些类型有助于选择适合特定数据分析与预测任务的最佳算法和方法。

接下来,我们将探讨如何利用 Spark 进行机器学习任务。

Spark 机器学习

Spark 提供了一个强大且可扩展的平台,用于执行大规模机器学习任务。Spark 的ML 库,也称为MLlib,提供了一系列算法和工具,用于构建和部署机器学习模型。

使用 Spark 进行机器学习的优点包括其分布式计算能力、高效的数据处理、可扩展性和与其他 Spark 组件(如 Spark SQL 和 Spark Streaming)的集成。Spark 的 MLlib 支持批处理和流数据处理,使得实时机器学习应用的开发成为可能。

机器学习是一个变革性的领域,它使计算机能够从数据中学习并做出预测或决策。通过理解关键概念并利用 Spark 的 MLlib 等工具,我们可以利用机器学习的力量来获得洞察力,自动化流程,并在各个领域推动创新。

现在,让我们来看看使用 Spark 进行机器学习任务的好处。

Apache Spark 在大规模机器学习中的优势

通过利用 Spark 的分布式计算能力和丰富的生态系统,数据科学家和工程师可以有效地解决大规模数据集上的复杂机器学习挑战。由于其分布式计算能力,它提供了各种优势,以下是一些:

  • 速度和性能:Apache Spark 的一个关键优势是它能够以非凡的速度处理大规模数据。Spark 利用内存计算和优化的数据处理技术,如数据并行任务管道化,来加速计算。这使得它在机器学习中常用的迭代算法中效率极高,显著减少了整体处理时间。

  • 分布式计算:Spark 的分布式计算模型允许它在集群的多个节点上分配数据和计算,实现并行处理。这种分布式特性使得 Spark 能够水平扩展,利用多台机器的计算能力并行处理数据。这使得它非常适合需要处理大量数据的机器学习任务。

  • 容错性:Apache Spark 的另一个优点是其内置的容错机制。Spark 自动跟踪弹性分布式数据集(RDDs)的 lineage,RDDs 是 Spark 中的基本数据抽象,这使得它能够从故障中恢复并重新运行失败的任务。这确保了 Spark 应用程序的可靠性和弹性,使其成为处理大规模机器学习工作负载的强大平台。

  • 通用性和灵活性:Spark 提供了一整套 API 和库,这些库简化了各种数据处理和分析任务,包括机器学习。Spark 的 MLlib 库提供了一套丰富的分布式机器学习算法和实用工具,使得开发可扩展的机器学习模型变得容易。此外,Spark 与流行的数据处理框架和工具集成良好,能够无缝集成到现有的数据管道和生态系统中。

  • 实时和流处理能力:正如我们在上一章中讨论的,Spark 通过其名为 Spark Streaming 的流组件扩展了其批处理之外的特性。这在需要基于持续到达的数据立即获得洞察或决策的场景中特别有价值,例如实时欺诈检测、传感器数据分析或社交媒体流上的情感分析。

  • 生态系统和社区支持:Apache Spark 拥有一个充满活力和活跃的开发者和贡献者社区,确保了持续的开发、改进和支持。Spark 从丰富的工具和扩展生态系统中受益,提供了额外的功能性和集成选项。Spark 的社区驱动特性确保了丰富的资源、文档、教程和在线论坛,用于学习和故障排除。

因此,Apache Spark 为大规模 ML 任务提供了显著的优势。其速度、可扩展性、容错性、多功能性和实时能力使其成为处理大数据和开发可扩展 ML 模型的强大框架。

现在,让我们看看 Spark 提供的不同库,以在分布式框架中利用 ML 功能。

Spark MLlib 与 Spark ML

Apache Spark 为 ML 提供了两个库:Spark MLlib 和 Spark ML。尽管它们名称相似,但在设计、API 和功能方面,这两个库之间有一些关键差异。让我们比较 Spark MLlib 和 Spark ML,以了解它们的特性和用例。

Spark MLlib

Spark MLlib 是 Apache Spark 中的原始 ML 库。它在 Spark 的早期版本中引入,提供了一套丰富的分布式 ML 算法和实用工具。MLlib 是建立在 RDD API 之上的,这是 Spark 中的核心数据抽象。

Spark MLlib 与其他非分布式 ML 库(如 scikit-learn)相比,有几个关键特性使其脱颖而出。让我们看看其中的一些:

  • 基于 RDD 的 API:MLlib 利用 RDD 抽象进行分布式数据处理,使其适用于批量处理和迭代算法。RDD API 允许进行高效的分布式计算,但对于某些用例来说可能低级且复杂。

  • 多样化的算法:MLlib 提供了广泛的分布式 ML 算法,包括分类、回归、聚类、协同过滤、降维等。这些算法旨在处理大规模数据,并且可以在 ML 管道中处理各种任务。

  • 特征工程:MLlib 提供了特征提取、转换和选择的实用工具。它包括处理分类和数值特征、文本处理和特征缩放的方法。

  • 模型持久化:MLlib 支持模型持久化,允许将训练好的模型保存到磁盘,并在以后用于部署或进一步分析。

在下一节中,我们将探讨 Spark ML 库。这是另一个也提供 ML 功能的新库。

Spark ML

Spark ML 是在 Spark 2.0 中引入的 Apache Spark 中的较新 ML 库。它旨在更易于使用,具有高级 API 并专注于 DataFrame,这是 Spark SQL 中引入的具有结构和优化的分布式数据集合。

Spark ML 的关键特性如下:

  • 基于 DataFrame 的 API:Spark ML 利用 DataFrame API,它比 RDD API 提供了更直观和更高层次的接口。DataFrame 提供了结构化和表格化的数据表示,使得处理结构化数据以及与 Spark SQL 集成变得更加容易。

  • 管道:Spark ML 引入了管道的概念,为构建机器学习工作流程提供了更高层次的抽象。管道允许将多个数据转换和模型训练阶段链接成一个单一的管道,简化了复杂机器学习管道的开发和部署。

  • 集成特征转换器:Spark ML 包含了一系列特征转换器,如 StringIndexer、OneHotEncoder、VectorAssembler 等。这些转换器与 DataFrame 无缝集成,简化了特征工程过程。

  • 统一 API:Spark ML 统一了不同机器学习任务的 API,例如分类、回归、聚类和推荐。这为不同算法提供了一个一致且统一的编程接口,简化了学习曲线。

既然我们已经了解了 Spark MLlib 和 Spark ML 的关键特性,让我们来探讨何时使用它们。

在以下场景中,使用 Spark MLlib 将受益:

  • 你正在使用不支持 Spark ML 的 Spark 旧版本

  • 你需要低级控制,并需要直接与 RDDs 工作

  • 你需要访问 Spark ML 中不可用的特定算法或功能

在以下场景中,你应该优先使用 Spark ML:

  • 你正在使用 Spark 2.0 或更高版本

  • 你偏好更高层次的 API,并希望利用 DataFrames 和 Spark SQL 的功能

  • 你需要构建包含集成特征转换器和管道的端到端机器学习管道

Spark MLlib 和 Spark ML 都在 Apache Spark 中提供了强大的机器学习功能。正如我们所见,Spark MLlib 是原始库,拥有丰富的分布式算法集,而 Spark ML 是一个更新的库,具有更用户友好的 API 和与 DataFrames 的集成。两者之间的选择取决于你的 Spark 版本、对 API 风格的偏好以及你机器学习任务的具体要求。

机器学习生命周期

机器学习生命周期涵盖了开发和部署机器学习模型的端到端过程。它包括几个阶段,每个阶段都有其自身的任务和考虑因素。理解机器学习生命周期对于构建稳健且成功的机器学习解决方案至关重要。在本节中,我们将探讨机器学习生命周期的关键阶段:

  1. 问题定义:机器学习生命周期的第一阶段是问题定义。这涉及到明确界定你想要解决的问题,以及理解你的机器学习项目的目标和目的。这一阶段需要领域专家和数据科学家之间的协作,以识别问题、定义成功指标和确定项目的范围。

  2. 数据获取和理解:一旦问题被定义,下一步就是获取用于训练和评估的必要数据。数据获取可能涉及从数据库、API 或外部数据集中收集数据。确保数据质量、完整性和与当前问题的相关性非常重要。此外,数据理解涉及探索和分析获取到的数据,以深入了解其结构、分布和潜在问题。

  3. 数据准备和特征工程:数据准备和特征工程是机器学习生命周期中的关键步骤。它涉及转换和预处理数据,使其适合训练机器学习模型。这包括诸如清理数据、处理缺失值、编码分类变量、缩放特征以及通过特征工程技术创建新特征等任务。适当的数据准备和特征工程对机器学习模型的性能和准确性有重大影响。

  4. 模型训练和评估:在这个阶段,机器学习模型在准备好的数据上进行训练。模型训练涉及选择合适的算法、定义模型架构,并使用训练数据优化其参数。然后使用评估指标和验证技术对训练好的模型进行评估,以评估其性能。这个阶段通常需要迭代和微调模型,以达到所需的准确性和泛化能力。

  5. 模型部署:一旦模型经过训练和评估,它就准备好进行部署。模型部署包括将模型集成到生产环境中,对新数据进行预测,并监控其性能。这可能涉及设置 API、创建批量或实时推理系统,并确保模型的可扩展性和可靠性。部署还包括对模型版本控制、监控和重新训练的考虑,以保持模型随时间保持有效性。

  6. 模型监控和维护:一旦模型部署,持续监控其性能并保持其有效性至关重要。监控包括跟踪模型预测、检测异常,并从用户或领域专家那里收集反馈。它还包括使用新数据定期重新训练模型,以适应变化的模式或概念。模型维护涉及解决模型漂移、更新依赖项,并在生产环境中管理模型的生命周期。

  7. 模型迭代和改进:机器学习生命周期是一个迭代的过程,模型通常需要随着时间的推移进行改进。基于用户反馈、性能指标和不断变化的企业需求,模型可能需要更新、重新训练或替换。迭代和改进对于保持模型更新并确保它们继续提供准确的预测至关重要。

机器学习生命周期包括问题定义、数据获取、数据准备、模型训练、模型部署、模型监控和模型迭代。每个阶段在开发成功的机器学习解决方案中都发挥着关键作用。通过遵循一个定义良好的生命周期,组织可以有效地构建、部署和维护机器学习模型,以解决复杂问题并从其数据中提取有价值的见解。

问题陈述

让我们深入一个案例研究,我们将探讨使用历史数据预测房价的艺术。想象一下:我们有一座关于房屋的宝贵信息宝库,包括分区、地块面积、建筑类型、整体状况、建造年份和销售价格等细节。我们的目标是利用机器学习的力量,准确预测即将到来的新房屋的价格。

为了完成这一壮举,我们将踏上构建一个专门用于预测房价的机器学习模型的旅程。该模型将利用现有的历史数据并纳入额外的特征。通过仔细分析和理解这些特征与相应销售价格之间的关系,我们的模型将成为估计任何新进入市场的房屋价值的可靠工具。

为了实现这一点,我们将回顾上一节中定义的一些步骤,其中我们讨论了机器学习生命周期。由于房价是连续的,我们将使用线性回归模型来预测这些价格。

我们将首先准备数据,使其可用于机器学习模型。

数据准备和特征工程

正如我们所知,数据准备和特征工程是机器学习过程中的关键步骤。适当的数据准备和特征工程技术可以显著提高模型的性能和准确性。在本节中,我们将通过代码示例探索常见的数据准备和特征工程任务。

数据集介绍

构建模型的第一步是找到相关数据。我们将为此目的使用房价数据(位于docs.google.com/spreadsheets/d/1caaR9pT24GNmq3rDQpMiIMJrmiTGarbs/edit#gid=1150341366)。这些数据有 2,920 行和 13 列。

该数据集有以下列:

  • Id: 数据表中每行的唯一标识符

  • MSSubClass: 房地产的子类

  • MSZoning: 房地产的分区

  • LotArea: 该房产所在地块的总面积

  • LotConfig: 地块配置 – 例如,是否为角地块

  • BldgType: 住宅类型 – 例如,单户住宅、联排别墅等

  • OverallCond: 房屋的一般状况

  • YearBuilt: 房屋建造的年份

  • YearRemodAdd: 进行任何翻修的年份

  • Exterior1st: 外部类型 – 例如,乙烯基、外墙板等

  • BsmtFinSF2: 完成地下室的总面积

  • TotalBsmtSF: 地下室的总面积

  • SalePrice: 房屋的销售价格

我们将从本节开头提供的链接下载这些数据。

现在我们已经了解了一些数据中的数据点,让我们学习如何加载数据。

加载数据

到目前为止,我们已经在我们的计算机和 Databricks 环境中下载了数据,并以 CSV 文件的形式存在。如您从前面的章节中回忆起来,我们学习了如何通过各种技术将数据集加载到 DataFrame 中。在这里,我们将使用 CSV 文件来加载数据:

housing_data = spark.read.csv("HousePricePrediction.csv")
# Printing first 5 records of the dataset
housing_data.show(5)

我们可以在图 8.1 中看到结果。请注意,由于数据集太大,无法全部显示,我们只能看到部分结果:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/dtbrk-cert-ass-dev-spk-py/img/B19176_08_1.jpg

让我们打印这个数据集的模式:

housing_data.printSchema

我们将得到以下模式:

<bound method DataFrame.printSchema of DataFrame[Id: bigint, MSSubClass: bigint, MSZoning: string, LotArea: bigint, LotConfig: string, BldgType: string, OverallCond: bigint, YearBuilt: bigint, YearRemodAdd: bigint, Exterior1st: string, BsmtFinSF2: bigint, TotalBsmtSF: bigint, SalePrice: bigint]>

如您可能已经注意到的,一些列的类型是字符串。我们将在下一节中清理这些数据。

清洗数据

数据清洗涉及处理缺失值、异常值和不一致的数据。在我们清理数据之前,我们将查看数据中有多少行。我们可以通过使用count()函数来完成:

housing_data.count()

该语句的结果如下所示:

2919

这意味着在我们应用任何清洗之前,数据中包含 2,919 行。现在,我们将从该数据集中删除缺失值,如下所示:

# Remove rows with missing values
cleaned_data = housing_data.dropna()
cleaned_data.count()

以下是该代码的结果:

1460

这表明我们删除了一些数据行,现在数据量变小了。

在下一节中,我们将讨论分类变量以及如何处理它们,特别是我们例子中用字符串表示的那些。

处理分类变量

在统计学和数据分析领域,分类变量是一种表示类别或组别的变量类型,它可以取有限、固定的不同值或级别。这些变量表示定性特征,不具有固有的数值意义或大小。相反,它们代表不同的属性或标签,将数据分类到特定的组或类别中。在训练机器学习模型之前,分类变量需要被编码成数值。

在我们的例子中,我们有一些是字符串类型的列。这些需要被编码成数值,以便模型能够正确使用它们。为此,我们将使用 Spark 的StringIndexer库来索引字符串列。

以下代码显示了如何使用StringIndexer

#import required libraries
from pyspark.ml.feature import StringIndexer
mszoning_indexer = StringIndexer(inputCol="MSZoning", outputCol="MSZoningIndex")
#Fits a model to the input dataset with optional parameters.
df_mszoning = mszoning_indexer.fit(cleaned_data).transform(cleaned_data)
df_mszoning.show()

在前面的代码中,我们正在将MSZoning列转换为索引列。为了实现这一点,我们创建了一个名为mszoningStringIndexer值。我们将其MSZoning作为要处理的输入列。输出列的名称是MSZoningIndex。我们将在下一步中使用这个输出列。之后,我们将mszoning_indexer拟合到cleaned_data

在生成的 DataFrame 中,您将注意到增加了一个名为MSZoningIndex的额外列。

现在,我们将使用管道来转换 DataFrame 中的所有特征。

管道汇集了一系列必要的步骤,每个步骤都致力于将原始数据转换为有价值的预测和分析。管道作为一个结构化的路径,由不同的阶段或组件组成,按照特定的顺序排列。每个阶段代表一个独特的操作或转换,它精炼数据,使其更适合机器学习任务。

管道(pipeline)的核心在于其无缝连接这些阶段的能力,形成一个协调一致的转换流程。这种编排确保数据能够轻松地通过每个阶段,一个阶段的输出成为下一个阶段的输入。它消除了手动干预的需要,自动化整个过程,节省了我们宝贵的时间和精力。我们将各种操作集成到管道中,如数据清洗、特征工程、编码分类变量、缩放数值特征等等。每个操作都在转换数据,使其可用于机器学习模型。

机器学习管道使我们能够简化我们的工作流程,尝试不同的转换组合,并在数据处理任务中保持一致性。它提供了一个结构化的框架,使我们能够轻松地重现和分享我们的工作,促进协作,并加深我们对数据转换过程的理解。

在机器学习和数据预处理中,独热编码器是一种将分类变量转换为数值格式的技术,使算法能够更好地理解和处理分类数据。当处理缺乏序数关系或数值表示的分类特征时,它特别有用。

我们将在管道中使用 StringIndexerOneHotEncoder。让我们看看我们如何实现这一点:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
mszoning_indexer = StringIndexer(inputCol="MSZoning", outputCol="MSZoningIndex")
lotconfig_indexer = StringIndexer(inputCol="LotConfig", outputCol="LotConfigIndex")
bldgtype_indexer = StringIndexer(inputCol="BldgType", outputCol="BldgTypeIndex")
exterior1st_indexer = StringIndexer(inputCol="Exterior1st", outputCol="Exterior1stIndex")
onehotencoder_mszoning_vector = OneHotEncoder(inputCol="MSZoningIndex", outputCol="MSZoningVector")
onehotencoder_lotconfig_vector = OneHotEncoder(inputCol="LotConfigIndex", outputCol="LotConfigVector")
onehotencoder_bldgtype_vector = OneHotEncoder(inputCol="BldgTypeIndex", outputCol="BldgTypeVector")
onehotencoder_exterior1st_vector = OneHotEncoder(inputCol="Exterior1stIndex", outputCol="Exterior1stVector")
#Create pipeline and pass all stages
pipeline = Pipeline(stages=[mszoning_indexer,
                            lotconfig_indexer,
                            bldgtype_indexer,
                            exterior1st_indexer,
                            onehotencoder_mszoning_vector,
                            onehotencoder_lotconfig_vector,
                            onehotencoder_bldgtype_vector,
                            onehotencoder_exterior1st_vector])

要开始我们的代码,我们需要从 PySpark 库中导入所需的模块。StringIndexerOneHotEncoder 模块将被用来处理住房数据集的字符串列。

当我们开始将分类列转换为机器学习算法可以理解的数值表示的过程时,让我们更仔细地看看代码中发生的魔法。

第一步是为我们希望转换的每个分类列创建 StringIndexer 实例。每个实例接受一个输入列,例如 MSZoningLotConfig,并生成一个相应的输出列,带有数值索引。例如,MSZoningIndex 列捕获了 MSZoning 列的转换索引值。

分类列成功索引后,我们进入下一阶段。现在,我们希望将这些索引转换为二进制向量。为此,我们可以使用 OneHotEncoder。生成的向量将每个分类值表示为一个二进制数组,其中值为 1 表示该类别的存在,否则为 0。

我们为每个索引列创建 OneHotEncoder 实例,例如 MSZoningIndexLotConfigIndex,并生成包含二进制向量表示的新输出列。这些输出列,如 MSZoningVectorLotConfigVector,用于捕获编码信息。

随着代码的进展,我们组装了一个管道——一系列转换,其中每个转换代表一个阶段。在我们的案例中,每个阶段包括特定分类列的索引和独热编码步骤。我们在管道中安排这些阶段,确保转换的正确顺序。

通过构建我们的管道,我们协调了操作的流畅流程。管道连接了不同阶段之间的点,使得将这些转换应用到整个数据集上变得毫不费力。我们的管道充当指挥,引导数据通过转换,最终使其成为适合机器学习的格式。

现在,我们将此管道拟合到我们的清理数据集,以便所有列都可以一起转换:

df_transformed = pipeline.fit(cleaned_data).transform(cleaned_data)
df_transformed.show(5)

结果 DataFrame 将包含我们在管道中通过转换创建的附加列。我们为每个字符串列创建了索引和向量列。

现在,我们需要从我们的数据集中删除不必要的冗余列。我们将在下一节中这样做。

数据清理

在这一步中,我们将确保我们只使用 ML 所需的特征。为了实现这一点,我们将删除不同的附加列,例如不服务于模型的身份列。此外,我们还将删除我们已经应用了转换的特征,例如字符串列。

以下代码展示了如何删除列:

drop_column_list = ["Id", "MSZoning","LotConfig","BldgType", "Exterior1st"]
df_dropped_cols = df_transformed.select([column for column in df_transformed.columns if column not in drop_column_list])
df_dropped_cols.columns

这是结果:

['MSSubClass',
 'LotArea',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'BsmtFinSF2',
 'TotalBsmtSF',
 'SalePrice',
 'MSZoningIndex',
 'LotConfigIndex',
 'BldgTypeIndex',
 'Exterior1stIndex',
 'MSZoningVector',
 'LotConfigVector',
 'BldgTypeVector',
 'Exterior1stVector']

如您从结果列列表中看到的,IdMSZoningLotConfigBldgTypeExterior1st 列已从结果 DataFrame 中删除。

过程中的下一步是组装数据。

组装向量

在这一步中,我们将根据我们想要的特性组装一个向量。这一步对于 Spark ML 与数据一起工作来说是必要的。

以下代码捕捉了我们如何实现这一点:

from pyspark.ml.feature import VectorAssembler
#Assembling features
feature_assembly = VectorAssembler(inputCols = ['MSSubClass',
 'LotArea',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'BsmtFinSF2',
 'TotalBsmtSF',
 'MSZoningIndex',
 'LotConfigIndex',
 'BldgTypeIndex',
 'Exterior1stIndex',
 'MSZoningVector',
 'LotConfigVector',
 'BldgTypeVector',
 'Exterior1stVector'], outputCol = 'features')
output = feature_assembly.transform(df_dropped_cols)
output.show(3)

在前面的代码块中,我们创建了一个包含组装向量的 features 列。在对其进行缩放后,我们将使用此列进行模型训练。

向量组装完成后,过程的下一步是对数据进行缩放。

特征缩放

特征缩放确保所有特征都在相似的尺度上,防止某些特征主导学习过程。

为了做到这一点,我们可以使用以下代码:

#Normalizing the features
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(output)
# Normalize each feature to have unit standard deviation.
scaledOutput = scalerModel.transform(output)
scaledOutput.show(3)

以下代码仅选择缩放后的特征和目标列——即 SalePrice

#Selecting input and output column from output
df_model_final = scaledOutput.select(['SalePrice', 'scaledFeatures'])
df_model_final.show(3)

我们将得到以下输出:

+---------+--------------------+
|SalePrice|   scaledFeatures   |
+---------+--------------------+
|   208500|(37,[0,1,2,3,4,6,...|
|   181500|(37,[0,1,2,3,4,6,...|
|   223500|(37,[0,1,2,3,4,6,...|
+---------+--------------------+

如您所见,df_model_final 现在只有两列。SalePrice 是我们将要预测的列,因此是我们的目标列。scaledFeatures 包含我们将用于训练 ML 模型的所有特征。

这些示例展示了使用 PySpark 进行常见的数据准备和特征工程任务。然而,具体的技术和方法可能因数据集和机器学习任务的要求而异。理解数据的特征并选择适当的预处理和特征工程技术至关重要。适当的数据准备和特征工程是构建准确和稳健的机器学习模型的基础。

此过程的下一步是训练和评估机器学习模型。

模型训练和评估

模型训练和评估是机器学习过程中的关键步骤。在本节中,我们将探讨如何使用各种指标和技术来训练机器学习模型并评估其性能。我们将使用 PySpark 作为模型训练和评估的框架。

数据拆分

在训练模型之前,将数据集拆分为训练集和测试集非常重要。训练集用于训练模型,而测试集用于评估其性能。以下是如何使用 PySpark 拆分数据的示例:

#test train split
df_train, df_test = df_model_final.randomSplit([0.75, 0.25])

在前面的代码中,我们正在进行数据的随机拆分,将 75%的数据放入训练集,将 25%的数据放入测试集。还有其他拆分技术。你应该仔细查看你的数据,然后定义最适合你的数据和模型训练的拆分方式。

我们拆分数据的原因是,一旦我们训练了模型,我们想看看训练好的模型在它从未见过的数据集上的预测效果如何。在这种情况下,那就是我们的测试数据集。这将帮助我们评估模型并确定模型的质量。基于此,我们可以采用不同的技术来改进我们的模型。

下一步是模型训练。

模型训练

数据拆分后,我们可以在训练数据上训练一个机器学习模型。PySpark 为各种类型的机器学习任务提供了一系列算法。在这个例子中,我们将使用线性回归作为我们的选择模型。

下面是一个训练线性回归模型的示例:

from pyspark.ml.regression import LinearRegression
# Instantiate the linear regression model
regressor = LinearRegression(featuresCol = 'scaledFeatures', labelCol = 'SalePrice')
# Fit the model on the training data
regressor = regressor.fit(df_train)

在前面的代码中,我们正在使用训练数据并将其拟合到线性回归模型中。我们还添加了一个名为labelCol的参数,告诉模型这是我们的目标列,用于预测。

模型训练完成后,下一步是确定模型的好坏。我们将在下一节通过评估模型来完成这一步。

模型评估

模型训练后,我们需要评估其在测试数据上的性能。评估指标提供了关于模型性能的见解。

均方误差MSE)是一个基本的统计指标,用于通过量化预测值和实际值之间平方差的平均值来评估回归模型的性能。

R 平方,通常表示为R2,是一个统计量,表示在回归模型中,因变量的方差中有多少是可预测的或由自变量解释的。它作为独立变量解释因变量变异性好坏的指标。

这里是使用均方误差(MSE)和 R2 指标评估回归模型的一个示例:

#MSE for the train data
pred_results = regressor.evaluate(df_train)
print("The train MSE for the model is: %2f"% pred_results.meanAbsoluteError)
print("The train r2 for the model is: %2f"% pred_results.r2)

这里是结果:

The MSE for the model is: 32287.153682
The r2 for the model is: 0.614926

我们可以检查测试数据的性能,如图所示:

#Checking test performance
pred_results = regressor.evaluate(df_test)
print("The test MSE for the model is: %2f"% pred_results.meanAbsoluteError)
print("The test r2 for the model is: %2f"% pred_results.r2)

这里是结果:

The MSE for the model is: 31668.331218
The r2 for the model is: 0.613300

根据模型的结果,我们可以进一步调整它。我们将在下一节中看到实现这一目标的一些技术。

交叉验证

交叉验证是提高机器学习模型性能的不同方法之一。

通过将数据分成多个子集进行训练和评估,交叉验证用于更稳健地评估模型性能。因此,我们不仅使用训练和测试数据,还使用一个验证集,模型从未见过这些数据,仅用于衡量性能。

交叉验证遵循一个简单的原则:而不是依赖于单一的训练-测试分割,我们将数据集分成多个子集,或称为。每个折作为一个迷你训练-测试分割,其中一部分数据用于训练,其余部分保留用于测试。通过旋转折,我们确保每个数据点都有机会成为测试集的一部分,从而减轻偏差,提供更具有代表性的评估。

最常见的交叉验证形式是k 折交叉验证。在这种方法中,数据集被分为 k 个大小相等的折。模型被训练和评估 k 次,每次每个折作为测试集,而其余的折共同组成训练集。通过平均每次迭代获得的表现指标,我们得到对模型性能的更稳健估计。

通过交叉验证,我们获得了关于模型泛化能力的宝贵见解。它使我们能够衡量模型在不同数据子集中的性能,捕捉数据集中存在的固有变化和细微差别。这种技术帮助我们检测潜在的过拟合问题,即模型在训练集上表现异常出色,但无法泛化到未见过的数据。

除了 k 折交叉验证之外,还有针对特定场景的变体和扩展。分层交叉验证确保每个折保持与原始数据集相同的类别分布,从而保持分割的代表性。另一方面,留一法交叉验证将每个数据点视为一个单独的折,提供严格的评估,但代价是增加了计算复杂性。

接下来,我们将学习超参数调优。

超参数调优

超参数调优是优化机器学习算法超参数的过程,以提高其性能。超参数是模型外部的设置或配置,不能直接从训练数据中学习。与在训练过程中学习的模型参数不同,超参数需要在事先指定,并在确定机器学习模型的行为和性能方面至关重要。我们将使用超参数调优来提高模型性能。超参数是在训练之前设置的参数,不是从数据中学习的。调整超参数可以显著影响模型的表现。

想象一下:我们的模型是一个复杂的机械装置,由各种被称为超参数的旋钮和杠杆组成。这些超参数控制着我们的模型的行为和特征,影响着其学习、泛化和做出准确预测的能力。然而,找到这些超参数的最佳配置并非易事。

超参数调优是系统地搜索和选择最佳超参数组合的艺术。它使我们能够超越默认设置,发现与我们的数据和谐一致的配置,提取最有意义的见解并实现卓越的性能。

超参数调优的目标是获得最佳值。我们探索不同的超参数设置,穿越一个可能性的多维景观。这种探索可以采取多种形式,例如网格搜索、随机搜索,或者更高级的技术,如贝叶斯优化或遗传算法。

网格搜索是一种流行的方法,它涉及为每个超参数定义一个潜在值的网格。然后,模型在网格中的每个可能组合上进行训练和评估。通过彻底搜索网格,我们发现了产生最高性能的配置,为我们进一步优化提供了坚实的基础。

随机搜索采取不同的方法。它从预定义的分布中随机采样超参数值,并对每个采样的配置评估模型的性能。这种随机探索使我们能够覆盖更广泛的可能范围,可能发现非常规但非常有效的配置。

这些例子展示了使用 PySpark 进行模型训练和评估的过程。然而,具体算法、评估指标和技术可能因所面临的机器学习任务而异。理解问题域、选择合适的算法和选择相关的评估指标对于有效地训练和评估模型至关重要。

模型部署

模型部署是将训练好的机器学习模型用于生产环境的过程。在本节中,我们将探讨有效部署机器学习模型的多种方法和技巧:

  • 序列化和持久化:一旦模型被训练,它需要被序列化并持久化到磁盘以供以后使用。序列化是将模型对象转换为可以存储的格式的过程,而持久化涉及将序列化的模型保存到存储系统中。

  • 模型服务:模型服务涉及将训练好的模型作为 API 端点或服务提供,该服务可以接收输入数据并返回预测结果。这允许其他应用程序或系统集成并使用该模型进行实时预测。

模型监控和管理

一旦模型被部署,重要的是要监控其在生产环境中的性能和行为,并保持其长期的有效性。监控可以帮助识别数据漂移、模型退化或异常等问题。此外,模型管理涉及版本控制、跟踪和维护已部署模型的多个版本。这些做法确保模型保持最新状态,并随着时间的推移保持最佳性能。在本节中,我们将探讨模型监控和维护的关键方面:

  • 可扩展性和性能:在部署机器学习模型时,可扩展性和性能是至关重要的考虑因素。模型应设计并部署为能够高效处理大量数据并满足高吞吐量需求的方式。例如,Apache Spark 等技术提供了分布式计算能力,使得可扩展且高性能的模型部署成为可能。

  • 模型更新和重新训练:机器学习模型可能需要定期更新或重新训练,以适应变化的数据模式或提高性能。部署的模型应具备机制,以便在不中断服务流程的情况下方便地进行更新和重新训练。这可能涉及自动化流程,例如监控数据漂移或基于特定条件的重新训练触发器。

  • 性能指标:为了监控已部署的模型,定义和跟踪相关的性能指标非常重要。这些指标可能因机器学习问题的类型和应用程序的具体要求而异。一些常用的性能指标包括准确率、精确率、召回率、F1 分数和ROC 曲线下面积AUC)。通过定期评估这些指标,可以识别出与预期性能的偏差,这表明需要进行进一步调查或维护操作。

  • 数据漂移检测:数据漂移是指输入数据的统计属性随时间变化的现象,这会导致模型性能下降。监控数据漂移对于确保部署的模型持续提供准确预测至关重要。可以使用诸如统计测试、特征分布比较和异常值检测等技术来检测数据漂移。当检测到数据漂移时,可能需要更新模型或使用更近期的数据进行重新训练。

  • 模型性能监控:监控部署模型的性能涉及跟踪其预测并将其与真实值进行比较。这可以通过定期采样预测子集并评估其实际结果来实现。监控还可以包括分析预测错误、识别模式或异常以及调查任何性能下降的根本原因。通过定期监控模型的性能,可以及早发现问题并采取纠正措施。

  • 模型重新训练和更新:在生产中部署的模型可能需要定期更新或重新训练以保持其有效性。当有新数据可用或应用领域发生重大变化时,使用新鲜数据重新训练模型可以帮助提高其性能。此外,错误修复、功能增强或算法改进可能需要更新部署的模型。建立良好的流程和基础设施来高效地处理模型重新训练和更新至关重要。

  • 版本控制和模型治理:维护部署模型的适当版本控制和治理对于跟踪变更、保持可重复性和确保合规性至关重要。版本控制系统可用于管理模型版本、跟踪变更并提供模型更新的历史记录。此外,维护与模型变更、依赖关系和相关流程相关的文档有助于有效的模型治理。

  • 协作与反馈:模型监控和维护通常涉及不同利益相关者之间的协作,包括数据科学家、工程师、领域专家和业务用户。建立反馈和沟通渠道可以促进见解的交流、问题的识别和必要变更的实施。定期的会议或反馈循环有助于将模型的性能与应用的演变需求保持一致。

总体而言,模型部署是机器学习生命周期中的关键步骤。它涉及序列化和持久化训练好的模型,将它们作为 API 或服务提供,监控其性能,确保可扩展性和性能,以及管理更新和重新训练。

通过积极监控和维护部署的模型,组织可以确保其机器学习系统持续提供准确和可靠的预测。有效的模型监控技术与主动维护策略相结合,能够及时识别性能问题,并支持必要的行动,以保持模型更新并与业务目标保持一致。

模型迭代与改进

模型迭代与改进是机器学习生命周期中的一个关键阶段,专注于提升部署模型的性能和有效性。通过持续优化和改进模型,组织可以实现更好的预测,并从其机器学习项目中获得更大的价值。在本节中,我们将探讨模型迭代和改进的关键方面:

  • 收集反馈和获取洞察:模型迭代和改进的第一步是从各种利益相关者那里收集反馈,包括最终用户、领域专家和业务团队。反馈可以提供有关模型性能、改进领域和在实际场景中遇到的潜在问题的宝贵见解。可以通过调查、用户访谈或在生产环境中监控模型的运行行为来收集此类反馈。

  • 分析模型性能:为了确定改进领域,重要的是要彻底分析模型的性能。这包括检查性能指标、评估预测误差以及深入分析被错误分类或预测不佳的实例。通过了解模型的优点和缺点,数据科学家可以将精力集中在需要关注的特定领域。

  • 探索新特征和数据:提升模型性能的一种方法是通过整合新特征或利用额外的数据源。探索性数据分析有助于识别可能对预测有重大影响的潜在特征。可以使用特征工程技术,如创建交互项、缩放或转换变量,来增强数据的表示。此外,整合来自不同来源的新数据可以提供新的见解并提高模型的一般化能力。

  • 算法选择和超参数调整:通过实验不同的算法和超参数,可以显著提升模型性能。数据科学家可以探索替代算法或现有算法的变体,以确定针对特定问题的最佳方法。可以使用网格搜索或贝叶斯优化等超参数调整技术来找到模型参数的最优值。这一迭代过程有助于确定最佳算法和参数设置,从而产生更优的结果。

  • 集成方法:集成方法涉及结合多个模型以创建更稳健和准确的预测。可以应用诸如 bagging、boosting 或 stacking 等技术,从多个基础模型构建集成模型。集成方法通常可以通过减少偏差、方差和过拟合来提高模型性能。尝试不同的集成策略和模型组合可以进一步提高预测准确性。

  • A/B 测试和受控实验:可以在受控环境中进行 A/B 测试或受控实验,以评估模型改进的影响。通过随机分配用户或数据样本到不同版本的模型,组织可以衡量新模型与现有模型的性能。这种方法提供了具有统计学意义的成果,以确定所提出的更改是否导致了预期的改进。

  • 持续监控与评估:一旦改进后的模型部署,持续的监控与评估对于确保其持续性能至关重要。监控数据漂移、分析性能指标以及定期评估有助于识别潜在的退化或进一步改进的需求。这个反馈循环允许对部署的模型进行持续迭代和优化。

通过拥抱迭代和改进的文化,组织可以持续提升其机器学习模型的性能和准确性。通过收集反馈、分析模型性能、探索新的特性和算法、进行实验以及持续监控,模型可以迭代优化以实现更好的预测并推动可衡量的业务成果。

案例研究和现实世界案例

在本节中,我们将探讨机器学习的两个突出用例:客户流失预测和欺诈检测。这些示例展示了机器学习技术在解决现实世界挑战和实现显著商业价值方面的实际应用。

客户流失预测

客户流失是指客户与公司终止关系的现象,通常是通过取消订阅或转向竞争对手来实现的。预测客户流失对商业至关重要,因为它允许企业主动识别有离开风险的客户并采取适当的措施来留住他们。机器学习模型可以分析各种客户属性和行为模式来预测流失的可能性。让我们深入了解一个客户流失预测案例研究。

案例研究 – 电信公司

一家电信公司希望通过预测哪些客户最有可能取消他们的订阅来降低客户流失率。公司收集了大量的客户数据,包括人口统计信息、通话记录、服务使用情况和客户投诉。通过利用机器学习,他们旨在识别客户流失的关键指标并构建一个预测模型来预测未来的流失者:

  • 数据准备:公司收集并预处理客户数据,确保数据清洁、格式化并准备好分析。他们将客户档案与历史流失信息相结合,创建标记数据集。

  • 特征工程:为了捕捉有意义的模式,公司从可用数据中工程相关特征。这些特征可能包括平均通话时长、投诉数量、月度服务使用量和任期等变量。

  • 模型选择和训练:公司选择合适的机器学习算法,例如逻辑回归、决策树或随机森林,来构建流失预测模型。他们将数据集分为训练集和测试集,在训练数据上训练模型,并在测试数据上评估其性能。

  • 模型评估:使用评估指标如准确率、精确率、召回率和 F1 分数来评估模型的表现。公司分析模型正确识别流失客户和非流失客户的能力,在误报和漏报之间取得平衡。

  • 模型部署和监控:一旦模型达到预期的性能标准,它就被部署到生产环境中。模型持续监控传入的客户数据,生成流失预测,并为处于风险中的客户触发适当的保留策略。

欺诈检测

欺诈检测是机器学习的另一个关键应用,旨在识别欺诈活动并防止财务损失。机器学习模型可以从历史数据中学习欺诈行为的模式,并在实时中标记可疑的交易或活动。让我们探讨一个欺诈检测案例研究。

案例研究 - 金融机构

一家金融机构希望实时检测欺诈交易,以保护其客户并防止货币损失。该机构收集交易数据,包括交易金额、时间戳、商户信息和客户详情。通过利用机器学习算法,他们旨在构建一个强大的欺诈检测系统:

  • 数据预处理:金融机构处理和清理交易数据,确保数据完整性和一致性。他们还可能通过整合额外的信息,如 IP 地址或设备标识符,来增强欺诈检测能力。

  • 特征工程:从交易数据中提取相关特征,以捕捉潜在的欺诈活动指标。这些特征可能包括交易金额、频率、地理位置、与典型消费模式的偏差以及客户交易历史。

  • 模型训练:金融机构选择合适的机器学习算法,例如异常检测技术或监督学习方法(例如,逻辑回归和梯度提升),以训练欺诈检测模型。该模型在标记为欺诈或非欺诈的历史数据上训练。

  • 实时监控:一旦模型经过训练,它就会被部署以实时分析传入的交易。模型为每笔交易分配一个欺诈概率分数,超过一定阈值的交易将被标记为需要进一步调查或干预。

  • 持续改进:金融机构通过监控其性能并整合新数据,持续优化欺诈检测模型。他们定期评估模型的有效性,调整阈值,并更新模型以适应不断变化的欺诈模式和技巧。

通过将 ML 技术应用于客户流失预测和欺诈检测,组织可以增强其决策过程,提高客户保留率,并减轻财务风险。这些案例研究突出了 ML 在现实场景中的实际应用,展示了其在各个行业中的价值。

Spark ML 和分布式 ML 的未来趋势

随着机器学习领域的持续发展,我们可以在 Spark ML 和分布式 ML 领域期待一些未来的趋势和进步。以下是一些关键领域值得关注:

  • 深度学习集成:Spark ML 很可能将更深入地与深度学习框架(如 TensorFlow 和 PyTorch)集成。这将使用户能够无缝地将深度学习模型集成到他们的 Spark ML 管道中,释放神经网络在复杂任务(如图像识别和自然语言处理)中的力量。

  • 自动化机器学习:自动化将在简化并加速机器学习过程中发挥重要作用。我们可以在 Spark ML 中期待自动化特征工程、超参数调整和模型选择技术的进步。这些进步将使用户能够以最小的手动努力构建高性能模型。

  • 可解释人工智能:随着机器学习模型中透明度和可解释性的需求增长,Spark ML 很可能将采用模型可解释性的技术。这将使用户能够理解和解释模型做出的预测,使模型更加可靠,并符合监管要求。

  • 生成式人工智能(GenAI):GenAI 是当前的热门话题。随着 GenAI 用例的需求增加,现有平台可能会整合一些用于 GenAI 的 LLMs(大型语言模型)。

  • 边缘计算和物联网:随着边缘计算和 物联网(IoT) 的兴起,Spark ML 预计将扩展其功能,以支持在边缘设备上进行 ML 推理和训练。这将实现实时、低延迟的预测和跨边缘设备的分布式学习,为智能城市、自动驾驶汽车和工业物联网等领域的应用开辟新的可能性。

由此,我们结束了本书的学习部分。让我们简要回顾一下我们所学的内容。

概述

总结来说,Spark ML 为分布式机器学习任务提供了一个强大且可扩展的框架。它与 Apache Spark 的集成在处理大规模数据集、并行计算和容错方面提供了显著优势。在本章中,我们探讨了 Spark ML 的关键概念、技术和实际应用案例。

我们讨论了机器学习生命周期,强调了数据准备、模型训练、评估、部署、监控和持续改进的重要性。我们还比较了 Spark MLlib 和 Spark ML,突出了它们各自的特点和用例。

在本章中,我们讨论了与 Spark ML 相关的各种关键概念和技术。我们探讨了不同类型的机器学习,如分类、回归、时间序列分析、监督学习和无监督学习。我们强调了数据准备和特征工程在构建有效的机器学习管道中的重要性。我们还简要提到了 Spark ML 中的容错和可靠性方面,确保了鲁棒性和数据完整性。

此外,我们考察了实际应用案例,包括客户流失预测和欺诈检测,以展示 Spark ML 在解决复杂商业挑战中的实际应用。这些案例研究展示了组织如何利用 Spark ML 来增强决策能力、提高客户保留率并减轻财务风险。

在你继续使用 Spark ML 进行机器学习之旅时,重要的是要记住该领域的迭代和动态特性。保持对最新进展的了解,探索新技术,并拥抱持续学习和改进的心态。

通过利用 Spark ML 的力量,你可以从数据中解锁有价值的见解,构建复杂的机器学习模型,并做出推动业务成功的明智决策。因此,利用 Spark ML 的能力,拥抱未来趋势,踏上掌握分布式机器学习的旅程。

本章到此结束。希望它能帮助你在机器学习模型世界的精彩旅程中取得进步。接下来的两章是模拟测试,旨在帮助你为认证考试做好准备。

第五部分:模拟试卷

本部分将提供两份模拟试卷,帮助读者通过练习题目来准备认证考试。

本部分包含以下章节:

  • 第九章*,* 模拟试卷 1

  • 第十章*,* 模拟试卷 2

第九章:模拟测试 1

问题

尝试回答这些问题以测试你对 Apache Spark 的了解:

问题 1

以下哪个陈述没有准确地描述 Spark 驱动程序的功能?

  1. Spark 驱动程序作为运行 Spark 应用程序主方法的节点,用于协调应用程序。

  2. Spark 驱动程序可以水平扩展以提高整体处理吞吐量。

  3. Spark 驱动程序包含 SparkContext 对象。

  4. Spark 驱动程序负责使用集群模式下的不同工作节点调度数据的执行。

  5. 最佳性能要求 Spark 驱动程序应尽可能靠近工作节点。

问题 2

以下哪个陈述准确地描述了阶段?

  1. 阶段内的任务可以由多台机器同时执行。

  2. 作业中的各个阶段可以并发运行。

  3. 阶段由一个或多个作业组成。

  4. 阶段在提交之前暂时存储事务。

问题 3

以下哪个陈述准确地描述了 Spark 的集群执行模式?

  1. 集群模式在网关节点上运行执行器进程。

  2. 集群模式涉及驱动程序托管在网关机器上。

  3. 在集群模式下,Spark 驱动程序和集群管理器不是位于同一位置的。

  4. 集群模式下的驱动程序位于工作节点上。

问题 4

以下哪个陈述准确地描述了 Spark 的客户端执行模式?

  1. 客户端模式在网关节点上运行执行器进程。

  2. 在客户端模式下,驱动程序与执行器位于同一位置。

  3. 在客户端模式下,Spark 驱动程序和集群管理器是位于同一位置的。

  4. 在客户端模式下,驱动程序位于边缘节点上。

问题 5

以下哪个陈述准确地描述了 Spark 的独立部署模式?

  1. 独立模式为每个应用程序在每个工作节点上使用一个执行器。

  2. 在独立模式下,驱动程序位于工作节点上。

  3. 在独立模式下,集群不需要驱动程序。

  4. 在独立模式下,驱动程序位于边缘节点上。

问题 6

Spark 中的任务是什么?

  1. 每个数据分区在任务中执行的工作单元是槽。

  2. 任务是 Spark 中可以执行的第二小实体。

  3. 具有广泛依赖关系的任务可以合并为单个任务。

  4. 任务是 Spark 中分区执行的单个工作单元。

问题 7

以下哪个是 Spark 执行层次结构中的最高级别?

  1. 任务

  2. 任务

  3. 执行器

  4. 阶段

问题 8

如何在 Spark 的上下文中准确描述槽的概念?

  1. 槽的创建和终止与执行器的工作负载相一致。

  2. Spark 通过在各个槽之间策略性地存储数据来增强 I/O 性能。

  3. 每个槽始终被限制在单个核心上。

  4. 槽允许任务并行运行。

问题 9

Spark 中执行器的角色是什么?

  1. 执行器的角色是将操作请求转换为 DAG。

  2. Spark 环境中只能有一个执行器。

  3. 执行器以优化和分布式的方式处理分区

  4. 执行器安排查询以执行

问题 10:

Shuffle 在 Spark 中的作用是什么?

  1. Shuffle 将变量广播到不同的分区

  2. 使用 shuffle,数据会被写入磁盘

  3. Shuffle 命令在 Spark 中转换数据

  4. Shuffle 是一种窄转换

问题 11:

Actions 在 Spark 中的作用是什么?

  1. Actions 只从磁盘读取数据

  2. Actions 用于修改现有的 RDD

  3. Actions 触发任务的执行

  4. Actions 用于建立阶段边界

问题 12:

以下哪项是 Spark 中集群管理器的一项任务?

  1. 在执行器失败的情况下,集群管理器将与驱动器协作以启动一个新的执行器

  2. 集群管理器可以将分区合并以增加复杂数据处理的速度

  3. 集群管理器收集查询的运行时统计信息

  4. 集群管理器创建查询计划

问题 13:

以下哪项是 Spark 中自适应查询执行的一项任务?

  1. 自适应查询执行可以合并分区以增加复杂数据处理的速度

  2. 在执行器失败的情况下,自适应查询执行功能将与驱动器协作以启动一个新的执行器

  3. 自适应查询执行创建查询计划

  4. 自适应查询执行负责在 Spark 中生成多个执行器以执行任务

问题 14:

以下哪项操作被认为是转换?

  1. df.select()

  2. df.show()

  3. df.head()

  4. df.count()

问题 15:

Spark 中懒加载评估的一个特性是什么?

  1. Spark 只在执行期间失败作业,而不是在定义期间

  2. Spark 只在定义期间失败作业

  3. Spark 在收到转换操作时会执行

  4. Spark 在收到操作时会失败

问题 16:

以下关于 Spark 执行层次结构的哪个陈述是正确的?

  1. 在 Spark 的执行层次结构中,任务位于作业之上

  2. 在 Spark 的执行层次结构中,多个作业包含在一个阶段中

  3. 在 Spark 的执行层次结构中,一个作业可能跨越多个阶段边界

  4. 在 Spark 的执行层次结构中,slot 是最小的单元

问题 17:

以下哪项是 Spark 驱动的特征?

  1. 当驱动器发送命令时,工作节点负责将 Spark 操作转换为 DAG

  2. Spark 驱动负责执行任务并将结果返回给执行器

  3. Spark 驱动可以通过添加更多机器来扩展,从而提高 Spark 任务的性能

  4. Spark 驱动以优化和分布式的方式处理分区

问题 18:

以下关于广播变量的哪个陈述是准确的?

  1. 广播变量仅存在于驱动节点上

  2. 广播变量只能用于适合内存的表

  3. 广播变量不是不可变的,这意味着它们可以在集群之间共享

  4. 广播变量不会在工作节点之间共享

问题 19:

以下哪个代码块返回了 DataFrame dfemployee_stateemployee_salary 列的唯一值?

  1. Df.select('employee_state').join(df.select('employee_salary'), col('employee_state')==col('employee_salary'), 'left').show()

  2. df.select(col('employee_state'), col('employee_salary')).agg({'*': 'count'}).show()

  3. df.select('employee_state', 'employee_salary').distinct().show()

  4. df.select('employee_state').union(df.select('employee_salary')).distinct().show()

问题 20:

以下哪个代码块从 my_fle_path 位置读取名为 my_file.parquet 的 Parquet 文件到 DataFrame df

  1. df = spark.mode("parquet").read("my_fle_path/my_file.parquet")

  2. df = spark.read.path("my_fle_path/my_file.parquet")

  3. df = spark.read().parquet("my_fle_path/my_file.parquet")

  4. df = spark.read.parquet("/my_fle_path/my_file.parquet")

问题 21:

以下哪个代码块对 salarydfemployeedf DataFrame 的 employeeSalaryIDemployeeID 列执行了内连接?

  1. salarydf.join(employeedf, salarydf.employeeID == employeedf.employeeSalaryID)

    1. Salarydf.createOrReplaceTempView(salarydf)

    2. employeedf.createOrReplaceTempView('employeedf')

    3. spark.sql("SELECT * FROM salarydf CROSS JOIN employeedf ON employeeSalaryID ==employeeID")

    1. salarydf

    2. .``join(employeedf, col(employeeID)==col(employeeSalaryID))

    1. Salarydf.createOrReplaceTempView(salarydf)

    2. employeedf.createOrReplaceTempView('employeedf')

    3. SELECT * FROM salarydf

    4. INNER JOIN employeedf

    5. ON salarydf.employeeSalaryID == employeedf. employeeID

问题 22:

以下哪个代码块按列 salary 降序排序返回 df DataFrame,并显示最后的缺失值?

  1. df.sort(nulls_last("salary"))

  2. df.orderBy("salary").nulls_last()

  3. df.sort("salary", ascending=False)

  4. df.nulls_last("salary")

问题 23:

以下代码块包含一个错误。该代码块应该返回一个 df DataFrame 的副本,其中列名 state 被更改为 stateID。找出错误。

代码块:

df.withColumn("stateID", "state")
  1. 方法中的参数 "stateID""state" 应该交换

  2. 应该将 withColumn 方法替换为 withColumnRenamed 方法

  3. 应该将 withColumn 方法替换为 withColumnRenamed 方法,并且需要重新排序方法的参数

  4. 没有这样的方法可以更改列名

问题 24:

以下哪个代码块在 salarydfemployeedf DataFrame 之间使用 employeeIDsalaryEmployeeID 列作为连接键执行了内连接?

  1. salarydf.join(employeedf, "inner", salarydf.employeedf == employeeID.salaryEmployeeID)

  2. salarydf.join(employeedf, employeeID == salaryEmployeeID)

  3. salarydf.join(employeedf, salarydf.salaryEmployeeID == employeedf.employeeID, "inner")

  4. salarydf.join(employeedf, salarydf.employeeID == employeedf.salaryEmployeeID, “inner”)`

问题 25:

以下代码块应返回一个df DataFrame,其中employeeID列被转换为整数。请选择正确填充代码块空白的答案以完成此操作:

df.__1__(__2__.__3__(__4__))
    1. select

    2. col("employeeID")

    3. as

    4. IntegerType

    1. select

    2. col("employeeID")

    3. as

    4. Integer

    1. cast

    2. "``employeeID"

    3. as

    4. IntegerType()

    1. select

    2. col("employeeID")

    3. cast

    4. IntegerType()

问题 26:

查找在将employeedfsalarydf DataFrames 按employeeIDemployeeSalaryID列分别连接后,结果 DataFrame 中列 department 不为空的记录数。以下哪些代码块(按顺序)应执行以实现此目的?

  1. .filter(col("department").isNotNull())

  2. .count()

  3. employeedf.join(salarydf, employeedf.employeeID == salarydf.employeeSalaryID)

  4. employeedf.join(salarydf, employeedf.employeeID ==salarydf. employeeSalaryID, how=‘inner’)`

  5. .filter(col(department).isnotnull())

  6. .sum(col(department))

  7. 3, 1, 6

  8. 3, 1, 2

  9. 4, 1, 2

  10. 3, 5, 2

问题 27:

以下哪个代码块返回了df DataFrame 中列 state 值唯一的那些行?

  1. df.dropDuplicates(subset=["state"]).show()

  2. df.distinct(subset=["state"]).show()

  3. df.drop_duplicates(subset=["state"]).show()

  4. df.unique("state").show()

问题 28:

以下代码块包含一个错误。该代码块应返回一个包含额外列squared_numberdf DataFrame 副本,该列是列 number 的平方。请找出错误。

代码块:

df.withColumnRenamed(col("number"), pow(col("number"), 0.2).alias("squared_number"))
  1. withColumnRenamed方法的参数需要重新排序

  2. 应将withColumnRenamed方法替换为withColumn方法

  3. 应将withColumnRenamed方法替换为select方法,并将0.2替换为2

  4. 应将参数0.2替换为2

问题 29:

以下哪个代码块返回了一个新的 DataFrame,其中列 salary 被重命名为new_salary,employee 被重命名为new_employeedf DataFrame 中?

  1. df.withColumnRenamed(salary, new_salary).withColumnRenamed(employee, new_employee)

  2. df.withColumnRenamed("salary", "new_salary")

  3. df.withColumnRenamed("employee", "new_employee")

  4. df.withColumn("salary", "``new_salary").withColumn("employee", "new_employee")

  5. df.withColumnRenamed("salary", "``new_salary").withColumnRenamed("employee", "new_employee")

问题 30:

以下哪个代码块返回了一个df DataFrame 的副本,其中列 salary 已被重命名为employeeSalary

  1. df.withColumn(["salary", "employeeSalary"])

  2. df.withColumnRenamed("salary").alias("employeeSalary ")

  3. df.withColumnRenamed("salary", "``employeeSalary ")

  4. df.withColumn("salary", "``employeeSalary ")

问题 31:

以下代码块包含一个错误。代码块应该将 df DataFrame 保存到 my_file_path 路径作为 Parquet 文件,并追加到任何现有的 Parquet 文件。找出错误。

df.format("parquet").option("mode", "append").save(my_file_path)
  1. 代码没有保存到正确的路径

  2. 应该交换 save()format 函数

  3. 代码块缺少对 DataFrameWriter 的引用

  4. 应该覆盖 option 模式以正确写入文件

问题 32:

我们如何将 df DataFrame 从 12 个分区减少到 6 个分区?

  1. df.repartition(12)

  2. df.coalesce(6).shuffle()

  3. df.coalesce(6, shuffle=True)

  4. df.repartition(6)

问题 33:

以下哪个代码块返回一个 DataFrame,其中时间戳列被转换为名为 record_timestamp 的新列,格式为日、月和年?

  1. df.withColumn("record_timestamp", from_unixtime(unix_timestamp(col("timestamp")), "dd-MM-yyyy"))

  2. df.withColumnRenamed("record_timestamp", from_unixtime(unix_timestamp(col("timestamp")), "dd-MM-yyyy"))

  3. df.select ("record_timestamp", from_unixtime(unix_timestamp(col("timestamp")), "dd-MM-yyyy"))

  4. df.withColumn("record_timestamp", from_unixtime(unix_timestamp(col("timestamp")), "MM-dd-yyyy"))

问题 34:

以下哪个代码块通过将 DataFrame salaryDf 的行追加到 DataFrame employeeDf 的行来创建一个新的 DataFrame,而不考虑两个 DataFrame 都有不同的列名?

  1. salaryDf.join(employeeDf)

  2. salaryDf.union(employeeDf)

  3. salaryDf.concat(employeeDf)

  4. salaryDf.unionAll(employeeDf)

问题 35:

以下代码块包含一个错误。代码块应该计算每个部门 employee_salary 列中所有工资的总和。找出错误。

df.agg("department").sum("employee_salary")
  1. 应该使用 avg(col("value")) 而不是 avg("value")

  2. 所有列名都应该用 col() 运算符包裹

  3. "storeId" 和 “value" 应该交换

  4. Agg 应该替换为 groupBy

问题 36:

以下代码块包含一个错误。代码块旨在对 salarydfemployeedf DataFrame 的 employeeSalaryIDemployeeID 列分别执行交叉连接。找出错误。

employeedf.join(salarydf, [salarydf.employeeSalaryID, employeedf.employeeID], "cross")
  1. 参数中的连接类型 "cross" 需要替换为 crossJoin

  2. salarydf.employeeSalaryID, employeedf.employeeID 应替换为 salarydf.employeeSalaryID == employeedf.employeeID

  3. 应该删除 "cross" 参数,因为 "cross" 是默认的连接类型

  4. 应从调用中删除 "cross" 参数,并用 crossJoin 替换 join

问题 37:

以下代码块包含一个错误。代码块应该显示 df DataFrame 的模式。找出错误。

df.rdd.printSchema()
  1. 在 Spark 中,我们无法打印 DataFrame 的模式

  2. printSchema 不能通过 df.rdd 调用,而应该直接从 df 调用

  3. Spark 中没有名为 printSchema() 的方法

  4. 应该使用 print_schema() 方法而不是 printSchema()

问题 38:

以下代码块应该将 df DataFrame 写入到 filePath 路径的 Parquet 文件中,替换任何现有文件。选择正确填充代码块空白处的答案以完成此操作:

df.__1__.format("parquet").__2__(__3__).__4__(filePath)
    1. save

    2. mode

    3. "``ignore"

    4. path

    1. store

    2. with

    3. "``replace"

    4. path

    1. write

    2. mode

    3. "``overwrite"

    4. save

    1. save

    2. mode

    3. "``overwrite"

    4. path

问题 39:

以下代码块包含一个错误。代码块本应按薪资降序对 df DataFrame 进行排序。然后,它应该根据奖金列进行排序,将 nulls 放在最后。找出错误。

df.orderBy ('salary', asc_nulls_first(col('bonus')))
transactionsDf.orderBy('value', asc_nulls_first(col('predError')))
  1. 应该以降序对 salary 列进行排序。此外,它应该被包裹在 col() 操作符中

  2. 应该用 col() 操作符将 salary 列包裹起来

  3. 应该以降序对 bonus 列进行排序,将 nulls 放在最后

  4. 应该使用 desc_nulls_first()bonus 列进行排序

问题 40:

以下代码块包含一个错误。代码块应该使用 square_root_method Python 方法找到 df DataFrame 中 salary 列的平方根,并在新列 sqrt_salary 中返回它。找出错误。

square_root_method_udf = udf(square_root_method)
df.withColumn("sqrt_salary", square_root_method("salary"))
  1. square_root_method 没有指定返回类型

  2. 在第二行代码中,Spark 需要调用 squre_root_method_udf 而不是 square_root_method

  3. udf 未在 Spark 中注册

  4. 需要添加一个新列

问题 41:

以下代码块包含一个错误。代码块应该返回将 employeeID 重命名为 employeeIdColumndf DataFrame。找出错误。

df.withColumn("employeeIdColumn", "employeeID")
  1. 应该使用 withColumnRenamed 方法而不是 withColumn

  2. 应该使用 withColumnRenamed 方法而不是 withColumn,并且参数 "employeeIdColumn" 应该与参数 "employeeID" 交换

  3. 参数 "employeeIdColumn""employeeID" 应该交换

  4. 应该将 withColumn 操作符替换为 withColumnRenamed 操作符

问题 42:

以下哪个代码块会返回一个新的 DataFrame,其列与 DataFrame df 相同,除了 salary 列?

  1. df.drop("salary")

  2. df.drop(col(salary))

  3. df.drop(salary)

  4. df.delete("salary")

问题 43:

以下哪个代码块返回一个 DataFrame,显示 df DataFrame 中 salary 列的平均值,按 department 列分组?

  1. df.groupBy("department").agg(avg("salary"))

  2. df.groupBy(col(department).avg())

  3. df.groupBy("department").avg(col("salary"))

  4. df.groupBy("department").agg(average("salary"))

问题 44:

以下哪个代码块创建了一个 DataFrame,显示基于部门和国家/地区列,年龄大于 35 的 salaryDf DataFrame 中 salary 列的平均值?

  1. salaryDf.filter(col("age") > 35)

  2. .``filter(col("employeeID")

  3. .``filter(col("employeeID").isNotNull())

  4. .``groupBy("department")

  5. .``groupBy("department", "state")

  6. .``agg(avg("salary").alias("mean_salary"))

  7. .``agg(average("salary").alias("mean_salary"))

    1. 1,2,5,6

    2. 1,3,5,6

    3. 1,3,6,7

    4. 1,2,4,6

问题 45:

以下代码块包含一个错误。该代码块需要缓存df DataFrame,以便此 DataFrame 具有容错性。找出错误。

df.persist(StorageLevel.MEMORY_AND_DISK_3)
  1. persist()不是 DataFrame API 的一个函数

  2. 应将df.write()df.persist结合使用以正确写入 DataFrame

  3. 存储级别不正确,应为MEMORY_AND_DISK_2

  4. 应使用df.cache()而不是df.persist()

问题 46:

以下哪个代码块在不重复的情况下连接了salaryDfemployeeDf DataFrame 的行(假设两个 DataFrame 的列相似)?

  1. salaryDf.concat(employeeDf).unique()

  2. spark.union(salaryDf, employeeDf).distinct()

  3. salaryDf.union(employeeDf).unique()

  4. salaryDf.union(employeeDf).distinct()

问题 47:

以下哪个代码块从filePath读取一个完整的 CSV 文件文件夹,包含列标题?

  1. spark.option("header",True).csv(filePath)

  2. spark.read.load(filePath)

  3. spark.read().option("header",True).load(filePath)

  4. spark.read.format("csv").option("header",True).load(filePath)

问题 48:

以下代码块包含一个错误。df DataFrame 包含列[employeeID, salary, 和 department]。该代码块应返回一个仅包含employeeIDsalary`列的 DataFrame。找出错误。

df.select(col(department))
  1. 应在select参数中指定df DataFrame 的所有列名

  2. 应将select运算符替换为drop运算符,并列出df DataFrame 中的所有列名作为列表

  3. 应将select运算符替换为drop运算符

  4. 列名department应列出为col("department")

问题 49:

以下代码块包含一个错误。该代码块应将 DataFrame df作为 Parquet 文件写入到filePath位置,在按department列分区后。找出错误。

df.write.partition("department").parquet()
  1. 应使用partitionBy()方法而不是partition()方法。

  2. 应使用partitionBy()方法而不是partition(),并将filePath添加到parquet方法中

  3. 在写入方法之前应调用partition()方法,并将filePath添加到parquet方法中

  4. 应将"department"列用col()运算符包裹

问题 50:

以下哪个代码块从内存和磁盘中移除了缓存的df DataFrame?

  1. df.unpersist()

  2. drop df

  3. df.clearCache()

  4. df.persist()

问题 51:

以下代码块应该返回一个包含额外列:test_column,其值为19df DataFrame 的副本。请选择正确填充代码块空白处的答案以完成此操作:

df.__1__(__2__, __3__)
    1. withColumn

    2. '``test_column'

    3. 19

    1. withColumnRenamed

    2. test_column

    3. lit(19)

    1. withColumn

    2. 'test_column'

    3. lit(19)

    1. withColumnRenamed

    2. test_column

    3. 19

问题 52:

以下代码块应该返回一个包含 employeeIdsalarybonusdepartment 列的 DataFrame,来自 transactionsDf DataFrame。选择正确填充空白的答案以完成此操作:

df.__1__(__2__)
    1. drop

    2. "employeeId", "salary", "bonus", "department"

    1. filter

    2. "employeeId, salary, bonus, department"

    1. select

    2. ["employeeId", "salary", "bonus", "department"]

    1. select

    2. col(["employeeId", "salary", "bonus", "department"])

问题 53:

以下哪个代码块返回了一个 DataFrame,其中 salary 列在 df DataFrame 中被转换为字符串?

  1. df.withColumn("salary", castString(“salary”, “string”))`

  2. df.withColumn("salary", col("salary").cast("string"))

  3. df.select(cast("salary", "string"))

  4. df.withColumn("salary", col("salary").castString("string"))

问题 54:

以下代码块包含错误。该代码块应该结合来自 salaryDfemployeeDf DataFrame 的数据,显示 salaryDf DataFrame 中所有与 employeeDf DataFrame 中 employeeSalaryID 列的值匹配的行。找出错误。

employeeDf.join(salaryDf, employeeDf.employeeID==employeeSalaryID)
  1. join 语句缺少右侧的 DataFrame,其中列名为 employeeSalaryID

  2. 应该使用 union 方法而不是 join

  3. 应该使用 innerJoin 而不是 join

  4. salaryDf 应该替换 employeeDf

问题 55:

以下哪个代码块读取存储在 my_file_path 的 JSON 文件作为 DataFrame?

  1. spark.read.json(my_file_path)

  2. spark.read(my_file_path, source="json")

  3. spark.read.path(my_file_path)

  4. spark.read().json(my_file_path)

问题 56:

以下代码块包含错误。该代码块应该返回一个新的 DataFrame,通过过滤 df DataFrame 中 salary 列大于 2000 的行。找出错误。

df.where("col(salary) >= 2000")
  1. 应该使用 filter() 而不是 where()

  2. where 方法的参数应该是 "col(salary) > 2000"

  3. 应该使用 > 操作符而不是 >=

  4. where 方法的参数应该是 "salary > 2000"

问题 57:

以下哪个代码块返回了一个 DataFrame,其中从 df DataFrame 中删除了 salarystate 列?

  1. df.withColumn ("salary", "state")

  2. df.drop(["salary", "state"])

  3. df.drop("salary", "state")

  4. df.withColumnRenamed ("salary", "state")

问题 58:

以下哪个代码块返回了一个包含 df DataFrame 中每个部门计数的两列 DataFrame?

  1. df.count("department").distinct()

  2. df.count("department")

  3. df.groupBy("department").count()

  4. df.groupBy("department").agg(count("department"))

问题 59:

以下哪个代码块打印了 DataFrame 的模式,并包含列名和类型?

  1. print(df.columns)

  2. df.printSchema()

  3. df.rdd.printSchema()

  4. df.print_schema()

问题 60:

以下哪个代码块创建了一个新的 DataFrame,包含三个列:department(部门),age(年龄)和max_salary(最高薪水),并且对于每个部门以及每个年龄组的每个员工都有最高的薪水?

  1. df.max(salary)

  2. df.groupBy(["department", "age"]).agg(max("salary").alias("max_salary"))

  3. df.agg(max(salary).alias(max_salary')

  4. df.groupby(department).agg(max(salary).alias(max_salary)

答案

  1. B

  2. A

  3. D

  4. D

  5. A

  6. D

  7. A

  8. D

  9. C

  10. B

  11. C

  12. A

  13. A

  14. A

  15. A

  16. C

  17. B

  18. B

  19. D

  20. D

  21. D

  22. C

  23. C

  24. D

  25. D

  26. C

  27. A

  28. C

  29. E

  30. C

  31. C

  32. D

  33. A

  34. B

  35. D

  36. B

  37. B

  38. C

  39. A

  40. B

  41. B

  42. A

  43. A

  44. A

  45. C

  46. D

  47. D

  48. C

  49. B

  50. A

  51. C

  52. C

  53. B

  54. A

  55. A

  56. D

  57. C

  58. C

  59. B

  60. B

第十章:模拟测试 2

问题

尝试回答这些问题以测试你对 Apache Spark 的了解:

问题 1:

Spark 中的任务是什么?

  1. 在任务中为每个数据分区执行的工作单元是插槽

  2. 任务是 Spark 中可以执行的第二小实体

  3. 具有宽依赖的任务可以合并为单个任务

  4. 任务是 Spark 中可以执行的最小组件

问题 2:

执行器在 Spark 中的角色是什么?

  1. 执行器的角色是请求将操作转换为有向无环图 (DAG)

  2. Spark 环境中只能有一个执行器

  3. 执行器负责执行驱动程序分配给它们的任务

  4. 执行器安排查询以执行

问题 3:

以下哪个是自适应查询执行在 Spark 中的任务之一?

  1. 自适应查询执行在查询执行期间收集运行时统计信息以优化查询计划

  2. 自适应查询执行负责将任务分配给执行器

  3. 自适应查询执行负责 Spark 中的宽操作

  4. 自适应查询执行负责 Spark 中的容错

问题 4:

Spark 执行层次结构中的最低级别是什么?

  1. 任务

  2. 插槽

  3. 作业

  4. 阶段

问题 5:

以下哪个操作是动作?

  1. DataFrame.count()

  2. DataFrame.filter()

  3. DataFrame.select()

  4. DataFrame.groupBy()

问题 6:

以下哪个描述了 DataFrame API 的特性?

  1. DataFrame API 在后端基于弹性分布式数据集 (RDD)

  2. DataFrame API 在 Scala 中可用,但在 Python 中不可用

  3. DataFrame API 没有数据操作函数

  4. DataFrame API 用于在执行器中分配任务

问题 7:

以下哪个关于执行器的陈述是准确的?

  1. 插槽不是执行器的一部分

  2. 执行器能够通过插槽并行运行任务

  3. 执行器始终等于任务

  4. 执行器负责为作业分配任务

问题 8:

以下哪个关于 Spark 驱动程序的陈述是准确的?

  1. Spark 应用程序中有多个驱动程序

  2. 插槽是驱动程序的一部分

  3. 驱动程序并行执行任务

  4. 将操作转换为 DAG 计算的责任在于 Spark 驱动程序

问题 9:

以下哪个操作是宽转换?

  1. DataFrame.show()

  2. DataFrame.groupBy()

  3. DataFrame.repartition()

  4. DataFrame.select()

  5. DataFrame.filter()

问题 10:

以下哪个关于惰性评估的陈述是正确的?

  1. 执行是由转换触发的

  2. 执行是由动作触发的

  3. 语句按照代码中的顺序执行

  4. Spark 将任务分配给不同的执行器

问题 11:

以下哪个关于 Spark 中的 DAGs 的陈述是正确的?

  1. DAGs 是惰性评估的

  2. DAGs 可以在 Spark 中水平扩展

  3. DAGs 负责以优化和分布式的方式处理分区

  4. DAG 由可以并行运行的任务组成

问题 12:

以下哪个关于 Spark 容错机制的陈述是正确的?

  1. Spark 通过 DAGs 实现容错能力

  2. 使 Spark 具备容错能力是执行器的责任

  3. 由于容错能力,Spark 可以重新计算任何失败的 RDD

  4. Spark 在传统的 RDD 数据系统之上构建了一个容错层,而 RDD 本身并不具备容错能力

问题 13:

Spark 容错机制的核心是什么?

  1. RDD 是 Spark 的核心,它设计上具有容错能力

  2. 数据分区,因为数据可以被重新计算

  3. DataFrame 是 Spark 的核心,因为它是不变的

  4. 执行器确保 Spark 保持容错能力

问题 14:

Spark 中的作业有哪些准确之处?

  1. 作业的不同阶段可以并行执行

  2. 作业的不同阶段不能并行执行

  3. 一个任务由许多作业组成

  4. 一个阶段由许多作业组成

问题 15:

Spark 中的 shuffle 有哪些准确之处?

  1. 在 shuffle 过程中,数据被发送到多个分区进行处理

  2. 在 shuffle 过程中,数据被发送到单个分区进行处理

  3. Shuffle 是一个触发 Spark 评估的操作

  4. 在 shuffle 过程中,所有数据都保留在内存中以便处理

问题 16:

Spark 中的集群管理器有哪些准确之处?

  1. 集群管理器负责管理 Spark 的资源

  2. 集群管理器负责直接与执行器协同工作

  3. 集群管理器负责创建查询计划

  4. 集群管理器负责优化 DAGs

问题 17:

以下代码块需要计算df DataFrame 中每个部门的salary列的总和和平均值。然后,它应该计算bonus列的总和和最大值:

df.___1___ ("department").___2___ (sum("salary").alias("sum_salary"), ___3___ ("salary").alias("avg_salary"), sum("bonus").alias("sum_bonus"), ___4___("bonus").alias("max_bonus") )

选择正确的答案来填充代码块中的空白,以完成此任务:

    1. groupBy

    2. agg

    3. avg

    4. max

    1. filter

    2. agg

    3. avg

    4. max

    1. groupBy

    2. avg

    3. agg

    4. max

    1. groupBy

    2. agg

    3. avg

    4. avg

问题 18:

以下代码块中包含一个错误。代码块需要将salaryDf DataFrame 与较大的employeeDf DataFrame 在employeeID列上连接:

salaryDf.join(employeeDf, "employeeID", how="broadcast")

识别错误:

  1. 代码应该使用innerJoin而不是join

  2. broadcast不是 Spark 中用于连接两个 DataFrames 的join类型

  3. salaryDfemployeeDf应该交换

  4. how参数中,应该使用crossJoin而不是broadcast

问题 19:

以下哪个代码块将df DataFrame 的 shuffle 操作从 5 个分区变为 20 个分区?

  1. df.repartition(5)

  2. df.repartition(20)

  3. df.coalesce(20)

  4. df.coalesce(5)

问题 20:

以下哪个操作将触发评估?

  1. df.filter()

  2. df.distinct()

  3. df.intersect()

  4. df.join()

  5. df.count()

问题 21:

以下哪个代码块返回df DataFrame 中agename列的唯一值,并在各自的列中保持所有值唯一?

  1. df.select('age').join(df.select('name'), col(state) == col('name'), 'inner').show()

  2. df.select(col('age'), col('name')).agg({'*': 'count'}).show()

  3. df.select('age', 'name').distinct().show()

  4. df.select('age').unionAll(df.select('name')).distinct().show()

问题 22

以下哪个代码块返回df DataFrame 中总行数的计数?

  1. df.count()

  2. df.select(col('state'), col('department')).agg({'*': 'count'}).show()

  3. df.select('state', 'department').distinct().show()

  4. df.select('state').union(df.select('department')).distinct().show()

问题 23

以下代码块包含一个错误。代码块应该将df DataFrame 保存为新的 parquet 文件到filePath路径:

df.write.mode("append").parquet(filePath)

识别错误:

  1. 代码块应该有overwrite选项而不是append

  2. 代码应该是write.parquet而不是write.mode

  3. 不能直接从 DataFrame 中调用df.write操作

  4. 代码的第一部分应该是df.write.mode(append)

问题 24

以下哪个代码块向df DataFrame 中添加了一个salary_squared列,该列是salary列的平方?

  1. df.withColumnRenamed("salary_squared", pow(col("salary"), 2))

  2. df.withColumn("salary_squared", col("salary"*2))

  3. df.withColumn("salary_squared", pow(col("salary"), 2))

  4. df.withColumn("salary_squared", square(col("salary")))

问题 25

以下哪个代码块执行了一个连接操作,其中小的salaryDf DataFrame 被发送到所有执行器,以便可以在employeeSalaryIDEmployeeID列上与employeeDf DataFrame 进行连接?

  1. employeeDf.join(salaryDf, "employeeDf.employeeID == salaryDf.employeeSalaryID", "inner")

  2. employeeDf.join(salaryDf, "employeeDf.employeeID == salaryDf.employeeSalaryID", "broadcast")

  3. employeeDf.join(broadcast(salaryDf), employeeDf.employeeID == salaryDf.employeeSalaryID)

  4. salaryDf.join(broadcast(employeeDf), employeeDf.employeeID == salaryDf.employeeSalaryID)

问题 26

以下哪个代码块在salarydf DataFrame 和employeedf DataFrame 之间执行了外连接,使用employeeIDsalaryEmployeeID列作为连接键分别?

  1. Salarydf.join(employeedf, "outer", salarydf.employeedf == employeeID.salaryEmployeeID)

  2. salarydf.join(employeedf, employeeID == salaryEmployeeID)

  3. salarydf.join(employeedf, salarydf.salaryEmployeeID == employeedf.employeeID, "outer")

  4. salarydf.join(employeedf, salarydf.employeeID == employeedf.salaryEmployeeID, "outer")

问题 27

以下哪个代码块会打印出df DataFrame 的模式?

  1. df.rdd.printSchema

  2. df.rdd.printSchema()

  3. df.printSchema

  4. df.printSchema()

问题 28

以下哪个代码块在 salarydf DataFrame 和 employeedf DataFrame 之间执行左连接,使用 employeeID 列?

  1. salaryDf.join(employeeDf, salaryDf["employeeID"] == employeeDf["employeeID"], "outer")

  2. salaryDf.join(employeeDf, salaryDf["employeeID"] == employeeDf["employeeID"], "left")

  3. salaryDf.join(employeeDf, salaryDf["employeeID"] == employeeDf["employeeID"], "inner")

  4. salaryDf.join(employeeDf, salaryDf["employeeID"] == employeeDf["employeeID"], "right")

问题 29:

以下哪个代码块按升序聚合了df DataFrame 中的bonus列,并且nulls值排在最后?

  1. df.agg(asc_nulls_last("bonus").alias("bonus_agg"))

  2. df.agg(asc_nulls_first("bonus").alias("bonus_agg"))

  3. df.agg(asc_nulls_last("bonus", asc).alias("bonus_agg"))

  4. df.agg(asc_nulls_first("bonus", asc).alias("bonus_agg"))

问题 30:

以下代码块包含一个错误。该代码块应该通过在 employeeIDemployeeSalaryID 列上分别连接 employeeDfsalaryDf DataFrame 来返回一个 DataFrame,同时从最终的 DataFrame 中排除 employeeDf DataFrame 中的 bonusdepartment 列以及 salaryDf DataFrame 中的 salary 列。

employeeDf.groupBy(salaryDf, employeeDf.employeeID == salaryDf.employeeSalaryID, "inner").delete("bonus", "department", "salary")

识别错误:

  1. groupBy 应该替换为 innerJoin 操作符

  2. groupBy 应该替换为一个 join 操作符,并且 delete 应该替换为 drop

  3. groupBy 应该替换为 crossJoin 操作符,并且 delete 应该替换为 withColumn

  4. groupBy 应该替换为一个 join 操作符,并且 delete 应该替换为 withColumnRenamed

问题 31:

以下哪个代码块将 /loc/example.csv CSV 文件作为 df DataFrame 读取?

  1. df = spark.read.csv("/loc/example.csv")

  2. df = spark.mode("csv").read("/loc/example.csv")

  3. df = spark.read.path("/loc/example.csv")

  4. df = spark.read().csv("/loc/example.csv")

问题 32:

以下哪个代码块使用名为 my_schema 的模式文件在 my_path 位置读取一个 parquet 文件?

  1. spark.read.schema(my_schema).format("parquet").load(my_path)

  2. spark.read.schema("my_schema").format("parquet").load(my_path)

  3. spark.read.schema(my_schema).parquet(my_path)

  4. spark.read.parquet(my_path).schema(my_schema)

问题 33:

我们想要找到在将employeedfsalarydf DataFrame 在employeeIDemployeeSalaryID列上分别连接时,结果 DataFrame 中的记录数。应该执行哪些代码块来实现这一点?

  1. .``filter(~isnull(col(department)))

  2. .``count()

  3. employeedf.join(salarydf, col("employeedf.employeeID")==col("salarydf.employeeSalaryID"))

  4. employeedf.join(salarydf, employeedf. employeeID ==salarydf. employeeSalaryID, how='inner')

  5. .``filter(col(department).isnotnull())

  6. .``sum(col(department))

    1. 3, 1, 6

    2. 3, 1, 2

    3. 4, 2

    4. 3, 5, 2

问题 34:

以下哪个代码块返回一个 df DataFrame 的副本,其中 state 列的名称已更改为 stateID

  1. df.withColumnRenamed("state", "stateID")

  2. df.withColumnRenamed("stateID", "state")

  3. df.withColumn("state", "stateID")

  4. df.withColumn("stateID", "state")

问题 35:

以下哪个代码块返回一个 df DataFrame 的副本,其中 salary 列已转换为 integer

  1. df.col("salary").cast("integer"))

  2. df.withColumn("salary", col("salary").castType("integer"))

  3. df.withColumn("salary", col("salary").convert("integerType()"))

  4. df.withColumn("salary", col("salary").cast("integer"))

问题 36:

以下哪个代码块将 df DataFrame 分成两半,即使代码多次运行,值也完全相同?

  1. df.randomSplit([0.5, 0.5], seed=123)

  2. df.split([0.5, 0.5], seed=123)

  3. df.split([0.5, 0.5])

  4. df.randomSplit([0.5, 0.5])

问题 37:

以下哪个代码块按两个列,salarydepartment,排序 df DataFrame,其中 salary 是升序,department 是降序?

  1. df.sort("salary", asc("department"))

  2. df.sort("salary", desc(department))

  3. df.sort(col(salary)).desc(col(department))

  4. df.sort("salary", desc("department"))

问题 38:

以下哪个代码块从 salaryDf DataFrame 的 bonus 列计算平均值,并将其添加到名为 average_bonus 的新列中?

  1. salaryDf.avg("bonus").alias("average_bonus"))

  2. salaryDf.agg(avg("bonus").alias("average_bonus"))

  3. salaryDf.agg(sum("bonus").alias("average_bonus"))

  4. salaryDf.agg(average("bonus").alias("average_bonus"))

问题 39:

以下哪个代码块将 df DataFrame 保存到 /FileStore/file.csv 位置作为 CSV 文件,如果位置中已存在文件则抛出错误?

  1. df.write.mode("error").csv("/FileStore/file.csv")

  2. df.write.mode.error.csv("/FileStore/file.csv")

  3. df.write.mode("exception").csv("/FileStore/file.csv")

  4. df.write.mode("exists").csv("/FileStore/file.csv")

问题 40:

以下哪个代码块读取位于 /my_path/my_csv.csv CSV 文件到 DataFrame 中?

  1. spark.read().mode("csv").path("/my_path/my_csv.csv")

  2. spark.read.format("csv").path("/my_path/my_csv.csv")

  3. spark.read("csv", "/my_path/my_csv.csv")

  4. spark.read.csv("/my_path/my_csv.csv")

问题 41:

以下哪个代码块显示 df DataFrame 的前 100 行,其中包含 salary 列,按降序排列?

  1. df.sort(asc(value)).show(100)

  2. df.sort(col("value")).show(100)

  3. df.sort(col("value").desc()).show(100)

  4. df.sort(col("value").asc()).print(100)

问题 42:

以下哪个代码块创建了一个 DataFrame,它显示了基于 departmentstate 列的 salary 列的平均值,其中 age 大于 35,并且返回的 DataFrame 应该按 employeeID 列升序排序,以确保该列没有空值?

  1. salaryDf.filter(col("age") > 35)

  2. .``filter(col("employeeID")

  3. .``filter(col("employeeID").isNotNull())

  4. .``groupBy("department")

  5. .``groupBy("department", "state")

  6. .``agg(avg("salary").alias("mean_salary"))

  7. .``agg(average("salary").alias("mean_salary"))

  8. .``orderBy("employeeID")

    1. 1, 2, 5, 6, 8

    2. 1, 3, 5, 6, 8

    3. 1, 3, 6, 7, 8

    4. 1, 2, 4, 6, 8

问题 43:

以下代码块包含一个错误。代码块应该返回一个新的 DataFrame,不包含 employeesalary 列,并添加一个 fixed_value 列,其值为 100

df.withColumnRenamed(fixed_value).drop('employee', 'salary')

确定错误:

  1. withcolumnRenamed 应该替换为 withcolumn,并且应该使用 lit() 函数来填充 100 的值

  2. withcolumnRenamed 应该替换为 withcolumn

  3. drop 函数中应该交换 employeesalary

  4. lit() 函数调用缺失

问题 44:

以下哪个代码块返回了 df DataFrame 中数值和字符串列的基本统计信息?

  1. df.describe()

  2. df.detail()

  3. df.head()

  4. df.explain()

问题 45:

以下哪个代码块返回了 df DataFrame 的前 5 行?

  1. df.select(5)

  2. df.head(5)

  3. df.top(5)

  4. df.show()

问题 46:

以下哪个代码块创建了一个新的 DataFrame,包含来自 df DataFrame 的 departmentagesalary 列?

  1. df.select("department", "``age", "salary")

  2. df.drop("department", "``age", "salary")

  3. df.filter("department", "``age", "salary")

  4. df.where("department", "``age", "salary")

问题 47:

以下哪个代码块创建了一个新的 DataFrame,包含三个列,departmentagemax_salary,其中每个部门以及每个年龄组的最高工资来自 df DataFrame?

df.___1___ (["department", "age"]).___2___ (___3___ ("salary").alias("max_salary"))

确定正确答案:

    1. filter

    2. agg

    3. max

    1. groupBy

    2. agg

    3. max

    1. filter

    2. agg

    3. sum

    1. groupBy

    2. agg

    3. sum

问题 48:

以下代码块包含一个错误。代码块应该返回一个新的 DataFrame,通过行筛选,其中 salary 列在 df DataFrame 中大于或等于 1000

df.filter(F(salary) >= 1000)

确定错误:

  1. 应该使用 where() 而不是 filter()

  2. 应该将 F(salary) 操作替换为 F.col("salary")

  3. 应该使用 > 操作符而不是 >=

  4. where 方法的参数应该是 "salary > 1000"

问题 49:

以下哪个代码块返回了一个 df DataFrame 的副本,其中 department 列已被重命名为 business_unit

  1. df.withColumn(["department", "business_unit"])

  2. itemsDf.withColumn("department").alias("business_unit")

  3. itemsDf.withColumnRenamed("department", "business_unit")

  4. itemsDf.withColumnRenamed("business_unit", "department")

问题 50:

以下哪个代码块从df DataFrame 返回包含每个部门员工总数的数据帧?

  1. df.groupBy("department").agg(count("*").alias("total_employees"))

  2. df.filter("department").agg(count("*").alias("total_employees"))

  3. df.groupBy("department").agg(sum("*").alias("total_employees"))

  4. df.filter("department").agg(sum("*").alias("total_employees"))

问题 51:

以下哪个代码块从df DataFrame 返回将employee列转换为字符串类型的 DataFrame?

  1. df.withColumn("employee", col("employee").cast_type("string"))

  2. df.withColumn("employee", col("employee").cast("string"))

  3. df.withColumn("employee", col("employee").cast_type("stringType()"))

  4. df.withColumnRenamed("employee", col("employee").cast("string"))

问题 52:

以下哪个代码块返回一个新的 DataFrame,其中包含一个新的fixed_value列,该列在df DataFrame 的所有行中都有Z

  1. df.withColumn("fixed_value", F.lit("Z"))

  2. df.withColumn("fixed_value", F("Z"))

  3. df.withColumnRenamed("fixed_value", F.lit("Z"))

  4. df.withColumnRenamed("fixed_value", lit("Z"))

问题 53:

以下哪个代码块返回一个新的 DataFrame,其中包含一个新的upper_string列,它是df DataFrame 中employeeName列的大写版本?

  1. df.withColumnRenamed('employeeName', upper(df.upper_string))

  2. df.withColumnRenamed('upper_string', upper(df.employeeName))

  3. df.withColumn('upper_string', upper(df.employeeName))

  4. df.withColumn(' employeeName', upper(df.upper_string))

问题 54:

以下代码块包含一个错误。该代码块本应使用 udf 将员工姓名转换为大写:

capitalize_udf = udf(lambda x: x.upper(), StringType())
df_with_capitalized_names = df.withColumn("capitalized_name", capitalize("employee"))

识别错误:

  1. 应该使用capitalize_udf函数而不是capitalize

  2. udf函数capitalize_udf没有正确地转换为大写。

  3. 应该使用IntegerType()而不是StringType()

  4. 应该使用df.withColumn("employee", capitalize("capitalized_name"))代替df.withColumn("capitalized_name", capitalize("employee")),而不是df.withColumn("capitalized_name", capitalize("employee"))

问题 55:

以下代码块包含一个错误。该代码块本应按薪资升序对df DataFrame 进行排序。然后,它应该根据bonus列进行排序,将nulls放在最后。

df.orderBy ('salary', asc_nulls_first(col('bonus')))

识别错误:

  1. salary列应该以降序排序,并使用desc_nulls_last代替asc_nulls_first。此外,它应该被col()运算符包裹。

  2. salary列应该被col()运算符包裹。

  3. 奖金列应该以降序排序,并将 null 值放在最后。

  4. 奖金列应该按照desc_nulls_first()进行排序。

问题 56:

以下代码块包含一个错误。该代码块需要根据 department 列对 df DataFrame 进行分组,并计算每个部门的总工资和平均工资。

df.filter("department").agg(sum("salary").alias("sum_salary"), avg("salary").alias("avg_salary"))

识别错误:

  1. avg 方法也应该通过 agg 函数调用

  2. 应该使用 groupBy 而不是 filter

  3. agg 方法的语法不正确

  4. 应该在 salary 上进行过滤,而不是在 department 上进行过滤

问题 57

哪个代码块将 df DataFrame 写入到 filePath 路径上的 parquet 文件,并按 department 列进行分区?

  1. df.write.partitionBy("department").parquet(filePath)

  2. df.write.partition("department").parquet(filePath)

  3. df.write.parquet("department").partition(filePath)

  4. df.write.coalesce("department").parquet(filePath)

问题 58

df DataFrame 包含列 [employeeID, salary, department]。以下哪段代码将返回只包含列 [employeeID, salary]df DataFrame?

  1. df.drop("department")

  2. df.select(col(employeeID))

  3. df.drop("department", "salary")

  4. df.select("employeeID", "department")

问题 59

以下哪个代码块返回一个新的 DataFrame,其列与 df DataFrame 相同,除了 salary 列?

  1. df.drop(col("salary"))

  2. df.delete(salary)

  3. df.drop(salary)

  4. df.delete("salary")

问题 60

以下代码块包含一个错误。该代码块应该返回将 employeeID 重命名为 employeeIdColumndf DataFrame。

df.withColumnRenamed("employeeIdColumn", "employeeID")

识别错误:

  1. 代替 withColumnRenamed,应该使用 withColumn 方法

  2. 应该使用 withColumn 方法代替 withColumnRenamed,并且将 "employeeIdColumn" 参数与 "employeeID" 参数交换

  3. "employeeIdColumn""employeeID" 参数应该交换

  4. withColumnRenamed 不是一个 DataFrame 的方法

答案

  1. D

  2. C

  3. A

  4. A

  5. A

  6. A

  7. B

  8. D

  9. C

  10. B

  11. C

  12. C

  13. A

  14. B

  15. A

  16. A

  17. A

  18. B

  19. B

  20. E

  21. C

  22. A

  23. A

  24. C

  25. C

  26. D

  27. D

  28. B

  29. A

  30. B

  31. A

  32. A

  33. C

  34. A

  35. D

  36. A

  37. D

  38. B

  39. A

  40. D

  41. C

  42. B

  43. A

  44. A

  45. B

  46. A

  47. B

  48. B

  49. C

  50. A

  51. B

  52. A

  53. C

  54. A

  55. A

  56. B

  57. A

  58. A

  59. A

  60. C

Logo

更多推荐