使用 Apache NiFi 创建云数据库迁移工作流

我们要达到什么目的

Apache NiFi (“nye fye”) 是 Apache 软件基金会的一个项目,旨在以工作流的形式自动化系统之间的数据流。它支持强大且可扩展的数据路由、转换和系统中介逻辑的有向图。我们将利用这些功能,以高度可扩展的工作流的形式整合 MySQL 数据库迁移到云(在本例中为 Azure)的所有手动、繁琐和容易出错的步骤。

从历史上看,NiFi 基于“NiagaraFiles”,这是一个由美国国家安全局 (NSA) 开发的系统。它在 2014 年作为 NSA 技术转让计划的一部分开源。

我们需要什么

对于本实验,我们将需要以下内容:

  • 微软 Azure 订阅

在本文中,我随机选择使用 Microsoft Azure 作为我的目标云环境。您当然可以使用您选择的任何其他提供商 AWS、Open Telekom Cloud、Hetzner 等,并根据每个平台的细微差别调整配置步骤。主要的事情是我们需要一个位于云中的 MySQL 实例,这与提供者无关。

  • 1 Azure Database for MySQL — Basic,1 个 vCore,50 GB 即可。

  • 1 MySQL 服务器作为 docker 容器在本地运行。

  • 1 Apache NiFi 服务器在本地作为 docker 容器运行。

让我们开始供应和配置我们的资源。

预配 Azure Database for MySQL

打开您的 Azure 门户并导航到 Marketplace。搜索可用的 MySQL 产品并选择通常位于列表中的第一个:Azure Database for MySQL。选择单台服务器而不是灵活的服务器——我们的小实验室不需要那些额外的好东西。

Azure 的 Marketplace 登陆刀片

填写服务器所需的设置,确保将版本设置为 8.0,并将计算和存储计划从 General Purpose 切换为 Basic、1 vCores、50GB 以节省一些现金。然后填写您的凭据。

填写 Azure 数据库服务器所需的信息

更改您的价格等级(如果您愿意)以降低成本。

只要您的资源已部署,请打开其刀片并复制服务器的名称。

! zoz100037](https://devpress-image.s3.cn-north-1.jdcloud-oss.com/a/fec111d1c9_1*FGhJj0ZuHOLrOXBziOgouA.jpg)

为您当前的 IP 地址设置防火墙通过规则 - 在连接安全部分下:

对于这个,您只需编写一个脚本,每天自动更新您的 IP 地址。

现在我们准备测试与我们的 MySQL 服务器的连接。打开您的偏好工具并输入我们迄今为止收集的连接详细信息。我个人使用免费版的 TablePlus,但不要将此作为推荐。如果建立了您的联系,我们可以称其为成功并进入我们实验室的下一个难题。

适用于 macOS 的 TablePlus

在创建函数、触发如下或导入架构时可能会出现上述错误。

像 CREATE FUNCTION 或 CREATE TRIGGER 这样的 DDL 语句被写入二进制日志,因此辅助副本可以执行它们。副本 SQL 线程具有完全权限,可用于提升权限。为了防止启用了二进制日志记录的服务器出现这种危险,MySQL 引擎要求存储函数创建者除了通常的 CREATE ROUTINE 特权之外还具有 SUPER 特权。在创建函数、触发如下或导入模式时,这可能会导致以下错误:

错误 1419:您没有超级权限并且启用了二进制日志记录(您可能希望使用不太安全的 log_bin_trust_function_creators 变量)

要解决此错误,请将门户中的服务器参数刀片中的log_bin_trust_function_creators设置为 1,暂时仅用于迁移:

提供本地 MySQL 数据库

本指南绝不是关于我们应该如何配置 MySQL 实例或集群的最佳实践集合。我以最快的方式运行一些服务器来达到我的目的。

我选择将我的本地 MySQL 服务器(它将作为迁移的源)部署为 docker 容器。为了旋转已经填充了数据的东西,我选择了一个有用的图像 genschsa/mysql-employees,我在 docker hub 中找到了它,它部署了一个 MySQL 实例以及一个预先填充的“Employees”示例数据库。

码头工人运行-d\

--name mysql-员工\

-p 3306:3306\

-e MYSQL\ROOT\PASSWORDu003d**{{PASSWORD}}** \

-v ~/mysql/data:/var/lib/mysql \

genschsa/mysql-员工

使用上面的指令从这个图像创建一个 docker 容器,在你用你的偏好和本地路径替换 root 密码后,将用作 MySQL 数据文件的卷。使用您的凭据登录此数据库实例,您应该会看到几个表、视图和函数:

genschsa/mysql-employees映像中的示例员工数据库。

配置 Apache NiFi 服务器

本文不是 Apache NiFi 的入门读物,同时它也不需要您有任何 Apache NiFi 经验。您可以按照指南一步一步地构建迁移工作流程,而无需任何先验知识。

如果您有兴趣了解 NiFi,可以访问该项目的官方页面:

[

Apache NiFi

一个易于使用、功能强大、可靠的数据处理和分发系统。 Apache NiFi 支持强大且...

nifi.apache.org

](https://nifi.apache.org)

或者这篇文章在向初学者解释 Apache NiFi 方面做得非常好:

[

在 Docker 容器上设置 Apache Nifi

Apache Nifi 是一个简单但强大的数据处理工具。您想使用它但不知道从哪里开始?本指南...

媒体网

](https://medium.com/analytics-vidhya/setting-apache-nifi-on-docker-containers-a00e862a8399)

我们将使用以下命令将 Apache NiFi 服务器部署为 docker 容器(首先将所需的凭据替换为您选择的凭据):

docker run --name nifi \

-p 8443:8443\

-d\

-e SINGLE_USER_CREDENTIALS_USERNAMEu003d**{{USERNAME}}** \

-e SINGLE_USER_CREDENTIALS_PASSWORDu003d**{{PASSWORD}}** \

阿帕奇/尼菲:最新

然后打开浏览器并导航到以下 URL 地址:

https://localhost:8443/nifi/

输入您的凭据,您将进入一个空的工作流程画布:

创建迁移工作流

1.添加一个_GenerateFlowFile_类型的Processor作为我们工作流的入口点(如下图所示):

2\。添加一个 ExecuteStreamCommand 类型的 Processor 作为转储和导出我们的 source 数据库的步骤 - 并称他为 ExportMysqlDump:

让我们配置我们希望这个组件执行的外部命令。

作为命令路径设置:

/usr/bin/mysqldump

并作为 Command Arguments 填写众所周知的 mysql-client 参数,但用分号分隔(用您自己的值替换突出显示的值):

-u;root;-P;3306;-h;{{HOSTNAME_OR_CONTAINER_IP}};-p**{{PASSWORD}}**;--数据库;员工;--例程;--触发器;--单一事务;--按主排序;--gtid;--force

这里有个问题,我们会在配置整个工作流程后讨论它。

通过将连接线从第一个拖到后者来连接两个处理器。您现在应该能够观察到在它们之间注入了一个 Queue 组件:

我们稍后将看到这些队列如何为工作流程做出贡献,以及我们如何使用它们来获得有用的见解或调试我们的工作流程。

3\。 Azure Database for MySQL 不允许任何用户拥有 SUPER 权限或 SET_USER_ID 权限,当您第一次尝试运行迁移工作流时,这将导致以下错误:

第 295 行的错误 1227 (42000):访问被拒绝;您需要(至少其中之一)SUPER 或 SET_USER_ID 权限才能执行此操作

在导入转储文件或运行脚本时,使用 DEFINER 语句执行 CREATE VIEW、FUNCTION、PROCEDURE、TRIGGER 或 EVENT 时可能会发生上述错误。

为了预先缓解这种情况,我们将添加第二个 _ExecuteStreamCommand 类型的 Processor。这个处理器(我们称他为 ReplaceDefinersCommand)将编辑转储文件脚本并将 DEFINER 值替换为具有管理员权限的用户,该用户是执行导入或执行脚本文件。

! zoz100076](https://devpress-image.s3.cn-north-1.jdcloud-oss.com/a/0fd47f631f_1*3EmhA4b0uyXJcVHfthgAVA.jpg)

作为命令路径设置:

sed

并作为命令参数:

-e;"s/DEFINER[ ]*u003d[ ]*[^*]*\/\/";-e;"s/DEFINER[ \ ]*u003d.*FUNCTION/FUNCTION/";-e;"s/DEFINER[ ]*u003d.*PROCEDURE/PROCEDURE/";-e;"s/DEFINER[ ]*u003d .*TRIGGER/TRIGGER/";-e;"s/DEFINER[ ]*u003d.*EVENT/EVENT/"

通过将连接线从第一个拖到第二个来连接两个 ExecuteCommandStream 处理器。您现在应该能够观察到画布上在它们之间添加了第二个 Queue 组件。

4\。添加第三个 ExecuteStreamCommand 类型的 Processor(与 ExportMysqlDump 相同的练习). 这一步会将转储导入我们的 target 数据库 - 并称他为 ImportMysqlDump。让我们配置它:

作为命令路径设置:

/usr/bin/mysql

并作为命令参数:

-u;root;-P;3306;-h;{{HOSTNAME_OR_EIP}};-p**{{PASSWORD}};**--force

通过将连接线从第一个拖到第二个,将 ReplaceDefinersCommand 与这个新的处理器连接起来。您现在应该能够观察到画布上在它们之间添加了第二个 Queue 组件:

! zwz 100085 zwz 100086 zwz 100084

重命名您的处理器,以便更直观并立即清楚工作流程的每一步是关于什么的。

5\。添加一个 LogAttribute 类型的 Processor - 工作流中的这个组件将为预定义的日志级别发出 FlowFile 的属性。

然后在 ExportMysqlDump 和 LogAttribute Processors 之间拖动一个连接,并在 Create Connection 弹出窗口中定义两个新关系,原始状态和非零状态。前者是从处理器处理的原始队列消息,后者是在工作流的此步骤期间引发的潜在错误(非零结果)。每个关系都会在工作流中注入一个专用队列。对 ReplaceDefinersCommand 处理器重复相同的步骤。

对于 ImportMySqlDump 和 LogAttribute 处理器,这一次激活所有 3 个可用的关系选项。输出流一将记录我们导入工作流步骤的成功结果。

毕竟,我们的 LogAttribute Processor 及其依赖项应该在画布上看起来像这样:

我们的工作流程现已完成。

6\。启动处理器。您会注意到画布上每个处理器的左上角都出现了一个停止标志。这意味着即使我们启动了工作流的新实例,处理器也不会执行任何命令。为了启动它们,对于它们中的每一个——除了 LogAttribute,下图中标有蓝色的启动按钮:

启动您的 ExecuteStreamCommand 处理器

我们现在准备好尝试了吗? ,如果您还记得第 2 步,我们提到有一个问题!两个 ExecuteStreamCommand 处理器将使用 mysql-client 执行远程 MySQL 实例的导出和导入,但 Apache NiFi 容器不知道这个包。所以我们必须连接到我们的容器并安装所需的客户端。

在 Apache NiFi 容器中安装 MariaDB 客户端

让我们首先以 root 身份启动一个 bash 终端:

docker exec -it -u 0 nifi /bin/bash

然后安装客户端:

apt-get 更新 -y

apt-get install -y mariadb-client

快速检查一切是否到位,为此请转到 /usr/bin/ 并确保您可以找到 mysqldumpmysql 可执行文件:

启动迁移工作流

打开 GenerateFlowFile 组件的级联菜单并单击 Run Once:

! swz 100103 swz 100104 swz 100102

从画布开始迁移工作流程

当前活动的处理器将在画布的右上角标有此标志:

! swz 100106 swz 100107 swz 100105

指示此处理器上有活动线程的标志。

让我们看看发生了什么,迁移是否通过,如果没有,我们如何调试和跟踪问题的根源。画布现在将在每个处理器和队列中使用更多数据进行更新:

! swz 100109 swz 100110 swz 100108

我们的第一次迁移尝试以及视觉元素如何告知我们每个步骤的过程和状态。

GenerateFlowFile 通知我们已向管道发送了 1 个请求(Out 1 — 在框内以蓝色标记)。 ExecuteMysqlDump 处理器成功运行并写出大小为 160.59MB 的转储。它的日志队列向我们显示,我们在 original 中有一个新条目,在 nonzero status 中有零条目。后者表示处理器运行时没有任何错误。让我们看看 original 队列中写入了什么。打开队列:

单击要检查的行中的信息标志。

在队列消息的属性下,我们可以看到处理器执行了哪个命令:

这是 ExecuteStreamCommand 处理器向源数据库发出并生成转储文件的命令。

到目前为止一切顺利,轮到第二个 ExecuteStreamCommand 处理器了,它负责将转储导入目标数据库。我们可以看到他收到了 160.59MB 的输入(即我们从前一个处理器生成的转储文件)将其写入 original 队列,但似乎迁移没有按计划进行,因为我们有 nonzero status 中的项目队列。让我们先看看 original 队列中有什么(打开 List Queue 并选择与该工作流实例对应的元素)。我们可以通过查看或下载来检查由 ExportMysqlDump 处理器移交的生成的转储文件,

! zwz 100121 zwz 100122 zwz 100120

下载处理器处理/使用的输入

或检查已执行的命令以查看是否有错误消息(在我们的示例中是错误消息):

如果您从非零状态队列中打开消息,您会发现相同的信息——只有命令及其错误被写下来,与保存消息有效负载的原始相反。

找出问题所在的更快方法是将鼠标悬停在处理器右上角的红色标志(出现错误时会出现)上:

让我们按照 Microsoft Docs 中的说明修复该问题,然后重试。

[

配置 SSL - Azure Database for MySQL

适用于:Azure Database for MySQL - 单服务器 Azure Database for MySQL 支持连接您的 Azure 数据库......

docs.microsoft.com

](https://docs.microsoft.com/en-us/azure/mysql/single-server/how-to-configure-ssl)

下载文档中提到的证书后,使用以下命令将其复制到正在运行的 Apache NiFi 容器(假设您为容器提供的名称是 nifi):

docker cp BaltimoreCyberTrustRoot.crt.pem nifi:/usr/bin

然后在 ImportDumpProcessor 的命令参数中附加以下选项:

;--ssl-ca;/usr/bin/BaltimoreCyberTrustRoot.crt.pem

上面关于 SSL 证书的部分是非常特定于 Azure 的。如果您的目标是另一个云提供商,则步骤可能会有所不同,因此在这种情况下最好查阅您的提供商的文档。

现在开始一个新的迁移实例。您将观察到 ImportMysqlDump 处理器在一段时间后处于执行模式:

! zwz 100130 zwz 100131 zwz 100129

迁移正在进行中!

一段时间后,当工作流将

  • 在任何处理器中都没有更多活动线程

  • ImportMysqlDump Processor的_outcome_队列中有一条额外的消息

  • Import MysqlDump Processor的_nonzero status_队列中没有额外的消息

检查您的数据库 - 迁移已成功完成:

我们的目标数据库现在存在于云、模式和数据中!

结论

我们以零成本在不到 15 分钟的时间内成功地将一个极易出错且要求苛刻的用例(如将 RDBMS 迁移到云)转变为高度可扩展的全自动工作流。

希望您发现这篇文章有用,请继续关注并关注我以获取更多与开源相关的主题。

Logo

华为、百度、京东云现已入驻,来创建你的专属开发者社区吧!

更多推荐