在这里插入图片描述

在这里插入图片描述

Airflow官网地址: https://airflow.apache.org/docs/apache-airflow/stable/start/index.html.

在这里插入图片描述
在这里插入图片描述


官方提供的图片:
在这里插入图片描述

AirflowPython
2.3.0及以上3.8.10
2.0.2 ~ 2.2.53.6.12

2022-06-21安装2.3.0以上版本一直报警,无法解决,降低版本
在这里插入图片描述


1. Python Install

  • **下载python 3.7.8 ,下面的都改成3.6.12,某些地方略微修改 **
    wget https://www.python.org/ftp/python/3.6.12/Python-3.6.12.tgz
  • 解压
    tar -zxvf Python-3.6.12.tgz
    在这里插入图片描述
  • 安装依赖
    sudo yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel libffi-devel
  • 安装
    cd Python-3.6.12
    ./configure -prefix=/usr/local/python3
    在这里插入图片描述
    看到很多config后面加参数的,那都是高版本才有的,低版本都没有参数
    Python config: https://docs.python.org/zh-cn/dev/using/configure.html.
    在这里插入图片描述
  • 编译
    make && make install
  • 添加软链
    最好先删除软链,请看下面。然后再创建
    sudo ln -s /usr/local/python3/bin/python3.6 /usr/bin/python3
    sudo ln -s /usr/local/python3/bin/pip3.6 /usr/bin/pip3
  • 检验
    python3 -V
    pip3 -V
  • 升级
    pip3 install --upgrade pip
  • 退出python命令
    输入exit(),回车
    输入quit(),回车

2. Mysql Install

CentOS安装mysql真的太难了,花了我一天的时间


Mysql Install: https://blog.csdn.net/weixin_43916074/article/details/125284109.

3. Airflow Install

3.1 Run Locally

Running Airflow locally: https://airflow.apache.org/docs/apache-airflow/2.2.3/start/local.html.
在这里插入图片描述

  • Airflow needs a home. ~/airflow is the default, but you can put it
    export AIRFLOW_HOME=~/airflow

  • Install Airflow using the constraints file
    AIRFLOW_VERSION=2.3.2
    PYTHON_VERSION=“$(python --version | cut -d " " -f 2 | cut -d “.” -f 1-2)”

  • For example: 3.7
    CONSTRAINT_URL=“https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt”

  • For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.3.2/constraints-3.7.txt
    pip install “apache-airflow==${AIRFLOW_VERSION}” --constraint “${CONSTRAINT_URL}”

  • The Standalone command will initialise the database, make a user,and start all components for you.
    airflow standalone

3.2 Pip3 install

  • Airflow needs a home. ~/airflow is the default, but you can put it
    export AIRFLOW_HOME=~/airflow
  • pip3 install “apache-airflow[celery]==2.2.5” --constraint “https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.6.txt”

  • 当我的环境python = 3.6.12
    sudo pip3 install apache-airflow =>自动选择2.2.5版本

在这里插入图片描述

  • 报警
    在这里插入图片描述
    sudo pip3 install --upgrade pip
    sudo pip3 install apache-airflow
  • 进入python安装目录
    cd /usr/local/python3/bin
  • 初始化
    ./airflow

这下面使用airflow都要./airlfow,要建软链,我配置变数都不行。

  • 配置环境变量
    sudo vim /etc/profile
    content : export AIRFLOW_HOME=~/airflow
  • 执行配置
    source /etc/profile
  • 检查版本
    airflow version
  • 会报警,说升级更新sqlit

  • 配置环境变量
    sudo vim ~/.bashrc
    content : export AIRFLOW_HOME=~/airflow
  • 执行配置
    source ~/.bashrc
  • 检查版本
    airflow version

3.2 Config Mysql

  • 进入mysql
    mysql -u root -p
  • 建数据库
    mysql> create database if not exists airflow default character set utf8 default collate utf8_general_ci;
  • 建用户
    mysql> create user ‘airflow’@‘%’ identified by ‘Airflow@123’;
  • 授权1
    mysql> grant all privileges on airflow.* to airflow@localhost identified by ‘Airflow@123’;

  • 授权2
    mysql> grant all privileges on airflow.* to ‘airflow’@‘%’ identified by ‘Airflow@123’;

  • 授权3
    mysql> flush privileges;
  • 修改配置~/airflow/airflow.cfg,一般都在当前用户下面
    #sql_alchemy_conn = sqlite:data/airflow/airflow.db
    sql_alchemy_conn = mysql://airflow:Airflow@123@localhost:3306/airflow
  • 执行
    airflow version在这里插入图片描述
  • 继续安装插件
    sudo yum install -y mysql-devel
    pip3 install pymysql
    pip3 install mysql
  • 初始化
    airflow db init
    在这里插入图片描述
  • Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
    mysql -u root -p
    mysql > show databases;
    mysql > use airflow;
    mysql > show global variables like ‘%timestamp%’;
    mysql > set global explicit_defaults_for_timestamp =1;
    mysql > exit在这里插入图片描述
  • 再次初始化
    airflow db init
    在这里插入图片描述
    在这里插入图片描述
  • 启动服务
    airflow users create -u admin -p admin -f admin -l admin -r Admin -e xxxxx
    airflow webserver --port 8080
    airflow scheduler
  • 访问
    http://ip:8080/

4. HA Environment

4.1 Celery+Redis

  • 安装redis
    yum -y install redis
  • 修改redis配置
    sudo vim /etc/redis.conf
    #bind 127.0.0.1 // 注释掉,使redis允许远程访问
    requirepass redis // 修改这行,redis设置登录密码(自定义,目前是redis)
  • 关闭密码验证
    protected-mode no
  • 启动redis
    sudo systemctl start redis
  • 检查redis服务
    ps -ef | grep redis
  • 测试redis服务
    cd /usr/bin
    redis-cli 类似于进入mysql
    redis install with ssl: https://blog.csdn.net/weixin_43916074/article/details/126470126.
  • python下载redis库
    sudo pip3 install redis
  • python下载celery库
    sudo pip3 install celery
  • 修改配置 airflow.cfg
    executor = CeleryExecutor
    broker_url = redis://127.0.0.1:6379/0
    result_backend = redis://127.0.0.1:6379/0 or result_backend = db+musql://username:password@localhsot:3306/airflow
  • 搜索,总能找到一些蛛丝马迹
    sudo find / -name ‘redis*’
    在这里插入图片描述

4.2 Workers

修改master的配置,不要用localhost

[core]
dags_folder = /root/airflow/dags
 
#修改时区
default_timezone = Asia/Shanghai
 
#配置Executor类型,集群建议配置CeleryExecutor
executor = CeleryExecutor
 
# 配置数据库
sql_alchemy_conn=mysql://airflow:Airflow@123@xxipxx:3306/airflow
[webserver]
#设置时区
default_ui_timezone = Asia/Shanghai
 
[celery]
#配置Celery broker使用的消息队列
broker_url = redis://xxipxx:6379/0
#配置Celery broker任务完成后状态更新使用库
result_backend = db+mysql://airflow:Airflow@123@xxipxx:3306/airflow

将master的airflow.cfg覆盖掉其他节点的airflow.cfg

其实就是所有人都共用一个mysql和redis

在master启动webserver,scheduler,在其他node启动wroker

  • master command
    cd /usr/local/python3/bin
    ./airflow webserver --port 8081
    ./airflow scheduler

  • other node command
    ./airflow celery worker

  • 前面都是测试,正式环境都要守护进程启动
    ./airflow webserver -D
    ./airflow scheduler -D
    ./airflow celery worker -D

5. Https

5.1 Close service

  • close service
    ps -ef | grep airflow
    kill -9 xxxx

5.2 Config https

在这里插入图片描述

6. Delete Dags

  • 客制化dags,只需要参考tutorial code就ok啦,如何将其他删掉
  • 通过detail找到路径,全部移除
    在这里插入图片描述
  • 最后效果
    在这里插入图片描述
  • 客制化的直接在airflow的目录删除就好了
    /home/xxxx/airflow/dags

7. Email Notification

7.1 Create DAG

with DAG(
    dag_id='hello_world',
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'owner': 'Nan',
        'depends_on_past': False,
        'email': ['nan.zhao@deltaww.com'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        'on_failure_callback': failure_callback,   //打开这两个
        'on_success_callback': success_callback,   //打开这两个回调函数
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    # [END default_args]
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    tags=['example'],
) as dag:

    # [START documentation]
  
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    download_task= PythonOperator(
        task_id = "hello_world",
        python_callable = demo    //呼叫自定义demo function
    )
 
# [END tutorial]

7.2 Send_email

// 获取airflow.cfg里面的配置参数
def send_email(mail_msg):
    """SMTP Service"""
    mail_host = conf.get("smtp","smtp_host")
    mail_user = conf.get("smtp","smtp_user")
    mail_pass = conf.get("smtp","smtp_password")
    sender = conf.get("smtp","smtp_mail_from")
    port = conf.get("smtp","smtp_port")

    receivers = ['xxxxx.com']

    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = sender
    message['To'] = ";".join(receivers)
    subject = 'Airflow Msg'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtp_obj = smtplib.SMTP(mail_host)
        smtp_obj.connect(mail_host,port)
        smtp_obj.starttls()
        smtp_obj.login(mail_user, mail_pass)
        smtp_obj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException as error:
        print("Error: send failure ,"+ error)

def send_email_fun(msg):
    send_email(msg)

7.3 Call_back

// An highlighted block
def send_email_fun(msg):
    send_email(msg)

 
def success_callback(context: dict):
    dag_id = context['dag'].dag_id
    email = context['dag'].default_args['email']
    schedule_interval = context['dag'].schedule_interval
    task_id = context['task_instance'].task_id
    run_id = context['run_id']
    operator = context['task_instance'].operator
    state = context['task_instance'].state
    duration = '%.1f' % context['task_instance'].duration
    max_tries = context['task_instance'].max_tries
    hostname = context['task_instance'].hostname
    start_date = context['task_instance'].start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_date = context['task_instance'].end_date.strftime('%Y-%m-%d %H:%M:%S')
    params = context['params']
    var = context['var']
    test_mode = context['test_mode']
    execution_date = context['logical_date'].strftime('%Y-%m-%d %H:%M:%S')
    next_execution_date = context['data_interval_end'].strftime('%Y-%m-%d %H:%M:%S')
 
    msg = f"""<h3 style='color: green;'>Airflow DAG: {task_id}</h3><tr>
        <table width='1500px' border='1' cellpadding='2' style='border-collapse: collapse'>
        <tr><td width='30%' align='center'>DAG Name</td><td>{dag_id}</td></tr>
        <tr><td width='30%' align='center'>Task name</td><td>{task_id}</td></tr>
        <tr><td width='30%' align='center'>Run Cycle</td><td>{schedule_interval}</td></tr>
        <tr><td width='30%' align='center'>Run ID</td><td>{run_id}</td></tr>
        <tr><td width='30%' align='center'>Task type</td><td>{operator}</td></tr>
        <tr><td width='30%' align='center'>Stask Status</td><td style='color: green;'>Succeed</td></tr></table>
    """
    
    print('succeed')
    send_email_fun(msg)
    print("this is succeed")

def failure_callback(context: dict):
    dag_id = context['dag'].dag_id
    email = context['dag'].default_args['email']
    schedule_interval = context['dag'].schedule_interval
    task_id = context['task_instance'].task_id
    run_id = context['run_id']
    operator = context['task_instance'].operator
    state = context['task_instance'].state
    duration = '%.1f' % context['task_instance'].duration
    max_tries = context['task_instance'].max_tries
    hostname = context['task_instance'].hostname
    start_date = context['task_instance'].start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_date = context['task_instance'].end_date.strftime('%Y-%m-%d %H:%M:%S')
    params = context['params']
    var = context['var']
    test_mode = context['test_mode']
    exception = context['exception']
    execution_date = context['logical_date'].strftime('%Y-%m-%d %H:%M:%S')
    next_execution_date = context['data_interval_end'].strftime('%Y-%m-%d %H:%M:%S')
 
    msg = f"""<h3 style='color: green;'>Airflow DAG : {task_id} Error</h3>
        <table width='100%' border='1' cellpadding='2' style='border-collapse: collapse'>
        <tr><td width='30%' align='center'>DAG Name</td><td>{dag_id}</td></tr>
        <tr><td width='30%' align='center'>Task Name</td><td>{task_id}</td></tr>
        <tr><td width='30%' align='center'>Run Cycle</td><td>{schedule_interval}</td></tr>
        <tr><td width='30%' align='center'>Task ID</td><td>{run_id}</td></tr>
        <tr><td width='30%' align='center'>Task Type</td><td>{operator}</td></tr>
        <tr><td width='150px' style='color: red;'>Task Status</td><td style='color: red;'>{state}</td></tr>
        <tr><td width='30%' align='center'>Retry</td><td>{max_tries}</td></tr>
        <tr><td width='30%' align='center'>Time</td><td>{duration}s</td></tr>
        <tr><td width='30%' align='center'>Hostname</td><td>{hostname}</td></tr>
        <tr><td width='30%' align='center'>exe_date</td><td>{execution_date}</td></tr>
        <tr><td width='30%' align='center'>stasrt_date</td><td>{start_date}</td></tr>
        <tr><td width='30%' align='center'>end_date</td><td>{end_date}</td></tr>
        <tr><td width='30%' align='center'>previoue_exe_date</td><td>{prev_execution_date}</td></tr>
        <tr><td width='30%' align='center'>next_exe_date</td><td>{next_execution_date}</td></tr>
        <tr><td width='30%' align='center'>Parmameters</td><td>{params}</td></tr>
        <tr><td width='30%' align='center'>Variable</td><td>{var}</td></tr>
        <tr><td width='30%' align='center'>Mode</td><td>{test_mode}</td></tr>
        <tr><td width='30%' align='cneter'>Task Status</td><td style='color: red;'>{state}</td></tr>
        <tr><td width='150px' style='color: red;'>Error Msg</td><td style='color: red;'>{exception}</td></tr></table>
    """
 
    send_email_fun(msg)
    print("this is failure")

8. Config

airflow.cfg

[core]
dags_folder = /airlfow_home/dags        #客制化的dag目录,用python执行后,自动写入UI界面

default_timezone = Asia/Shanghai        #设置默认的时区

executor    #有以下几个选择,生产一般使用CeleryExecutor
   SequentialExecutor    #按顺序调度,这个一般适用于开发、测试,因为按顺序调度,真的是太慢了。
   LocalExecutor         #local模式,就无法使用集群扩展了。
   CeleryExecutor        #分布式调度
   DaskExecutor          #没用过
   
KubernetesExecutor       #kubernetes方式提交任务,也适用于生产,一般是那些单个任务比较大的情况。

#airflow的元数据库连接串,可以是sqlite、mysql、postgresql,推荐mysql和postgresql:
sql_alchemy_conn = mysql://username:password@localhost:3306/airflow?charset=utf8

parallelism            #所有worker同时运行的task数;
dag_concurrency        #单个dag中同时运行的task数;

max_active_runs_per_dag     #单个dag最多同时运行的dag_run数;

load_examples          #是否加载airflow自带的example dag;

fernet_key             #connection和variable加密用的key,记住,如果你要升级airflow,升级后需要把此参数值copy到新的配置文件中,不然升级后,airflow中的connection和variable加密的值会解密失败,无法查看;

dagbag_import_timeout            #scheduler加载新的dag使用的超时时间;
dag_file_processor_timeout       #scheduler解析dag使用的超时时间;

store_serialized_dags            #是否序列化dag,开启时,scheduler会定时将dag序列化到数据库中存储起来,这样我们的webserver就直接可以查数据库了,更方便快捷高效;
min_serialized_dag_update_interval  #序列化dag的最小间隔时间,也就是说,scheduler序列化所有dag后,隔多长时间再序列化。
store_dag_code                   #是否存储dag代码到数据库;


[webserver]
base_url                         #webserver启动的访问url
web_server_host                  #webserver的host
web_server_port                  #webserver的访问端口

web_server_ssl_cert              #开启https,使用openssl生成的cert 
web_server_ssl_key               #开启https,使用openssl生成的key

web_server_master_timeout        #webserver其实启动的是gunicorn服务,那么webserver master gunicorn服务超过这个时间没有响应,就会抛出异常;
web_server_worker_timeout        #webserver worker服务同理
workers                          #webserver启动的gunicorn服务个数,个数越多,webserver更快,当然根据系统资源来设置

expose_config                    #是否可以在web上查看airflow.cfg;

authenticate              #webserver认证方式,我们可以设置以下几个登录认证方式,这几个方式仅限登录认证:
    - github_enterprise_auth
    - google_auth
    - kerberos_auth
    - ldap_auth
    - password_auth
#当然我们后面还有一个rbac认证模式,这是生产用的最多的方式,因为它包含了web界面上每一个点击按钮的权限
filter_by_owner                   #当我们使用了以上认证方式时,如果filter_by_owner设置为true,那么登录后,界面上显示的dag设置的owner就是跟登录用户一致的。做到了dag的用户隔离;

dag_default_view                  #在我们从web界面进入某个dag时,默认展示的页面,可以设置tree, graph, duration, gantt, landing_times

hide_paused_dags_by_default       #是否隐藏关闭的dag;

page_size                         #web页面主页默认展示的dag个数,设置越少,打开主页速度越快哦;

rbac                              #权限认证模式,开启时,可以做到web界面的权限控制,细粒度到每个dag的每个task的每个操作;

default_dag_run_display_number    #打开dag的tree界面时,默认显示多少个运行批次:越少越好,打开的速度也就越快;



[sentry]
sentry_dsn     #sentry是一个监控系统,可以监控到airflow服务的运行情况

[celery]
worker_concurrency      #单个worker的并发执行task数

broker_url              #broker支持rabbitmq/redis/mysql/postgresql, task执行命令的中转站,scheduler发送执行命令到中转站,然后worker去消费这些task命令,并执行

result_backend          #worker执行完task,将结果存储的位置,官方推荐使用落地的持久化数据库,如mysql、postgresql等

flower_host             #flower是airflow中监控broker和worker的一个服务,有自己的单独页面,可以看到broker情况、task执行情况

flower_port             #flower访问的端口


[scheduler]                   #airflow最重要的角色:调度器,它承担着初始化dag文件、动态编译dag文件、发现调度任务并发送执行命令的责任。我们调度的快慢,也需要从这里设置一些参数,下面来看看。
- job_heartbeat_sec           #任务的心跳监控时间,每隔job_heartbeat_sec秒去监控一下任务的状态
- scheduler_heartbeat_sec     #监控scheduler的状态,每隔scheduler_heartbeat_sec秒去监控scheduler状态
- run_duration                #scheduler运行多长时间停止,-1表示不停止,持续调度
- num_runs                    #每隔dag的调度次数,-1表示无限次
- processor_poll_interval     #解析每个dag之间的间隔时间
- min_file_process_interval   #间隔多长时间一个新的dag被拾起;
- dag_dir_list_interval       #间隔多长时间,scheduler从磁盘读取出dag列表。跟上一个参数min_file_process_interval的区别就是,此参数不解析dag,只获取dag列表。先获取dag列表,在解析dag文件;
- parsing_processes           #编译dag所使用的schedule进程数,这个非常关键,影响到dag的调度速度和task的调度速度,官方推荐cpu盒数-1,但是我们可以设置到2倍的cpu盒数,榨干cpu。

9. Error

  • 启动worker报错:connection in use: (‘0.0.0.0’, 8793)
    在这里插入图片描述
  • 查询8793port是被谁占用,对应的pid
    netstat -npl
    sudo kill -9 127070在这里插入图片描述
  • 再次启动
    在这里插入图片描述
    ps -ef 查看服务
    在这里插入图片描述
  • airflow webserver -D 无法使用守护进程启动
    进入airflow安装目录下,remove airflow-webserver.err and airflow-webserver-monitor.pid
    在这里插入图片描述
  • 启动报错:Add Permission to Role Error
    将airflow的版本切换到2.2.3 or 2.2.5
    在这里插入图片描述
Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐