Dagster 完整训练手册
从零到生产:数据工程师与 AI 平台架构师的系统性学习路径
适用读者:有 Python 基础、了解数据管道概念、希望在生产级数据/ML 平台中使用 Dagster 的工程师。
学习目标:完成本手册后,你将能够独立设计、实现、测试并部署基于 Dagster 的数据资产管道,包括与 Spark、MinIO、MLflow 等工具的集成。
目录
- Dagster 的核心哲学:为什么不用 Airflow?
- 安装与项目初始化
- 核心概念 I:Software-Defined Assets(SDA)
- 核心概念 II:Resources(资源)
- 核心概念 III:Jobs、Ops 与 Graphs
- 核心概念 IV:Partitions(分区)
- 核心概念 V:Schedules 与 Sensors
- 核心概念 VI:IO Managers
- 测试策略:让 Pipeline 可验证
- 与 MLflow 集成:ML Pipeline 实战
- 与 Spark / Delta Lake 集成
- Metadata、Lineage 与可观测性
- 生产部署:Docker + Dagster Daemon
- FinLakehouse 综合案例
<a name="chapter-1"></a>
Chapter 1:Dagster 的核心哲学——为什么不用 Airflow?
1.1 任务调度 vs. 资产管理:思维层级的转变
Airflow 的世界观是任务(Task):你告诉系统"先运行 A,再运行 B,再运行 C"。这是一种过程导向的思维,关注的是动词——"做什么"。
Dagster 的世界观是资产(Asset):你告诉系统"我有数据资产 X,它依赖资产 Y 和 Z"。这是一种结果导向的思维,关注的是名词——"有什么"。
这个区别在实际工作中意味着:
- 在 Airflow 中,如果你想知道"bronze_flights 表里的数据是从哪里来的",你需要阅读多个 DAG 文件,逐步推断。
- 在 Dagster 中,你直接在 UI 上点击
bronze_flights资产,就能看到它的上游(原始 CSV)和下游(silver_flights)以及每次物化的 metadata。
高阶思维:招聘经理在评估平台架构师时,最看重的不是"你会用哪个工具",而是"你是否理解数据平台的本质是管理数据资产的生命周期"。Dagster 的资产理念与 Data Mesh、Data Contract 等现代架构思想天然契合,这是一个可以在面试中展开讲的核心论点。
1.2 Dagster 的核心设计原则
原则一:可测试性(Testability)。每一个 Asset、Resource、Op 都可以在不启动 Dagster 服务的情况下独立进行单元测试。这在 Airflow 中几乎是不可能的。
原则二:类型系统与 Metadata。Dagster 鼓励你为每个资产定义输入/输出的类型,并在运行时记录 metadata(行数、数据分布等),形成自动化的数据质量日志——这在受监管行业(BaFin、EU AI Act)中非常重要。
原则三:依赖注入(Resource Injection)。数据库连接、S3 客户端、Spark Session 等都作为 Resource 注入,测试时可以替换为 Mock,生产时切换为真实连接。这让代码从根本上解耦。
<a name="chapter-2"></a>
Chapter 2:安装与项目初始化
2.1 环境准备
建议使用 Python 3.10+,并用 venv 或 conda 隔离环境。
# 创建并激活虚拟环境
python -m venv dagster-env
source dagster-env/bin/activate # Linux/macOS
# 或 dagster-env\Scripts\activate # Windows
# 安装 Dagster 核心包
pip install dagster dagster-webserver
# 验证安装
dagster --version
2.2 用官方脚手架初始化项目
# 创建一个新项目(推荐方式,会生成标准目录结构)
dagster project scaffold --name finlakehouse_pipeline
cd finlakehouse_pipeline
pip install -e ".[dev]"
生成的目录结构如下:
finlakehouse_pipeline/
├── finlakehouse_pipeline/ # 主包
│ ├── __init__.py # 注册所有 Definitions
│ ├── assets/ # Asset 定义
│ │ └── __init__.py
│ ├── resources/ # Resource 定义
│ │ └── __init__.py
│ ├── jobs/ # Job 定义
│ │ └── __init__.py
│ └── schedules/ # Schedule 定义
│ └── __init__.py
├── finlakehouse_pipeline_tests/ # 测试目录
├── pyproject.toml # 包配置
└── setup.py
为什么要关注目录结构? 作为架构师,你的代码组织方式本身就是一种"架构表达"。按关注点分离(Assets / Resources / Jobs / Schedules)组织代码,使每个目录的职责清晰,是大型平台项目的必要做法。
2.3 启动 Dagster UI
# 启动开发服务器(会自动热加载代码变更)
dagster dev
打开 http://localhost:3000,你会看到 Dagster 的 Launchpad UI。目前它是空的,接下来我们逐步填充内容。
<a name="chapter-3"></a>
Chapter 3:核心概念 I——Software-Defined Assets(SDA)
3.1 你的第一个 Asset
Asset 是 Dagster 中最重要的概念。一个 Asset 代表一个持久化的数据对象——一张数据库表、一个 Parquet 文件、一个训练好的模型,等等。
# finlakehouse_pipeline/assets/bronze.py
from dagster import asset
import pandas as pd
@asset(
# 描述这个资产是什么(会显示在 UI 上)
description="从原始 CSV 文件加载的航班延误数据(Landing Zone → Bronze)",
# 给资产打标签,便于分组和过滤
tags={"layer": "bronze", "domain": "flights"},
# 计算资产所需的组(可选,用于多团队项目)
group_name="bronze_layer",
)
def bronze_flights(context) -> pd.DataFrame:
"""
这是 Asset 的实现函数。
函数名 `bronze_flights` 就是这个资产的 key(唯一标识符)。
返回值就是这个资产的"物化(materialization)"结果。
"""
# context 对象提供日志、配置等功能
context.log.info("开始从 Landing Zone 加载原始航班数据...")
df = pd.read_csv("/data/landing/flights_raw.csv")
# 记录 metadata:这些信息会出现在 Dagster UI 的资产详情页
context.add_output_metadata({
"num_rows": len(df),
"num_columns": len(df.columns),
"columns": df.columns.tolist(),
"sample": df.head(3).to_markdown(), # 需要 tabulate 包
})
context.log.info(f"成功加载 {len(df)} 行数据")
return df
理解这段代码的关键:@asset 装饰器把一个普通 Python 函数变成了一个"资产定义"。函数名是资产的名字,函数的返回值是资产的内容,函数的参数(除了 context)是它依赖的其他资产。
3.2 资产依赖:让 Dagster 自动推断 DAG
# finlakehouse_pipeline/assets/silver.py
from dagster import asset
import pandas as pd
@asset(
description="清洗后的航班数据(Bronze → Silver)",
group_name="silver_layer",
tags={"layer": "silver"},
)
def silver_flights(
context,
bronze_flights: pd.DataFrame, # 参数名 = 依赖的资产名!Dagster 自动建立依赖关系
) -> pd.DataFrame:
"""
注意:参数 `bronze_flights` 的类型是 pd.DataFrame。
Dagster 看到这个参数名与上面定义的 `bronze_flights` 资产匹配,
就会自动:
1. 先物化 bronze_flights
2. 把结果传给这个函数
这就是"依赖注入式"的 DAG 构建,无需手动写 `>>` 或 `set_upstream()`。
"""
context.log.info("开始数据清洗...")
df = bronze_flights.copy()
# 数据质量规则
original_rows = len(df)
df = df.dropna(subset=["flight_id", "departure_time", "arrival_time"])
df = df[df["delay_minutes"] >= -30] # 过滤异常负延误
# 标准化列名
df.columns = [col.lower().replace(" ", "_") for col in df.columns]
rows_removed = original_rows - len(df)
context.add_output_metadata({
"rows_input": original_rows,
"rows_output": len(df),
"rows_removed": rows_removed,
"removal_rate": f"{rows_removed/original_rows:.2%}",
})
return df
@asset(
description="航班特征工程数据(Silver → Gold)",
group_name="gold_layer",
tags={"layer": "gold"},
)
def gold_flight_features(
context,
silver_flights: pd.DataFrame, # 依赖 silver 层
) -> pd.DataFrame:
"""Gold 层:为 ML 模型准备特征"""
df = silver_flights.copy()
# 特征工程
df["departure_hour"] = pd.to_datetime(df["departure_time"]).dt.hour
df["is_weekend"] = pd.to_datetime(df["departure_time"]).dt.dayofweek >= 5
df["delay_category"] = pd.cut(
df["delay_minutes"],
bins=[-30, 0, 15, 60, float("inf")],
labels=["early", "on_time", "minor_delay", "major_delay"],
)
context.add_output_metadata({
"num_features": len(df.columns),
"delay_distribution": df["delay_category"].value_counts().to_dict(),
})
return df
3.3 注册 Assets:让 Dagster 找到你的代码
# finlakehouse_pipeline/__init__.py
from dagster import Definitions, load_assets_from_modules
# 导入所有 asset 模块
from finlakehouse_pipeline.assets import bronze, silver
# Definitions 是整个项目的"入口",类似于 Flask 的 app
defs = Definitions(
assets=load_assets_from_modules([bronze, silver]),
)
load_assets_from_modules 会自动扫描模块中所有被 @asset 装饰的函数,你不需要逐一手动列出。
3.4 练习:在 UI 中物化资产
- 确保
dagster dev正在运行。 - 打开 UI,点击左侧的 Assets 菜单。
- 你会看到
bronze_flights → silver_flights → gold_flight_features的依赖图。 - 点击
gold_flight_features,然后点击右上角的 Materialize 按钮。 - Dagster 会自动按顺序物化整条链路,并在每个节点记录你定义的 metadata。
<a name="chapter-4"></a>
Chapter 4:核心概念 II——Resources(资源)
4.1 为什么需要 Resources?
假设你的 Bronze Asset 需要连接 MinIO(S3 兼容存储),你可能会这样写:
# ❌ 反模式:直接在 Asset 里硬编码连接信息
@asset
def bronze_flights():
import boto3
s3 = boto3.client("s3", endpoint_url="http://localhost:9000", ...)
...
这种方式有三个问题:一是连接信息散落在各处,二是测试时必须真正连接 MinIO,三是切换环境(dev → prod)需要修改代码。
Resource 模式解决了这些问题——把"如何连接"与"用连接做什么"彻底分离。
4.2 定义一个 MinIO Resource
# finlakehouse_pipeline/resources/storage.py
from dagster import ConfigurableResource
import boto3
from botocore.client import BaseClient
from typing import Optional
class MinIOResource(ConfigurableResource):
"""
封装 MinIO(S3 兼容)连接的 Dagster Resource。
ConfigurableResource 让参数可以通过 YAML 配置文件或环境变量注入,
而不是硬编码在代码里。
"""
endpoint_url: str # e.g., "http://minio:9000"
access_key: str
secret_key: str
region_name: str = "us-east-1"
def get_client(self) -> BaseClient:
"""返回 boto3 S3 客户端"""
return boto3.client(
"s3",
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region_name,
)
def read_parquet(self, bucket: str, key: str) -> "pd.DataFrame":
"""从 MinIO 读取 Parquet 文件,返回 DataFrame"""
import pandas as pd
import io
client = self.get_client()
response = client.get_object(Bucket=bucket, Key=key)
return pd.read_parquet(io.BytesIO(response["Body"].read()))
def write_parquet(self, df: "pd.DataFrame", bucket: str, key: str) -> None:
"""将 DataFrame 写入 MinIO 的 Parquet 文件"""
import io
buffer = io.BytesIO()
df.to_parquet(buffer, index=False)
buffer.seek(0)
client = self.get_client()
client.put_object(Bucket=bucket, Key=key, Body=buffer)
4.3 在 Asset 中使用 Resource
# finlakehouse_pipeline/assets/bronze.py
from dagster import asset
from finlakehouse_pipeline.resources.storage import MinIOResource
import pandas as pd
@asset(
description="从 MinIO Landing Zone 加载原始航班数据",
group_name="bronze_layer",
)
def bronze_flights(
context,
minio: MinIOResource, # Resource 通过参数名注入,与 Asset 依赖的注入方式相同!
) -> pd.DataFrame:
"""
Dagster 如何区分这个参数是"Asset 依赖"还是"Resource 依赖"?
答:通过 Definitions 中的注册信息——在 resources 字典里注册的就是 Resource,
在 assets 列表里注册的就是 Asset。Dagster 在运行时自动路由。
"""
context.log.info("从 MinIO 读取原始数据...")
df = minio.read_parquet(
bucket="landing-zone",
key="flights/raw_flights.parquet"
)
context.add_output_metadata({"num_rows": len(df)})
return df
4.4 注册 Resources
# finlakehouse_pipeline/__init__.py
from dagster import Definitions, EnvVar, load_assets_from_modules
from finlakehouse_pipeline.assets import bronze, silver
from finlakehouse_pipeline.resources.storage import MinIOResource
defs = Definitions(
assets=load_assets_from_modules([bronze, silver]),
resources={
# "minio" 这个 key 必须与 Asset 参数名完全一致
"minio": MinIOResource(
endpoint_url=EnvVar("MINIO_ENDPOINT_URL"), # 从环境变量读取
access_key=EnvVar("MINIO_ACCESS_KEY"),
secret_key=EnvVar("MINIO_SECRET_KEY"),
),
},
)
EnvVar 是 Dagster 内置的环境变量读取器,它在运行时才解析,而不是在导入时——这意味着你可以安全地将代码提交到 Git,而不会泄露密钥。
4.5 为测试创建 Mock Resource
# finlakehouse_pipeline_tests/test_bronze.py
from finlakehouse_pipeline.assets.bronze import bronze_flights
from finlakehouse_pipeline.resources.storage import MinIOResource
from dagster import build_asset_context
import pandas as pd
from unittest.mock import MagicMock, patch
def test_bronze_flights_loads_data():
"""
测试时,我们不需要真正的 MinIO 连接。
只需要创建一个 Mock Resource,让它返回假数据。
"""
# 创建 Mock Resource
mock_minio = MagicMock(spec=MinIOResource)
mock_minio.read_parquet.return_value = pd.DataFrame({
"flight_id": ["FL001", "FL002"],
"delay_minutes": [10, -5],
"departure_time": ["2025-01-01 08:00", "2025-01-01 09:00"],
})
# 构建测试用的 context
context = build_asset_context()
# 直接调用 Asset 函数(无需启动 Dagster 服务!)
result = bronze_flights(context=context, minio=mock_minio)
assert isinstance(result, pd.DataFrame)
assert len(result) == 2
mock_minio.read_parquet.assert_called_once_with(
bucket="landing-zone",
key="flights/raw_flights.parquet"
)
这就是 Dagster 可测试性的精髓:Asset 逻辑和外部依赖完全解耦,单元测试不依赖任何基础设施。
<a name="chapter-5"></a>
Chapter 5:核心概念 III——Jobs、Ops 与 Graphs
5.1 Asset Job:选择性物化资产子集
大多数时候,你不需要每次物化整个 Asset 图。define_asset_job 让你选择一个资产子集作为独立的 Job。
# finlakehouse_pipeline/jobs/__init__.py
from dagster import define_asset_job, AssetSelection
# 只物化 bronze 和 silver 层(例如,数据摄入阶段的 Job)
ingestion_job = define_asset_job(
name="ingestion_job",
description="从 Landing Zone 摄入数据到 Silver 层",
# AssetSelection 提供强大的资产过滤语法
selection=AssetSelection.groups("bronze_layer", "silver_layer"),
)
# 只物化 ML 相关的资产
ml_training_job = define_asset_job(
name="ml_training_job",
description="端到端 ML 训练 Pipeline",
selection=AssetSelection.groups("gold_layer", "ml_layer"),
)
# 物化特定标签的资产
monitoring_job = define_asset_job(
name="monitoring_job",
selection=AssetSelection.tag("type", "monitor"),
)
5.2 Ops:更细粒度的任务单元
@op 是比 Asset 更低级的原语。当一个任务不产生持久化的数据资产时(比如发送 Slack 告警、触发外部 API),就用 Op。
# finlakehouse_pipeline/ops/notifications.py
from dagster import op, Out, In
import requests
@op(
description="数据质量检查通过后发送 Slack 通知",
ins={"validation_result": In(dict)}, # 显式声明输入类型
)
def notify_slack(context, validation_result: dict):
"""Op 不返回持久化的资产,只执行副作用(发送通知)"""
if not validation_result["passed"]:
message = f"⚠️ 数据质量检查失败:{validation_result['reason']}"
# 发送 Slack webhook
requests.post(
context.op_config["webhook_url"],
json={"text": message}
)
context.log.warning(message)
else:
context.log.info("✅ 数据质量检查通过")
5.3 Graphs:组合 Ops 构成工作流
当你需要将多个 Ops 组合成一个有向图时,使用 @graph。
from dagster import graph, op, GraphDefinition
@op
def validate_data(context, df) -> dict:
result = {"passed": True, "reason": ""}
if len(df) < 100:
result = {"passed": False, "reason": "数据量不足 100 行"}
return result
@graph
def data_quality_workflow():
"""
Graph 定义数据流:validate → notify
注意:graph 里面用函数调用的方式连接 Ops,非常直观
"""
validation_result = validate_data() # 实际上不会在这里执行,只是声明依赖
notify_slack(validation_result)
# 将 Graph 转成 Job 才能运行
quality_check_job = data_quality_workflow.to_job(name="quality_check_job")
何时用 Asset,何时用 Op/Graph? 一个简单的原则:如果你的任务产生了一个"可以被其他任务复用的数据结果",就用 Asset;如果只是执行某种操作(通知、校验、清理临时文件),就用 Op。
<a name="chapter-6"></a>
Chapter 6:核心概念 IV——Partitions(分区)
6.1 什么是分区?
分区是 Dagster 中处理时间序列数据的核心机制。假设你每天都有新的航班数据,分区让你能够:
- 单独物化某一天的数据,而不是重新处理所有历史数据。
- 追踪哪些日期的数据已经处理,哪些还没有。
- 并行处理多个日期的数据。
6.2 定义时间分区
# finlakehouse_pipeline/assets/partitioned.py
from dagster import asset, DailyPartitionsDefinition, MonthlyPartitionsDefinition
import pandas as pd
# 定义按天分区,从 2025-07-01 开始
daily_partitions = DailyPartitionsDefinition(start_date="2025-07-01")
@asset(
partitions_def=daily_partitions,
description="按天分区的航班 Bronze 数据",
group_name="bronze_layer",
)
def bronze_flights_partitioned(context) -> pd.DataFrame:
"""
context.partition_key 包含当前正在处理的分区值。
对于 DailyPartitionsDefinition,它的格式是 "YYYY-MM-DD"。
"""
partition_date = context.partition_key
context.log.info(f"正在处理日期:{partition_date}")
# 根据分区日期读取对应的数据文件
df = pd.read_parquet(f"/data/landing/flights/{partition_date}.parquet")
context.add_output_metadata({
"partition_date": partition_date,
"num_rows": len(df),
})
return df
@asset(
partitions_def=daily_partitions,
description="按天分区的 Silver 层航班数据",
group_name="silver_layer",
)
def silver_flights_partitioned(
context,
bronze_flights_partitioned: pd.DataFrame, # 同一分区的 Bronze 数据会自动传入
) -> pd.DataFrame:
"""
当上游和下游都有相同的分区定义时,Dagster 自动匹配:
处理 2025-01-15 的 silver 时,会读取 2025-01-15 的 bronze。
"""
df = bronze_flights_partitioned.copy()
df = df.dropna()
return df
6.3 跨分区聚合:PartitionMapping
有时你需要把多天的数据聚合成一个月的报告:
from dagster import asset, MonthlyPartitionsDefinition, TimeWindowPartitionMapping
monthly_partitions = MonthlyPartitionsDefinition(start_date="2025-07-01")
@asset(
partitions_def=monthly_partitions,
ins={
"silver_flights_partitioned": AssetIn(
partition_mapping=TimeWindowPartitionMapping(
start_offset=-1, # 从月初
end_offset=0, # 到月末
)
)
},
description="月度航班汇总报告",
)
def monthly_flight_report(context, silver_flights_partitioned: pd.DataFrame) -> dict:
"""
这里的 silver_flights_partitioned 是当前月份所有天的数据合并后的 DataFrame。
Dagster 自动完成跨分区的数据合并。
"""
return {
"month": context.partition_key,
"total_flights": len(silver_flights_partitioned),
"avg_delay": silver_flights_partitioned["delay_minutes"].mean(),
}
6.4 在 UI 中操作分区
在 Dagster UI 中,选择分区 Asset 后,你会看到一个"分区热力图"——每个格子代表一个日期分区,绿色表示已成功物化,红色表示失败,灰色表示未运行。你可以点击任意格子单独重跑或查看日志。
<a name="chapter-7"></a>
Chapter 7:核心概念 V——Schedules 与 Sensors
7.1 Schedule:基于时间的触发
# finlakehouse_pipeline/schedules/__init__.py
from dagster import ScheduleDefinition, build_schedule_from_partitioned_job
# 简单方式:直接从分区 Job 构建 Schedule
daily_ingestion_schedule = build_schedule_from_partitioned_job(
job=ingestion_job, # 必须是分区 Job
hour_of_day=2, # 每天凌晨 2 点运行
minute_of_hour=30, # 2:30 AM
description="每日凌晨 2:30 摄入前一天的航班数据",
)
# 高级方式:使用 Cron 表达式
from dagster import schedule
@schedule(
cron_schedule="0 6 * * MON", # 每周一早上 6 点
job=ml_training_job,
execution_timezone="Europe/Berlin", # 法兰克福时区
)
def weekly_model_retrain_schedule(context):
"""
Schedule 函数可以根据时间动态生成 run_config(运行配置)。
比如,每次训练时使用不同的超参数。
"""
# context.scheduled_execution_time 包含计划执行时间
run_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
return {
"ops": {
"train_model": {
"config": {
"training_date": run_date,
"max_epochs": 50,
}
}
}
}
7.2 Sensor:基于事件的触发
Sensor 持续轮询某个外部状态,当条件满足时触发 Job 运行。
from dagster import sensor, RunRequest, SkipReason, SensorEvaluationContext
import boto3
import json
@sensor(
job=ingestion_job,
minimum_interval_seconds=60, # 每 60 秒检查一次
description="监听 MinIO Landing Zone,有新文件时触发摄入",
)
def minio_landing_sensor(context: SensorEvaluationContext):
"""
Sensor 的工作原理:
1. 每 60 秒被 Dagster Daemon 调用一次
2. 函数检查外部状态(MinIO 中是否有新文件)
3. 如果有新文件,yield RunRequest 触发 Job
4. 如果没有,yield SkipReason 或什么都不做
"""
s3 = boto3.client("s3", endpoint_url="http://minio:9000", ...)
# cursor 是 Sensor 的状态,用于记录上次处理到哪里
# 注意:cursor 在每次 Sensor 运行之间是持久化的
last_processed_key = context.cursor or ""
response = s3.list_objects_v2(
Bucket="landing-zone",
Prefix="flights/",
)
new_files = []
for obj in response.get("Contents", []):
if obj["Key"] > last_processed_key:
new_files.append(obj["Key"])
if not new_files:
yield SkipReason("Landing Zone 中没有新文件")
return
# 为每个新文件触发一次 Job 运行
for file_key in sorted(new_files):
# 从文件名提取日期作为分区键(假设文件名格式为 flights/2025-01-15.parquet)
partition_date = file_key.split("/")[1].replace(".parquet", "")
yield RunRequest(
run_key=file_key, # run_key 确保同一文件不会重复触发
partition_key=partition_date,
run_config={},
tags={"source_file": file_key},
)
# 更新 cursor 到最新处理的文件
context.update_cursor(sorted(new_files)[-1])
Sensor 的 run_key 机制非常重要:Dagster 保证同一个 run_key 的 RunRequest 只会被执行一次,即使 Sensor 多次检测到同一个文件,也不会重复触发。这是一种内置的幂等性保障。
<a name="chapter-8"></a>
Chapter 8:核心概念 VI——IO Managers
8.1 IO Manager 的作用
默认情况下,Dagster 把 Asset 的输出在内存中传递给下游 Asset。这在单机运行时没问题,但在分布式或大数据场景中,你需要把数据持久化到存储系统(MinIO、Delta Lake、数据库等)。
IO Manager 定义了"如何存储和加载 Asset 的输出",让 Asset 代码本身不需要关心存储细节。
8.2 实现一个 Delta Lake IO Manager
# finlakehouse_pipeline/resources/io_managers.py
from dagster import IOManager, io_manager, InputContext, OutputContext
import pandas as pd
from deltalake import write_deltalake, DeltaTable
from typing import Any
class DeltaLakeIOManager(IOManager):
"""
自定义 IO Manager:将 Asset 输出写入 Delta Lake 格式的 MinIO 存储,
并在 Asset 读取时从 Delta Lake 加载。
这样一来,所有 Asset 的数据自动具备:
- ACID 事务(Delta Lake 保证)
- Time travel(可以查询任意历史版本)
- Schema evolution
"""
def __init__(self, base_path: str, storage_options: dict):
self.base_path = base_path # e.g., "s3://lakehouse"
self.storage_options = storage_options # MinIO 连接配置
def _get_path(self, context: OutputContext | InputContext) -> str:
"""根据 Asset 的层级和名称构造存储路径"""
# Asset key 是一个元组,例如 ("bronze_layer", "bronze_flights")
return f"{self.base_path}/{'/'.join(context.asset_key.path)}"
def handle_output(self, context: OutputContext, obj: Any):
"""当 Asset 产生输出时,这个方法被调用"""
if not isinstance(obj, pd.DataFrame):
raise ValueError(f"DeltaLakeIOManager 只支持 pd.DataFrame,收到了 {type(obj)}")
path = self._get_path(context)
context.log.info(f"将 DataFrame 写入 Delta Lake:{path}")
write_deltalake(
path,
obj,
mode="overwrite",
storage_options=self.storage_options,
)
# 记录写入 metadata
context.add_output_metadata({
"delta_table_path": path,
"num_rows_written": len(obj),
"schema": str(obj.dtypes.to_dict()),
})
def load_input(self, context: InputContext) -> pd.DataFrame:
"""当下游 Asset 需要读取上游输出时,这个方法被调用"""
path = self._get_path(context)
context.log.info(f"从 Delta Lake 加载数据:{path}")
dt = DeltaTable(path, storage_options=self.storage_options)
return dt.to_pandas()
@io_manager(
config_schema={
"base_path": str,
"endpoint_url": str,
"access_key": str,
"secret_key": str,
}
)
def delta_lake_io_manager(context):
"""IO Manager 的工厂函数,从配置中读取参数"""
cfg = context.resource_config
storage_options = {
"endpoint_url": cfg["endpoint_url"],
"aws_access_key_id": cfg["access_key"],
"aws_secret_access_key": cfg["secret_key"],
}
return DeltaLakeIOManager(
base_path=cfg["base_path"],
storage_options=storage_options,
)
8.3 注册 IO Manager
# finlakehouse_pipeline/__init__.py
defs = Definitions(
assets=load_assets_from_modules([bronze, silver]),
resources={
"io_manager": delta_lake_io_manager.configured({
"base_path": "s3://lakehouse",
"endpoint_url": "http://minio:9000",
"access_key": "minioadmin",
"secret_key": "minioadmin",
}),
"minio": MinIOResource(...),
},
)
一旦注册了 io_manager,所有 Asset 的输出会自动通过它写入 Delta Lake,输入会自动从 Delta Lake 读取,而无需在每个 Asset 函数里写任何存储代码。这就是关注点分离的极致。
<a name="chapter-9"></a>
Chapter 9:测试策略——让 Pipeline 可验证
9.1 三层测试策略
生产级 Dagster 项目应该有三层测试:
第一层:单元测试(快速,不依赖基础设施)
- 测试每个 Asset 的业务逻辑
- 用 Mock 替换所有 Resource 和 IO Manager
第二层:集成测试(中等速度,依赖本地容器)
- 使用
docker-compose启动 MinIO、PostgreSQL 等服务 - 测试完整的数据流
第三层:端到端测试(慢速,仅在 CI/CD 中运行)
- 在测试环境中运行真实的 Job
9.2 单元测试示例
# finlakehouse_pipeline_tests/test_silver.py
import pytest
import pandas as pd
from dagster import build_asset_context, materialize
from finlakehouse_pipeline.assets.silver import silver_flights
@pytest.fixture
def raw_flights_df():
"""测试用的 fixture DataFrame"""
return pd.DataFrame({
"flight_id": ["FL001", "FL002", "FL003", None, "FL005"],
"delay_minutes": [10, -5, 200, 15, -50], # -50 应该被过滤
"departure_time": ["2025-01-01 08:00"] * 5,
"arrival_time": ["2025-01-01 10:00"] * 5,
})
def test_silver_removes_null_flight_id(raw_flights_df):
"""测试:flight_id 为 null 的行应该被移除"""
context = build_asset_context()
result = silver_flights(context=context, bronze_flights=raw_flights_df)
assert result["flight_id"].notna().all()
def test_silver_filters_extreme_negative_delay(raw_flights_df):
"""测试:delay_minutes < -30 的行应该被过滤"""
context = build_asset_context()
result = silver_flights(context=context, bronze_flights=raw_flights_df)
assert (result["delay_minutes"] >= -30).all()
def test_silver_metadata_recorded(raw_flights_df):
"""测试:metadata 是否正确记录(通过检查 context 的调用)"""
context = build_asset_context()
silver_flights(context=context, bronze_flights=raw_flights_df)
# 验证没有抛出异常就算通过(metadata 验证比较复杂,可以通过日志检查)
9.3 使用 materialize 进行集成测试
from dagster import materialize
from finlakehouse_pipeline.assets.bronze import bronze_flights
from finlakehouse_pipeline.assets.silver import silver_flights
from unittest.mock import MagicMock, patch
def test_full_bronze_to_silver_pipeline():
"""
`materialize` 函数让你在测试中运行真实的 Dagster 物化逻辑,
包括 IO Manager 的调用,但可以注入 Mock Resource。
"""
mock_minio = MagicMock()
mock_minio.read_parquet.return_value = pd.DataFrame({
"flight_id": ["FL001", "FL002"],
"delay_minutes": [10, 5],
"departure_time": ["2025-01-01 08:00", "2025-01-01 09:00"],
"arrival_time": ["2025-01-01 10:00", "2025-01-01 11:00"],
})
result = materialize(
assets=[bronze_flights, silver_flights],
resources={"minio": mock_minio},
)
assert result.success
# 检查 silver_flights 的输出
silver_df = result.output_for_node("silver_flights")
assert isinstance(silver_df, pd.DataFrame)
assert len(silver_df) == 2
<a name="chapter-10"></a>
Chapter 10:与 MLflow 集成——ML Pipeline 实战
10.1 MLflow Resource
# finlakehouse_pipeline/resources/mlflow_resource.py
from dagster import ConfigurableResource
import mlflow
from typing import Optional
class MLflowResource(ConfigurableResource):
"""封装 MLflow 连接和实验管理"""
tracking_uri: str # e.g., "http://mlflow:5000"
default_experiment: str = "finlakehouse"
def model_config(self) -> dict:
# Pydantic v2 required override
return {"arbitrary_types_allowed": True}
def setup_for_execution(self, context) -> None:
"""在执行前设置 MLflow 连接"""
mlflow.set_tracking_uri(self.tracking_uri)
mlflow.set_experiment(self.default_experiment)
def start_run(self, run_name: str, tags: Optional[dict] = None):
"""启动一个 MLflow 实验运行"""
return mlflow.start_run(run_name=run_name, tags=tags)
def log_model(self, model, artifact_path: str, registered_model_name: str):
"""注册模型到 MLflow Model Registry"""
mlflow.sklearn.log_model(
model,
artifact_path=artifact_path,
registered_model_name=registered_model_name,
)
10.2 ML Training Asset
# finlakehouse_pipeline/assets/ml.py
from dagster import asset, Output, MetadataValue
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pandas as pd
import json
@asset(
description="训练航班延误预测模型并注册到 MLflow",
group_name="ml_layer",
tags={"type": "ml_model", "framework": "sklearn"},
)
def flight_delay_model(
context,
gold_flight_features: pd.DataFrame,
mlflow_resource: MLflowResource,
) -> dict:
"""
ML 训练 Asset:
1. 接收 Gold 层特征数据
2. 训练模型
3. 记录实验到 MLflow
4. 返回模型信息(而不是模型本身,因为模型注册在 MLflow)
"""
feature_cols = ["departure_hour", "is_weekend", "origin_airport_code"]
target_col = "delay_category"
X = pd.get_dummies(gold_flight_features[feature_cols])
y = gold_flight_features[target_col]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 启动 MLflow 实验运行
with mlflow_resource.start_run(
run_name=f"gbm_training_{context.run_id[:8]}",
tags={"dagster_run_id": context.run_id, "data_version": "v1"},
) as run:
# 定义超参数
params = {"n_estimators": 100, "max_depth": 4, "learning_rate": 0.1}
mlflow.log_params(params)
# 训练模型
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# 评估模型
y_pred = model.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True)
# 记录指标
mlflow.log_metric("accuracy", report["accuracy"])
mlflow.log_metric("f1_macro", report["macro avg"]["f1-score"])
# 注册模型
mlflow_resource.log_model(
model=model,
artifact_path="flight_delay_model",
registered_model_name="flight_delay_predictor",
)
run_id = run.info.run_id
# 记录到 Dagster metadata
context.add_output_metadata({
"mlflow_run_id": run_id,
"accuracy": report["accuracy"],
"f1_macro": report["macro avg"]["f1-score"],
"model_name": "flight_delay_predictor",
"classification_report": MetadataValue.json(report),
})
return {
"mlflow_run_id": run_id,
"model_name": "flight_delay_predictor",
"accuracy": report["accuracy"],
}
@asset(
description="将最佳模型提升到生产阶段",
group_name="ml_layer",
tags={"type": "governance"},
)
def promoted_model(
context,
flight_delay_model: dict, # 依赖训练 Asset
mlflow_resource: MLflowResource,
) -> dict:
"""
模型治理 Asset:根据指标决定是否将模型提升到 Production 阶段。
这个模式是 EU AI Act 中"人工审核"要求的体现——模型不会自动上线,
需要满足阈值条件才能提升。
"""
ACCURACY_THRESHOLD = 0.75 # 生产门槛
if flight_delay_model["accuracy"] < ACCURACY_THRESHOLD:
context.log.warning(
f"模型准确率 {flight_delay_model['accuracy']:.3f} 未达到"
f"生产阈值 {ACCURACY_THRESHOLD},跳过提升"
)
return {"status": "rejected", "reason": "accuracy_below_threshold"}
# 将模型版本提升到 Production
mlflow.set_tracking_uri(mlflow_resource.tracking_uri)
client = mlflow.tracking.MlflowClient()
# 获取最新版本号
versions = client.get_latest_versions("flight_delay_predictor", stages=["None"])
latest_version = versions[0].version
client.transition_model_version_stage(
name="flight_delay_predictor",
version=latest_version,
stage="Production",
archive_existing_versions=True, # 将旧的 Production 版本归档
)
context.add_output_metadata({
"promoted_version": latest_version,
"accuracy": flight_delay_model["accuracy"],
"mlflow_run_id": flight_delay_model["mlflow_run_id"],
})
return {
"status": "promoted",
"version": latest_version,
"model_name": "flight_delay_predictor",
}
<a name="chapter-11"></a>
Chapter 11:与 Spark / Delta Lake 集成
11.1 Spark Session Resource
# finlakehouse_pipeline/resources/spark_resource.py
from dagster import ConfigurableResource
from pyspark.sql import SparkSession
from typing import Optional
class SparkResource(ConfigurableResource):
"""
封装 PySpark Session 的 Dagster Resource。
在分布式场景中(如 Kubernetes),每个 Asset 可以获得自己的 Spark Session。
"""
app_name: str = "FinLakehouse"
master: str = "local[*]" # 生产中替换为 "spark://spark-master:7077"
minio_endpoint: str = "http://minio:9000"
minio_access_key: str = "minioadmin"
minio_secret_key: str = "minioadmin"
def get_session(self) -> SparkSession:
"""
创建并返回配置好 S3A(MinIO)连接的 SparkSession。
注意:在实际使用中,应该考虑 Session 的复用,避免频繁创建。
"""
return (
SparkSession.builder
.appName(self.app_name)
.master(self.master)
# Delta Lake 支持
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# MinIO S3A 配置
.config("spark.hadoop.fs.s3a.endpoint", self.minio_endpoint)
.config("spark.hadoop.fs.s3a.access.key", self.minio_access_key)
.config("spark.hadoop.fs.s3a.secret.key", self.minio_secret_key)
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.getOrCreate()
)
11.2 使用 Spark 处理大规模数据的 Asset
# finlakehouse_pipeline/assets/spark_assets.py
from dagster import asset
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import functions as F
@asset(
description="使用 Spark 从 Delta Lake 读取并进行大规模特征计算",
group_name="feature_layer",
tags={"engine": "spark"},
)
def spark_flight_features(
context,
spark: SparkResource, # 注入 Spark Resource
) -> None:
"""
注意:这个 Asset 返回 None(或写入路径),而不是 DataFrame。
因为 Spark DataFrame 不能序列化后传递给其他 Asset。
它直接写入 Delta Lake,下游 Asset 再从 Delta Lake 读取。
"""
session = spark.get_session()
# 从 Delta Lake 读取 Silver 层数据
silver_df = session.read.format("delta").load("s3a://lakehouse/silver_flights")
context.log.info(f"从 Silver 层读取了 {silver_df.count()} 行")
# 大规模特征工程(利用 Spark 分布式计算)
feature_df = (
silver_df
.withColumn("departure_hour", F.hour(F.col("departure_time")))
.withColumn("is_weekend", F.dayofweek(F.col("departure_time")).isin([1, 7]))
.withColumn("delay_bucket",
F.when(F.col("delay_minutes") <= 0, "on_time")
.when(F.col("delay_minutes") <= 15, "minor")
.when(F.col("delay_minutes") <= 60, "moderate")
.otherwise("severe")
)
# 滑动窗口特征:过去 7 天同一航线的平均延误
.withColumn("route", F.concat(F.col("origin"), F.lit("-"), F.col("destination")))
)
# 写入 Delta Lake(覆盖写入)
(
feature_df
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.save("s3a://lakehouse/feature_flights")
)
context.add_output_metadata({
"rows_written": feature_df.count(),
"output_path": "s3a://lakehouse/feature_flights",
})
session.stop()
<a name="chapter-12"></a>
Chapter 12:Metadata、Lineage 与可观测性
12.1 丰富的 Metadata 类型
Dagster 支持多种 Metadata 类型,让你的资产详情页变成一个内置的数据文档:
from dagster import asset, MetadataValue
import pandas as pd
import matplotlib.pyplot as plt
import io
@asset
def documented_asset(context) -> pd.DataFrame:
df = pd.DataFrame({"delay": range(100), "count": range(100)})
# 生成可视化图表并嵌入 metadata
fig, ax = plt.subplots()
df.plot(ax=ax, x="delay", y="count")
img_buffer = io.BytesIO()
fig.savefig(img_buffer, format="png")
img_buffer.seek(0)
context.add_output_metadata({
# 纯文本
"description": "航班延误分布数据",
# 数值(可以在 UI 中追踪趋势)
"num_rows": MetadataValue.int(len(df)),
"avg_delay": MetadataValue.float(df["delay"].mean()),
# Markdown(支持富文本展示)
"data_quality_report": MetadataValue.md(
"## 数据质量报告\n"
f"- 总行数:{len(df)}\n"
f"- 缺失值:0\n"
f"- 最大延误:{df['delay'].max()} 分钟\n"
),
# 数据预览(表格形式)
"preview": MetadataValue.md(df.head(5).to_markdown()),
# 图表(PNG 图片)
"delay_distribution": MetadataValue.png(img_buffer.read()),
# 外部链接(跳转到 MinIO 控制台)
"storage_location": MetadataValue.url(
"http://minio:9001/browser/lakehouse/silver_flights"
),
# JSON 对象(展示复杂结构)
"schema": MetadataValue.json({
"columns": df.dtypes.astype(str).to_dict()
}),
})
return df
12.2 数据血缘(Lineage)的自动追踪
Dagster 自动追踪 Asset 之间的依赖关系,形成完整的数据血缘图。你可以在 UI 的 Asset Graph 视图中看到:
- 哪些 Asset 是某个 Asset 的上游数据源
- 如果某个 Asset 物化失败,哪些下游 Asset 会受到影响
- 每个 Asset 的最近物化时间和状态
这对于合规审计(BaFin 要求能够追溯模型决策依据)非常有价值。
12.3 Fresh Policy:数据新鲜度监控
from dagster import asset, FreshnessPolicy
from datetime import timedelta
@asset(
# 声明:这个 Asset 应该每天至少更新一次,且不超过 30 分钟的延迟
freshness_policy=FreshnessPolicy(
maximum_lag_minutes=30,
cron_schedule="0 3 * * *", # 期望在每天 03:00 完成更新
),
description="每日航班数据(如果超时未更新,UI 会显示警告)",
)
def daily_bronze_flights(context) -> None:
...
当 Asset 的最后物化时间超过设定的 maximum_lag_minutes 时,Dagster UI 会在该 Asset 上显示橙色警告,帮助你主动发现数据管道问题。
<a name="chapter-13"></a>
Chapter 13:生产部署——Docker + Dagster Daemon
13.1 Dagster 的进程架构
生产部署的 Dagster 由四个核心进程组成:
Dagster Webserver:提供 UI 和 GraphQL API,是用户与 Dagster 交互的门户。它是无状态的,可以水平扩展。
Dagster Daemon:后台守护进程,负责执行 Schedule 和 Sensor 的轮询、以及 Backfill 任务的调度。它是有状态的,同一时间只能运行一个实例。
Dagster Instance:Dagster 的"数据库",存储所有运行历史、Asset 物化记录、Schedule 状态等。通常使用 PostgreSQL。
Code Server:运行你的用户代码(Asset、Resource、Job 定义)的进程。与 Webserver 和 Daemon 隔离,代码更新不需要重启 Dagster 服务。
13.2 Docker Compose 配置
# docker-compose.yml
version: "3.8"
services:
# Dagster 实例数据库(存储运行历史等)
dagster_postgres:
image: postgres:15
environment:
POSTGRES_USER: dagster
POSTGRES_PASSWORD: dagster
POSTGRES_DB: dagster
volumes:
- dagster_postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "dagster"]
interval: 5s
timeout: 5s
retries: 5
# Dagster WebServer(UI)
dagster_webserver:
image: dagster/dagster-k8s:latest # 或自定义镜像
entrypoint: ["dagster-webserver", "-h", "0.0.0.0", "-p", "3000", "-w", "/opt/dagster/workspace.yaml"]
ports:
- "3000:3000"
environment:
DAGSTER_POSTGRES_USER: dagster
DAGSTER_POSTGRES_PASSWORD: dagster
DAGSTER_POSTGRES_DB: dagster
DAGSTER_POSTGRES_HOST: dagster_postgres
depends_on:
dagster_postgres:
condition: service_healthy
volumes:
- ./workspace.yaml:/opt/dagster/workspace.yaml
# Dagster Daemon(调度器、Sensor 轮询)
dagster_daemon:
image: dagster/dagster-k8s:latest
entrypoint: ["dagster-daemon", "run"]
environment:
DAGSTER_POSTGRES_USER: dagster
DAGSTER_POSTGRES_PASSWORD: dagster
DAGSTER_POSTGRES_DB: dagster
DAGSTER_POSTGRES_HOST: dagster_postgres
depends_on:
dagster_postgres:
condition: service_healthy
volumes:
- ./workspace.yaml:/opt/dagster/workspace.yaml
# 你的用户代码服务(Code Server)
finlakehouse_pipeline:
build:
context: .
dockerfile: Dockerfile
environment:
MINIO_ENDPOINT_URL: http://minio:9000
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
MLFLOW_TRACKING_URI: http://mlflow:5000
ports:
- "4000:4000" # gRPC 端口,供 Webserver 和 Daemon 调用
volumes:
dagster_postgres_data:
13.3 Workspace 配置
# workspace.yaml
# 告诉 Dagster 在哪里找到用户代码
load_from:
- grpc_server:
host: finlakehouse_pipeline
port: 4000
location_name: "finlakehouse"
13.4 用户代码的 Dockerfile
FROM python:3.11-slim
WORKDIR /opt/app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制代码
COPY finlakehouse_pipeline/ finlakehouse_pipeline/
COPY setup.py .
RUN pip install -e .
# 启动 gRPC Code Server(不是 Webserver)
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", \
"--python-file", "finlakehouse_pipeline/__init__.py"]
13.5 Dagster Instance 配置
# dagster.yaml(放在项目根目录或 $DAGSTER_HOME 指向的目录)
storage:
postgres:
postgres_url: "postgresql://dagster:dagster@dagster_postgres:5432/dagster"
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
config:
max_concurrent_runs: 5 # 同时最多运行 5 个 Job
run_launcher:
module: dagster.core.launcher
class: DefaultRunLauncher
# 日志配置
python_logs:
python_log_level: INFO
<a name="chapter-14"></a>
Chapter 14:FinLakehouse 综合案例
14.1 完整 Pipeline 设计
以下是将 FinLakehouse 的完整 LakeFlow 映射到 Dagster Asset 的设计方案:
Landing Zone(外部)
↓ [Sensor: minio_landing_sensor]
Bronze Layer
bronze_raw_flights ← 原始 CSV 数据
bronze_raw_weather ← 原始天气数据
↓
Silver Layer
silver_flights ← 清洗、去重、类型转换
silver_weather ← 清洗天气数据
↓
Gold Layer
gold_flight_weather_joined ← 航班 + 天气 JOIN
↓
Feature Layer
feature_flight_delay ← ML 特征工程(Spark)
↓
ML Layer
flight_delay_model ← 模型训练 + MLflow 注册
promoted_model ← 模型治理 / 提升
↓
Serving Layer
model_serving_config ← 生成 BentoML 配置文件
↓
Monitoring Layer
drift_detection_report ← 数据漂移检测(每天运行)
model_performance_report ← 模型性能监控
14.2 完整的 Definitions 注册
# finlakehouse_pipeline/__init__.py
from dagster import Definitions, EnvVar, load_assets_from_modules
# 导入所有 Asset 模块
from finlakehouse_pipeline.assets import (
bronze, silver, gold, features, ml_assets, monitoring
)
# 导入所有 Resource
from finlakehouse_pipeline.resources.storage import MinIOResource
from finlakehouse_pipeline.resources.mlflow_resource import MLflowResource
from finlakehouse_pipeline.resources.spark_resource import SparkResource
from finlakehouse_pipeline.resources.io_managers import delta_lake_io_manager
# 导入 Jobs
from finlakehouse_pipeline.jobs import (
ingestion_job, ml_training_job, monitoring_job
)
# 导入 Schedules
from finlakehouse_pipeline.schedules import (
daily_ingestion_schedule, weekly_model_retrain_schedule
)
# 导入 Sensors
from finlakehouse_pipeline.sensors import minio_landing_sensor
defs = Definitions(
# 所有 Asset
assets=load_assets_from_modules([
bronze, silver, gold, features, ml_assets, monitoring
]),
# 所有 Resource(包括 IO Manager)
resources={
"io_manager": delta_lake_io_manager.configured({
"base_path": "s3://lakehouse",
"endpoint_url": EnvVar("MINIO_ENDPOINT_URL"),
"access_key": EnvVar("MINIO_ACCESS_KEY"),
"secret_key": EnvVar("MINIO_SECRET_KEY"),
}),
"minio": MinIOResource(
endpoint_url=EnvVar("MINIO_ENDPOINT_URL"),
access_key=EnvVar("MINIO_ACCESS_KEY"),
secret_key=EnvVar("MINIO_SECRET_KEY"),
),
"mlflow_resource": MLflowResource(
tracking_uri=EnvVar("MLFLOW_TRACKING_URI"),
),
"spark": SparkResource(
master=EnvVar("SPARK_MASTER"),
minio_endpoint=EnvVar("MINIO_ENDPOINT_URL"),
minio_access_key=EnvVar("MINIO_ACCESS_KEY"),
minio_secret_key=EnvVar("MINIO_SECRET_KEY"),
),
},
# 所有 Job
jobs=[ingestion_job, ml_training_job, monitoring_job],
# 所有 Schedule
schedules=[daily_ingestion_schedule, weekly_model_retrain_schedule],
# 所有 Sensor
sensors=[minio_landing_sensor],
)
14.3 监管合规要点
在受监管行业(BaFin、EU AI Act)中,Dagster 可以提供以下内置的合规支持:
审计追踪:每次 Asset 物化都记录了完整的运行 ID、时间戳、输入参数和输出 metadata,形成不可篡改的审计日志(存储在 PostgreSQL 中)。
模型血缘:从原始数据到模型决策的完整数据链路可以在 Asset Graph 中一键可视化,满足"可解释性"要求。
模型治理:promoted_model Asset 实现了"无法自动上线"的门控机制,只有满足准确率阈值的模型才能进入生产,体现了人工审核节点。
回滚能力:Delta Lake 的 Time Travel + MLflow 的版本管理,允许在发现问题时将数据和模型同时回滚到指定时间点。
附录:常用命令速查
# 开发环境
dagster dev # 启动开发服务器
dagster asset materialize -m my_module # 命令行物化所有资产
dagster asset materialize -m my_module --select bronze_flights # 物化指定资产
dagster job execute -m my_module -j ingestion_job # 执行指定 Job
# 分区操作
dagster asset materialize -m my_module --select bronze_flights \
--partition 2025-01-15 # 物化指定分区
# 检查 Dagster 状态
dagster instance info # 查看实例配置
dagster schedule list # 列出所有 Schedule
dagster sensor list # 列出所有 Sensor
dagster sensor start minio_landing_sensor # 启动 Sensor
# 生产部署检查
dagster-daemon health-check # 检查 Daemon 健康状态
学习路径建议
第一周:完成 Chapter 1-3,理解 Asset 的核心概念,手动搭建 bronze → silver → gold 的数据流。
第二周:完成 Chapter 4-5,实现 MinIO Resource 并将其注入 Asset,编写单元测试验证解耦效果。
第三周:完成 Chapter 6-7,为你的 Frankfurt Airport 税收时间预测数据实现按天分区,并添加 MinIO Landing Sensor。
第四周:完成 Chapter 8-10,实现 Delta Lake IO Manager 和 MLflow 集成,打通完整的 ML Pipeline。
第五周及以后:Chapter 11-14,接入 Spark 处理大规模特征工程,完成 Docker 生产部署,实现监管合规功能。
本手册版本:2025-02,适配 Dagster 1.7+