[创新](https://res.cloudinary.com/practicaldev/image/fetch/s--KASs89--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent.com /94459/innovateaiml-airflow/main/images/banner.png)

支持即将到来的在线活动的一系列帖子的一部分,即 2 月 24 日格林威治标准时间上午 9:00 举行的创新 AI/ML - 您可以在此处注册

  • 第 1 部分 -Apache Airflow 托管工作流的安装和配置

  • 第 2 部分 -使用权限

  • 第 3 部分 -访问适用于 Apache Airflow 环境的 Amazon Managed Workflows

  • 第 4 部分 - 通过命令行与 Apache Airflow 的 Amazon Managed Workflows 交互 <本帖

  • 第 5 部分 -适用于您的开发工作流程的简单 CI/CD 系统

  • 第 6 部分 -监控和记录

  • 第 7 部分 - 使用 Apache Airflow 自动化简单的 AI/ML 管道

在这篇文章中,我将介绍第 4 部分,如何通过命令行交互和访问 Apache Airflow。具体来说,我将介绍几件事:

  1. Amazon Managed Workflows for Apache Airflow 如何使用并支持命令行或程序访问

  2. 演练和一些如何做到这一点的例子。

你需要什么

  • 具有正确权限级别的 AWS 账户

  • 已配置并运行 AWS CLI 工具的环境

  • 访问支持 Apache Airflow 托管工作流的 AWS 区域

  • 已设置适用于 Apache Airflow 的 Amazon Managed Workflows 环境 - 理想情况下,您应该在此处遵循第 1 部分。

Apache 气流 cli

Apache Airflow 提供了一个全面的 cli(您可以在此处阅读详细信息),但重要的是要知道,在使用 MWAA 时,您访问 cli 的方式以及可用的选项都是不同的。如果您来自自安装/管理的 Apache Airflow,则值得花一些时间了解差异 - 您可以在此处阅读。稍后我们将了解如何在您的 MWAA 环境中使用它们。

权限

当我把这个博客放在一起时,我遇到了权限错误(拒绝访问),因为我确保我只配置了所需的最低权限并遵循最低权限原则。

我创建了一个名为 MWAA-CLI-Access-{Env} 的新 IAM 策略,其中包含特定于每个环境的以下详细信息。如果你想在你的环境中启用它,你可以使用通配符,但这对我来说太宽容了。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:CreateCliToken",
            "Resource": [
                "arn:aws:airflow:{region}:{aws-account-no}:environment/{env}"
            ]
        }
    ]
}

进入全屏模式 退出全屏模式

创建后,您可以将此策略附加到一个组,然后将任何 IAM 用户添加到该组,您希望启用这些用户有权运行我在下面概述的这些命令行工具。

Apache Airflow 通过命令行

当您想通过命令行或以编程方式与您的 MWAA 环境交互时,您可以打开许多选项。其中一些您将在之前的帖子中提到过。你所采取的方法将受到你想要达到的目标的影响。例如,一些方法更适合构建/配置和管理 MWAA 环境,而其他选项提供更好的使用 Apache Airflow 功能的能力。

让我们看看这些选项以及它们提供的功能。

  • aws cli - 您拥有的第一个选项是使用标准 aws cli 能够与 MWAA 交互。您可以做很多事情,从检查/询问您的 MWAA 环境到能够创建和更新它们。这些在官方文档页面中有详细记录,您可以在此处看到。您可能会想我是否可以使用这些来创建 MWAA 环境,而不是像之前的帖子中那样使用 CloudFormation 之类的东西。简短的回答是肯定的,但是,您将需要使用许多不同的 aws cli 命令来配置所有其他组件(网络、安全角色、S3 存储桶等),因此您可能会发现使用它是一个更好的选择与 CloudFormation 预置环境结合使用。这将取决于您的偏好和您已经使用的内容。

  • shell 脚本 - 您的下一个选项是通过 shell 进行交互,在这里您可以使用标准 bash 脚本在您已启动并正在运行的环境中直接与您的 Apache Airflow cli 交互。您可以使用 Apache Airflow cli 执行诸如启动/暂停工作流、列出您拥有的工作流、检查工作流状态等操作。

  • boto3 - 您拥有的最后一个选项是使用 boto3 库编写一些 Python 代码,该库是适用于 Python 的 AWS 开发工具包。这使您能够结合 aws cli 的功能并与 Apache Airflow cli 交互,但在您的 python 应用程序的上下文中

现在让我们看一下所有这些示例。

通过 AWS cli 的示例

从 aws cli,您可以使用以下命令访问 aws mwaa cli

aws mwaa help

进入全屏模式 退出全屏模式

将输出可用的命令,然后您可以发出以下命令以获取有关如何使用特定命令的更详细帮助。

aws mwaa {command} help

进入全屏模式 退出全屏模式

如果我们想列出给定 AWS 区域中的当前 MWAA 环境,我们可以使用以下命令:

aws mwaa list-environments

进入全屏模式 退出全屏模式

这将向我们展示类似的内容(假设您已经从之前的帖子中创建了一个环境)

{
    "Environments": [
        "apache-airflow-aimlinnovate",
        "apache-airflow-innovate",
        "airflow-blogpost-dublin"
    ]
}

进入全屏模式 退出全屏模式

如果我们想获得该环境的详细信息——也许我们想查看工作人员的大小,获取 Apache Airflow UI 或其他东西,我们可以使用:

aws mwaa get-environment --name {an environment}

进入全屏模式 退出全屏模式

这会给你这样的东西。

{
    "Environment": {
        "AirflowConfigurationOptions": {},
        "AirflowVersion": "1.10.12",
        "Arn": "arn:aws:airflow:eu-west-1:xxxxxxxxxxxx:environment/airflow-blogpost-dublin",
        "CreatedAt": 1610632127.0,
        "DagS3Path": "dags",
        "EnvironmentClass": "mw1.medium",
        "ExecutionRoleArn": "arn:aws:iam:: xxxxxxxxxxxx:role/airflow-demo-mwaa-eks-iamrole",
        "LastUpdate": {
            "CreatedAt": 1611137820.0,
            "Status": "SUCCESS"
        },
        "LoggingConfiguration": {
            "DagProcessingLogs": {
                "CloudWatchLogGroupArn": "arn:aws:logs:: xxxxxxxxxxxx:log-group:airflow-ricsue-dublin-DAGProcessing",
                "Enabled": true,
                "LogLevel": "INFO"
            },
            "SchedulerLogs": {
                "CloudWatchLogGroupArn": "arn:aws:logs:: xxxxxxxxxxxx:log-group:airflow-ricsue-dublin-Scheduler",
                "Enabled": true,
                "LogLevel": "INFO"
            },
            "TaskLogs": {
                "CloudWatchLogGroupArn": "arn:aws:logs:: xxxxxxxxxxxx:log-group:airflow-ricsue-dublin-Task",
                "Enabled": true,
                "LogLevel": "INFO"
            },
            "WebserverLogs": {
                "CloudWatchLogGroupArn": "arn:aws:logs:: xxxxxxxxxxxx:log-group:airflow-ricsue-dublin-WebServer",
                "Enabled": true,
                "LogLevel": "INFO"
            },
            "WorkerLogs": {
                "CloudWatchLogGroupArn": "arn:aws:logs:: xxxxxxxxxxxx:log-group:airflow-ricsue-dublin-Worker",
                "Enabled": true,
                "LogLevel": "INFO"
            }
        },
        "MaxWorkers": 5,
        "Name": "ricsue-dublin",
        "NetworkConfiguration": {
            "SecurityGroupIds": [
                "sg-0c88ef4755c295zzz"
            ],
            "SubnetIds": [
                "subnet-0493dffd0282f4xxx",
                "subnet-08f416023356ffyyy"
            ]
        },
        "RequirementsS3Path": "requirements/requirements.txt",
        "ServiceRoleArn": "arn:aws:iam:: xxxxxxxxxxxx:role/aws-service-role/airflow.amazonaws.com/AWSServiceRoleForAmazonMWAA",
        "SourceBucketArn": "arn:aws:s3:::airflow-mybucket",
        "Status": "AVAILABLE",
        "Tags": {},
        "WebserverAccessMode": "PUBLIC_ONLY",
        "WebserverUrl": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee.c5.eu-west-1.airflow.amazonaws.com",
        "WeeklyMaintenanceWindowStart": "SUN:14:00"
    }
}

进入全屏模式 退出全屏模式

然后我们可以使用 jq 等工具来访问这些信息的某些部分。了解 Apache Airflow UI 的 url 在稍后的一些脚本中很有用。我们可以使用以下命令轻松获取此信息:

aws mwaa get-environment --name {name of your environment} | jq -r '.Environment.WebserverUrl'

进入全屏模式 退出全屏模式

通过命令行的示例

现在让我们看看我们将如何与 Apache Airflow 本身进行交互。如文档所述,此处为有很多 Apache Airflow 命令,我们可以使用它们来执行诸如查看当前工作流 (DAG)、触发甚至删除 DAG 并查看它们的状态之类的事情。许多不可用的对于托管服务不一定有意义,因此不应该带来太多不便。

假设我们想使用 Apache Airflow 列出当前的 DAG。我们知道 cli 命令是 list_dags,所以我们可以组合一个 shell 脚本来与特定环境的 Apache Airflow cli 交互,如下所示。

#!/bin/bash
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit

if [[ $2 == "" ]]; then
    dag="list_dags"

elif [ $2 == "list_tasks" ] && [[ $3 != "" ]]; then
    dag="$2 $3"

elif [ $2 == "list_dags" ] || [ $2 == "version" ] || [ $2 == "variables" ]; then
    dag=$2

else
    echo "Not a valid command"
    exit 1
fi

CLI_JSON=$(aws mwaa create-cli-token --name $1) \
    && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
    && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
    && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
    --header "Authorization: Bearer $CLI_TOKEN" \
    --header "Content-Type: text/plain" \
    --data-raw "$dag" ) \
    && echo "Output:" \
    && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
    && echo "Errors:" \
    && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

进入全屏模式 退出全屏模式

现在这是一个非常粗略的脚本,您可以扩展它以支持除 list_dags 或 version 之外的其他命令(只需编辑第 4 行以添加您要检查的其他命令),但希望您明白这一点。

当您按如下方式运行此命令时(airflow-cli.sh 与我从上述文件创建的脚本以及我放入 GitHub 存储库中的脚本相同)

./airflow-cli.sh {environment} list_dags

进入全屏模式 退出全屏模式

您应该得到以下输出

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
bakery_sales
db_cleanup
multiple_steps
spark_pi_example

进入全屏模式 退出全屏模式

您可以重复相同的操作,这次使用 version 命令。

Output:
1.10.12

进入全屏模式 退出全屏模式

请记住,这确实假设您已根据您的用户帐户安装和配置了 AWS cli。

一些 Apache Airflow cli 命令采用两个而不是单个值。上面的脚本向您展示了如何扩展它以适应这种情况。在上面的示例中,list_tasks 命令需要一个 DAG id。当我们运行以下命令时:

./airflow-cli.sh {environment} list_tasks bakery_sales

进入全屏模式 退出全屏模式

我们得到以下输出

Output:
[2021-02-01 15:05:40,386] {{__init__.py:50}} INFO - Using executor CeleryExecutor
[2021-02-01 15:05:40,386] {{dagbag.py:417}} INFO - Filling up the DagBag from /usr/local/airflow/dags
add_steps
create_job_flow
watch_step

进入全屏模式 退出全屏模式

您应该有足够的资源开始使用您自己的改进脚本来与 Apache Airflow 命令进行交互。

通过 python 的示例

如果我们更喜欢这种方法,我们也可以使用 boto3,AWS Python 开发工具包。这是一个示例脚本,它再次采用多个参数(环境和命令),然后使用与 bash 脚本中相同的技术,发布到 Apache Airflow 命令解释器,然后返回响应。

import requests
import boto3
import base64
import argparse


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-e', '--env', required=True, help='The name of your Mwaa environment')
    parser.add_argument('-c', '--command', required=True, help='The name of the Apache Airflow command you want to run.')
    args = parser.parse_args()
    return args


def main():
    args = parse_args()

    print("Connecting to MWaa environment" + ": " + args.env)
    client = boto3.client('mwaa')
    response = client.create_cli_token(Name=str(args.env))

    print("Using this command" + ": " + args.command)

    auth_token=response.get('CliToken')
    hed = {'Content-Type': 'text/plain', 'Authorization': 'Bearer ' + auth_token}
    data = str(args.command)
    url = 'https://{web_server}/aws_mwaa/cli'.format(web_server=response.get('WebServerHostname'))
    r = requests.post(url, data=data, headers=hed)
    print_output(r)


def print_output(r):
    output = base64.b64decode(r.json()['stdout']).decode('utf8')
    print(output)

if __name__ == '__main__':
    main()

进入全屏模式 退出全屏模式

当您使用如下示例输入运行此程序时(我已将我的脚本称为 mwaa-python.py,但您可以将脚本创建为您喜欢的任何名称,并将 {environment 更改为您自己的 MWAA 环境名称):

python mwaa-eg-python.py -e {environment} -c version

进入全屏模式 退出全屏模式

你应该得到类似的东西:

Connecting to MWaa environment: ricsue-dublin
Using this command: version
1.10.12

进入全屏模式 退出全屏模式

您将在支持这些帖子的存储库中找到这些脚本。

通过 AWS Lambda 触发

接下来你可以做的是获取这样的代码,然后通过 AWS Lambda 配置一个函数,以潜在地集成事件源以触发某些 DAG(例如,Amazon S3 存储桶中的新数据删除)

这些是我设置它所遵循的步骤。

[1] 在 AWS Lambda (Python/3.7) 中创建一个新的 Python 函数。您可以使用基本执行角色,但随后需要创建 IAM 策略并将其附加到此函数,以允许它创建身份验证令牌。这就是我的样子(为我的环境更改了 { })

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:CreateCliToken",
            "Resource": [
                "arn:aws:airflow:{region}:{aws-account-no}:environment/{name of your env}"
            ]
        }
    ]
}

进入全屏模式 退出全屏模式

对于函数的代码,我使用了与上面相同的示例,但将其更改为 AWS Lambda 所期望的:

import json
import requests
import boto3
import base64

print('Loading function')


def lambda_handler(event, context):

    print("Connecting to MWaa environment" + ": " +  event['environment'])
    client = boto3.client('mwaa')
    response = client.create_cli_token(Name=str( event['environment']))

    print("Using this command" + ": " +  event['command'])

    auth_token=response.get('CliToken')
    hed = {'Content-Type': 'text/plain', 'Authorization': 'Bearer ' + auth_token}
    data = str(args.command)
    url = 'https://{web_server}/aws_mwaa/cli'.format(web_server=response.get('WebServerHostname'))
    r = requests.post(url, data=data, headers=hed)
    output = base64.b64decode(r.json()['stdout']).decode('utf8')
    print(output)

    return event['environment']  # Echo back the first key value

进入全屏模式 退出全屏模式

[2] 您将需要创建一个 AWS Lambda 层,其中包含此函数使用的库(boto3、请求),因此我使用了以下过程(但请随意使用您自己的如果您熟悉构建图层,请执行此操作):

  • 创建和设置 AWS Cloud9 IDE

  • 启动时,创建一个名为 mwaa-layers 的新文件夹

  • 从此目录中,使用命令“pip install requests -t ./python”和“pip install boto3 -t ./python”将使用 pip 下载 python 库文件

  • 使用命令“zip -r mwaa-layer.zip python”打包 zip 文件,这将创建一个名为 mwaa-layer.zip 的 zip 文件,它将您下载的所有库打包到 Python 文件夹中

  • 使用命令“aws lambda publish-layer-version --layer-name mwaa-dag-layer --zip-file fileb://mwaa-dag.zip --compatible-runtimes python3.7”然后创建层.

  • 您现在可以将此层附加到您在第一步中创建的函数。

[3] 您现在可以创建测试 json 文件来检查此功能是否有效。使用 AWS Lambda 控制台,按以下格式创建一个新的测试操作:

{
  "environment": "{your environment}",
  "command": "{airflow command}"
}

进入全屏模式 退出全屏模式

所以我的看起来像:

{
  "environment": "ricsue-dublin",
  "command": "version"
}

进入全屏模式 退出全屏模式

[4] 您现在可以通过“测试”按钮调用函数并使用刚刚创建的测试操作来测试您的函数。您应该看到如下所示的输出:

START RequestId: 2f9b423e-1097-4e7f-bbc5-b1ff394c504e Version: $LATEST
Connecting to MWaa environment: ricsue-dublin
Using this command: version
1.10.12

END RequestId: 2f9b423e-1097-4e7f-bbc5-b1ff394c504e
REPORT RequestId: 2f9b423e-1097-4e7f-bbc5-b1ff394c504e  Duration: 2717.00 ms    Billed Duration: 2717 ms    Memory Size: 128 MB Max Memory Used: 36 MB  

进入全屏模式 退出全屏模式

笔记!由于我们使用默认值,如果您的执行超时,请再试一次或只是增加函数超时时间。

结论

第四篇文章到此结束。您现在将了解如何通过命令行以编程方式与 Apache Airflow 进行交互,以及在哪里可以找到有关支持和不支持的信息的位置。

请务必查看 MWAA 文档](https://docs.aws.amazon.com/mwaa/latest/userguide/access-airflow-ui.html)站点上的[此页面,该站点提供了有关此主题的更多详细信息。如果有您需要并且缺少的功能,请与我们联系。我们的驱动力是对客户的痴迷。

在下一篇文章中,我们将着眼于设置一个简单的 CI/CD 管道,它可以让您简化工作流程开发生命周期的生活。

如果有具体想看的内容,请联系。

Logo

ModelScope旨在打造下一代开源的模型即服务共享平台,为泛AI开发者提供灵活、易用、低成本的一站式模型服务产品,让模型应用更简单!

更多推荐