使用 Apache Airflow 编排混合工作流程

在最近与客户的一些讨论中,开源如何越来越多地被用作一种通用机制来帮助构建可重用的解决方案,这些解决方案可以保护工程和开发时间、技能以及跨本地和云环境工作的投资。在 2021 年,我的帖子谈到了如何在任何地方(AWS、您的数据中心、其他云)和任何东西(英特尔和 Arm)上构建和部署容器化应用程序。我想结合从那篇文章(和代码)中学到的知识,并将其应用于我一直深入研究的另一个主题,Apache Airflow。我想探索如何将两者结合起来,看看如何开始构建跨混合架构无缝工作的数据管道。

用例

那么我们为什么要这样做呢?我可以看到许多现实世界的应用程序,但对我来说突出的两个是:

  • ,您希望在数据分析管道中利用和使用现有的遗留/传统系统

  • 当地法规和合规性对可以处理数据的位置进行额外控制

在这篇文章中,我将展示如何解决这两个用例,将开源技术与许多 AWS 产品和服务相结合,使您能够使用 Apache Airflow 跨异构环境编排工作流。在此演示中,我想向您展示如何编排数据管道以跨所有基于云和非云的数据孤岛构建集中式数据湖,尊重您可能拥有的本地处理和控制。

这就是我们最终要构建的。

[架构图](https://res.cloudinary.com/practicaldev/image/fetch/s--dQW6swXn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent. com/94459/blogpost-airflow-hybrid/main/images/ricsue-airflow-hybrid.png)

与往常一样,您可以在此 GitHub 存储库中找到此演练的代码,blogpost-airflow-hybrid

方法

首先,我们需要创建我们的演示客户数据。我使用Mockaroo并发现生成示例数据非常直观,然后使用它来设置在 MySQL 上运行的示例客户数据库。我不打算介绍如何设置,但我已经在 repo 中包含了数据脚本,如果你想自己设置,我在这篇博客的末尾有一个部分我会分享我的设置。演示将有两个 MySQL 数据库正在运行,具有相同的数据库架构但具有不同的数据。一个将在 AWS 的 Amazon RDS MySQL 实例上运行,另一个将在 Home HQ 的本地 Ubuntu 机器上运行。

这个演示/演练的目标是编排一个工作流程,使我们能够根据一些特定的标准(例如模拟一些监管机构的控制)从这些数据库中进行批量提取,并将它们上传到我们的数据湖中。我们将为此使用 Amazon S3 存储桶,因为这是一种常见的场景。

我将采用的方法是创建一个 Apache Airflow 工作流 (DAG) 并利用 Apache Airflow 运算符 ECSOperator,它允许我们启动基于容器的图像。我们启动的基于容器的图像将包含我们的 ETL 代码,这将被参数化,以便我们可以多次重复使用它,通过在启动期间提供参数来更改行为(例如,不同的 SQL 查询)。最后,我们将使用 ECS Anywhere,它使用开源amazon-ecs-agent来简化我们如何在任何地方(在 AWS、本地或其他云中)运行容器。

为了使这篇博文更容易理解,我将把它分解成不同的任务。首先,创建我们可以用来提取数据的 ETL 容器。之后我将展示我们如何通过 Amazon ECS 运行它,然后介绍部署 ECS Anywhere 的过程,以便我们可以在 Amazon EC2 实例(云)和我不起眼的(非常老的)Ubuntu 18.04 机器上运行它(on-前)。一切正常后,我们将切换到 Apache Airflow 并创建 DAG 将这些部分组合在一起并创建一个工作流,该工作流将定义和启动这些容器以运行我们的 ETL 脚本并将我们的数据上传到我们的数据湖中。

是时候开始了。

先决条件

如果您想跟随,那么您将需要以下内容:

  • 具有正确权限级别的 AWS 账户以及在本地设置和运行的 AWS cli

  • AWS CDK 安装和配置(最新版本,v1 或 v2)

  • Docker 和 Docker Compose 以便能够构建您的容器映像并在本地进行测试

  • 访问支持 Apache Airflow 的托管工作流的 AWS 区域 - 我将使用 eu-west-2(伦敦)

  • 已设置适用于 Apache Airflow 的 Amazon Managed Workflows 环境。你可以关注之前的一篇文章,我已经包含了关于如何在 repo 中构建该环境的 AWS CDK 代码

  • MySQL 数据库运行一个数据库和你可以查询的数据——它不必和我用的一样,你可以用你自己的。我已经提供了 sql 客户虚拟数据,如果你自己尝试这个演示,你想使用它。他们在这里的关键是你有一个在本地运行,一个在云中运行。我正在使用 Amazon RDS MySQL,但如果您愿意,您可以轻松地以不同的方式运行 MySQL。我在博文末尾提供了一些有关如何设置的详细信息。

  • AWS Secrets 配置为包含 MySQL 数据库的连接详细信息

根据我在 AWS 账单控制台上的信息,运行 24 小时的 AWS 成本约为 45 美元。

创建我们的 ETL 容器

创建我们的 ETL 脚本

首先,我们需要创建能够实现 ETL 魔法的 Python 代码。为简单起见,我创建了一个简单的脚本,该脚本接受一些参数,然后运行查询,并将结果上传到 Amazon S3。

读取数据-q.py

from copy import copy
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
import sys
import csv
import boto3
import json
import socket
def query_with_fetchone(query2run,secret,region):
    try:
        # Grab MySQL connection and database settings. We areusing AWS Secrets Manager 
        # but you could use another service like Hashicorp Vault
        # We cannot use Apache Airflow to store these as this script runs stand alone
        secret_name = secret
        region_name = region
        session = boto3.session.Session()
        client = session.client(
            service_name='secretsmanager',
            region_name=region_name
        )
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
        info=json.loads(get_secret_value_response['SecretString'])
        pw=info['password']
        un=info['username']
        hs=info['host']
        db=info['database']
        # Output to the log so we can see and confirm WHERE we are running and WHAT
        # we are connecting to

        print("Connecting to ",str(hs)," database ", str(db), " as user ", str(un))
        print("Database host IP is :", socket.gethostbyname(hs))
        print("Source IP is ", socket.gethostname())

        conn = MySQLConnection(user=un, password=pw, host=hs, database=db)
        cursor = conn.cursor()
        query=query2run
        print("Query is", str(query))
        cursor.execute(query)
        records = cursor.fetchall()
        c = csv.writer(open("temp.csv","w"))
        c.writerows(records)
        print("Records exported:")
        for row in records:
            print(row[0],",",row[1],",",row[2],",",row[3],",",row[4],",",row[5], ",",row[6],",",row[7] )

    except Error as e:
        print(e)

    finally:
        cursor.close()
        conn.close()
def upload_to_s3(s3bucket,s3folder,region):
    # We will upload the temp (temp.csv) file and copy it based on the input params of the script (bucket and dir/file)
    try:
        s3 = boto3.client('s3', region_name=region)
        s3.upload_file('temp.csv',s3bucket,s3folder)
    except FileNotFoundError:
        print("The file was not found")
        return False
    except Error as e:
        print(e)

if __name__ == '__main__':
    try:
        arg = sys.argv[2]
    except IndexError:
        raise SystemExit(f"Usage: {sys.argv[0]} <s3 bucket><s3 file><query><secret><region>")
    # The script needs the following arguments to run
    # 1. Target S3 bucket where the output of the SQL script will be copied
    # 2. Target S3 folder/filename 
    # 3. The query to execute
    # 4. The parameter store (we use AWS Secrets) which holds the values on where to find the MySQL database
    # 5. The AWS region
    s3bucket=sys.argv[1]
    s3folder=sys.argv[2]
    query2run=sys.argv[3]
    secret=sys.argv[4]
    region=sys.argv[5]
    query_with_fetchone(query2run,secret,region)
    upload_to_s3(s3bucket,s3folder,region)

进入全屏模式 退出全屏模式

AWS 秘密

这个脚本接受了一些参数(上面的脚本中记录了),你可能会看到第四个参数是一个我们还没有提到的值,即参数存储。当提出这样的解决方案时,我们需要一个中央存储,我们的容器将能够以安全的方式访问一些关键信息,无论它们可能在哪里运行。使用这种方法时在文件中包含凭证信息感觉是错误的(这是一种选择,如果您愿意,可以使用 repo 中的代码来实现)。我决定使用 AWS Secrets 来存储 MySQL 数据库的凭证和连接详细信息。您当然可以使用 HashiCorp 的 Vault 等其他服务来做类似的事情。我们所需要的只是一种能够 1/存储重要凭据,2/在容器需要运行的任何地方访问它们的方法。

我们的脚本需要四条信息:数据库主机、数据库名称、连接数据库的用户和用户密码。

如下创建一个 json 文件(我称为 tmp-elt.json),然后更改您需要的值:

{
      "username": "app",
      "password": "{secure password}",
      "host": "localmysql-airflow-hybrid",
      "database" : "localdemo"
}

进入全屏模式 退出全屏模式

aws secretsmanager create-secret --name localmysql-airflow-hybrid --description "Used by Apache Airflow ELT container" --secret-string file://tmp-elt.json --region={your region}

进入全屏模式 退出全屏模式

这给了我这个输出。

{
    "ARN": "arn:aws:secretsmanager:eu-west-2:xxxxx:secret:localmysql-airflow-hybrid-XXXXXX",
    "Name": "localmysql-airflow-hybrid",
    "VersionId": "45ff7ccf-2b10-4c0e-bbbc-9fb161a65ecd"
}

进入全屏模式 退出全屏模式

我们可以通过尝试检索秘密来检查它是否存储了所有内容。

**注意!**这将显示秘密值,因此在使用此命令时要小心,以确保您不会意外泄露这些值。

aws secretsmanager get-secret-value --secret-id localmysql-airflow-hybrid --region={your region}

进入全屏模式 退出全屏模式

如果成功,您将看到自己的价值观。我们创建的 Python 脚本与此步骤相同,只是它使用 boto3 库来与这些值交互并获取这些值。

您现在应该已经为您的 MySQL 数据库定义了 AWS Secrets。在我的具体情况下,我创建的两个是:

  • localmysql-airflow-hybrid - 配置为通过 localmysql.beachgeek.co.uk 主机在我的“本地”网络上查找数据库

  • rds-airflow-hybrid - 配置为指向我在 Amazon RDS 中配置的 MySQL 数据库

我们现在已准备好连接到这些数据库并开始提取一些有价值的见解......

测试我们的脚本

为了确保在打包和容器化之前一切正常,我们可以从命令行运行它。您可以运行以下命令:

(来自码头文件夹)

python app/read-data-q.py

进入全屏模式 退出全屏模式

这给了我们这个信息性消息

Usage: app/read-data-q.py <s3 bucket><s3 file><query><secret><region>

进入全屏模式 退出全屏模式

这是意料之中的,因为我们没有提供任何论据。现在我们可以尝试一些参数。

python app/read-data-q.py ricsue-airflow-hybrid customer/regional-001.csv "select * from customers WHERE country = 'Spain' " rds-airflow-hybrid eu-west-2

进入全屏模式 退出全屏模式

并且应该发生几件事。首先,您应该看到如下内容:

Connecting to  demords.cidws7o4yy7e.eu-west-2.rds.amazonaws.com  database  demo  as user  admin
Database host IP is : 18.169.169.151
Source IP is  a483e7ab1cb5.ant.amazon.com
Query is select * from customers WHERE country = 'Spain'
Records exported:
171 , 2021-06-21 18:24:25 , Wiatt , Revell , wrevell4q@umn.edu , Female , 26.6.23.83 , Spain
632 , 2021-11-14 18:44:25 , Sheppard , Rylett , sryletthj@java.com , Genderfluid , 50.1.207.70 , Spain
783 , 2021-03-29 18:05:12 , Sloane , Maylour , smaylourlq@1und1.de , Female , 194.84.247.201 , Spain

进入全屏模式 退出全屏模式

第二个是,如果您转到您的 Amazon S3 存储桶,您现在应该有一个名为(使用上面的示例)customer/regional-001.csv 的文件夹和文件。

假设你成功了,我们现在准备进入下一阶段,将其打包,以便我们可以通过容器运行它。

容器化我们的 ETL 脚本

大多数组织都有自己的容器化应用程序工作流程,因此如果您愿意,可以随意采用/使用这些工作流程。

为了容器化我们的应用程序,我们需要创建一个 Docker 文件。当我们使用 Python 时,我将为这个容器选择一个 Python 基础镜像 (public.ecr.aws/docker/library/python:latest)。

我们在上一步中创建的脚本位于 app 文件夹中,因此我们使用 WORKDIR 进行设置。然后我们复制并运行 PIP 以安装我们的 Python 依赖项(mysql-connector-python 和 boto3)。然后我们复制文件并指定我们希望这个容器在运行时如何启动(执行“python3 app/read-data.py”)

FROM  public.ecr.aws/docker/library/python:latest

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

CMD [ "python3", "app/read-data.py"]

进入全屏模式 退出全屏模式

要构建/打包我们的容器,我们需要 1/ 使用 docker build 构建我们的镜像,2/ 标记镜像,然后 3/ 将镜像推送到容器存储库。为了简化这部分过程,您将找到一个脚本 (setup.sh),它采用我们的 Python 脚本并创建我们的容器存储库(在 Amazon ECR 中)、构建并标记,然后再将图像推送到该存储库。

在运行脚本之前,您只需更改变量 AWS_DEFAULT_REGION、AWS_ACCOUNT,然后如果您想根据自己的目的进行自定义,请更改 AWS_ECR_REPO 和 COMMIT_HASH。

#!/usr/bin/env bash

# Change these values for your own environment
# it should match what values you use in the CDK app
# if you are using this script together to deploy
# the multi-arch demo

AWS_DEFAULT_REGION=eu-west-2
AWS_ACCOUNT=704533066374
AWS_ECR_REPO=hybrid-airflow
COMMIT_HASH="airflw"

# You can alter these values, but the defaults will work for any environment

IMAGE_TAG=${COMMIT_HASH:=latest}
AMD_TAG=${COMMIT_HASH}-amd64
DOCKER_CLI_EXPERIMENTAL=enabled
REPOSITORY_URI=$AWS_ACCOUNT.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$AWS_ECR_REPO

# Login to ECR

$(aws ecr get-login --region $AWS_DEFAULT_REGION --no-include-email)

# create AWS ECR Repo

if (aws ecr describe-repositories --repository-names $AWS_ECR_REPO ) then
    echo "Skipping the create repo as already exists"
else
    echo "Creating repos as it does not exists"
    aws ecr create-repository --region $AWS_DEFAULT_REGION --repository-name $AWS_ECR_REPO
fi

# Build initial image and upload to ECR Repo

docker build -t $REPOSITORY_URI:latest .
docker tag $REPOSITORY_URI:latest $REPOSITORY_URI:$AMD_TAG
docker push $REPOSITORY_URI:$AMD_TAG

# Create the image manifests and upload to ECR

docker manifest create $REPOSITORY_URI:$COMMIT_HASH $REPOSITORY_URI:$AMD_TAG
docker manifest annotate --arch amd64 $REPOSITORY_URI:$COMMIT_HASH $REPOSITORY_URI:$AMD_TAG
docker manifest inspect $REPOSITORY_URI:$COMMIT_HASH
docker manifest push $REPOSITORY_URI:$COMMIT_HASH

进入全屏模式 退出全屏模式

运行脚本时,您应该会看到类似于以下内容的输出。通过我的家庭宽带连接,这大约需要 10-15 分钟才能完成,因此所需的时间将取决于您的互联网速度(上传)有多好:

Login Succeeded

An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'hybrid-airflow' does not exist in the registry with id '704533066374'
Creating repos as it does not exists
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:eu-west-1:704533066374:repository/hybrid-airflow",
        "registryId": "704533066374",
        "repositoryName": "hybrid-airflow",
        "repositoryUri": "704533066374.dkr.ecr.eu-west-1.amazonaws.com/hybrid-airflow",
        "createdAt": 1646401450.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        },
        "encryptionConfiguration": {
            "encryptionType": "AES256"
        }
    }
}
[+] Building 1.1s (10/10) FINISHED
 => [internal] load build definition from Dockerfile                                                                         0.2s
 => => transferring dockerfile: 43B                                                                                          0.0s
 => [internal] load .dockerignore                                                                                            0.1s
 => => transferring context: 2B                                                                                              0.0s
 => [internal] load metadata for public.ecr.aws/docker/library/python:latest                                                 0.0s
 => [internal] load build context                                                                                            0.1s
 => => transferring context: 6.51kB                                                                                          0.0s
 => [1/5] FROM public.ecr.aws/docker/library/python:latest                                                                   0.0s
 => CACHED [2/5] WORKDIR /app                                                                                                0.0s
 => CACHED [3/5] COPY requirements.txt requirements.txt                                                                      0.0s
 => CACHED [4/5] RUN pip3 install -r requirements.txt                                                                        0.0s
 => [5/5] COPY . .                                                                                                           0.3s
 => exporting to image                                                                                                       0.2s
 => => exporting layers                                                                                                      0.1s
 => => writing image sha256:3a8c17fca355a08ac805d91484b2c40a221e66538bde05d541ce4c63889a420f                                 0.0s
 => => naming to 704533066374.dkr.ecr.eu-west-1.amazonaws.com/hybrid-airflow:latest                                          0.0s

Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
The push refers to repository [704533066374.dkr.ecr.eu-west-1.amazonaws.com/hybrid-airflow]
79b32f350172: Pushed
d2495859a48d: Pushing [=>                                                 ]  6.078MB/236.4MB
251a457ff355: Pushed
0ff6b99009cb: Pushed
6da81147b608: Pushed
3a12cc54983b: Pushed
9018eb0987e9: Pushing [=========>                                         ]  11.09MB/56.36MB
ccfb32241621: Pushing [=================================================> ]  18.36MB/18.48MB
204e42b3d47b: Pushing [=>                                                 ]  14.71MB/528.4MB
613ab28cf833: Pushing [==>                                                ]  7.124MB/151.9MB
bed676ceab7a: Waiting
6398d5cccd2c: Waiting
0b0f2f2f5279: Waiting
..
..

airflw-amd64: digest: sha256:4012f734f03c125e2dfd97977aafba07232bd3e91936759b6045f764bc948d15 size: 3052
Created manifest list 704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw
{
   "schemaVersion": 2,
   "mediaType": "application/vnd.docker.distribution.manifest.list.v2+json",
   "manifests": [
      {
         "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
         "size": 3052,
         "digest": "sha256:4012f734f03c125e2dfd97977aafba07232bd3e91936759b6045f764bc948d15",
         "platform": {
            "architecture": "amd64",
            "os": "linux"
         }
      }
   ]
}
sha256:9b97129b993f94ffb9699a2ea75cce9dae6c541a3f81bc5433c12f6825d71a18

进入全屏模式 退出全屏模式

如果您打开 Amazon ECR 控制台,您现在应该会看到您的新容器存储库和映像。容器映像现在应该可以通过以下资源 uri 获得

704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw

进入全屏模式 退出全屏模式

测试我们的容器化 ETL 脚本

现在我们已经将这个脚本容器化了,让我们看看它是如何工作的。首先我们可以尝试以下

docker run 704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw

进入全屏模式 退出全屏模式

哪个应该为您提供以下输出

Unable to find image '704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw' locally
airflw: Pulling from hybrid-airflow
Digest: sha256:9b97129b993f94ffb9699a2ea75cce9dae6c541a3f81bc5433c12f6825d71a18
Status: Downloaded newer image for 704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw
Usage: app/read-data-q.py <s3 bucket><s3 file><query><secret><region>

进入全屏模式 退出全屏模式

注意! 如果您收到如下错误:

docker:来自守护程序的错误响应:70453366374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow 的请求访问被拒绝,存储库不存在或可能需要“docker login”:拒绝:您的授权令牌已已到期。重新验证并重试。

然后您可以重新登录到您的 Amazon ECR 环境。这是我使用的命令,您的命令会根据您所在的地区而有所不同

$(aws ecr get-login --region eu-west-2 --no-include-email)

我们知道这会产生错误,因为我们没有提供正确的参数,但我们可以看到它提供了预期的行为。现在让我们试试这个命令:

docker run 704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw ricsue-airflow-hybrid customer/regional-001.csv "select * from customers WHERE country = 'Spain' " rds-airflow-hybrid eu-west-2

进入全屏模式 退出全屏模式

这次我得到一个不同的错误。错误的结尾部分为我们提供了一些线索..

...
...
botocore.exceptions.NoCredentialsError: Unable to locate credentials

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/app/read-data-q.py", line 79, in <module>
    query_with_fetchone(query2run,secret,region)
  File "/app/app/read-data-q.py", line 50, in query_with_fetchone cursor.close()
UnboundLocalError: local variable 'cursor' referenced before assignment

进入全屏模式 退出全屏模式

错误的原因是该容器没有用于与 AWS 交互的 AWS 凭证。我们可以很容易地解决这个问题。

我们可以在我们的容器中添加我们的 AWS 凭证,但我不建议这样做,而且这样做通常是一个非常糟糕的主意。相反,我们可以创建两个环境变量(AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY),当我们将容器作为环境变量运行时,我们会将它们传递给 Docker。

注意 请不要公开披露这些值,或将它们存储在其他人可能复制它们的地方。

export AWS_ACCESS_KEY_ID=XXXXXXXXX
export AWS_SECRET_ACCESS_KEY=XXXXXXX

进入全屏模式 退出全屏模式

我们现在可以稍微改变一下我们的 Docker 运行命令,然后再试一次

docker run -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY 704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw ricsue-airflow-hybrid customer/regional-001.csv "select * from customers WHERE country = 'Spain' " rds-airflow-hybrid eu-west-2

进入全屏模式 退出全屏模式

而这一次,成功!

Connecting to  demords.cidws7o4yy7e.eu-west-2.rds.amazonaws.com  database  demo  as user  admin
Database host IP is : 18.169.169.151
Source IP is  e8d3ffa8e4a2
Query is select * from customers WHERE country = 'Spain'
Records exported:
171 , 2021-06-21 18:24:25 , Wiatt , Revell , wrevell4q@umn.edu , Female , 26.6.23.83 , Spain
632 , 2021-11-14 18:44:25 , Sheppard , Rylett , sryletthj@java.com , Genderfluid , 50.1.207.70 , Spain
783 , 2021-03-29 18:05:12 , Sloane , Maylour , smaylourlq@1und1.de , Female , 194.84.247.201 , Spain

进入全屏模式 退出全屏模式

对于目光敏锐的人来说,您可以看到输出是相同的(这是再保证),但源 IP 是不同的。这是因为这一次,源主机是运行此脚本的 Docker 容器。

对运行类似数据集的本地 MySQL 服务器进行测试怎么样?我运行这个命令,唯一的区别是 AWS Secret,它决定了要尝试连接的 MySQL 数据库。

docker run -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY 704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw ricsue-airflow-hybrid customer/regional-002.csv "select * from customers WHERE country = 'Spain' " localmysql-airflow-hybrid eu-west-2

进入全屏模式 退出全屏模式

我们得到一个错误。

Traceback (most recent call last):
  File "/app/app/read-data-q.py", line 31, in query_with_fetchone
    print("Database host IP is :", socket.gethostbyname(hs))
socket.gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

进入全屏模式 退出全屏模式

这是可以预料的。在本演练开始时,我解释说我们使用 /etc/hosts 来解析数据库主机。 Docker 再次允许我们使用命令行选项启用基于主机的查找, --networku003dhost 以便我们可以使用以下命令重试:

docker run --network=host -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY 704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw ricsue-airflow-hybrid customer/regional-002.csv "select * from customers WHERE country = 'Spain' " localmysql-airflow-hybrid eu-west-2

进入全屏模式 退出全屏模式

成功!这次我们得到了一组不同的数据(两个区域的示例数据不一样),我们可以看到连接详细信息已更新,以反映我们正在访问本地数据库。在现实世界的场景中,这些将是您用来连接到资源的本地 DNS 可解析地址。

Connecting to  localmysql.beachgeek.co.uk  database  localdemo  as user  root
Database host IP is : 127.0.1.1
Source IP is  28d3faa8e331
Query is select * from customers WHERE country = 'Spain'
Records exported:
164 , 2021-04-02 , Dag , Delacourt , ddelacourt4j@nydailynews.com , Male , 200.12.59.159 , Spain

进入全屏模式 退出全屏模式

回顾一下到目前为止我们所做的是将我们的 ETL 脚本容器化,并成功地针对 MySQL 数据库的本地和远程实例运行它。下一阶段是将这些转移到容器编排器上。我将使用 Amazon ECS,这是我最喜欢的运行容器应用程序的方式。好吧,让我们直接开始吧。

在 Amazon ECS 上运行我们的 ETL 容器

创建我们的 ECS 集群

我们现在要在 AWS 上设置一个 Amazon ECS 集群,与我们目前工作的区域相同。我们可以手动执行此操作,但为了简化这部分设置,您将在 GitHub 存储库中找到 cdk/cdk-ecs 文件夹,其中包含将部署和配置 Amazon ECS 集群的 CDK 堆栈,然后创建 ECS 任务定义将包含我们上面所做的 ETL 容器的所有位。我已经在 v1 和 v2 CDK 上对此进行了测试,并且两者都可以正常工作。这将创建一个 VPC 并在该集群中部署一个新的 ECS 集群和一个 ECS 节点。

我们首先需要更新 app.py 并更新一些属性。您需要为自己的环境更新 AWS 和区域账户。 ecr-repo 和 image-tag 的值需要与我们在设置 Amazon ECR 存储库时使用的值相同(之前的步骤,我们在其中运行 setup.sh 脚本 - 使用的默认值与此处使用的相同)。更新后保存文件。

s3 的值是将文件上传到的目标 Amazon S3 存储桶。这用于在此存储桶中创建复制文件的权限。如果您设置不正确,ETL 脚本将由于缺少权限而无法上传。

from aws_cdk import core

from ecs_anywhere.ecs_anywhere_vpc import EcsAnywhereVPCStack
from ecs_anywhere.ecs_anywhere_taskdef import EcsAnywhereTaskDefStack

env_EU=core.Environment(region="eu-west-2", account="704533066374")
props = {
    'ecsclustername':'hybrid-airflow',
    'ecr-repo': 'hybrid-airflow',
    'image-tag' : 'airflw',
    'awsvpccidr':'10.0.0.0/16',
    's3':'ricsue-airflow-hybrid'
    }

app = core.App()

mydc_vpc = EcsAnywhereVPCStack(
    scope=app,
    id="ecs-anywhere-vpc",
    env=env_EU,
    props=props
)

mydc_ecs_cicd = EcsAnywhereTaskDefStack(
    scope=app,
    id="ecs-anywhere-taskdef",
    env=env_EU,
    vpc=mydc_vpc.vpc,
    props=props  
)

app.synth()

进入全屏模式 退出全屏模式

在命令行中,当您键入:

cdk ls

进入全屏模式 退出全屏模式

你应该得到如下内容:

ecs-anywhere-vpc
ecs-anywhere-taskdef

进入全屏模式 退出全屏模式

我们通过运行以下命令创建我们的 Amazon ECS VPC:

cdk deploy ecs-anywhere-vpc

进入全屏模式 退出全屏模式

注意! 通常一个 AWS 账户限制为 5 个 VPC,因此请检查您的 AWS 账户以查看在运行此之前配置了多少,否则运行此堆栈将产生错误。

完成后,我们可以通过运行以下命令部署我们的 Amazon ECS 集群:

cdk deploy  ecs-anywhere-taskdef

进入全屏模式 退出全屏模式

(对您可能得到的任何提示回答 Y)这应该会产生类似于以下内容的输出:

 ✅  ecs-anywhere-taskdef

Outputs:
ecs-anywhere-taskdef.ECSClusterName = hybrid-airflow-cluster
ecs-anywhere-taskdef.ECSRoleName = hybrid-airflow-ECSInstanceRole
ecs-anywhere-taskdef.ECSAnywhereRoleName = hybrid-airflow-ExternalECSAnywhereRole


Stack ARN:
arn:aws:cloudformation:eu-west-2:704533066374:stack/ecs-anywhere-taskdef/28b05150-9b9c-11ec-a2db-02449e7c3502

进入全屏模式 退出全屏模式

我们将需要此 CDK 应用程序输出的一些值,因此请记下这些值。

我们可以通过转到 AWS 控制台并查看 Amazon ECS 控制台来验证一切是否已设置。或者,我们可以运行以下命令:

aws ecs list-clusters --region={your region}

进入全屏模式 退出全屏模式

应该显示如下内容(取决于您可能已经设置了多少其他 ECS 集群),我们可以看到我们有了新的 ECS 集群 - 名称在 ecsclustername 的 app.py 中的变量中定义(“hybrid -airflow") 后跟“-cluster”。

{
    "clusterArns": [
        "arn:aws:ecs:eu-west-2:704533066374:cluster/hybrid-airflow-cluster"
    ]
}

进入全屏模式 退出全屏模式

探索 ECS CDK 堆栈

在继续之前,有必要通过探索 CDK 文件来了解我们刚刚所做的事情。部署此 ECS 集群并配置我们的任务定义(将运行我们的应用程序)的 CDK 文件是“ecs_anywhere_taskdef.py”。正如我们从文件的开头看到的那样,我们从 app.py(“props”)导入值,我们将使用这些值来定义 ECS 集群的名称、容器镜像、权限等。

class EcsAnywhereTaskDefStack(core.Stack):

    def __init__(self, scope: core.Construct, id: str, vpc, props, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

进入全屏模式 退出全屏模式

我们通过引用容器存储库(这里我们使用 Amazon ECR,但您可以使用其他),为我们的 ETL 应用程序的容器映像创建变量。

        airflow_repo = ecr.Repository.from_repository_name(self, "Hybrid-ELT-Repo", repository_name=f"{props['ecr-repo']}")
        airflow_image = ecs.ContainerImage.from_ecr_repository(airflow_repo, f"{props['image-tag']}")

进入全屏模式 退出全屏模式

当我们创建 ECS 集群时,它将创建一个 EC2 实例,该实例将需要一个 IAM 角色来为其提供所需的权限。这是 ECS 任务执行角色。这里我们只需要定义它并给它起一个名字“role_name”

        ecscluster_role = iam.Role(
            self,
            f"{props['ecsclustername']}-ecsrole",
            role_name=f"{props['ecsclustername']}-ECSInstanceRole",
            assumed_by=iam.ServicePrincipal("ssm.amazonaws.com"),
            managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore")]
        )
        ecscluster_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AmazonEC2ContainerServiceforEC2Role"))

进入全屏模式 退出全屏模式

然后我们创建我们的 ECS 集群,为其命名,然后使用我们刚刚创建的 IAM 角色预置一些 EC2 资源

        ecscluster = ecs.Cluster(
            self,
            f"{props['ecsclustername']}-ecscluster",
            cluster_name=f"{props['ecsclustername']}-cluster",
            vpc=vpc
        )

        ecscluster.add_capacity(
            "x86AutoScalingGroup",
            instance_type=ec2.InstanceType("t2.xlarge"),
            desired_capacity=1
        )

进入全屏模式 退出全屏模式

我们现在需要为 ECS 任务定义创建一个角色。这将是我们的应用程序将使用的 IAM 角色,因此我们在这里定义范围尽可能低的不同权限。我们传入其中一些变量,例如 S3 存储桶和 ECS 集群名称。

首先我们创建策略。


                data_lake = s3.Bucket.from_bucket_name(self, "DataLake", f"{props['s3']}")
        data_lake_arn = data_lake.bucket_arn

        task_def_policy_document = iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    actions=[ "s3:*" ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        f"{data_lake_arn}/*",
                        f"{data_lake_arn}"
                    ],
                ),
                iam.PolicyStatement(
                    actions=[
                        "ecs:RunTask",
                        "ecs:DescribeTasks",
                        "ecs:RegisterTaskDefinition",
                        "ecs:DescribeTaskDefinition",
                        "ecs:ListTasks"
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        "*"
                        ],
                    ),
                iam.PolicyStatement(
                    actions=[
                        "iam:PassRole"
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[ "*" ],
                    conditions= { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } },
                    ),
                iam.PolicyStatement(    
                    actions=[
                        "logs:CreateLogStream",
                        "logs:CreateLogGroup",
                        "logs:PutLogEvents",
                        "logs:GetLogEvents",
                        "logs:GetLogRecord",
                        "logs:GetLogGroupFields",
                        "logs:GetQueryResults"
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        f"arn:aws:logs:*:*:log-group:/ecs/{props['ecsclustername']}:log-stream:/ecs/*"
                        ]           
                )
            ]
        )

进入全屏模式 退出全屏模式

然后我们创建 IAM 角色,并附加我们可能需要的任何托管策略(在这种情况下,我们希望使用 Secrets Manager 托管策略,以便我们可以读取我们的密钥)

        task_def_policy_document_role = iam.Role(
            self,
            "ECSTaskDefRole",
            role_name=f"{props['ecsclustername']}-ECSTaskDefRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
            inline_policies={"ECSTaskDefPolicyDocument": task_def_policy_document}
        )

        managed_secret_manager_policy = iam.ManagedPolicy.from_aws_managed_policy_name("SecretsManagerReadWrite")
        task_def_policy_document_role.add_managed_policy(managed_secret_manager_policy)

进入全屏模式 退出全屏模式

最后一点是实际创建我们的任务定义,定义实际的容器映像、命令覆盖、资源等。我们还需要定义然后创建 AWS CloudWatch 日志记录组,以便我们可以在 AWS CloudWatch 中查看日志。如果需要,您可以配置其他日志记录目标。

        log_group = log.LogGroup(
            self,
            "LogGroup",
            log_group_name=f"/ecs/{props['ecsclustername']}"
        )
        ec2_task_definition = ecs.Ec2TaskDefinition(
            self,
            f"{props['ecsclustername']}-ApacheAirflowTaskDef",
            family="apache-airflow",
            network_mode=ecs.NetworkMode.HOST,
            task_role=task_def_policy_document_role
            )

        ec2_task_definition.add_container(
            "Hybrid-ELT-TaskDef",
            image=airflow_image,
            memory_limit_mib=1024,
            cpu=100,
            # Configure CloudWatch logging
            logging=ecs.LogDrivers.aws_logs(stream_prefix="ecs",log_group=log_group),
            essential=True,
            command= [ "ricsue-airflow-hybrid", "period1/hq-data.csv", "select * from customers WHERE country = \"Spain\"", "rds-airflow-hybrid", "eu-west-2" ],
            )

进入全屏模式 退出全屏模式

就是这样,一旦部署完成,大约 5 分钟后,我们就有了新的 ECS 集群,其中一个 EC2 资源启动并运行,我们的任务定义已定义并准备好运行。理论上,我们现在可以运行它,它应该和我们使用 Docker 在本地运行它时一样。

运行我们的 ELT 容器(Amazon ECS 任务定义)

从命令行,您可以通过运行以下命令来启动此任务。

export ECS_CLUSTER="hybrid-airflow-cluster"
export TASK_DEF="apache-airflow"
export DEFAULT_REGION="eu-west-2"

aws ecs run-task --cluster $ECS_CLUSTER --launch-type EC2 --task-definition $TASK_DEF --region=$DEFAULT_REGION

进入全屏模式 退出全屏模式

应该创建这样的输出:

{
    "tasks": [
        {
            "attachments": [],
            "attributes": [
                {
                    "name": "ecs.cpu-architecture",
                    "value": "x86_64"
                }
            ],
            "availabilityZone": "eu-west-2b",
            "clusterArn": "arn:aws:ecs:eu-west-2:704533066374:cluster/hybrid-airflow-cluster",
            "containerInstanceArn": "arn:aws:ecs:eu-west-2:704533066374:container-instance/hybrid-airflow-cluster/c7550837cbcb4a19a1fdcd79cb600062",
            "containers": [
                {
                    "containerArn": "arn:aws:ecs:eu-west-2:704533066374:container/hybrid-airflow-cluster/b357bc55e60b4557a2eccb3619a8ec64/a30c90c7-25f8-4091-8532-69cdf6d8c3ce",
                    "taskArn": "arn:aws:ecs:eu-west-2:704533066374:task/hybrid-airflow-cluster/b357bc55e60b4557a2eccb3619a8ec64",
                    "name": "Hybrid-ELT-TaskDef",
                    "image": "704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw",
                    "lastStatus": "PENDING",
                    "networkInterfaces": [],
                    "cpu": "100",
                    "memory": "1024"
                }
            ],
            "cpu": "100",
            "createdAt": 1646582727.378,
            "desiredStatus": "RUNNING",
            "enableExecuteCommand": false,
            "group": "family:apache-airflow",
            "lastStatus": "PENDING",
            "launchType": "EC2",
            "memory": "1024",
            "overrides": {
                "containerOverrides": [
                    {
                        "name": "Hybrid-ELT-TaskDef"
                    }
                ],
                "inferenceAcceleratorOverrides": []
            },
            "tags": [],
            "taskArn": "arn:aws:ecs:eu-west-2:704533066374:task/hybrid-airflow-cluster/b357bc55e60b4557a2eccb3619a8ec64",
            "taskDefinitionArn": "arn:aws:ecs:eu-west-2:704533066374:task-definition/apache-airflow:6",
            "version": 1
        }
    ],
    "failures": []
}

进入全屏模式 退出全屏模式

我们已经配置了 AWS CloudWatch 日志记录,所以我们可以到这里查看它的输出。从 CDK 堆栈中,我们定义了我们的日志组“/ecs/hybrid-airflow”,因此我们可以打开这个日志组,我们可以看到格式为

hybrid-airflow/Hybrid-ELT-TaskDef/b357bc55e60b4557a2eccb3619a8ec64

进入全屏模式 退出全屏模式

当我们打开这个流时,我们可以看到输出正是我们所期望的,并且与我们在本地运行这个容器时的输出相匹配。

Connecting to  demords.cidws7o4yy7e.eu-west-2.rds.amazonaws.com  database  demo  as user  admin
Database host IP is : 18.169.169.151
Source IP is  ip-10-0-3-129.eu-west-2.compute.internal
Query is select * from customers WHERE country = "Spain"
Records exported:
171 , 2021-06-21 18:24:25 , Wiatt , Revell , wrevell4q@umn.edu , Female , 26.6.23.83 , Spain
632 , 2021-11-14 18:44:25 , Sheppard , Rylett , sryletthj@java.com , Genderfluid , 50.1.207.70 , Spain
783 , 2021-03-29 18:05:12 , Sloane , Maylour , smaylourlq@1und1.de , Female , 194.84.247.201 , Spain

进入全屏模式 退出全屏模式

我们现在已经成功地在云中运行了我们的容器化 ETL 脚本。下一步,使用 ECS Anywhere 在任何地方运行它...

随处部署ECS

在我们可以在本地环境中运行我们的容器化 ETL 脚本之前,我们需要安装 Amazon ECS Anywhere 代理。这扩展了 Amazon ECS 控制平面并允许您集成外部资源,使您可以在任何运行 ECS Anywhere 代理的地方运行任务定义(您的应用程序)。这些显示为“EXTERNAL”的新 ECS 启动类型(而当您运行任务定义时,您通常可能使用 EC2 或 FARGATE)。

注意! 如果您想深入了解,我建议您查看ECS 研讨会,其中有专门的 ECS Anywhere 部分。

要部署 ECS Anywhere,我们需要执行以下操作:

  • 创建将由 ECS Anywhere(控制平面)使用的新 IAM 角色

  • 安装 ECS Anywhere 代理并集成到现有 ECS 集群

不过,在深入了解之前,您可以将 ECS Anywhere 代理部署到哪些类型的主机上?您可以访问常见问题页面查看最新支持的操作系统。在撰写本文时,这包括 Amazon Linux 2、Bottlerocket、Ubuntu、RHEL、SUSE、Debian、CentOS 和 Fedora。您还应该考虑主机资源,因为可用的 CPU 和内存将决定如何在哪些主机上执行任务。无论您使用哪种发行版,都需要安装和配置 AWS cli,因此请确保在继续之前已完成此操作。

NEWS FLASH 本周宣布,您现在可以在 Windows 上运行 ECS Anywhere 代理。我对此感到非常兴奋,我将尝试找到一台 Windows 机器,在未来作为附录进行尝试。

出于本演练的目的,我使用了两个不同的主机。第一个是坐在我旁边的本地 Ubuntu 18.04 盒子。它有一个 Intel i5 CPU(四核)和 8 GB 内存。第二个是 EC2 实例,一个 m6i.large,它有 2 个 vCPU 和 8GB 内存。它正在运行 Amazon Linux 2。正如您从 pre-req 中回忆的那样,在这两个实例上,我都安装了 MySQL 并修改了 /etc/hosts 以将本地主机条目添加到名称“localmysql.beachgeek.co”。英国”。

EC2 实例是 ip-10-0-0-207.eu-west-2.compute.internal,我们稍后会看到这个确认这个“本地”ECS Anywhere 代理正在运行我们的 ETL 脚本。

创建 IAM 角色

在我们安装 ECS Anywhere 代理之前,我们需要创建一个将由代理使用的 IAM 角色。这将由 ECS CDK 堆栈创建,我们可以查看创建它的代码:


        external_task_def_policy_document_role = iam.Role(
            self,
            "ExternalECSAnywhereRole",
            role_name=f"{props['ecsclustername']}-ExternalECSAnywhereRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com")
        )

        external_managed_SSM_policy = iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore")
        external_managed_ECS_policy = iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AmazonEC2ContainerServiceforEC2Role")
        external_task_def_policy_document_role.add_managed_policy(external_managed_SSM_policy)
        external_task_def_policy_document_role.add_managed_policy(external_managed_ECS_policy)

进入全屏模式 退出全屏模式

正如我们所见,它添加到 AWS IAM 托管策略(“AmazonSSMManagedInstanceCore”和“AmazonEC2ContainerServiceforEC2Role”)中,这将为 ECS Anywhere 代理提供连接并注册为 ECS 集群中的外部托管实例所需的一切。部署脚本时将显示此输出:

ecs-anywhere-taskdef.ECSAnywhereRoleName = hybrid-airflow-ExternalECSAnywhereRole

进入全屏模式 退出全屏模式

当我们安装 ECS Anywhere 代理时,我们将需要它。

安装和集成 ECS Anywhere

有两种方法可以做到这一点。通过 Amazon ECS 控制台或 cli 生成的脚本。

通过 Amazon ECS 控制台

在 Amazon ECS 控制台中,有一种通过 ECS Anywhere 代理添加外部资源的简单方法。如果您单击 ECS 集群,您将看到一个允许您“注册外部实例”的按钮。

[外部](https://res.cloudinary.com/practicaldev/image/fetch/s--f-faf0Gw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent .com/94459/blogpost-airflow-hybrid/main/images/ecs-reg-1.png)

当您单击它时,您需要选择我们刚刚创建的 IAM 角色的名称。在上面的示例中,它将是“hybrid-airflow-ExternalECSAnywhereRole”,但如果您跟随,它可能会有所不同。

[外部](https://res.cloudinary.com/practicaldev/image/fetch/s--LBkdR74s--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent.com /94459/blogpost-airflow-hybrid/main/images/ecs-reg-2.png)

当您单击下一步时,您将看到一些文本。您将使用它来安装 ECS Anywhere 代理。复制此内容,然后将其粘贴到您要安装 ECS Anywhere 代理的系统的终端中。

[外部](https://res.cloudinary.com/practicaldev/image/fetch/s--rKcS3G-5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent .com/94459/blogpost-airflow-hybrid/main/images/ecs-reg-3.png)

注意! 请小心,因为此激活脚本是有时间限制的,并且会在一段时间后过期。不要分享并公开。我已更改屏幕截图中的详细信息,因此仅用于说明目的。

安装过程将需要 5-10 分钟,如果成功,您将得到确认。

您可以通过单击 ECS 集群并检查 External Instances 来确认您现在在 ECS 集群中有一个新资源(以红色突出显示,您可以在这里看到我有两个已注册)

[外部](https://res.cloudinary.com/practicaldev/image/fetch/s--wvmRgMq0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent.com /94459/blogpost-airflow-hybrid/main/images/ecs-reg-4.png)

通过 Amazon ECS 控制台

要通过命令行安装,首先在要安装代理的机器上打开一个终端会话。首先我创建一些环境变量:

  • ECS_CLUSTERu003d{ECS集群名称}

  • ROLEu003d{ECS Anywhere 角色的名称}

  • DEFAULT_REGIONu003d{您工作的 AWS 区域}

使用上面的示例,我创建以下内容

export ECS_CLUSTER="hybrid-airflow-cluster"
export ROLE_NAME="hybrid-airflow-ExternalECSAnywhereRole"
export DEFAULT_REGION="eu-west-2"

进入全屏模式 退出全屏模式

现在在机器上运行这个命令

aws ssm create-activation --iam-role $ROLE_NAME | tee ssm-activation.json

进入全屏模式 退出全屏模式

这会给你类似的输出。将创建一个包含此信息的文件 (ssm-activation.json)。

{
    "ActivationCode": "sAsr8ZpOktv/hFnytAWS",
    "ActivationId": "b9dbd803-3b67-4719-b23e-d6f849722007"
}

进入全屏模式 退出全屏模式

我们现在将使用这些值创建几个新的环境变量:

export ACTIVATION_ID="b9dbd803-3b67-4719-b23e-d6f849722007"
export ACTIVATION_CODE="sAsr8ZpOktv/hFnytAWS"

进入全屏模式 退出全屏模式

接下来我们下载代理安装脚本

curl -o "ecs-anywhere-install.sh" "https://amazon-ecs-agent.s3.amazonaws.com/ecs-anywhere-install-latest.sh" && sudo chmod +x ecs-anywhere-install.sh

进入全屏模式 退出全屏模式

最后,我们运行脚本,传入我们已经创建的环境变量

sudo ./ecs-anywhere-install.sh \
    --cluster $ECS_CLUSTER \
    --activation-id $ACTIVATION_ID \
    --activation-code $ACTIVATION_CODE \
    --region $DEFAULT_REGION 

进入全屏模式 退出全屏模式

该脚本现在将开始安装、连接并注册 AWS SSM 和 ECS 代理。如果成功,您将看到如下内容:

# Trying to wait for ECS agent to start ...

Ping ECS Agent registered successfully! Container instance arn: "arn:aws:ecs:eu-west-2:704533066374:container-instance/hybrid-airflow-cluster/6863489c26b24c27ad2f91c458b3bd82"

You can check your ECS cluster here https://console.aws.amazon.com/ecs/home?region=eu-west-2#/clusters/hybrid-airflow-cluster

# ok

进入全屏模式 退出全屏模式

与控制台安装一样,您可以通过检查 AWS ECS 控制台来检查这是否有效。您还可以使用命令行:

aws ecs list-container-instances --cluster $ECS_CLUSTER

进入全屏模式 退出全屏模式

这应该为您提供类似于以下内容的内容:

{
    "containerInstanceArns": [
        "arn:aws:ecs:eu-west-2:704533066374:container-instance/hybrid-airflow-cluster/6863489c26b24c27ad2f91c458b3bd82",
        "arn:aws:ecs:eu-west-2:704533066374:container-instance/hybrid-airflow-cluster/c7550837cbcb4a19a1fdcd79cb600062"
    ]
}

进入全屏模式 退出全屏模式

如您所见,第一个实例是我们刚刚集成到 ECS 集群中的实例。现在我们已经安装了这个,我们可以通过 Amazon ECS 测试运行我们的 ETL 容器。

故障排除

注意! 我之前在(以前的博客文章)上介绍了 ECS Anywhere 的安装,其中提供了一些额外的详细信息,如果您遇到困难可以参考!

“本地”运行我们的 ETL 脚本

现在,我们的 ECS 集群中有“EXTERNAL”资源,这使我们能够确定我们想要在哪里运行我们的任务定义(在这种情况下是我们的容器化 ETL 脚本)。接下来我们可以试试这个。

在本地运行我们的任务

从命令行,您可以通过运行以下命令开始运行我们的容器化 ETL 脚本。您会注意到它是完全相同的命令,唯一的区别是启动类型参数已更改为 EXTERNAL。

export ECS_CLUSTER="hybrid-airflow-cluster"
export TASK_DEF="apache-airflow"
export DEFAULT_REGION="eu-west-2"

aws ecs run-task --cluster $ECS_CLUSTER --launch-type EXTERNAL --task-definition $TASK_DEF --region=$DEFAULT_REGION

进入全屏模式 退出全屏模式

当我们开始时,我们会得到类似的输出。注意这次“launchType”,我们显示为“EXTERNAL”。

{
    "tasks": [
        {
            "attachments": [],
            "attributes": [
                {
                    "name": "ecs.cpu-architecture",
                    "value": "x86_64"
                }
            ],
            "clusterArn": "arn:aws:ecs:eu-west-2:704533066374:cluster/hybrid-airflow-cluster",
            "containerInstanceArn": "arn:aws:ecs:eu-west-2:704533066374:container-instance/hybrid-airflow-cluster/6863489c26b24c27ad2f91c458b3bd82",
            "containers": [
                {
                    "containerArn": "arn:aws:ecs:eu-west-2:704533066374:container/hybrid-airflow-cluster/fc43322ecb55404fa086c266329b3a4c/8e387f3d-aa7d-4878-adfd-194866d35e25",
                    "taskArn": "arn:aws:ecs:eu-west-2:704533066374:task/hybrid-airflow-cluster/fc43322ecb55404fa086c266329b3a4c",
                    "name": "Hybrid-ELT-TaskDef",
                    "image": "704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw",
                    "lastStatus": "PENDING",
                    "networkInterfaces": [],
                    "cpu": "100",
                    "memory": "1024"
                }
            ],
            "cpu": "100",
            "createdAt": 1646652352.003,
            "desiredStatus": "RUNNING",
            "enableExecuteCommand": false,
            "group": "family:apache-airflow",
            "lastStatus": "PENDING",
            "launchType": "EXTERNAL",
            "memory": "1024",
            "overrides": {
                "containerOverrides": [
                    {
                        "name": "Hybrid-ELT-TaskDef"
                    }
                ],
                "inferenceAcceleratorOverrides": []
            },
            "tags": [],
            "taskArn": "arn:aws:ecs:eu-west-2:704533066374:task/hybrid-airflow-cluster/fc43322ecb55404fa086c266329b3a4c",
            "taskDefinitionArn": "arn:aws:ecs:eu-west-2:704533066374:task-definition/apache-airflow:6",
            "version": 1
        }
    ],
    "failures": []
}

进入全屏模式 退出全屏模式

正如我们之前所做的那样,我们可以检查 CloudWatch 日志,我们可以看到这是有效的。我们可以看到 Source IP 是我们的“本地”ECS Anywhere 实例。

Connecting to  demords.cidws7o4yy7e.eu-west-2.rds.amazonaws.com  database  demo  as user  admin
Database host IP is : 18.169.169.151
Source IP is  ip-10-0-0-207.eu-west-2.compute.internal
Query is select * from customers WHERE location = "Spain"
Records exported:
171 , 2021-06-21 18:24:25 , Wiatt , Revell , wrevell4q@umn.edu , Female , 26.6.23.83 , Spain
632 , 2021-11-14 18:44:25 , Sheppard , Rylett , sryletthj@java.com , Genderfluid , 50.1.207.70 , Spain
783 , 2021-03-29 18:05:12 , Sloane , Maylour , smaylourlq@1und1.de , Female , 194.84.247.201 , Spain

进入全屏模式 退出全屏模式

访问本地资源

前面的示例向我们展示了运行容器化 ETL 脚本但访问云中的资源。对于我们正在研究的特定用例(混合编排),我们确实需要展示如何访问本地资源。我们将在下一节中这样做,因为我们将讨论如何通过 Apache Airflow 编排这些。

使用 Apache Airflow 创建和运行我们的混合工作流

到目前为止,我们已经验证我们可以获取我们的 ETL 脚本,将其容器化,然后在任何地方运行它:云、本地甚至我们的开发人员设置。下一阶段是使用 Apache Airflow 将其作为我们数据管道的一部分。

设置 Apache Airflow

首先,我们需要一个 Apache Airflow 环境。我整理了一篇关于如何使用 Apache Airflow (MWAA) 的托管工作流 (MWAA) 在 AWS 上启动和运行的帖子,您可以在此处查看。我已将代码包含在代码仓库中。

ECS操作员

Apache Airflow 有许多 Operator,您可以将它们视为模板,使执行任务变得更加容易。这些运算符在您定义任务时使用,并且您传递各种详细信息,运算符背后的代码完成繁重的工作。有许多 Operator 可以让您使用 AWS 服务(这些被打包到所谓的 Amazon 提供程序包中),我们感兴趣的是 ECS Operator,它允许我们定义和运行 ECS 任务 - 即我们的 ETL 容器脚本。

MWAA 使用专门的工作人员来执行任务,为了管理这些工作人员节点可以做什么,他们有一个 IAM 角色。 IAM 角色(称为 MWAA 执行策略)管理您的工作流可以访问哪些资源。如果我们希望我们的工作流(以及工作节点)能够创建和运行 ECS 任务,我们需要添加一些额外的权限。所需权限已添加到 CDK 脚本中,但在此供参考:

                iam.PolicyStatement(
                    actions=[
                        "ecs:RunTask",
                        "ecs:DescribeTasks",
                        "ecs:RegisterTaskDefinition",
                        "ecs:DescribeTaskDefinition",
                        "ecs:ListTasks",
                        "ecs:StopTasks"
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        "*"
                        ],
                    ),
                iam.PolicyStatement(
                    actions=[
                        "iam:PassRole"
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[ "*" ],
                    conditions= { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } },
                    ),

进入全屏模式 退出全屏模式

创建我们的工作流程

现在完成了基础工作,我们可以创建我们的工作流程。通过导入我们将要使用的 Python 库,它像任何其他典型的 DAG 一样开始。如您所见,我们导入了 ECSOperator。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.amazon.aws.operators.ecs import ECSOperator

default_args = {
    'owner': 'ubuntu',
    'start_date': datetime(2022, 3, 9),
    'retry_delay': timedelta(seconds=60*60)
}

进入全屏模式 退出全屏模式

我们现在创建我们的任务,使用 ECSOperator 并提供配置详细信息。在下面的示例中,我对一些值进行了硬编码(例如 ECS 集群名称和任务定义,但您可以将这些值作为变量存储在 Apache Airflow、AWS Secrets 等集中式存储中,或者在触发事件时使用参数文件DAG。

with DAG('hybrid_airflow_dag', catchup=False, default_args=default_args, schedule_interval=None) as dag:

    cloudquery = ECSOperator(
        task_id="cloudquery",
        dag=dag,
        cluster="hybrid-airflow-cluster",
        task_definition="apache-airflow",
        overrides={ },
        launch_type="EC2",
        awslogs_group="/ecs/hybrid-airflow",
        awslogs_stream_prefix="ecs/Hybrid-ELT-TaskDef",
        reattach = True
    )

    cloudquery

进入全屏模式 退出全屏模式

就是这样。我们没有定义时间表,我们只是将其作为测试的按需工作流。我们保存它(代码在 repo中)并将其上传到 Apache Airflow DAGs 文件夹中,几秒钟后工作流应该出现在 Apache Airflow UI 中。

[外部](https://res.cloudinary.com/practicaldev/image/fetch/s--b5gmBRCJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent.com /94459/blogpost-airflow-hybrid/main/images/airflow-dag-1.png)

运行我们的工作流程 - 云

我们可以通过取消暂停来启用工作流,然后我们可以触发它进行测试。一旦触发,Apache Airflow 会将此任务提交给调度程序,调度程序将通过执行程序将任务排队到 MWAA 工作节点。该任务将在那里执行,使用上述参数运行 ECS 任务定义。这些与我们通过命令行运行时使用的相同。

大约 2-3 分钟后,任务应该完成。 Apache Airflow UI 中的任务颜色将从浅绿色(正在运行)变为深绿色(完成)。我们现在可以通过单击任务并单击日志来查看日志。您将看到类似于以下内容的内容(为简洁起见,我省略了一些日志):

...
--------------------------------------------------------------------------------
[2022-03-06, 21:05:07 UTC] {{taskinstance.py:1242}} INFO - Starting attempt 1 of 1
[2022-03-06, 21:05:07 UTC] {{taskinstance.py:1243}} INFO - 
--------------------------------------------------------------------------------
[2022-03-06, 21:05:08 UTC] {{taskinstance.py:1262}} INFO - Executing <Task(ECSOperator): cloudquery> on 2022-03-06 21:05:06.837966+00:00
[2022-03-06, 21:05:08 UTC] {{standard_task_runner.py:52}} INFO - Started process 13324 to run task
[2022-03-06, 21:05:08 UTC] {{standard_task_runner.py:52}} INFO - Started process 13324 to run task
[2022-03-06, 21:05:08 UTC] {{standard_task_runner.py:77}} INFO - Job 346: Subtask cloudquery
[2022-03-06, 21:05:08 UTC] {{base_aws.py:401}} INFO - Airflow Connection: aws_conn_id=aws_default
[2022-03-06, 21:05:08 UTC] {{base_aws.py:190}} INFO - No credentials retrieved from Connection
[2022-03-06, 21:05:08 UTC] {{base_aws.py:93}} INFO - Creating session with aws_access_key_id=None region_name=eu-west-2
[2022-03-06, 21:05:08 UTC] {{base_aws.py:168}} INFO - role_arn is None
[2022-03-06, 21:05:08 UTC] {{logging_mixin.py:109}} INFO - Running <TaskInstance: hybrid_airflow_ec2_dag.cloudquery manual__2022-03-06T21:05:06.837966+00:00 [running]> on host ip-10-192-20-189.eu-west-2.compute.internal
[2022-03-06, 21:05:08 UTC] {{taskinstance.py:1429}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=ubuntu
AIRFLOW_CTX_DAG_ID=hybrid_airflow_ec2_dag
AIRFLOW_CTX_TASK_ID=cloudquery
AIRFLOW_CTX_EXECUTION_DATE=2022-03-06T21:05:06.837966+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-03-06T21:05:06.837966+00:00
[2022-03-06, 21:05:08 UTC] {{ecs.py:300}} INFO - Running ECS Task - Task definition: apache-airflow - on cluster hybrid-airflow-cluster
[2022-03-06, 21:05:08 UTC] {{ecs.py:302}} INFO - ECSOperator overrides: {}

[2022-03-06, 21:05:08 UTC] {{ecs.py:418}} INFO - No active previously launched task found to reattach
[2022-03-06, 21:05:08 UTC] {{ecs.py:375}} INFO - ECS Task started: 
[2022-03-06, 21:05:08 UTC] {{ecs.py:379}} INFO - ECS task ID is: 42a9f08a68ce46f985a3177e9aca3bce
[2022-03-06, 21:05:08 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher

[2022-03-06, 21:06:09 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
[2022-03-06, 21:06:09 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=hybrid_airflow_ec2_dag, task_id=cloudquery, execution_date=20220306T210506, start_date=20220306T210507, end_date=20220306T210609
[2022-03-06, 21:06:09 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
[2022-03-06, 21:06:09 UTC] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

进入全屏模式 退出全屏模式

如果我们查看 AWS CloudWatch 组中的日志。

[外部](https://res.cloudinary.com/practicaldev/image/fetch/s--nnvlgNta--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://raw.githubusercontent.com /94459/blogpost-airflow-hybrid/main/images/airflow-dag-3.png)

我们可以看到以下内容:

Connecting to  demords.cidws7o4yy7e.eu-west-2.rds.amazonaws.com  database  demo  as user  admin
Database host IP is : 18.169.169.151
Source IP is  ip-10-0-3-129.eu-west-2.compute.internal
Query is select * from customers WHERE country = "Spain"
Records exported:
171 , 2021-06-21 18:24:25 , Wiatt , Revell , wrevell4q@umn.edu , Female , 26.6.23.83 , Spain
632 , 2021-11-14 18:44:25 , Sheppard , Rylett , sryletthj@java.com , Genderfluid , 50.1.207.70 , Spain
783 , 2021-03-29 18:05:12 , Sloane , Maylour , smaylourlq@1und1.de , Female , 194.84.247.201 , Spain

进入全屏模式 退出全屏模式

成功!我们现在已经通过 Apache Airflow 触发了我们的工作流程。任务定义在我们位于云中的 EC2 实例上运行(我们可以从源 IP 中看到这一点,并通过检查运行 ECS 集群容器的 EC2 实例)。现在看看如何在本地触发任务。

运行我们的工作流程 - 本地

要在本地节点上运行我们的任务,我们需要做的就是将“launch_type”从“EC2”更改为“EXTERNAL”。当 ECS 集群收到运行该任务的请求时,它会查看哪些节点正在运行 ECS Anywhere 代理,然后选择 on 以在那里运行该任务。


    remotequery = ECSOperator(
        task_id="remotequery",
        dag=dag,
        cluster="hybrid-airflow-cluster",
        task_definition="apache-airflow",
        overrides={ },
        launch_type="EXTERNAL",
        awslogs_group="/ecs/hybrid-airflow",
        awslogs_stream_prefix="ecs/Hybrid-ELT-TaskDef",
        reattach = True
    )

    remotequery

进入全屏模式 退出全屏模式

当我们创建一个新的 DAG 并上传它(代码在 repo中)时,我们可以以完全相同的方式触发它。一旦触发,就像之前我们需要等待 2-3 分钟。完成后,我们可以在 Apache Airflow UI 中或通过 CloudWatch 日志流检查日志。

Connecting to  demords.cidws7o4yy7e.eu-west-2.rds.amazonaws.com  database  demo  as user  admin
Database host IP is : 18.169.169.151
Source IP is  ip-10-0-0-207.eu-west-2.compute.internal
Query is select * from customers WHERE country = "Spain"
Records exported:
171 , 2021-06-21 18:24:25 , Wiatt , Revell , wrevell4q@umn.edu , Female , 26.6.23.83 , Spain
632 , 2021-11-14 18:44:25 , Sheppard , Rylett , sryletthj@java.com , Genderfluid , 50.1.207.70 , Spain
783 , 2021-03-29 18:05:12 , Sloane , Maylour , smaylourlq@1und1.de , Female , 194.84.247.201 , Spain

进入全屏模式 退出全屏模式

我们可以看到这次源 IP 是我们运行 ECS Anywhere 代理的本地计算机。

运行我们的工作流程 - 本地和访问本地资源

在前面的示例中,我们仍在访问基于云的 MySQL 数据库(“连接到 demords.cidws7o4yy7e.eu-west-2.rds.amazonaws.com...”)但我们真正想做的是连接到我们的在编排这些类型的混合任务时使用本地资源。现在让我们这样做。

如果您还记得,ETL 脚本使用存储在 AWS Secrets 中的参数(如果您愿意,可以使用不同的存储库)来了解要连接到哪个 MySQL 数据库。我们可以在传递给容器化 ETL 脚本的命令中看到这一点

"command" : [ "ricsue-airflow-hybrid","period1/region-data.csv", "select * from customers WHERE country = \"Spain\"", "rds-airflow-hybrid","eu-west-2" ]} 

进入全屏模式 退出全屏模式

“rds-airflow-hybrid”的值指向存储数据库主机、数据库、用户名和密码的 AWS Secrets 记录。在本演练开始时,我们创建了另一个指向本地 MySQL 数据库的记录,即“localmysql-airflow-hybrid”,因此我们可以创建如下所示的新任务:

with DAG('hybrid_airflow_dag', catchup=False, default_args=default_args, schedule_interval=None) as dag:

    localquery = ECSOperator(
        task_id="localquery",
        dag=dag,
        cluster="hybrid-airflow-cluster",
        task_definition="apache-airflow",
        overrides={ "containerOverrides": [
            { 
                "name": "Hybrid-ELT-TaskDef",
                "command" : [ "ricsue-airflow-hybrid","period1/region-data.csv", "select * from customers WHERE country = \"Spain\"", "localmysql-airflow-hybrid","eu-west-2" ]} 
            ] },
        launch_type="EC2",
        awslogs_group="/ecs/hybrid-airflow",
        awslogs_stream_prefix="ecs/Hybrid-ELT-TaskDef",
        reattach = True
    )

    localquery

进入全屏模式 退出全屏模式

和以前一样,在创建这个新的 DAG 并上传它(代码在 repo中)之后,我们可以以完全相同的方式触发它。一旦触发,就像之前我们需要等待 2-3 分钟。完成后,我们可以在 Apache Airflow UI 中或通过 CloudWatch 日志流检查日志。

Connecting to  localmysql.beachgeek.co.uk  database  localdemo  as user  root
Database host IP is : 127.0.0.1
Source IP is  ip-10-0-0-207.eu-west-2.compute.internal
Query is select * from customers WHERE country = "Spain"
Records exported:
164 , 2021-04-02 , Dag , Delacourt , ddelacourt4j@nydailynews.com , Male , 200.12.59.159 , Spain

进入全屏模式 退出全屏模式

我们可以看到源 IP 是我们运行 ECS Anywhere 代理的本地计算机,我们现在正在连接到本地 MySQL 实例。

注意! 这次我们有不同的结果集的原因是我们使用了不同的样本数据集。在之前的所有查询中,我们针对 Amazon RDS MySQL 实例运行它们

恭喜!我们现在已经通过访问本地资源的 ECS Anywhere 编排运行本地 ETL 脚本,并将结果上传回我们在 Amazon S3 上的数据湖。

使用您的工作流程做更多事情

现在您已经掌握了基础知识,您可以扩展它并尝试如何创建可重用的工作流。您可以做的一些事情包括:

  • 覆盖我们在任务定义中定义的默认 ETL 脚本参数 - 例如,我们可以更改查询或 ETL 脚本的任何其他参数
with DAG('hybrid_airflow_dag', catchup=False, default_args=default_args, schedule_interval=None) as dag:

    cloudquery = ECSOperator(
        task_id="cloudquery",
        dag=dag,
        cluster="hybrid-airflow-cluster",
        task_definition="apache-airflow",
        overrides={ "containerOverrides": [
            { 
                "name": "airflow-hybrid-demo",
                "command" : [ "ricsue-airflow-hybrid","period1/region-data.csv", "select * from customers WHERE country = \"Poland\"", "rds-airflow-hybrid","eu-west-2" ]} 
            ] },
        launch_type="EC2",
        awslogs_group="/ecs/hybrid-airflow",
        awslogs_stream_prefix="ecs/Hybrid-ELT-TaskDef",
        reattach = True
    )

    cloudquery

进入全屏模式 退出全屏模式

  • 创建新的任务定义,使用 PythonOperator 和 boto3 创建一个新的任务定义,然后通过 ECSOperator 运行它。这是您将如何执行此操作的示例。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.amazon.aws.operators.ecs import ECSOperator
from airflow.operators.python import PythonOperator
import boto3

default_args = {
    'owner': 'ubuntu',
    'start_date': datetime(2019, 8, 14),
    'retry_delay': timedelta(seconds=60*60)
}

def create_task():
    client = boto3.client("ecs", region_name="eu-west-2")
    response = client.register_task_definition(
        containerDefinitions=[
            {
                "name": "Hybrid-ELT-TaskDef",
                "image": "704533066374.dkr.ecr.eu-west-2.amazonaws.com/hybrid-airflow:airflw",
                "cpu": 0,
                "portMappings": [],
                "essential": True,
                "environment": [],
                "mountPoints": [],
                "volumesFrom": [],
                "command": ["ricsue-airflow-hybrid","period1/hq-data.csv", "select * from customers WHERE country = \"England\"", "rds-airflow-hybrid","eu-west-2"],
                "logConfiguration": {"logDriver": "awslogs",
                    "options": {
                        "awslogs-group": "/ecs/hybrid-airflow",
                        "awslogs-region": "eu-west-2", 
                        "awslogs-stream-prefix":"ecs/Hybrid-ELT-TaskDef" }
                    }
            }
        ],
        taskRoleArn="arn:aws:iam::704533066374:role/hybrid-airflow-ECSTaskDefRole",
        executionRoleArn="arn:aws:iam::704533066374:role/ecs-anywhere-taskdef-hybridairflowApacheAirflowTas-1QL1K5RWWCUTD",
        family= "apache-airflow",
        networkMode="host",
        requiresCompatibilities= [
            "EXTERNAL"
        ],
        cpu= "256",
        memory= "512")
    print(response)

with DAG('hybrid_airflow_dag_taskdef', catchup=False, default_args=default_args, schedule_interval=None) as dag:
    create_taskdef = PythonOperator(
        task_id='create_taskdef',
        provide_context=True,
        python_callable=create_task,
        dag=dag
    )

    cloudquery = ECSOperator(
        task_id="cloudquery",
        dag=dag,
        cluster="hybrid-airflow-cluster",
        task_definition="apache-airflow",
        overrides={ },
        launch_type="EC2",
        awslogs_group="/ecs/hybrid-airflow",
        awslogs_stream_prefix="ecs/Hybrid-ELT-TaskDef",
        reattach = True
    )

    create_taskdef >> cloudquery

进入全屏模式 退出全屏模式

结论

我们学到了什么?

在本次演练中,我们了解了如何采取一些您在创建数据管道时通常使用的步骤,将这些步骤容器化,然后在 Apache Airflow 中创建工作流,允许您编排混合数据管道以在我们需要的任何地方运行 -在云端或远程数据中心或网络中。

如何改进?

有很多方法可以使用这样的方法,我很想听听你的意见。

我很想得到你对这篇文章的反馈。你喜欢什么,你认为需要改进什么?如果您可以完成这个非常简短的调查,我将使用这些信息来改进以后的帖子。

非常感谢!

在您出发之前,请确保您考虑删除/清理您可能已设置的 AWS 资源(和本地资源)。

打扫干净

如果您已按照此演练进行操作,那么在离开之前,请确保您删除/删除您创建的任何资源,以确保您不会保留任何经常性费用。查看并清理您在执行过程中可能已配置的以下资源

  • 删除运行混合工作流的 DAG

  • 删除 Amazon ECS 集群和任务

  • 删除 Amazon ECR 容器存储库和映像

  • 删除已复制到 Amazon S3 存储桶的文件

  • 删除示例客户数据库(如果您创建了它们,则为 MySQL 实例)

  • 卸载 ECS Anywhere 代理并清除所有本地容器映像

  • 删除任何已创建的 CloudWatch 日志组

故障排除

和我所有的博客冒险一样,我经常遇到我没想到的事情,我犯的很多错误希望能节省你的时间。所以这是我在设置过程中发现的一些问题。

  • ECS Anywhere 故障排除 - 如其他地方所述,您可能会遇到一些问题,我在另一篇博文中全面概述了这些问题,

  • 权限 - 当我把这个演练放在一起时,我遇到了许多与 IAM 相关的权限问题。这是因为我不想从广泛的访问权限和资源权限开始,这意味着要通过反复试验来确定所需的权限。以下是我遇到的一些与权限相关的错误。这些都包含在上面的 CDK 堆栈中,并包含在内以供参考。

Apache Airflow Worker 任务

  • 在使用 ECSOperator 时,我在尝试执行 ECS Operator 任务时遇到了权限。这是我遇到的错误。
botocore.errorfactory.AccessDeniedException: An error occurred (AccessDeniedException) when calling the RunTask operation: User: arn:aws:sts::704533066374:assumed-role/AmazonMWAA-hybrid-demo-kwZCZS/AmazonMWAA-airflow is not authorized to perform: ecs:RunTask on resource: arn:aws:ecs:eu-west-2:704533066374:task-definition/airflow-hybrid-ecs-task because no identity-based policy allows the ecs:RunTask action
[2022-02-01, 13:02:29 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=airflow_dag_test, task_id=airflow-hybrid-ecs-task-query, execution_date=20220201T130226, start_date=20220201T130227, end_date=20220201T130229
[2022-02-01, 13:02:29 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 3 for task airflow-hybrid-ecs-task-query

进入全屏模式 退出全屏模式

我需要修改我的 MWAA 执行策略,以启用这些权限。这是我添加的政策

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "ecs:RunTask",
                "ecs:DescribeTasks"
            ],
            "Resource": "*"
        },
        {
            "Action": "iam:PassRole",
            "Effect": "Allow",
            "Resource": [
                "*"
            ],
            "Condition": {
                "StringLike": {
                    "iam:PassedToService": "ecs-tasks.amazonaws.com"
                }
            }
        }
    ]
}

进入全屏模式 退出全屏模式

我需要更新MWAA执行角色的信任关系如下

{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "ecs-tasks.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

进入全屏模式 退出全屏模式

  • 使用 boto3 创建新的任务定义时,我们遇到了另一个错误:
botocore.errorfactory.AccessDeniedException: An error occurred (AccessDeniedException) when calling the RegisterTaskDefinition operation: User: arn:aws:sts::704533066374:assumed-role/AmazonMWAA-hybrid-demo-kwZCZS/AmazonMWAA-airflow is not authorized to perform: ecs:RegisterTaskDefinition on resource: * because no identity-based policy allows the ecs:RegisterTaskDefinition action
[2022-02-03, 15:43:01 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=airflow_ecsanywhere_boto3, task_id=create_taskdef, execution_date=20220203T154259, start_date=20220203T154301, end_date=20220203T154301
[2022-02-03, 15:43:01 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 36 for task create_taskdef

进入全屏模式 退出全屏模式

我们只需要添加一个进一步的 ECS 操作,“ecs:RegisterTaskDefinition”

  • ECSOperator 的功能之一是“attachu003dTrue”开关。启用此生成的权限错误。
botocore.errorfactory.AccessDeniedException: An error occurred (AccessDeniedException) when calling the DescribeTaskDefinition operation: User: arn:aws:sts::704533066374:assumed-role/AmazonMWAA-hybrid-demo-kwZCZS/AmazonMWAA-airflow is not authorized to perform: ecs:DescribeTaskDefinition on resource: * because no identity-based policy allows the ecs:DescribeTaskDefinition action
[2022-03-03, 11:41:16 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=hybrid_airflow_dag_test, task_id=cloudquery, execution_date=20220303T114110, start_date=20220303T114114, end_date=20220303T114116

进入全屏模式 退出全屏模式

添加以下附加权限可解决此问题

            "ecs:DescribeTaskDefinition",
            "ecs:ListTasks"

进入全屏模式 退出全屏模式

  • 更新ECS集群任务执行角色的信任关系,解决一些不允许传递角色的错误信息。编辑 ECS 执行角色的信任关系,以便允许 MWAA 工作人员代表我们执行这些任务。
{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "ecs-tasks.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

进入全屏模式 退出全屏模式

并更改以添加 Apache Airflow 服务(这将允许工作人员/调度程序启动它)

{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "airflow-env.amazonaws.com",
          "airflow.amazonaws.com",
          "ecs-tasks.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

进入全屏模式 退出全屏模式

  • 将文件复制到 Amazon S3 存储桶时,工作流程遇到权限问题。我需要修改我的 MWAA 执行策略,因此我创建了一个新策略,允许您将文件复制到 S3 存储桶并附加它。这就是它的样子(资源将根据您要将文件复制到的位置而有所不同)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::ricsue-airflow-hybrid*"
            ]
        }
    ]
}

进入全屏模式 退出全屏模式

  • 工作流没有生成任何 CloudWatch 日志,这是因为我没有添加正确的权限。为了解决这个问题,我修改了策略以包含(并匹配)在 ECS 集群创建期间创建的 AWS CloudWatch 日志记录组。

并使其成为我们可以进行必要的日志记录

{
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults"
            ],
            "Resource": [
                "arn:aws:logs:*:*:log-group:/ecs/hyrid-airflow:log-stream:/ecs/*"
            ]
        }

进入全屏模式 退出全屏模式

设置MySQL测试数据

我如何在本地设置我的 MySql 数据库机器 - Ubuntu

这是我使用的程序。我假设你对 MySQL 有一些基本的了解。这可以用于测试/演示设置,但绝对不能用于其他任何设置。作为 root 用户,您将需要运行以下命令:

 sudo apt update
 sudo apt install mysql-server
 sudo mysql_secure_installation

进入全屏模式 退出全屏模式

按照提示启用本地访问。我确实必须运行“sudo mysql -uroot -p”才能登录才能设置权限。创建 mysql 用户并更新权限,以便它可以连接/查询数据库(我创建了一个名为 admin 的用户,它可以从任何主机访问所有数据库)

在刚刚安装 MySQL 的机器上,在 /etc/mysql/mysql.conf.d/mysqld.cnf 上更新绑定到 0.0.0.0,然后“sudo systemctl restart mysql”

更新您的本地 /etc/hosts 以添加 ETL 脚本将用于连接的本地数据库的主机名。您在 AWS Secrets 中定义连接详细信息时使用此值。我们使用 /etc/hosts 因为这是一个简单的演示设置。我在设置中使用了“127.0.1.1 localmysql.beachgeek.co.uk”。

测试与它的连接并确保它解析为 127.0.0.1。

现在登录到 MySQL 并创建您的演示数据库,然后使用以下命令创建一个表

create database localdemo;
use localdemo;
create table customers ( id INT, date DATE, first_name VARCHAR(50), last_name VARCHAR(50), email VARCHAR(50), gender VARCHAR(50), ip_address VARCHAR(20), country VARCHAR(50), consent VARCHAR(50) );

进入全屏模式 退出全屏模式

复制 repo 中的示例客户数据,您现在可以使用以下命令导入数据>

mysql -u {user} -p localdemo < customer-reg-2.sql

进入全屏模式 退出全屏模式

我如何在本地设置 MySql 数据库机器 - Amazon Linux 2

在 Amazon Linux 2 上安装 MySQL 时使用了类似的过程。运行以下命令来安装和配置 MySQL。

sudo yum install https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
sudo amazon-linux-extras install epel -y
sudo rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
sudo yum install mysql-community-server
sudo systemctl enable --now mysqld

进入全屏模式 退出全屏模式

您现在需要更改 root 密码。要查找当前的临时密码,请运行“sudo grep 'temporary password' /var/log/mysqld.log”,它将显示密码。像这样的东西:

you will see: 022-01-26T17:27:51.108997Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: F+lFfe*QG8qX

进入全屏模式 退出全屏模式

然后设置新的root密码

sudo mysql_secure_installation -p'F+lFfe*QG8qX'

进入全屏模式 退出全屏模式

更新您的本地 /etc/hosts 以添加 ETL 脚本将用于连接的本地数据库的主机名。您在 AWS Secrets 中定义连接详细信息时使用此值。我们使用 /etc/hosts 因为这是一个简单的演示设置。我在设置中使用了“127.0.1.1 localmysql.beachgeek.co.uk”。

测试与它的连接并确保它解析为 127.0.0.1。

现在登录到 MySQL 并创建您的演示数据库,然后使用以下命令创建一个表

create database localdemo;
use localdemo;
create table customers ( id INT, date DATE, first_name VARCHAR(50), last_name VARCHAR(50), email VARCHAR(50), gender VARCHAR(50), ip_address VARCHAR(20), country VARCHAR(50), consent VARCHAR(50) );

进入全屏模式 退出全屏模式

复制 repo 中的示例客户数据,您现在可以使用以下命令导入数据>

mysql -u {user} -p localdemo < customer-reg-2.sql

进入全屏模式 退出全屏模式

Logo

ModelScope旨在打造下一代开源的模型即服务共享平台,为泛AI开发者提供灵活、易用、低成本的一站式模型服务产品,让模型应用更简单!

更多推荐