如何将 Apache Airflow 用于机器学习工作流
Apache Airflow 是一个强大的工具,用于创建、调度和监控工作流,但它是为 ETL 任务而构建的。机器学习任务需要特定的资源,并且它们的执行细节应该是版本控制的。如果您有资源来维护 Kubernetes 集群,您可以使用 KubernetesPodOperator 扩展机器学习任务。如果您想专注于构建模型,您可以使用为机器学习任务扩展 Airflow。这样,您还将获得每次执行的自动版本控
Apache Airflow 是一个流行的平台,用于在 Python 中创建、调度和监控工作流。 它在 Github 上有超过 15,000 颗星,被 Twitter、Airbnb 和 Spotify 等公司的数据工程师使用。
如果您使用的是 Apache Airflow,那么您的架构可能已经根据任务数量及其要求进行了演变。 在 Skillup.co 工作时,我们首先有几百个 DAG 来执行我们所有的数据工程任务,然后我们开始做机器学习。
我们想继续使用 Airflow 来编排机器学习管道,但我们很快意识到我们需要一个解决方案来远程执行机器学习任务。
在本文中,我们将看到:
- 在 Airflow 中扩展工作节点的不同策略。
- 机器学习任务与传统 ETL 管道有何不同。
- 如何在云端轻松执行 Airflow 任务。
- 如何获得每个机器学习任务的自动版本控制。
使用执行器扩展 Apache Airflow
Apache Airflow 具有基于调度程序、工作节点、元数据数据库、Web 服务器和队列服务的多节点架构。
使用 Airflow 时的首要选择之一是执行器的类型。 执行程序与调度程序通信,以便在每个任务排队时为其分配资源。 执行器之间的差异归结为他们可用的资源。
Airflow 配置示例如下:
顺序执行器
默认执行器可以轻松地在本地测试 Airflow。 它在一台机器上按顺序运行任务,并使用 SQLite 存储任务的元数据。
本地执行者
本地执行器可以并行运行任务,并且需要像 PostgreSQL 这样支持并行的数据库。 虽然您可以在生产环境中运行本地执行器,但通常迁移到 Celery 执行器以提高可用性和可扩展性。
Celery 执行者
Celery 执行器需要设置 Redis 或 RabbitMQ 来将消息分发给工作人员。 Airflow 然后将任务分配给可以在一台或多台机器上运行的 Celery workers。 我们在 Skillup.co 使用的执行器,能够运行多达 256 个并发数据工程任务。
Kubernetes 执行器
Kubernetes 执行器为每个任务实例创建一个新的 pod。 它允许您根据任务要求动态扩展和缩减。
使用 operators 扩展 Apache Airflow
另一种扩展 Airflow 的方法是使用 operators 远程执行一些任务。 2018 年,Jessica Laughlin 认为我们都在错误地使用 Airflow,正确的方法是只使用 Kubernetes Operator。她认为应该有一个没有错误的 Operator 来执行任何任意任务,而不是越来越多的特定于功能的 Operators。
Kubernetes Operator
Kubernetes Operator 将在新 pod 中启动任务。当您有一组需要定期运行的任务时,我发现将 Kubernetes Operator 仅用于具有特定要求的任务是一个更好的主意。
我看到 Kubernetes Operator 的主要问题是您仍然需要了解 Kubernetes 配置系统并设置集群。例如,Dailymotion 在 Google Kubernetes Engine 上的集群中部署了 Airflow,并决定使用 KubernetesPodOperator 扩展 Airflow 以执行机器学习任务。
在我们的案例中,我们是一个小型数据团队,几乎没有资源来设置 Kubernetes 集群。我们希望专注于构建机器学习模型,而不是管理基础设施。
机器学习任务与 ETL 任务有何不同?
在 Skillup.co,我们作为一个小团队必须在一年内构建和部署多个数据产品。 我们知道我们想使用开源库构建我们的模型,从经典机器学习模型到深度学习。 我们还在寻找一个机器学习平台来帮助我们对所有模型进行扩展和版本控制。
Airflow 可以很好地跟踪元数据数据库中的每个任务细节,但机器学习任务与 ETL 任务有不同的要求。 机器学习任务与数据、代码、环境、参数和指标相关联。 Airflow 不会收集和显示该信息。 而 Kubernetes 只在基础设施方面为您提供帮助。
在一个地方收集每次执行的所有相关信息有助于调试机器学习模型。 在下表中,您可以看到我们为更快地迭代机器学习模型而跟踪的信息。
我们如何选择用于扩展机器学习任务
您已经可以在 Google DataFlow、Amazon SageMaker 和 Databricks 等机器学习平台找到多个 Airflow operators。 这些 operators 的问题在于它们都有不同的规范,并且仅限于在这些平台上执行代码。
在我们开始在 Skillup.co 进行任何机器学习之前,我们将 Airflow 用于所有数据工程,这些数据工程主要由 Airflow BashOperator 调用的 Python CLI 组成。
然后我们决定使用基于开放标准的机器学习平台 Valohai 来帮助我们远程启动机器学习任务并获得自动版本控制。
有了混合解决方案,我们可以在 Airflow 安装中保留敏感数据,并将机器学习委托给 Valohai。
下图是使用 Airflow DAG 的机器学习工作流程。由于 Valohai 的原因,蓝色任务可以远程执行。
感谢 Valohai 的开放 API,我们开发了开源的 airflow-valohai-plugin 来集成两个平台。去年,我们用它在生产中发布了四个机器学习模型。
Valohai Operator
Valohai 算子背后的想法类似于 Kubernetes 算子。优点是您不需要了解 Kubernetes,并且您还可以获得机器学习的自动版本控制。
Valohai 将根据您的要求、代码和数据负责启动和停止云实例。 Valohai operator 只需在 Docker 容器中执行命令,轮询是否完成并返回最终状态代码。
通过提供 Docker 镜像和代码存储库,您可以执行任何语言和库的代码。您还可以访问 AWS、Google 和 Azure 中的 50 多个云环境。
要在 Airflow 上创建任务,您只需要指定 Valohai 项目和要执行的步骤。如果需要,您还可以覆盖默认的云环境、输入和参数。
from airflow.operators.valohai import ValohaiSubmitExecutionOperator
train = ValohaiSubmitExecutionOperator(
task_id='train_model',
# Specify the project and step from Valohai
project_name='tensorflow-example',
step='Train model (MNIST)',
# Override defaults
environment='aws-eu-west-1-g3-4xlarge',
inputs={
'training-set-images': ValohaiSubmitExecutionOperator.get_output_uri(
task=preprocess,
name='mnist-train-images.gz'),
'training-set-labels': ValohaiSubmitExecutionOperator.get_output_uri(
task=preprocess,
name='mnist-train-labels.gz'),
'test-set-images': ValohaiSubmitExecutionOperator.get_output_uri(
task=preprocess,
name='mnist-test-images.gz'),
'test-set-labels': ValohaiSubmitExecutionOperator.get_output_uri(
task=preprocess,
name='mnist-test-labels.gz'),
},
parameters={
'dropout': 0.9,
'learning_rate': 0.001,
'max_steps': 300,
'batch_size': 200,
},
# Associate a task to a previously created DAG
dag=dag,
)
# Set dependencies between tasks
preprocess >> train
上面从 Airflow 源 向 Valohai 提交执行的示例代码。
另一方面,您需要通过创建 valohai.yaml 在 Valohai 端进行一些轻型配置。 valohai.yaml
用作配置文件,用于设置默认值并验证机器环境、docker 镜像、运行命令、参数和每次执行的输入数据文件。
从一开始就有了机器版本控制,这有助于我们调试数据、代码和参数,从而实现预测,并更快地修复预测。就像您希望您的 Airflow 任务是幂等的以避免重新启动它们时产生副作用一样,您希望您的机器学习模型基于代码和数据的审计版本。如果您总是根据存储在数据湖中的文件训练模型,这很容易做到。下面您可以在 Valohai UI 中看到已解析的配置以执行。
Valohai 执行详细信息 UI 如下:
Valohai 构建在两种智能选择之上,这两种选择使其易于与任何编程语言和库集成。
首先,选择适用于所有编程语言的 CLI 优先的接口。 CLI 是一种流行的接口,用于包装函数以在本地执行它们。 CLI 也是 Bash、Kubernetes 和 Valohai operators 的接口。
其次,从标准输出中收集执行指标,而不必为每种语言安装自定义库。 所有语言都有将 JSON 对象写入标准输出的工具。 Valohai 将自动解析该对象,例如,帮助您比较每个模型的准确性。
具有机器学习执行参数和准确性的 Valohai UI 如下所示:
您还可以在 Valohai UI 中手动启动执行,而不会产生任何副作用。在 Airflow 中,清除任务的状态将触发下游任务。
最后但同样重要的是,新的 Valohai operator 让您可以轻松地将一个执行的输出作为下一个执行的输入传递。这帮助我们创建了数据在 S3 上自动进行版本控制的管道。此外,每个新的执行都在与 S3 存储桶相同的云提供商和区域上运行,这使得 Valohai 可以快速将其下载到 AWS EC2 实例上。
总结
Apache Airflow 是一个强大的工具,用于创建、调度和监控工作流,但它是为 ETL 任务而构建的。机器学习任务需要特定的资源,并且它们的执行细节应该是版本控制的。
如果您有资源来维护 Kubernetes 集群,您可以使用 KubernetesPodOperator 扩展机器学习任务。
如果您想专注于构建模型,您可以使用 ValohaiSubmitExecutionOperator 为机器学习任务扩展 Airflow。这样,您还将获得每次执行的自动版本控制。
更多推荐
所有评论(0)