Photo by Ula Kuźma on Unsplash

Earlier on, we had little automation around the maintenance of our Airflow cluster. Those of you familiar with Airflow may be aware of the bloat that accumulates over time from DAG runs. Metadata for each DAG run and its tasks is recorded in the appropriate tables; log files are also written for every task.

We currently write log files to local disk (as opposed to cloud storage). For our use case, 30 days' worth of metadata retention was sufficient. To delete database table entries and log files older than 30 days, we leveraged maintenance DAGs shared by the team at Clairvoyant.

At the time of this writing, we have almost 1,000 DAGs. As you might expect, table cleanup and log cleanup are a couple of the most important maintenance tasks for us. Specifically, we relied on the db-cleanup and log-cleanup DAGs to minimize database and server space.

Need for Speed

With the upgrade to Airflow 2.0, the immense performance improvements to the scheduler allowed us to increase the frequency of many of our DAGs. We talk more about the major gains we were able to achieve, along with learnings, here. One third of our DAGs now run every five minutes, and a sizable chunk execute every minute! However, this quickly added up to over 250,000 database table entries and log files per day.

Pretty soon, the log cleanup DAG was taking over 24 hours to recursively find and delete old log files. The DAG effectively uses a bash operator that searches for files older than the allotted maximum age in days (using find) and deletes them (via -exec rm). However, the sheer volume of logs, along with the nested directory scheme, made the recursive traversal and searching for old log files very cumbersome.
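
For context, a simplified sketch of that find-based pattern is shown below. This is an illustration rather than the actual Clairvoyant log-cleanup DAG; the dag_id, schedule, and BASE_LOG_DIR are placeholders to adjust for your environment.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

BASE_LOG_DIR = "/efs/airflow/logs"  # placeholder; point at your log location
MAX_LOG_DAYS = 30

with DAG(
    dag_id="airflow_log_cleanup_find_based",  # illustrative dag_id
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Walk the entire log tree, delete files older than MAX_LOG_DAYS,
    # then prune any directories left empty. This full traversal is the
    # step that becomes slow as the number of log files grows.
    find_and_delete_old_logs = BashOperator(
        task_id="find_and_delete_old_logs",
        bash_command=(
            f"find {BASE_LOG_DIR} -type f -name '*.log' -mtime +{MAX_LOG_DAYS} -exec rm -f {{}} \\; "
            f"&& find {BASE_LOG_DIR} -mindepth 1 -type d -empty -delete"
        ),
    )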

New and Improved

Rethinking how we could make log cleanup more efficient, we realized we could write a DAG that quickly deletes old logs by avoiding the find command altogether. After all, we already had all the metadata needed to construct the log file paths for any given DAG run in the database tables. Airflow outputs logs using a convention that includes the DAG name, task name, and execution timestamp. For example:

/path/to/logs/dag_name/task_name/2021-08-15T12:25:00+00:00/1.log

Instead of searching the server for files older than n days, we simply issue a select query against the dag and task_instance tables to retrieve all entries with execution times older than n days. The log directory paths are then constructed from the DAG names, task names, and execution timestamps. Finally, we delete those directories, which contain the log files (including retries).
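
Before enabling the actual deletes, the query can be run on its own as a dry run to preview the directories it would target. Here is a minimal sketch, assuming a Postgres metadata database reachable through an airflow_db connection and logs written under /efs/airflow/logs/ (both are assumptions to adjust for your setup).

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Dry run: list the log directories the cleanup would remove, without
# deleting anything. The connection id and log path are assumptions.
hook = PostgresHook(postgres_conn_id="airflow_db")
rows = hook.get_records(
    """
    SELECT '/efs/airflow/logs/' || dag_id || '/' || task_id || '/'
           || replace(execution_date::text, ' ', 'T') || ':00' AS log_dir
    FROM task_instance
    WHERE execution_date::DATE <= now()::DATE - INTERVAL '30 days'
    """
)
for (log_dir,) in rows:
    print(log_dir)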

One important note: since we also purge older records from the database tables, we need to make sure the log cleanup DAG runs before the table cleanup DAG. Otherwise, the log cleanup script would not be able to retrieve the old DAG runs needed to construct the directory paths to delete. Below is a quick and dirty script using the PythonOperator to do the job.

from datetime import datetime
import logging
import shutil

from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": "foo@email.com",
    "email_on_failure": True,
}

MAX_LOG_DAYS = 30
LOG_DIR = '/efs/airflow/logs/'


def find_old_logs():
    # Query old DAG runs and build the log directory paths to be deleted.
    # An example log directory looks like this:
    # '/path/to/logs/dag_name/task_name/2021-01-11T12:25:00+00:00'
    sql = f"""
        SELECT '{LOG_DIR}' || dag_id || '/' || task_id || '/' || replace(execution_date::text, ' ', 'T') || ':00' AS log_dir
        FROM task_instance
        WHERE execution_date::DATE <= now()::DATE - INTERVAL '{MAX_LOG_DAYS} days'
    """
    src_pg = PostgresHook(postgres_conn_id='airflow_db')
    conn = src_pg.get_conn()
    logging.info("Fetching old logs to purge...")
    with conn.cursor() as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()
        logging.info(f"Found {len(rows)} log directories to delete...")
        for row in rows:
            delete_log_dir(row[0])


def delete_log_dir(log_dir):
    try:
        # Recursively delete the log directory and its log contents (e.g., 1.log, 2.log, etc.)
        shutil.rmtree(log_dir)
        logging.info(f"Deleted directory and log contents: {log_dir}")
    except OSError as e:
        logging.info(f"Unable to delete: {e.filename} - {e.strerror}")


with DAG(
    dag_id="airflow_log_cleanup",
    start_date=datetime(2021, 1, 1),
    schedule_interval="00 00 * * *",
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
) as dag:
    log_cleanup_op = PythonOperator(
        task_id="delete_old_logs",
        python_callable=find_old_logs,
    )
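
To enforce the ordering mentioned earlier (log cleanup before table cleanup), one option is to have the log cleanup DAG trigger the table cleanup once the logs are gone, rather than scheduling the two independently. Below is a sketch that assumes the table cleanup runs as a separate DAG with the dag_id airflow_db_cleanup (an assumed name); the lines would sit alongside the PythonOperator above.

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Assumes the table cleanup runs as its own DAG; "airflow_db_cleanup" is a
# placeholder dag_id, so adjust it to match your setup.
trigger_db_cleanup_op = TriggerDagRunOperator(
    task_id="trigger_db_cleanup",
    trigger_dag_id="airflow_db_cleanup",
    dag=dag,
)

# Only purge the metadata tables after the old log directories are deleted,
# so the paths can still be reconstructed from the database.
log_cleanup_op >> trigger_db_cleanup_op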

Summary

With the new log cleanup DAG, we were able to reduce the run time from over 24 hours to only ~40 minutes. We expect to expand our usage of Airflow even more this year, and we will likely encounter new challenges requiring clever solutions along the way. Thankfully, the significant improvements in Airflow 2.0 have brought a new level of reliability for us. If you have tips and tricks that have helped keep your Airflow operations in tip-top shape, feel free to leave a comment.
