Medallion Architecture 完整构建指南
MinIO + PySpark (Delta Lake) + Trino | 从零开始
0. 环境假设
| 组件 | 配置 |
|---|---|
| MinIO endpoint | http://minio:9000 |
| Access Key | minioadmin |
| Secret Key | minioadmin |
| Spark Delta 版本 | delta-spark 3.x |
| Trino Delta connector | 已配置好 delta catalog 指向 MinIO |
| Bucket 名称 | lakehouse (新建) |
注意: 如果你已有 bucket(如截图中的test-bucket-lakehouse),
把下方所有s3a://lakehouse替换为s3a://test-bucket-lakehouse即可。
Step 1: 创建 MinIO Bucket 结构
方式 A:Python(推荐,可脚本化)
# setup_minio.py
# 依赖: pip install minio
from minio import Minio
client = Minio(
"localhost:9000", # MinIO 地址
access_key="minioadmin",
secret_key="minioadmin",
secure=False
)
BUCKET = "lakehouse"
# 1. 创建 bucket(如果不存在)
if not client.bucket_exists(BUCKET):
client.make_bucket(BUCKET)
print(f"✅ Bucket '{BUCKET}' 创建成功")
else:
print(f"ℹ️ Bucket '{BUCKET}' 已存在,跳过")
# 2. 在 bucket 内创建占位符,模拟三层目录结构
# MinIO 是对象存储,没有真正的"目录";用空对象模拟路径
import io
for layer in ["bronze/.keep", "silver/.keep", "gold/.keep"]:
client.put_object(
BUCKET, layer,
data=io.BytesIO(b""),
length=0
)
print(f" 📁 {BUCKET}/{layer.split('/')[0]}/ 目录占位创建")
print("\n✅ MinIO 结构初始化完成")
print(f" s3a://{BUCKET}/bronze/")
print(f" s3a://{BUCKET}/silver/")
print(f" s3a://{BUCKET}/gold/")
运行:
python setup_minio.py
方式 B:MinIO Client CLI(mc)
# 配置 mc 别名
mc alias set local http://localhost:9000 minioadmin minioadmin
# 创建 bucket
mc mb local/lakehouse
# 验证
mc ls local/
Step 2: Trino 创建 Catalog & Schema
说明: Trino 的deltacatalog 通常在配置文件中定义(etc/catalog/delta.properties)。
以下 SQL 假设 catalog 名为delta,已在 Trino 配置中指向 MinIO。
如果你需要动态创建 catalog,需通过 REST API(Trino 393+)或配置文件。
2.1 确认 Catalog 可用
-- 在 Trino / CloudBeaver 中执行
SHOW CATALOGS;
-- 应该看到 delta(对应你的 MinIO)
SHOW SCHEMAS FROM delta;
-- 查看现有 schema
2.2 创建三层 Schema
-- Bronze: 原始数据层
CREATE SCHEMA IF NOT EXISTS delta.bronze
WITH (location = 's3a://lakehouse/bronze');
-- Silver: 清洗层
CREATE SCHEMA IF NOT EXISTS delta.silver
WITH (location = 's3a://lakehouse/silver');
-- Gold: 聚合层
CREATE SCHEMA IF NOT EXISTS delta.gold
WITH (location = 's3a://lakehouse/gold');
-- 验证
SHOW SCHEMAS FROM delta;
执行结果预期:
Schema
--------
bronze
silver
gold
information_schema
Step 3: PySpark 写入三层数据
以下代码在 Jupyter / PySpark shell 中可直接运行。
假设 SparkSession 已配置好 S3A 连接到 MinIO。
# medallion_pipeline.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, avg, count, round as spark_round
from delta.tables import DeltaTable
# ──────────────────────────────────────────────────────────────────────────────
# SparkSession 初始化
# 如果已有 spark 对象(如在 Jupyter 中),跳过此段
# ──────────────────────────────────────────────────────────────────────────────
spark = (
SparkSession.builder
.appName("medallion-lakehouse")
.config("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# MinIO 连接(如果尚未在 spark-defaults.conf 中配置)
# .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
# .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
# .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
# .config("spark.hadoop.fs.s3a.path.style.access", "true")
.getOrCreate()
)
BUCKET = "s3a://lakehouse"
# 如果用已有 bucket: BUCKET = "s3a://test-bucket-lakehouse"
# ==============================================================================
# BRONZE LAYER — 原始数据直接落地,保留所有字段,不做任何清洗
# 职责:数据保真,可追溯,幂等写入
# ==============================================================================
print("=" * 60)
print("📥 BRONZE: 写入原始订单数据")
print("=" * 60)
# 模拟来自上游系统的原始数据(含脏数据:null、格式不一致)
raw_orders = [
(1, "Alice", "2024-01-10", 250.0, "electronics"),
(2, "Bob", "2024-01-11", None, "clothing"), # amount = null
(3, "Charlie", "2024-01-11", 89.5, "electronics"),
(4, "Alice", "2024-01-12", 310.0, "clothing"),
(5, "Bob", "2024-01-12", 450.0, "ELECTRONICS"), # 大写(脏数据)
(6, "Diana", "2024-01-13", 120.0, None), # category = null
(7, "Charlie", "2024-01-14", -50.0, "electronics"), # 负数(异常值)
]
bronze_df = spark.createDataFrame(
raw_orders,
schema=["order_id", "customer", "order_date", "amount", "category"]
)
bronze_path = f"{BUCKET}/bronze/orders"
(
bronze_df
.write
.format("delta")
.mode("overwrite") # 生产环境改为 "append" 或用 merge
.save(bronze_path)
)
print(f"✅ Bronze 写入完成 → {bronze_path}")
spark.read.format("delta").load(bronze_path).show()
# ==============================================================================
# SILVER LAYER — 清洗、标准化、类型转换
# 职责:数据质量保障,业务规则执行
# ==============================================================================
print("\n" + "=" * 60)
print("🔧 SILVER: 清洗转换")
print("=" * 60)
bronze_read = spark.read.format("delta").load(bronze_path)
silver_df = (
bronze_read
# 1. 过滤无效行:amount 为 null 或负数
.filter(col("amount").isNotNull() & (col("amount") > 0))
# 2. 填充 null category
.fillna({"category": "unknown"})
# 3. 标准化 category 为小写
.withColumn("category", col("category").cast("string"))
.withColumn("category",
col("category").substr(1, 100)) # trim 长度
.withColumn("category",
col("category").cast("string"))
# 使用 selectExpr 做 lower
.selectExpr(
"order_id",
"customer",
"order_date",
"CAST(amount AS DOUBLE) AS amount",
"LOWER(category) AS category"
)
# 4. 日期类型转换
.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
)
silver_path = f"{BUCKET}/silver/orders"
(
silver_df
.write
.format("delta")
.mode("overwrite")
.save(silver_path)
)
print(f"✅ Silver 写入完成 → {silver_path}")
spark.read.format("delta").load(silver_path).show()
# ==============================================================================
# GOLD LAYER — 业务聚合,面向消费
# 职责:支撑 BI、报表、API,高性能读取
# ==============================================================================
print("\n" + "=" * 60)
print("🏅 GOLD: 聚合生成 customer_summary")
print("=" * 60)
silver_read = spark.read.format("delta").load(silver_path)
gold_df = (
silver_read
.groupBy("customer", "category")
.agg(
count("order_id").alias("order_count"),
spark_round(avg("amount"), 2).alias("avg_amount")
)
.orderBy("customer", "category")
)
gold_path = f"{BUCKET}/gold/customer_summary"
(
gold_df
.write
.format("delta")
.mode("overwrite")
.save(gold_path)
)
print(f"✅ Gold 写入完成 → {gold_path}")
spark.read.format("delta").load(gold_path).show()
print("\n🎉 Medallion Pipeline 完成!")
Step 4: Trino 注册表并查询
Spark 写入的 Delta 表,Trino 不会自动发现,需要手动注册一次。
4.1 注册表
-- 注册 Bronze 表
CALL delta.system.register_table(
schema_name => 'bronze',
table_name => 'orders',
table_location => 's3a://lakehouse/bronze/orders'
);
-- 注册 Silver 表
CALL delta.system.register_table(
schema_name => 'silver',
table_name => 'orders',
table_location => 's3a://lakehouse/silver/orders'
);
-- 注册 Gold 表
CALL delta.system.register_table(
schema_name => 'gold',
table_name => 'customer_summary',
table_location => 's3a://lakehouse/gold/customer_summary'
);
-- 验证
SHOW TABLES FROM delta.bronze;
SHOW TABLES FROM delta.silver;
SHOW TABLES FROM delta.gold;
4.2 基础查询
-- Bronze: 查看原始数据(含脏数据)
SELECT * FROM delta.bronze.orders;
-- Silver: 查看清洗后数据
SELECT * FROM delta.silver.orders;
-- Gold: 查看聚合结果
SELECT * FROM delta.gold.customer_summary;
4.3 业务查询示例
-- 1. 查询某类别的所有订单(Silver)
SELECT customer, order_date, amount
FROM delta.silver.orders
WHERE category = 'electronics'
ORDER BY amount DESC;
-- 2. 每个客户的总消费(Gold 直接读,无需重新聚合)
SELECT customer,
SUM(order_count) AS total_orders,
ROUND(AVG(avg_amount), 2) AS overall_avg
FROM delta.gold.customer_summary
GROUP BY customer
ORDER BY total_orders DESC;
-- 3. Bronze vs Silver 行数对比(验证清洗效果)
SELECT 'bronze' AS layer, COUNT(*) AS row_count FROM delta.bronze.orders
UNION ALL
SELECT 'silver' AS layer, COUNT(*) AS row_count FROM delta.silver.orders
UNION ALL
SELECT 'gold' AS layer, COUNT(*) AS row_count FROM delta.gold.customer_summary;
-- 4. 跨层 JOIN:Silver 明细 + Gold 聚合对比
SELECT
s.customer,
s.order_id,
s.amount,
g.avg_amount AS customer_avg,
ROUND(s.amount - g.avg_amount, 2) AS diff_from_avg
FROM delta.silver.orders s
JOIN delta.gold.customer_summary g
ON s.customer = g.customer
AND s.category = g.category
ORDER BY s.customer, s.order_id;
Step 5: 增删改示例 (CRUD)
5.1 INSERT — 新增数据
PySpark
# 新增一批订单到 Bronze(追加模式)
new_orders = [
(8, "Eve", "2024-01-15", 180.0, "clothing"),
(9, "Alice", "2024-01-15", 99.0, "electronics"),
]
new_df = spark.createDataFrame(
new_orders,
schema=["order_id", "customer", "order_date", "amount", "category"]
)
# append 模式:不覆盖已有数据
(
new_df
.write
.format("delta")
.mode("append")
.save(f"{BUCKET}/bronze/orders")
)
print("✅ Bronze 追加写入完成")
Trino SQL
-- 直接用 SQL 插入(Trino Delta connector 支持)
INSERT INTO delta.bronze.orders
VALUES
(8, 'Eve', DATE '2024-01-15', 180.0, 'clothing'),
(9, 'Alice', DATE '2024-01-15', 99.0, 'electronics');
-- 验证
SELECT * FROM delta.bronze.orders WHERE order_id >= 8;
5.2 UPDATE — 更新数据
PySpark(使用 DeltaTable merge API)
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, f"{BUCKET}/silver/orders")
# 将 order_id=3 的 amount 更新为 100.0
(
delta_table.update(
condition = col("order_id") == 3,
set = {"amount": "100.0"}
)
)
print("✅ Silver 更新完成")
spark.read.format("delta").load(f"{BUCKET}/silver/orders") \
.filter(col("order_id") == 3).show()
Trino SQL
-- 更新 Silver 表中 order_id=3 的 amount
UPDATE delta.silver.orders
SET amount = 100.0
WHERE order_id = 3;
-- 验证
SELECT * FROM delta.silver.orders WHERE order_id = 3;
5.3 DELETE — 删除数据
PySpark
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, f"{BUCKET}/bronze/orders")
# 删除 amount 为 null 的记录(已被 silver 过滤,bronze 中清理)
delta_table.delete(condition = col("amount").isNull())
print("✅ Bronze 删除完成")
spark.read.format("delta").load(f"{BUCKET}/bronze/orders").show()
Trino SQL
-- 删除 Bronze 中 amount 为 null 的记录
DELETE FROM delta.bronze.orders
WHERE amount IS NULL;
-- 验证
SELECT * FROM delta.bronze.orders;
5.4 UPSERT — 合并更新(Merge)
生产中最常用的模式:有则更新,无则插入。
PySpark(DeltaTable merge)
from delta.tables import DeltaTable
from pyspark.sql.functions import lit
# 模拟上游推送的更新数据(order_id=1 修改,order_id=10 新增)
upsert_data = [
(1, "Alice", "2024-01-10", 999.0, "electronics"), # 已存在,更新 amount
(10, "Frank", "2024-01-16", 200.0, "clothing"), # 不存在,新增
]
upsert_df = spark.createDataFrame(
upsert_data,
schema=["order_id", "customer", "order_date", "amount", "category"]
)
delta_table = DeltaTable.forPath(spark, f"{BUCKET}/silver/orders")
(
delta_table.alias("target")
.merge(
upsert_df.alias("source"),
condition="target.order_id = source.order_id" # 匹配条件
)
.whenMatchedUpdateAll() # 匹配到 → 更新所有字段
.whenNotMatchedInsertAll() # 未匹配 → 插入新行
.execute()
)
print("✅ Silver UPSERT 完成")
spark.read.format("delta").load(f"{BUCKET}/silver/orders") \
.filter(col("order_id").isin([1, 10])).show()
Trino SQL(MERGE 语法)
-- Trino 支持标准 MERGE 语法
MERGE INTO delta.silver.orders AS target
USING (
VALUES
(1, 'Alice', DATE '2024-01-10', 999.0, 'electronics'),
(10, 'Frank', DATE '2024-01-16', 200.0, 'clothing')
) AS source(order_id, customer, order_date, amount, category)
ON target.order_id = source.order_id
WHEN MATCHED THEN
UPDATE SET amount = source.amount, category = source.category
WHEN NOT MATCHED THEN
INSERT (order_id, customer, order_date, amount, category)
VALUES (source.order_id, source.customer, source.order_date,
source.amount, source.category);
-- 验证
SELECT * FROM delta.silver.orders WHERE order_id IN (1, 10);
5.5 Time Travel — 查看历史版本(Delta 独有)
PySpark
# 查看 Silver 表的变更历史
delta_table = DeltaTable.forPath(spark, f"{BUCKET}/silver/orders")
delta_table.history().select("version", "timestamp", "operation").show()
# 读取历史版本(version 0 = 初始状态)
old_df = (
spark.read
.format("delta")
.option("versionAsOf", 0)
.load(f"{BUCKET}/silver/orders")
)
print("📜 Version 0 数据:")
old_df.show()
Trino SQL
-- 查看 Delta 表历史
SELECT * FROM delta.silver."orders$history";
-- 按版本查询
SELECT * FROM delta.silver.orders FOR VERSION AS OF 0;
-- 按时间查询
SELECT * FROM delta.silver.orders
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-11 00:00:00';
架构总结
上游数据源
│
▼
┌─────────────────────────────────────────────────┐
│ BRONZE (s3a://lakehouse/bronze/) │
│ • 原始数据,不做任何修改 │
│ • 保留脏数据(null、异常值) │
│ • append-only,数据可追溯 │
└───────────────────┬─────────────────────────────┘
│ Spark ETL(清洗规则)
▼
┌─────────────────────────────────────────────────┐
│ SILVER (s3a://lakehouse/silver/) │
│ • 清洗后的可信数据 │
│ • 类型正确、null 处理、标准化 │
│ • 支持 UPSERT / DELETE │
└───────────────────┬─────────────────────────────┘
│ Spark Aggregation(业务逻辑)
▼
┌─────────────────────────────────────────────────┐
│ GOLD (s3a://lakehouse/gold/) │
│ • 业务聚合表,直接面向消费 │
│ • BI 报表 / API / 机器学习特征 │
│ • 高性能读取,通常按业务维度分区 │
└─────────────────────────────────────────────────┘
│
▼
Trino(只读查询)
CloudBeaver / BI 工具 / API
| 职责 | Spark | Trino |
|---|---|---|
| 数据写入 | ✅ | ✅(有限支持) |
| ETL 转换 | ✅ | ❌ |
| 大规模聚合计算 | ✅ | ✅ |
| 即席 SQL 查询 | ❌ | ✅ |
| 跨表 JOIN 分析 | ❌ | ✅ |
| 连接 BI 工具 | ❌ | ✅ |
| Time Travel | ✅ | ✅ |
| UPSERT / Merge | ✅(推荐) | ✅(支持) |
为什么需要注册表?
核心原因一句话:Spark 和 Trino 是两个完全独立的系统,它们不共享任何"表的认知"。
先理解:数据 vs 元数据
当 Spark 把数据写到 MinIO 的时候,它实际上做了两件事,但很多人只注意到了第一件:
第一件事(你看得见的):把 Parquet 文件写到 s3a://lakehouse/silver/orders/ 这个路径下。这些文件是真实存在于 MinIO 中的,任何人都可以下载它们。
第二件事(你看不见的):生成了一个 _delta_log/ 文件夹,里面存放的是 Delta Table 的"元数据"——也就是这张表叫什么、有哪些列、每列是什么类型、哪些文件属于这张表、有哪些历史版本等等。
所以 MinIO 里存的东西实际上是这样的:
s3a://lakehouse/silver/orders/
├── _delta_log/ ← Delta 元数据(表的"说明书")
│ ├── 00000000000000000000.json
│ ├── 00000000000000000001.json
│ └── ...
├── part-00000-xxx.parquet ← 真实数据文件
├── part-00001-xxx.parquet
└── ...
Spark 写完之后,它自己当然知道这张表是什么,因为是它自己写的。但 Trino 完全不知道这件事的发生。
Trino 的视角
Trino 是一个查询引擎,它本身不存储任何数据,也不主动去扫描 MinIO 里有什么。它的工作方式是:你告诉我有哪些表,我才去读。
你可以把 Trino 想象成一个图书馆的管理员。MinIO 是一个巨大的仓库,里面堆满了书(数据文件)。这个管理员不会主动去仓库里把每本书翻一遍看看有什么,它只维护一份"图书目录"(catalog)。你要查某本书,先查目录,目录里有记录才能找到对应的书架位置。
Spark 写数据相当于把书搬进了仓库,但没有告诉图书馆管理员,所以目录里没有这本书的记录。 注册表这个操作,就是你去告诉管理员:"嘿,仓库的这个位置 s3a://lakehouse/silver/orders/ 有一批书,帮我把它登记到目录里,名字叫 delta.silver.orders。"
那 Hive Metastore 是怎么回事?
你可能会问:我看很多 Lakehouse 架构里,Spark 写完数据之后 Trino 直接就能查,不需要注册,那是为什么?
答案是那些架构里有一个叫 Hive Metastore(HMS)的组件,它充当了共享的"图书目录"。Spark 写完数据后会自动把表的元数据登记到 HMS 里,Trino 配置了同一个 HMS,所以它立刻就能看到新表。两个系统通过 HMS 共享了元数据,所以不需要手动注册。
你现在的环境里没有 HMS,Spark 和 Trino 各自为政。Spark 有自己的内部目录,Trino 有自己的 delta catalog,两者之间没有任何通信渠道,所以必须手动告诉 Trino"这个路径下有一张表",这就是 register_table 的作用。
一个类比把这三者串起来
| 概念 | 类比 |
|---|---|
| MinIO | 仓库,存放真实的货物(Parquet 文件) |
Delta Log(_delta_log/) |
货物自带的说明书,记录货物的规格信息 |
| Hive Metastore | 仓库的共享目录系统,所有人共用 |
| Trino Catalog | Trino 自己维护的一份目录,需要人工同步 |
register_table |
把"说明书上的信息"手动登记进 Trino 的目录 |
注册只需要做一次
最后一点值得强调:注册是一次性操作。 你注册完之后,之后 Spark 每次往这张表里追加新数据、更新数据,Trino 都能自动感知到——因为 Trino 每次查询时会去读 _delta_log/ 里的最新状态,而 _delta_log/ 是 Spark 每次写入时自动更新的。
所以注册表只是建立了一个"路径到表名的映射",之后数据的变化是 Delta Log 负责追踪的,Trino 能实时看到最新数据,不需要你重新注册。
整个流程的信息流是这样的:
Spark 写数据
│
├─→ Parquet 文件写入 MinIO
└─→ _delta_log/ 自动更新(记录变更)
register_table(只做一次)
└─→ 告诉 Trino:"这个路径 = delta.silver.orders"
Trino 查询
└─→ 找到路径 → 读 _delta_log/ 获取最新文件列表 → 读 Parquet 文件 → 返回结果
理解了这个之后,如果你之后引入 Hive Metastore,就会发现它解决的恰恰是"每次建新表都要手动注册"这个痛点——这也是为什么生产级 Lakehouse 几乎都有 HMS 的原因。