
最近在折腾数据管道的自动化,Airflow 这个名字出现频率极高,干脆上手试试。这篇文章算是学习笔记,原文是篇很标准的入门教程,结合实操体验重新整理了一遍,加了点踩坑心得。
如果你也写了 ETL 脚本但还在手动跑,或者想了解怎么把这套东西整到生产级别,这篇应该有用。
老实说,刚听到 DAG(Directed Acyclic Graph,有向无环图)这个概念的时候我是有点懵的——不就是跑个脚本吗,搞这么复杂干嘛?
但当你真正需要:每天凌晨自动跑数据清洗、某个任务失败了要自动重试、多个任务之间有依赖关系(A 跑完才能跑 B)、同时管理好几个数据管道——的时候,你就会发现光靠 cron + Python 脚本根本撑不住。Airflow 就是来解决这些问题的——它本质上是一个工作流编排工具,把你的 ETL 脚本包装成可调度、可监控、可重试的流水线。
pip install apache-airflow
airflow db init
初始化数据库别漏了,不然 webserver 跑不起来。
假设你有个这样的脚本 etl_pipeline.py:
import pandas as pd
def extract():
return pd.read_csv("sales.csv")
def transform(df):
df["total"] = df["quantity"] * df["price"]
return df
def load(df):
df.to_csv("processed_sales.csv", index=False)
def run_etl():
data = extract()
transformed = transform(data)
load(transformed)
if __name__ == "__main__":
run_etl()
在 dags/ 目录下新建文件,比如 sales_etl_dag.py:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from etl_pipeline import run_etl
default_args = {
"owner": "airflow",
"start_date": datetime(2026, 1, 1),
"retries": 2
}
with DAG(
dag_id="sales_etl_pipeline",
default_args=default_args,
schedule="@daily",
catchup=False
) as dag:
etl_task = PythonOperator(
task_id="run_sales_etl",
python_callable=run_etl
)
几个关键参数:schedule 是调度频率,@daily 是每天,也可以用 cron 表达式;retries 是失败自动重试次数;catchup 设为 False 避免从 start_date 补跑历史任务。
开两个终端窗口,分别跑:
airflow scheduler # 调度器
airflow webserver --port 8080 # Web UI
浏览器访问 http://localhost:8080,能看到 DAG 管理界面:所有 DAG 列表、手动触发任务、查看执行历史、定位失败任务、看日志。
把 DAG 文件扔进 dags/ 目录,Airflow 会自动发现。然后在 Web UI 里找到 DAG,把开关 toggle 打开,调度就激活了。
实际业务里通常会把 ETL 拆成多个步骤:
extract_task >> transform_task >> load_task
这样 Airflow 会严格按顺序执行,load 任务只会在 transform 成功后才会启动。
Airflow 自带任务级日志,失败的时候能直接看到是哪一行代码报错。另外几个值得开启的功能:自动重试、任务级日志、SLA 监控、邮件通知、失败告警。
随着数据平台增长,Airflow 可以编排:多个数据源、复杂依赖链、机器学习工作流、数据仓库加载、实时集成。
数据管道的逻辑写好了,Airflow 就是让它们在生产环境里可靠跑起来的那个引擎。
原文:https://dev.to/petermuriya/automating-etl-workflows-with-apache-airflow-from-python-script-to-scheduled-pipeline-1hee