Reading and writing data across different AWS accounts in your Apache Airflow DAGs

Regular readers will know that I sometimes lurk in the Apache Airflow Slack channels to see what is going on. If you are new to Apache Airflow, or want to dive deeper, then I strongly recommend spending some time there. The community is very welcoming and eager to help newcomers.

During a recent session I came across an interesting problem a builder was facing: how to access (read/write) data in an S3 bucket that lives in a different AWS account from the one hosting their Amazon Managed Workflows for Apache Airflow (MWAA) environment.

The rest of this post is a quick walk through the setup, the errors that came up, and how to configure MWAA so that you can confidently read/write data across different AWS accounts.

The problem

A customer running MWAA 2.0.2 was, as part of their workflow, trying to upload data to an S3 bucket in a different AWS account from the one their MWAA environment runs in. When they triggered their DAG, they hit errors such as the following:

 An error occurred (AccessDenied) when calling the PutObject operation: Access Denied


To help with troubleshooting, the customer created a test DAG that tries to read/write a file to a sample S3 bucket in three ways:

  • Using boto3, the Python SDK for interacting with AWS

  • Using the Apache Airflow S3 hook - airflow.providers.amazon.aws.hooks.s3

  • Using the pandas Python library - which uses s3fs

This is the test DAG the customer put together:

import logging
import random
from datetime import timedelta
import pandas as pd
import boto3
from airflow.models import Variable
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.dates import days_ago


def save_file_to_s3():
    n = random.randint(100, 10000)
    filename = f'demo_{n}.csv'
    local_file_path = f'/tmp/{filename}'
    demo_data = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                              'num_wings': [2, 0, 0, 0],
                              'num_specimen_seen': [10, 2, 1, 8]},
                             index=['falcon', 'dog', 'spider', 'fish'])
    demo_data.to_csv(local_file_path, index=False)

    bucket_name = Variable.get('TEST_BUCKET')
    folder_name = Variable.get('TEST_FOLDER')
    s3_key = f'{folder_name}/boto3_{filename}'
    logging.info(f'local_file_path: {local_file_path}, bucket_name: {bucket_name}, key: {s3_key}')

    logging.info('Uploading CSV to S3 with boto3')
    s3 = boto3.resource('s3')
    try:
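        # The bucket-owner-full-control ACL gives the bucket-owning account full access
        # to the object, which matters when writing into a bucket in another AWS account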
        s3.meta.client.upload_file(local_file_path, bucket_name, s3_key, ExtraArgs={'ACL': 'bucket-owner-full-control'})
    except Exception as e:
        logging.info(e)
        pass
    logging.info('Done uploading CSV to S3 with boto3')
    logging.info('Uploading CSV to S3 with S3HOOK')
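    # No aws_conn_id is passed, so the hook uses the default connection (aws_default);
    # on MWAA, with no stored credentials, this falls back to the environment's execution role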
    s3_hook = S3Hook()
    s3_key = f'{folder_name}/s3hook_{filename}'
    try:
        s3_hook.load_file(local_file_path, bucket_name=bucket_name, key=s3_key, acl_policy='bucket-owner-full-control')
    except Exception as e:
        logging.info(e)
        pass
    logging.info('Done uploading CSV to S3 with S3HOOK')

    logging.info('Uploading CSV to S3 with pandas')
    demo_data.to_csv(f's3://{bucket_name}/{folder_name}/pandas_{filename}', index=False)
    logging.info('Done uploading CSV to S3 with pandas')


def run_dag():
    save_file_to_s3()


dag_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2)
}

with DAG(
    'demo',
    default_args=dag_default_args,
    description='description',
    schedule_interval="0 10 * * *",
    start_date=days_ago(1),
    tags=['test'],
) as dag:
    test_dag = PythonOperator(
        task_id="test_dag",
        python_callable=run_dag
    )


Interestingly, when they ran the DAG, the boto3 function wrote the file to the target S3 bucket, but the other two both failed.

Time to take a closer look.

The approach

I wanted to see if I could reproduce the problem, so the approach I took was:

1/ Set up an environment running the same version of MWAA as the customer, but initially using an S3 bucket in the SAME account, to make sure everything works as expected,

2/ Set up a new S3 bucket in a different AWS account and repeat, to see if I could reproduce the error,

3/ Look at the CloudTrail and MWAA logs to see if I could spot and fix any issues (see the sketch after this list if you want to dig into CloudTrail programmatically rather than via the console).
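
For step 3, the console works fine, but if you would rather query CloudTrail from code, here is a rough sketch (an assumption on my part, not part of the original troubleshooting): it lists recent S3 management events and prints any that were denied. Note that data events such as PutObject only show up if a data-event trail is configured.

import json
import boto3

# List recent S3 management events (CreateBucket, PutBucketPolicy, ...) and
# print any that were denied. Run with credentials for the account you are inspecting.
cloudtrail = boto3.client("cloudtrail")

events = cloudtrail.lookup_events(
    LookupAttributes=[
        {"AttributeKey": "EventSource", "AttributeValue": "s3.amazonaws.com"}
    ],
    MaxResults=50,
)["Events"]

for event in events:
    detail = json.loads(event["CloudTrailEvent"])
    if detail.get("errorCode"):
        print(detail["eventName"], detail["errorCode"], detail.get("errorMessage"))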

The setup

The customer was running a standard MWAA 2.0.2 environment in a single AWS account, so I quickly provisioned an environment (using the CDK app I wrote about previously: https://dev.to/aws/using-aws-cdk-to-deploy-your-amazon-managed-workflows-for-apache-airflow-environment-12cf).

I deployed the DAG above, and then had to do a couple of things:

  • Create a new S3 bucket (in my case I called it "ricsue-airflow-s3hook", and within this bucket I created a folder called "s3permissions")

  • Adjust the permissions of the MWAA execution role (which you can find by opening your MWAA environment in the AWS console) to include the new bucket I had just created:

...
            "Resource": [
                "arn:aws:s3:::airflow-ricsue-cdk-demo/*",
                "arn:aws:s3:::airflow-ricsue-cdk-demo",
                "arn:aws:s3:::ricsue-airflow-s3hook/*",
                "arn:aws:s3:::ricsue-airflow-s3hook"
            ],
            "Effect": "Allow"
...


  • In the Apache Airflow UI (via the Admin menu option), I created two new Variables: TEST_BUCKET (with a value of "ricsue-airflow-s3hook") and TEST_FOLDER (with a value of "s3permissions") - see the sketch below if you would rather manage these in code
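
If you prefer to create these Variables from code rather than through the UI, one option is a one-off maintenance DAG that sets them from inside the environment (a minimal sketch; the DAG id is hypothetical, and the values match my setup):

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def seed_variables():
    # Create (or overwrite) the Variables that the demo DAG reads
    Variable.set('TEST_BUCKET', 'ricsue-airflow-s3hook')
    Variable.set('TEST_FOLDER', 's3permissions')


with DAG(
    'seed_variables',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['test'],
) as dag:
    PythonOperator(task_id='seed_variables', python_callable=seed_variables)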

Once that was done, I enabled the demo DAG and triggered it manually. It failed with the following error:

[2021-09-06 12:36:26,255] {{customer-s3.py:26}} INFO - local_file_path: /tmp/demo_2025.csv, bucket_name: ricsue-airflow-s3hook, key: s3permissions/boto3_demo_2025.csv
[2021-09-06 12:36:26,284] {{customer-s3.py:28}} INFO - Uploading CSV to S3 with boto3
[2021-09-06 12:36:26,429] {{customer-s3.py:35}} INFO - Done uploading CSV to S3 with boto3
[2021-09-06 12:36:26,455] {{customer-s3.py:36}} INFO - Uploading CSV to S3 with S3HOOK
[2021-09-06 12:36:26,487] {{logging_mixin.py:104}} INFO - [2021-09-06 12:36:26,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-09-06 12:36:26,545] {{logging_mixin.py:104}} INFO - [2021-09-06 12:36:26,545] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-09-06 12:36:26,571] {{logging_mixin.py:104}} INFO - [2021-09-06 12:36:26,571] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
[2021-09-06 12:36:26,613] {{logging_mixin.py:104}} INFO - [2021-09-06 12:36:26,613] {{base_aws.py:157}} INFO - role_arn is None
[2021-09-06 12:36:26,782] {{customer-s3.py:44}} INFO - Done uploading CSV to S3 with S3HOOK
[2021-09-06 12:36:26,813] {{customer-s3.py:46}} INFO - Uploading CSV to S3 with pandas
[2021-09-06 12:36:26,851] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/customer-s3.py", line 52, in run_dag
    save_file_to_s3()
  File "/usr/local/airflow/dags/customer-s3.py", line 47, in save_file_to_s3
    demo_data.to_csv(f's3://{bucket_name}/{folder_name}/pandas_{filename}', index=False)
  File "/usr/local/lib64/python3.7/site-packages/pandas/core/generic.py", line 3403, in to_csv
    storage_options=storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/formats/format.py", line 1083, in to_csv
    csv_formatter.save()
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/formats/csvs.py", line 234, in save
    storage_options=self.storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/common.py", line 563, in get_handle
    storage_options=storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/common.py", line 315, in _get_filepath_or_buffer
    fsspec = import_optional_dependency("fsspec")
  File "/usr/local/lib64/python3.7/site-packages/pandas/compat/_optional.py", line 109, in import_optional_dependency
    raise ImportError(msg) from None
ImportError: Missing optional dependency 'fsspec'.  Use pip or conda to install fsspec.


Whilst boto3 and the Apache Airflow hook worked, the Python library (pandas) did not. That is because the libraries pandas needs for S3 access (fsspec and s3fs) are not installed on the Apache Airflow worker nodes.

I modified the requirements.txt to add the following:

fsspec
s3fs
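
Getting these onto the workers means uploading the new requirements.txt to the S3 bucket that backs the environment and updating the environment to point at it. You can do that from the console, or with a rough boto3 sketch like the one below (the bucket and environment names are placeholders for my setup; the bucket is versioned, as MWAA requires):

import boto3

s3 = boto3.client('s3')
mwaa = boto3.client('mwaa')

# Upload the updated requirements.txt to the environment's (versioned) S3 bucket
s3.upload_file('requirements.txt', 'airflow-ricsue-cdk-demo', 'requirements.txt')
version = s3.head_object(Bucket='airflow-ricsue-cdk-demo', Key='requirements.txt')['VersionId']

# Tell MWAA to use that specific version of the requirements file
mwaa.update_environment(
    Name='mwaa-environment-name',  # placeholder for your environment name
    RequirementsS3Path='requirements.txt',
    RequirementsS3ObjectVersion=version,
)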


After uploading this and kicking off the environment update, I had to wait around 20 minutes, but once it completed I was able to trigger the DAG successfully. No errors, and when I looked in my local S3 bucket I could see the following files:

boto3_demo_9977.csv
s3hook_demo_9977.csv
pandas_demo_9977.csv


So, step one done - everything working as expected within the same account. Now to change the DAG so that it writes these files to an S3 bucket in a different AWS account.

The errors

To reproduce the same setup as the customer, I had to:

  • Create a new S3 bucket in a different AWS account (in my case I called it "ricsue-airflow-s3hook-diffawsaccount", and within this bucket I kept the same folder called "s3permissions")

  • Update the Apache Airflow Variable TEST_BUCKET. In the Apache Airflow UI, I edited the value to point to this new bucket in the different AWS account.

To make this easier I had two browsers running: Chrome for my own AWS account and MWAA, and Firefox for the other AWS account, which holds only the Amazon S3 bucket.

With those changes made, I triggered the DAG again. Given that these are two separate AWS accounts, I did not expect any of the methods to upload files, but I wanted to set a baseline. Sure enough, I got the following errors and the DAG failed:

[2021-09-06 13:18:54,317] {{customer-s3.py:26}} INFO - local_file_path: /tmp/demo_5839.csv, bucket_name: ricsue-airflow-s3hook-diffawsaccount, key: s3permissions/boto3_demo_5839.csv
[2021-09-06 13:18:54,344] {{customer-s3.py:28}} INFO - Uploading CSV to S3 with boto3
[2021-09-06 13:18:54,479] {{customer-s3.py:33}} INFO - Failed to upload /tmp/demo_5839.csv to ricsue-airflow-s3hook-diffawsaccount/s3permissions/boto3_demo_5839.csv: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
[2021-09-06 13:18:54,507] {{customer-s3.py:35}} INFO - Done uploading CSV to S3 with boto3
[2021-09-06 13:18:54,531] {{customer-s3.py:36}} INFO - Uploading CSV to S3 with S3HOOK
[2021-09-06 13:18:54,558] {{logging_mixin.py:104}} INFO - [2021-09-06 13:18:54,558] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-09-06 13:18:54,602] {{logging_mixin.py:104}} INFO - [2021-09-06 13:18:54,602] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-09-06 13:18:54,631] {{logging_mixin.py:104}} INFO - [2021-09-06 13:18:54,631] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
[2021-09-06 13:18:54,673] {{logging_mixin.py:104}} INFO - [2021-09-06 13:18:54,673] {{base_aws.py:157}} INFO - role_arn is None
[2021-09-06 13:18:54,792] {{customer-s3.py:42}} INFO - An error occurred (403) when calling the HeadObject operation: Forbidden
[2021-09-06 13:18:54,818] {{customer-s3.py:44}} INFO - Done uploading CSV to S3 with S3HOOK
[2021-09-06 13:18:54,850] {{customer-s3.py:46}} INFO - Uploading CSV to S3 with pandas
[2021-09-06 13:18:56,049] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 248, in _call_s3
    out = await method(**additional_kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/aiobotocore/client.py", line 155, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the CreateBucket operation: Access Denied

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/customer-s3.py", line 52, in run_dag
    save_file_to_s3()
  File "/usr/local/airflow/dags/customer-s3.py", line 47, in save_file_to_s3
    demo_data.to_csv(f's3://{bucket_name}/{folder_name}/pandas_{filename}', index=False)
  File "/usr/local/lib64/python3.7/site-packages/pandas/core/generic.py", line 3403, in to_csv
    storage_options=storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/formats/format.py", line 1083, in to_csv
    csv_formatter.save()
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/formats/csvs.py", line 234, in save
    storage_options=self.storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/common.py", line 563, in get_handle
    storage_options=storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/common.py", line 345, in _get_filepath_or_buffer
    filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/core.py", line 438, in open
    **kwargs,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/core.py", line 292, in open_files
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/core.py", line 292, in <listcomp>
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/asyn.py", line 88, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/asyn.py", line 69, in sync
    raise result[0]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 731, in _makedirs
    await self._mkdir(path, create_parents=True)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 716, in _mkdir
    await self._call_s3("create_bucket", **params)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 268, in _call_s3
    raise err
PermissionError: Access Denied


As we can see, all three methods failed. Exactly what I expected.

The odd thing is that the customer was not seeing this, and was able to write to the target S3 bucket using boto3. Let's see how to reproduce that in my environment.

The fix

On your Amazon S3 buckets you can define bucket policies that allow files to be read/written from other AWS accounts.

In the new AWS account, I created the following bucket policy for the new bucket I had created ("ricsue-airflow-s3hook-diffawsaccount"). This is what I added:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::704533066374:role/mwaa-2-eks-role"
            },
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*",
                "s3:PutObject*"
            ],
            "Resource": [
                "arn:aws:s3:::ricsue-airflow-s3hook-diffawsaccount/*",
                "arn:aws:s3:::ricsue-airflow-s3hook-diffawsaccount"
            ]
        }
    ]
}


This policy contains the ARN of the MWAA execution role of my MWAA environment in the original AWS account, configures the allowed actions (in this case I have scoped it down to GetObject*, GetBucket*, List*, and PutObject*), and then configures the target S3 bucket resources (here everything under the bucket, although you could scope this down to specific folders if you wanted to).
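
If the bucket in the second account is managed from code, a minimal sketch of applying the same policy with boto3 (run with credentials for the bucket-owner account; the role ARN and bucket name match my setup) could look like this:

import json
import boto3

bucket = 'ricsue-airflow-s3hook-diffawsaccount'
policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        # MWAA execution role from the account that hosts the environment
        'Principal': {'AWS': 'arn:aws:iam::704533066374:role/mwaa-2-eks-role'},
        'Action': ['s3:GetObject*', 's3:GetBucket*', 's3:List*', 's3:PutObject*'],
        'Resource': [f'arn:aws:s3:::{bucket}/*', f'arn:aws:s3:::{bucket}'],
    }],
}

# Attach the cross-account bucket policy to the target bucket
s3 = boto3.client('s3')
s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))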

Once I had saved this, I re-ran the DAG. Success, it all worked.

[2021-09-06 18:09:15,305] {{customer-s3.py:26}} INFO - local_file_path: /tmp/demo_3455.csv, bucket_name: ricsue-airflow-s3hook-diffawsaccount, key: s3permissions/boto3_demo_3455.csv
[2021-09-06 18:09:15,335] {{customer-s3.py:28}} INFO - Uploading CSV to S3 with boto3
[2021-09-06 18:09:15,499] {{customer-s3.py:35}} INFO - Done uploading CSV to S3 with boto3
[2021-09-06 18:09:15,564] {{customer-s3.py:36}} INFO - Uploading CSV to S3 with S3HOOK
[2021-09-06 18:09:15,592] {{logging_mixin.py:104}} INFO - [2021-09-06 18:09:15,592] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-09-06 18:09:15,632] {{logging_mixin.py:104}} INFO - [2021-09-06 18:09:15,632] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-09-06 18:09:15,681] {{logging_mixin.py:104}} INFO - [2021-09-06 18:09:15,681] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
[2021-09-06 18:09:15,719] {{logging_mixin.py:104}} INFO - [2021-09-06 18:09:15,719] {{base_aws.py:157}} INFO - role_arn is None
[2021-09-06 18:09:15,859] {{customer-s3.py:44}} INFO - Done uploading CSV to S3 with S3HOOK
[2021-09-06 18:09:15,887] {{customer-s3.py:46}} INFO - Uploading CSV to S3 with pandas
[2021-09-06 18:09:16,729] {{customer-s3.py:48}} INFO - Done uploading CSV to S3 with pandas
[2021-09-06 18:09:16,762] {{python.py:118}} INFO - Done. Returned value was: None
[2021-09-06 18:09:16,805] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=demo, task_id=test_dag, execution_date=20210906T180913, start_date=20210906T180914, end_date=20210906T180916
[2021-09-06 18:09:16,901] {{taskinstance.py:1246}} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-09-06 18:09:16,943] {{logging_mixin.py:104}} INFO - [2021-09-06 18:09:16,943] {{local_task_job.py:146}} INFO - Task exited with return code 0


But wait, that does not describe the customer's problem. They could get this to work with boto3, so what was going on?

I changed the bucket policy so it had fewer permissions, specifically removing List*, as follows:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::704533066374:role/mwaa-2-eks-role"
            },
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:PutObject*"
            ],
            "Resource": [
                "arn:aws:s3:::ricsue-airflow-s3hook-diffawsaccount/*",
                "arn:aws:s3:::ricsue-airflow-s3hook-diffawsaccount"
            ]
        }
    ]
}


When I re-ran the DAG, boto3 worked, but the S3 hook and pandas failed with permission errors:

[2021-09-06 18:07:11,238] {{customer-s3.py:26}} INFO - local_file_path: /tmp/demo_6465.csv, bucket_name: ricsue-airflow-s3hook-diffawsaccount, key: s3permissions/boto3_demo_6465.csv
[2021-09-06 18:07:11,263] {{customer-s3.py:28}} INFO - Uploading CSV to S3 with boto3
[2021-09-06 18:07:11,420] {{customer-s3.py:35}} INFO - Done uploading CSV to S3 with boto3
[2021-09-06 18:07:11,454] {{customer-s3.py:36}} INFO - Uploading CSV to S3 with S3HOOK
[2021-09-06 18:07:11,481] {{logging_mixin.py:104}} INFO - [2021-09-06 18:07:11,481] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-09-06 18:07:11,547] {{logging_mixin.py:104}} INFO - [2021-09-06 18:07:11,546] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-09-06 18:07:11,571] {{logging_mixin.py:104}} INFO - [2021-09-06 18:07:11,571] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
[2021-09-06 18:07:11,578] {{logging_mixin.py:104}} WARNING - /usr/local/airflow/.local/lib/python3.7/site-packages/watchtower/__init__.py:205 WatchtowerWarning: Failed to deliver logs: None
[2021-09-06 18:07:11,649] {{logging_mixin.py:104}} INFO - [2021-09-06 18:07:11,649] {{base_aws.py:157}} INFO - role_arn is None
[2021-09-06 18:07:11,761] {{customer-s3.py:42}} INFO - An error occurred (403) when calling the HeadObject operation: Forbidden
[2021-09-06 18:07:11,789] {{customer-s3.py:44}} INFO - Done uploading CSV to S3 with S3HOOK
[2021-09-06 18:07:11,817] {{customer-s3.py:46}} INFO - Uploading CSV to S3 with pandas
[2021-09-06 18:07:12,347] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 248, in _call_s3
    out = await method(**additional_kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/aiobotocore/client.py", line 155, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the CreateBucket operation: Access Denied

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/customer-s3.py", line 52, in run_dag
    save_file_to_s3()
  File "/usr/local/airflow/dags/customer-s3.py", line 47, in save_file_to_s3
    demo_data.to_csv(f's3://{bucket_name}/{folder_name}/pandas_{filename}', index=False)
  File "/usr/local/lib64/python3.7/site-packages/pandas/core/generic.py", line 3403, in to_csv
    storage_options=storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/formats/format.py", line 1083, in to_csv
    csv_formatter.save()
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/formats/csvs.py", line 234, in save
    storage_options=self.storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/common.py", line 563, in get_handle
    storage_options=storage_options,
  File "/usr/local/lib64/python3.7/site-packages/pandas/io/common.py", line 345, in _get_filepath_or_buffer
    filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/core.py", line 438, in open
    **kwargs,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/core.py", line 292, in open_files
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/core.py", line 292, in <listcomp>
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/asyn.py", line 88, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/asyn.py", line 69, in sync
    raise result[0]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 731, in _makedirs
    await self._mkdir(path, create_parents=True)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 716, in _mkdir
    await self._call_s3("create_bucket", **params)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/s3fs/core.py", line 268, in _call_s3
    raise err
PermissionError: Access Denied


So, we can close the loop on this one. We now understand the circumstances in which this partially works (the target S3 bucket policy does not have the right permissions for the various methods of uploading files), and we can fix/resolve it as needed.

Conclusion

In this post I showed you how to set up your S3 buckets so that you can use them to read and write data from your MWAA environments. I showed you how this works whether you are doing it via boto3, using the Apache Airflow S3 operators/hooks, or via third-party Python libraries (pandas/s3fs).

Please let me know in the comments if you have any other questions or feedback on this post.

You can view/use the files that accompany this post via the GitHub repository linked here.
