site logo

Marico's space

写了 ETL 脚本,然后呢?用 Apache Airflow 把它变成靠谱的生产流水线

服务器技术 2026-04-27 15:07:57 2

最近在折腾数据管道的自动化,Airflow 这个名字出现频率极高,干脆上手试试。这篇文章算是学习笔记,原文是篇很标准的入门教程,结合实操体验重新整理了一遍,加了点踩坑心得。

如果你也写了 ETL 脚本但还在手动跑,或者想了解怎么把这套东西整到生产级别,这篇应该有用。


先搞清楚:为什么是 Airflow?

老实说,刚听到 DAG(Directed Acyclic Graph,有向无环图)这个概念的时候我是有点懵的——不就是跑个脚本吗,搞这么复杂干嘛?

但当你真正需要:每天凌晨自动跑数据清洗、某个任务失败了要自动重试、多个任务之间有依赖关系(A 跑完才能跑 B)、同时管理好几个数据管道——的时候,你就会发现光靠 cron + Python 脚本根本撑不住。Airflow 就是来解决这些问题的——它本质上是一个工作流编排工具,把你的 ETL 脚本包装成可调度、可监控、可重试的流水线。

准备工作

  • 一个能跑的 Python ETL 脚本
  • Python 3.9 或更高版本
  • Airflow 已安装
  • 数据库(PostgreSQL、MySQL 或 SQLite 都行,生产环境推荐 PostgreSQL)
  • 对 DAG 的基本了解——简单说就是"任务及其依赖关系的有向无环图"

Step 1:安装 Airflow

pip install apache-airflow
airflow db init

初始化数据库别漏了,不然 webserver 跑不起来。

Step 2:确认 ETL 脚本能跑

假设你有个这样的脚本 etl_pipeline.py

import pandas as pd

def extract():
    return pd.read_csv("sales.csv")

def transform(df):
    df["total"] = df["quantity"] * df["price"]
    return df

def load(df):
    df.to_csv("processed_sales.csv", index=False)

def run_etl():
    data = extract()
    transformed = transform(data)
    load(transformed)

if __name__ == "__main__":
    run_etl()

Step 3:创建一个 DAG

dags/ 目录下新建文件,比如 sales_etl_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from etl_pipeline import run_etl

default_args = {
    "owner": "airflow",
    "start_date": datetime(2026, 1, 1),
    "retries": 2
}

with DAG(
    dag_id="sales_etl_pipeline",
    default_args=default_args,
    schedule="@daily",
    catchup=False
) as dag:
    etl_task = PythonOperator(
        task_id="run_sales_etl",
        python_callable=run_etl
    )

几个关键参数:schedule 是调度频率,@daily 是每天,也可以用 cron 表达式;retries 是失败自动重试次数;catchup 设为 False 避免从 start_date 补跑历史任务。

Step 4:启动 Airflow 服务

开两个终端窗口,分别跑:

airflow scheduler        # 调度器
airflow webserver --port 8080  # Web UI

Step 5:打开 Web UI

浏览器访问 http://localhost:8080,能看到 DAG 管理界面:所有 DAG 列表、手动触发任务、查看执行历史、定位失败任务、看日志。

Step 6:启用 DAG

把 DAG 文件扔进 dags/ 目录,Airflow 会自动发现。然后在 Web UI 里找到 DAG,把开关 toggle 打开,调度就激活了。

Step 7:加点依赖关系

实际业务里通常会把 ETL 拆成多个步骤:

extract_task >> transform_task >> load_task

这样 Airflow 会严格按顺序执行,load 任务只会在 transform 成功后才会启动。

Step 8:监控和调试

Airflow 自带任务级日志,失败的时候能直接看到是哪一行代码报错。另外几个值得开启的功能:自动重试、任务级日志、SLA 监控、邮件通知、失败告警。

Step 9:生产级最佳实践

  • 用 Airflow Connections 安全存储凭证
  • 用环境变量
  • 开启日志
  • ETL 逻辑要做到幂等
  • 加数据质量检查
  • 用生产级元数据库

Step 10:扩展你的流水线

随着数据平台增长,Airflow 可以编排:多个数据源、复杂依赖链、机器学习工作流、数据仓库加载、实时集成。

数据管道的逻辑写好了,Airflow 就是让它们在生产环境里可靠跑起来的那个引擎。

原文:https://dev.to/petermuriya/automating-etl-workflows-with-apache-airflow-from-python-script-to-scheduled-pipeline-1hee