原文:annas-archive.org/md5/b306e51c73948c57f772d5af5f61eb39

译者:飞龙

协议:CC BY-NC-SA 4.0

第六章:分析学习:使用 TensorFlow 进行 AI 和图像识别

“人工智能、深度学习、机器学习——无论你在做什么,如果你不理解它——就学习它。因为不然,你将在 3 年内变成恐龙。”
马克·库班

这是一个涵盖流行行业用例的系列示例应用中的第一章,毫无疑问,我从一个与机器学习,特别是通过图像识别示例应用的深度学习相关的用例开始。这几年,人工智能AI)领域经历了加速增长,许多实际应用已成为现实,比如自动驾驶汽车,具备高级自动语音识别的聊天机器人,这些技术在某些任务中完全能够替代人工操作员,而越来越多的人,无论是学术界还是产业界,开始参与其中。然而,仍然存在一种看法,认为进入的门槛很高,并且掌握机器学习背后的数学概念是前提条件。在本章中,我们尝试通过示例来演示,事实并非如此。

我们将以简要介绍机器学习开始本章,并介绍其一个子集——深度学习。接着我们将介绍一个非常流行的深度学习框架——TensorFlow,我们将利用它来构建一个图像识别模型。在本章的第二部分,我们将展示如何通过实现一个名为 PixieApp 的示例应用来将我们构建的模型投入实际使用,用户可以输入一个网站链接,获取该网站的所有图片,并将其作为输入传递给模型进行分类。

在本章结束时,你应该确信,即使没有机器学习博士学位,也完全可以构建有意义的应用并将其投入实际使用。

什么是机器学习?

我认为很好地捕捉到机器学习直觉的一个定义来自斯坦福大学的副教授 Andrew Ng,在他的 Coursera 课程机器学习中提到(www.coursera.org/learn/machine-learning):

机器学习是让计算机通过学习来完成任务,而不是通过显式编程。

上述定义中的关键词是学习,在此上下文中,学习的含义与我们人类的学习方式非常相似。继续这一类比,从小开始,我们就被教导如何通过示范或者通过自身的试错过程完成一项任务。广义来说,机器学习算法可以分为两种类型,这两种类型对应于人类学习的两种方式:

  • 监督学习:算法从已正确标注的示例数据中学习。这些数据也叫做训练数据,或者有时被称为地面真实

  • 无监督学习:算法能够从未标记的数据中自行学习。

下面的表格概述了每个类别中最常用的机器学习算法及其解决的问题类型:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_01.jpg

机器学习算法列表

这些算法的输出被称为模型,并用于对从未见过的新输入数据进行预测。构建和部署这些模型的整个端到端过程在不同类型的算法中是非常一致的。

下图展示了这个过程的高层次工作流:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_02.jpg

机器学习模型工作流

和往常一样,工作流从数据开始。在监督学习的情况下,数据将作为示例使用,因此必须正确标记答案。然后,输入数据被处理以提取内在特性,称为特征,我们可以将它们看作是代表输入数据的数值。随后,这些特征被输入到一个机器学习算法中,构建出一个模型。在典型设置中,原始数据会被拆分为训练数据、测试数据和盲数据。在模型构建阶段,测试数据和盲数据用于验证和优化模型,以确保模型不会过度拟合训练数据。过度拟合发生在模型参数过于紧密地跟随训练数据,导致在使用未见过的数据时出现错误。当模型达到预期的准确度时,它会被部署到生产环境中,并根据宿主应用的需求对新数据进行预测。

在本节中,我们将提供一个非常高层次的机器学习介绍,配以简化的数据流水线工作流,足以让你理解模型是如何构建和部署的。如果你是初学者,我强烈推荐 Andrew Ng 在 Coursera 上的机器学习课程(我自己也时常回顾)。在接下来的部分,我们将介绍机器学习的一个分支——深度学习,我们将用它来构建图像识别示例应用。

什么是深度学习?

让计算机学习、推理和思考(做决策)是一门被称为认知计算的科学,其中机器学习和深度学习是重要组成部分。下图展示了这些领域如何与 AI 这一广泛领域相关:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_03.jpg

深度学习在 AI 中的位置

正如图示所示,深度学习是机器学习算法的一种类型。或许不为人所广知的是,深度学习领域已经存在相当长的时间,但直到最近才被广泛应用。兴趣的复燃是由于近年来计算机、云计算和存储技术的巨大进步,这些技术推动了人工智能的指数增长,并催生了许多新的深度学习算法,每个算法都特别适合解决特定问题。

正如我们在本章稍后讨论的,深度学习算法特别擅长学习复杂的非线性假设。它们的设计实际上是受到人脑工作方式的启发,例如,输入数据通过多个计算单元层进行处理,以将复杂的模型表示(例如图像)分解为更简单的表示,然后将结果传递到下一层,依此类推,直到到达负责输出结果的最终层。这些层的组合也被称为神经网络,构成一层的计算单元被称为神经元。本质上,一个神经元负责接收多个输入,并将其转换为单一输出,然后这个输出可以输入到下一层的其他神经元。

以下图示表示了一个用于图像分类的多层神经网络:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_04.jpg

图像分类的神经网络高级表示

上述神经网络也被称为前馈网络,因为每个计算单元的输出作为输入传递到下一层,从输入层开始。中间层被称为隐藏层,包含由网络自动学习的中间特征。在我们的图像示例中,某些神经元可能负责检测角落,而其他神经元则可能专注于边缘,依此类推。最终的输出层负责为每个输出类别分配一个置信度(得分)。

一个重要的问题是,神经元的输出是如何从输入生成的?在不深入探讨涉及的数学内容的前提下,每个人工神经元会对其输入的加权和应用激活函数 https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_26.jpg,以决定它是否应该激活

以下公式计算加权和:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_27.jpg

其中 https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_28.jpg 是层 ii + 1 之间的权重矩阵。这些权重是在稍后讨论的训练阶段中计算得出的。

注意

注意:前面公式中的偏置表示偏置神经元的权重,它是每一层中添加的一个额外神经元,其 x 值为 +1。偏置神经元很特殊,因为它贡献了下一层的输入,但与上一层没有连接。然而,它的权重仍然像其他神经元一样被正常学习。偏置神经元的直觉是,它为线性回归方程提供了常数项 b:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_29.jpg

当然,应用神经元激活函数 https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_30.jpgA 上,不能简单地产生一个二进制(0 或 1)值,因为如果多个类别都被赋予了 1 的分数,我们就无法正确地排序最终的候选答案。相反,我们使用提供 0 到 1 之间非离散分数的激活函数,并设置一个阈值(例如 0.5)来决定是否激活神经元。

最常用的激活函数之一是 sigmoid 函数:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_31.jpg

下图展示了如何使用 sigmoid 激活函数根据输入和权重计算神经元的输出:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_05.jpg

使用 sigmoid 函数计算神经元输出

其他常用的激活函数包括双曲正切 https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_32.jpg修正线性单元ReLu):https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_33.jpg。当有很多层时,ReLu 的表现更好,因为它提供了稀疏的激活神经元,从而减少噪音并加快学习速度。

前馈传播用于模型评分时,但在训练神经网络的权重矩阵时,一种常用的方法叫做反向传播en.wikipedia.org/wiki/Backpropagation)。

以下高层步骤描述了训练是如何进行的:

  1. 随机初始化权重矩阵(最好使用较小的值,例如 https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_34.jpg)。

  2. 使用之前描述的前向传播方法,对所有训练样本进行计算,使用你选择的激活函数计算每个神经元的输出。

  3. 为你的神经网络实现一个成本函数。成本函数量化了与训练样本的误差。可以与反向传播算法一起使用的成本函数有多种,例如均方误差(en.wikipedia.org/wiki/Mean_squared_error)和交叉熵(en.wikipedia.org/wiki/Cross_entropy)。

  4. 使用反向传播来最小化你的成本函数并计算权重矩阵。反向传播的基本思想是从输出层的激活值开始,计算与训练数据的误差,并将这些误差反向传递到隐藏层。然后,这些误差会被调整,以最小化步骤 3 中实现的成本函数。

注意

注意:详细解释这些成本函数以及它们如何被优化超出了本书的范围。若想深入了解,我强烈推荐阅读 MIT 出版社的《深度学习》一书(Ian Goodfellow、Yoshua Bengio 和 Aaron Courville)。

在本节中,我们从高层次讨论了神经网络的工作原理以及它们是如何训练的。当然,我们只触及了这项激动人心的技术的皮毛,但希望你应该能大致了解它们的工作方式。在接下来的部分,我们将开始研究 TensorFlow,这是一个帮助抽象实现神经网络底层复杂性的编程框架。

开始使用 TensorFlow

除了 TensorFlow (www.tensorflow.org) 之外,我还可以选择多个开源深度学习框架用于这个示例应用程序。

以下是一些最流行的框架:

TensorFlow API 支持多种语言:Python、C++、Java、Go,最近还包括 JavaScript。我们可以将 API 分为两类:高级和低级,具体如下图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_06.jpg

TensorFlow 高级 API 架构

为了开始使用 TensorFlow API,让我们构建一个简单的神经网络,学习 XOR 转换。

提醒一下,XOR 运算符只有四个训练样本:

X Y 结果
0 0 0
0 1 1
1 0 1
1 1 0

有趣的是,线性分类器 (en.wikipedia.org/wiki/Linear_classifier) 无法学习 XOR 转换。然而,我们可以通过一个简单的神经网络来解决这个问题,该网络有两个输入层神经元、一个隐藏层(包含两个神经元)和一个输出层(包含一个神经元,进行二分类),如下所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_07.jpg

XOR 神经网络

注意

注意:你可以通过以下命令直接从 Notebook 安装 TensorFlow:

!pip install tensorflow

像往常一样,在成功安装任何内容后,别忘了重启内核。

为了创建输入层和输出层的张量,我们使用tf.placeholder API,如下代码所示:

import tensorflow as tf
x_input = tf.placeholder(tf.float32)
y_output = tf.placeholder(tf.float32)

然后,我们使用tf.Variable API (www.tensorflow.org/programmers_guide/variables) 初始化矩阵的随机值!TensorFlow 入门 和 https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_new_02.jpg,分别对应隐藏层和输出层:

eps = 0.01
W1 = tf.Variable(tf.random_uniform([2,2], -eps, eps))
W2 = tf.Variable(tf.random_uniform([2,1], -eps, eps))

对于激活函数,我们使用 sigmoid 函数:

注意

注意:为了简化,我们省略了偏置的介绍。

layer1 = tf.sigmoid(tf.matmul(x_input, W1))
output_layer = tf.sigmoid(tf.matmul(layer1, W2))

对于损失函数,我们使用MSE(即均方误差):

cost = tf.reduce_mean(tf.square(y_output - output_layer))

在图中的所有张量就位后,我们可以使用tf.train.GradientDescentOptimizer,学习率为0.05,来最小化我们的损失函数,开始训练:

train = tf.train.GradientDescentOptimizer(0.05).minimize(cost)
training_data = ([[0,0],[0,1],[1,0],[1,1]], [[0],[1],[1],[0]])
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for i in range(5000):
        sess.run(train,
            feed_dict={x_input: training_data[0], y_output: training_data[1]})

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode1.py

上述代码首次引入了 TensorFlow Session 的概念,这是框架的基础部分。实际上,任何 TensorFlow 操作必须在Session的上下文中执行,使用其run方法。会话还维护需要显式释放的资源,通过close方法来释放。为了方便,Session类通过提供__enter____exit__方法支持上下文管理协议。这允许调用者使用with语句 (docs.python.org/3/whatsnew/2.6.html#pep-343-the-with-statement) 来调用 TensorFlow 操作,并自动释放资源。

以下伪代码展示了一个典型的 TensorFlow 执行结构:

with tf.Session() as sess:
    with-block statement with TensorFlow operations

在本节中,我们快速探讨了低级 TensorFlow API,构建了一个简单的神经网络,学习了 XOR 转换。在下一节中,我们将探讨提供高级抽象层的更高层次的估计器 API。

使用 DNNClassifier 进行简单的分类

注意

注意:本节讨论了一个示例 PixieApp 的源代码。如果你想跟着操作,可能更容易直接下载完整的 Notebook 文件,位于这个位置:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/TensorFlow%20classification.ipynb

在我们开始使用低级 TensorFlow API 中的张量、图和会话之前,先熟悉一下 Estimators 包提供的高级 API 是很有帮助的。在这一部分,我们构建了一个简单的 PixieApp,它接受 pandas DataFrame 作为输入,并训练一个具有类别输出的分类模型。

注意

注意:分类输出基本上有两种类型:类别型和连续型。在类别型分类模型中,输出只能从有限的预定义值列表中选择,且可能有或没有逻辑顺序。我们通常称二分类为只有两个类别的分类模型。另一方面,连续输出可以有任何数值。

用户首先需要选择一个数值列进行预测,然后使用数据框中所有其他数值列训练一个分类模型。

注意

注意:这个示例应用的一些代码改编自 github.com/tensorflow/models/tree/master/samples/core/get_started

对于这个示例,我们将使用内置的示例数据集 #7:波士顿犯罪数据,两周的样本数据,但你也可以使用任何其他数据集,只要它有足够的数据和数值列。

提醒一下,你可以使用以下代码浏览 PixieDust 内置的数据集:

import pixiedust
pixiedust.sampleData()

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_08.jpg

PixieDust 中的内置数据集列表

以下代码使用 sampleData() API 加载 波士顿犯罪 数据集:

import pixiedust
crimes = pixiedust.sampleData(7, forcePandas=True)

和往常一样,我们首先通过 display() 命令探索数据。这里的目标是寻找一个合适的列进行预测:

display(crimes)

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_09.jpg

犯罪数据集的表格视图

看起来 nonviolent 是一个适合二分类的良好候选项。现在让我们展示一个条形图,以确保该列的数据分布良好:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_10.jpg

在选项对话框中选择非暴力列

点击 OK 会生成以下图表:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_11.jpg

非暴力犯罪分布

不幸的是,数据倾向于非暴力犯罪,但我们有接近 2,000 个暴力犯罪的数据点,对于这个示例应用程序来说,应该足够了。

我们现在准备创建 do_training 方法,使用 tf.estimator.DNNClassifier 创建一个分类模型。

注意

注意:你可以在这里找到更多关于 DNNClassifier 和其他高级 TensorFlow 估算器的信息:

www.tensorflow.org/api_docs/python/tf/estimator

DNNClassifier 构造函数有很多可选参数。在我们的示例应用中,我们只会使用其中三个,但我鼓励你查看文档中的其他参数:

  • feature_columnsfeature_column._FeatureColumn模型输入的可迭代对象。在我们的例子中,我们可以使用 Python 推导式仅通过 pandas DataFrame 的数值列创建一个数组。

  • hidden_units:每个单元隐藏层数的可迭代对象。在这里,我们只使用两个层,每个层有 10 个节点。

  • n_classes:标签类别的数量。我们将通过对预测列进行分组并计算行数来推断此数字。

这是do_training方法的代码:

def do_training(train, train_labels, test, test_labels, num_classes):
    #set TensorFlow logging level to INFO
    tf.logging.set_verbosity(tf.logging.INFO)

    # Build 2 hidden layer DNN with 10, 10 units respectively.
    classifier = tf.estimator.DNNClassifier(
        # Compute feature_columns from dataframe keys using a list comprehension
        feature_columns =
            [tf.feature_column.numeric_column(key=key) for key in train.keys()],
        hidden_units=[10, 10],
        n_classes=num_classes)

    # Train the Model
    classifier.train(
        input_fn=lambda:train_input_fn(train, train_labels,100),
        steps=1000
    )

    # Evaluate the model
    eval_result = classifier.evaluate(
        input_fn=lambda:eval_input_fn(test, test_labels,100)
    )

    return (classifier, eval_result)

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode2.py

classifier.train方法使用一个train_input_fn方法,负责以小批量的形式提供训练输入数据(即真实标签),返回一个tf.data.Dataset(features, labels)元组。我们的代码还通过classifier.evaluate进行模型评估,通过对测试数据集进行评分并将结果与给定标签进行比较来验证准确性。结果随后作为函数输出的一部分返回。

此方法需要一个与train_input_fn类似的eval_input_fn方法,唯一的区别是在评估过程中我们不使数据集可重复。由于这两个方法共享大部分相同的代码,我们使用一个名为input_fn的辅助方法,该方法由两个方法调用,并带有适当的标志:

def input_fn(features, labels, batch_size, train):
    # Convert the inputs to a Dataset and shuffle.
    dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels)).shuffle(1000)
    if train:
        #repeat only for training
 dataset = dataset.repeat()
    # Return the dataset in batch
    return dataset.batch(batch_size)

def train_input_fn(features, labels, batch_size):
    return input_fn(features, labels, batch_size, train=True)

def eval_input_fn(features, labels, batch_size):
    return input_fn(features, labels, batch_size, train=False)

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode3.py

下一步是构建 PixieApp,它将从传递给run方法的 pandas DataFrame 创建分类器。主屏幕构建了所有数值列的下拉列表,并要求用户选择一个将用作分类器输出的列。这是通过以下代码完成的,使用 Jinja2 {%for ...%} 循环遍历作为输入传递的 DataFrame,DataFrame 通过pixieapp_entity变量引用。

注意

注意:以下代码使用[[SimpleClassificationDNN]]符号表示它是指定类的不完整代码。请勿尝试运行此代码,直到提供完整实现为止。

[[SimpleClassificationDNN]]
from pixiedust.display.app import *
@PixieApp
class SimpleClassificationDNN():
    @route()
    def main_screen(self):
        return """
<h1 style="margin:40px">
    <center>The classificiation model will be trained on all the numeric columns of the dataset</center>
</h1>
<style>
    div.outer-wrapper {
        display: table;width:100%;height:300px;
    }
    div.inner-wrapper {
        display: table-cell;vertical-align: middle;height: 100%;width: 100%;
    }
</style>
<div class="outer-wrapper">
    <div class="inner-wrapper">
        <div class="col-sm-3"></div>
        <div class="input-group col-sm-6">
          <select id="cols{{prefix}}" style="width:100%;height:30px" pd_options="predictor=$val(cols{{prefix}})">
              <option value="0">Select a predictor column</option>
              {%for col in this.pixieapp_entity.columns.values.tolist()%}
 <option value="{{col}}">{{col}}</option>
 {%endfor%}
          </select>
        </div>
    </div>
</div>     
        """

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode4.py

使用crimes数据集,我们通过以下代码运行 PixieApp:

app = SimpleClassificationDNN()
app.run(crimes)

注意

注意:此时 PixieApp 代码尚不完整,但我们仍然可以看到欢迎页面的结果,如下图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_12.jpg

显示输入 pandas DataFrame 列表的主屏幕

当用户选择预测列(例如 nonviolent)时,通过属性 pd_options="predictor=$val(cols{{prefix}})" 会触发一个新的 prepare_training 路由。该路由将显示两个条形图,分别显示训练集和测试集的输出类别分布,这些数据是通过从原始数据集中以 80/20 的比例随机选取得到的。

注意

注意:我们在训练集和测试集之间使用 80/20 的分割比例,从我的经验来看,这种做法很常见。当然,这不是绝对规则,根据具体情况可以进行调整。

屏幕片段还包括一个按钮,用于启动训练分类器。

prepare_training 路由的代码如下所示:

[[SimpleClassificationDNN]]
@route(predictor="*")
@templateArgs
def prepare_training(self, predictor):
        #select only numerical columns
        self.dataset = self.pixieapp_entity.dropna(axis=1).select_dtypes(
            include=['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
        )
        #Compute the number of classed by counting the groups
        self.num_classes = self.dataset.groupby(predictor).size().shape[0]
        #Create the train and test feature and labels
        self.train_x=self.dataset.sample(frac=0.8)
        self.full_train = self.train_x.copy()
        self.train_y = self.train_x.pop(predictor)
        self.test_x=self.dataset.drop(self.train_x.index)
        self.full_test = self.test_x.copy()
        self.test_y=self.test_x.pop(predictor)

        bar_chart_options = {
          "rowCount": "100",
          "keyFields": predictor,
          "handlerId": "barChart",
          "noChartCache": "true"
        }

        return """
<div class="container" style="margin-top:20px">
    <div class="row">
        <div class="col-sm-5">
            <h3><center>Train set class distribution</center></h3>
            <div pd_entity="full_train" pd_render_onload>
                <pd_options>{{bar_chart_options|tojson}}</pd_options>
            </div>
        </div>
        <div class="col-sm-5">
            <h3><center>Test set class distribution</center></h3>
            <div pd_entity="full_test" pd_render_onload>
                <pd_options>{{bar_chart_options|tojson}}</pd_options>
            </div>
        </div>
    </div>
</div>

<div style="text-align:center">
 <button class="btn btn-default" type="submit" pd_options="do_training=true">
 Start Training
 </button>
</div>
"""

注意

你可以在此找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode5.py

注意:由于我们计算了 bar_chart_options 变量一次,并且在 Jinja2 模板中使用它,所以使用了 @templateArgs

选择 nonviolent 预测列将给我们以下截图结果:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_13.jpg

预训练屏幕

开始训练 按钮通过属性 pd_options="do_training=true", 调用 do_training 路由,该路由触发我们之前创建的 do_training 方法。注意,我们使用了 @captureOutput 装饰器,因为我们将 TensorFlow 日志级别设置为 INFO,所以我们希望捕获日志消息并将其显示给用户。这些日志消息会通过 stream 模式返回到浏览器,PixieDust 会自动将它们显示为专门创建的 <div> 元素,并随着数据的到达动态追加到该元素中。当训练完成时,路由返回一个 HTML 片段,生成一个表格,显示 do_training 方法返回的评估指标,如下所示的代码:

[[SimpleClassificationDNN]]
@route(do_training="*")
   @captureOutput
def do_training_screen(self):
 self.classifier, self.eval_results = \
 do_training(
self.train_x, self.train_y, self.test_x, self.test_y, self.num_classes
 )
        return """
<h2>Training completed successfully</h2>
<table>
    <thead>
        <th>Metric</th>
        <th>Value</th>
    </thead>
    <tbody>
{%for key,value in this.eval_results.items()%}
<tr>
    <td>{{key}}</td>
    <td>{{value}}</td>
</tr>
{%endfor%}
    </tbody>
</table>
        """

注意

你可以在此找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode6.py

以下截图显示了模型成功创建后的结果,并包括分类模型的评估指标表,准确率为 87%:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_14.jpg

显示成功训练结果的最终屏幕

这个 PixieApp 使用 crimes 数据集作为参数运行,如下所示的代码所示:

app = SimpleClassificationDNN()
app.run(crimes)

一旦模型成功训练,你可以通过在 app.classifier 变量上调用 predict 方法来分类新数据。与 trainevaluate 方法类似,predict 也接受一个 input_fn,用于构造输入特征。

注意

注意:有关 predict 方法的更多细节,请参见此处:

www.tensorflow.org/api_docs/python/tf/estimator/DNNClassifier#predict

这个示例应用程序通过使用高层次的估算器 API,为熟悉 TensorFlow 框架提供了一个很好的起点。

注意

注意:此示例应用程序的完整笔记本可以在这里找到:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/TensorFlow%20classification.ipynb

在下一部分,我们将开始使用低级 TensorFlow API(包括张量、图和会话)构建我们的图像识别示例应用程序。

图像识别示例应用程序

当谈到构建一个开放式应用程序时,你应该从定义MVP(即最小可行产品)版本的需求开始,该版本仅包含足够的功能,足以使其对用户有用且有价值。在做技术决策时,确保你能够尽快获得一个完整的端到端实现,而不会投入过多时间,这是一个非常重要的标准。其核心思想是,你需要从小做起,这样你可以快速迭代并改进应用程序。

对于我们图像识别示例应用程序的 MVP,我们将使用以下要求:

  • 不要从头开始构建模型;而是重用公开可用的预训练通用卷积神经网络CNNen.wikipedia.org/wiki/Convolutional_neural_network)模型,如 MobileNet。我们可以稍后使用迁移学习(en.wikipedia.org/wiki/Transfer_learning)用自定义训练图像重新训练这些模型。

  • 对于 MVP,我们虽然只关注评分而不涉及训练,但仍应确保应用程序对用户有吸引力。所以让我们构建一个 PixieApp,允许用户输入网页的 URL,并显示从页面中抓取的所有图片,包括我们的模型推断的分类输出。

  • 由于我们正在学习深度学习神经网络和 TensorFlow,如果我们能够在 Jupyter Notebook 中直接显示 TensorBoard 图形可视化(www.tensorflow.org/programmers_guide/graph_viz),而不强迫用户使用其他工具,那将会非常棒。这将提供更好的用户体验,并增强用户与应用程序的互动。

注意

注意:本节中的应用程序实现是根据以下教程改编的:

codelabs.developers.google.com/codelabs/tensorflow-for-poets

第一部分 – 加载预训练的 MobileNet 模型

注意

注意:你可以下载完成的 Notebook 来跟进本节讨论:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%201.ipynb

有很多公开可用的图像分类模型,使用 CNNs,且在如 ImageNet 等大型图像数据库上进行了预训练。ImageNet 发起了多个公开挑战,如ImageNet 大规模视觉识别挑战赛ILSVRC)或 Kaggle 上的ImageNet 物体定位挑战www.kaggle.com/c/imagenet-object-localization-challenge),并取得了非常有趣的结果。

这些挑战催生了多个模型,如 ResNet、Inception、SqueezeNet、VGGNet 或 Xception,每个模型都使用不同的神经网络架构。详细讲解每个架构超出了本书的范围,但即使你还不是机器学习专家(我也绝对不是),我也鼓励你在网上阅读相关内容。为了这个示例应用,我选择了 MobileNet 模型,因为它小巧、快速且非常准确。它提供了一个包含 1,000 个类别的图像分类模型,足以满足此示例应用的需求。

为了确保代码的稳定性,我已在 GitHub 仓库中创建了模型的副本:github.com/DTAIEB/Thoughtful-Data-Science/tree/master/chapter%206/Visual%20Recognition/mobilenet_v1_0.50_224

在这个目录中,你可以找到以下文件:

  • frozen_graph.pb:TensorFlow 图的序列化二进制版本

  • labels.txt:包含 1,000 个图像类别及其索引的文本文件

  • quantized_graph.pb:采用 8 位定点表示的模型图的压缩形式

加载模型的过程包括构建一个tf.graph对象及相关标签。由于未来可能会加载多个模型,因此我们首先定义一个字典,用来提供有关模型的元数据:

models = {
    "mobilenet": {
        "base_url":"https://github.com/DTAIEB/Thoughtful-Data-Science/raw/master/chapter%206/Visual%20Recognition/mobilenet_v1_0.50_224",
        "model_file_url": "frozen_graph.pb",
        "label_file": "labels.txt",
        "output_layer": "MobilenetV1/Predictions/Softmax"
    }
}

注意

你可以在这里找到文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode7.py

在前面的models字典中,每个键代表特定模型的元数据:

  • base_url:指向文件存储的 URL

  • model_file_url:假定相对于base_url的模型文件名称

  • label_file:假定相对于base_url的标签文件名称

  • output_layer:提供每个类别最终得分的输出层名称

我们实现了一个get_model_attribute辅助方法,以便从model元数据中读取内容,这在我们整个应用程序中都非常有用:

# helper method for reading attributes from the model metadata
def get_model_attribute(model, key, default_value = None):
    if key not in model:
        if default_value is None:
            raise Exception("Require model attribute {} not found".format(key))
        return default_value
    return model[key]

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode8.py

为了加载图形,我们下载二进制文件,使用ParseFromString方法将其加载到tf.GraphDef对象中,然后我们调用tf.import_graph_def方法,将图形作为当前内容管理器:

import tensorflow as tf
import requests
# Helper method for resolving url relative to the selected model
def get_url(model, path):
    return model["base_url"] + "/" + path

# Download the serialized model and create a TensorFlow graph
def load_graph(model):
    graph = tf.Graph()
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(
        requests.get( get_url( model, model["model_file_url"] ) ).content
    )
    with graph.as_default():
        tf.import_graph_def(graph_def)
    return graph

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode9.py

加载标签的方法返回一个 JSON 对象或一个数组(稍后我们会看到这两者都需要)。以下代码使用 Python 列表推导式迭代requests.get调用返回的行。然后,它使用as_json标志将数据格式化为适当的形式:

# Load the labels
def load_labels(model, as_json = False):
    labels = [line.rstrip() \
      for line in requests.get(get_url(model, model["label_file"]) ).text.split("\n") if line != ""]
    if as_json:
        return [{"index": item.split(":")[0],"label":item.split(":")[1]} for item in labels]
    return labels

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode10.py

下一步是调用模型来分类图像。为了简化操作并可能提高其价值,我们要求用户提供一个包含待分类图像的 HTML 页面的 URL。我们将使用 BeautifulSoup4 库来帮助解析页面。要安装 BeautifulSoup4,只需运行以下命令:

!pip install beautifulsoup4

注意

注意:像往常一样,安装完成后不要忘记重启内核。

以下get_image_urls方法接受一个 URL 作为输入,下载 HTML,实例化一个 BeautifulSoup 解析器,并提取所有<img>元素和background-image样式中找到的图像。BeautifulSoup 提供了一个非常优雅且易于使用的 API 来解析 HTML。在这里,我们只使用find_all方法来查找所有的<img>元素,并使用select方法选择所有具有内联样式的元素。读者很快会注意到,我们没有探索通过 HTML 创建图像的其他方式,例如,作为 CSS 类声明的图像。像往常一样,如果你有兴趣和时间改进它,我非常欢迎你在 GitHub 仓库中提交拉取请求(关于如何创建拉取请求,请参阅此处:help.github.com/articles/creating-a-pull-request)。

get_image_urls的代码如下:

from bs4 import BeautifulSoup as BS
import re

# return an array of all the images scraped from an html page
def get_image_urls(url):
    # Instantiate a BeautifulSoup parser
    soup = BS(requests.get(url).text, "html.parser")

    # Local helper method for extracting url
    def extract_url(val):
        m = re.match(r"url\((.*)\)", val)
        val = m.group(1) if m is not None else val
        return "http:" + val if val.startswith("//") else val

    # List comprehension that look for <img> elements and backgroud-image styles
    return [extract_url(imgtag['src']) for imgtag in soup.find_all('img')] + [ \
        extract_url(val.strip()) for key,val in \
        [tuple(selector.split(":")) for elt in soup.select("[style]") \
            for selector in elt["style"].strip(" ;").split(";")] \
            if key.strip().lower()=='background-image' \
        ]

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode11.py

对于发现的每一张图片,我们还需要一个辅助函数来下载这些图片,这些图片将作为输入传递给模型进行分类。

以下download_image方法将图片下载到临时文件:

import tempfile
def download_image(url):
   response = requests.get(url, stream=True)
   if response.status_code == 200:
      with tempfile.NamedTemporaryFile(delete=False) as f:
 for chunk in response.iter_content(2048):
 f.write(chunk)
         return f.name
   else:
      raise Exception("Unable to download image: {}".format(response.status_code))

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode12.py

给定一张本地路径的图片,我们现在需要通过调用tf.image包中的正确解码方法将其解码为张量,也就是.png文件需要使用decode_png方法。

注意

注意:在数学中,张量是向量的一个推广,向量由方向和大小定义,张量则支持更高的维度。向量是 1 阶张量,同样,标量是 0 阶张量。直观地讲,我们可以把 2 阶张量看作一个二维数组,其中的值是通过乘以两个向量得到的结果。在 TensorFlow 中,张量是 n 维数组。

在对图片读取器张量进行一些转换(转换为正确的十进制表示、调整大小和归一化)之后,我们在归一化器张量上调用tf.Session.run以执行之前定义的步骤,如以下代码所示:

# decode a given image into a tensor
def read_tensor_from_image_file(model, file_name):
    file_reader = tf.read_file(file_name, "file_reader")
    if file_name.endswith(".png"):
        image_reader = tf.image.decode_png(file_reader, channels = 3,name='png_reader')
    elif file_name.endswith(".gif"):
        image_reader = tf.squeeze(tf.image.decode_gif(file_reader,name='gif_reader'))
    elif file_name.endswith(".bmp"):
        image_reader = tf.image.decode_bmp(file_reader, name='bmp_reader')
    else:
        image_reader = tf.image.decode_jpeg(file_reader, channels = 3, name='jpeg_reader')
    float_caster = tf.cast(image_reader, tf.float32)
    dims_expander = tf.expand_dims(float_caster, 0);

    # Read some info from the model metadata, providing default values
    input_height = get_model_attribute(model, "input_height", 224)
    input_width = get_model_attribute(model, "input_width", 224)
    input_mean = get_model_attribute(model, "input_mean", 0)
    input_std = get_model_attribute(model, "input_std", 255)

    resized = tf.image.resize_bilinear(dims_expander, [input_height, input_width])
    normalized = tf.divide(tf.subtract(resized, [input_mean]), [input_std])
    sess = tf.Session()
    result = sess.run(normalized)
    return result

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode13.py

在所有部分准备好后,我们现在可以实现score_image方法,该方法接受tf.graph、模型元数据和图片的 URL 作为输入参数,并根据置信度分数返回前五个候选分类,包括它们的标签:

import numpy as np

# classify an image given its url
def score_image(graph, model, url):
    # Get the input and output layer from the model
    input_layer = get_model_attribute(model, "input_layer", "input")
    output_layer = get_model_attribute(model, "output_layer")

    # Download the image and build a tensor from its data
    t = read_tensor_from_image_file(model, download_image(url))

    # Retrieve the tensors corresponding to the input and output layers
    input_tensor = graph.get_tensor_by_name("import/" + input_layer + ":0");
    output_tensor = graph.get_tensor_by_name("import/" + output_layer + ":0");

    with tf.Session(graph=graph) as sess:
        results = sess.run(output_tensor, {input_tensor: t})
    results = np.squeeze(results)
    # select the top 5 candidate and match them to the labels
    top_k = results.argsort()[-5:][::-1]
 labels = load_labels(model)
 return [(labels[i].split(":")[1], results[i]) for i in top_k]

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode14.py

我们现在可以使用以下步骤来测试代码:

  1. 选择mobilenet模型并加载对应的图

  2. 获取从 Flickr 网站抓取的图片 URL 列表

  3. 对每个图片 URL 调用score_image方法并打印结果

代码如下所示:

model = models['mobilenet']
graph = load_graph(model)
image_urls = get_image_urls("https://www.flickr.com/search/?text=cats")
for url in image_urls:
    results = score_image(graph, model, url)
    print("Result for {}: \n\t{}".format(url, results))

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode15.py

结果非常准确(除了第一张是空白图片),如以下截图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_15.jpg

对与猫相关的 Flickr 页面上发现的图片进行分类

我们的图像识别示例应用程序的第一部分现已完成;您可以在以下位置找到完整的 Notebook:github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%201.ipynb

在接下来的部分中,我们将通过构建 PixieApp 的用户界面来构建一个更加用户友好的体验。

第二部分 – 创建一个 PixieApp 用于我们的图像识别示例应用程序

注意

注意:您可以在此下载完成的 Notebook,以便跟随本部分的讨论:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%202.ipynb

提醒一下,PixieApp 的setup方法(如果定义的话)会在应用程序开始运行之前执行。我们用它来选择模型并初始化图形:

from pixiedust.display.app import *

@PixieApp
class ScoreImageApp():
    def setup(self):
        self.model = models["mobilenet"]
        self.graph = load_graph( self.model )
    ...

注意

您可以在此找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode16.py

在 PixieApp 的主屏幕上,我们使用一个输入框让用户输入网页的 URL,如下所示的代码片段所示:

[[ScoreImageApp]]
@route()
def main_screen(self):
   return """
<style>
    div.outer-wrapper {
        display: table;width:100%;height:300px;
    }
    div.inner-wrapper {
        display: table-cell;vertical-align: middle;height: 100%;width: 100%;
    }
</style>
<div class="outer-wrapper">
    <div class="inner-wrapper">
        <div class="col-sm-3"></div>
        <div class="input-group col-sm-6">
          <input id="url{{prefix}}" type="text" class="form-control"
              value="https://www.flickr.com/search/?text=cats"
              placeholder="Enter a url that contains images">
          <span class="input-group-btn">
            <button class="btn btn-default" type="button" pd_options="image_url=$val(url{{prefix}})">Go</button>
          </span>
        </div>
    </div>
</div>
"""

注意

您可以在此找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode17.py

为了方便起见,我们将输入文本初始化为默认值https://www.flickr.com/search/?text=cats

我们现在可以使用以下代码来运行并测试主屏幕:

app = ScoreImageApp()
app.run()

主屏幕看起来是这样的:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_16.jpg

图像识别 PixieApp 的主屏幕

注意

注意:这对于测试是好的,但我们应该记住,do_process_url路由尚未实现,因此,点击Go按钮将会回退到默认路由。

现在让我们实现do_process_url路由,它会在用户点击Go按钮时触发。该路由首先调用get_image_urls方法获取图像 URL 列表。然后,我们使用 Jinja2 构建一个 HTML 片段,显示所有图像。对于每个图像,我们异步调用do_score_url路由,运行模型并显示结果。

以下代码展示了do_process_url路由的实现:

[[ScoreImageApp]]
@route(image_url="*")
@templateArgs
def do_process_url(self, image_url):
    image_urls = get_image_urls(image_url)
    return """
<div>
{%for url in image_urls%}
<div style="float: left; font-size: 9pt; text-align: center; width: 30%; margin-right: 1%; margin-bottom: 0.5em;">
<img src="img/{{url}}" style="width: 100%">
  <div style="display:inline-block" pd_render_onload pd_options="score_url={{url}}">
  </div>
</div>
{%endfor%}
<p style="clear: both;">
</div>
        """

注意

您可以在此找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode18.py

注意@templateArgs装饰器的使用,它允许 Jinja2 片段引用本地的image_urls变量。

最后,在 do_score_url 路由中,我们调用 score_image 并将结果以列表形式显示:

[[ScoreImageApp]]
@route(score_url="*")
@templateArgs
def do_score_url(self, score_url):
    results = score_image(self.graph, self.model, score_url)
    return """
<ul style="text-align:left">
{%for label, confidence in results%}
<li><b>{{label}}</b>: {{confidence}}</li>
{%endfor%}
</ul>
"""

注意

您可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode19.py

以下截图展示了包含猫咪图像的 Flickr 页面结果:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_17.jpg

猫咪的图像分类结果

注意

提醒您,您可以在此位置找到完整的 Notebook:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%202.ipynb

我们的 MVP 应用程序几乎完成。在下一节中,我们将直接在 Notebook 中集成 TensorBoard 图形可视化。

第三部分 – 集成 TensorBoard 图形可视化

注意

注意:本节中描述的部分代码改编自位于此处的 deepdream notebook:

github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/tutorials/deepdream/deepdream.ipynb

您可以在这里下载完整的 Notebook 来跟随本节内容:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%203.ipynb

TensorFlow 提供了一套非常强大的可视化工具,帮助调试和优化应用程序性能。请花点时间在这里探索 TensorBoard 的功能:www.tensorflow.org/programmers_guide/summaries_and_tensorboard

这里的一个问题是,将 TensorBoard 服务器配置为与您的 Notebook 一起使用可能会很困难,特别是当您的 Notebooks 托管在云端时,且您几乎无法访问底层操作系统。在这种情况下,配置和启动 TensorBoard 服务器可能会变得几乎不可能。在本节中,我们展示了如何通过将模型图形可视化直接集成到 Notebook 中来解决这个问题,无需任何配置。为了提供更好的用户体验,我们希望将 TensorBoard 可视化功能添加到我们的 PixieApp 中。我们通过将主布局更改为选项卡布局,并将 TensorBoard 可视化分配到单独的选项卡中来实现这一点。方便的是,PixieDust 提供了一个名为 TemplateTabbedApp 的基础 PixieApp,它负责构建选项卡布局。当使用 TemplateTabbedApp 作为基类时,我们需要在 setup 方法中配置选项卡,如下所示:

[[ImageRecoApp]]
from pixiedust.apps.template import TemplateTabbedApp
@PixieApp
class ImageRecoApp(TemplateTabbedApp):
    def setup(self):
        self.apps = [
            {"title": "Score", "app_class": "ScoreImageApp"},
            {"title": "Model", "app_class": "TensorGraphApp"},
            {"title": "Labels", "app_class": "LabelsApp"}
        ]
        self.model = models["mobilenet"]
        self.graph = self.load_graph(self.model)

app = ImageRecoApp()
app.run()

注意

您可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode20.py

需要注意的是,在前面的代码中,我们已经将LabelsApp子 PixieApp 添加到了标签页列表中,尽管它尚未实现。因此,正如预期的那样,如果直接运行这段代码,Labels标签将会失败。

self.apps包含一个对象数组,用于定义标签页:

  • title:标签页标题

  • app_class: 选中标签时运行的 PixieApp

ImageRecoApp中,我们配置了三个与三个子 PixieApps 相关联的标签页:我们在第二部分 – 为图像识别示例应用创建 PixieApp中已经创建的ScoreImageApp,用于显示模型图的TensorGraphApp,以及用于显示模型中所有标注类别的表格的LabelsApp

结果显示在以下截图中:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_18.jpg

包含 Score、Model 和 Labels 的标签布局

使用TemplateTabbedApp超类的另一个优点是,子 PixieApps 是分开定义的,这使得代码更易于维护和重用。

首先来看一下TensorGraphApp PixieApp。它的主路由返回一个 HTML 片段,该片段从https://tensorboard.appspot.com的 Iframe 加载tf-graph-basic.build.html,并使用 JavaScript 加载监听器应用通过tf.Graph.as_graph_def方法计算得到的序列化图定义。为了确保图定义保持在合理的大小,并避免在浏览器客户端上不必要的性能下降,我们调用strip_consts方法删除具有大尺寸常量值的张量。

TensorGraphApp的代码如下所示:

@PixieApp
class TensorGraphApp():
    """Visualize TensorFlow graph."""
    def setup(self):
        self.graph = self.parent_pixieapp.graph

    @route()
    @templateArgs
    def main_screen(self):
        strip_def = self.strip_consts(self.graph.as_graph_def())
        code = """
            <script>
              function load() {{
                document.getElementById("{id}").pbtxt = {data};
              }}
            </script>
            <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
            <div style="height:600px">
              <tf-graph-basic id="{id}"></tf-graph-basic>
            </div>
        """.format(data=repr(str(strip_def)), id='graph'+ self.getPrefix()).replace('"', '&quot;')

        return """
<iframe seamless style="width:1200px;height:620px;border:0" srcdoc="{{code}}"></iframe>
"""

    def strip_consts(self, graph_def, max_const_size=32):
        """Strip large constant values from graph_def."""
        strip_def = tf.GraphDef()
        for n0 in graph_def.node:
            n = strip_def.node.add() 
            n.MergeFrom(n0)
            if n.op == 'Const':
                tensor = n.attr['value'].tensor
                size = len(tensor.tensor_content)
                if size > max_const_size:
                    tensor.tensor_content = "<stripped {} bytes>".format(size).encode("UTF-8")
        return strip_def

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode21.py

注意:子 PixieApps 可以通过self.parent_pixieapp变量访问其父 PixieApp。

TensorGraphApp子 PixieApp 的结果屏幕如以下截图所示。它提供了选定模型的 TensorFlow 图的交互式可视化,允许用户浏览不同的节点,并深入探索模型。然而,重要的是要注意,整个可视化是在浏览器内运行的,而没有使用 TensorBoard 服务器。因此,TensorBoard 中的一些功能,如运行时统计信息,是禁用的。

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_19.jpg

显示 MobileNet V1 的模型图

LabelsApp PixieApp 中,我们只是将标签作为 JSON 格式加载,并使用handlerId=tableView选项在 PixieDust 表格中显示它。

[[LabelsApp]]
@PixieApp
class LabelsApp():
    def setup(self):
        self.labels = self.parent_pixieapp.load_labels(
            self.parent_pixieapp.model, as_json=True
        )

    @route()
    def main_screen(self):
        return """
<div pd_render_onload pd_entity="labels">
    <pd_options>
    {
        "table_noschema": "true",
 "handlerId": "tableView",
        "rowCount": "10000"
    }
    </pd_options>
</div>
        """

注意

您可以在此找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode22.py

注意:我们通过将table_noschema设置为true来配置表格,以避免显示模式架构,但为了方便起见,我们保留了搜索栏。

结果如下截图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_20.jpg

可搜索的模型类别表格

我们的 MVP 图像识别示例应用程序现在已经完成;您可以在此找到完整的 Notebook:github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%203.ipynb

在下一节中,我们将通过允许用户使用自定义图像重新训练模型来改进应用程序。

第四部分 – 使用自定义训练数据重新训练模型

注意

注意:您可以在此下载完整的 Notebook 以便跟随本节的讨论:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%204.ipynb

本节中的代码相当广泛,部分与主题无关的辅助函数将被省略。然而,和往常一样,更多关于代码的信息请参阅 GitHub 上的完整 Notebook。

在本节中,我们将使用自定义训练数据重新训练 MobileNet 模型,并用它来分类那些在通用模型中得分较低的图像。

注意

注意:本节中的代码改编自TensorFlow for poets教程:

github.com/googlecodelabs/tensorflow-for-poets-2/blob/master/scripts/retrain.py

正如大多数时候一样,获取高质量的训练数据可能是最具挑战性且耗时的任务。在我们的示例中,我们需要为每个要训练的类别获取大量图像。为了简便和可复现性,我们使用了 ImageNet 数据库,该数据库方便地提供了获取 URL 和相关标签的 API。我们还将下载的文件限制为.jpg格式。当然,如果需要,您也可以自行获取训练数据。

我们首先从 2011 年秋季发布的版本下载所有图片 URL 的列表,链接在这里:image-net.org/imagenet_data/urls/imagenet_fall11_urls.tgz,并将文件解压到你选择的本地目录(例如,我选择了/Users/dtaieb/Downloads/fall11_urls.txt)。我们还需要下载所有synsets的 WordNet ID 与单词的映射文件,链接在这里:image-net.org/archive/words.txt,这个文件将帮助我们找到包含我们需要下载的 URL 的 WordNet ID。

以下代码将分别加载两个文件到 pandas DataFrame 中:

import pandas
wnid_to_urls = pandas.read_csv('/Users/dtaieb/Downloads/fall11_urls.txt',
                sep='\t', names=["wnid", "url"],
                header=0, error_bad_lines=False,
                warn_bad_lines=False, encoding="ISO-8859-1")
wnid_to_urls['wnid'] = wnid_to_urls['wnid'].apply(lambda x: x.split("_")[0])
wnid_to_urls = wnid_to_urls.dropna()

wnid_to_words = pandas.read_csv('/Users/dtaieb/Downloads/words.txt',
                sep='\t', names=["wnid", "description"],
                header=0, error_bad_lines=False,
                warn_bad_lines=False, encoding="ISO-8859-1")
wnid_to_words = wnid_to_words.dropna()

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode23.py

请注意,我们需要清理wnid_to_urls数据集中的wnid列,因为它包含一个后缀,表示该图片在类别中的索引。

然后我们可以定义一个方法get_url_for_keywords,它返回一个字典,字典的键是类别,值是包含 URL 的数组:

def get_url_for_keywords(keywords):
    results = {}
    for keyword in keywords:
        df = wnid_to_words.loc[wnid_to_words['description'] == keyword]
        row_list = df['wnid'].values.tolist()
        descriptions = df['description'].values.tolist()
        if len(row_list) > 0:
            results[descriptions[0]] = \
            wnid_to_urls.loc[wnid_to_urls['wnid'] == \
            row_list[0]]["url"].values.tolist()
    return results

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode24.py

我们可以使用 PixieDust 的display轻松查看数据分布。和往常一样,随时可以自己进行更多探索:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_21.jpg

按类别分布的图片

现在我们可以构建代码来下载与我们选择的类别列表对应的图片。在我们的例子中,我们选择了水果:["apple", "orange", "pear", "banana"]。这些图片将下载到 PixieDust 主目录的子目录中(使用 PixieDust 的Environment助手类,来自pixiedust.utils包),并限制下载图片的数量为500,以提高速度:

注意

注意:以下代码使用了 Notebook 中先前定义的方法和导入内容。在尝试运行以下代码之前,请确保先运行相应的单元格。

from pixiedust.utils.environment import Environment
root_dir = ensure_dir_exists(os.path.join(Environment.pixiedustHome, "imageRecoApp")
image_dir = root_dir
image_dict = get_url_for_keywords(["apple", "orange", "pear", "banana"])
with open(os.path.join(image_dir, "retrained_label.txt"), "w") as f_label:
    for key in image_dict:
        f_label.write(key + "\n")
        path = ensure_dir_exists(os.path.join(image_dir, key))
        count = 0
        for url in image_dict[key]:
            download_image_into_dir(url, path)
            count += 1
            if count > 500:
                break;

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode25.py

代码的下一部分处理训练集中的每张图片,使用以下步骤:

注意

注意:如前所述,代码比较长,并且部分代码被省略,这里仅解释了重要部分。请不要直接运行以下代码,完整实现请参阅完整的 Notebook。

  1. 使用以下代码解码.jpeg文件:

    def add_jpeg_decoding(model):
        input_height = get_model_attribute(model,
                       "input_height")
        input_width = get_model_attribute(model, "input_width")
        input_depth = get_model_attribute(model, "input_depth")
        input_mean = get_model_attribute(model, "input_mean",
                     0)
        input_std = get_model_attribute(model, "input_std",
                    255)
    
        jpeg_data = tf.placeholder(tf.string,
                    name='DecodeJPGInput')
        decoded_image = tf.image.decode_jpeg(jpeg_data,
                        channels=input_depth)
        decoded_image_as_float = tf.cast(decoded_image,
                                 dtype=tf.float32)
        decoded_image_4d =  tf.expand_dims(
                           decoded_image_as_float,
                           0)
        resize_shape = tf.stack([input_height, input_width])
        resize_shape_as_int = tf.cast(resize_shape,
                              dtype=tf.int32)
        resized_image = tf.image.resize_bilinear(
                        decoded_image_4d,
                        resize_shape_as_int)
        offset_image = tf.subtract(resized_image, input_mean)
        mul_image = tf.multiply(offset_image, 1.0 / input_std)
        return jpeg_data, mul_image
    

    注意

    你可以在这里找到代码文件:

    github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode26.py

  2. 创建瓶颈值(根据需要进行缓存),通过调整图像大小和缩放来标准化图像。这是在以下代码中完成的:

    def run_bottleneck_on_image(sess, image_data,
        image_data_tensor,decoded_image_tensor,
        resized_input_tensor,bottleneck_tensor):
        # First decode the JPEG image, resize it, and rescale the pixel values.
        resized_input_values = sess.run(decoded_image_tensor,
            {image_data_tensor: image_data})
        # Then run it through the recognition network.
        bottleneck_values = sess.run(
            bottleneck_tensor,
            {resized_input_tensor: resized_input_values})
        bottleneck_values = np.squeeze(bottleneck_values)
        return bottleneck_values
    

    注意

    你可以在这里找到代码文件:

    github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode27.py

  3. 使用add_final_training_ops方法添加最终训练操作,放在一个公共命名空间下,方便在可视化图时进行操作。训练步骤如下:

    1. 使用tf.truncated_normal API 生成随机权重:

            initial_value = tf.truncated_normal(
                [bottleneck_tensor_size, class_count],
                stddev=0.001)
                layer_weights = tf.Variable(
                    initial_value, name='final_weights')
      
    2. 添加偏置,初始化为零:

            layer_biases = tf.Variable(tf.zeros([class_count]),
                name='final_biases')
      
    3. 计算加权和:

            logits = tf.matmul(bottleneck_input, layer_weights) +
                layer_biases
      
    4. 添加cross_entropy成本函数:

            cross_entropy =
                tf.nn.softmax_cross_entropy_with_logits(
                labels=ground_truth_input, logits=logits)
            with tf.name_scope('total'):
                cross_entropy_mean = tf.reduce_mean(
                cross_entropy)
      
    5. 最小化成本函数:

            optimizer = tf.train.GradientDescentOptimizer(
                learning_rate)
            train_step = optimizer.minimize(cross_entropy_mean)
      

为了可视化重新训练后的图,我们首先需要更新TensorGraphApp PixieApp,让用户选择可视化的模型:通用的 MobileNet 还是自定义模型。通过在主路由中添加<select>下拉菜单并附加pd_script元素来更新状态:

[[TensorGraphApp]]
return """
{%if this.custom_graph%}
<div style="margin-top:10px" pd_refresh>
    <pd_script>
self.graph = self.custom_graph if self.graph is not self.custom_graph else self.parent_pixieapp.graph
    </pd_script>
    <span style="font-weight:bold">Select a model to display:</span>
    <select>
 <option {%if this.graph!=this.custom_graph%}selected{%endif%} value="main">MobileNet</option>
 <option {%if this.graph==this.custom_graph%}selected{%endif%} value="custom">Custom</options>
    </select>
{%endif%}
<iframe seamless style="width:1200px;height:620px;border:0" srcdoc="{{code}}"></iframe>
"""

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode28.py

重新运行我们的ImageReco PixieApp 生成以下截图:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_22.jpg

可视化重新训练后的图

点击火车节点将显示运行反向传播算法的嵌套操作,以最小化前面add_final_training_ops中指定的cross_entropy_mean成本函数:

with tf.name_scope('cross_entropy'):
    cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
        labels=ground_truth_input, logits=logits)
    with tf.name_scope('total'):
        cross_entropy_mean = tf.reduce_mean(cross_entropy)

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode29.py

以下截图展示了train命名空间的详细信息:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_23.jpg

训练过程中的反向传播

类似地,我们可以在LabelsApp中添加下拉切换,以切换通用 MobileNet 和自定义模型之间的可视化:

[[LabelsApp]]
@PixieApp
class LabelsApp():
    def setup(self):
        ...

    @route()
    def main_screen(self):
        return """
{%if this.custom_labels%}
<div style="margin-top:10px" pd_refresh>
    <pd_script>
self.current_labels = self.custom_labels if self.current_labels is not self.custom_labels else self.labels
    </pd_script>
    <span style="font-weight:bold">
        Select a model to display:</span>
    <select>
        <option {%if this.current_labels!=this.labels%}selected{%endif%} value="main">MobileNet</option>
        <option {%if this.current_labels==this.custom_labels%}selected{%endif%} value="custom">Custom</options>
    </select>
{%endif%}
<div pd_render_onload pd_entity="current_labels">
    <pd_options>
    {
        "table_noschema": "true",
        "handlerId": "tableView",
        "rowCount": "10000",
        "noChartCache": "true"

    }
    </pd_options>
</div>
        """

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode30.py

结果显示在以下截图中:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_24.jpg

显示每个模型的标签信息

第四部分 MVP 的最后一步是更新 score_image 方法,使其同时使用两个模型对图像进行分类,并将结果以字典形式存储,其中每个模型有一个条目。我们定义了一个本地方法 do_score_image,该方法返回前 5 个候选答案。

该方法会为每个模型调用,并将结果填充到一个字典中,字典的键是模型名称:

# classify an image given its url
def score_image(graph, model, url):
    # Download the image and build a tensor from its data
    t = read_tensor_from_image_file(model, download_image(url))

    def do_score_image(graph, output_layer, labels):
        # Retrieve the tensors corresponding to the input and output layers
        input_tensor = graph.get_tensor_by_name("import/" +
            input_layer + ":0");
        output_tensor = graph.get_tensor_by_name( output_layer +
            ":0");

        with tf.Session(graph=graph) as sess:
            # Initialize the variables
            sess.run(tf.global_variables_initializer())
            results = sess.run(output_tensor, {input_tensor: t})
        results = np.squeeze(results)
        # select the top 5 candidates and match them to the labels
        top_k = results.argsort()[-5:][::-1]
        return [(labels[i].split(":")[1], results[i]) for i in top_k]

    results = {}
    input_layer = get_model_attribute(model, "input_layer",
        "input")
    labels = load_labels(model)
    results["mobilenet"] = do_score_image(graph, "import/" +
        get_model_attribute(model, "output_layer"), labels)
    if "custom_graph" in model and "custom_labels" in model:
        with open(model["custom_labels"]) as f:
            labels = [line.rstrip() for line in f.readlines() if line != ""]
            custom_labels = ["{}:{}".format(i, label) for i,label in zip(range(len(labels)), labels)]
        results["custom"] = do_score_image(model["custom_graph"],
            "final_result", custom_labels)
    return results

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode31.py

由于我们修改了 score_image 方法返回的值,我们需要调整 ScoreImageApp 中返回的 HTML 片段,以便遍历 results 字典中的所有模型条目:

@route(score_url="*")
@templateArgs
def do_score_url(self, score_url):
    scores_dict = score_image(self.graph, self.model, score_url)
    return """
{%for model, results in scores_dict.items()%}
<div style="font-weight:bold">{{model}}</div>
<ul style="text-align:left">
{%for label, confidence in results%}
<li><b>{{label}}</b>: {{confidence}}</li>
{%endfor%}
</ul>
{%endfor%}
    """

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/sampleCode32.py

在这些更改生效后,PixieApp 将会自动调用可用的自定义模型,并且如果存在自定义模型,它会显示两个模型的结果。

下图显示了与 香蕉 相关的图像的结果:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_06_25.jpg

使用通用的 MobileNet 和自定义训练模型进行评分

读者会注意到自定义模型的分数相当低。一个可能的解释是,训练数据获取是完全自动化的,并且没有人工筛选。对这个示例应用程序的一个可能改进是,将训练数据获取和再训练步骤移到一个独立的 PixieApp 标签页中。我们还应当给用户机会验证图像,并拒绝质量差的图像。让用户重新标注错误分类的图像也是一个不错的主意。

注意

第四部分的完整 Notebook 可以在这里找到:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%206/Tensorflow%20VR%20Part%204.ipynb

本节讨论了使用 TensorFlow 在 Jupyter Notebook 中构建图像识别示例应用程序的增量方法,特别关注如何通过 PixieApps 实现算法的操作化。我们首先通过 TensorFlow DNNClassifier 估计器,从 pandas DataFrame 中构建了一个简单的分类模型。接着,我们将图像识别示例应用程序的 MVP 版本分为四部分来构建:

  1. 我们加载了预训练的 MobileNet 模型

  2. 我们为我们的图像识别示例应用程序创建了一个 PixieApp

  3. 我们将 TensorBoard 图形可视化集成到 PixieApp 中

  4. 我们使用户能够使用来自 ImageNet 的自定义训练数据重新训练模型

概述

机器学习是一个庞大的领域,享有巨大的增长,无论是在研究还是开发方面。在本章中,我们只探讨了与机器学习算法相关的极小一部分前沿技术,具体来说,是使用深度学习神经网络进行图像识别。对于一些刚刚开始接触机器学习的读者,示例 PixieApp 及其关联的算法代码可能一次性难以消化。然而,底层的目标是展示如何逐步构建一个应用程序,并利用机器学习模型。我们恰好使用了一个卷积神经网络模型进行图像识别,但任何其他模型都可以使用。

希望你已经对 PixieDust 和 PixieApp 编程模型如何帮助你完成自己的项目有了一个不错的了解,我强烈建议你以这个示例应用程序为起点,使用你选择的机器学习方法来构建自己的自定义应用程序。我还推荐将你的 PixieApp 部署为一个 web 应用程序,并通过 PixieGateway 微服务进行测试,看看它是否是一个可行的解决方案。

在下一章,我们将介绍另一个与大数据和自然语言处理相关的重要行业应用案例。我们将构建一个示例应用程序,通过自然语言理解服务分析社交媒体趋势。

第七章:分析研究:Twitter 情感分析与 NLP 及大数据

“数据是新的石油。”
未知

本章我们将探讨人工智能和数据科学的两个重要领域:自然语言处理NLP)和大数据分析。为了支持示例应用程序,我们重新实现了第一章中描述的Twitter 标签情感分析项目,编程与数据科学——一种新工具集,但这次我们利用 Jupyter Notebooks 和 PixieDust 构建实时仪表盘,分析来自与特定实体(例如公司提供的某个产品)相关的推文流中的数据,提供情感信息以及从相同推文中提取的其他趋势实体的信息。在本章的结尾,读者将学习如何将基于云的 NLP 服务,如IBM Watson 自然语言理解,集成到他们的应用程序中,并使用像 Apache Spark 这样的框架在(Twitter)规模上执行数据分析。

一如既往,我们将展示如何通过实现一个作为 PixieApp 的实时仪表盘,直接在 Jupyter Notebook 中运行来使分析工作可操作化。

开始使用 Apache Spark

大数据这一术语常常给人模糊不清和不准确的感觉。什么样的数据集才算是大数据呢?是 10 GB、100 GB、1 TB 还是更多?我喜欢的一个定义是:大数据是当数据无法完全装入单个机器的内存时。多年来,数据科学家被迫对大数据集进行抽样处理,以便能够在单台机器上处理,但随着并行计算框架的出现,这些框架能够将数据分布到多台机器的集群中,使得可以在整个数据集上进行工作,当然,前提是集群有足够的机器。与此同时,云技术的进步使得可以按需提供适合数据集大小的机器集群。

目前,有多种框架(大多数通常以开源形式提供)可以提供强大且灵活的并行计算能力。最受欢迎的一些包括 Apache Hadoop (hadoop.apache.org)、Apache Spark (spark.apache.org) 和 Dask (dask.pydata.org)。对于我们的Twitter 情感分析应用程序,我们将使用 Apache Spark,它在可扩展性、可编程性和速度方面表现出色。此外,许多云服务提供商提供了某种形式的 Spark 即服务,能够在几分钟内按需创建一个合适大小的 Spark 集群。

一些 Spark 即服务的云服务提供商包括:

注意

注意:Apache Spark 也可以轻松地在本地机器上安装用于测试,在这种情况下,集群节点通过线程进行模拟。

Apache Spark 架构

下图展示了 Apache Spark 框架的主要组件:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_01.jpg

Spark 高层架构

  • Spark SQL:该组件的核心数据结构是 Spark DataFrame,使得熟悉 SQL 语言的用户能够轻松地处理结构化数据。

  • Spark Streaming:用于处理流式数据的模块。正如我们稍后所看到的,我们将在示例应用中使用该模块,特别是 Spark 2.0 引入的结构化流处理(Structured Streaming)。

  • MLlib:提供一个功能丰富的机器学习库,在 Spark 规模上运行。

  • GraphX:用于执行图并行计算的模块。

如下图所示,主要有两种方式可以与 Spark 集群工作:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_02.jpg

与 Spark 集群工作的两种方式

  • spark-submit:用于在集群上启动 Spark 应用的 Shell 脚本

  • Notebooks:与 Spark 集群交互式执行代码语句

本书不涵盖 spark-submit shell 脚本的内容,但可以在以下网址找到官方文档:spark.apache.org/docs/latest/submitting-applications.html。在本章的其余部分,我们将重点介绍通过 Jupyter Notebooks 与 Spark 集群进行交互。

配置 Notebooks 以便与 Spark 一起使用

本节中的说明仅涵盖在本地安装 Spark 用于开发和测试。手动在集群中安装 Spark 超出了本书的范围。如果需要真正的集群,强烈建议使用基于云的服务。

默认情况下,本地 Jupyter Notebooks 会安装普通的 Python 内核。为了与 Spark 一起使用,用户必须执行以下步骤:

  1. spark.apache.org/downloads.html下载二进制分发包,安装 Spark 到本地。

  2. 使用以下命令在临时目录中生成内核规范:

    ipython kernel install --prefix /tmp
    
    

    注意

    注意:上述命令可能会生成警告消息,只要显示以下信息,这些警告可以安全忽略:

    已在/tmp/share/jupyter/kernels/python3 中安装 kernelspec python3

  3. 转到 /tmp/share/jupyter/kernels/python3,编辑 kernel.json 文件,向 JSON 对象中添加以下键(将 <<spark_root_path>> 替换为你安装 Spark 的目录路径,将 <<py4j_version>> 替换为你系统上安装的版本):

    "env": {
        "PYTHONPATH": "<<spark_root_path>>/python/:<<spark_root_path>>/python/lib/py4j-<<py4j_version>>-src.zip",
        "SPARK_HOME": "<<spark_root_path>>",
        "PYSPARK_SUBMIT_ARGS": "--master local[10] pyspark-shell",
        "SPARK_DRIVER_MEMORY": "10G",
        "SPARK_LOCAL_IP": "127.0.0.1",
        "PYTHONSTARTUP": "<<spark_root_path>>/python/pyspark/shell.py"
    }
    
  4. 你可能还想自定义 display_name 键,以使其在 Juptyer 界面中独特且易于识别。如果你需要查看现有内核的列表,可以使用以下命令:

    jupyter kernelspec list
    
    

    前述命令将为你提供内核名称和相关路径的列表。从路径中,你可以打开 kernel.json 文件,访问 display_name 值。例如:

     Available kernels:
     pixiedustspark16
     /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark16
     pixiedustspark21
     /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark21
     pixiedustspark22
     /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark22
     pixiedustspark23
     /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark23
    
    
  5. 使用以下命令安装带有编辑文件的内核:

    jupyter kernelspec install /tmp/share/jupyter/kernels/python3
    
    

    注意

    注意:根据环境不同,你可能在运行前述命令时会遇到“权限拒绝”的错误。在这种情况下,你可能需要使用管理员权限运行该命令,使用 sudo 或者按如下方式使用 --user 开关:

    jupyter kernelspec install --user /tmp/share/jupyter/kernels/python3

    如需了解更多安装选项的信息,可以使用 -h 开关。例如:

     jupyter kernelspec install -h
    
    
  6. 重启 Notebook 服务器并开始使用新的 PySpark 内核。

幸运的是,PixieDust 提供了一个 install 脚本来自动化前述的手动步骤。

注意

你可以在这里找到该脚本的详细文档:

pixiedust.github.io/pixiedust/install.html

简而言之,使用自动化 PixieDust install 脚本需要发出以下命令并按照屏幕上的说明操作:

jupyter pixiedust install

本章稍后会深入探讨 Spark 编程模型,但现在让我们在下一节定义我们 Twitter 情感分析 应用的 MVP 要求。

Twitter 情感分析应用

和往常一样,我们首先定义 MVP 版本的要求:

  • 连接 Twitter,获取由用户提供的查询字符串过滤的实时推文流

  • 丰富推文,添加情感信息和从文本中提取的相关实体

  • 使用实时图表显示有关数据的各种统计信息,并在指定的时间间隔内更新图表

  • 系统应该能够扩展到 Twitter 数据规模

以下图示展示了我们应用架构的第一个版本:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_03.jpg

Twitter 情感分析架构版本 1

对于第一个版本,应用将完全在一个 Python Notebook 中实现,并调用外部服务处理 NLP 部分。为了能够扩展,我们肯定需要将一些处理外部化,但对于开发和测试,我发现能够将整个应用封装在一个 Notebook 中显著提高了生产力。

至于库和框架,我们将使用 Tweepy(www.tweepy.org)连接到 Twitter,使用 Apache Spark 结构化流处理(spark.apache.org/streaming)处理分布式集群中的流数据,使用 Watson Developer Cloud Python SDK(github.com/watson-developer-cloud/python-sdk)访问 IBM Watson 自然语言理解(www.ibm.com/watson/services/natural-language-understanding)服务。

第一部分 – 使用 Spark 结构化流处理获取数据

为了获取数据,我们使用 Tweepy,它提供了一个优雅的 Python 客户端库来访问 Twitter API。Tweepy 支持的 API 非常广泛,详细介绍超出了本书的范围,但你可以在 Tweepy 官方网站找到完整的 API 参考:tweepy.readthedocs.io/en/v3.6.0/cursor_tutorial.html

你可以直接通过 PyPi 安装 Tweepy 库,使用pip install命令。以下命令展示了如何通过 Notebook 使用!指令安装:

!pip install tweepy

注意

注意:当前使用的 Tweepy 版本是 3.6.0。安装完库后,别忘了重启内核。

数据管道架构图

在深入了解数据管道的每个组件之前,最好先了解其整体架构并理解计算流。

如下图所示,我们首先创建一个 Tweepy 流,将原始数据写入 CSV 文件。然后,我们创建一个 Spark Streaming 数据框,读取 CSV 文件并定期更新新数据。从 Spark Streaming 数据框中,我们使用 SQL 创建一个 Spark 结构化查询,并将其结果存储在 Parquet 数据库中:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_04.jpg

流计算流程

Twitter 身份验证

在使用任何 Twitter API 之前,建议先进行身份验证。最常用的身份验证机制之一是 OAuth 2.0 协议(oauth.net),该协议使第三方应用程序能够访问网络服务。你需要做的第一件事是获取一组密钥字符串,这些字符串由 OAuth 协议用于对你进行身份验证:

  • 消费者密钥:唯一标识客户端应用程序的字符串(即 API 密钥)。

  • 消费者密钥:仅应用程序和 Twitter OAuth 服务器知道的密钥字符串。可以将其视为密码。

  • 访问令牌:用于验证请求的字符串。该令牌也在授权阶段用于确定应用程序的访问级别。

  • 访问令牌密钥:与消费者密钥类似,这是与访问令牌一起发送的密码字符串,用作密码。

要生成前面的密钥字符串,您需要访问apps.twitter.com,使用您的常规 Twitter 用户 ID 和密码进行身份验证,并按照以下步骤操作:

  1. 使用创建新应用按钮创建一个新的 Twitter 应用。

  2. 填写应用程序详情,同意开发者协议,然后点击创建您的 Twitter 应用按钮。

    提示

    注意:确保您的手机号码已添加到个人资料中,否则在创建 Twitter 应用时会出现错误。

    您可以为必填项网站输入提供一个随机 URL,并将URL输入留空,因为这是一个可选的回调 URL。

  3. 点击密钥和访问令牌标签以获取消费者和访问令牌。您可以随时使用页面上提供的按钮重新生成这些令牌。如果您这么做,您还需要在应用程序代码中更新这些值。

为了更容易进行代码维护,我们将把这些令牌放在 Notebook 顶部的单独变量中,并创建我们稍后将使用的tweepy.OAuthHandler类:

from tweepy import OAuthHandler
# Go to http://apps.twitter.com and create an app.
# The consumer key and secret will be generated for you after
consumer_key="XXXX"
consumer_secret="XXXX"

# After the step above, you will be redirected to your app's page.
# Create an access token under the "Your access token" section
access_token="XXXX"
access_token_secret="XXXX"

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

创建 Twitter 流

为了实现我们的应用程序,我们只需要使用这里文档化的 Twitter 流 API:tweepy.readthedocs.io/en/v3.5.0/streaming_how_to.html。在此步骤中,我们创建一个 Twitter 流,将传入的数据存储到本地文件系统中的 CSV 文件中。通过继承自tweepy.streaming.StreamListener的自定义RawTweetsListener类完成此操作。通过重写on_data方法来处理传入数据的自定义处理。

在我们的案例中,我们希望使用标准 Python csv模块中的DictWriter将传入的 JSON 数据转换为 CSV 格式。由于 Spark Streaming 文件输入源仅在输入目录中创建新文件时触发,因此我们不能简单地将数据追加到现有文件中。相反,我们将数据缓冲到一个数组中,并在缓冲区达到容量时将其写入磁盘。

注意

为了简化,实施中没有包括处理完文件后的清理工作。另一个小的限制是,我们目前等待缓冲区填满后再写入文件,理论上如果没有新推文出现,这可能需要很长时间。

RawTweetsListener的代码如下所示:

from six import iteritems
import json
import csv
from tweepy.streaming import StreamListener
class RawTweetsListener(StreamListener):
    def __init__(self):
        self.buffered_data = []
        self.counter = 0

    def flush_buffer_if_needed(self):
        "Check the buffer capacity and write to a new file if needed"
        length = len(self.buffered_data)
        if length > 0 and length % 10 == 0:
            with open(os.path.join( output_dir,
                "tweets{}.csv".format(self.counter)), "w") as fs:
                self.counter += 1
                csv_writer = csv.DictWriter( fs,
                    fieldnames = fieldnames)
                for data in self.buffered_data:
 csv_writer.writerow(data)
            self.buffered_data = []

    def on_data(self, data):
        def transform(key, value):
            return transformskey if key in transforms else value

        self.buffered_data.append(
            {key:transform(key,value) \
                 for key,value in iteritems(json.loads(data)) \
                 if key in fieldnames}
        )
        self.flush_buffer_if_needed()
        return True

    def on_error(self, status):
        print("An error occured while receiving streaming data: {}".format(status))
        return False

注意

您可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode1.py

从前面的代码中有几个重要的地方需要注意:

  • 每条来自 Twitter API 的推文都包含大量数据,我们使用field_metadata变量选择保留的字段。我们还定义了一个全局变量fieldnames,它保存了要从流中捕获的字段列表,以及一个transforms变量,它包含一个字典,字典的键是所有具有变换函数的字段名,值是变换函数本身:

    from pyspark.sql.types import StringType, DateType
    from bs4 import BeautifulSoup as BS
    fieldnames = [f["name"] for f in field_metadata]
    transforms = {
        item['name']:item['transform'] for item in field_metadata if "transform" in item
    }
    field_metadata = [
        {"name": "created_at","type": DateType()},
        {"name": "text", "type": StringType()},
        {"name": "source", "type": StringType(),
             "transform": lambda s: BS(s, "html.parser").text.strip()
        }
    ]
    

    注意

    你可以在这里找到代码文件:

    github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode2.py

  • CSV 文件被写入定义在自己的变量中的output_dir目录。在启动时,我们首先删除该目录及其内容:

    import shutil
    def ensure_dir(dir, delete_tree = False):
        if not os.path.exists(dir):
            os.makedirs(dir)
        elif delete_tree:
            shutil.rmtree(dir)
            os.makedirs(dir)
        return os.path.abspath(dir)
    
    root_dir = ensure_dir("output", delete_tree = True)
    output_dir = ensure_dir(os.path.join(root_dir, "raw"))
    
    

    注意

    你可以在这里找到代码文件:

    github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode3.py

  • field_metadata包含了 Spark DataType,我们稍后将在创建 Spark 流查询时使用它来构建模式。

  • field_metadata还包含一个可选的变换lambda函数,用于在将值写入磁盘之前清理数据。作为参考,Python 中的 lambda 函数是一个内联定义的匿名函数(请参见docs.python.org/3/tutorial/controlflow.html#lambda-expressions)。我们在此使用它来处理常常以 HTML 片段形式返回的源字段。在这个 lambda 函数中,我们使用了 BeautifulSoup 库(它也在上一章中使用过)来提取只有文本的内容,如以下代码片段所示:

    lambda s: BS(s, "html.parser").text.strip()
    

现在,RawTweetsListener已经创建,我们定义了一个start_stream函数,稍后将在 PixieApp 中使用。此函数接受一个搜索词数组作为输入,并使用filter方法启动一个新的流:

from tweepy import Stream
def start_stream(queries):
    "Asynchronously start a new Twitter stream"
    stream = Stream(auth, RawTweetsListener())
 stream.filter(track=queries, async=True)
    return stream

注意

注意到传递给stream.filterasync=True参数。这是必要的,确保该函数不会阻塞,这样我们就可以在 Notebook 中运行其他代码。

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode4.py

以下代码启动了一个流,它将接收包含单词baseball的推文:

stream = start_stream(["baseball"])

当运行上述代码时,Notebook 中不会生成任何输出。然而,你可以在输出目录(即../output/raw)中看到生成的文件(如tweets0.csvtweets1.csv等),这些文件位于 Notebook 运行的路径下。

要停止流,我们只需调用disconnect方法,如下所示:

stream.disconnect()

创建一个 Spark Streaming DataFrame

根据架构图,下一步是创建一个 Spark Streaming DataFrame tweets_sdf,该 DataFrame 使用 output_dir 作为源文件输入。我们可以把 Streaming DataFrame 看作一个没有边界的表格,随着新数据从流中到达,新的行会不断被添加进来。

注意

注意:Spark Structured Streaming 支持多种类型的输入源,包括文件、Kafka、Socket 和 Rate。(Socket 和 Rate 仅用于测试。)

以下图表摘自 Spark 网站,能够很好地解释新数据是如何被添加到 Streaming DataFrame 中的:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_05.jpg

Streaming DataFrame 流程

来源: spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png

Spark Streaming Python API 提供了一种优雅的方式来使用 spark.readStream 属性创建 Streaming DataFrame,该属性会创建一个新的 pyspark.sql.streamingreamReader 对象,方便你链式调用方法,并能让代码更加清晰(有关此模式的更多细节,请参见 en.wikipedia.org/wiki/Method_chaining)。

例如,要创建一个 CSV 文件流,我们调用 format 方法并传入 csv,接着链式调用适用的选项,并通过指定目录路径调用 load 方法:

schema = StructType(
[StructField(f["name"], f["type"], True) for f in field_metadata]
)
csv_sdf = spark.readStream\
    .format("csv")\
 .option("schema", schema)\
    .option("multiline", True)\
 .option("dateFormat", 'EEE MMM dd kk:mm:ss Z y')\
    .option("ignoreTrailingWhiteSpace", True)\
    .option("ignoreLeadingWhiteSpace", True)\
    .load(output_dir)

注意

你可以在此处找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode5.py

spark.readStream 还提供了一个方便的高阶 csv 方法,它将路径作为第一个参数,并为选项提供关键字参数:

csv_sdf = spark.readStream \
    .csv(
        output_dir,
        schema=schema,
        multiLine = True,
        dateFormat = 'EEE MMM dd kk:mm:ss Z y',
        ignoreTrailingWhiteSpace = True,
        ignoreLeadingWhiteSpace = True
    )

注意

你可以在此处找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode6.py

你可以通过调用 isStreaming 方法来验证 csv_sdf DataFrame 是否真的是一个 Streaming DataFrame,返回值应为 true。以下代码还添加了 printSchema 方法的调用,以验证 schema 是否按照 field_metadata 配置如预期那样:

print(csv_sdf.isStreaming)
csv_sdf.printSchema()

返回值:

root
 |-- created_at: date (nullable = true)
 |-- text: string (nullable = true)
 |-- source: string (nullable = true)

在继续下一步之前,理解csv_sdf流数据框如何适应结构化流编程模型及其局限性非常重要。从本质上讲,Spark 的低级 API 定义了弹性分布式数据集RDD)数据结构,它封装了管理分布式数据的所有底层复杂性。像容错(集群节点因任何原因崩溃时,框架会自动重启节点,无需开发者干预)等特性都由框架自动处理。RDD 操作有两种类型:转换和动作。转换是对现有 RDD 的逻辑操作,直到调用动作操作时,转换才会在集群上立即执行(懒执行)。转换的输出是一个新的 RDD。内部,Spark 维护一个 RDD 有向无环图(DAG),记录所有生成 RDD 的血统,这在从服务器故障恢复时非常有用。常见的转换操作包括mapflatMapfiltersampledistinct。对数据框的转换(数据框在内部由 RDD 支持)也适用,且它们具有包括 SQL 查询的优点。另一方面,动作不会生成其他 RDD,而是对实际分布式数据执行操作,返回非 RDD 值。常见的动作操作包括reducecollectcounttake

如前所述,csv_sdf是一个流式数据框(Streaming DataFrame),这意味着数据会持续被添加到其中,因此我们只能对其应用转换,而不能执行操作。为了解决这个问题,我们必须先使用csv_sdf.writeStream创建一个流查询,这是一个pyspark.sql.streaming.DataStreamWriter对象。流查询负责将结果发送到输出接收器。然后,我们可以通过start()方法运行流查询。

Spark Streaming 支持多种输出接收器类型:

  • 文件:支持所有经典文件格式,包括 JSON、CSV 和 Parquet

  • Kafka:直接写入一个或多个 Kafka 主题

  • Foreach:对集合中的每个元素执行任意计算

  • 控制台:将输出打印到系统控制台(主要用于调试)

  • 内存:输出存储在内存中

在下一节中,我们将创建并运行一个结构化查询,针对csv_sdf使用输出接收器将结果存储为 Parquet 格式。

创建并运行结构化查询

使用tweets_sdf流数据框,我们创建一个流查询tweet_streaming_query,该查询将数据以append输出模式写入 Parquet 格式。

注意

注意:Spark 流查询支持三种输出模式:complete,每次触发时写入整个表;append,只写入自上次触发以来的增量行;以及update,只写入已修改的行。

Parquet 是一种列式数据库格式,提供了高效、可扩展的分布式分析存储。你可以在这里找到有关 Parquet 格式的更多信息:parquet.apache.org

以下代码创建并启动 tweet_streaming_query 流式查询:

tweet_streaming_query = csv_sdf \
    .writeStream \
    .format("parquet") \
 .option("path", os.path.join(root_dir, "output_parquet")) \
 .trigger(processingTime="2 seconds") \
 .option("checkpointLocation", os.path.join(root_dir, "output_chkpt")) \
    .start()

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode7.py

类似地,你可以使用 stop() 方法来停止流式查询,如下所示:

tweet_streaming_query.stop()

在上述代码中,我们使用 path 选项指定 Parquet 文件的位置,并使用 checkpointLocation 指定在服务器故障时用于恢复的数据位置。我们还指定了从流中读取新数据并将新行添加到 Parquet 数据库的触发间隔。

出于测试目的,你也可以使用 console sink 来查看每次生成新原始 CSV 文件时从 output_dir 目录读取的新行:

tweet_streaming_query = csv_sdf.writeStream\
    .outputMode("append")\
    .format("console")\
    .trigger(processingTime='2 seconds')\
    .start()

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode8.py

你可以在 Spark 集群主节点的系统输出中查看结果(你需要物理访问主节点机器并查看日志文件,因为不幸的是,由于操作在不同的进程中执行,输出不会显示在笔记本中。日志文件的位置取决于集群管理软件;有关更多信息,请参阅具体的文档)。

以下是特定批次显示的示例结果(标识符已被屏蔽):

-------------------------------------------
Batch: 17
-------------------------------------------
+----------+--------------------+-------------------+
|created_at|                text|             source|
+----------+--------------------+-------------------+
|2018-04-12|RT @XXXXXXXXXXXXX...|Twitter for Android|
|2018-04-12|RT @XXXXXXX: Base...| Twitter for iPhone|
|2018-04-12|That's my roommat...| Twitter for iPhone|
|2018-04-12|He's come a long ...| Twitter for iPhone|
|2018-04-12|RT @XXXXXXXX: U s...| Twitter for iPhone|
|2018-04-12|Baseball: Enid 10...|   PushScoreUpdates|
|2018-04-12|Cubs and Sox aren...| Twitter for iPhone|
|2018-04-12|RT @XXXXXXXXXX: T...|          RoundTeam|
|2018-04-12|@XXXXXXXX that ri...| Twitter for iPhone|
|2018-04-12|RT @XXXXXXXXXX: S...| Twitter for iPhone|
+----------+--------------------+-------------------+

监控活动流式查询

当流式查询启动时,Spark 会分配集群资源。因此,管理和监控这些查询非常重要,以确保你不会耗尽集群资源。随时可以通过以下代码获取所有正在运行的查询列表:

print(spark.streams.active)

结果:

[<pyspark.sql.streaming.StreamingQuery object at 0x12d7db6a0>, <pyspark.sql.streaming.StreamingQuery object at 0x12d269c18>]

然后,你可以通过使用以下查询监控属性来深入了解每个查询的细节:

  • id:返回查询的唯一标识符,该标识符在重启时仍会保留(从检查点数据恢复)

  • runId:返回为当前会话生成的唯一 ID

  • explain():打印查询的详细解释

  • recentProgress:返回最近的进度更新数组

  • lastProgress:返回最新的进度

以下代码打印每个活动查询的最新进度:

import json
for query in spark.streams.active:
    print("-----------")
    print("id: {}".format(query.id))
    print(json.dumps(query.lastProgress, indent=2, sort_keys=True))

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode9.py

第一个查询的结果显示如下:

-----------
id: b621e268-f21d-4eef-b6cd-cb0bc66e53c4
{
  "batchId": 18,
  "durationMs": {
    "getOffset": 4,
    "triggerExecution": 4
  },
  "id": "b621e268-f21d-4eef-b6cd-cb0bc66e53c4",
  "inputRowsPerSecond": 0.0,
  "name": null,
  "numInputRows": 0,
  "processedRowsPerSecond": 0.0,
  "runId": "d2459446-bfad-4648-ae3b-b30c1f21be04",
  "sink": {
    "description": "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@586d2ad5"
  },
  "sources": [
    {
      "description": "FileStreamSource[file:/Users/dtaieb/cdsdev/notebookdev/Pixiedust/book/Chapter7/output/raw]",
      "endOffset": {
        "logOffset": 17
      },
      "inputRowsPerSecond": 0.0,
      "numInputRows": 0,
      "processedRowsPerSecond": 0.0,
      "startOffset": {
        "logOffset": 17
      }
    }
  ],
  "stateOperators": [],
  "timestamp": "2018-04-12T21:40:10.004Z"
}

作为读者的练习,构建一个 PixieApp,它提供一个实时仪表盘,显示每个活跃流查询的更新详情,会很有帮助。

注意

注意:我们将在第三部分 – 创建实时仪表盘 PixieApp中展示如何构建这个 PixieApp。

从 Parquet 文件创建批处理 DataFrame

注意

注意:在本章的其余部分,我们将批处理 Spark DataFrame 定义为经典 Spark DataFrame,即非流式的。

这个流计算流程的最后一步是创建一个或多个批处理 DataFrame,我们可以用来构建分析和数据可视化。我们可以将这最后一步视为对数据进行快照,以便进行更深层次的分析。

有两种方法可以通过编程方式从 Parquet 文件加载批处理 DataFrame:

  • 使用spark.read(注意,我们不再像之前那样使用spark.readStream):

    parquet_batch_df = spark.read.parquet(os.path.join(root_dir, "output_parquet"))
    
  • 使用spark.sql

    parquet_batch_df = spark.sql(
    "select * from parquet.'{}'".format(
    os.path.join(root_dir, "output_parquet")
    )
    )
    

    注意

    你可以在这里找到代码文件:

    github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode10.py

这种方法的好处是,我们可以使用任何 ANSI SQL 查询来加载数据,而不必像第一种方法那样使用等效的低级 DataFrame API。

然后,我们可以通过重新运行前面的代码并重新创建 DataFrame 来定期刷新数据。我们现在可以为数据创建进一步的分析,例如,通过在数据上运行 PixieDust 的display()方法来生成可视化图表:

import pixiedust
display(parquet_batch_df)

我们选择条形图菜单,并将source字段拖到Keys字段区域。由于我们只想显示前 10 条推文,因此我们在要显示的行数字段中设置这个值。下图显示了 PixieDust 选项对话框:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_06.jpg

显示前 10 个推文来源的选项对话框

点击确定后,我们会看到以下结果:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_07.jpg

展示与棒球相关的推文数量按来源分类的图表

在这一部分中,我们已经展示了如何使用 Tweepy 库创建 Twitter 流,清洗原始数据并将其存储在 CSV 文件中,创建 Spark Streaming DataFrame,在其上运行流查询并将输出存储在 Parquet 数据库中,从 Parquet 文件创建批处理 DataFrame,并使用 PixieDust 的display()方法进行数据可视化。

注意

第一部分 – 使用 Spark 结构化流获取数据的完整笔记本可以在这里找到:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/Twitter%20Sentiment%20Analysis%20-%20Part%201.ipynb

在下一部分中,我们将探讨如何使用 IBM Watson 自然语言理解服务,丰富数据中的情感分析和实体提取。

第二部分 - 使用情感和最相关的提取实体丰富数据

在这一部分,我们将推特数据与情感信息进行丰富处理,例如,正面负面中性。我们还希望从推文中提取出最相关的实体,例如,运动,组织和地点。这些额外的信息将通过我们在下一部分构建的实时仪表板进行分析和可视化。从非结构化文本中提取情感和实体所使用的算法属于计算机科学和人工智能领域,称为自然语言处理NLP)。网上有许多教程提供了提取情感的算法示例。例如,您可以在 scikit-learn 的 GitHub 仓库找到一个全面的文本分析教程,链接为github.com/scikit-learn/scikit-learn/blob/master/doc/tutorial/text_analytics/working_with_text_data.rst

然而,对于这个示例应用程序,我们不会构建自己的 NLP 算法。而是选择一个提供文本分析(如情感和实体提取)的云服务。当您的需求比较通用,不需要训练自定义模型时,这种方法效果很好,尽管即便如此,许多服务提供商现在也提供了相关工具来完成此类任务。使用云服务提供商相比自己创建模型具有显著优势,比如节省开发时间、提高准确性和性能。通过简单的 REST 调用,我们可以生成所需数据并将其集成到应用程序流程中。如果需要,切换服务提供商也非常容易,因为与服务接口的代码已经很好地隔离。

对于这个示例应用程序,我们将使用IBM Watson 自然语言理解NLU)服务,它是 IBM Watson 认知服务家族的一部分,并且可以在 IBM Cloud 上使用。

开始使用 IBM Watson 自然语言理解服务

为新服务提供资源的过程对于每个云服务提供商通常都是相同的。登录后,您将进入服务目录页面,在那里可以搜索特定的服务。

要登录到 IBM Cloud,只需访问console.bluemix.net,如果还没有 IBM 账户,可以创建一个免费的账户。进入仪表板后,有多种方式可以搜索 IBM Watson NLU 服务:

然后,你可以点击自然语言理解来配置一个新实例。云服务提供商通常会为一些服务提供免费的或基于试用的计划,幸运的是,Watson NLU 也提供了这样的计划,但有限制,你只能训练一个自定义模型,每月最多处理 30,000 个 NLU 项目(对于我们的示例应用足够了)。选择Lite(免费)计划并点击创建按钮后,新配置的实例将出现在仪表盘上,并准备好接受请求。

注意

注意:创建服务后,你可能会被重定向到 NLU 服务的入门文档。如果是这种情况,只需返回仪表盘,应该能看到新创建的服务实例。

下一步是通过在笔记本中发出 REST 调用来测试服务。每个服务都会提供详细的文档,说明如何使用,包括 API 参考。在笔记本中,我们可以使用 requests 包根据 API 参考发出 GET、POST、PUT 或 DELETE 请求,但强烈建议检查服务是否提供具有高级编程访问功能的 SDK。

幸运的是,IBM Watson 提供了watson_developer_cloud开源库,其中包含多个支持流行编程语言(包括 Java、Python 和 Node.js)的开源 SDK。对于本项目,我们将使用 Python SDK,源代码和示例代码可以在此找到:github.com/watson-developer-cloud/python-sdk

以下pip命令直接从 Jupyter Notebook 安装watson_developer_cloud包:

!pip install Watson_developer_cloud

注意

请注意命令前的!,它表示这是一个 shell 命令。

注意:安装完成后,别忘了重新启动内核。

大多数云服务提供商使用一种通用模式,允许用户通过服务控制台仪表盘生成一组凭证,然后将其嵌入到客户端应用程序中。要生成凭证,只需点击 Watson NLU 实例的服务凭证标签,然后点击新建凭证按钮。

这将生成一组新的凭证,格式为 JSON,如下截图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_09.jpg

为 Watson NLU 服务生成新凭证

现在我们已经拥有了服务的凭据,我们可以创建一个 NaturalLanguageUnderstandingV1 对象,它将提供对 REST API 的编程访问,如下所示的代码所示:

from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, SentimentOptions, EntitiesOptions

nlu = NaturalLanguageUnderstandingV1(
    version='2017-02-27',
    username='XXXX',
    password='XXXX'
)

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode11.py

注意:在前面的代码中,将 XXXX 文本替换为服务凭据中的适当用户名和密码。

version 参数指的是 API 的特定版本。要了解最新版本,请访问此处的官方文档页面:

www.ibm.com/watson/developercloud/natural-language-understanding/api/v1

在继续构建应用程序之前,让我们花点时间了解 Watson 自然语言服务所提供的文本分析功能,包括:

  • 情感

  • 实体

  • 概念

  • 类别

  • 情感

  • 关键词

  • 关系

  • 语义角色

在我们的应用程序中,Twitter 数据的丰富化发生在 RawTweetsListener 中,我们在其中创建了一个 enrich 方法,该方法将从 on_data 处理程序方法中调用。在这个方法中,我们使用 Twitter 数据和仅包含情感和实体的特征列表调用 nlu.analyze 方法,如下所示的代码所示:

注意

注意[[RawTweetsListener]] 符号表示以下代码是一个名为 RawTweetsListener 的类的一部分,用户不应尝试在没有完整类的情况下直接运行代码。像往常一样,您可以参考完整的笔记本进行查看。

[[RawTweetsListener]]
def enrich(self, data):
    try:
        response = nlu.analyze(
 text = data['text'],
 features = Features(
 sentiment=SentimentOptions(),
 entities=EntitiesOptions()
 )
 )
        data["sentiment"] = response["sentiment"]["document"]["label"]
        top_entity = response["entities"][0] if len(response["entities"]) > 0 else None
        data["entity"] = top_entity["text"] if top_entity is not None else ""
        data["entity_type"] = top_entity["type"] if top_entity is not None else ""
        return data
    except Exception as e:
 self.warn("Error from Watson service while enriching data: {}".format(e))

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode12.py

结果将存储在 data 对象中,随后会写入 CSV 文件。我们还会防范意外异常,跳过当前推文并记录警告信息,而不是让异常冒泡,从而停止 Twitter 流。

注意

注意:最常见的异常发生在推文数据使用该服务不支持的语言时。

我们使用在第五章中描述的 @Logger 装饰器,Python 和 PixieDust 最佳实践与高级概念,通过 PixieDust 日志框架记录日志消息。提醒一下,您可以使用来自另一个单元的 %pixiedustLog 魔法命令来查看日志消息。

我们仍然需要更改模式元数据以包括新的字段,如下所示:

field_metadata = [
    {"name": "created_at", "type": DateType()},
    {"name": "text", "type": StringType()},
    {"name": "source", "type": StringType(),
         "transform": lambda s: BS(s, "html.parser").text.strip()
    },
 {"name": "sentiment", "type": StringType()},
 {"name": "entity", "type": StringType()},
 {"name": "entity_type", "type": StringType()}
]

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode13.py

最后,我们更新on_data处理程序以调用enrich方法,如下所示:

def on_data(self, data):
    def transform(key, value):
        return transformskey if key in transforms else value
    data = self.enrich(json.loads(data))
 if data is not None:
        self.buffered_data.append(
            {key:transform(key,value) \
                for key,value in iteritems(data) \
                if key in fieldnames}
        )
        self.flush_buffer_if_needed()
    return True

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode14.py

当我们重新启动 Twitter 流并创建 Spark Streaming DataFrame 时,我们可以通过以下代码验证我们是否有正确的模式:

schema = StructType(
    [StructField(f["name"], f["type"], True) for f in field_metadata]
)
csv_sdf = spark.readStream \
    .csv(
        output_dir,
        schema=schema,
        multiLine = True,
        dateFormat = 'EEE MMM dd kk:mm:ss Z y',
        ignoreTrailingWhiteSpace = True,
        ignoreLeadingWhiteSpace = True
    )
csv_sdf.printSchema()

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode15.py

这将显示如下结果,如预期:

root
 |-- created_at: date (nullable = true)
 |-- text: string (nullable = true)
 |-- source: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- entity_type: string (nullable = true)

类似地,当我们使用console接收器运行结构化查询时,数据将按批次显示在 Spark 主节点的控制台中,如下所示:

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+---------------+---------------+---------+------------+-------------+
|created_at|           text|         source|sentiment|      entity|  entity_type|
+----------+---------------+---------------+---------+------------+-------------+
|2018-04-14|Some little ...| Twitter iPhone| positive|        Drew|       Person|d
|2018-04-14|RT @XXXXXXXX...| Twitter iPhone|  neutral| @XXXXXXXXXX|TwitterHandle|
|2018-04-14|RT @XXXXXXXX...| Twitter iPhone|  neutral|    baseball|        Sport|
|2018-04-14|RT @XXXXXXXX...| Twitter Client|  neutral| @XXXXXXXXXX|TwitterHandle|
|2018-04-14|RT @XXXXXXXX...| Twitter Client| positive| @XXXXXXXXXX|TwitterHandle|
|2018-04-14|RT @XXXXX: I...|Twitter Android| positive| Greg XXXXXX|       Person|
|2018-04-14|RT @XXXXXXXX...| Twitter iPhone| positive| @XXXXXXXXXX|TwitterHandle|
|2018-04-14|RT @XXXXX: I...|Twitter Android| positive| Greg XXXXXX|       Person|
|2018-04-14|Congrats to ...|Twitter Android| positive|    softball|        Sport|
|2018-04-14|translation:...| Twitter iPhone|  neutral|        null|         null|
+----------+---------------+---------------+---------+------------+-------------+

最后,我们使用 Parquet 的output接收器运行结构化查询,创建一个批量 DataFrame,并使用 PixieDust 的display()探索数据,展示例如按情感(positivenegativeneutral)聚类的推文计数,如下图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_10.jpg

显示按情感分类的推文数的条形图,按实体聚类

注意

完整的笔记本《第二部分——通过情感和最相关的提取实体丰富数据》位于此处:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/Twitter%20Sentiment%20Analysis%20-%20Part%202.ipynb

如果你正在运行它,我鼓励你通过向模式添加更多字段、运行不同的 SQL 查询,并使用 PixieDust 的display()来可视化数据进行实验。

在接下来的部分,我们将构建一个展示 Twitter 数据多个指标的仪表盘。

第三部分——创建实时仪表盘 PixieApp

一如既往,我们首先需要定义 MVP 版本仪表盘的需求。这次我们将借用敏捷方法中的一个工具,称为用户故事,它从用户的角度描述我们希望构建的功能。敏捷方法还要求我们通过将不同的用户分类为角色,充分理解与软件互动的用户的背景。在我们的案例中,我们只使用一个角色:Frank,市场营销总监,想要实时了解消费者在社交媒体上讨论的内容

用户故事是这样的:

  • Frank 输入类似产品名称的搜索查询

  • 然后,展示一个仪表板,显示一组图表,展示有关用户情绪(正面、负面、中立)的度量

  • 仪表板还包含一个展示所有在推文中提到的实体的词云

  • 此外,仪表板还提供了一个选项,可以显示当前所有活跃的 Spark Streaming 查询的实时进度

注意

注意:最后一个功能对于 Frank 来说并不是必需的,但我们还是在这里展示它,作为之前练习的示例实现。

将分析功能重构为独立的方法

在开始之前,我们需要将启动 Twitter 流和创建 Spark Streaming 数据框的代码重构为独立的方法,并在 PixieApp 中调用这些方法。

start_stream, start_streaming_dataframestart_parquet_streaming_query 方法如下:

def start_stream(queries):
    "Asynchronously start a new Twitter stream"
    stream = Stream(auth, RawTweetsListener())
    stream.filter(track=queries, languages=["en"], async=True)
    return stream

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode16.py

def start_streaming_dataframe(output_dir):
    "Start a Spark Streaming DataFrame from a file source"
    schema = StructType(
        [StructField(f["name"], f["type"], True) for f in field_metadata]
    )
    return spark.readStream \
        .csv(
            output_dir,
            schema=schema,
            multiLine = True,
            timestampFormat = 'EEE MMM dd kk:mm:ss Z yyyy',
            ignoreTrailingWhiteSpace = True,
            ignoreLeadingWhiteSpace = True
        )

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode17.py

def start_parquet_streaming_query(csv_sdf):
    """
    Create and run a streaming query from a Structured DataFrame
    outputing the results into a parquet database
    """
    streaming_query = csv_sdf \
      .writeStream \
      .format("parquet") \
      .option("path", os.path.join(root_dir, "output_parquet")) \
      .trigger(processingTime="2 seconds") \
      .option("checkpointLocation", os.path.join(root_dir, "output_chkpt")) \
      .start()
    return streaming_query

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode18.py

作为准备工作的一部分,我们还需要管理 PixieApp 将要创建的不同流的生命周期,并确保在用户重新启动仪表板时,底层资源被正确停止。为此,我们创建了一个StreamsManager类,封装了 Tweepy 的twitter_stream和 CSV 流数据框。这个类有一个reset方法,它会停止twitter_stream,停止所有活动的流查询,删除先前查询创建的所有输出文件,并使用新的查询字符串启动一个新的流。如果reset方法在没有查询字符串的情况下被调用,我们将不会启动新的流。

我们还创建了一个全局的streams_manager实例,它将跟踪当前状态,即使仪表板被重新启动。由于用户可以重新运行包含全局streams_manager的单元,我们需要确保在当前全局实例被删除时,reset方法会自动调用。为此,我们重写了对象的__del__方法,这是 Python 实现析构函数的一种方式,并调用reset

StreamsManager的代码如下:

class StreamsManager():
    def __init__(self):
        self.twitter_stream = None
        self.csv_sdf = None

    def reset(self, search_query = None):
        if self.twitter_stream is not None:
            self.twitter_stream.disconnect()
        #stop all the active streaming queries and re_initialize the directories
        for query in spark.streams.active:
            query.stop()
        # initialize the directories
        self.root_dir, self.output_dir = init_output_dirs()
        # start the tweepy stream
        self.twitter_stream = start_stream([search_query]) if search_query is not None else None
        # start the spark streaming stream
        self.csv_sdf = start_streaming_dataframe(output_dir) if search_query is not None else None

 def __del__(self):
 # Automatically called when the class is garbage collected
 self.reset()

streams_manager = StreamsManager()

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode19.py

创建 PixieApp

如同在第六章,分析研究:TensorFlow 下的 AI 和图像识别,我们再次使用 TemplateTabbedApp 类来创建一个包含两个 PixieApp 的标签布局:

  • TweetInsightApp:允许用户指定查询字符串并显示与之关联的实时仪表盘

  • StreamingQueriesApp:监控活动结构化查询的进度

TweetInsightApp 的默认路由中,我们返回一个片段,提示用户输入查询字符串,如下所示:

from pixiedust.display.app import *
@PixieApp
class TweetInsightApp():
    @route()
    def main_screen(self):
        return """
<style>
    div.outer-wrapper {
        display: table;width:100%;height:300px;
    }
    div.inner-wrapper {
        display: table-cell;vertical-align: middle;height: 100%;width: 100%;
    }
</style>
<div class="outer-wrapper">
    <div class="inner-wrapper">
        <div class="col-sm-3"></div>
        <div class="input-group col-sm-6">
          <input id="query{{prefix}}" type="text" class="form-control"
              value=""
              placeholder="Enter a search query (e.g. baseball)">
          <span class="input-group-btn">
            <button class="btn btn-default" type="button"
 pd_options="search_query=$val(query{{prefix}})">
                Go
            </button>
          </span>
        </div>
    </div>
</div>
        """

TweetInsightApp().run()

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode20.py

以下截图显示了运行上述代码后的结果:

注意

注意:稍后我们会创建主 TwitterSentimentApp PixieApp,它具有标签布局,并包含此类。在此之前,我们只展示 TweetInsightApp 子应用程序的独立功能。

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_11.jpg

Twitter 情感仪表盘的欢迎界面

Go 按钮中,我们通过用户提供的查询字符串调用 search_query 路由。在这个路由中,我们首先启动各种流并创建一个批量数据框,该数据框从 Parquet 数据库所在的输出目录中存储为一个类变量,命名为 parquet_df。然后,我们返回由三个小部件组成的 HTML 片段,展示以下指标:

  • 按照实体分组的三种情感的柱状图

  • 显示推文情感分布的折线图子图

  • 用于实体的词云

每个小部件都在使用 pd_refresh_rate 属性定期调用特定的路由,相关文档可以参考第五章,Python 和 PixieDust 最佳实践与高级概念。我们还确保重新加载 parquet_df 变量,以获取自上次加载以来到达的新数据。该变量随后在 pd_entity 属性中引用,用于显示图表。

以下代码展示了 search_query 路由的实现:

import time
[[TweetInsightApp]]
@route(search_query="*")
    def do_search_query(self, search_query):
        streams_manager.reset(search_query)
        start_parquet_streaming_query(streams_manager.csv_sdf)
 while True:
 try:
 parquet_dir = os.path.join(root_dir,
 "output_parquet")
 self.parquet_df = spark.sql("select * from parquet.'{}'".format(parquet_dir))
 break
 except:
 time.sleep(5)
        return """
<div class="container">
 <div id="header{{prefix}}" class="row no_loading_msg"
 pd_refresh_rate="5000" pd_target="header{{prefix}}">
 <pd_script>
print("Number of tweets received: {}".format(streams_manager.twitter_stream.listener.tweet_count))
 </pd_script>
 </div>
    <div class="row" style="min-height:300px">
        <div class="col-sm-5">
            <div id="metric1{{prefix}}" pd_refresh_rate="10000"
                class="no_loading_msg"
                pd_options="display_metric1=true"
                pd_target="metric1{{prefix}}">
            </div>
        </div>
        <div class="col-sm-5">
            <div id="metric2{{prefix}}" pd_refresh_rate="12000"
                class="no_loading_msg"
                pd_options="display_metric2=true"
                pd_target="metric2{{prefix}}">
            </div>
        </div>
    </div>

    <div class="row" style="min-height:400px">
        <div class="col-sm-offset-1 col-sm-10">
            <div id="word_cloud{{prefix}}" pd_refresh_rate="20000"
                class="no_loading_msg"
                pd_options="display_wc=true"
                pd_target="word_cloud{{prefix}}">
            </div>
        </div>
    </div>
        """

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode21.py

从上述代码中有多个需要注意的地方:

  • 当我们尝试加载 parquet_df 批数据框时,Parquet 文件的输出目录可能尚未准备好,这会导致异常。为了解决这个时序问题,我们将代码包裹在 try...except 语句中,并使用 time.sleep(5) 等待 5 秒钟。

  • 我们还在页头显示当前推文的数量。为此,我们添加了一个每 5 秒刷新一次的<div>元素,并且在该元素中使用 <pd_script> 来打印当前的推文数量,使用 streams_manager.twitter_stream.listener.tweet_count 变量,它是我们在 RawTweetsListener 类中添加的变量。我们还更新了 on_data() 方法,以便每次新推文到达时增加 tweet_count 变量,以下代码展示了这一过程:

    [[TweetInsightApp]]
    def on_data(self, data):
            def transform(key, value):
                return transformskey if key in transforms else value
            data = self.enrich(json.loads(data))
            if data is not None:
     self.tweet_count += 1
                self.buffered_data.append(
                    {key:transform(key,value) \
                         for key,value in iteritems(data) \
                         if key in fieldnames}
                )
                self.flush_buffer_if_needed()
            return True
    

    同时,为了避免闪烁,我们通过在 <div> 元素中使用 class="no_loading_msg" 来阻止显示 加载旋转图标 图像。

  • 我们调用了三个不同的路由(display_metric1display_metric2display_wc),它们分别负责显示三个小部件。

    display_metric1display_metric2 路由非常相似。它们返回一个包含parquet_df作为pd_entitydiv,以及一个自定义的 <pd_options> 子元素,该元素包含传递给 PixieDust display() 层的 JSON 配置。

以下代码展示了 display_metric1 路由的实现:

[[TweetInsightApp]]
@route(display_metric1="*")
    def do_display_metric1(self, display_metric1):
        parquet_dir = os.path.join(root_dir, "output_parquet")
        self.parquet_df = spark.sql("select * from parquet.'{}'".format(parquet_dir))
        return """
<div class="no_loading_msg" pd_render_onload pd_entity="parquet_df">
    <pd_options>
    {
      "legend": "true",
      "keyFields": "sentiment",
      "clusterby": "entity_type",
      "handlerId": "barChart",
      "rendererId": "bokeh",
      "rowCount": "10",
      "sortby": "Values DESC",
      "noChartCache": "true"
    }
    </pd_options>
</div>
        """

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode22.py

display_metric2 路由遵循类似的模式,但使用了不同的一组 pd_options 属性。

最后一条路由是 display_wc,负责显示实体的词云。该路由使用 wordcloud Python 库,你可以通过以下命令安装它:

!pip install wordcloud

注意

注意:一如既往,安装完成后不要忘记重启内核。

我们使用了在第五章中记录的 @captureOutput 装饰器,Python 和 PixieDust 最佳实践与高级概念,如以下所示:

import matplotlib.pyplot as plt
from wordcloud import WordCloud

[[TweetInsightApp]]
@route(display_wc="*")
@captureOutput
def do_display_wc(self):
    text = "\n".join(
 [r['entity'] for r in self.parquet_df.select("entity").collect() if r['entity'] is not None]
 )
    plt.figure( figsize=(13,7) )
    plt.axis("off")
    plt.imshow(
        WordCloud(width=750, height=350).generate(text),
        interpolation='bilinear'
    )

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode23.py

传递给 WordCloud 类的文本是通过收集 parquet_df 批处理 DataFrame 中的所有实体生成的。

以下截图展示了在使用搜索查询 baseball 创建的 Twitter 流运行一段时间后的仪表盘:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_12.jpg

用于搜索查询“baseball”的 Twitter 情感仪表盘

第二个 PixieApp 用于监控正在积极运行的流查询。主路由返回一个 HTML 片段,该片段包含一个 <div> 元素,该元素定期(每 5000 毫秒)调用 show_progress 路由,如以下代码所示:

@PixieApp
class StreamingQueriesApp():
    @route()
    def main_screen(self):
        return """
<div class="no_loading_msg" pd_refresh_rate="5000" pd_options="show_progress=true">
</div>
        """

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode24.py

show_progress路由中,我们使用了本章之前描述的query.lastProgress监控 API,通过 Jinja2 {%for%} 循环遍历 JSON 对象,并如以下代码所示在表格中显示结果:

@route(show_progress="true")
    def do_show_progress(self):
        return """
{%for query in this.spark.streams.active%}
    <div>
    <div class="page-header">
        <h1>Progress Report for Spark Stream: {{query.id}}</h1>
    <div>
    <table>
        <thead>
          <tr>
             <th>metric</th>
             <th>value</th>
          </tr>
        </thead>
        <tbody>
 {%for key, value in query.lastProgress.items()%}
 <tr>
 <td>{{key}}</td>
 <td>{{value}}</td>
 </tr>
 {%endfor%}
        </tbody>
    </table>
{%endfor%}
        """

注意

您可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode25.py

以下截图显示了 PixieApp 的流查询监控:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_13.jpg

实时监控活动的 Spark 流查询

最后一步是使用TemplateTabbedApp类来集成完整的应用程序,如下所示的代码:

from pixiedust.display.app import *
from pixiedust.apps.template import TemplateTabbedApp

@PixieApp
class TwitterSentimentApp(TemplateTabbedApp):
    def setup(self):
 self.apps = [
 {"title": "Tweets Insights", "app_class": "TweetInsightApp"},
 {"title": "Streaming Queries", "app_class": "StreamingQueriesApp"}
 ]

app = TwitterSentimentApp()
app.run()

注意

您可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode26.py

我们的示例应用程序第三部分现已完成;您可以在这里找到完整的 Notebook:

注意

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/Twitter%20Sentiment%20Analysis%20-%20Part%203.ipynb

在下一部分,我们将讨论如何通过使用 Apache Kafka 进行事件流处理和 IBM Streams Designer 对流数据进行数据增强来使应用程序的数据管道更加可扩展。

第四部分 – 使用 Apache Kafka 和 IBM Streams Designer 增加可扩展性

注意

注意:本节为可选部分。它演示了如何通过使用基于云的流服务重新实现数据管道的部分,以实现更大的可扩展性。

在单个 Notebook 中实现整个数据管道使我们在开发和测试过程中具有很高的生产力。我们可以快速实验代码并测试更改,且占用的资源非常小。由于我们使用的是相对较小的数据量,性能也很合理。然而,显然我们不会在生产环境中使用这种架构,接下来我们需要问自己的是,随着来自 Twitter 的流数据量急剧增加,哪些瓶颈会阻碍应用程序的扩展。

在本节中,我们确定了两个改进的方向:

  • 在 Tweepy 流中,传入的数据会通过 on_data 方法发送到 RawTweetsListener 实例进行处理。我们需要确保在此方法中尽量减少时间消耗,否则随着传入数据量的增加,系统将会落后。在当前的实现中,数据是通过外部调用 Watson NLU 服务同步丰富的,然后将数据缓冲,最终写入磁盘。为了解决这个问题,我们将数据发送到 Kafka 服务,这是一个高可扩展性、容错的流平台,使用发布/订阅模式来处理大量数据。我们还使用了 Streaming Analytics 服务,它将从 Kafka 消费数据并通过调用 Watson NLU 服务来丰富数据。两个服务都可以在 IBM Cloud 上使用。

    注意

    注意:我们可以使用其他开源框架来处理流数据,例如 Apache Flink(flink.apache.org)或 Apache Storm(storm.apache.org)。

  • 在当前实现中,数据以 CSV 文件形式存储,我们使用输出目录作为源创建一个 Spark Streaming DataFrame。这个步骤会消耗 Notebook 和本地环境的时间和资源。相反,我们可以让 Streaming Analytics 将丰富后的事件写回到不同的主题,并创建一个以 Message Hub 服务作为 Kafka 输入源的 Spark Streaming DataFrame。

下图展示了我们示例应用程序的更新架构:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_14.jpg

使用 Kafka 和 Streams Designer 扩展架构

在接下来的几个部分中,我们将实现更新后的架构,首先将推文流式传输到 Kafka。

将原始推文流式传输到 Kafka

在 IBM Cloud 上配置 Kafka / Message Hub 服务实例的过程与我们配置 Watson NLU 服务时的步骤相同。首先,我们在目录中找到并选择该服务,选择定价计划后点击 创建。然后,我们打开服务仪表板,选择 服务凭证 标签以创建新的凭证,如下图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_15.jpg

为 Message Hub 服务创建新的凭证

与 IBM Cloud 上的所有服务一样,凭证以 JSON 对象的形式提供,我们需要将其存储在 Notebook 中的一个变量里,代码如下所示(同样,别忘了将 XXXX 替换为您的用户名和服务凭证中的密码):

message_hub_creds = {
  "instance_id": "XXXXX",
  "mqlight_lookup_url": "https://mqlight-lookup-prod02.messagehub.services.us-south.bluemix.net/Lookup?serviceId=XXXX",
  "api_key": "XXXX",
  "kafka_admin_url": "https://kafka-admin-prod02.messagehub.services.us-south.bluemix.net:443",
  "kafka_rest_url": "https://kafka-rest-prod02.messagehub.services.us-south.bluemix.net:443",
  "kafka_brokers_sasl": [
    "kafka03-prod02.messagehub.services.us-south.bluemix.net:9093",
    "kafka01-prod02.messagehub.services.us-south.bluemix.net:9093",
    "kafka02-prod02.messagehub.services.us-south.bluemix.net:9093",
    "kafka05-prod02.messagehub.services.us-south.bluemix.net:9093",
    "kafka04-prod02.messagehub.services.us-south.bluemix.net:9093"
  ],
  "user": "XXXX",
  "password": "XXXX"
}

注意

您可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode27.py

关于与 Kafka 的接口,我们可以选择多个优秀的客户端库。我尝试了很多,但最终我使用得最多的是kafka-pythongithub.com/dpkp/kafka-python),它的优势是纯 Python 实现,因此更容易安装。

要从 Notebook 安装它,请使用以下命令:

!pip install kafka-python

:像往常一样,在安装任何库之后,不要忘记重启内核。

kafka-python库提供了一个KafkaProducer类,用于将数据作为消息写入服务,我们需要用之前创建的凭证来配置它。Kafka 有多个配置选项,涵盖所有选项超出了本书的范围。所需的选项与身份验证、主机服务器和 API 版本相关。

以下代码实现了RawTweetsListener类的__init__构造函数。它创建了一个KafkaProducer实例并将其存储为类变量:

[[RawTweetsListener]]
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1
kafka_conf = {
    'sasl_mechanism': 'PLAIN',
    'security_protocol': 'SASL_SSL',
    'ssl_context': context,
    "bootstrap_servers": message_hub_creds["kafka_brokers_sasl"],
    "sasl_plain_username": message_hub_creds["user"],
    "sasl_plain_password": message_hub_creds["password"],
    "api_version":(0, 10, 1),
    "value_serializer" : lambda v: json.dumps(v).encode('utf-8')
}
self.producer = KafkaProducer(**kafka_conf)

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode28.py

我们为value_serializer键配置了一个 lambda 函数,用于序列化 JSON 对象,这是我们将用于数据的格式。

:我们需要指定api_version键,否则库会尝试自动发现其值,这会导致由于kafka-python库中的一个 bug(只在 Mac 上可复现)引发NoBrokerAvailable异常。编写本书时,尚未提供该 bug 的修复。

现在,我们需要更新on_data方法,通过使用tweets主题将推文数据发送到 Kafka。Kafka 主题就像一个频道,应用程序可以发布或订阅它。在尝试向主题写入之前,确保该主题已经创建,否则会引发异常。此操作在以下ensure_topic_exists方法中完成:

import requests
import json

def ensure_topic_exists(topic_name):
    response = requests.post(
 message_hub_creds["kafka_rest_url"] +
 "/admin/topics",
 data = json.dumps({"name": topic_name}),
 headers={"X-Auth-Token": message_hub_creds["api_key"]}
 )
    if response.status_code != 200 and \
       response.status_code != 202 and \
       response.status_code != 422 and \
       response.status_code != 403:
        raise Exception(response.json())

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode29.py

在前面的代码中,我们向路径/admin/topic发出了一个 POST 请求,载荷为包含我们想要创建的主题名称的 JSON 数据。请求必须使用凭证中提供的 API 密钥和X-Auth-Token头进行身份验证。我们还确保忽略 HTTP 错误码 422 和 403,它们表示该主题已经存在。

on_data方法的代码现在看起来简单得多,如下所示:

[[RawTweetsListener]]
def on_data(self, data):
    self.tweet_count += 1
 self.producer.send(
 self.topic,
 {key:transform(key,value) \
 for key,value in iteritems(json.loads(data)) \
 if key in fieldnames}
 )
    return True

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode30.py

如我们所见,通过这段新代码,我们在on_data方法中所花费的时间最少,这是我们想要实现的目标。推文数据现在正在流入 Kafka 的tweets主题,准备通过我们将在下一节讨论的流式分析服务进行丰富化。

使用流式分析服务丰富推文数据

在这一步,我们需要使用 Watson Studio,这是一个集成的基于云的 IDE,提供多种数据处理工具,包括机器学习/深度学习模型、Jupyter Notebooks、流式数据流等。Watson Studio 是 IBM Cloud 的一个配套工具,可以通过datascience.ibm.com访问,因此无需额外注册。

登录到 Watson Studio 后,我们创建一个新的项目,命名为Thoughtful Data Science

注意

注意:创建项目时,选择默认选项是可以的。

然后,我们进入设置标签页创建一个流式分析服务,它将成为驱动我们丰富化过程的引擎,并将其与项目关联。请注意,我们也可以像为本章中其他服务一样,在 IBM Cloud 目录中创建该服务,但由于我们仍然需要将其与项目关联,最好也在 Watson Studio 中进行创建。

设置标签页中,我们向下滚动到关联服务部分,点击添加服务下拉菜单,选择流式分析。在接下来的页面中,您可以选择现有新建。选择新建并按照步骤创建服务。创建完成后,新创建的服务应已与项目关联,如下图所示:

注意

注意:如果有多个免费选项,可以任选其一。

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_16.jpg

将流式分析服务与项目关联

现在我们准备创建定义推文数据丰富处理的流式数据流。

我们进入资源标签页,向下滚动到流式数据流部分,点击新建流式数据流按钮。在接下来的页面中,我们为其命名,选择流式分析服务,选择手动并点击创建按钮。

现在我们在流式设计器中,它由左侧的操作符调色板和一个可以用来图形化构建流式数据流的画布组成。对于我们的示例应用程序,我们需要从调色板中选择三个操作符并将它们拖放到画布上:

  • 调色板中的源部分的消息中心:我们数据的输入源。进入画布后,我们将其重命名为Source Message Hub(通过双击进入编辑模式)。

  • 处理和分析部分的代码:它将包含调用 Watson NLU 服务的数据丰富化 Python 代码。我们将操作符重命名为Enrichment

  • 来自调色板的目标部分中的 Message Hub:丰富数据的输出源。我们将其重命名为目标 Message Hub

接下来,我们创建源 Message Hub丰富之间,以及丰富目标 Message Hub之间的连接。要创建两个操作符之间的连接,只需将第一个操作符末尾的输出端口拖动到另一个操作符的输入端口。请注意,源操作符右侧只有一个输出端口,表示它仅支持外部连接,而目标操作符左侧只有一个输入端口,表示它仅支持内部连接。处理与分析部分的任何操作符都有左右两个端口,因为它们同时接受和发送连接。

以下截图显示了完整的画布:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_17.jpg

推文丰富流处理

现在让我们看一下这三个操作符的配置。

注意

注意:要完成此部分,请确保运行生成主题的代码,并将其发送到我们在前一部分讨论过的 Message Hub 实例。否则,Message Hub 实例将为空,且无法检测到任何模式。

点击源 Message Hub。右侧会出现一个动画窗格,提供选择包含推文的 Message Hub 实例的选项。第一次使用时,您需要创建与 Message Hub 实例的连接。选择tweets作为主题。点击编辑输出模式,然后点击检测模式,以从数据中自动填充模式。您还可以使用显示预览按钮预览实时流数据,如下图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_18.jpg

设置模式并预览实时流数据

现在选择代码操作符,执行调用 Watson NLU 的代码。右侧的动画上下文窗格包含一个 Python 代码编辑器,其中包含所需实现的模板代码,分别是init(state)process(event, state)函数。

init方法中,我们实例化了NaturalLanguageUnderstandingV1实例,如下代码所示:

import sys
from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, SentimentOptions, EntitiesOptions

# init() function will be called once on pipeline initialization
# @state a Python dictionary object for keeping state. The state object is passed to the process function
def init(state):
    # do something once on pipeline initialization and save in the state object
 state["nlu"] = NaturalLanguageUnderstandingV1(
 version='2017-02-27',
 username='XXXX',
 password='XXXX'
 )

注意

您可以在此处找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode31.py

注意:我们需要通过位于 Python 编辑器窗口上方的Python 包链接安装Watson_developer_cloud库,如下图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_19.jpg

将 watson_cloud_developer 包添加到流处理中

每次事件数据都会调用该过程方法。我们使用它来调用 Watson NLU,并将额外的信息添加到事件对象中,如下代码所示:

# @event a Python dictionary object representing the input event tuple as defined by the input schema
# @state a Python dictionary object for keeping state over subsequent function calls
# return must be a Python dictionary object. It will be the output of this operator.
# Returning None results in not submitting an output tuple for this invocation.
# You must declare all output attributes in the Edit Schema window.
def process(event, state):
    # Enrich the event, such as by:
    # event['wordCount'] = len(event['phrase'].split())
    try:
        event['text'] = event['text'].replace('"', "'")
 response = state["nlu"].analyze(
 text = event['text'],
 features=Features(sentiment=SentimentOptions(), entities=EntitiesOptions())
 )
        event["sentiment"] = response["sentiment"]["document"]["label"]
        top_entity = response["entities"][0] if len(response["entities"]) > 0 else None
        event["entity"] = top_entity["text"] if top_entity is not None else ""
        event["entity_type"] = top_entity["type"] if top_entity is not None else ""
    except Exception as e:
        return None
 return event

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode32.py

注意:我们还必须通过使用编辑输出架构链接声明所有输出变量,如下截图所示:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_20.jpg

声明所有输出变量用于代码操作符

最后,我们配置目标 Message Hub 以使用enriched_tweets主题。请注意,首次需要手动创建该主题,方法是进入 IBM Cloud 上的 Message Hub 实例的仪表板并点击添加主题按钮。

然后我们使用主工具栏中的保存按钮保存流。流中的任何错误,无论是代码中的编译错误、服务配置错误还是其他任何错误,都将在通知面板中显示。在确保没有错误后,我们可以使用运行按钮运行流,该按钮将带我们进入流数据监控屏幕。此屏幕由多个面板组成。主面板显示不同的操作符,数据以小球的形式流动在操作符之间的虚拟管道中。我们可以点击管道,在右侧面板中显示事件负载。这对于调试非常有用,因为我们可以可视化数据如何在每个操作符中进行转换。

注意

注意:Streams Designer 还支持在代码操作符中添加 Python 日志消息,然后可以将其下载到本地机器进行分析。你可以在这里了解更多关于此功能的信息:

dataplatform.cloud.ibm.com/docs/content/streaming-pipelines/downloading_logs.html

下图显示了流式数据监控屏幕:

https://github.com/OpenDocCN/freelearn-ds-zh/raw/master/docs/ds-py/img/B09699_07_21.jpg

Twitter 情感分析流数据的实时监控屏幕

现在,我们的丰富推文数据已经通过enriched_tweets主题流入 Message Hub 实例。在下一节中,我们将展示如何使用 Message Hub 实例作为输入源创建 Spark Streaming DataFrame。

使用 Kafka 输入源创建 Spark Streaming DataFrame

在最后一步中,我们创建一个 Spark Streaming DataFrame,它从 enriched_tweets Kafka 主题中消费经过增强的推文,这个主题属于 Message Hub 服务。为此,我们使用内置的 Spark Kafka 连接器,并在 subscribe 选项中指定我们想要订阅的主题。同时,我们还需要在 kafka.bootstrap.servers 选项中指定 Kafka 服务器的列表,这些信息通过读取我们之前创建的全局 message_hub_creds 变量来获取。

注意

注意:你可能已经注意到,不同的系统为此选项使用不同的名称,这使得它更容易出错。幸运的是,如果拼写错误,异常将显示一个明确的根本原因信息。

上述选项是针对 Spark Streaming 的,我们仍然需要配置 Kafka 凭证,以便较低级别的 Kafka 消费者可以与 Message Hub 服务进行正确的身份验证。为了正确地将这些消费者属性传递给 Kafka,我们不使用 .option 方法,而是创建一个 kafka_options 字典,并将其作为参数传递给加载方法,代码如下所示:

def start_streaming_dataframe():
    "Start a Spark Streaming DataFrame from a Kafka Input source"
    schema = StructType(
        [StructField(f["name"], f["type"], True) for f in field_metadata]
    )
 kafka_options = {
 "kafka.ssl.protocol":"TLSv1.2",
 "kafka.ssl.enabled.protocols":"TLSv1.2",
 "kafka.ssl.endpoint.identification.algorithm":"HTTPS",
 'kafka.sasl.mechanism': 'PLAIN',
 'kafka.security.protocol': 'SASL_SSL'
 }
    return spark.readStream \
        .format("kafka") \
 .option("kafka.bootstrap.servers", ",".join(message_hub_creds["kafka_brokers_sasl"])) \
 .option("subscribe", "enriched_tweets") \
 .load(**kafka_options)

注意

你可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode33.py

你可能认为代码到此为止就完成了,因为 Notebook 的其他部分应该与 第三部分 – 创建实时仪表板 PixieApp 一致。这个想法是正确的,直到我们运行 Notebook 并开始看到 Spark 抛出异常,提示 Kafka 连接器无法找到。这是因为 Kafka 连接器并不包含在 Spark 的核心发行版中,必须单独安装。

不幸的是,这类基础设施层面的问题并不直接与手头的任务相关,然而它们经常发生,我们最终花费大量时间去修复它们。在 Stack Overflow 或其他技术网站搜索通常能够快速找到解决方案,但有时答案并不显而易见。在这种情况下,由于我们是在 Notebook 中运行,而不是在 spark-submit 脚本中运行,因此没有太多现成的帮助,我们只能自己尝试直到找到解决方法。要安装 spark-sql-kafka,我们需要编辑本章前面讨论过的 kernel.json 文件,并将以下选项添加到 "PYSPARK_SUBMIT_ARGS" 项中:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0

当内核重启时,这个配置将自动下载依赖并将其缓存到本地。

现在应该可以正常工作了吧?嗯,暂时还不行。我们仍然需要配置 Kafka 的安全性,以使用我们 Message Hub 服务的凭证,而该服务使用 SASL 作为安全协议。为此,我们需要提供一个JAAS(即Java 认证和授权服务)配置文件,其中包含服务的用户名和密码。Kafka 的最新版本提供了一种灵活的机制,允许使用名为sasl.jaas.config的消费者属性以编程方式配置安全性。不幸的是,Spark 的最新版本(截至写作时为 2.3.0)尚未更新为 Kafka 的最新版本。因此,我们必须退回到另一种配置 JAAS 的方式,即设置一个名为java.security.auth.login.config的 JVM 系统属性,并指向一个jaas.conf配置文件的路径。

我们首先在选择的目录中创建jaas.conf文件,并将以下内容添加到其中:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
 username="XXXX"
 password="XXXX";
};

在上述内容中,将XXXX替换为从 Message Hub 服务凭证中获得的用户名和密码。

然后,我们将以下配置添加到kernel.json中的"PYSPARK_SUBMIT_ARGS"条目:

--driver-java-options=-Djava.security.auth.login.config=<<jaas.conf path>>

作为参考,这里是一个包含这些配置的示例kernel.json

{
 "language": "python",
 "env": {
  "SCALA_HOME": "/Users/dtaieb/pixiedust/bin/scala/scala-2.11.8",
  "PYTHONPATH": "/Users/dtaieb/pixiedust/bin/spark/spark-2.3.0-bin-hadoop2.7/python/:/Users/dtaieb/pixiedust/bin/spark/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip",
  "SPARK_HOME": "/Users/dtaieb/pixiedust/bin/spark/spark-2.3.0-bin-hadoop2.7",
  "PYSPARK_SUBMIT_ARGS": "--driver-java-options=-Djava.security.auth.login.config=/Users/dtaieb/pixiedust/jaas.conf --jars /Users/dtaieb/pixiedust/bin/cloudant-spark-v2.0.0-185.jar --driver-class-path /Users/dtaieb/pixiedust/data/libs/* --master local[10] --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell",
  "PIXIEDUST_HOME": "/Users/dtaieb/pixiedust",
  "SPARK_DRIVER_MEMORY": "10G",
  "SPARK_LOCAL_IP": "127.0.0.1",
  "PYTHONSTARTUP": "/Users/dtaieb/pixiedust/bin/spark/spark-2.3.0-bin-hadoop2.7/python/pyspark/shell.py"
 },
 "display_name": "Python with Pixiedust (Spark 2.3)",
 "argv": [
  "python",
  "-m",
  "ipykernel",
  "-f",
  "{connection_file}"
 ]
}

注意

您可以在这里找到代码文件:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode34.json

注意:我们在修改kernel.json时,应该始终重启 Notebook 服务器,以确保所有新配置能够正确重新加载。

其余的 Notebook 代码没有变化,PixieApp 仪表盘应该依然能够正常工作。

注意

我们现在已经完成了示例应用的第四部分;您可以在这里找到完整的笔记本:

github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/Twitter%20Sentiment%20Analysis%20-%20Part%204.ipynb

在本节末尾我们编写的额外代码提醒我们,与数据打交道的旅程永远不会是一条直线。我们必须准备好应对不同性质的障碍:可能是依赖库中的错误,或外部服务的限制。克服这些障碍不必让项目停滞太久。由于我们主要使用开源组件,我们可以借助像 Stack Overflow 这样的社交网站上志同道合的开发者社区,获取新的想法和代码示例,并在 Jupyter Notebook 中快速实验。

总结

在本章中,我们构建了一个数据管道,用于分析包含非结构化文本的大量流数据,并应用来自外部云服务的 NLP 算法来提取情感和文本中发现的其他重要实体。我们还构建了一个 PixieApp 仪表板,显示从推文中提取的实时指标和洞察。我们还讨论了多种分析大规模数据的技术,包括 Apache Spark 结构化流处理、Apache Kafka 和 IBM Streaming Analytics。像往常一样,这些示例应用程序的目标是展示如何构建数据管道,特别关注如何利用现有框架、库和云服务的可能性。

在下一章中,我们将讨论时间序列分析,这是另一个具有广泛行业应用的数据科学话题,我们将通过构建一个金融投资组合分析应用程序来说明这一点。

Logo

更多推荐