Dagster 完整训练手册

从零到生产:数据工程师与 AI 平台架构师的系统性学习路径

适用读者:有 Python 基础、了解数据管道概念、希望在生产级数据/ML 平台中使用 Dagster 的工程师。

学习目标:完成本手册后,你将能够独立设计、实现、测试并部署基于 Dagster 的数据资产管道,包括与 Spark、MinIO、MLflow 等工具的集成。


目录

  1. Dagster 的核心哲学:为什么不用 Airflow?
  2. 安装与项目初始化
  3. 核心概念 I:Software-Defined Assets(SDA)
  4. 核心概念 II:Resources(资源)
  5. 核心概念 III:Jobs、Ops 与 Graphs
  6. 核心概念 IV:Partitions(分区)
  7. 核心概念 V:Schedules 与 Sensors
  8. 核心概念 VI:IO Managers
  9. 测试策略:让 Pipeline 可验证
  10. 与 MLflow 集成:ML Pipeline 实战
  11. 与 Spark / Delta Lake 集成
  12. Metadata、Lineage 与可观测性
  13. 生产部署:Docker + Dagster Daemon
  14. FinLakehouse 综合案例

<a name="chapter-1"></a>

Chapter 1:Dagster 的核心哲学——为什么不用 Airflow?

1.1 任务调度 vs. 资产管理:思维层级的转变

Airflow 的世界观是任务(Task):你告诉系统"先运行 A,再运行 B,再运行 C"。这是一种过程导向的思维,关注的是动词——"做什么"。

Dagster 的世界观是资产(Asset):你告诉系统"我有数据资产 X,它依赖资产 Y 和 Z"。这是一种结果导向的思维,关注的是名词——"有什么"。

这个区别在实际工作中意味着:

  • 在 Airflow 中,如果你想知道"bronze_flights 表里的数据是从哪里来的",你需要阅读多个 DAG 文件,逐步推断。
  • 在 Dagster 中,你直接在 UI 上点击 bronze_flights 资产,就能看到它的上游(原始 CSV)和下游(silver_flights)以及每次物化的 metadata。

高阶思维:招聘经理在评估平台架构师时,最看重的不是"你会用哪个工具",而是"你是否理解数据平台的本质是管理数据资产的生命周期"。Dagster 的资产理念与 Data Mesh、Data Contract 等现代架构思想天然契合,这是一个可以在面试中展开讲的核心论点。

1.2 Dagster 的核心设计原则

原则一:可测试性(Testability)。每一个 Asset、Resource、Op 都可以在不启动 Dagster 服务的情况下独立进行单元测试。这在 Airflow 中几乎是不可能的。

原则二:类型系统与 Metadata。Dagster 鼓励你为每个资产定义输入/输出的类型,并在运行时记录 metadata(行数、数据分布等),形成自动化的数据质量日志——这在受监管行业(BaFin、EU AI Act)中非常重要。

原则三:依赖注入(Resource Injection)。数据库连接、S3 客户端、Spark Session 等都作为 Resource 注入,测试时可以替换为 Mock,生产时切换为真实连接。这让代码从根本上解耦。


<a name="chapter-2"></a>

Chapter 2:安装与项目初始化

2.1 环境准备

建议使用 Python 3.10+,并用 venvconda 隔离环境。

# 创建并激活虚拟环境
python -m venv dagster-env
source dagster-env/bin/activate  # Linux/macOS
# 或 dagster-env\Scripts\activate  # Windows

# 安装 Dagster 核心包
pip install dagster dagster-webserver

# 验证安装
dagster --version

2.2 用官方脚手架初始化项目

# 创建一个新项目(推荐方式,会生成标准目录结构)
dagster project scaffold --name finlakehouse_pipeline

cd finlakehouse_pipeline
pip install -e ".[dev]"

生成的目录结构如下:

finlakehouse_pipeline/
├── finlakehouse_pipeline/       # 主包
│   ├── __init__.py              # 注册所有 Definitions
│   ├── assets/                  # Asset 定义
│   │   └── __init__.py
│   ├── resources/               # Resource 定义
│   │   └── __init__.py
│   ├── jobs/                    # Job 定义
│   │   └── __init__.py
│   └── schedules/               # Schedule 定义
│       └── __init__.py
├── finlakehouse_pipeline_tests/  # 测试目录
├── pyproject.toml               # 包配置
└── setup.py

为什么要关注目录结构? 作为架构师,你的代码组织方式本身就是一种"架构表达"。按关注点分离(Assets / Resources / Jobs / Schedules)组织代码,使每个目录的职责清晰,是大型平台项目的必要做法。

2.3 启动 Dagster UI

# 启动开发服务器(会自动热加载代码变更)
dagster dev

打开 http://localhost:3000,你会看到 Dagster 的 Launchpad UI。目前它是空的,接下来我们逐步填充内容。


<a name="chapter-3"></a>

Chapter 3:核心概念 I——Software-Defined Assets(SDA)

3.1 你的第一个 Asset

Asset 是 Dagster 中最重要的概念。一个 Asset 代表一个持久化的数据对象——一张数据库表、一个 Parquet 文件、一个训练好的模型,等等。

# finlakehouse_pipeline/assets/bronze.py

from dagster import asset
import pandas as pd

@asset(
    # 描述这个资产是什么(会显示在 UI 上)
    description="从原始 CSV 文件加载的航班延误数据(Landing Zone → Bronze)",
    # 给资产打标签,便于分组和过滤
    tags={"layer": "bronze", "domain": "flights"},
    # 计算资产所需的组(可选,用于多团队项目)
    group_name="bronze_layer",
)
def bronze_flights(context) -> pd.DataFrame:
    """
    这是 Asset 的实现函数。
    函数名 `bronze_flights` 就是这个资产的 key(唯一标识符)。
    返回值就是这个资产的"物化(materialization)"结果。
    """
    # context 对象提供日志、配置等功能
    context.log.info("开始从 Landing Zone 加载原始航班数据...")
    
    df = pd.read_csv("/data/landing/flights_raw.csv")
    
    # 记录 metadata:这些信息会出现在 Dagster UI 的资产详情页
    context.add_output_metadata({
        "num_rows": len(df),
        "num_columns": len(df.columns),
        "columns": df.columns.tolist(),
        "sample": df.head(3).to_markdown(),  # 需要 tabulate 包
    })
    
    context.log.info(f"成功加载 {len(df)} 行数据")
    return df

理解这段代码的关键@asset 装饰器把一个普通 Python 函数变成了一个"资产定义"。函数名是资产的名字,函数的返回值是资产的内容,函数的参数(除了 context)是它依赖的其他资产。

3.2 资产依赖:让 Dagster 自动推断 DAG

# finlakehouse_pipeline/assets/silver.py

from dagster import asset
import pandas as pd

@asset(
    description="清洗后的航班数据(Bronze → Silver)",
    group_name="silver_layer",
    tags={"layer": "silver"},
)
def silver_flights(
    context,
    bronze_flights: pd.DataFrame,  # 参数名 = 依赖的资产名!Dagster 自动建立依赖关系
) -> pd.DataFrame:
    """
    注意:参数 `bronze_flights` 的类型是 pd.DataFrame。
    Dagster 看到这个参数名与上面定义的 `bronze_flights` 资产匹配,
    就会自动:
    1. 先物化 bronze_flights
    2. 把结果传给这个函数
    这就是"依赖注入式"的 DAG 构建,无需手动写 `>>` 或 `set_upstream()`。
    """
    context.log.info("开始数据清洗...")
    
    df = bronze_flights.copy()
    
    # 数据质量规则
    original_rows = len(df)
    df = df.dropna(subset=["flight_id", "departure_time", "arrival_time"])
    df = df[df["delay_minutes"] >= -30]  # 过滤异常负延误
    
    # 标准化列名
    df.columns = [col.lower().replace(" ", "_") for col in df.columns]
    
    rows_removed = original_rows - len(df)
    context.add_output_metadata({
        "rows_input": original_rows,
        "rows_output": len(df),
        "rows_removed": rows_removed,
        "removal_rate": f"{rows_removed/original_rows:.2%}",
    })
    
    return df


@asset(
    description="航班特征工程数据(Silver → Gold)",
    group_name="gold_layer",
    tags={"layer": "gold"},
)
def gold_flight_features(
    context,
    silver_flights: pd.DataFrame,  # 依赖 silver 层
) -> pd.DataFrame:
    """Gold 层:为 ML 模型准备特征"""
    
    df = silver_flights.copy()
    
    # 特征工程
    df["departure_hour"] = pd.to_datetime(df["departure_time"]).dt.hour
    df["is_weekend"] = pd.to_datetime(df["departure_time"]).dt.dayofweek >= 5
    df["delay_category"] = pd.cut(
        df["delay_minutes"],
        bins=[-30, 0, 15, 60, float("inf")],
        labels=["early", "on_time", "minor_delay", "major_delay"],
    )
    
    context.add_output_metadata({
        "num_features": len(df.columns),
        "delay_distribution": df["delay_category"].value_counts().to_dict(),
    })
    
    return df

3.3 注册 Assets:让 Dagster 找到你的代码

# finlakehouse_pipeline/__init__.py

from dagster import Definitions, load_assets_from_modules

# 导入所有 asset 模块
from finlakehouse_pipeline.assets import bronze, silver

# Definitions 是整个项目的"入口",类似于 Flask 的 app
defs = Definitions(
    assets=load_assets_from_modules([bronze, silver]),
)

load_assets_from_modules 会自动扫描模块中所有被 @asset 装饰的函数,你不需要逐一手动列出。

3.4 练习:在 UI 中物化资产

  1. 确保 dagster dev 正在运行。
  2. 打开 UI,点击左侧的 Assets 菜单。
  3. 你会看到 bronze_flights → silver_flights → gold_flight_features 的依赖图。
  4. 点击 gold_flight_features,然后点击右上角的 Materialize 按钮。
  5. Dagster 会自动按顺序物化整条链路,并在每个节点记录你定义的 metadata。

<a name="chapter-4"></a>

Chapter 4:核心概念 II——Resources(资源)

4.1 为什么需要 Resources?

假设你的 Bronze Asset 需要连接 MinIO(S3 兼容存储),你可能会这样写:

# ❌ 反模式:直接在 Asset 里硬编码连接信息
@asset
def bronze_flights():
    import boto3
    s3 = boto3.client("s3", endpoint_url="http://localhost:9000", ...)
    ...

这种方式有三个问题:一是连接信息散落在各处,二是测试时必须真正连接 MinIO,三是切换环境(dev → prod)需要修改代码。

Resource 模式解决了这些问题——把"如何连接"与"用连接做什么"彻底分离。

4.2 定义一个 MinIO Resource

# finlakehouse_pipeline/resources/storage.py

from dagster import ConfigurableResource
import boto3
from botocore.client import BaseClient
from typing import Optional


class MinIOResource(ConfigurableResource):
    """
    封装 MinIO(S3 兼容)连接的 Dagster Resource。
    ConfigurableResource 让参数可以通过 YAML 配置文件或环境变量注入,
    而不是硬编码在代码里。
    """
    
    endpoint_url: str           # e.g., "http://minio:9000"
    access_key: str
    secret_key: str
    region_name: str = "us-east-1"
    
    def get_client(self) -> BaseClient:
        """返回 boto3 S3 客户端"""
        return boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            region_name=self.region_name,
        )
    
    def read_parquet(self, bucket: str, key: str) -> "pd.DataFrame":
        """从 MinIO 读取 Parquet 文件,返回 DataFrame"""
        import pandas as pd
        import io
        
        client = self.get_client()
        response = client.get_object(Bucket=bucket, Key=key)
        return pd.read_parquet(io.BytesIO(response["Body"].read()))
    
    def write_parquet(self, df: "pd.DataFrame", bucket: str, key: str) -> None:
        """将 DataFrame 写入 MinIO 的 Parquet 文件"""
        import io
        
        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False)
        buffer.seek(0)
        
        client = self.get_client()
        client.put_object(Bucket=bucket, Key=key, Body=buffer)

4.3 在 Asset 中使用 Resource

# finlakehouse_pipeline/assets/bronze.py

from dagster import asset
from finlakehouse_pipeline.resources.storage import MinIOResource
import pandas as pd


@asset(
    description="从 MinIO Landing Zone 加载原始航班数据",
    group_name="bronze_layer",
)
def bronze_flights(
    context,
    minio: MinIOResource,  # Resource 通过参数名注入,与 Asset 依赖的注入方式相同!
) -> pd.DataFrame:
    """
    Dagster 如何区分这个参数是"Asset 依赖"还是"Resource 依赖"?
    答:通过 Definitions 中的注册信息——在 resources 字典里注册的就是 Resource,
    在 assets 列表里注册的就是 Asset。Dagster 在运行时自动路由。
    """
    context.log.info("从 MinIO 读取原始数据...")
    
    df = minio.read_parquet(
        bucket="landing-zone",
        key="flights/raw_flights.parquet"
    )
    
    context.add_output_metadata({"num_rows": len(df)})
    return df

4.4 注册 Resources

# finlakehouse_pipeline/__init__.py

from dagster import Definitions, EnvVar, load_assets_from_modules
from finlakehouse_pipeline.assets import bronze, silver
from finlakehouse_pipeline.resources.storage import MinIOResource

defs = Definitions(
    assets=load_assets_from_modules([bronze, silver]),
    resources={
        # "minio" 这个 key 必须与 Asset 参数名完全一致
        "minio": MinIOResource(
            endpoint_url=EnvVar("MINIO_ENDPOINT_URL"),  # 从环境变量读取
            access_key=EnvVar("MINIO_ACCESS_KEY"),
            secret_key=EnvVar("MINIO_SECRET_KEY"),
        ),
    },
)

EnvVar 是 Dagster 内置的环境变量读取器,它在运行时才解析,而不是在导入时——这意味着你可以安全地将代码提交到 Git,而不会泄露密钥。

4.5 为测试创建 Mock Resource

# finlakehouse_pipeline_tests/test_bronze.py

from finlakehouse_pipeline.assets.bronze import bronze_flights
from finlakehouse_pipeline.resources.storage import MinIOResource
from dagster import build_asset_context
import pandas as pd
from unittest.mock import MagicMock, patch


def test_bronze_flights_loads_data():
    """
    测试时,我们不需要真正的 MinIO 连接。
    只需要创建一个 Mock Resource,让它返回假数据。
    """
    # 创建 Mock Resource
    mock_minio = MagicMock(spec=MinIOResource)
    mock_minio.read_parquet.return_value = pd.DataFrame({
        "flight_id": ["FL001", "FL002"],
        "delay_minutes": [10, -5],
        "departure_time": ["2025-01-01 08:00", "2025-01-01 09:00"],
    })
    
    # 构建测试用的 context
    context = build_asset_context()
    
    # 直接调用 Asset 函数(无需启动 Dagster 服务!)
    result = bronze_flights(context=context, minio=mock_minio)
    
    assert isinstance(result, pd.DataFrame)
    assert len(result) == 2
    mock_minio.read_parquet.assert_called_once_with(
        bucket="landing-zone",
        key="flights/raw_flights.parquet"
    )

这就是 Dagster 可测试性的精髓:Asset 逻辑和外部依赖完全解耦,单元测试不依赖任何基础设施。


<a name="chapter-5"></a>

Chapter 5:核心概念 III——Jobs、Ops 与 Graphs

5.1 Asset Job:选择性物化资产子集

大多数时候,你不需要每次物化整个 Asset 图。define_asset_job 让你选择一个资产子集作为独立的 Job。

# finlakehouse_pipeline/jobs/__init__.py

from dagster import define_asset_job, AssetSelection

# 只物化 bronze 和 silver 层(例如,数据摄入阶段的 Job)
ingestion_job = define_asset_job(
    name="ingestion_job",
    description="从 Landing Zone 摄入数据到 Silver 层",
    # AssetSelection 提供强大的资产过滤语法
    selection=AssetSelection.groups("bronze_layer", "silver_layer"),
)

# 只物化 ML 相关的资产
ml_training_job = define_asset_job(
    name="ml_training_job",
    description="端到端 ML 训练 Pipeline",
    selection=AssetSelection.groups("gold_layer", "ml_layer"),
)

# 物化特定标签的资产
monitoring_job = define_asset_job(
    name="monitoring_job",
    selection=AssetSelection.tag("type", "monitor"),
)

5.2 Ops:更细粒度的任务单元

@op 是比 Asset 更低级的原语。当一个任务不产生持久化的数据资产时(比如发送 Slack 告警、触发外部 API),就用 Op。

# finlakehouse_pipeline/ops/notifications.py

from dagster import op, Out, In
import requests


@op(
    description="数据质量检查通过后发送 Slack 通知",
    ins={"validation_result": In(dict)},  # 显式声明输入类型
)
def notify_slack(context, validation_result: dict):
    """Op 不返回持久化的资产,只执行副作用(发送通知)"""
    if not validation_result["passed"]:
        message = f"⚠️ 数据质量检查失败:{validation_result['reason']}"
        # 发送 Slack webhook
        requests.post(
            context.op_config["webhook_url"],
            json={"text": message}
        )
        context.log.warning(message)
    else:
        context.log.info("✅ 数据质量检查通过")

5.3 Graphs:组合 Ops 构成工作流

当你需要将多个 Ops 组合成一个有向图时,使用 @graph

from dagster import graph, op, GraphDefinition

@op
def validate_data(context, df) -> dict:
    result = {"passed": True, "reason": ""}
    if len(df) < 100:
        result = {"passed": False, "reason": "数据量不足 100 行"}
    return result

@graph
def data_quality_workflow():
    """
    Graph 定义数据流:validate → notify
    注意:graph 里面用函数调用的方式连接 Ops,非常直观
    """
    validation_result = validate_data()  # 实际上不会在这里执行,只是声明依赖
    notify_slack(validation_result)

# 将 Graph 转成 Job 才能运行
quality_check_job = data_quality_workflow.to_job(name="quality_check_job")

何时用 Asset,何时用 Op/Graph? 一个简单的原则:如果你的任务产生了一个"可以被其他任务复用的数据结果",就用 Asset;如果只是执行某种操作(通知、校验、清理临时文件),就用 Op。


<a name="chapter-6"></a>

Chapter 6:核心概念 IV——Partitions(分区)

6.1 什么是分区?

分区是 Dagster 中处理时间序列数据的核心机制。假设你每天都有新的航班数据,分区让你能够:

  • 单独物化某一天的数据,而不是重新处理所有历史数据。
  • 追踪哪些日期的数据已经处理,哪些还没有。
  • 并行处理多个日期的数据。

6.2 定义时间分区

# finlakehouse_pipeline/assets/partitioned.py

from dagster import asset, DailyPartitionsDefinition, MonthlyPartitionsDefinition
import pandas as pd

# 定义按天分区,从 2025-07-01 开始
daily_partitions = DailyPartitionsDefinition(start_date="2025-07-01")

@asset(
    partitions_def=daily_partitions,
    description="按天分区的航班 Bronze 数据",
    group_name="bronze_layer",
)
def bronze_flights_partitioned(context) -> pd.DataFrame:
    """
    context.partition_key 包含当前正在处理的分区值。
    对于 DailyPartitionsDefinition,它的格式是 "YYYY-MM-DD"。
    """
    partition_date = context.partition_key
    context.log.info(f"正在处理日期:{partition_date}")
    
    # 根据分区日期读取对应的数据文件
    df = pd.read_parquet(f"/data/landing/flights/{partition_date}.parquet")
    
    context.add_output_metadata({
        "partition_date": partition_date,
        "num_rows": len(df),
    })
    
    return df


@asset(
    partitions_def=daily_partitions,
    description="按天分区的 Silver 层航班数据",
    group_name="silver_layer",
)
def silver_flights_partitioned(
    context,
    bronze_flights_partitioned: pd.DataFrame,  # 同一分区的 Bronze 数据会自动传入
) -> pd.DataFrame:
    """
    当上游和下游都有相同的分区定义时,Dagster 自动匹配:
    处理 2025-01-15 的 silver 时,会读取 2025-01-15 的 bronze。
    """
    df = bronze_flights_partitioned.copy()
    df = df.dropna()
    return df

6.3 跨分区聚合:PartitionMapping

有时你需要把多天的数据聚合成一个月的报告:

from dagster import asset, MonthlyPartitionsDefinition, TimeWindowPartitionMapping

monthly_partitions = MonthlyPartitionsDefinition(start_date="2025-07-01")

@asset(
    partitions_def=monthly_partitions,
    ins={
        "silver_flights_partitioned": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1,  # 从月初
                end_offset=0,     # 到月末
            )
        )
    },
    description="月度航班汇总报告",
)
def monthly_flight_report(context, silver_flights_partitioned: pd.DataFrame) -> dict:
    """
    这里的 silver_flights_partitioned 是当前月份所有天的数据合并后的 DataFrame。
    Dagster 自动完成跨分区的数据合并。
    """
    return {
        "month": context.partition_key,
        "total_flights": len(silver_flights_partitioned),
        "avg_delay": silver_flights_partitioned["delay_minutes"].mean(),
    }

6.4 在 UI 中操作分区

在 Dagster UI 中,选择分区 Asset 后,你会看到一个"分区热力图"——每个格子代表一个日期分区,绿色表示已成功物化,红色表示失败,灰色表示未运行。你可以点击任意格子单独重跑或查看日志。


<a name="chapter-7"></a>

Chapter 7:核心概念 V——Schedules 与 Sensors

7.1 Schedule:基于时间的触发

# finlakehouse_pipeline/schedules/__init__.py

from dagster import ScheduleDefinition, build_schedule_from_partitioned_job

# 简单方式:直接从分区 Job 构建 Schedule
daily_ingestion_schedule = build_schedule_from_partitioned_job(
    job=ingestion_job,        # 必须是分区 Job
    hour_of_day=2,            # 每天凌晨 2 点运行
    minute_of_hour=30,        # 2:30 AM
    description="每日凌晨 2:30 摄入前一天的航班数据",
)

# 高级方式:使用 Cron 表达式
from dagster import schedule

@schedule(
    cron_schedule="0 6 * * MON",  # 每周一早上 6 点
    job=ml_training_job,
    execution_timezone="Europe/Berlin",  # 法兰克福时区
)
def weekly_model_retrain_schedule(context):
    """
    Schedule 函数可以根据时间动态生成 run_config(运行配置)。
    比如,每次训练时使用不同的超参数。
    """
    # context.scheduled_execution_time 包含计划执行时间
    run_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    
    return {
        "ops": {
            "train_model": {
                "config": {
                    "training_date": run_date,
                    "max_epochs": 50,
                }
            }
        }
    }

7.2 Sensor:基于事件的触发

Sensor 持续轮询某个外部状态,当条件满足时触发 Job 运行。

from dagster import sensor, RunRequest, SkipReason, SensorEvaluationContext
import boto3
import json


@sensor(
    job=ingestion_job,
    minimum_interval_seconds=60,  # 每 60 秒检查一次
    description="监听 MinIO Landing Zone,有新文件时触发摄入",
)
def minio_landing_sensor(context: SensorEvaluationContext):
    """
    Sensor 的工作原理:
    1. 每 60 秒被 Dagster Daemon 调用一次
    2. 函数检查外部状态(MinIO 中是否有新文件)
    3. 如果有新文件,yield RunRequest 触发 Job
    4. 如果没有,yield SkipReason 或什么都不做
    """
    
    s3 = boto3.client("s3", endpoint_url="http://minio:9000", ...)
    
    # cursor 是 Sensor 的状态,用于记录上次处理到哪里
    # 注意:cursor 在每次 Sensor 运行之间是持久化的
    last_processed_key = context.cursor or ""
    
    response = s3.list_objects_v2(
        Bucket="landing-zone",
        Prefix="flights/",
    )
    
    new_files = []
    for obj in response.get("Contents", []):
        if obj["Key"] > last_processed_key:
            new_files.append(obj["Key"])
    
    if not new_files:
        yield SkipReason("Landing Zone 中没有新文件")
        return
    
    # 为每个新文件触发一次 Job 运行
    for file_key in sorted(new_files):
        # 从文件名提取日期作为分区键(假设文件名格式为 flights/2025-01-15.parquet)
        partition_date = file_key.split("/")[1].replace(".parquet", "")
        
        yield RunRequest(
            run_key=file_key,         # run_key 确保同一文件不会重复触发
            partition_key=partition_date,
            run_config={},
            tags={"source_file": file_key},
        )
    
    # 更新 cursor 到最新处理的文件
    context.update_cursor(sorted(new_files)[-1])

Sensor 的 run_key 机制非常重要:Dagster 保证同一个 run_keyRunRequest 只会被执行一次,即使 Sensor 多次检测到同一个文件,也不会重复触发。这是一种内置的幂等性保障。


<a name="chapter-8"></a>

Chapter 8:核心概念 VI——IO Managers

8.1 IO Manager 的作用

默认情况下,Dagster 把 Asset 的输出在内存中传递给下游 Asset。这在单机运行时没问题,但在分布式或大数据场景中,你需要把数据持久化到存储系统(MinIO、Delta Lake、数据库等)。

IO Manager 定义了"如何存储和加载 Asset 的输出",让 Asset 代码本身不需要关心存储细节。

8.2 实现一个 Delta Lake IO Manager

# finlakehouse_pipeline/resources/io_managers.py

from dagster import IOManager, io_manager, InputContext, OutputContext
import pandas as pd
from deltalake import write_deltalake, DeltaTable
from typing import Any


class DeltaLakeIOManager(IOManager):
    """
    自定义 IO Manager:将 Asset 输出写入 Delta Lake 格式的 MinIO 存储,
    并在 Asset 读取时从 Delta Lake 加载。
    
    这样一来,所有 Asset 的数据自动具备:
    - ACID 事务(Delta Lake 保证)
    - Time travel(可以查询任意历史版本)
    - Schema evolution
    """
    
    def __init__(self, base_path: str, storage_options: dict):
        self.base_path = base_path  # e.g., "s3://lakehouse"
        self.storage_options = storage_options  # MinIO 连接配置
    
    def _get_path(self, context: OutputContext | InputContext) -> str:
        """根据 Asset 的层级和名称构造存储路径"""
        # Asset key 是一个元组,例如 ("bronze_layer", "bronze_flights")
        return f"{self.base_path}/{'/'.join(context.asset_key.path)}"
    
    def handle_output(self, context: OutputContext, obj: Any):
        """当 Asset 产生输出时,这个方法被调用"""
        if not isinstance(obj, pd.DataFrame):
            raise ValueError(f"DeltaLakeIOManager 只支持 pd.DataFrame,收到了 {type(obj)}")
        
        path = self._get_path(context)
        context.log.info(f"将 DataFrame 写入 Delta Lake:{path}")
        
        write_deltalake(
            path,
            obj,
            mode="overwrite",
            storage_options=self.storage_options,
        )
        
        # 记录写入 metadata
        context.add_output_metadata({
            "delta_table_path": path,
            "num_rows_written": len(obj),
            "schema": str(obj.dtypes.to_dict()),
        })
    
    def load_input(self, context: InputContext) -> pd.DataFrame:
        """当下游 Asset 需要读取上游输出时,这个方法被调用"""
        path = self._get_path(context)
        context.log.info(f"从 Delta Lake 加载数据:{path}")
        
        dt = DeltaTable(path, storage_options=self.storage_options)
        return dt.to_pandas()


@io_manager(
    config_schema={
        "base_path": str,
        "endpoint_url": str,
        "access_key": str,
        "secret_key": str,
    }
)
def delta_lake_io_manager(context):
    """IO Manager 的工厂函数,从配置中读取参数"""
    cfg = context.resource_config
    storage_options = {
        "endpoint_url": cfg["endpoint_url"],
        "aws_access_key_id": cfg["access_key"],
        "aws_secret_access_key": cfg["secret_key"],
    }
    return DeltaLakeIOManager(
        base_path=cfg["base_path"],
        storage_options=storage_options,
    )

8.3 注册 IO Manager

# finlakehouse_pipeline/__init__.py

defs = Definitions(
    assets=load_assets_from_modules([bronze, silver]),
    resources={
        "io_manager": delta_lake_io_manager.configured({
            "base_path": "s3://lakehouse",
            "endpoint_url": "http://minio:9000",
            "access_key": "minioadmin",
            "secret_key": "minioadmin",
        }),
        "minio": MinIOResource(...),
    },
)

一旦注册了 io_manager,所有 Asset 的输出会自动通过它写入 Delta Lake,输入会自动从 Delta Lake 读取,而无需在每个 Asset 函数里写任何存储代码。这就是关注点分离的极致


<a name="chapter-9"></a>

Chapter 9:测试策略——让 Pipeline 可验证

9.1 三层测试策略

生产级 Dagster 项目应该有三层测试:

第一层:单元测试(快速,不依赖基础设施)

  • 测试每个 Asset 的业务逻辑
  • 用 Mock 替换所有 Resource 和 IO Manager

第二层:集成测试(中等速度,依赖本地容器)

  • 使用 docker-compose 启动 MinIO、PostgreSQL 等服务
  • 测试完整的数据流

第三层:端到端测试(慢速,仅在 CI/CD 中运行)

  • 在测试环境中运行真实的 Job

9.2 单元测试示例

# finlakehouse_pipeline_tests/test_silver.py

import pytest
import pandas as pd
from dagster import build_asset_context, materialize
from finlakehouse_pipeline.assets.silver import silver_flights


@pytest.fixture
def raw_flights_df():
    """测试用的 fixture DataFrame"""
    return pd.DataFrame({
        "flight_id": ["FL001", "FL002", "FL003", None, "FL005"],
        "delay_minutes": [10, -5, 200, 15, -50],  # -50 应该被过滤
        "departure_time": ["2025-01-01 08:00"] * 5,
        "arrival_time": ["2025-01-01 10:00"] * 5,
    })


def test_silver_removes_null_flight_id(raw_flights_df):
    """测试:flight_id 为 null 的行应该被移除"""
    context = build_asset_context()
    result = silver_flights(context=context, bronze_flights=raw_flights_df)
    assert result["flight_id"].notna().all()


def test_silver_filters_extreme_negative_delay(raw_flights_df):
    """测试:delay_minutes < -30 的行应该被过滤"""
    context = build_asset_context()
    result = silver_flights(context=context, bronze_flights=raw_flights_df)
    assert (result["delay_minutes"] >= -30).all()


def test_silver_metadata_recorded(raw_flights_df):
    """测试:metadata 是否正确记录(通过检查 context 的调用)"""
    context = build_asset_context()
    silver_flights(context=context, bronze_flights=raw_flights_df)
    # 验证没有抛出异常就算通过(metadata 验证比较复杂,可以通过日志检查)

9.3 使用 materialize 进行集成测试

from dagster import materialize
from finlakehouse_pipeline.assets.bronze import bronze_flights
from finlakehouse_pipeline.assets.silver import silver_flights
from unittest.mock import MagicMock, patch


def test_full_bronze_to_silver_pipeline():
    """
    `materialize` 函数让你在测试中运行真实的 Dagster 物化逻辑,
    包括 IO Manager 的调用,但可以注入 Mock Resource。
    """
    mock_minio = MagicMock()
    mock_minio.read_parquet.return_value = pd.DataFrame({
        "flight_id": ["FL001", "FL002"],
        "delay_minutes": [10, 5],
        "departure_time": ["2025-01-01 08:00", "2025-01-01 09:00"],
        "arrival_time": ["2025-01-01 10:00", "2025-01-01 11:00"],
    })
    
    result = materialize(
        assets=[bronze_flights, silver_flights],
        resources={"minio": mock_minio},
    )
    
    assert result.success
    # 检查 silver_flights 的输出
    silver_df = result.output_for_node("silver_flights")
    assert isinstance(silver_df, pd.DataFrame)
    assert len(silver_df) == 2

<a name="chapter-10"></a>

Chapter 10:与 MLflow 集成——ML Pipeline 实战

10.1 MLflow Resource

# finlakehouse_pipeline/resources/mlflow_resource.py

from dagster import ConfigurableResource
import mlflow
from typing import Optional


class MLflowResource(ConfigurableResource):
    """封装 MLflow 连接和实验管理"""
    
    tracking_uri: str          # e.g., "http://mlflow:5000"
    default_experiment: str = "finlakehouse"
    
    def model_config(self) -> dict:
        # Pydantic v2 required override
        return {"arbitrary_types_allowed": True}
    
    def setup_for_execution(self, context) -> None:
        """在执行前设置 MLflow 连接"""
        mlflow.set_tracking_uri(self.tracking_uri)
        mlflow.set_experiment(self.default_experiment)
    
    def start_run(self, run_name: str, tags: Optional[dict] = None):
        """启动一个 MLflow 实验运行"""
        return mlflow.start_run(run_name=run_name, tags=tags)
    
    def log_model(self, model, artifact_path: str, registered_model_name: str):
        """注册模型到 MLflow Model Registry"""
        mlflow.sklearn.log_model(
            model,
            artifact_path=artifact_path,
            registered_model_name=registered_model_name,
        )

10.2 ML Training Asset

# finlakehouse_pipeline/assets/ml.py

from dagster import asset, Output, MetadataValue
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pandas as pd
import json


@asset(
    description="训练航班延误预测模型并注册到 MLflow",
    group_name="ml_layer",
    tags={"type": "ml_model", "framework": "sklearn"},
)
def flight_delay_model(
    context,
    gold_flight_features: pd.DataFrame,
    mlflow_resource: MLflowResource,
) -> dict:
    """
    ML 训练 Asset:
    1. 接收 Gold 层特征数据
    2. 训练模型
    3. 记录实验到 MLflow
    4. 返回模型信息(而不是模型本身,因为模型注册在 MLflow)
    """
    
    feature_cols = ["departure_hour", "is_weekend", "origin_airport_code"]
    target_col = "delay_category"
    
    X = pd.get_dummies(gold_flight_features[feature_cols])
    y = gold_flight_features[target_col]
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # 启动 MLflow 实验运行
    with mlflow_resource.start_run(
        run_name=f"gbm_training_{context.run_id[:8]}",
        tags={"dagster_run_id": context.run_id, "data_version": "v1"},
    ) as run:
        
        # 定义超参数
        params = {"n_estimators": 100, "max_depth": 4, "learning_rate": 0.1}
        mlflow.log_params(params)
        
        # 训练模型
        model = GradientBoostingClassifier(**params)
        model.fit(X_train, y_train)
        
        # 评估模型
        y_pred = model.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True)
        
        # 记录指标
        mlflow.log_metric("accuracy", report["accuracy"])
        mlflow.log_metric("f1_macro", report["macro avg"]["f1-score"])
        
        # 注册模型
        mlflow_resource.log_model(
            model=model,
            artifact_path="flight_delay_model",
            registered_model_name="flight_delay_predictor",
        )
        
        run_id = run.info.run_id
    
    # 记录到 Dagster metadata
    context.add_output_metadata({
        "mlflow_run_id": run_id,
        "accuracy": report["accuracy"],
        "f1_macro": report["macro avg"]["f1-score"],
        "model_name": "flight_delay_predictor",
        "classification_report": MetadataValue.json(report),
    })
    
    return {
        "mlflow_run_id": run_id,
        "model_name": "flight_delay_predictor",
        "accuracy": report["accuracy"],
    }


@asset(
    description="将最佳模型提升到生产阶段",
    group_name="ml_layer",
    tags={"type": "governance"},
)
def promoted_model(
    context,
    flight_delay_model: dict,  # 依赖训练 Asset
    mlflow_resource: MLflowResource,
) -> dict:
    """
    模型治理 Asset:根据指标决定是否将模型提升到 Production 阶段。
    这个模式是 EU AI Act 中"人工审核"要求的体现——模型不会自动上线,
    需要满足阈值条件才能提升。
    """
    
    ACCURACY_THRESHOLD = 0.75  # 生产门槛
    
    if flight_delay_model["accuracy"] < ACCURACY_THRESHOLD:
        context.log.warning(
            f"模型准确率 {flight_delay_model['accuracy']:.3f} 未达到"
            f"生产阈值 {ACCURACY_THRESHOLD},跳过提升"
        )
        return {"status": "rejected", "reason": "accuracy_below_threshold"}
    
    # 将模型版本提升到 Production
    mlflow.set_tracking_uri(mlflow_resource.tracking_uri)
    client = mlflow.tracking.MlflowClient()
    
    # 获取最新版本号
    versions = client.get_latest_versions("flight_delay_predictor", stages=["None"])
    latest_version = versions[0].version
    
    client.transition_model_version_stage(
        name="flight_delay_predictor",
        version=latest_version,
        stage="Production",
        archive_existing_versions=True,  # 将旧的 Production 版本归档
    )
    
    context.add_output_metadata({
        "promoted_version": latest_version,
        "accuracy": flight_delay_model["accuracy"],
        "mlflow_run_id": flight_delay_model["mlflow_run_id"],
    })
    
    return {
        "status": "promoted",
        "version": latest_version,
        "model_name": "flight_delay_predictor",
    }

<a name="chapter-11"></a>

Chapter 11:与 Spark / Delta Lake 集成

11.1 Spark Session Resource

# finlakehouse_pipeline/resources/spark_resource.py

from dagster import ConfigurableResource
from pyspark.sql import SparkSession
from typing import Optional


class SparkResource(ConfigurableResource):
    """
    封装 PySpark Session 的 Dagster Resource。
    在分布式场景中(如 Kubernetes),每个 Asset 可以获得自己的 Spark Session。
    """
    
    app_name: str = "FinLakehouse"
    master: str = "local[*]"          # 生产中替换为 "spark://spark-master:7077"
    minio_endpoint: str = "http://minio:9000"
    minio_access_key: str = "minioadmin"
    minio_secret_key: str = "minioadmin"
    
    def get_session(self) -> SparkSession:
        """
        创建并返回配置好 S3A(MinIO)连接的 SparkSession。
        注意:在实际使用中,应该考虑 Session 的复用,避免频繁创建。
        """
        return (
            SparkSession.builder
            .appName(self.app_name)
            .master(self.master)
            # Delta Lake 支持
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            # MinIO S3A 配置
            .config("spark.hadoop.fs.s3a.endpoint", self.minio_endpoint)
            .config("spark.hadoop.fs.s3a.access.key", self.minio_access_key)
            .config("spark.hadoop.fs.s3a.secret.key", self.minio_secret_key)
            .config("spark.hadoop.fs.s3a.path.style.access", "true")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .getOrCreate()
        )

11.2 使用 Spark 处理大规模数据的 Asset

# finlakehouse_pipeline/assets/spark_assets.py

from dagster import asset
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import functions as F


@asset(
    description="使用 Spark 从 Delta Lake 读取并进行大规模特征计算",
    group_name="feature_layer",
    tags={"engine": "spark"},
)
def spark_flight_features(
    context,
    spark: SparkResource,  # 注入 Spark Resource
) -> None:
    """
    注意:这个 Asset 返回 None(或写入路径),而不是 DataFrame。
    因为 Spark DataFrame 不能序列化后传递给其他 Asset。
    它直接写入 Delta Lake,下游 Asset 再从 Delta Lake 读取。
    """
    
    session = spark.get_session()
    
    # 从 Delta Lake 读取 Silver 层数据
    silver_df = session.read.format("delta").load("s3a://lakehouse/silver_flights")
    
    context.log.info(f"从 Silver 层读取了 {silver_df.count()} 行")
    
    # 大规模特征工程(利用 Spark 分布式计算)
    feature_df = (
        silver_df
        .withColumn("departure_hour", F.hour(F.col("departure_time")))
        .withColumn("is_weekend", F.dayofweek(F.col("departure_time")).isin([1, 7]))
        .withColumn("delay_bucket", 
            F.when(F.col("delay_minutes") <= 0, "on_time")
             .when(F.col("delay_minutes") <= 15, "minor")
             .when(F.col("delay_minutes") <= 60, "moderate")
             .otherwise("severe")
        )
        # 滑动窗口特征:过去 7 天同一航线的平均延误
        .withColumn("route", F.concat(F.col("origin"), F.lit("-"), F.col("destination")))
    )
    
    # 写入 Delta Lake(覆盖写入)
    (
        feature_df
        .write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save("s3a://lakehouse/feature_flights")
    )
    
    context.add_output_metadata({
        "rows_written": feature_df.count(),
        "output_path": "s3a://lakehouse/feature_flights",
    })
    
    session.stop()

<a name="chapter-12"></a>

Chapter 12:Metadata、Lineage 与可观测性

12.1 丰富的 Metadata 类型

Dagster 支持多种 Metadata 类型,让你的资产详情页变成一个内置的数据文档:

from dagster import asset, MetadataValue
import pandas as pd
import matplotlib.pyplot as plt
import io


@asset
def documented_asset(context) -> pd.DataFrame:
    df = pd.DataFrame({"delay": range(100), "count": range(100)})
    
    # 生成可视化图表并嵌入 metadata
    fig, ax = plt.subplots()
    df.plot(ax=ax, x="delay", y="count")
    img_buffer = io.BytesIO()
    fig.savefig(img_buffer, format="png")
    img_buffer.seek(0)
    
    context.add_output_metadata({
        # 纯文本
        "description": "航班延误分布数据",
        
        # 数值(可以在 UI 中追踪趋势)
        "num_rows": MetadataValue.int(len(df)),
        "avg_delay": MetadataValue.float(df["delay"].mean()),
        
        # Markdown(支持富文本展示)
        "data_quality_report": MetadataValue.md(
            "## 数据质量报告\n"
            f"- 总行数:{len(df)}\n"
            f"- 缺失值:0\n"
            f"- 最大延误:{df['delay'].max()} 分钟\n"
        ),
        
        # 数据预览(表格形式)
        "preview": MetadataValue.md(df.head(5).to_markdown()),
        
        # 图表(PNG 图片)
        "delay_distribution": MetadataValue.png(img_buffer.read()),
        
        # 外部链接(跳转到 MinIO 控制台)
        "storage_location": MetadataValue.url(
            "http://minio:9001/browser/lakehouse/silver_flights"
        ),
        
        # JSON 对象(展示复杂结构)
        "schema": MetadataValue.json({
            "columns": df.dtypes.astype(str).to_dict()
        }),
    })
    
    return df

12.2 数据血缘(Lineage)的自动追踪

Dagster 自动追踪 Asset 之间的依赖关系,形成完整的数据血缘图。你可以在 UI 的 Asset Graph 视图中看到:

  • 哪些 Asset 是某个 Asset 的上游数据源
  • 如果某个 Asset 物化失败,哪些下游 Asset 会受到影响
  • 每个 Asset 的最近物化时间和状态

这对于合规审计(BaFin 要求能够追溯模型决策依据)非常有价值。

12.3 Fresh Policy:数据新鲜度监控

from dagster import asset, FreshnessPolicy
from datetime import timedelta

@asset(
    # 声明:这个 Asset 应该每天至少更新一次,且不超过 30 分钟的延迟
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=30,
        cron_schedule="0 3 * * *",  # 期望在每天 03:00 完成更新
    ),
    description="每日航班数据(如果超时未更新,UI 会显示警告)",
)
def daily_bronze_flights(context) -> None:
    ...

当 Asset 的最后物化时间超过设定的 maximum_lag_minutes 时,Dagster UI 会在该 Asset 上显示橙色警告,帮助你主动发现数据管道问题。


<a name="chapter-13"></a>

Chapter 13:生产部署——Docker + Dagster Daemon

13.1 Dagster 的进程架构

生产部署的 Dagster 由四个核心进程组成:

Dagster Webserver:提供 UI 和 GraphQL API,是用户与 Dagster 交互的门户。它是无状态的,可以水平扩展。

Dagster Daemon:后台守护进程,负责执行 Schedule 和 Sensor 的轮询、以及 Backfill 任务的调度。它是有状态的,同一时间只能运行一个实例。

Dagster Instance:Dagster 的"数据库",存储所有运行历史、Asset 物化记录、Schedule 状态等。通常使用 PostgreSQL。

Code Server:运行你的用户代码(Asset、Resource、Job 定义)的进程。与 Webserver 和 Daemon 隔离,代码更新不需要重启 Dagster 服务。

13.2 Docker Compose 配置

# docker-compose.yml

version: "3.8"

services:
  # Dagster 实例数据库(存储运行历史等)
  dagster_postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: dagster
      POSTGRES_PASSWORD: dagster
      POSTGRES_DB: dagster
    volumes:
      - dagster_postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "dagster"]
      interval: 5s
      timeout: 5s
      retries: 5

  # Dagster WebServer(UI)
  dagster_webserver:
    image: dagster/dagster-k8s:latest  # 或自定义镜像
    entrypoint: ["dagster-webserver", "-h", "0.0.0.0", "-p", "3000", "-w", "/opt/dagster/workspace.yaml"]
    ports:
      - "3000:3000"
    environment:
      DAGSTER_POSTGRES_USER: dagster
      DAGSTER_POSTGRES_PASSWORD: dagster
      DAGSTER_POSTGRES_DB: dagster
      DAGSTER_POSTGRES_HOST: dagster_postgres
    depends_on:
      dagster_postgres:
        condition: service_healthy
    volumes:
      - ./workspace.yaml:/opt/dagster/workspace.yaml

  # Dagster Daemon(调度器、Sensor 轮询)
  dagster_daemon:
    image: dagster/dagster-k8s:latest
    entrypoint: ["dagster-daemon", "run"]
    environment:
      DAGSTER_POSTGRES_USER: dagster
      DAGSTER_POSTGRES_PASSWORD: dagster
      DAGSTER_POSTGRES_DB: dagster
      DAGSTER_POSTGRES_HOST: dagster_postgres
    depends_on:
      dagster_postgres:
        condition: service_healthy
    volumes:
      - ./workspace.yaml:/opt/dagster/workspace.yaml

  # 你的用户代码服务(Code Server)
  finlakehouse_pipeline:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      MINIO_ENDPOINT_URL: http://minio:9000
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
      MLFLOW_TRACKING_URI: http://mlflow:5000
    ports:
      - "4000:4000"  # gRPC 端口,供 Webserver 和 Daemon 调用

volumes:
  dagster_postgres_data:

13.3 Workspace 配置

# workspace.yaml
# 告诉 Dagster 在哪里找到用户代码

load_from:
  - grpc_server:
      host: finlakehouse_pipeline
      port: 4000
      location_name: "finlakehouse"

13.4 用户代码的 Dockerfile

FROM python:3.11-slim

WORKDIR /opt/app

# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt

# 复制代码
COPY finlakehouse_pipeline/ finlakehouse_pipeline/
COPY setup.py .
RUN pip install -e .

# 启动 gRPC Code Server(不是 Webserver)
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", \
     "--python-file", "finlakehouse_pipeline/__init__.py"]

13.5 Dagster Instance 配置

# dagster.yaml(放在项目根目录或 $DAGSTER_HOME 指向的目录)

storage:
  postgres:
    postgres_url: "postgresql://dagster:dagster@dagster_postgres:5432/dagster"

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 5  # 同时最多运行 5 个 Job

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

# 日志配置
python_logs:
  python_log_level: INFO

<a name="chapter-14"></a>

Chapter 14:FinLakehouse 综合案例

14.1 完整 Pipeline 设计

以下是将 FinLakehouse 的完整 LakeFlow 映射到 Dagster Asset 的设计方案:

Landing Zone(外部)
    ↓ [Sensor: minio_landing_sensor]
Bronze Layer
    bronze_raw_flights          ← 原始 CSV 数据
    bronze_raw_weather          ← 原始天气数据
    ↓
Silver Layer
    silver_flights              ← 清洗、去重、类型转换
    silver_weather              ← 清洗天气数据
    ↓
Gold Layer
    gold_flight_weather_joined  ← 航班 + 天气 JOIN
    ↓
Feature Layer
    feature_flight_delay        ← ML 特征工程(Spark)
    ↓
ML Layer
    flight_delay_model          ← 模型训练 + MLflow 注册
    promoted_model              ← 模型治理 / 提升
    ↓
Serving Layer
    model_serving_config        ← 生成 BentoML 配置文件
    ↓
Monitoring Layer
    drift_detection_report      ← 数据漂移检测(每天运行)
    model_performance_report    ← 模型性能监控

14.2 完整的 Definitions 注册

# finlakehouse_pipeline/__init__.py

from dagster import Definitions, EnvVar, load_assets_from_modules

# 导入所有 Asset 模块
from finlakehouse_pipeline.assets import (
    bronze, silver, gold, features, ml_assets, monitoring
)

# 导入所有 Resource
from finlakehouse_pipeline.resources.storage import MinIOResource
from finlakehouse_pipeline.resources.mlflow_resource import MLflowResource
from finlakehouse_pipeline.resources.spark_resource import SparkResource
from finlakehouse_pipeline.resources.io_managers import delta_lake_io_manager

# 导入 Jobs
from finlakehouse_pipeline.jobs import (
    ingestion_job, ml_training_job, monitoring_job
)

# 导入 Schedules
from finlakehouse_pipeline.schedules import (
    daily_ingestion_schedule, weekly_model_retrain_schedule
)

# 导入 Sensors
from finlakehouse_pipeline.sensors import minio_landing_sensor


defs = Definitions(
    # 所有 Asset
    assets=load_assets_from_modules([
        bronze, silver, gold, features, ml_assets, monitoring
    ]),
    
    # 所有 Resource(包括 IO Manager)
    resources={
        "io_manager": delta_lake_io_manager.configured({
            "base_path": "s3://lakehouse",
            "endpoint_url": EnvVar("MINIO_ENDPOINT_URL"),
            "access_key": EnvVar("MINIO_ACCESS_KEY"),
            "secret_key": EnvVar("MINIO_SECRET_KEY"),
        }),
        "minio": MinIOResource(
            endpoint_url=EnvVar("MINIO_ENDPOINT_URL"),
            access_key=EnvVar("MINIO_ACCESS_KEY"),
            secret_key=EnvVar("MINIO_SECRET_KEY"),
        ),
        "mlflow_resource": MLflowResource(
            tracking_uri=EnvVar("MLFLOW_TRACKING_URI"),
        ),
        "spark": SparkResource(
            master=EnvVar("SPARK_MASTER"),
            minio_endpoint=EnvVar("MINIO_ENDPOINT_URL"),
            minio_access_key=EnvVar("MINIO_ACCESS_KEY"),
            minio_secret_key=EnvVar("MINIO_SECRET_KEY"),
        ),
    },
    
    # 所有 Job
    jobs=[ingestion_job, ml_training_job, monitoring_job],
    
    # 所有 Schedule
    schedules=[daily_ingestion_schedule, weekly_model_retrain_schedule],
    
    # 所有 Sensor
    sensors=[minio_landing_sensor],
)

14.3 监管合规要点

在受监管行业(BaFin、EU AI Act)中,Dagster 可以提供以下内置的合规支持:

审计追踪:每次 Asset 物化都记录了完整的运行 ID、时间戳、输入参数和输出 metadata,形成不可篡改的审计日志(存储在 PostgreSQL 中)。

模型血缘:从原始数据到模型决策的完整数据链路可以在 Asset Graph 中一键可视化,满足"可解释性"要求。

模型治理promoted_model Asset 实现了"无法自动上线"的门控机制,只有满足准确率阈值的模型才能进入生产,体现了人工审核节点。

回滚能力:Delta Lake 的 Time Travel + MLflow 的版本管理,允许在发现问题时将数据和模型同时回滚到指定时间点。


附录:常用命令速查

# 开发环境
dagster dev                                    # 启动开发服务器
dagster asset materialize -m my_module         # 命令行物化所有资产
dagster asset materialize -m my_module --select bronze_flights  # 物化指定资产
dagster job execute -m my_module -j ingestion_job  # 执行指定 Job

# 分区操作
dagster asset materialize -m my_module --select bronze_flights \
  --partition 2025-01-15                       # 物化指定分区

# 检查 Dagster 状态
dagster instance info                          # 查看实例配置
dagster schedule list                          # 列出所有 Schedule
dagster sensor list                            # 列出所有 Sensor
dagster sensor start minio_landing_sensor      # 启动 Sensor

# 生产部署检查
dagster-daemon health-check                    # 检查 Daemon 健康状态

学习路径建议

第一周:完成 Chapter 1-3,理解 Asset 的核心概念,手动搭建 bronze → silver → gold 的数据流。

第二周:完成 Chapter 4-5,实现 MinIO Resource 并将其注入 Asset,编写单元测试验证解耦效果。

第三周:完成 Chapter 6-7,为你的 Frankfurt Airport 税收时间预测数据实现按天分区,并添加 MinIO Landing Sensor。

第四周:完成 Chapter 8-10,实现 Delta Lake IO Manager 和 MLflow 集成,打通完整的 ML Pipeline。

第五周及以后:Chapter 11-14,接入 Spark 处理大规模特征工程,完成 Docker 生产部署,实现监管合规功能。


本手册版本:2025-02,适配 Dagster 1.7+