持久功能与 Apache Airflow
最近我一直在关注Apache Airflow,因为我注意到 Python 开发人员和云提供商因为支持“工作流”场景而受到了很多关注。就上下文而言,我是 Azure 上Durable Functions的创建者,我们很快将宣布适用于 Python 的 Durable Functions 全面上市 (GA)。 Airflow 和 Durable Functions 都支持在 Python 中构建工作流
最近我一直在关注Apache Airflow,因为我注意到 Python 开发人员和云提供商因为支持“工作流”场景而受到了很多关注。就上下文而言,我是 Azure 上Durable Functions的创建者,我们很快将宣布适用于 Python 的 Durable Functions 全面上市 (GA)。 Airflow 和 Durable Functions 都支持在 Python 中构建工作流,因此我认为值得进行一些调查以了解这两种技术之间的差异。这篇博文是我进行这种比较的尝试,我希望人们觉得它有些用处。
最后,我了解到 Durable Functions 和 Apache Airflow 正在尝试使用不同的方法解决不同的问题,尽管它们都支持在 Python 中实现“工作流”。主要区别在于支持的工作流类型以及 Python 在创作过程中所扮演的角色。我将在这篇文章中详细介绍这些差异。
编排器与 DAG
我发现的最重要的_技术_差异是编程模型。 Durable Functions 允许您通过编写个协调器函数来构建工作流。 Orchestrator 函数描述了操作的执行方式以及操作的执行顺序。为了说明,下面是 Python 中的一个简单的顺序编排,它使用 Python 的生成器语法依次调用三个称为“活动”的任务t1
、t2
和t3
。
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("t1", None)
y = yield context.call_activity("t2", x)
z = yield context.call_activity("t3", y)
return z
main = df.Orchestrator.create(orchestrator_function)
进入全屏模式 退出全屏模式
每个yield
表达式都会导致协调器函数等待计划任务完成,然后将结果保存到局部变量以供以后使用。当以这种方式产生时,编排器功能可以从内存中卸载,并且保持工作流的进度。
编排可以采用许多不同类型的动作,包括活动功能,子处理,他们可以等待外部事件,制造[HTTTP CALL]CALL](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-http-features?tabs=python#consuming-http-apis),并使用[Durable Tirable Timerserserserserersererserersererserersererserersererserersererserers]]]]]]]]。 Orchestrator 函数还可以与称为实体函数的持久的类似actor的对象进行交互。所有这些都是使用正常的过程编码结构完成的,这意味着您可以使用编程语言功能,例如条件、循环、函数调用、通过 try/except/finally 处理异常(用于实现补偿逻辑)等。对于开发人员来说,这个模型是非常自然,甚至可以很好地扩展到非常复杂的工作流程。可靠性和分布式执行由底层框架为您处理。
Apache Airflow 编程模型非常不同,它使用更具声明性的语法来使用 Python 定义DAG(有向无环图)。为了说明,让我们再次假设我们定义了三个任务,t1
、t2
和t3
。您可以在 Airflow 中使用以下代码实现与上述类似的顺序工作流:
dag = DAG('hello_world', description='Sequential DAG',
schedule_interval='0 12 * * *',
start_date=datetime(2020, 11, 28), catchup=False)
t1 = PythonOperator(task_id='t1', dag=dag, python_callable=t1)
t2 = PythonOperator(task_id='t2', dag=dag, python_callable=t2)
t3 = PythonOperator(task_id='t3', dag=dag, python_callable=t3)
t1 >> t2 >> t3
进入全屏模式 退出全屏模式
请务必注意,Airflow Python 脚本实际上只是将 DAG 的结构指定为代码的配置文件。与普通 Python 脚本不同,您无法传递动态输入、检查输出或使用条件、循环、错误处理或语言的其他功能进行控制流。因此,最好将 Airflow DAG 创作视为恰好使用 Python 进行配置的工作流 DSL。
为了进一步说明这一点,请考虑一个常见的“批准工作流”用例。在这种情况下,有人提交了需要经理批准的采购订单。工作流等待批准并在收到批准后立即进入处理步骤。但是,如果 72 小时内未收到批准(可能批准人正在休假),则会安排上报任务来帮助解决待处理的批准。使用 Durable Functions,我们可以使用如下代码实现此工作流:
import azure.durable_functions as df
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
# create a timer task that expires 72 hours from now
due_time = context.current_utc_datetime + timedelta(hours=72)
timeout_task = context.create_timer(due_time)
# create a task that completes when an "Approval" event is received
approval_task = context.wait_for_external_event("Approval")
# context.task_any() waits until any one task completes and returns it
winning_task = yield context.task_any([approval_task, timeout_task])
if approval_task == winning_task:
timeout_task.cancel()
yield context.call_activity("Process", approval_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
进入全屏模式 退出全屏模式
如您所见,任务是动态调度的,具有输入和输出,输出可用于决定工作流中的下一步。另一方面,Airflow DAG 更优化用于定义不一定需要传递数据的静态数据管道。
这两种工作流创作模型之间存在重要的权衡。在任务调度是动态的 Durable Functions 的“命令式代码”模型中,您可以表达一组更大的可能工作流。但是,您必须小心确保您的协调器代码是确定性的,并且不会违反协调器代码约束。 Airflow 的“声明性代码”模型更加静态和受约束,但这些约束使得构建分析 DAG 和执行诸如创建可视化之类的工具变得更加容易(后续部分将对此进行更多介绍)。
活动与操作员
另一个重要的区别是活动和运营商的区别。两者都代表可以安排的工作流任务。在 Durable Functions 中,一切都是函数。这包括编排器以及正在编排的活动。但是,由您为每个活动功能编写业务逻辑。如前所述,Durable Functions 提供了几种低级原始任务类型,如计时器、外部事件处理程序和 HTTP 操作,但在其当前版本中不提供广泛的库“预煮”任务。
运算符 Apache Airflow 中的更像是传统的工作流操作,因为您可以从中选择一个预先存在的运算符库。最基本的是PythonOperator
,它允许您将 Python 函数作为任务运行。此外,还存在用于与外部系统交互的大量操作符,例如数据库、HTTP 端点、S3 等云数据服务等。您可以在此处找到支持的操作符的完整列表。您还可以利用 OSS 社区开发的其他算子,或通过 Airflow插件创建自己的算子。由于这个现有运算符库,许多 Airflow 创作的工作流可能根本不涉及任何自定义代码。
事件驱动与 CRON
Apache Airflow DAG 主要在预定义的 CRON 计划上触发。更多信息在这里。也可以使用 CLI 进行一次性运行。如果您需要在数据处理管道中进行回填或“追赶”执行,Airflow CRON 模型特别有用。
另一方面,Durable Functions 依赖于 Azure Functions 的事件驱动机制来触发业务流程。例如,您将创建一个客户端函数以在收到 HTTP 请求、队列消息到达或基于来自许多其他支持的触发器类型之一的事件时使用动态输入启动一个或多个编排实例,包括基于 CRON 的定时器触发事件。
自己动手与内置管理工具
我怀疑 Apache Airflow 的最大吸引力之一是允许您轻松管理和检查 DAG 的 UI 工具。您可以在此处查看 Airflow UI 屏幕截图的完整列表。在我看来,一些特别有用的视图是图形视图可以可视化您的 DAG 并实时显示其进度,以及甘特图可以显示您工作流程中每个步骤所花费的时间。正如我之前提到的,DAG 的静态特性使得构建用于管理和监控目的的可视化工具变得很容易。您可以在此处](https://www.youtube.com/watch?v=6eNiCLanXJY)找到展示其中一些功能[的精彩视频。
Durable Functions 通过HTTP和语言特定的 SDK API公开一组管理 API。但是,没有用于进行这种可视化的内置工具(尽管有一些3rd 方工具可用)。在监控方面,Durable Functions 向Application Insights发出详细的遥测数据,这非常强大,但在创建警报、可视化等方面有自己的学习曲线。有关 Durable Functions 的诊断和监控的更多信息可以在这里找到.
无服务器与虚拟机
如果此时您认为 Durable Functions 和 Apache Airflow 都可能满足您的需求,那么另一个需要考虑的重要区别是支持的托管环境的范围。 Durable Functions 的显着优势在于它可以在完全无服务器的环境中运行(通过Azure Functions)。这意味着无需设置服务器或虚拟机,无需配置数据库,横向扩展是自动且弹性的,您只需在工作完成时付费。当您的工作量非常轻时,这通常会转化为每月几美分或更少。
另一方面,Apache Airflow 管理其他云中可用的产品,包括Astronomer.io、Google Cloud Composer和Amazon Managed Workflows for Apache Airflow (MWAA)。这些托管产品提供基础架构的自动扩展和管理,但您仍然使用按小时 VM 计费模型。在撰写本文时,无论您执行多少工作流程,这些云上的_最便宜_配置每月的成本都在 100 美元到 400 美元之间。
结论
我在检查 Apache Airflow 时得出的最重要结论是,它是专门为静态数据处理管道设计和优化的。如果您需要构建具有一组已知步骤和依赖项的 ETL 管道,那么 Airflow 可能是一个不错的选择,因为 Airflow 社区提供了各种内置工具和大量插件。
但是,如果您正在寻找一个支持更广泛的工作流原语和场景的平台,并且如果您需要代码优先编排提供的灵活性,那么像 Durable Functions 之类的东西可能更合适。
更多推荐
所有评论(0)