Medallion Architecture 完整构建指南

MinIO + PySpark (Delta Lake) + Trino | 从零开始

0. 环境假设

组件 配置
MinIO endpoint http://minio:9000
Access Key minioadmin
Secret Key minioadmin
Spark Delta 版本 delta-spark 3.x
Trino Delta connector 已配置好 delta catalog 指向 MinIO
Bucket 名称 lakehouse (新建)
注意: 如果你已有 bucket(如截图中的 test-bucket-lakehouse),
把下方所有 s3a://lakehouse 替换为 s3a://test-bucket-lakehouse 即可。

Step 1: 创建 MinIO Bucket 结构

方式 A:Python(推荐,可脚本化)

# setup_minio.py
# 依赖: pip install minio

from minio import Minio

client = Minio(
    "localhost:9000",          # MinIO 地址
    access_key="minioadmin",
    secret_key="minioadmin",
    secure=False
)

BUCKET = "lakehouse"

# 1. 创建 bucket(如果不存在)
if not client.bucket_exists(BUCKET):
    client.make_bucket(BUCKET)
    print(f"✅ Bucket '{BUCKET}' 创建成功")
else:
    print(f"ℹ️  Bucket '{BUCKET}' 已存在,跳过")

# 2. 在 bucket 内创建占位符,模拟三层目录结构
# MinIO 是对象存储,没有真正的"目录";用空对象模拟路径
import io
for layer in ["bronze/.keep", "silver/.keep", "gold/.keep"]:
    client.put_object(
        BUCKET, layer,
        data=io.BytesIO(b""),
        length=0
    )
    print(f"  📁 {BUCKET}/{layer.split('/')[0]}/ 目录占位创建")

print("\n✅ MinIO 结构初始化完成")
print(f"   s3a://{BUCKET}/bronze/")
print(f"   s3a://{BUCKET}/silver/")
print(f"   s3a://{BUCKET}/gold/")

运行:

python setup_minio.py

方式 B:MinIO Client CLI(mc)

# 配置 mc 别名
mc alias set local http://localhost:9000 minioadmin minioadmin

# 创建 bucket
mc mb local/lakehouse

# 验证
mc ls local/

Step 2: Trino 创建 Catalog & Schema

说明: Trino 的 delta catalog 通常在配置文件中定义(etc/catalog/delta.properties)。
以下 SQL 假设 catalog 名为 delta,已在 Trino 配置中指向 MinIO。
如果你需要动态创建 catalog,需通过 REST API(Trino 393+)或配置文件。

2.1 确认 Catalog 可用

-- 在 Trino / CloudBeaver 中执行
SHOW CATALOGS;
-- 应该看到 delta(对应你的 MinIO)

SHOW SCHEMAS FROM delta;
-- 查看现有 schema

2.2 创建三层 Schema

-- Bronze: 原始数据层
CREATE SCHEMA IF NOT EXISTS delta.bronze
WITH (location = 's3a://lakehouse/bronze');

-- Silver: 清洗层
CREATE SCHEMA IF NOT EXISTS delta.silver
WITH (location = 's3a://lakehouse/silver');

-- Gold: 聚合层
CREATE SCHEMA IF NOT EXISTS delta.gold
WITH (location = 's3a://lakehouse/gold');

-- 验证
SHOW SCHEMAS FROM delta;

执行结果预期

Schema
--------
bronze
silver
gold
information_schema

Step 3: PySpark 写入三层数据

以下代码在 Jupyter / PySpark shell 中可直接运行
假设 SparkSession 已配置好 S3A 连接到 MinIO。
# medallion_pipeline.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, avg, count, round as spark_round
from delta.tables import DeltaTable

# ──────────────────────────────────────────────────────────────────────────────
# SparkSession 初始化
# 如果已有 spark 对象(如在 Jupyter 中),跳过此段
# ──────────────────────────────────────────────────────────────────────────────
spark = (
    SparkSession.builder
    .appName("medallion-lakehouse")
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # MinIO 连接(如果尚未在 spark-defaults.conf 中配置)
    # .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    # .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    # .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    # .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

BUCKET = "s3a://lakehouse"
# 如果用已有 bucket: BUCKET = "s3a://test-bucket-lakehouse"

# ==============================================================================
# BRONZE LAYER — 原始数据直接落地,保留所有字段,不做任何清洗
# 职责:数据保真,可追溯,幂等写入
# ==============================================================================
print("=" * 60)
print("📥 BRONZE: 写入原始订单数据")
print("=" * 60)

# 模拟来自上游系统的原始数据(含脏数据:null、格式不一致)
raw_orders = [
    (1, "Alice",   "2024-01-10", 250.0,  "electronics"),
    (2, "Bob",     "2024-01-11", None,   "clothing"),     # amount = null
    (3, "Charlie", "2024-01-11", 89.5,   "electronics"),
    (4, "Alice",   "2024-01-12", 310.0,  "clothing"),
    (5, "Bob",     "2024-01-12", 450.0,  "ELECTRONICS"),  # 大写(脏数据)
    (6, "Diana",   "2024-01-13", 120.0,  None),           # category = null
    (7, "Charlie", "2024-01-14", -50.0,  "electronics"),  # 负数(异常值)
]

bronze_df = spark.createDataFrame(
    raw_orders,
    schema=["order_id", "customer", "order_date", "amount", "category"]
)

bronze_path = f"{BUCKET}/bronze/orders"

(
    bronze_df
    .write
    .format("delta")
    .mode("overwrite")   # 生产环境改为 "append" 或用 merge
    .save(bronze_path)
)

print(f"✅ Bronze 写入完成 → {bronze_path}")
spark.read.format("delta").load(bronze_path).show()

# ==============================================================================
# SILVER LAYER — 清洗、标准化、类型转换
# 职责:数据质量保障,业务规则执行
# ==============================================================================
print("\n" + "=" * 60)
print("🔧 SILVER: 清洗转换")
print("=" * 60)

bronze_read = spark.read.format("delta").load(bronze_path)

silver_df = (
    bronze_read
    # 1. 过滤无效行:amount 为 null 或负数
    .filter(col("amount").isNotNull() & (col("amount") > 0))
    # 2. 填充 null category
    .fillna({"category": "unknown"})
    # 3. 标准化 category 为小写
    .withColumn("category", col("category").cast("string"))
    .withColumn("category", 
                col("category").substr(1, 100))  # trim 长度
    .withColumn("category",
                col("category").cast("string"))
    # 使用 selectExpr 做 lower
    .selectExpr(
        "order_id",
        "customer",
        "order_date",
        "CAST(amount AS DOUBLE) AS amount",
        "LOWER(category) AS category"
    )
    # 4. 日期类型转换
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
)

silver_path = f"{BUCKET}/silver/orders"

(
    silver_df
    .write
    .format("delta")
    .mode("overwrite")
    .save(silver_path)
)

print(f"✅ Silver 写入完成 → {silver_path}")
spark.read.format("delta").load(silver_path).show()

# ==============================================================================
# GOLD LAYER — 业务聚合,面向消费
# 职责:支撑 BI、报表、API,高性能读取
# ==============================================================================
print("\n" + "=" * 60)
print("🏅 GOLD: 聚合生成 customer_summary")
print("=" * 60)

silver_read = spark.read.format("delta").load(silver_path)

gold_df = (
    silver_read
    .groupBy("customer", "category")
    .agg(
        count("order_id").alias("order_count"),
        spark_round(avg("amount"), 2).alias("avg_amount")
    )
    .orderBy("customer", "category")
)

gold_path = f"{BUCKET}/gold/customer_summary"

(
    gold_df
    .write
    .format("delta")
    .mode("overwrite")
    .save(gold_path)
)

print(f"✅ Gold 写入完成 → {gold_path}")
spark.read.format("delta").load(gold_path).show()

print("\n🎉 Medallion Pipeline 完成!")

Step 4: Trino 注册表并查询

Spark 写入的 Delta 表,Trino 不会自动发现,需要手动注册一次。

4.1 注册表

-- 注册 Bronze 表
CALL delta.system.register_table(
    schema_name    => 'bronze',
    table_name     => 'orders',
    table_location => 's3a://lakehouse/bronze/orders'
);

-- 注册 Silver 表
CALL delta.system.register_table(
    schema_name    => 'silver',
    table_name     => 'orders',
    table_location => 's3a://lakehouse/silver/orders'
);

-- 注册 Gold 表
CALL delta.system.register_table(
    schema_name    => 'gold',
    table_name     => 'customer_summary',
    table_location => 's3a://lakehouse/gold/customer_summary'
);

-- 验证
SHOW TABLES FROM delta.bronze;
SHOW TABLES FROM delta.silver;
SHOW TABLES FROM delta.gold;

4.2 基础查询

-- Bronze: 查看原始数据(含脏数据)
SELECT * FROM delta.bronze.orders;

-- Silver: 查看清洗后数据
SELECT * FROM delta.silver.orders;

-- Gold: 查看聚合结果
SELECT * FROM delta.gold.customer_summary;

4.3 业务查询示例

-- 1. 查询某类别的所有订单(Silver)
SELECT customer, order_date, amount
FROM delta.silver.orders
WHERE category = 'electronics'
ORDER BY amount DESC;

-- 2. 每个客户的总消费(Gold 直接读,无需重新聚合)
SELECT customer,
       SUM(order_count) AS total_orders,
       ROUND(AVG(avg_amount), 2) AS overall_avg
FROM delta.gold.customer_summary
GROUP BY customer
ORDER BY total_orders DESC;

-- 3. Bronze vs Silver 行数对比(验证清洗效果)
SELECT 'bronze' AS layer, COUNT(*) AS row_count FROM delta.bronze.orders
UNION ALL
SELECT 'silver' AS layer, COUNT(*) AS row_count FROM delta.silver.orders
UNION ALL
SELECT 'gold'   AS layer, COUNT(*) AS row_count FROM delta.gold.customer_summary;

-- 4. 跨层 JOIN:Silver 明细 + Gold 聚合对比
SELECT
    s.customer,
    s.order_id,
    s.amount,
    g.avg_amount AS customer_avg,
    ROUND(s.amount - g.avg_amount, 2) AS diff_from_avg
FROM delta.silver.orders s
JOIN delta.gold.customer_summary g
    ON s.customer = g.customer
    AND s.category = g.category
ORDER BY s.customer, s.order_id;

Step 5: 增删改示例 (CRUD)

5.1 INSERT — 新增数据

PySpark

# 新增一批订单到 Bronze(追加模式)
new_orders = [
    (8, "Eve",   "2024-01-15", 180.0, "clothing"),
    (9, "Alice", "2024-01-15", 99.0,  "electronics"),
]

new_df = spark.createDataFrame(
    new_orders,
    schema=["order_id", "customer", "order_date", "amount", "category"]
)

# append 模式:不覆盖已有数据
(
    new_df
    .write
    .format("delta")
    .mode("append")
    .save(f"{BUCKET}/bronze/orders")
)
print("✅ Bronze 追加写入完成")

Trino SQL

-- 直接用 SQL 插入(Trino Delta connector 支持)
INSERT INTO delta.bronze.orders
VALUES
    (8, 'Eve',   DATE '2024-01-15', 180.0, 'clothing'),
    (9, 'Alice', DATE '2024-01-15', 99.0,  'electronics');

-- 验证
SELECT * FROM delta.bronze.orders WHERE order_id >= 8;

5.2 UPDATE — 更新数据

PySpark(使用 DeltaTable merge API)

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, f"{BUCKET}/silver/orders")

# 将 order_id=3 的 amount 更新为 100.0
(
    delta_table.update(
        condition = col("order_id") == 3,
        set = {"amount": "100.0"}
    )
)
print("✅ Silver 更新完成")
spark.read.format("delta").load(f"{BUCKET}/silver/orders") \
     .filter(col("order_id") == 3).show()

Trino SQL

-- 更新 Silver 表中 order_id=3 的 amount
UPDATE delta.silver.orders
SET amount = 100.0
WHERE order_id = 3;

-- 验证
SELECT * FROM delta.silver.orders WHERE order_id = 3;

5.3 DELETE — 删除数据

PySpark

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, f"{BUCKET}/bronze/orders")

# 删除 amount 为 null 的记录(已被 silver 过滤,bronze 中清理)
delta_table.delete(condition = col("amount").isNull())

print("✅ Bronze 删除完成")
spark.read.format("delta").load(f"{BUCKET}/bronze/orders").show()

Trino SQL

-- 删除 Bronze 中 amount 为 null 的记录
DELETE FROM delta.bronze.orders
WHERE amount IS NULL;

-- 验证
SELECT * FROM delta.bronze.orders;

5.4 UPSERT — 合并更新(Merge)

生产中最常用的模式:有则更新,无则插入。

PySpark(DeltaTable merge)

from delta.tables import DeltaTable
from pyspark.sql.functions import lit

# 模拟上游推送的更新数据(order_id=1 修改,order_id=10 新增)
upsert_data = [
    (1,  "Alice", "2024-01-10", 999.0, "electronics"),  # 已存在,更新 amount
    (10, "Frank", "2024-01-16", 200.0, "clothing"),     # 不存在,新增
]

upsert_df = spark.createDataFrame(
    upsert_data,
    schema=["order_id", "customer", "order_date", "amount", "category"]
)

delta_table = DeltaTable.forPath(spark, f"{BUCKET}/silver/orders")

(
    delta_table.alias("target")
    .merge(
        upsert_df.alias("source"),
        condition="target.order_id = source.order_id"   # 匹配条件
    )
    .whenMatchedUpdateAll()    # 匹配到 → 更新所有字段
    .whenNotMatchedInsertAll() # 未匹配 → 插入新行
    .execute()
)

print("✅ Silver UPSERT 完成")
spark.read.format("delta").load(f"{BUCKET}/silver/orders") \
     .filter(col("order_id").isin([1, 10])).show()

Trino SQL(MERGE 语法)

-- Trino 支持标准 MERGE 语法
MERGE INTO delta.silver.orders AS target
USING (
    VALUES
        (1,  'Alice', DATE '2024-01-10', 999.0, 'electronics'),
        (10, 'Frank', DATE '2024-01-16', 200.0, 'clothing')
) AS source(order_id, customer, order_date, amount, category)
ON target.order_id = source.order_id
WHEN MATCHED THEN
    UPDATE SET amount = source.amount, category = source.category
WHEN NOT MATCHED THEN
    INSERT (order_id, customer, order_date, amount, category)
    VALUES (source.order_id, source.customer, source.order_date,
            source.amount, source.category);

-- 验证
SELECT * FROM delta.silver.orders WHERE order_id IN (1, 10);

5.5 Time Travel — 查看历史版本(Delta 独有)

PySpark

# 查看 Silver 表的变更历史
delta_table = DeltaTable.forPath(spark, f"{BUCKET}/silver/orders")
delta_table.history().select("version", "timestamp", "operation").show()

# 读取历史版本(version 0 = 初始状态)
old_df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(f"{BUCKET}/silver/orders")
)
print("📜 Version 0 数据:")
old_df.show()

Trino SQL

-- 查看 Delta 表历史
SELECT * FROM delta.silver."orders$history";

-- 按版本查询
SELECT * FROM delta.silver.orders FOR VERSION AS OF 0;

-- 按时间查询
SELECT * FROM delta.silver.orders
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-11 00:00:00';

架构总结

  上游数据源
      │
      ▼
  ┌─────────────────────────────────────────────────┐
  │  BRONZE (s3a://lakehouse/bronze/)               │
  │  • 原始数据,不做任何修改                          │
  │  • 保留脏数据(null、异常值)                      │
  │  • append-only,数据可追溯                        │
  └───────────────────┬─────────────────────────────┘
                      │ Spark ETL(清洗规则)
                      ▼
  ┌─────────────────────────────────────────────────┐
  │  SILVER (s3a://lakehouse/silver/)               │
  │  • 清洗后的可信数据                               │
  │  • 类型正确、null 处理、标准化                     │
  │  • 支持 UPSERT / DELETE                         │
  └───────────────────┬─────────────────────────────┘
                      │ Spark Aggregation(业务逻辑)
                      ▼
  ┌─────────────────────────────────────────────────┐
  │  GOLD (s3a://lakehouse/gold/)                   │
  │  • 业务聚合表,直接面向消费                        │
  │  • BI 报表 / API / 机器学习特征                   │
  │  • 高性能读取,通常按业务维度分区                   │
  └─────────────────────────────────────────────────┘
                      │
                      ▼
               Trino(只读查询)
          CloudBeaver / BI 工具 / API
职责 Spark Trino
数据写入 ✅(有限支持)
ETL 转换
大规模聚合计算
即席 SQL 查询
跨表 JOIN 分析
连接 BI 工具
Time Travel
UPSERT / Merge ✅(推荐) ✅(支持)

为什么需要注册表?

核心原因一句话:Spark 和 Trino 是两个完全独立的系统,它们不共享任何"表的认知"。


先理解:数据 vs 元数据

当 Spark 把数据写到 MinIO 的时候,它实际上做了两件事,但很多人只注意到了第一件:

第一件事(你看得见的):把 Parquet 文件写到 s3a://lakehouse/silver/orders/ 这个路径下。这些文件是真实存在于 MinIO 中的,任何人都可以下载它们。

第二件事(你看不见的):生成了一个 _delta_log/ 文件夹,里面存放的是 Delta Table 的"元数据"——也就是这张表叫什么、有哪些列、每列是什么类型、哪些文件属于这张表、有哪些历史版本等等。

所以 MinIO 里存的东西实际上是这样的:

s3a://lakehouse/silver/orders/
├── _delta_log/           ← Delta 元数据(表的"说明书")
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── ...
├── part-00000-xxx.parquet   ← 真实数据文件
├── part-00001-xxx.parquet
└── ...

Spark 写完之后,它自己当然知道这张表是什么,因为是它自己写的。但 Trino 完全不知道这件事的发生。


Trino 的视角

Trino 是一个查询引擎,它本身不存储任何数据,也不主动去扫描 MinIO 里有什么。它的工作方式是:你告诉我有哪些表,我才去读。

你可以把 Trino 想象成一个图书馆的管理员。MinIO 是一个巨大的仓库,里面堆满了书(数据文件)。这个管理员不会主动去仓库里把每本书翻一遍看看有什么,它只维护一份"图书目录"(catalog)。你要查某本书,先查目录,目录里有记录才能找到对应的书架位置。

Spark 写数据相当于把书搬进了仓库,但没有告诉图书馆管理员,所以目录里没有这本书的记录。 注册表这个操作,就是你去告诉管理员:"嘿,仓库的这个位置 s3a://lakehouse/silver/orders/ 有一批书,帮我把它登记到目录里,名字叫 delta.silver.orders。"


那 Hive Metastore 是怎么回事?

你可能会问:我看很多 Lakehouse 架构里,Spark 写完数据之后 Trino 直接就能查,不需要注册,那是为什么?

答案是那些架构里有一个叫 Hive Metastore(HMS)的组件,它充当了共享的"图书目录"。Spark 写完数据后会自动把表的元数据登记到 HMS 里,Trino 配置了同一个 HMS,所以它立刻就能看到新表。两个系统通过 HMS 共享了元数据,所以不需要手动注册。

你现在的环境里没有 HMS,Spark 和 Trino 各自为政。Spark 有自己的内部目录,Trino 有自己的 delta catalog,两者之间没有任何通信渠道,所以必须手动告诉 Trino"这个路径下有一张表",这就是 register_table 的作用。


一个类比把这三者串起来

概念 类比
MinIO 仓库,存放真实的货物(Parquet 文件)
Delta Log(_delta_log/ 货物自带的说明书,记录货物的规格信息
Hive Metastore 仓库的共享目录系统,所有人共用
Trino Catalog Trino 自己维护的一份目录,需要人工同步
register_table 把"说明书上的信息"手动登记进 Trino 的目录

注册只需要做一次

最后一点值得强调:注册是一次性操作。 你注册完之后,之后 Spark 每次往这张表里追加新数据、更新数据,Trino 都能自动感知到——因为 Trino 每次查询时会去读 _delta_log/ 里的最新状态,而 _delta_log/ 是 Spark 每次写入时自动更新的。

所以注册表只是建立了一个"路径到表名的映射",之后数据的变化是 Delta Log 负责追踪的,Trino 能实时看到最新数据,不需要你重新注册。

整个流程的信息流是这样的:

Spark 写数据
    │
    ├─→ Parquet 文件写入 MinIO
    └─→ _delta_log/ 自动更新(记录变更)

register_table(只做一次)
    └─→ 告诉 Trino:"这个路径 = delta.silver.orders"

Trino 查询
    └─→ 找到路径 → 读 _delta_log/ 获取最新文件列表 → 读 Parquet 文件 → 返回结果

理解了这个之后,如果你之后引入 Hive Metastore,就会发现它解决的恰恰是"每次建新表都要手动注册"这个痛点——这也是为什么生产级 Lakehouse 几乎都有 HMS 的原因。