Apache Airflow 完整知识体系 Roadmap
定位说明:本文档覆盖 Airflow 从入门到生产级别的全部知识点,按初级 → 中级 → 高级三层递进,每层配合"你应该能做什么"的能力锚点,帮助你在求职和项目中精准定位自己的水平。
第零层:思维框架 — Airflow 是什么,不是什么
在学任何工具之前,先建立正确的心智模型。
Airflow 的本质是一个工作流编排器(Workflow Orchestrator),而不是一个数据处理引擎。它的工作是告诉其他系统"什么时候做什么",而不是自己去做数据计算。这个区别至关重要:Airflow 不移动数据,它调度任务;Spark 移动和处理数据,Airflow 告诉 Spark 什么时候启动。
理解这个边界,你才能回答面试中最常见的陷阱问题:"Airflow 和 Spark 的区别是什么?"
Airflow 的三个核心哲学:
第一,一切皆代码(Everything as Code)。你的工作流定义是 Python 代码,可以版本控制、代码审查、单元测试,这是 Airflow 相比 GUI 拖拽工具(如 SSIS)的本质优势。
第二,DAG 是声明式的,不是命令式的。DAG 文件描述的是任务之间的依赖关系图,而不是执行流程。Airflow 的 Scheduler 读取这个图,然后决定什么时候触发哪个任务。
第三,幂等性(Idempotency)是设计原则。每个任务应该能安全地重跑,多次执行的结果应该和执行一次一样。这决定了你设计 Task 的方式。
第一层:初级知识体系(Junior / Getting Started)
能力锚点:能独立安装 Airflow,理解所有核心概念,能写简单的 ETL DAG,能用 UI 监控和调试任务。
1.1 核心概念体系
DAG(Directed Acyclic Graph,有向无环图)
DAG 是 Airflow 的基本工作单元,代表一个完整的工作流。"有向"意味着任务之间有方向性的依赖关系,"无环"意味着不能有循环依赖(A 依赖 B,B 又依赖 A)。
DAG 有几个关键属性需要深刻理解。dag_id 是全局唯一标识符,一旦投入生产不要随意修改。schedule_interval 决定 DAG 的触发频率,可以是 Cron 表达式("0 6 * * *" 代表每天早 6 点)或预设值(@daily, @hourly)。start_date 是 DAG 开始调度的时间,这里有一个极其重要的概念需要单独讲解。
catchup 参数控制是否补跑历史任务。如果你今天设置了一个 start_date 是两周前的 DAG,且 catchup=True,Airflow 会尝试把过去两周所有错过的执行都跑一遍。大多数生产环境应该设置 catchup=False。
Execution Date 与 Data Interval 的深层理解
这是 Airflow 最容易让初学者困惑的地方,必须彻底理解。
在 Airflow 中,execution_date 代表的是数据区间的开始时间,不是任务实际运行的时间。比如一个 @daily 的 DAG,execution_date 为 2024-01-01 的那次运行,实际上是在 2024-01-02 的 00:00 才被触发,处理的是 2024-01-01 这一天的数据。
Airflow 2.x 引入了 data_interval_start 和 data_interval_end 来更清晰地表达这个概念,但 execution_date 仍然被保留以维持向后兼容。
Task 和 Operator
Task 是 DAG 中的一个执行单元,而 Operator 是定义 Task 做什么事的模板类。常见 Operator 体系如下:
BashOperator 执行 Shell 命令,是最基础的 Operator,适合调用脚本、命令行工具。PythonOperator 执行 Python 函数,是最灵活的,几乎可以做任何事。EmptyOperator(旧版叫 DummyOperator)不做任何操作,仅作为流程控制的节点,用于标记 DAG 的开始/结束或创建条件分支汇合点。
Provider Operators 是 Airflow 生态中对第三方服务的封装:SparkSubmitOperator 提交 Spark 作业,S3ToRedshiftOperator 从 S3 加载数据到 Redshift,BigQueryInsertJobOperator 在 BigQuery 执行 SQL,以此类推。
Task Dependencies(任务依赖关系)
在 Airflow 中,你用 >> 和 << 运算符或 set_upstream / set_downstream 方法定义依赖。task_a >> task_b 意味着 task_b 在 task_a 成功后才执行。可以用列表表达多对多依赖:[task_a, task_b] >> task_c 意味着 task_c 要等 a 和 b 都成功才开始。
Task Instance States(任务状态)
理解状态机是调试的基础。queued 表示等待 Executor 拾取,running 表示正在执行,success 表示成功,failed 表示失败,up_for_retry 表示等待重试,skipped 表示被条件跳过,upstream_failed 表示上游失败导致未能执行。
1.2 安装与部署模式(初级)
本地开发:使用 Docker Compose
初学者最推荐的方式是使用 Airflow 官方提供的 docker-compose.yaml,它会同时启动所有必要组件:Webserver、Scheduler、Worker、Postgres(元数据库)、Redis(消息队列)。
# 下载官方 docker-compose
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# 初始化数据库和用户
docker compose up airflow-init
# 启动所有服务
docker compose up -d
pip 安装(理解依赖结构)
Airflow 采用 extras 机制管理可选依赖,你不需要安装所有东西。pip install apache-airflow[postgres,redis,amazon] 表示安装 Airflow 核心加上 PostgreSQL 支持、Redis 支持和 AWS 相关 Operator。
Airflow 组件架构
理解每个组件的职责:Webserver 是 UI 服务,只提供界面和 API,不做调度逻辑。Scheduler 是核心大脑,它扫描 DAG 文件,计算哪些 Task Instance 应该被触发,然后将它们提交给 Executor。Executor 是执行策略的抽象层,决定任务如何被分发(本地执行、进程池、Celery 分布式)。Worker 是实际执行 Task 的进程,在 Celery 模式下是独立的机器或容器。Metadata Database 存储所有状态信息,是整个系统的"事实来源"。DAG Folder 是你的代码存放的地方,Scheduler 会周期性地扫描它。
1.3 初级 DAG 写法模式
理解 DAG 文件的基本结构是一个固定的模式:先定义 DAG 配置(default_args),然后实例化 DAG,最后在 DAG 上下文中定义 Task 并建立依赖关系。
default_args 是一个字典,为所有 Task 提供默认参数,例如 owner(负责人)、retries(重试次数)、retry_delay(重试间隔)、email_on_failure(失败时发邮件)。
TaskFlow API(Airflow 2.0+ 推荐写法)
TaskFlow API 用装饰器语法大幅简化了 PythonOperator 的写法,同时自动处理任务间的数据传递(XCom)。
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def my_etl_pipeline():
@task()
def extract():
# 返回值会自动成为 XCom
return {"data": [1, 2, 3]}
@task()
def transform(raw_data: dict):
return [x * 2 for x in raw_data["data"]]
@task()
def load(processed_data: list):
print(f"Loading {len(processed_data)} records")
# 函数调用即建立依赖,返回值即传递数据
raw = extract()
processed = transform(raw)
load(processed)
my_etl_pipeline()
这种写法比老式的 PythonOperator + provide_context + 手动 XCom Pull 清晰得多,是现代 Airflow 的标准写法。
1.4 XCom:Task 间的数据传递
XCom(Cross-Communication)是 Airflow 中 Task 之间传递少量数据的机制。数据被存在 Metadata Database 中。
关键限制:XCom 不适合传递大量数据(如 DataFrame、大文件),因为数据存在数据库里。正确的做法是把大数据写入外部存储(S3、数据库),然后只通过 XCom 传递路径或标识符。这个原则在面试中经常被考察。
1.5 Airflow UI 操作
你需要熟悉 UI 的以下功能:DAG 视图(Graph View)展示任务依赖关系图,Grid View 展示历史执行矩阵,Calendar View 展示执行频率,Gantt View 展示每个任务的执行时长(用于性能分析)。
手动触发 DAG、暂停/恢复 DAG、清除(Clear)某个 Task Instance 以触发重跑、查看 Task 的日志——这些是日常运维的基本操作。
第二层:中级知识体系(Intermediate / Production-Ready)
能力锚点:能设计可维护的 DAG 结构,掌握连接管理、变量管理、条件控制、动态 DAG 生成,理解 Executor 差异,能在生产环境排查问题。
2.1 Connections 与 Variables:配置管理
Connections(连接)
Airflow 提供了一个中心化的连接管理机制,将数据库密码、API Key、服务器地址等敏感信息存储在 Metadata Database 中(也可以对接 Vault 等 Secret Backend)。每个 Connection 有一个唯一的 conn_id,Operator 通过 conn_id 引用。
你可以通过 UI、CLI 或环境变量三种方式定义 Connection。在生产环境中,推荐用环境变量方式(AIRFLOW_CONN_{CONN_ID})或 Secret Backend,避免在 UI 中手动管理,实现 Infrastructure as Code。
Variables(变量)
Variables 是全局键值对,适合存储配置参数(如 S3 Bucket 名称、目标数据库名)。注意:每次访问 Variable 都会触发数据库查询,在 DAG 文件顶层(DAG 解析时)大量调用 Variable 会导致 Scheduler 负担过重。最佳实践是在 Task 内部读取 Variable,或使用 Jinja 模板引用。
Secret Backend
生产级配置管理应该对接 Secret Backend,如 HashiCorp Vault、AWS Secrets Manager、GCP Secret Manager。这样 Connection 和 Variable 的实际值不存在 Airflow 的数据库里,而是在运行时从 Secret Backend 动态读取,大幅提升安全性。
2.2 Templating(Jinja 模板)
Airflow 支持在 Operator 参数中使用 Jinja 模板,动态引用运行时上下文变量。
最常用的模板变量是 {{ ds }}(execution date,格式 YYYY-MM-DD)、{{ ts }}(时间戳)、{{ data_interval_start }}、{{ data_interval_end }}、{{ prev_ds }}(上一个 execution date)。
# 在 BashOperator 中使用模板引用执行日期
extract_task = BashOperator(
task_id="extract_data",
bash_command="python extract.py --date={{ ds }}",
)
这是实现"分区处理"的基础:每次 DAG 运行,你的脚本知道自己处理的是哪一天的数据。
2.3 流程控制:Sensors、Branching、Trigger Rules
Sensors(传感器)
Sensor 是一种特殊的 Operator,它会持续等待某个外部条件满足。FileSensor 等待文件出现,S3KeySensor 等待 S3 对象出现,HttpSensor 轮询 HTTP 端点,ExternalTaskSensor 等待另一个 DAG 的某个 Task 完成。
理解 Sensor 的两种模式很重要:mode="poke" 会持续占用一个 Worker slot,mode="reschedule" 会在每次检查失败后释放 Worker slot,等待下次检查再重新占用。生产环境中长时间等待的 Sensor 应该用 reschedule 模式,避免浪费资源。
BranchPythonOperator(条件分支)
分支 Operator 允许根据运行时逻辑选择走哪条任务路径。它执行一个 Python 函数,函数返回下一个要执行的 task_id(或 task_id 列表),不在返回列表中的分支 Task 会被标记为 skipped。
def choose_branch(**context):
if context["ds"] >= "2024-06-01":
return "new_pipeline_task"
else:
return "legacy_pipeline_task"
branch = BranchPythonOperator(
task_id="choose_pipeline_version",
python_callable=choose_branch,
)
Trigger Rules(触发规则)
默认情况下,一个 Task 只有在所有上游 Task 都成功后才会执行(all_success)。但你可以修改这个规则:all_failed(所有上游失败时触发),one_success(任一上游成功就触发),all_done(所有上游完成,无论成功失败),none_failed(没有上游失败,包括跳过的)。
在分支场景中,汇合节点(merge node)应该使用 trigger_rule="none_failed_min_one_success",否则它会因为另一个分支被跳过而无法执行。
2.4 Executor 深度对比
Executor 的选择直接决定了你的 Airflow 集群的扩展性,这是中高级面试必考点。
SequentialExecutor:同一时间只运行一个 Task,使用 SQLite 数据库,仅用于开发测试。永远不要在生产环境使用。
LocalExecutor:在同一台机器上用多进程并行执行 Task。简单,适合小规模场景,但无法水平扩展,所有任务受限于单机资源。
CeleryExecutor:分布式执行架构,使用 Redis 或 RabbitMQ 作为消息队列,Worker 可以部署在多台机器上,支持水平扩展。这是传统的生产级选择,配置较复杂(需要维护消息队列和 Worker 节点)。
KubernetesExecutor:每个 Task 启动一个独立的 Kubernetes Pod,Task 完成后 Pod 销毁。完美的资源隔离,自动扩缩容,是云原生场景的主流选择。代价是每个 Task 有 Pod 启动延迟(通常 10-30 秒)。
CeleryKubernetesExecutor:混合模式,可以为不同 Task 指定使用 Celery Worker 或 K8s Pod,灵活性最高。
LocalKubernetesExecutor(2.3+):同上,混合本地和 K8s。
选择 Executor 的实践原则:开发用 Local,中小型生产用 Celery(如果已有 Redis 基础设施),云原生生产用 Kubernetes。
2.5 动态 DAG 生成
动态 DAG 是中级开发者必须掌握的模式,用于批量生成结构相似但参数不同的 DAG 或 Task。
方案一:循环生成 Task
在一个 DAG 内,用循环为每个数据源、每个国家/地区等生成一组平行任务。
# 为每个数据源生成一个处理任务
data_sources = ["source_a", "source_b", "source_c"]
with DAG("multi_source_pipeline", ...):
for source in data_sources:
process = PythonOperator(
task_id=f"process_{source}",
python_callable=process_source,
op_kwargs={"source": source},
)
方案二:循环生成 DAG(工厂模式)
用一个循环在 DAG 文件中生成多个独立的 DAG 对象,每个客户或每个业务单元有一个独立的 DAG。注意:这种模式会增加 Scheduler 扫描负担,DAG 数量不宜过多(通常不超过几百个)。
方案三:Dynamic Task Mapping(Airflow 2.3+)
这是现代 Airflow 的官方动态任务方案,允许在运行时根据上游 Task 的输出动态创建任务实例,任务数量在执行前不需要确定。
@task
def get_files():
return ["file_a.csv", "file_b.csv", "file_c.csv"]
@task
def process_file(filename: str):
print(f"Processing {filename}")
# expand() 会为列表中的每个元素创建一个独立的 Task Instance
files = get_files()
process_file.expand(filename=files) # 动态生成 3 个并行 Task
2.6 Datasets 与数据驱动调度(Airflow 2.4+)
传统的 Airflow 调度是时间驱动的(schedule_interval)。Dataset 调度是一种数据驱动的模式:DAG B 不是按时间触发,而是在 DAG A 产出某个 Dataset(数据集)之后自动触发。
# DAG A 产出数据
my_dataset = Dataset("s3://my-bucket/processed/orders/")
with DAG("producer_dag", ...):
@task(outlets=[my_dataset])
def write_orders():
# 写入数据到 S3
pass
# DAG B 在 my_dataset 更新后自动运行
with DAG("consumer_dag", schedule=[my_dataset], ...):
@task()
def read_orders():
# 读取数据
pass
这个特性让 Airflow 的 DAG 编排更接近事件驱动架构,减少了对轮询和时间耦合的依赖。
2.7 测试 DAG
DAG 测试是生产工程师必须掌握的技能,也是 Airflow 常被诟病"难以测试"的领域。
DAG 结构验证:用 python dag_file.py 或 airflow dags test 快速验证 DAG 能否被正常加载,无语法错误,无循环依赖。
单元测试:用 pytest 测试 DAG 的结构属性(任务数量、依赖关系是否正确),以及 Task 内部 Python 函数的逻辑(把业务逻辑从 Operator 中提取出来,单独测试)。
# test_my_dag.py
from airflow.models import DagBag
def test_dag_loaded():
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag(dag_id="my_etl_pipeline")
assert dag is not None
assert len(dag.tasks) == 3
def test_task_dependencies():
dagbag = DagBag(dag_folder="dags/")
dag = dagbag.get_dag("my_etl_pipeline")
extract = dag.get_task("extract")
transform = dag.get_task("transform")
assert transform.task_id in [t.task_id for t in extract.downstream_list]
集成测试:使用 pytest-docker 或 Testcontainers 启动真实的 Airflow 和数据库环境进行端到端测试。
第三层:高级知识体系(Advanced / Platform Engineering)
能力锚点:能设计和维护生产级 Airflow 集群,掌握性能调优、安全、可观测性、自定义扩展,能评估 Airflow 与其他编排工具的适用场景,能做架构决策。
3.1 生产部署架构
Helm Chart 部署(Kubernetes)
在生产环境,Airflow 几乎都部署在 Kubernetes 上,使用官方 Helm Chart。核心配置点:
values.yaml 中需要配置:Executor 类型(KubernetesExecutor 或 CeleryExecutor),GitSync(自动从 Git 仓库同步 DAG 文件,无需重启 Pod),Persistent Volume(用于日志存储),Secret 管理(对接 External Secrets Operator 或直接用 K8s Secrets)。
高可用配置
生产 Airflow 需要多个 Scheduler 副本(Airflow 2.0+ 支持多 Scheduler,使用数据库行锁防止重复调度),Webserver 多副本,数据库使用 PostgreSQL 的高可用集群(RDS, CloudSQL 等 Managed Service)。
Remote Logging
Task 日志必须写入远程存储(S3, GCS, Azure Blob)而不是本地磁盘,因为 Pod 是临时的,本地日志会随 Pod 销毁而消失。通过配置 [logging] 部分的 remote_logging = True 和对应的 Bucket 地址实现。
3.2 性能调优
Scheduler 性能
scheduler_heartbeat_sec:Scheduler 心跳间隔,默认 5 秒,决定调度延迟的下界。min_file_process_interval:扫描单个 DAG 文件的最小间隔,DAG 文件多时适当增大。dag_file_processor_timeout:DAG 文件解析超时,如果 DAG 文件中有慢速操作(如网络请求)会触发。parallelism:全局最大并发 Task 数。max_active_runs_per_dag:每个 DAG 最多同时有多少个 Run 在执行。max_active_tasks_per_dag:每个 DAG 最多同时执行多少个 Task。
避免常见性能陷阱
DAG 文件顶层不能有重型操作:不要在 DAG 文件的模块级别调用数据库、发 HTTP 请求、读大文件,因为 Scheduler 每隔几秒就会 import 一次你的 DAG 文件。把所有耗时操作放到 Task 函数内部。
连接池管理
如果大量 Task 同时连接同一个数据库,会耗尽数据库连接池。使用 Pool(Airflow 的槽位池机制)限制对同一资源的并发访问。在 UI 的 Admin → Pools 中定义池,然后在 Task 中通过 pool="my_db_pool" 引用,限制同时运行的相关 Task 数量。
3.3 自定义扩展
自定义 Operator
当内置 Operator 和 Provider 无法满足需求时,继承 BaseOperator 创建自定义 Operator。关键要实现 execute(self, context) 方法。
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
# template_fields 声明哪些参数支持 Jinja 模板
template_fields = ("sql",)
def __init__(self, sql: str, conn_id: str, **kwargs):
super().__init__(**kwargs)
self.sql = sql
self.conn_id = conn_id
def execute(self, context):
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection(self.conn_id)
# 执行自定义逻辑
self.log.info(f"Executing: {self.sql}")
return result
自定义 Hook
Hook 是 Airflow 中连接外部系统的抽象层,每个 Provider 的 Operator 底层都依赖 Hook。自定义 Hook 继承 BaseHook,封装连接逻辑和低级 API 调用,供 Operator 使用。这实现了连接管理和业务逻辑的分离。
自定义 Sensor
继承 BaseSensorOperator,实现 poke(self, context) 方法,返回 True 表示条件满足,返回 False 表示继续等待。
Plugin 系统
Airflow 的 Plugin 系统允许你向 Webserver 注入自定义 UI 页面、自定义 Blueprint(Flask 路由)、自定义菜单项,以及注册自定义 Operator、Hook、Macro。将 plugin.py 放入 plugins/ 目录即可自动加载。
3.4 安全与权限管理
RBAC(基于角色的访问控制)
Airflow 内置 FAB(Flask-AppBuilder)的 RBAC 系统,预定义了 Admin、Op、User、Viewer、Public 五种角色,也支持自定义角色。可以配置 DAG 级别的访问控制,让不同团队只能看到和操作自己的 DAG。
SSO 集成
生产环境应集成企业 SSO,支持 OAuth 2.0(Google, GitHub, Azure AD),LDAP / Active Directory,SAML。配置在 webserver_config.py 中,通过 FAB 的 AUTH_TYPE 参数指定。
Secret Backend 集成
对接 HashiCorp Vault:在 airflow.cfg 中配置 secrets_backend = airflow.providers.hashicorp.secrets.vault.VaultBackend,并提供 Vault 地址和认证方式。Airflow 在需要 Connection 或 Variable 时会先查询 Vault,找不到再回退到数据库。
Fernet Key 加密
Airflow 用 Fernet 对称加密算法加密存储在数据库中的 Connection 密码和 Variable 值。fernet_key 是加密密钥,必须在生产环境中设置为强随机值并妥善保管,轮换时需要用 airflow rotate-fernet-key 命令迁移历史数据。
3.5 可观测性与监控
内置 Metrics(StatsD / OpenTelemetry)
Airflow 可以向 StatsD 服务发送运行指标,包括:DAG 处理时长、Task 成功/失败计数、Scheduler 心跳延迟、Executor 队列长度。这些指标可以被 Prometheus(通过 statsd_exporter)采集,然后在 Grafana 中可视化。
Airflow 2.7+ 开始原生支持 OpenTelemetry,可以直接发送 Traces 到 Jaeger、Tempo 等 Tracing 后端,实现分布式追踪。
关键告警指标
你应该监控并设置告警的指标:DAG Run 失败率,Scheduler 延迟(scheduler_heartbeat 时间),Task 积压量(排队但未执行的 Task 数),Zombie Task 数量(进程意外死亡后的孤儿任务)。
日志聚合
Task 日志应该被聚合到集中式日志系统(ELK Stack, Loki + Grafana)。结合结构化日志(JSON 格式)和统一的关联 ID(dag_id, run_id, task_id),可以快速跨 Task 追踪问题。
3.6 CI/CD for DAGs
将 DAG 作为代码,应该有完整的 CI/CD 流程。
标准 Pipeline 设计:PR → 自动化检查(pylint/flake8 语法检查,pytest 单元测试,airflow dags list-import-errors 确保无导入错误)→ Code Review → Merge → 自动部署到测试环境(GitSync 自动同步)→ 烟雾测试(用 airflow dags test 测试关键 DAG)→ 部署到生产。
GitSync 模式
KubernetesExecutor 的 Airflow 集群使用 GitSync sidecar 容器,持续从 Git 仓库拉取最新的 DAG 文件(默认每 30 秒一次)。这实现了 DAG 的零停机更新:你提交代码到 Git,几十秒后 Airflow 就会开始使用新的 DAG 版本。
蓝绿部署 DAG
生产环境中修改正在运行的 DAG 需要谨慎。安全的做法是:为新版本创建新的 dag_id,等所有历史 Run 在旧 DAG 上完成后,再将旧 DAG 暂停。避免直接修改正在生产运行的 DAG 的关键参数(如 schedule_interval)。
3.7 Airflow 2.x 重要新特性时间线
2.0(2020):重大版本,TaskFlow API,多 Scheduler HA,新 DAG 序列化,Stable REST API。
2.2(2021):Custom Timetables(自定义调度策略,如"每个工作日 9 点"),Task Group(UI 中分组折叠任务)。
2.3(2022):Dynamic Task Mapping(运行时动态生成任务),GridView 全面替代 TreeView。
2.4(2022):Dataset-driven Scheduling(数据集触发调度),这是方向性的改变。
2.6(2023):Task Execution Timeout 改进,DAG Versioning 早期支持,更好的 OpenTelemetry 集成。
2.7(2023):Auth Manager 解耦,为插件化权限系统铺路,更成熟的 OpenTelemetry Tracing。
2.10 / 3.0(2024+):Airflow 3.0 是架构性重构,Scheduler 和 DAG Processor 进一步解耦,边缘执行器(Edge Executor)支持,Task SDK 独立发布。
3.8 与其他编排工具的对比(架构决策视角)
作为平台工程师,你必须能够做工具选型决策,而不只是会用工具。
Airflow vs Prefect:Prefect 2.x 采用 Python-native 设计,DAG 定义更自然,无需 DAG 上下文管理器,原生支持动态工作流,Orion 引擎性能更好。Prefect 更适合团队小、工作流变化快的场景。Airflow 生态更成熟,Provider 更丰富,更适合大型企业和已有 Airflow 投资的组织。
Airflow vs Dagster:Dagster 以"软件定义资产(Software-Defined Assets)"为核心抽象,更注重数据资产的血缘关系(Lineage)和可观测性,内置类型系统和 IO 管理。如果你的重点是数据资产治理而非任务编排,Dagster 是更好的选择。这也正是你的 SoloLakehouse 选择 Dagster 的原因。
Airflow vs dbt(误解澄清):dbt 不是工作流编排器,它是数据转换工具,只管 SQL 模型的编译和执行。正确的组合是用 Airflow(或 Dagster)编排 dbt,即 Airflow 负责"什么时候跑 dbt",dbt 负责"怎么转换数据"。
Airflow vs Argo Workflows:Argo 是 Kubernetes-native 的工作流引擎,使用 YAML 定义工作流,更适合纯 K8s 环境下的通用任务编排(不只是数据工程),在 ML Pipeline(KubeFlow Pipelines)场景有广泛应用。如果工作流以容器任务为主,Argo 是有力竞争者。
3.9 Airflow 在现代数据栈中的定位
现代数据平台的编排层通常是这样一个图景:Airflow(或 Dagster/Prefect)负责外层编排,定义数据流从哪里来、经过什么处理、到哪里去;dbt 负责 Warehouse 内部的 SQL 转换;Spark/Flink 负责大规模数据处理;Great Expectations / Soda 负责数据质量检查;dlt / Fivetran / Airbyte 负责 EL(数据提取加载)。
Airflow 是把这些组件粘合在一起的"胶水层",它的价值在于提供统一的调度、监控、告警和可观测性视图。
附录:学习路径建议
第一个月(初级):在 Docker Compose 上安装 Airflow,写 5 个以上的真实 DAG(涉及文件处理、数据库 ETL、API 调用),彻底理解 DAG 生命周期和所有任务状态,掌握 TaskFlow API,能熟练使用 UI 调试问题。
第二至三个月(中级):在 Kubernetes 上用 Helm Chart 部署 Airflow,配置 GitSync 和远程日志,掌握 Dynamic Task Mapping,实现 CI/CD Pipeline,写一个涉及条件分支和 Sensor 的复杂 DAG,学习 Connection/Variable 的最佳实践。
第四至六个月(高级):深入 Executor 性能调优,实现 RBAC 和 SSO,对接 Secret Backend,配置 Prometheus + Grafana 监控,写自定义 Operator 和 Hook,研究 Airflow 源码的 Scheduler 实现,参与社区讨论或阅读 AIP(Airflow Improvement Proposal)。
持续精进(平台工程师视角):把 Airflow 视为一个需要 SLA 的平台产品,而不只是一个工具。思考如何为团队提供标准化的 DAG 模板,如何设计 DAG 的测试框架,如何建立 DAG 变更的 Review 流程,如何量化和优化调度效率。
文档版本:2024 年 Q4 | 覆盖 Airflow 2.x(重点 2.4-2.9)及 3.0 趋势