Prerequisites

Git repository with the deployment files:
https://github.com/itnoobzzy/EasyAirflow.git
The project layout is as follows:
(screenshot: project directory tree)
Configuration files and directories
A containerized Airflow deployment needs the dags, logs, and plugins directories mounted into the containers, along with the airflow.cfg configuration file.
After cloning, run the following commands to set up the airflow.cfg configuration file:

cd EasyAirflow
cp config/default_airflow.cfg config/airflow.cfg
# edit the database and other related settings in airflow.cfg
vim config/airflow.cfg
  • Four settings need to be changed here:
    Set executor to CeleryExecutor.
    Point sql_alchemy_conn at the existing MySQL database; note that the connection string must use the mysql+pymysql driver.
    Set broker_url and result_backend: Celery uses Redis as the message broker, and task results are stored in MySQL. Note that result_backend must use the db+mysql driver prefix.
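A minimal sketch of the four airflow.cfg entries (the host names, credentials, and database name below are placeholders, not values from the repository):

```ini
[core]
executor = CeleryExecutor

[database]
# note the mysql+pymysql driver prefix
sql_alchemy_conn = mysql+pymysql://airflow:airflow_pass@mysql-host:3306/airflow

[celery]
broker_url = redis://redis-host:6379/0
# note the db+mysql driver prefix for the Celery result backend
result_backend = db+mysql://airflow:airflow_pass@mysql-host:3306/airflow
```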

Dockerfile

FROM apache/airflow:slim-2.6.0-python3.10
USER root
EXPOSE 8080 5555 8793
COPY config/airflow.cfg /opt/airflow/airflow.cfg
RUN set -ex \
    && buildDeps=' \
        freetds-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        libpq-dev \
        git \
        python3-dev \
        gcc \
        sasl2-bin \
        libsasl2-2 \
        libsasl2-modules \
    ' \
    && apt-get update -yqq \
    && apt-get upgrade -yqq \
    && apt-get install -yqq --no-install-recommends \
        $buildDeps \
        freetds-bin \
        build-essential \
        default-libmysqlclient-dev \
        apt-utils \
        curl \
        rsync \
        netcat \
        locales \
        procps \
        telnet

RUN chmod 777 -R /opt/airflow/logs
RUN chmod 777 -R /opt/airflow/dags
USER airflow

# The official Airflow 2.6.0 slim Python 3.10 image is used as the base image;
# a few third-party packages have to be installed, otherwise airflow fails to start.
RUN pip install --no-cache-dir \
        celery \
        flower \
        pymysql \
        mysqlclient \
        redis \
        livy==0.6.0 \
        apache-airflow-providers-mysql \
        apache-airflow-providers-apache-hive

# The final step initializes the airflow metadata database
RUN airflow db init

If the database connection configured in airflow.cfg is valid, the following tables will appear in that database once the image is built:
(screenshot: Airflow metadata tables)
docker-compose.yaml

version: '2.1'
services:
    webserver:
        image: airflow:2.6.0
        restart: always
        volumes:
            # dag and plugins
            - ./dags:/opt/airflow/dags
            - ./plugins:/opt/airflow/plugins
            - ./logs:/opt/airflow/logs
            - ./config/airflow.cfg:/opt/airflow/airflow.cfg
        ports:
            - "8080:8080"
        command: ["airflow", "webserver"]
    scheduler:
        image: airflow:2.6.0
        restart: always
        volumes:
            # dag and plugins
            - ./dags:/opt/airflow/dags
            - ./plugins:/opt/airflow/plugins
            - ./logs:/opt/airflow/logs
            - ./config/airflow.cfg:/opt/airflow/airflow.cfg
        command: ["airflow", "scheduler"]
    flower:
        image: airflow:2.6.0
        restart: always
        volumes:
            # dag and plugins
            - ./dags:/opt/airflow/dags
            - ./plugins:/opt/airflow/plugins
            - ./logs:/opt/airflow/logs
            - ./config/airflow.cfg:/opt/airflow/airflow.cfg
        ports:
            - "5555:5555"
        command: ["airflow", "celery", "flower"]
    worker:
        image: airflow:2.6.0
        restart: always
        volumes:
            # dag and plugins
            - ./dags:/opt/airflow/dags
            - ./plugins:/opt/airflow/plugins
            - ./logs:/opt/airflow/logs
            - ./config/airflow.cfg:/opt/airflow/airflow.cfg
        ports:
            - "8793:8793"
        command: ["airflow", "celery", "worker"]
  • webserver and flower need ports 8080 and 5555 exposed for external access to their web UIs
  • worker needs port 8793 exposed for log retrieval and for communication between scheduler and worker
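To confirm the exposed ports are reachable once the stack is up, a quick TCP probe can be used (a generic sketch, not part of the repository; the host name and port list are assumptions based on the compose file above):

```python
import socket

def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # 8080: webserver UI, 5555: flower UI, 8793: worker log server
    for port in (8080, 5555, 8793):
        print(port, "open" if port_open("localhost", port) else "closed")
```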

Building and starting the image

Build the image: docker build -t airflow:2.6.0 .
Start the services: docker-compose up -d
Check the container logs to verify that everything started correctly:
docker logs --since 2m -f airflow_scheduler_1
docker logs --since 2m -f airflow_webserver_1
docker logs --since 2m -f airflow_flower_1
docker logs --since 2m -f airflow_worker_1
Open the webserver UI on port 8080
Create a test DAG in the mounted dags directory on the server. The example below comes from Fundamental Concepts — Airflow Documentation:

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
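Before a task runs, Airflow renders bash_command through the Jinja2 template engine, substituting context variables such as ds (the logical date). A minimal stand-in renderer for the {{ var }} form, using only the standard library (loops and macros like macros.ds_add are left to real Jinja2):

```python
import re

def render(template: str, context: dict) -> str:
    """Replace every {{ name }} with context[name]; a crude stand-in for Jinja2."""
    def sub(match: re.Match) -> str:
        return str(context[match.group(1)])
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", sub, template)

print(render('echo "{{ ds }}"', {"ds": "2021-01-01"}))
# prints: echo "2021-01-01"
```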

The scheduler scans the DAG files under the dags directory; after a short while the new DAG appears in the UI:
(screenshot: DAG list showing the tutorial DAG)
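By default the scheduler does not even parse a .py file unless its content mentions both "dag" and "airflow" (the DAG_DISCOVERY_SAFE_MODE heuristic), which is worth knowing when a new file fails to show up. A simplified sketch of that filter (the file names below are hypothetical, and the check here is a rough approximation of Airflow's, not its exact code):

```python
import tempfile
from pathlib import Path

def might_contain_dag(path: Path) -> bool:
    """Rough safe-mode heuristic: the file must mention both 'dag'
    and 'airflow' (case-insensitive) to be considered for parsing."""
    content = path.read_text(errors="ignore").lower()
    return "dag" in content and "airflow" in content

def discover(dag_dir: Path) -> list:
    """Return the names of .py files under dag_dir that pass the heuristic."""
    return sorted(p.name for p in dag_dir.rglob("*.py") if might_contain_dag(p))

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        (Path(d) / "tutorial.py").write_text("from airflow import DAG")
        (Path(d) / "helper.py").write_text("def util(): pass")
        print(discover(Path(d)))  # ['tutorial.py']
```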
