使用 Apache Airflow 的托管工作流程自动化您的 ELT 工作流程 - 第二部分
第二部分 - 自动化 Amazon EMR 在Part One中,我们使用 Apache Airflow 在 Amazon Athena 上自动化了一个示例 ELT 工作流。在这篇博文的第二部分中,我们将做同样的事情,但使用 Amazon EMR 自动化相同的示例 ELT 工作流。 确保您回顾了第一部分的设置。可以在此处的GitHub 存储库中找到所有代码,以便您自己复制。 自动化 Amazon
第二部分 - 自动化 Amazon EMR
在Part One中,我们使用 Apache Airflow 在 Amazon Athena 上自动化了一个示例 ELT 工作流。在这篇博文的第二部分中,我们将做同样的事情,但使用 Amazon EMR 自动化相同的示例 ELT 工作流。
确保您回顾了第一部分的设置。可以在此处的GitHub 存储库中找到所有代码,以便您自己复制。
自动化 Amazon EMR
回顾一下:我们正在使用 Movielens 数据集,将其加载到我们在 Amazon S3 上的数据湖中,并且我们被要求 a) 创建一个新表,其中包含我们关心的信息的子集,在这种情况下是特定类型的电影, b) 使用数据湖中可用的相同信息子集创建一个新文件。
作为我们试图自动化的一组手动步骤的一部分,我们使用了 Amazon EMR(同样与上一篇文章一样,如果您想查看这些手动步骤,请参阅 GitHub 存储库中的文档)以及一些 Apache Hive和 Presto SQL 脚本来创建表和导出文件。由于我们正在自动化它,我们不需要做很多事情,因为我们将其吸收为手动工作的一部分(例如,我已经有一个名为 XX 的数据库,所以我不需要重新创建它)我们需要构建到工作流程中。所以在高层次上,这些步骤看起来像:
-
创建我们的 Apache Hive 和 Presto SQL 脚本并将它们上传到 Amazon S3 上的某个位置
-
检查数据库是否存在,如果不存在则创建
-
创建表来导入电影和收视率数据(使用我们上传的脚本)
-
创建一个只包含我们正在寻找的信息的新表(在这个例子中,特定类型的电影)
-
将新表导出为 csv 文件(再次使用我们已经上传的脚本)
-
将导出的 csv 文件移动到数据湖中的新位置
-
清理并关闭所有资源,以便我们可以最大限度地降低运行此操作的成本
毫不奇怪,这个工作流程的开始方式与前一个工作流程非常相似。但是,这次我们使用的是 Amazon EMR,如果我们查看可用的 Apache Airflow 运算符,我们会发现有一个 Amazon EMR 运算符可以让我们的生活变得轻松。我们可以在 Apache Airflow 网站上查看此运算符的文档,Amazon EMR Operators
作为我们工作流程的一部分,我们想要创建一个 Amazon EMR 集群,添加一些步骤来运行一些 Presto 和 Apache Hive 查询,然后终止集群,因此我们需要添加这些运算符(EmrCreateJobFlowOperator、EmrAddStepsOperator、EmrTerminateJobFlowOperator 和 EmrStepSensor)在我们的 DAG 中
from airflow import DAG, settings, secrets
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.dates import days_ago
import os
import sys
import boto3
import time
进入全屏模式 退出全屏模式
我们工作流程的下一部分是相同的,只是这次我们添加了更多变量。在之前的工作流程中,我对流派进行了硬编码,所以这次我想将其添加为变量,这意味着我们可以创建一个工作流,对其进行参数化,然后根据需要多次运行它,只需更改该变量“流派”和“流派_t”
s3_dlake = Variable.get("s3_dlake", default_var="undefined")
emr_db = Variable.get("emr_db", default_var="undefined")
emr_output = Variable.get("emr_output", default_var="undefined")
genre = Variable.get("emr_genre", default_var="undefined")
genre_t = Variable.get("emr_genre_table", default_var="undefined")
进入全屏模式 退出全屏模式
如果我们查看我们希望自动化的步骤,第一个是将我们的 Apache Hive 和 Presto 脚本上传到 Amazon S3 上的一个位置,我们可以从我们的 Amazon EMR 步骤运行它们。我们可以在 Apache Airflow 之外创建这些并上传它们,这是一个选项。然而,在本演练中,我将使用我们定义的相同变量创建这些脚本,以确保这些脚本随着我们的需求变化而动态变化。
为此,我将使用 PythonOperator 定义一个名为“create_emr_scripts”的新任务。
create_emr_scripts = PythonOperator (
task_id='create_emr_scripts',
provide_context=True,
python_callable=py_create_emr_scripts,
dag=dag
)
进入全屏模式 退出全屏模式
我需要创建一个名为“py_create_emr_scripts”的支持函数,所以让我们看一下这段代码。此代码将五个文件写入 Amazon S3 位置 {s3_dlake}/scripts,每个文件对应于我们在手动步骤中创建的 SQL。
def py_create_emr_scripts(**kwargs):
s3 = boto3.resource('s3')
print("Creating scripts which will be executed by Amazon EMR - will overwrite existing scripts")
# create create-film-db.hql
object1 = s3.Object(s3_dlake, 'scripts/create-film-db.hql')
object1.put(Body=HIVE_CREATE_DB)
# create create-film-db-tables.hql
object2 = s3.Object(s3_dlake, 'scripts/create-film-db-tables.hql')
object2.put(Body=HIVE_CREATE_DB_TABLES)
# create create-genre-film-table.hql
object3 = s3.Object(s3_dlake, 'scripts/create-genre-film-table.hql')
object3.put(Body=HIVE_CREATE_GENRE_TABLE)
# create create-genre.sql
object4 = s3.Object(s3_dlake, 'scripts/create-genre.sql')
object4.put(Body=PRESTO_SQL_GEN_GENRE_CSV)
# create run-presto-query.sh
object5 = s3.Object(s3_dlake, 'scripts/run-presto-query.sh')
object5.put(Body=PRESTO_SCRIPT_RUN_EXPFILE)
进入全屏模式 退出全屏模式
这些变量(HIVE_CREATE_DB、HIVE_CREATE_DB_TABLES 等)在工作流中定义如下:
HIVE_CREATE_DB = """
create database {database};
""".format(database=emr_db)
HIVE_CREATE_DB_TABLES = """
CREATE EXTERNAL TABLE {database}.movies (
movieId INT,
title STRING,
genres STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://{datalake}/movielens/movies/';
CREATE EXTERNAL TABLE {database}.ratings (
userId INT,
movieId INT,
rating INT,
timestampId TIMESTAMP
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://{datalake}/movielens/ratings-alt/';
""".format(database=emr_db,datalake=s3_dlake)
HIVE_CREATE_GENRE_TABLE = """
CREATE EXTERNAL TABLE {database}.{genre_t} (
title STRING,
year INT,
rating INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://{datalake}/movielens/{genre}/';
""".format(database=emr_db,genre=genre,datalake=s3_dlake,genre_t=genre_t)
PRESTO_SCRIPT_RUN_EXPFILE = """
#!/bin/bash
aws s3 cp s3://{datalake}/scripts/create-genre.sql .
presto-cli --catalog hive -f create-genre.sql --output-format TSV > {genre_t}-films.tsv
aws s3 cp {genre_t}-films.tsv s3://{datalake}/movielens/{genre}/
""".format(database=emr_db,genre=genre,datalake=s3_dlake,genre_t=genre_t)
PRESTO_SQL_GEN_GENRE_CSV = """
WITH {genre}data AS (
SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating
FROM {database}.movies m
INNER JOIN (SELECT rating, movieId FROM {database}.ratings) r on m.movieId = r.movieId WHERE REGEXP_LIKE (genres, '{genre}')
)
SELECT substr(title,1, LENGTH(title) -6) as title, replace(substr(trim(title),-5),')','') as year, AVG(rating) as avrating from {genre}data GROUP BY title ORDER BY year DESC, title ASC ;
""".format(database=emr_db,genre=genre)
进入全屏模式 退出全屏模式
如您所见,我们使用标准 python 替换变量中的值,因此我们动态生成了脚本,这些脚本将在我们的 Amazon EMR 步骤开始时启动。
如果我们现在在工作流的底部添加依赖关系/关系详细信息(到目前为止,我们只定义了一个任务,其余的只是支持函数和代码),我们最终会得到:
from airflow import DAG, settings, secrets
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.dates import days_ago
import os
import sys
import boto3
import time
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
}
DAG_ID = os.path.basename(__file__).replace('.py', '')
dag = DAG(
dag_id=DAG_ID,
default_args=default_args,
description='DevDay EMR DAG',
schedule_interval=None,
start_date=days_ago(2),
tags=['devday','demo'],
)
s3_dlake = Variable.get("s3_dlake", default_var="undefined")
emr_db = Variable.get("emr_db", default_var="undefined")
emr_output = Variable.get("emr_output", default_var="undefined")
genre = Variable.get("emr_genre", default_var="undefined")
genre_t = Variable.get("emr_genre_table", default_var="undefined")
HIVE_CREATE_DB = """
create database {database};
""".format(database=emr_db)
HIVE_CREATE_DB_TABLES = """
CREATE EXTERNAL TABLE {database}.movies (
movieId INT,
title STRING,
genres STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://{datalake}/movielens/movies/';
CREATE EXTERNAL TABLE {database}.ratings (
userId INT,
movieId INT,
rating INT,
timestampId TIMESTAMP
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://{datalake}/movielens/ratings-alt/';
""".format(database=emr_db,datalake=s3_dlake)
HIVE_CREATE_GENRE_TABLE = """
CREATE EXTERNAL TABLE {database}.{genre_t} (
title STRING,
year INT,
rating INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://{datalake}/movielens/{genre}/';
""".format(database=emr_db,genre=genre,datalake=s3_dlake,genre_t=genre_t)
PRESTO_SCRIPT_RUN_EXPFILE = """
#!/bin/bash
aws s3 cp s3://{datalake}/scripts/create-genre.sql .
presto-cli --catalog hive -f create-genre.sql --output-format TSV > {genre_t}-films.tsv
aws s3 cp {genre_t}-films.tsv s3://{datalake}/movielens/{genre}/
""".format(database=emr_db,genre=genre,datalake=s3_dlake,genre_t=genre_t)
PRESTO_SQL_GEN_GENRE_CSV = """
WITH {genre}data AS (
SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating
FROM {database}.movies m
INNER JOIN (SELECT rating, movieId FROM {database}.ratings) r on m.movieId = r.movieId WHERE REGEXP_LIKE (genres, '{genre}')
)
SELECT substr(title,1, LENGTH(title) -6) as title, replace(substr(trim(title),-5),')','') as year, AVG(rating) as avrating from {genre}data GROUP BY title ORDER BY year DESC, title ASC ;
""".format(database=emr_db,genre=genre)
def py_create_emr_scripts(**kwargs):
s3 = boto3.resource('s3')
print("Creating scripts which will be executed by Amazon EMR - will overwrite existing scripts")
# create create-film-db.hql
object1 = s3.Object(s3_dlake, 'scripts/create-film-db.hql')
object1.put(Body=HIVE_CREATE_DB)
# create create-film-db-tables.hql
object2 = s3.Object(s3_dlake, 'scripts/create-film-db-tables.hql')
object2.put(Body=HIVE_CREATE_DB_TABLES)
# create create-genre-film-table.hql
object3 = s3.Object(s3_dlake, 'scripts/create-genre-film-table.hql')
object3.put(Body=HIVE_CREATE_GENRE_TABLE)
# create create-genre.sql
object4 = s3.Object(s3_dlake, 'scripts/create-genre.sql')
object4.put(Body=PRESTO_SQL_GEN_GENRE_CSV)
# create run-presto-query.sh
object5 = s3.Object(s3_dlake, 'scripts/run-presto-query.sh')
object5.put(Body=PRESTO_SCRIPT_RUN_EXPFILE)
create_emr_scripts = PythonOperator (
task_id='create_emr_scripts',
provide_context=True,
python_callable=py_create_emr_scripts,
dag=dag
)
create_emr_scripts
进入全屏模式 退出全屏模式
这些顺序很重要,因为如果您使用/调用的内容未在工作流代码中定义,您可能会看到错误。
当我们提交此代码时,几秒钟后,我们将在工作流中看到一个名为“create_emr_scripts”的任务,我们可以启用(打开)然后触发它。如果我们现在转到 Amazon S3 数据湖的脚本文件夹,我们应该会看到我们的新脚本已准备就绪。
每次我们重新运行此任务时,脚本都会被覆盖,以确保它们包含正确的值。
现在我们有了脚本,接下来我们需要做的是通过 Amazon EMR 运行这些脚本。如果需要,我们可以使用现有的 Amazon EMR 集群,然后将步骤提交到该集群,但在本演练中,我将创建一个自动终止的 Amazon EMR 集群,添加步骤,然后终止该集群。
如果您想使用现有的 Amazon EMR 集群,您需要更改代码以获取 Amazon EMR 集群 ID 的输入值。有很多方法可以做到这一点:通过触发 DAG 时的配置值,通过存储在 AWS Secrets manager 等东西中的变量,或者使用 PythonOperator 中的一些代码来查找该集群 ID。
为了启动我们的集群,我们使用 EmrCreateJobFlowOperator 运算符,它只接受一个值,“job_flow_overrides”,这是一个您需要定义的变量,其中包含您的 Amazon EMR 集群的配置详细信息(您要使用的应用程序,集群的大小和数量、配置细节等)
create_emr_database_cluster = EmrCreateJobFlowOperator(
task_id='create_emr_database_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
dag=dag
)
进入全屏模式 退出全屏模式
如我们所见,我们定义了一个名为 JOB_FLOW_OVERRIDES 的变量,其中包含我们的 Amazon EMR 集群详细信息。您还可以看到我们再次替换变量,以便 Amazon EMR 集群根据我们的用例使用正确的配置详细信息。这允许我们在许多不同的应用程序中使用标准模板。
JOB_FLOW_OVERRIDES = {
'Name': 'devday-demo-cluster-airflow',
'ReleaseLabel': 'emr-5.32.0',
'LogUri': 's3n://{{ var.value.s3_dlake }}/logs',
'Applications': [
{
'Name': 'Spark',
},
{
'Name': 'Pig',
},
{
'Name': 'Hive',
},
{
'Name': 'Presto',
}
],
'Instances': {
'InstanceFleets': [
{
'Name': 'MASTER',
'InstanceFleetType': 'MASTER',
'TargetSpotCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'm5.xlarge',
},
]
},
{
'Name': 'CORE',
'InstanceFleetType': 'CORE',
'TargetSpotCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'r5.xlarge',
},
],
},
],
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'Ec2KeyName': 'ec2-rocket',
},
'Configurations': [
{
'Classification': 'hive-site',
'Properties': {'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}
},
{
'Classification': 'presto-connector-hive',
'Properties': {'hive.metastore.glue.datacatalog.enabled': 'true'}
},
{
'Classification': 'spark-hive-site',
'Properties': {'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}
}
],
'VisibleToAllUsers': True,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'EbsRootVolumeSize': 32,
'StepConcurrencyLevel': 1,
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'Airflow EMR Demo Project'
},
{
'Key': 'Owner',
'Value': 'Data Analytics Team'
}
]
}
进入全屏模式 退出全屏模式
为了实现我的目标,我创建了一个包含 Apache Hive 和 Presto 应用程序的 Amazon EMR 集群,为简单起见,我使用 AWS Glue 数据目录作为元存储(您可以轻松地将其更改为您的环境使用的内容,例如可能像 MySQL 实例)。我们在配置中配置的一件事是值“KeepJobFlowAliveWhenNoSteps': True”,因为我们希望 Amazon EMR 集群一直运行,直到它完成我们的所有步骤,然后才终止它。
因此,此步骤现在将启动我们的 Amazon EMR 集群。让我们将其添加到工作流依赖图中。
create_emr_scripts >> create_emr_database_cluster
进入全屏模式 退出全屏模式
如果我们提交代码然后启动这个工作流,我们现在应该开始看到我们的 Amazon EMR 集群启动了。在 UI 中,查看“create_emr_database_cluster”的日志,您将在日志文件中看到与此类似的内容:
[2021-04-19 14:59:18,888] {{standard_task_runner.py:78}} INFO - Job 116460: Subtask create_emr_database_cluster
[2021-04-19 14:59:18,994] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: devday-emr-create.create_emr_database_cluster 2021-04-19T14:58:50.688217+00:00 [running]> ip-10-192-21-41.eu-west-1.compute.internal
[2021-04-19 14:59:19,138] {{emr_create_job_flow_operator.py:66}} INFO - Creating JobFlow using aws-conn-id: s3_default, emr-conn-id: emr_default
[2021-04-19 14:59:19,437] {{emr_create_job_flow_operator.py:73}} INFO - JobFlow with id j-2JRII3WTAD9PG created
进入全屏模式 退出全屏模式
将显示 Amazon EMR 集群 ID(此处为“j-2JRII3WTAD9PG”),这很重要,我们稍后会看到。在继续之前,请确保您通过控制台手动终止此集群。我们不想让 Amazon EMR 集群继续运行,因此我们使用不同的运算符 EmrTerminateJobFlowOperator 创建了一个新任务。
terminate_emr_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_emr_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_emr_database_cluster', key='return_value') }}",
aws_conn_id='aws_default',
)
进入全屏模式 退出全屏模式
这里有一些新东西。首先,我们有“aws_conn_id”参数,这是此操作员所必需的,我们在使用 Apache Airflow 的托管工作流时设置为该值。如果您托管/使用您自己的 Apache Airflow 版本,这将对应于您在 Apache Airflow UI 中定义的连接的名称。接下来要注意的是“job_flow_id”,它使用了 Apache Airflow 的另一个特性 xcom。 Xcoms 是 Apache Airflow 中允许任务交换信息的功能,在这种情况下,我们正在“拉取”Amazon EMR 集群 ID 的详细信息(正如我们在上一个任务中看到的),以便我们可以终止正确的集群。
我们现在可以将此任务添加到工作流中:
create_emr_scripts >> create_emr_database_cluster >> terminate_emr_cluster
进入全屏模式 退出全屏模式
当 DAG 出现在 UI 中时,提交并启动它。在这个阶段,除了启动和终止 Amazon EMR 集群之外,它没有做任何有趣的事情。接下来,我们需要添加要在该运行集群上执行的步骤。
如果我们看看我们试图自动化的任务,第一个是创建数据库。我们将脚本(一个简单的 Apache Hive 脚本)上传到 Amazon S3 的 /scripts 文件夹中。但是和之前在 Amazon Athena 演练中一样,我们需要在此处添加一些逻辑来跳过已经存在的数据库的创建。我们的工作流程将检查数据库是否存在,如果不存在,则创建数据库并导入我们需要的 Movielens 表,如果数据库已经存在则跳过此步骤。
因为我们已经在上一篇文章中介绍了分支逻辑,所以这次我将只介绍 Amazon EMR 步骤。要添加一个步骤,在本例中运行 hive 脚本,我们使用 EmrAddStepsOperator,它将启动一个新步骤,由我们想要运行它的 Amazon EMR 集群执行。当我们使用 EmrAddStepsOperator 算子时,我们会使用一个对应的算子 EmrStepSensor,它会跟踪任务的状态(是成功还是失败)。这是这两个新任务的代码。
create_emr_database_step = EmrAddStepsOperator(
task_id='create_emr_database_step',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_database_cluster', key='return_value') }}",
aws_conn_id='aws_default',
on_failure_callback=cleanup_emr_cluster_if_steps_fail,
steps=CREATE_DATABASE,
)
create_emr_database_sensor = EmrStepSensor(
task_id='create_emr_database_sensor',
job_flow_id="{{ task_instance.xcom_pull('create_emr_database_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='create_emr_database_step', key='return_value')[0] }}",
on_failure_callback=cleanup_emr_cluster_if_steps_fail,
aws_conn_id='aws_default',
)
进入全屏模式 退出全屏模式
在“create_emr_database_step”任务中,您可以看到我们再次使用 Xcoms,以在我们使用 EmrAddStepsOperator 时获取 Amazon EMR 集群 ID 的名称。在“create_emr_database_sensor”任务中,您可以看到我们正在使用 XComs 额外获取我们需要跟踪的任务的名称 - 在本例中,“create_emr_database_step”任务.这将确保此任务正在监视正确的步骤。
接下来要注意的是“create_emr_database_step”任务中的“stepsu003d”参数。这是我们定义提交到 Amazon EMR 的实际步骤的地方,我们在一个变量中定义它,在本例中称为 CREATE_DATABASE。这是代码。
CREATE_DATABASE = [
{
'Name': 'Create Genre Database',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'hive-script',
'--run-hive-script',
'--args',
'-f',
's3://{{ var.value.s3_dlake }}/scripts/create-film-db.hql'
]
}
}
]
进入全屏模式 退出全屏模式
如果我们通过 Amazon EMR 控制台手动提交任务,这与我们将遵循的过程相同。您会注意到我们再次使用变量以确保我们不会对任何内容进行硬编码。
我们还没有准备好提交这个。在提交要由 Amazon EMR 集群执行的步骤时,有时这些步骤可能会失败。如果发生这种情况,工作流程将停止/停止,这将使我们的 Amazon EMR 集群继续运行(我们必须为此付费)。我们需要一种短路的方法,为此我们使用 Apache Airflow 的“on_failure_callback”特性,它允许我们在任务失败的情况下调用我们定义的函数。在上面的示例中,我们定义了一个名为“cleanup_emr_cluster_if_steps_fail”的清理函数,如下所示:
def cleanup_emr_cluster_if_steps_fail(context):
print("This is invoked when a running EMR cluster has a step running that fails.")
print("If we do not do this, the DAG will stop but the cluster will still keep running")
early_terminate_emr_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_emr_cluster',
job_flow_id=context["ti"].xcom_pull('create_emr_database_cluster'),
aws_conn_id='aws_default',
)
return early_terminate_emr_cluster.execute(context=context)
进入全屏模式 退出全屏模式
如您所见,我们正在使用 Xcoms 获取 Amazon EMR 集群 ID,然后终止它。现在,如果我们的脚本出现恶意,我们仍然会终止集群。
如果我们现在接受这些新任务,连同分支逻辑,我们现在有这个额外的代码:
CREATE_DATABASE = [
{
'Name': 'Create Genre Database',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'hive-script',
'--run-hive-script',
'--args',
'-f',
's3://{{ var.value.s3_dlake }}/scripts/create-film-db.hql'
]
}
}
]
def check_emr_database(**kwargs):
ath = boto3.client('athena')
try:
response = ath.get_database(
CatalogName='AwsDataCatalog',
DatabaseName=emr_db
)
print("Database already exists - skip creation")
return "skip_emr_database_creation"
except:
print("No EMR Database Found")
return "create_emr_database_step"
def cleanup_emr_cluster_if_steps_fail(context):
print("This is invoked when a running EMR cluster has a step running that fails.")
print("If we do not do this, the DAG will stop but the cluster will still keep running")
early_terminate_emr_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_emr_cluster',
job_flow_id=context["ti"].xcom_pull('create_emr_database_cluster'),
aws_conn_id='aws_default',
)
return early_terminate_emr_cluster.execute(context=context)
check_emr_database = BranchPythonOperator(
task_id='check_emr_database',
provide_context=True,
python_callable=check_emr_database,
retries=1,
dag=dag,
)
skip_emr_database_creation = DummyOperator(
task_id="skip_emr_database_creation",
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
emr_database_checks_done = DummyOperator(
task_id="emr_database_checks_done",
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
create_emr_database_step = EmrAddStepsOperator(
task_id='create_emr_database_step',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_database_cluster', key='return_value') }}",
aws_conn_id='aws_default',
on_failure_callback=cleanup_emr_cluster_if_steps_fail,
steps=CREATE_DATABASE,
)
create_emr_database_sensor = EmrStepSensor(
task_id='create_emr_database_sensor',
job_flow_id="{{ task_instance.xcom_pull('create_emr_database_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='create_emr_database_step', key='return_value')[0] }}",
on_failure_callback=cleanup_emr_cluster_if_steps_fail,
aws_conn_id='aws_default',
)
create_emr_scripts >> create_emr_database_cluster >> check_emr_database
check_emr_database >> skip_emr_database_creation >> emr_database_checks_done
check_emr_database >> create_emr_database_step >> create_emr_database_sensor >> emr_database_checks_done
>> emr_database_checks_done >> terminate_emr_cluster
进入全屏模式 退出全屏模式
如果我们签入此代码,然后触发 DAG,我们现在应该看到 Amazon EMR 集群启动,运行创建数据库的步骤,然后终止集群。
工作流程的其余部分重复上述过程,添加添加步骤。您可以在 GitHub 存储库](https://github.com/094459/devday-elt-automation/blob/main/dags/devday-emr-create.py)中查看完整的工作流程[
运行工作流
提交代码后,您应该在 Apache Airflow UI 中拥有可用的工作流,然后可以通过 UI 触发它们。随着每个步骤的开始、运行和完成,您应该能够看到生成的信息和日志(包括 DAG 中包含的任何 Print 语句)。
工作流程完成后,您现在可以查看结果。如果我们使用 Hue,我们可以连接到新数据库并使用 Presto 中的标准 SQL 查看新信息。如果我们查看 Amazon S3 数据湖,我们可以看到我们有新文件。
下一步是什么?
感谢您一直坚持到最后,我希望您发现了解如何使用 Apache Airflow 等开源工具来自动化您的 ELT(或就此而言 ETL)任务很有用。请留意未来的 DevDay Data 活动,我将引导您完成端到端的构建,但我希望您在这里有足够的信息来亲自尝试一下。
不难看出您可以如何在此示例的基础上进行构建。一些例子可能是:
-
使用您在 AWS Lambda 上部署的函数来触发自动化工作流程 - 例如,您收到的新数据更新可能会导致这些表/导出文件自动刷新
-
使用其他 Airflow 运算符,例如 Amazon SageMaker 的运算符,允许您触发自动机器学习模型训练/调整
-
使用额外的工作流程作为摄取工作流程的一部分,将 movielens 数据库放入数据湖,然后触发 ELT 工作流程
与往常一样,请随时联系并提供意见/问题。
更多推荐
所有评论(0)