彻底搞懂 Apache Airflow、Dagster 和 Prefect

——从任务调度到现代数据编排的完整演化路线图

学习目标:读完本文后,你能够(1)从第一性原理解释为什么这三个工具存在;(2)在白板前画出每个框架的核心架构图;(3)写出生产级别的真实代码;(4)在面试中从 Platform Owner 视角进行深度技术选型对比;(5)清晰解释你在 SoloLakehouse 中选择 Dagster 的战略理由。

目录

  1. 为什么需要工作流编排?—— 从第一性原理出发
  2. Apache Airflow —— 事实标准与它的重量
  3. Dagster —— 以资产为中心的现代编排
  4. Prefect —— 以流程为中心的轻量编排
  5. 三者核心架构横向对比
  6. 选型指南:什么场景用什么工具
  7. 真实代码示例:同一个 Pipeline 的三种写法
  8. 生产环境最佳实践
  9. 与 Lakehouse 架构的集成模式
  10. 面试高频问题与标准答案
  11. Platform Owner 思维:战略总结

1. 为什么需要工作流编排?

在深入三个工具之前,我们必须先理解它们共同解决的根本问题。这是工程师思维和架构师思维的分野:工程师关心"这个工具怎么用",架构师关心"没有这个工具,世界会变成什么样"

1.1 一个没有编排系统的真实噩梦

想象你负责一个数据平台,每天需要执行以下任务:

首先,在凌晨 1:00,从三个数据源(Kafka、PostgreSQL、第三方 API)抽取数据,写入数据湖的 Bronze 层。其次,等 Bronze 层写入完成后,运行 Spark 清洗作业,生成 Silver 层数据。第三,等 Silver 层就绪后,并行运行三个聚合作业(财务汇总、用户行为分析、风险指标计算),分别写入 Gold 层。第四,等所有 Gold 层数据就绪后,触发报表刷新,并发邮件通知业务团队。最后,每个任务失败后需要自动重试三次,第三次依然失败则发 Slack 告警,同时跳过该任务继续执行下游不依赖它的其他任务。

如果没有编排系统,你面对的是:写若干个 Cron Job,用文件或数据库标志来协调任务依赖,手工管理重试逻辑,手工监控每个任务的状态,手工处理部分失败的场景……这很快就会变成一个无法维护的"定时任务地狱"。

数据编排系统(Workflow Orchestrator)就是为了系统性地解决这个问题。它的核心价值在于提供一个统一的框架,让你可以声明性地定义任务之间的依赖关系(DAG,有向无环图),集中管理任务的调度、执行、监控、重试和告警,并提供一个可视化界面,让整个团队都能看清楚数据管道的运行状态。

1.2 编排系统需要解决的七个核心问题

理解了为什么需要编排,我们来看这类系统普遍需要解决的七个问题,这七个问题也是我们后续对比三个工具的评估维度:

依赖管理(Dependency Management):任务 B 必须等任务 A 成功后才能运行,这个关系如何定义?当有几十个任务时,依赖关系如何可视化?

调度(Scheduling):支持 Cron 表达式(每天凌晨 1 点)、事件触发(新文件到达时)、手动触发三种模式。

重试与失败处理(Retry & Failure Handling):失败后自动重试几次?重试间隔多长?部分失败后,下游任务如何处理?如何从失败点重新运行而不是从头开始?

参数化与动态化(Parameterization & Dynamic Pipelines):能否根据运行时参数动态生成任务图?比如"为每个国家生成一个处理任务",而国家列表在运行时才确定。

可观测性(Observability):如何知道现在哪个任务在运行、哪个失败了、每个任务跑了多久?历史运行记录如何查询?

数据血缘与版本控制(Data Lineage & Versioning):如何追踪"某张表的数据来自哪个任务、基于哪个版本的代码生成的"?

部署与扩展(Deployment & Scaling):如何在生产环境部署?如何支持数百个并发任务的执行?


2. Apache Airflow

2.1 Airflow 的诞生背景与核心哲学

Apache Airflow 于 2014 年由 Maxime Beauchemin 在 Airbnb 创建,2016 年进入 Apache 孵化器,2019 年成为 Apache 顶级项目。截至今天,它是全球使用最广泛的数据编排系统,也是大多数企业数据平台的事实标准。

Airflow 的核心哲学可以用一句话概括:"Configuration as Code"(配置即代码)。在 Airflow 诞生之前,工作流通常用 XML 文件或 GUI 界面配置,难以版本控制,也难以复用。Airflow 的革命性在于,它让你用 Python 代码来定义工作流,这意味着你可以用 Git 来版本控制你的 Pipeline,用 Python 的所有编程能力(循环、条件、函数)来动态生成任务图。

但 Airflow 也有一个重要的哲学取向,它是以任务(Task)为中心的:你定义的是"做什么操作"(运行 SQL、执行 Spark 作业、调用 API),而不是"操作什么数据"。这个取向在后面与 Dagster 的对比中会变得非常重要。

2.2 Airflow 的核心架构

Airflow 的架构由以下几个核心组件构成:

用户 / 浏览器
      │
      │ HTTP
      ▼
┌─────────────────────────────────────────────────┐
│                  Web Server                      │
│  (Flask 应用,提供 UI 和 REST API)              │
└────────────────────┬────────────────────────────┘
                     │ 读取元数据
                     ▼
┌─────────────────────────────────────────────────┐
│              Metadata Database                   │
│  (PostgreSQL / MySQL)                          │
│  存储:DAG 定义、任务实例状态、运行历史、变量、连接 │
└────────────────────┬────────────────────────────┘
                     │ 读写状态
          ┌──────────┴──────────┐
          ▼                     ▼
┌─────────────────┐  ┌──────────────────────────┐
│    Scheduler    │  │         Worker(s)         │
│                 │  │                          │
│ 职责:          │  │ 职责:                   │
│ - 解析 DAG 文件 │  │ - 实际执行任务           │
│ - 决定哪些任务  │  │ - 支持多种执行器:       │
│   何时可以运行  │◄─►│   LocalExecutor          │
│ - 将就绪任务推  │  │   CeleryExecutor         │
│   入执行队列    │  │   KubernetesExecutor     │
└─────────────────┘  └──────────────────────────┘
          │
          │ 读取 DAG 文件
          ▼
┌─────────────────────────────────────────────────┐
│                   DAG Files                      │
│  (Python 文件,存储在 dags/ 目录)              │
│  每隔 N 秒,Scheduler 扫描此目录发现新 DAG       │
└─────────────────────────────────────────────────┘

这个架构中,Scheduler 和 Worker 是两个独立进程,它们通过 Metadata Database 来协调状态,这个设计有重要的含义:数据库成了整个系统的核心"神经中枢",任何任务状态的变更都必须写入数据库,这在大规模场景下会成为性能瓶颈。

**执行器(Executor)**是 Airflow 的一个关键概念,决定了任务如何被实际执行:

LocalExecutor 在 Scheduler 同一进程内的多个线程中运行任务,适合小规模、单机部署。CeleryExecutor 将任务分发给 Celery Worker 集群,支持水平扩展,是生产环境的传统选择,但需要额外维护 Redis 或 RabbitMQ 作为消息队列。KubernetesExecutor 为每个任务启动一个独立的 Kubernetes Pod,任务完成后 Pod 销毁,是最现代、最弹性的选择,也是云原生环境的推荐方案。

2.3 DAG:Airflow 的核心抽象

在 Airflow 中,一切都围绕 DAG(Directed Acyclic Graph,有向无环图) 展开。你在 Python 文件中定义 DAG,Scheduler 定期扫描 dags/ 目录来发现它们。

# dags/ecb_rates_pipeline.py
# 这是一个完整的 Airflow DAG 示例,展示了核心概念

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import requests
import logging

# ─────────────────────────────────────────────────────────────────────────────
# 1. DAG 级别的默认参数
#    这些参数会被 DAG 中所有任务继承,除非任务单独覆盖
# ─────────────────────────────────────────────────────────────────────────────
default_args = {
    "owner": "data-platform-team",
    "depends_on_past": False,       # 当天运行不依赖昨天是否成功
    "start_date": days_ago(1),      # DAG 从什么时候开始调度
    "email": ["alerts@company.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,                   # 失败后重试 3 次
    "retry_delay": timedelta(minutes=5),  # 每次重试间隔 5 分钟
    "execution_timeout": timedelta(hours=2),  # 任务超时时间
}

# ─────────────────────────────────────────────────────────────────────────────
# 2. 定义 DAG 本身
#    DAG 是一个上下文管理器,在 with 块内定义的任务自动属于这个 DAG
# ─────────────────────────────────────────────────────────────────────────────
with DAG(
    dag_id="ecb_rates_pipeline",           # DAG 的唯一 ID
    description="每日从 ECB 抓取利率数据并写入数据湖",
    default_args=default_args,
    schedule_interval="0 1 * * *",         # 每天凌晨 1:00 运行(Cron 表达式)
    catchup=False,                         # 不补跑历史(若 start_date 是过去,不追溯)
    tags=["ecb", "bronze", "finance"],     # 便于在 UI 中过滤
    max_active_runs=1,                     # 同时只允许一个活跃的 DAG Run(防止并发问题)
) as dag:

    # ─────────────────────────────────────────────────────────────────────────
    # 3. 定义任务
    #    每个任务是一个 Operator 的实例
    # ─────────────────────────────────────────────────────────────────────────

    # 开始哨兵节点(纯虚任务,无操作,只是一个视觉上的起点)
    start = EmptyOperator(task_id="start")

    def fetch_ecb_deposit_rate(**context):
        """
        从 ECB API 抓取存款利率数据。
        context 参数包含运行时信息,如 execution_date、ds(日期字符串)等。
        """
        execution_date = context["ds"]  # YYYY-MM-DD 格式的执行日期
        logging.info(f"正在抓取 {execution_date} 的 ECB 存款利率...")

        # 模拟 API 调用
        response = requests.get(
            "https://data-api.ecb.europa.eu/service/data/FM/B.U2.EUR.RT0.BB.B.1.30.NA",
            params={"startPeriod": execution_date, "endPeriod": execution_date},
            timeout=30
        )
        response.raise_for_status()
        data = response.json()

        # 使用 XCom 将数据传递给下游任务
        # 注意:XCom 适合传递小数据(元数据、状态),不适合传递大数据集
        return {"rate": data["observations"][0]["value"], "date": execution_date}

    fetch_deposit_rate = PythonOperator(
        task_id="fetch_deposit_rate",
        python_callable=fetch_ecb_deposit_rate,
        provide_context=True,       # 将 context 注入到函数中
    )

    def fetch_euribor_rate(**context):
        """抓取 Euribor 3M 利率,与 fetch_deposit_rate 并行运行"""
        execution_date = context["ds"]
        logging.info(f"正在抓取 {execution_date} 的 Euribor 3M 利率...")
        # ... 实际实现省略
        return {"rate": 3.89, "date": execution_date}

    fetch_euribor = PythonOperator(
        task_id="fetch_euribor",
        python_callable=fetch_euribor_rate,
        provide_context=True,
    )

    def write_to_bronze(**context):
        """
        将两个并行任务的结果写入 Bronze 层。
        使用 XCom pull 从上游任务获取数据。
        """
        # XCom:跨任务通信机制
        # task_instance(简称 ti)是当前任务运行实例的对象
        ti = context["task_instance"]
        deposit_rate = ti.xcom_pull(task_ids="fetch_deposit_rate")
        euribor = ti.xcom_pull(task_ids="fetch_euribor")

        logging.info(f"写入 Bronze 层: deposit_rate={deposit_rate}, euribor={euribor}")
        # ... 实际写入 S3/MinIO 的代码
        return "bronze_write_success"

    write_bronze = PythonOperator(
        task_id="write_to_bronze",
        python_callable=write_to_bronze,
        provide_context=True,
    )

    # 运行 SQL 转换(Silver Layer)
    # PostgresOperator 直接连接数据库执行 SQL,无需 Python 包装
    transform_silver = PostgresOperator(
        task_id="transform_to_silver",
        postgres_conn_id="data_warehouse",  # 在 Airflow UI 的 Connections 中配置
        sql="""
            INSERT INTO silver.ecb_rates_clean (rate_date, rate_type, value, ingested_at)
            SELECT 
                '{{ ds }}'::date AS rate_date,       -- {{ ds }} 是 Airflow 模板变量
                'ECB_DEPOSIT' AS rate_type,
                (payload->>'rate')::numeric AS value,
                NOW() AS ingested_at
            FROM bronze.raw_ecb_rates
            WHERE ingestion_date = '{{ ds }}'
            ON CONFLICT (rate_date, rate_type) DO UPDATE SET value = EXCLUDED.value;
        """,
    )

    # 失败告警任务(只在失败时触发,通过 trigger_rule 控制)
    notify_failure = SlackWebhookOperator(
        task_id="notify_failure",
        http_conn_id="slack_webhook",
        message="⚠️ ECB 数据管道失败!日期: {{ ds }},请查看 Airflow UI。",
        trigger_rule="one_failed",     # 上游任何一个任务失败时触发
    )

    end = EmptyOperator(
        task_id="end",
        trigger_rule="none_failed_min_one_success",  # 上游全部成功才触发
    )

    # ─────────────────────────────────────────────────────────────────────────
    # 4. 定义任务依赖关系
    #    >> 操作符表示"左边完成后运行右边"
    #    [a, b] >> c 表示 a 和 b 都完成后才运行 c
    # ─────────────────────────────────────────────────────────────────────────
    start >> [fetch_deposit_rate, fetch_euribor] >> write_bronze >> transform_silver
    transform_silver >> [end, notify_failure]

2.4 XCom:任务间的数据传递

XCom(Cross-Communication)是 Airflow 中任务间传递数据的机制。每个任务可以通过 return 语句"push"一个值,下游任务通过 ti.xcom_pull(task_ids="upstream_task") 来"pull"这个值。XCom 的值存储在 Airflow 的 Metadata Database 中,这决定了它只适合传递小数据(如文件路径、状态信息、统计数字),绝对不能传递 DataFrame 或大型数据集,否则会把数据库撑爆。

这是 Airflow 架构的一个根本性限制:任务之间的数据流是不一等公民,Airflow 关注的是任务的执行,而不是数据的流转。这也是 Dagster 后来崛起的重要原因之一。

2.5 Airflow 的 Hooks 和 Connections

Airflow 通过 Connections 管理外部系统的凭据(数据库密码、API Key、S3 访问密钥等),存储在 Metadata Database 中(加密存储),可以在 UI 中配置,也可以通过环境变量注入。Hooks 是对某个外部系统连接的高级抽象,屏蔽了底层的连接管理细节:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.hooks.postgres_hook import PostgresHook

def upload_to_s3(**context):
    # S3Hook 自动从 Connections 中读取认证信息
    s3_hook = S3Hook(aws_conn_id="aws_s3")
    s3_hook.load_string(
        string_data='{"key": "value"}',
        key=f"bronze/ecb_rates/{context['ds']}/data.json",
        bucket_name="my-data-lake"
    )

def query_postgres(**context):
    pg_hook = PostgresHook(postgres_conn_id="data_warehouse")
    # get_pandas_df 直接返回 Pandas DataFrame
    df = pg_hook.get_pandas_df("SELECT * FROM silver.ecb_rates WHERE rate_date = %(date)s",
                               parameters={"date": context["ds"]})
    return df.to_dict()

2.6 TaskFlow API:现代 Airflow 的写法

Airflow 2.0 引入了 TaskFlow API,用装饰器语法大幅简化了 Python 任务的定义,自动处理 XCom 的 push/pull,让代码更像普通 Python 函数:

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["finance"]
)
def ecb_rates_taskflow():
    """使用 TaskFlow API 的现代写法,代码更简洁,意图更清晰"""

    @task()
    def fetch_deposit_rate(ds: str) -> dict:
        # 直接 return,Airflow 自动 push 到 XCom
        return {"rate": 4.0, "date": ds}

    @task()
    def fetch_euribor(ds: str) -> dict:
        return {"rate": 3.89, "date": ds}

    @task()
    def write_to_bronze(deposit_rate: dict, euribor: dict) -> str:
        # 参数直接接收上游任务的返回值,Airflow 自动处理 XCom pull
        # 任务依赖关系也从函数参数自动推断,无需手写 >> 操作符
        print(f"Writing: {deposit_rate}, {euribor}")
        return "s3://bucket/bronze/ecb_rates/2024-01-01/"

    @task()
    def transform_silver(bronze_path: str) -> None:
        print(f"Transforming data from {bronze_path}")

    # 函数调用即定义了依赖关系和数据流
    deposit_rate = fetch_deposit_rate()
    euribor = fetch_euribor()
    bronze_path = write_to_bronze(deposit_rate, euribor)
    transform_silver(bronze_path)

# 实例化 DAG
ecb_rates_pipeline = ecb_rates_taskflow()

2.7 动态 DAG(Dynamic Task Mapping)

Airflow 2.3 引入了 Dynamic Task Mapping,这是 Airflow 历史上最重要的功能之一,允许在运行时根据数据动态生成任务:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def dynamic_country_pipeline():

    @task()
    def get_countries() -> list:
        """返回需要处理的国家列表(运行时才确定)"""
        return ["DE", "FR", "IT", "ES", "NL"]

    @task()
    def process_country(country_code: str) -> dict:
        """为每个国家生成一个独立任务,并行执行"""
        print(f"Processing data for {country_code}")
        # 实际处理逻辑
        return {"country": country_code, "status": "success", "records": 1000}

    @task()
    def aggregate_results(results: list) -> None:
        """收集所有国家的处理结果"""
        total = sum(r["records"] for r in results)
        print(f"Total records processed: {total}")

    countries = get_countries()
    # .expand() 是 Dynamic Task Mapping 的关键方法
    # 它会为 countries 列表中的每个元素生成一个独立的 process_country 任务
    results = process_country.expand(country_code=countries)
    aggregate_results(results)

dynamic_pipeline = dynamic_country_pipeline()

2.8 Airflow 的核心痛点

理解了 Airflow 的强大,我们也必须诚实地面对它的局限,这些痛点正是 Dagster 和 Prefect 崛起的根本原因:

第一个痛点:数据资产(Data Assets)是隐形的。Airflow 告诉你任务成功了,但它不知道任务产生了什么数据、数据在哪里、数据的 Schema 是什么。数据血缘需要借助第三方工具(如 OpenLineage、Marquez)额外构建,而不是内建的。

第二个痛点:本地开发和测试极其困难。测试一个 Airflow DAG,通常需要启动完整的 Airflow 服务(Web Server + Scheduler + Database)。这使得 TDD(测试驱动开发)几乎不可能,也让 CI/CD 流程更复杂。

第三个痛点:Scheduler 的性能瓶颈。Scheduler 需要频繁扫描 dags/ 目录解析所有 Python 文件(包括 import),在有数百个 DAG 时,这会产生严重的性能问题,导致任务调度延迟。

第四个痛点:运行环境的隔离性差。所有 Task 默认在同一个环境中运行,如果两个任务需要不同版本的 Python 库,会产生冲突。KubernetesExecutor 可以部分解决这个问题,但增加了大量配置复杂度。

第五个痛点:全局状态(Variables)和连接管理。Airflow 的 Variables 和 Connections 存储在数据库中,在代码里引用一个不存在的 Connection ID,只有在任务实际运行时才会报错,而不是在 DAG 定义时就被检测到。


3. Dagster

3.1 Dagster 的诞生背景与核心哲学

Dagster 由 Nick Schrock(前 Facebook GraphQL 创始人)创建,2019 年开源。Dagster 的诞生本身就是对 Airflow 的一次深度反思。

Dagster 的核心哲学与 Airflow 有一个根本性的不同:Airflow 以任务(Task)为中心,Dagster 以资产(Asset)为中心

什么意思?在 Airflow 中,你定义的是"运行一个 Spark 作业"这个动作。在 Dagster 中,你定义的是"这个表(Asset)的计算逻辑是什么",Dagster 自动处理它的依赖关系、何时需要重新计算、计算结果存储在哪里。

这个哲学差异带来的是完全不同的编程模型和用户体验。以一个比喻来说:Airflow 是"指挥交通的警察"(你定义任务执行的顺序),Dagster 是"建筑规划师"(你定义什么建筑存在、建筑之间的关系,调度系统自动决定施工顺序)。

另一个 Dagster 的核心设计原则是 "本地开发友好":所有 Dagster 代码都是普通的 Python 函数,可以不启动任何服务直接测试,CI/CD 测试成本极低。

3.2 Dagster 的核心概念体系

Dagster 有一套独特的概念体系,理解这些概念是学习 Dagster 的关键:

**Asset(资产)**是 Dagster 最核心的概念。一个 Asset 代表一个持久化的数据对象,可以是 S3 文件、数据库表、MLflow 模型、仪表板。用 @asset 装饰器定义资产。

**Op(操作)**是 Dagster 中最底层的计算单元,类似于 Airflow 的 Task,但它是无状态的纯函数。Op 是构成 Asset 和 Graph 的基础。

**Graph(图)**是多个 Op 的有向无环图,定义了 Op 之间的数据流。

**Job(作业)**是一个可配置的、可执行的 Graph 实例,包含执行环境的配置(使用哪个 Resource)。

**Resource(资源)**是 Dagster 管理外部依赖(数据库连接、S3 客户端、Spark 会话)的方式。Resource 以依赖注入的方式提供给 Op/Asset,使得本地开发可以注入 Mock Resource,生产环境注入真实 Resource,无需修改业务逻辑代码。

**IO Manager(输入输出管理器)**是 Dagster 最独特的设计之一。IO Manager 定义了如何将 Asset 的计算结果持久化(存到 S3、写入数据库)以及如何从持久化存储中读取数据。业务代码只写计算逻辑,IO Manager 负责所有 I/O 细节。

**Sensor(传感器)**是事件驱动触发的机制(如新文件上传时触发)。

**Schedule(调度)**是基于时间的触发机制(类似 Cron)。

**Partition(分区)**是 Dagster 管理时间分区数据的内建机制,与 Asset 深度集成。

3.3 Dagster 的架构

用户 / 浏览器
      │
      │ HTTP
      ▼
┌─────────────────────────────────────────────────┐
│                  Dagster UI                      │
│  (称为 Dagit,基于 React,内建血缘图可视化)    │
└────────────────────┬────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────┐
│              Dagster Webserver                   │
│  提供 GraphQL API(是的,Dagster 用 GraphQL)    │
└────────────────────┬────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────┐
│              Dagster Daemon                      │
│  职责:                                         │
│  - 运行 Schedules(定时触发)                    │
│  - 运行 Sensors(事件触发)                      │
│  - 背景资产物化队列管理                         │
└────────────────────┬────────────────────────────┘
                     │
          ┌──────────┴──────────┐
          ▼                     ▼
┌─────────────────┐  ┌──────────────────────────┐
│   Storage       │  │   Code Location(s)        │
│  (元数据存储)  │  │                          │
│  - Run History  │  │ 一个或多个 Python 进程/  │
│  - Asset 事件   │  │ 容器,包含你的业务代码。  │
│  - 日志         │  │ 代码与 Dagster 服务分离   │
│  支持:         │  │ 运行,热更新代码不需要    │
│  SQLite(开发) │  │ 重启 Dagster 服务。       │
│  PostgreSQL     │  └──────────────────────────┘
│  S3(日志)     │
└─────────────────┘

Dagster 架构的一个关键设计是代码位置(Code Location)的分离。你的业务代码(Asset 定义、Op、Resource)运行在独立的 Python 进程中(甚至独立的 Docker 容器中),与 Dagster 的基础设施服务(Daemon、Webserver)完全分离。这意味着:你更新业务代码时,只需要重启 Code Location,不需要重启 Dagster 服务;不同的代码库可以有不同的依赖环境,互不干扰;Code Location 可以部署在不同的机器上。

3.4 Asset-Native 编程模型详解

这是 Dagster 与其他编排工具最本质的不同,值得深入理解:

# dagster_assets_demo.py
from dagster import (
    asset, AssetIn, AssetOut, Output, multi_asset,
    IOManager, io_manager,
    resource, ResourceDefinition,
    define_asset_job, AssetSelection,
    ScheduleDefinition, Definitions,
    EnvVar, Config
)
from dagster_aws.s3 import S3Resource
import pandas as pd
import requests
from typing import Tuple

# ─────────────────────────────────────────────────────────────────────────────
# 1. 定义 Resource:外部依赖的注入点
#    Resource 让本地开发可以 Mock 所有外部依赖
# ─────────────────────────────────────────────────────────────────────────────

@resource
def ecb_api_resource(context):
    """封装对 ECB API 的访问,生产环境使用真实 URL"""
    return {"base_url": "https://data-api.ecb.europa.eu/service/data"}

@resource
def mock_ecb_api_resource(context):
    """测试环境使用的 Mock Resource,返回固定测试数据"""
    return {"base_url": "http://mock-server:8080"}


# ─────────────────────────────────────────────────────────────────────────────
# 2. 定义 IO Manager:Asset 的持久化策略
#    业务代码只负责计算,IO Manager 负责所有 I/O
# ─────────────────────────────────────────────────────────────────────────────

class ParquetS3IOManager(IOManager):
    """将 Asset 的 DataFrame 结果自动存为 S3 上的 Parquet 文件"""

    def __init__(self, s3_client, bucket: str, prefix: str):
        self._s3 = s3_client
        self._bucket = bucket
        self._prefix = prefix

    def handle_output(self, context, obj: pd.DataFrame):
        """Asset 计算完成后,自动调用此方法持久化结果"""
        # context.asset_key 是资产的路径,如 AssetKey(["bronze", "ecb_rates"])
        key = f"{self._prefix}/{'/'.join(context.asset_key.path)}.parquet"
        context.log.info(f"Writing {len(obj)} rows to s3://{self._bucket}/{key}")
        # 实际写入 S3
        parquet_buffer = obj.to_parquet(index=False)
        self._s3.put_object(Bucket=self._bucket, Key=key, Body=parquet_buffer)

    def load_input(self, context) -> pd.DataFrame:
        """下游 Asset 需要读取上游 Asset 时,自动调用此方法"""
        key = f"{self._prefix}/{'/'.join(context.asset_key.path)}.parquet"
        response = self._s3.get_object(Bucket=self._bucket, Key=key)
        return pd.read_parquet(response["Body"])

@io_manager(required_resource_keys={"s3"})
def parquet_s3_io_manager(context) -> ParquetS3IOManager:
    return ParquetS3IOManager(
        s3_client=context.resources.s3,
        bucket="sololakehouse",
        prefix="assets"
    )


# ─────────────────────────────────────────────────────────────────────────────
# 3. 定义 Assets:这是 Dagster 的核心
#    每个 @asset 声明一个数据资产及其计算逻辑
# ─────────────────────────────────────────────────────────────────────────────

@asset(
    group_name="bronze",              # 用于在 UI 中分组显示
    description="从 ECB API 抓取的原始存款利率数据",
    required_resource_keys={"ecb_api"},  # 声明此 Asset 需要哪些 Resource
    tags={"layer": "bronze", "source": "ecb"},
    io_manager_key="parquet_s3_io_manager",  # 指定使用哪个 IO Manager 持久化
)
def ecb_deposit_rate_raw(context) -> pd.DataFrame:
    """
    Bronze 层资产:从 ECB API 抓取原始数据。
    函数名 ecb_deposit_rate_raw 就是这个资产的名字。
    函数返回值会由 IO Manager 自动持久化。
    """
    base_url = context.resources.ecb_api["base_url"]
    context.log.info("Fetching ECB deposit rate...")
    # response = requests.get(f"{base_url}/FM/B.U2.EUR.RT0.BB.B.1.30.NA")
    # 模拟返回数据
    return pd.DataFrame([
        {"date": "2024-01-01", "rate_type": "ECB_DEPOSIT", "value": 4.0},
        {"date": "2024-01-02", "rate_type": "ECB_DEPOSIT", "value": 4.0},
    ])


@asset(
    group_name="bronze",
    description="从 ECB API 抓取的原始 Euribor 3M 数据",
    required_resource_keys={"ecb_api"},
    io_manager_key="parquet_s3_io_manager",
)
def euribor_3m_raw(context) -> pd.DataFrame:
    context.log.info("Fetching Euribor 3M...")
    return pd.DataFrame([
        {"date": "2024-01-01", "rate_type": "EURIBOR_3M", "value": 3.89},
        {"date": "2024-01-02", "rate_type": "EURIBOR_3M", "value": 3.87},
    ])


@asset(
    group_name="silver",
    description="合并清洗后的利率数据(Silver 层)",
    # 声明上游依赖:此资产依赖上面两个 Bronze 资产
    # Dagster 自动推断执行顺序,并用 IO Manager 自动加载上游数据
    ins={
        "deposit_rate": AssetIn("ecb_deposit_rate_raw"),
        "euribor": AssetIn("euribor_3m_raw"),
    },
    io_manager_key="parquet_s3_io_manager",
)
def rates_silver(context, deposit_rate: pd.DataFrame, euribor: pd.DataFrame) -> pd.DataFrame:
    """
    Silver 层资产:合并两个数据源,标准化 Schema。
    注意:deposit_rate 和 euribor 参数由 IO Manager 自动从 S3 加载,
    业务代码根本不知道数据存在哪里,也不用写任何 I/O 代码。
    """
    context.log.info(f"Merging {len(deposit_rate)} + {len(euribor)} rows")
    combined = pd.concat([deposit_rate, euribor], ignore_index=True)
    combined["date"] = pd.to_datetime(combined["date"])
    combined["ingested_at"] = pd.Timestamp.now()
    return combined.sort_values(["date", "rate_type"])


@asset(
    group_name="gold",
    description="每日宏观利率状态宽表(Gold 层)",
    ins={"rates": AssetIn("rates_silver")},
    io_manager_key="parquet_s3_io_manager",
)
def macro_state_daily(context, rates: pd.DataFrame) -> pd.DataFrame:
    """Gold 层:计算每日宏观状态标签,供 BI 直接使用"""
    pivot = rates.pivot_table(
        index="date", columns="rate_type", values="value", aggfunc="last"
    ).reset_index()
    pivot.columns.name = None

    # 宏观状态分类逻辑
    def classify_regime(row):
        ecb = row.get("ECB_DEPOSIT", float("nan"))
        euribor = row.get("EURIBOR_3M", float("nan"))
        if pd.isna(ecb) or pd.isna(euribor):
            return "UNKNOWN"
        if ecb > 3.0:
            return "TIGHT_MONETARY"
        elif ecb < 0.5:
            return "LOOSE_MONETARY"
        return "NEUTRAL"

    pivot["macro_regime"] = pivot.apply(classify_regime, axis=1)
    context.log.info(f"Generated {len(pivot)} daily macro state records")
    return pivot


# ─────────────────────────────────────────────────────────────────────────────
# 4. 定义 Job(运行什么、如何运行)和 Schedule(何时运行)
# ─────────────────────────────────────────────────────────────────────────────

daily_rates_job = define_asset_job(
    name="daily_rates_pipeline",
    selection=AssetSelection.groups("bronze", "silver", "gold"),
    description="每日运行完整的利率数据管道",
)

daily_schedule = ScheduleDefinition(
    job=daily_rates_job,
    cron_schedule="0 1 * * *",
    execution_timezone="Europe/Berlin",
)


# ─────────────────────────────────────────────────────────────────────────────
# 5. 汇总定义(Definitions 是 Dagster 代码库的入口点)
# ─────────────────────────────────────────────────────────────────────────────

defs = Definitions(
    assets=[ecb_deposit_rate_raw, euribor_3m_raw, rates_silver, macro_state_daily],
    resources={
        "ecb_api": ecb_api_resource,
        "s3": S3Resource(region_name="eu-west-1"),
        "parquet_s3_io_manager": parquet_s3_io_manager,
    },
    jobs=[daily_rates_job],
    schedules=[daily_schedule],
)

3.5 Dagster 的分区(Partitions)

Dagster 的分区是其最强大的功能之一,使得增量处理变得极其自然:

from dagster import (
    asset, DailyPartitionsDefinition, WeeklyPartitionsDefinition,
    AssetIn, PartitionKeyRange
)
from datetime import datetime

# 定义日分区:从 2024-01-01 开始,每天一个分区
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(
    partitions_def=daily_partitions,    # 声明此 Asset 按天分区
    group_name="bronze",
    description="按天分区的 ECB 数据,只处理当天数据"
)
def ecb_deposit_rate_partitioned(context) -> pd.DataFrame:
    # context.partition_key 是当前运行的分区日期,如 "2024-01-15"
    partition_date = context.partition_key
    context.log.info(f"Processing partition: {partition_date}")

    # 只抓取这一天的数据
    # response = requests.get(f"{base_url}?startPeriod={partition_date}&endPeriod={partition_date}")
    return pd.DataFrame([{"date": partition_date, "rate": 4.0}])


@asset(
    partitions_def=daily_partitions,
    ins={"raw": AssetIn("ecb_deposit_rate_partitioned")},
    description="已清洗的分区数据"
)
def ecb_deposit_rate_clean(context, raw: pd.DataFrame) -> pd.DataFrame:
    # Dagster 自动确保 raw 和 clean 使用同一个分区键
    partition_date = context.partition_key
    clean = raw.dropna()
    clean["processed_at"] = pd.Timestamp.now()
    return clean

# 在 UI 或代码中触发特定分区或分区范围的物化
# dagster asset materialize --select ecb_deposit_rate_partitioned --partition 2024-01-15
# 也可以一次性回填历史数据(Backfill):
# dagster asset backfill --select ecb_deposit_rate_partitioned --from 2024-01-01 --to 2024-01-31

3.6 Dagster 的测试体验

这是 Dagster 相比 Airflow 最显著的优势之一,你可以像测试普通 Python 代码一样测试 Asset:

# tests/test_assets.py
from dagster import materialize, build_asset_context
from unittest.mock import MagicMock
import pandas as pd
import pytest

# 导入要测试的 Asset 和 Resource
from my_pipeline.assets import ecb_deposit_rate_raw, rates_silver

def test_ecb_deposit_rate_raw():
    """测试 Bronze Asset 的逻辑,使用 Mock Resource"""
    mock_ecb_api = {"base_url": "http://mock"}

    # materialize() 执行 Asset 并返回结果,无需启动任何 Dagster 服务
    result = materialize(
        [ecb_deposit_rate_raw],
        resources={"ecb_api": mock_ecb_api}
    )
    assert result.success
    # 获取 Asset 的物化值(通过 IO Manager 读取)
    output = result.output_for_node("ecb_deposit_rate_raw")
    assert isinstance(output, pd.DataFrame)
    assert "rate_type" in output.columns
    assert len(output) > 0

def test_rates_silver_merges_correctly():
    """测试 Silver Asset 的合并逻辑"""
    mock_deposit = pd.DataFrame([{"date": "2024-01-01", "rate_type": "ECB_DEPOSIT", "value": 4.0}])
    mock_euribor = pd.DataFrame([{"date": "2024-01-01", "rate_type": "EURIBOR_3M", "value": 3.89}])

    # 直接调用 Asset 函数(因为它就是普通 Python 函数)
    context = build_asset_context()
    result = rates_silver(context, deposit_rate=mock_deposit, euribor=mock_euribor)

    assert len(result) == 2
    assert set(result["rate_type"].unique()) == {"ECB_DEPOSIT", "EURIBOR_3M"}
    assert "ingested_at" in result.columns

4. Prefect

4.1 Prefect 的诞生背景与核心哲学

Prefect 于 2018 年由 Jeremiah Lowin 创建并开源(Prefect 1.x),2022 年完全重写并推出 Prefect 2.x(也叫 Prefect Orion),2023 年发布 Prefect 3.x。Prefect 的诞生目标是:做一个比 Airflow 更现代、更易上手的编排工具,但比 Dagster 更轻量、部署更简单

Prefect 的核心哲学是 "Pythonic First"(Python 优先)和 "Negative Engineering"(负面工程)的概念。所谓负面工程,是指数据工程师大量时间花在处理失败、重试、日志、监控这些"防御性"工作上,而不是真正的业务逻辑。Prefect 的目标是通过几个简单的装饰器,让你用最小的代码改动为现有的 Python 脚本添加完整的生产级编排能力。

Prefect 的定位在 Airflow 和 Dagster 之间:它比 Airflow 更易上手(普通 Python 代码加几个装饰器即可),比 Dagster 更灵活(以 Flow/Task 为中心,而不强制以资产为中心),适合快速构建数据管道的团队。

4.2 Prefect 的核心概念

**Flow(流)**是 Prefect 的基本单位,相当于 Airflow 的 DAG 或 Dagster 的 Job。Flow 是一个 Python 函数,加上 @flow 装饰器即可。Flow 负责编排 Task,定义依赖关系,处理异常和重试。

**Task(任务)**是 Flow 中的一个工作单元,加上 @task 装饰器即可。Task 是可重试、可缓存、可并发的执行单元。

**Deployment(部署)**是 Flow 的配置化版本,定义了 Flow 如何被调度(Cron、事件触发)、在哪里执行(本地、Docker、Kubernetes)。

**Work Pool(工作池)**定义了任务在哪里实际运行(本地进程、Docker 容器、Kubernetes Pod、ECS Task),是 Prefect 2.x 引入的执行基础设施抽象。

**Block(块)**是 Prefect 管理外部系统凭据和配置的方式(类似于 Airflow 的 Connections,但更灵活),可以在 Prefect Cloud/Server UI 中统一管理。

4.3 Prefect 的架构

用户 / 浏览器
      │
      │ HTTP
      ▼
┌─────────────────────────────────────────────────┐
│           Prefect Server / Prefect Cloud         │
│  (编排控制平面)                                │
│  - Orion API(REST API)                        │
│  - UI(基于 Vue.js)                             │
│  - 调度引擎(Scheduler)                         │
│  - 状态存储(SQLite / PostgreSQL)               │
└────────────────────┬────────────────────────────┘
                     │ API 通信(HTTP/WebSocket)
                     ▼
┌─────────────────────────────────────────────────┐
│              Work Pool + Worker                  │
│                                                 │
│ Worker 轮询 Prefect Server,获取待执行的 Flow Run│
│ 然后在本地/Docker/Kubernetes 中启动执行进程      │
│                                                 │
│ 类型:                                          │
│ - Process(本地进程)                           │
│ - Docker(容器)                                │
│ - Kubernetes(Pod)                             │
│ - ECS / Cloud Run                               │
└────────────────────┬────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────┐
│         Flow Run(执行你的 Python 代码)          │
│  Flow 代码在 Worker 上下载并执行                 │
│  执行过程中实时上报状态到 Prefect Server         │
└─────────────────────────────────────────────────┘

Prefect 架构最重要的特点是 "Server 和执行完全分离"。Prefect Server(或 Prefect Cloud)只是一个控制平面,负责调度决策和状态追踪,真正的代码执行发生在 Worker 进程/容器中,Worker 主动轮询(而不是被推送)任务。这个设计使得 Prefect 可以在非常轻量的基础设施上运行,甚至在防火墙后面的环境中(Worker 只需要能访问 Prefect Server,无需反向可达)。

4.4 Prefect 的代码示例

# prefect_ecb_pipeline.py
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from prefect_aws import S3Bucket
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from datetime import timedelta, datetime
import pandas as pd
import requests

# ─────────────────────────────────────────────────────────────────────────────
# 1. 定义 Task(基础计算单元)
#    @task 装饰器让普通函数获得重试、缓存、日志、状态追踪能力
# ─────────────────────────────────────────────────────────────────────────────

@task(
    name="fetch-ecb-deposit-rate",
    description="从 ECB API 抓取存款利率",
    retries=3,                              # 失败重试 3 次
    retry_delay_seconds=60,                 # 每次重试间隔 60 秒
    # result_storage_key 用于结果缓存:相同输入的调用会直接返回缓存结果
    # cache_key_fn=task_input_hash 表示用输入参数的 hash 作为缓存键
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),    # 缓存 1 小时
    tags=["bronze", "ecb", "external-api"],
)
def fetch_ecb_deposit_rate(date: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Fetching ECB deposit rate for {date}")

    # Prefect Block:从 Prefect Server 读取存储的 API URL 配置
    # 这样 URL 不需要硬编码在代码里,可以在 UI 中修改
    try:
        api_url = Secret.load("ecb-api-base-url").get()
    except Exception:
        api_url = "https://data-api.ecb.europa.eu"

    # 模拟 API 调用
    logger.info(f"Calling {api_url} for date {date}")
    return pd.DataFrame([
        {"date": date, "rate_type": "ECB_DEPOSIT", "value": 4.0}
    ])


@task(
    name="fetch-euribor-3m",
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def fetch_euribor_rate(date: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Fetching Euribor 3M for {date}")
    return pd.DataFrame([
        {"date": date, "rate_type": "EURIBOR_3M", "value": 3.89}
    ])


@task(name="write-to-s3-bronze")
def write_to_bronze(deposit_rate: pd.DataFrame, euribor: pd.DataFrame, date: str) -> str:
    """将数据写入 S3 Bronze 层"""
    logger = get_run_logger()
    combined = pd.concat([deposit_rate, euribor])

    # Prefect Block:从 Prefect Server 加载 S3 配置(Bucket、Region、凭据)
    # s3_bucket = S3Bucket.load("finlakehouse-bronze")
    # s3_bucket.upload_from_dataframe(combined, f"ecb_rates/{date}/data.parquet")

    logger.info(f"Written {len(combined)} rows to S3 bronze layer for {date}")
    return f"s3://finlakehouse/bronze/ecb_rates/{date}/data.parquet"


@task(
    name="transform-to-silver",
    retries=2,
)
def transform_to_silver(bronze_path: str, date: str) -> pd.DataFrame:
    """读取 Bronze 数据,清洗转换,写入 Silver 层"""
    logger = get_run_logger()
    logger.info(f"Transforming {bronze_path} to silver layer")

    # 模拟转换逻辑
    silver_data = pd.DataFrame([
        {"rate_date": date, "rate_type": "ECB_DEPOSIT", "value": 4.0, "processed_at": pd.Timestamp.now()},
        {"rate_date": date, "rate_type": "EURIBOR_3M", "value": 3.89, "processed_at": pd.Timestamp.now()},
    ])
    return silver_data


@task(name="compute-macro-state")
def compute_macro_state(silver_data: pd.DataFrame) -> dict:
    """基于 Silver 数据计算宏观状态"""
    deposit_rate = silver_data[silver_data["rate_type"] == "ECB_DEPOSIT"]["value"].iloc[0]

    if deposit_rate > 3.0:
        regime = "TIGHT_MONETARY"
    elif deposit_rate < 0.5:
        regime = "LOOSE_MONETARY"
    else:
        regime = "NEUTRAL"

    return {"date": silver_data["rate_date"].iloc[0], "macro_regime": regime}


@task(name="send-completion-notification")
def send_notification(result: dict, success: bool = True) -> None:
    logger = get_run_logger()
    status = "✅ 成功" if success else "❌ 失败"
    logger.info(f"Pipeline {status}: {result}")
    # 实际发送 Slack/Email 通知


# ─────────────────────────────────────────────────────────────────────────────
# 2. 定义 Flow(编排 Task 的主函数)
#    Flow 就是普通 Python 函数 + @flow 装饰器
# ─────────────────────────────────────────────────────────────────────────────

@flow(
    name="ecb-rates-daily-pipeline",
    description="每日 ECB 利率数据管道:抓取 → Bronze → Silver → Gold",
    version="1.0",
    # flow_run_name 支持模板变量,每次运行有清晰的标识名
    flow_run_name="ecb-rates-{date}",
    # retries=1,                              # Flow 级别的重试(重跑整个 Flow)
    # timeout_seconds=3600,                   # Flow 级别的超时
    log_prints=True,                          # 自动捕获 print() 输出为日志
)
def ecb_rates_pipeline(date: str = None) -> dict:
    """
    完整的 ECB 利率数据管道。
    
    注意这就是普通的 Python 函数,可以直接调用、直接测试,
    不需要任何 Prefect 服务运行。
    """
    if date is None:
        date = datetime.now().strftime("%Y-%m-%d")

    logger = get_run_logger()
    logger.info(f"Starting ECB rates pipeline for date: {date}")

    # Prefect 自动检测任务调用之间的依赖关系
    # 没有输入依赖关系的 Task 会并发执行
    # (fetch_deposit_rate 和 fetch_euribor 会并发运行)
    deposit_rate = fetch_ecb_deposit_rate(date)
    euribor = fetch_euribor_rate(date)

    # 两个并行 Task 的结果传入 write_to_bronze,Prefect 自动等待两者完成
    bronze_path = write_to_bronze(deposit_rate, euribor, date)

    # 串行执行(silver 依赖 bronze)
    silver_data = transform_to_silver(bronze_path, date)
    macro_state = compute_macro_state(silver_data)

    send_notification(macro_state, success=True)

    logger.info(f"Pipeline completed. Macro regime: {macro_state['macro_regime']}")
    return macro_state


# 子 Flow(Flow 可以调用其他 Flow,形成嵌套结构)
@flow(name="weekly-macro-summary")
def weekly_macro_summary(start_date: str, end_date: str) -> list:
    """每周汇总 Flow,调用 7 次日级 Flow"""
    results = []
    dates = pd.date_range(start=start_date, end=end_date, freq="D")

    for date in dates:
        # 调用子 Flow,每天一次
        result = ecb_rates_pipeline(date=date.strftime("%Y-%m-%d"))
        results.append(result)

    return results


# ─────────────────────────────────────────────────────────────────────────────
# 3. 本地直接运行(无需任何 Prefect Server)
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # 直接调用就像调用普通 Python 函数
    result = ecb_rates_pipeline(date="2024-01-15")
    print(f"Result: {result}")

4.5 Prefect 的部署(Deployment)

# deploy.py
from prefect.deployments import Deployment
from prefect.infrastructure.container import DockerContainer
from prefect.server.schemas.schedules import CronSchedule

# 方式 1:通过代码定义 Deployment
deployment = Deployment.build_from_flow(
    flow=ecb_rates_pipeline,
    name="ecb-rates-daily",
    version="1.0",
    work_queue_name="data-pipeline",   # 哪个 Work Pool 处理这个 Deployment
    schedule=CronSchedule(
        cron="0 1 * * *",
        timezone="Europe/Berlin"
    ),
    parameters={"date": None},          # 默认参数
    tags=["production", "finance"],
    # 指定执行基础设施(可以是 Docker 容器,也可以是 Kubernetes)
    # infrastructure=DockerContainer(
    #     image="my-pipeline:latest",
    #     env={"AWS_REGION": "eu-west-1"}
    # )
)

deployment.apply()  # 将 Deployment 注册到 Prefect Server

# 方式 2:使用 YAML 文件(prefect.yaml)定义 Deployment(Prefect 3.x 推荐方式)
# prefect deploy --all

5. 三者核心架构横向对比

5.1 哲学与编程模型对比

这是理解三者差异最重要的维度,也是面试时展示架构思维的切入点:

编程模型上,Airflow 的范式是"任务图"——你显式定义一个 DAG,声明哪些节点(任务)存在、节点之间的边(依赖关系)是什么。Dagster 的范式是"资产图"——你定义的是数据资产的计算逻辑,Dagster 根据资产之间的输入/输出关系自动推断依赖图。Prefect 的范式是"函数流"——你用普通 Python 函数定义工作流,任务之间的依赖关系由函数调用的顺序自动推断,无需显式声明 DAG。

数据可见性上,Airflow 对数据是不透明的,它只知道任务跑成功了,不知道任务生产了什么数据。Dagster 对数据是一等公民,每个 Asset 有完整的元数据、血缘、版本追踪,是三者中数据可见性最强的。Prefect 位于中间,通过 Artifact 功能可以记录任务产出物,但不如 Dagster 深度集成。

5.2 全面特性对比矩阵

特性 Apache Airflow Dagster Prefect
核心抽象 DAG + Operator + Task Asset + Op + Resource Flow + Task
编程模型 任务图(Task-centric) 资产图(Asset-centric) 函数流(Flow-centric)
学习曲线 高(概念多,坑多) 中高(概念新颖,需转变思维) 低(最接近普通 Python)
本地开发体验 差(需要完整 Airflow 环境) 极佳(普通 pytest 即可) 极佳(直接运行 Python 文件)
数据血缘追踪 需第三方(OpenLineage) 内建,一等公民 基本支持(Artifact)
资产可视化 不支持 内建 Asset Catalog 有限支持
动态任务 2.3+ 支持(Dynamic Task Mapping) 支持(动态分区) 支持(.map()/.submit())
并发执行 基于 Executor(Celery/K8s) 基于 Executor(内置多种) 基于 Task Runner(Dask/Ray)
分区/增量处理 手动实现 内建(Partitions),一等公民 手动实现
依赖注入 有限(Connection/Variable) 强(Resource 系统) 中(Block 系统)
测试友好性 极佳 极佳
UI 界面 功能完整,但较复杂 现代,血缘图优秀 简洁现代
调度能力 强(Cron + 传感器) 强(Schedule + Sensor) 强(Cron + Event Trigger)
部署复杂度 高(多组件) 中(较 Airflow 简单) 低(最简单)
云托管服务 MWAA(AWS)、Cloud Composer(GCP) Dagster Cloud Prefect Cloud
企业采用率 最高(事实标准) 快速增长 中等
开源许可 Apache 2.0 Apache 2.0 Apache 2.0
Databricks 集成 ✅ 官方 Operator ✅ 官方集成 ✅ 官方集成
Spark 集成 ✅ SparkSubmitOperator ✅ 原生支持 ✅ 支持
适合的团队规模 大型团队(10+ 数据工程师) 中大型团队 小中型团队(1-10 人)
适合的管道数量 数百到数千个 数百个 数十到数百个

5.3 数据血缘:最关键的差异

用一个具体例子说明血缘能力的差异。假设你的 Gold 层 macro_state_daily 表数据出现了问题,你需要追查根因:

Airflow 中,你只能看到"哪个 Task 运行成功/失败了"。要追查数据问题,你需要人工查看每个任务的代码,人工追踪数据流,或者依赖 OpenLineage 这个额外工具来注入到 Airflow 的各个 Operator 中。

Dagster 中,UI 的 Asset Catalog 直接展示:macro_state_daily 依赖于 rates_silverrates_silver 依赖于 ecb_deposit_rate_raweuribor_3m_raw,每个资产最后一次物化(Materialization)的时间、用的是哪个 Job、结果的行数,都一目了然。如果发现数据问题发生在某一天,你可以直接重新物化那一天的分区,Dagster 会自动处理所有依赖链。


6. 选型指南

6.1 选 Airflow 的场景

当你的组织已经有大量现有 Airflow 基础设施,迁移成本高昂时,维护现有 Airflow 部署往往是最务实的选择。Airflow 社区生态最成熟,几乎所有数据工具(Snowflake、dbt、Databricks、Fivetran 等)都提供官方 Airflow Operator,遇到问题时 Stack Overflow 和社区资源最丰富。如果你面试的目标公司(如 Deutsche Börse)使用 Airflow,学习 Airflow 有直接的求职价值,因为大型金融机构往往是技术保守型的。当你的 Pipeline 需要极其复杂的条件逻辑、多层依赖、跨系统集成,并且有专门的 Data Engineering 团队来维护时,Airflow 的灵活性也有其价值。

6.2 选 Dagster 的场景

如果你正在构建一个以数据质量和数据可信度为核心的现代数据平台,Dagster 是最佳选择。当你需要精确的数据血缘追踪(比如监管合规要求能够追溯每一行数据的来源,这在金融领域非常普遍)时,Dagster 的资产血缘是内建的,无需额外工具。当你的团队拥抱测试驱动开发,希望 Pipeline 代码可以像普通 Python 代码一样测试时,Dagster 的体验是三者中最佳的。当你的数据管道有自然的分区语义(按日、按小时、按地区处理数据),Dagster 的 Partitions 系统让增量处理和历史回填变得极其简单。这也是你在 SoloLakehouse 选择 Dagster 的核心理由:你的 Medallion 架构天然是 Asset-centric 的,Bronze/Silver/Gold 本身就是数据资产,Dagster 的模型与之完美契合。

6.3 选 Prefect 的场景

当你需要快速从零搭建一个生产级数据管道,同时团队中 Python 能力强但 Airflow 经验少时,Prefect 是最快的路径。Prefect 的 "Pythonic First" 设计意味着你现有的 Python 脚本只需加几个装饰器就变成了生产级 Pipeline,学习成本极低。当你的基础设施是混合的(部分任务在本地服务器运行,部分在 AWS Lambda,部分在 Kubernetes),Prefect 的 Work Pool 系统对这种混合场景的支持非常灵活。当你是初创公司或小型团队,不想维护复杂的 Airflow 集群,但又需要比 Cron Job 更强的可靠性和可观测性时,Prefect Cloud(托管服务)是最省心的选择。


7. 真实代码示例:同一个 Pipeline 的三种写法

为了直观比较三者的代码风格差异,我们用相同的业务逻辑(Kafka → Bronze → Silver 的增量处理)来展示三种写法:

7.1 Airflow 写法

# airflow 风格:显式定义 DAG 和任务依赖
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG("kafka_to_silver", schedule_interval="*/15 * * * *",
         start_date=datetime(2024, 1, 1), catchup=False) as dag:

    @task()
    def consume_from_kafka(ds_nodash: str) -> list:
        """消费 Kafka 消息"""
        # 每15分钟消费一批
        return [{"event_id": 1, "ts": "2024-01-01T10:00:00", "amount": 150.0}]

    @task()
    def write_bronze(messages: list) -> str:
        """写入 Bronze 层"""
        print(f"Writing {len(messages)} messages to bronze")
        return "s3://bucket/bronze/transactions/2024-01-01/"

    @task()
    def transform_silver(bronze_path: str) -> int:
        """转换并写入 Silver 层"""
        print(f"Transforming {bronze_path} to silver")
        return 1000  # 处理行数

    # 依赖关系通过函数调用链隐式定义(TaskFlow API)
    messages = consume_from_kafka()
    bronze_path = write_bronze(messages)
    transform_silver(bronze_path)

7.2 Dagster 写法

# dagster 风格:定义数据资产,框架推断依赖
from dagster import asset, AssetIn, Definitions, define_asset_job

@asset(group_name="bronze", description="从 Kafka 消费的原始交易消息")
def kafka_transactions_raw(context) -> list:
    """Kafka Asset:将 Kafka 消费抽象为一个数据资产"""
    context.log.info("Consuming from Kafka...")
    return [{"event_id": 1, "ts": "2024-01-01T10:00:00", "amount": 150.0}]

@asset(
    group_name="silver",
    ins={"raw_messages": AssetIn("kafka_transactions_raw")},
    description="清洗后的交易数据(Silver 层)",
)
def transactions_silver(context, raw_messages: list) -> dict:
    """Silver Asset:自动依赖 Bronze Asset,IO Manager 自动处理数据传递"""
    context.log.info(f"Transforming {len(raw_messages)} messages")
    return {"record_count": len(raw_messages), "status": "success"}

# Dagster 自动知道要先物化 kafka_transactions_raw,再物化 transactions_silver
# 同时 UI 会展示两者的血缘关系和每次物化的详情

7.3 Prefect 写法

# prefect 风格:最接近普通 Python,函数调用顺序即依赖顺序
from prefect import flow, task

@task(retries=3, retry_delay_seconds=30)
def consume_from_kafka() -> list:
    """Task 就是普通 Python 函数加装饰器"""
    return [{"event_id": 1, "ts": "2024-01-01T10:00:00", "amount": 150.0}]

@task(retries=2)
def write_bronze(messages: list) -> str:
    print(f"Writing {len(messages)} messages to bronze")
    return "s3://bucket/bronze/transactions/"

@task(retries=2)
def transform_silver(bronze_path: str) -> int:
    print(f"Transforming {bronze_path}")
    return 1000

@flow(name="kafka-to-silver")
def kafka_to_silver_pipeline():
    """
    Flow 也是普通 Python 函数,可以直接运行:
    python prefect_pipeline.py  # 无需 Prefect Server
    """
    messages = consume_from_kafka()
    bronze_path = write_bronze(messages)
    count = transform_silver(bronze_path)
    print(f"Pipeline complete: processed {count} records")

# 直接运行,无需任何外部依赖
kafka_to_silver_pipeline()

观察三段代码的本质差异:Airflow 代码要求你理解 DAG、Operator、Context 等框架专有概念;Dagster 代码要求你转变思维,以"产生什么资产"而非"执行什么操作"来思考问题;Prefect 代码看起来和你在 Jupyter Notebook 里写的没有本质区别,只是多了两个装饰器。


8. 生产环境最佳实践

8.1 Airflow 生产最佳实践

Project 结构规范:将 DAG 文件、Operator 定义、工具函数分开管理,避免 dags/ 目录变成大杂烩:

airflow-project/
├── dags/                     # 只放 DAG 定义文件
│   ├── ecb_rates_dag.py
│   └── positions_dag.py
├── plugins/                  # 自定义 Operator、Hook
│   └── operators/
│       └── iceberg_operator.py
├── include/                  # SQL 模板、配置文件
│   └── sql/
│       └── transform_silver.sql
├── tests/                    # DAG 单元测试
│   └── test_dags.py
├── requirements.txt
└── Dockerfile

DAG 单元测试(这是 Airflow 中为数不多的可测试部分):

# tests/test_dags.py
import pytest
from airflow.models import DagBag

@pytest.fixture()
def dagbag():
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dagbag):
    """确保所有 DAG 文件可以无错误地导入"""
    assert len(dagbag.import_errors) == 0, \
        f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):
    """测试 DAG 是否符合预期结构"""
    dag = dagbag.get_dag("ecb_rates_pipeline")
    assert dag is not None
    assert len(dag.tasks) == 6
    # 验证关键任务存在
    task_ids = {task.task_id for task in dag.tasks}
    assert "write_to_bronze" in task_ids
    assert "transform_to_silver" in task_ids

def test_dag_has_no_cycles(dagbag):
    """验证 DAG 确实是无环的"""
    dag = dagbag.get_dag("ecb_rates_pipeline")
    # Airflow 内部会在 DagBag 加载时检查环,有环会直接抛异常
    assert dag is not None  # 能加载成功说明无环

8.2 Dagster 生产最佳实践

代码结构规范,以 SoloLakehouse 为例:

finlakehouse/
├── finlakehouse/
│   ├── __init__.py            # 包含 Definitions 入口
│   ├── assets/
│   │   ├── __init__.py
│   │   ├── bronze/
│   │   │   ├── ecb_rates.py
│   │   │   └── euribor.py
│   │   ├── silver/
│   │   │   └── rates_combined.py
│   │   └── gold/
│   │       └── macro_state.py
│   ├── resources/
│   │   ├── __init__.py
│   │   ├── s3_resource.py
│   │   └── iceberg_resource.py
│   ├── io_managers/
│   │   ├── __init__.py
│   │   └── iceberg_io_manager.py
│   ├── sensors/
│   │   └── new_data_sensor.py
│   └── schedules/
│       └── daily_schedule.py
├── tests/
│   ├── assets/
│   │   ├── test_bronze_assets.py
│   │   └── test_silver_assets.py
│   └── conftest.py            # 共享 fixture:Mock Resources
├── pyproject.toml
└── workspace.yaml             # Dagster 代码位置配置

**Freshness Policy(新鲜度策略)**是 Dagster 的一个高级特性,特别适合金融数据场景:

from dagster import asset, FreshnessPolicy

@asset(
    group_name="gold",
    # 声明此 Asset 的新鲜度要求:最多 1 小时前的数据
    # 如果超过 1 小时没有更新,Dagster UI 会发出告警
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
    description="实时风险指标(要求 1 小时内更新)"
)
def realtime_risk_metrics(context):
    ...

8.3 Prefect 生产最佳实践

利用 Prefect Cloud 进行零基础设施托管是 Prefect 的核心优势。对于个人项目和初期产品,完全可以不运行自己的 Prefect Server:

# 1. 安装 Prefect 并连接到 Prefect Cloud(免费层)
pip install prefect
prefect cloud login

# 2. 创建 Work Pool(定义代码在哪里运行)
prefect work-pool create "local-pool" --type process

# 3. 部署 Flow
prefect deploy ecb_pipeline.py:ecb_rates_pipeline \
    --name "ecb-rates-daily" \
    --pool "local-pool" \
    --cron "0 1 * * *"

# 4. 启动 Worker(长期运行的进程,轮询任务并执行)
prefect worker start --pool "local-pool"

9. 与 Lakehouse 架构的集成模式

9.1 在 Dagster 中驱动 Iceberg 表物化

这是你的 SoloLakehouse + FinLakehouse 的核心集成场景。以下展示了一个完整的 Dagster + Iceberg + Trino 集成示例:

# resources/iceberg_resource.py
from dagster import resource, Field, StringSource
from pyspark.sql import SparkSession
import pyiceberg.catalog

class IcebergSparkResource:
    """封装 Iceberg + Spark 的操作,通过 Dagster Resource 注入"""

    def __init__(self, catalog_uri: str, warehouse: str, s3_endpoint: str):
        self._spark = SparkSession.builder \
            .config("spark.sql.extensions",
                    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.iceberg.type", "hive") \
            .config("spark.sql.catalog.iceberg.uri", catalog_uri) \
            .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint) \
            .getOrCreate()

    @property
    def spark(self) -> SparkSession:
        return self._spark

    def run_sql(self, sql: str):
        return self._spark.sql(sql)


@resource(config_schema={
    "catalog_uri": Field(StringSource, default_value="thrift://hive-metastore:9083"),
    "warehouse": Field(StringSource, default_value="s3a://sololakehouse/iceberg"),
    "s3_endpoint": Field(StringSource, default_value="http://minio:9000"),
})
def iceberg_spark_resource(context) -> IcebergSparkResource:
    return IcebergSparkResource(
        catalog_uri=context.resource_config["catalog_uri"],
        warehouse=context.resource_config["warehouse"],
        s3_endpoint=context.resource_config["s3_endpoint"],
    )


# assets/gold/macro_state.py
from dagster import asset, AssetIn
from dagster import MetadataValue  # 用于记录 Asset 的元数据

@asset(
    group_name="gold",
    ins={"rates_silver": AssetIn("rates_silver")},
    required_resource_keys={"iceberg_spark"},
    description="Gold 层:宏观利率状态宽表(存储在 Iceberg 格式)",
)
def macro_state_daily(context) -> None:
    """
    计算并更新 Gold 层的 Iceberg 表。
    注意:返回 None 是因为 IO Manager 直接由 SQL 写表,
    我们通过 Output Metadata 记录写入的行数等信息。
    """
    spark = context.resources.iceberg_spark.spark

    # 用 SQL MERGE 更新 Iceberg 表(支持 Upsert)
    result = spark.sql("""
        MERGE INTO iceberg.gold.macro_state_daily AS target
        USING (
            SELECT
                rate_date AS calc_date,
                MAX(CASE WHEN rate_type = 'ECB_DEPOSIT' THEN value END) AS ecb_deposit_rate,
                MAX(CASE WHEN rate_type = 'EURIBOR_3M' THEN value END) AS euribor_3m,
                CASE
                    WHEN MAX(CASE WHEN rate_type = 'ECB_DEPOSIT' THEN value END) > 3.0
                    THEN 'TIGHT_MONETARY'
                    ELSE 'NEUTRAL'
                END AS macro_regime,
                CURRENT_TIMESTAMP() AS updated_at
            FROM iceberg.silver.rates_clean
            GROUP BY rate_date
        ) AS source
        ON target.calc_date = source.calc_date
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

    # 查询写入结果,作为 Asset Metadata 记录(在 Dagster UI 中可见)
    row_count = spark.sql(
        "SELECT COUNT(*) as cnt FROM iceberg.gold.macro_state_daily"
    ).collect()[0]["cnt"]

    # 记录 Asset Materialization 的元数据,在 UI 的 Asset Details 页面显示
    context.add_output_metadata({
        "row_count": MetadataValue.int(row_count),
        "iceberg_table": MetadataValue.text("iceberg.gold.macro_state_daily"),
        "preview": MetadataValue.md(
            spark.sql("SELECT * FROM iceberg.gold.macro_state_daily LIMIT 5")
            .toPandas().to_markdown()
        )
    })

9.2 对接 dbt 的最佳实践

三个编排工具都与 dbt 有深度集成,因为 dbt 是现代数据栈中最常见的 SQL 转换工具。其中 Dagster 的集成最为优雅:

# Dagster + dbt 集成(dagster-dbt 包)
from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator

# dbt_assets 装饰器会读取 dbt project 的 manifest.json,
# 自动为每个 dbt model 创建一个 Dagster Asset
# dbt 模型之间的依赖关系(ref())会自动转化为 Dagster 的资产依赖

@dbt_assets(
    manifest=Path("dbt_project/target/manifest.json"),
)
def dbt_finance_models(context, dbt: DbtCliResource):
    """
    这一个 @dbt_assets 装饰器,会自动创建 dbt project 中所有 model 对应的 Asset。
    在 Dagster UI 中,每个 dbt model 都是一个独立的资产节点,
    和 Python Asset 的血缘关系完整呈现。
    """
    yield from dbt.cli(["run"], context=context).stream()

10. 面试高频问题与标准答案

Q1:Airflow 的 Scheduler 是如何工作的?为什么它在大规模场景下有性能问题?

标准答案:Airflow Scheduler 每隔一段时间(默认 30 秒,2.x 版本有优化)扫描整个 dags/ 目录,加载并解析每一个 Python 文件,执行其中的 DAG 定义代码,然后更新 Metadata Database 中的任务状态,决定哪些任务已就绪(依赖全部完成)可以执行,将其推入执行队列。性能瓶颈有两个:其一是"DAG 文件解析",每次扫描都要 Python import 所有文件,如果某个 DAG 文件有慢速 import(如大型 ORM),会拖慢整个调度循环;其二是 "Metadata Database 竞争",所有 Scheduler 判断逻辑都依赖数据库读写,在数百个并发任务时,数据库成为瓶颈。Airflow 2.x 通过"HA Scheduler"(多 Scheduler 实例)和优化的 DAG 解析器部分解决了这个问题,但根本上仍受限于这个中央数据库设计。

Q2:什么是 Dagster 的 Software-Defined Asset,它与 Airflow Task 的根本区别是什么?

标准答案:Dagster 的 Software-Defined Asset(SDA,软件定义资产)是对一个持久化数据对象的声明,包含三要素:这个数据对象在哪里(Storage Location)、它依赖哪些其他数据对象(Upstream Dependencies)、如何计算它(Computation Logic)。与 Airflow Task 的根本区别在于关注点不同:Airflow Task 描述的是"执行一个动作",框架不关心这个动作产生了什么;Dagster SDA 描述的是"一个数据对象的存在和计算方式",框架天然知道每个数据对象和其他对象的关系,从而可以提供内建的数据血缘、自动感知数据新鲜度(是否需要重新物化)、增量处理支持(只重算变化了的分区)。用一个比喻:Airflow Task 是"动词",Dagster Asset 是"名词"——你更倾向于用名词(数据)还是动词(动作)来思考你的数据系统,决定了你更适合用哪个工具。

Q3:Prefect 的 Work Pool 和 Airflow 的 Executor 有什么本质区别?

标准答案:两者都是解决"任务在哪里运行"的问题,但设计哲学不同。Airflow 的 Executor 是 Scheduler 的一部分,由 Airflow 服务器**推送(Push)任务到执行环境(Celery Worker、Kubernetes Pod)。Prefect 的 Work Pool + Worker 是拉取(Pull)**模型:Worker 进程自主轮询 Prefect Server 上的任务队列,主动获取并执行任务。拉取模型的优势在于:Worker 不需要被 Prefect Server 网络可达(防火墙友好),可以轻松在本地开发机、云服务器、Kubernetes 上混合部署,Worker 宕机后重启即可自动恢复,不需要 Prefect Server 做任何配置变更。

Q4:如何在 Airflow 中实现 Sensor 触发?与 Dagster Sensor 有何不同?

标准答案:Airflow 的 Sensor 是一种特殊的 Operator,它持续轮询某个外部状态(文件是否存在、API 是否返回特定状态),直到条件满足才继续执行下游任务。Airflow Sensor 的问题是它在等待期间持续占用一个 Worker Slot(在 CeleryExecutor 中),即使什么都没做,这会浪费 Worker 资源。Airflow 引入了 reschedule 模式来缓解这个问题(等待时释放 Slot),但实现相对复杂。Dagster Sensor 是完全不同的设计:Sensor 函数由 Dagster Daemon 以低频率(默认每 30 秒)调用,每次调用检查一次外部状态,如果条件满足则触发 Run Request,否则直接返回。Sensor 函数不是长期阻塞的,Daemon 只是周期性地调用它,资源效率更高。Dagster Sensor 还支持 cursor(游标)来记录上次检查到的状态,避免重复处理。

Q5:在金融数据平台中,你会选择哪个编排工具?为什么?

标准答案(展示架构思维):选型取决于三个关键约束。第一,如果是加入一个现有的大型金融机构,他们大概率已经部署了 Airflow(或 MWAA/Cloud Composer),此时的选择是在 Airflow 之上构建,而不是引入新工具。第二,如果是从零开始构建,并且数据血缘和合规可审计性是核心需求(这在 BaFin 监管的金融机构几乎是必须的),Dagster 是首选,因为其 Asset-centric 模型天然支持数据血缘,而数据血缘是 DORA 和 MiFID II 合规的基础要求。第三,如果是初创 FinTech,团队人少、需要快速迭代,Prefect Cloud 可以在几小时内搭建起来,是最快的路径。在我的 SoloLakehouse 中选择 Dagster,是因为我构建的是 Medallion 架构,Bronze/Silver/Gold 正是典型的数据资产层次,Dagster 的资产化模型与这个架构天然契合,同时 Dagster 的测试友好性使我可以为每个 Asset 写单元测试,这在金融数据处理中是必要的数据质量保障。


11. Platform Owner 思维:战略总结

学完这三个工具,如果只允许你记住三件事,这三件事应该是:

第一件事是编排工具的本质是"运维复杂性的集中管理"。每一个生产级数据管道都需要处理重试、失败恢复、并发控制、监控告警,这些是"负面工程"的核心。Airflow、Dagster、Prefect 都是把这些复杂性从你的业务代码中抽离出来,用框架统一管理。理解了这个本质,你就能理解为什么在 Jupyter Notebook 里运行的管道无法直接搬到生产环境:不是因为代码逻辑不对,而是因为缺少了整套运维基础设施。

第二件事是 Dagster 的 Asset-centric 模型代表了数据工程的演化方向。在过去十年里,数据工程的关注点从"如何运行任务"(Airflow 范式)逐渐转向"如何管理数据资产"(Dagster 范式)。这个趋势的背后是数据治理和数据可信度的需求在上升,特别是在金融、医疗等受监管行业。掌握 Dagster 不仅是掌握一个工具,更是掌握了未来数据工程的思维模式。

第三件事是工具选择从来不是技术问题,而是组织和战略问题。没有"最好的编排工具",只有"最适合当前约束的编排工具"。当你在面试中被问到"你会选什么工具",正确的回答不是直接给出答案,而是先问三个问题:现有基础设施是什么?数据治理要求是什么级别?团队的技术背景和人数是多少?能够以这种方式回答技术选型问题,展示的是 Platform Owner 的思维格局,而不仅仅是工程师的技术积累。

在你的 SoloLakehouse ADR 中,写清楚选择 Dagster 的理由:不是因为 Dagster"更好",而是因为(1)Asset-centric 模型与 Medallion 架构完美契合;(2)内建分区系统大幅降低了增量处理的复杂度;(3)测试友好性支持 CI/CD;(4)未来对接 Iceberg 表的血缘追踪有更清晰的实现路径。这四条理由,每一条都是具体的架构约束,而不是模糊的"感觉更好",这是区分高级工程师和初级工程师技术决策质量的核心差异。


文档版本:v1.0 | 作者:SoloLakehouse Knowledge Base | 最后更新:2024