彻底搞懂 Apache Hive、Iceberg、Delta Lake 和 Hudi
——从数据仓库到现代 Lakehouse 的完整演化路线图
学习目标:读完本文后,你能够(1)清晰解释每种技术解决了什么问题;(2)在白板前画出每种格式的核心架构;(3)写出真实可运行的代码;(4)在面试中从架构师视角进行技术选型对比。
目录
- 为什么需要这些技术?—— 问题的根源
- Apache Hive —— 一切的起点
- Hive 的天花板 —— 为什么它不够用
- 开放表格式(Open Table Format)革命
- Apache Iceberg —— 深度解析
- Delta Lake —— 深度解析
- Apache Hudi —— 深度解析
- 三者横向对比 —— 特性矩阵与选型指南
- 与 Hive Metastore 的集成
- 在 Medallion 架构中的应用
- 真实代码示例
- 面试高频问题与标准答案
- Platform Owner 思维:战略总结
1. 为什么需要这些技术?
在理解这些技术之前,我们必须先理解它们在解决什么问题。这是工程师思维和架构师思维的根本区别:工程师问"怎么用",架构师问"为什么存在"。
1.1 数据存储的两个世界
传统上,数据基础设施被一道看不见的墙分成两个世界:
数据仓库(Data Warehouse)世界,如 Teradata、Snowflake、Redshift,其特点是:支持 ACID 事务、支持 SQL、支持行级更新/删除、Schema 严格管理、查询性能极强。但代价是:存储和计算价格昂贵、厂商锁定(数据格式私有)、对非结构化数据支持差、难以支持机器学习工作负载(ML 框架需要直接读文件)。
数据湖(Data Lake)世界,如 HDFS + Parquet,其特点是:存储成本极低、支持任意格式(结构化、半结构化、非结构化)、计算引擎灵活(Spark、Flink、Presto)、ML 工作负载友好(直接读 Parquet/ORC)。但代价是:没有 ACID 事务(读脏数据问题)、没有行级更新/删除(只能覆盖整个分区)、没有 Schema 演进保障、没有数据版本管理(误操作无法回滚)。
1.2 现实业务中的痛点
一个具体的痛苦场景:假设你在金融公司,每天有一个 Spark 作业向数据湖写入交易数据。同时,分析师在用 Presto 查询同一张表。会发生什么?
- Spark 写入中途,Presto 查到了半截数据(脏读)。
- GDPR 要求删除某用户所有记录,你需要重写整个分区(几 TB 的操作)。
- 两个 Spark 作业同时写同一个分区,数据互相覆盖(写写冲突)。
- 数据工程师误删了一个分区,没有办法恢复(无版本管理)。
- 业务字段增加了一列,旧数据和新数据 Schema 不兼容,查询报错。
这就是 Apache Iceberg、Delta Lake、Hudi 三种"开放表格式"要解决的核心问题集合:在廉价的对象存储(S3/MinIO/ADLS)上,实现数据仓库级别的数据管理能力。
2. Apache Hive
2.1 Hive 是什么?
Apache Hive 诞生于 2008 年的 Facebook,其核心使命只有一个:让工程师用 SQL 操作 HDFS 上的文件。
Hive 本身不存储数据,它只是一个翻译层:把 SQL 翻译成 MapReduce(后来是 Tez/Spark)任务,然后在 HDFS 上执行。
Hive 的核心组件有三个:
**HiveQL(查询语言)**是 Hive 提供的 SQL 方言,支持 SELECT、JOIN、GROUP BY、窗口函数等标准 SQL 操作,同时支持 CREATE TABLE、LOAD DATA 等 DDL 操作。
**Hive Metastore(HMS)**是整个架构中最重要的组件,也是对整个数据生态影响最深远的组件。HMS 是一个关系型数据库(默认 Derby,生产环境用 MySQL 或 PostgreSQL),它存储了所有表的元数据:表名、列名、列类型、分区信息、数据在 HDFS/S3 上的物理路径。HMS 已经成为整个数据湖生态的"目录服务"标准,Spark、Presto、Trino、Flink 都支持对接 HMS。
**SerDe(序列化/反序列化器)**是 Hive 读写文件的插件。通过 SerDe,Hive 可以读写 CSV、JSON、ORC、Parquet 等各种格式。
2.2 Hive 架构图
用户/BI工具
│
│ HiveQL
▼
┌─────────────────────────────────────┐
│ Hive Driver │
│ ┌──────────┐ ┌─────────────────┐ │
│ │ Compiler │ │ Optimizer │ │
│ └──────────┘ └─────────────────┘ │
└────────────────┬────────────────────┘
│
┌────────┴────────┐
│ │
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ Hive │ │ Execution Engine│
│ Metastore │ │ (Tez / Spark) │
│ (MySQL/PG) │ └────────┬─────────┘
│ │ │
│ 存储: │ │ 读写
│ - 表定义 │ ▼
│ - 分区信息 │ ┌──────────────────┐
│ - 文件路径 │ │ HDFS / S3 │
└──────────────┘ │ (Parquet/ORC) │
└──────────────────┘
2.3 Hive 表的类型
**内部表(Internal/Managed Table)**是 Hive 完全管理的表。执行 DROP TABLE 时,数据文件也会一并删除。适合完全由 Hive 管理的数据集。
**外部表(External Table)**是 Hive 只管理元数据、不管理数据文件的表。执行 DROP TABLE 时,只删除 HMS 中的记录,S3/HDFS 上的文件不受影响。在生产环境中,几乎所有表都应该建为外部表,因为这样可以防止误删数据,并且允许多个引擎访问同一份数据。
-- 内部表
CREATE TABLE transactions (
id BIGINT,
amount DECIMAL(18, 2),
ts TIMESTAMP
)
STORED AS PARQUET
LOCATION '/data/warehouse/transactions';
-- 外部表(生产推荐)
CREATE EXTERNAL TABLE transactions_ext (
id BIGINT,
amount DECIMAL(18, 2),
ts TIMESTAMP
)
PARTITIONED BY (event_date STRING)
STORED AS PARQUET
LOCATION 's3://my-bucket/data/transactions/';
2.4 Hive 分区
分区是 Hive 最重要的性能优化手段。Hive 的分区通过目录结构实现,例如按日期分区的表,其 S3 路径结构如下:
s3://bucket/transactions/
event_date=2024-01-01/
part-00000.parquet
part-00001.parquet
event_date=2024-01-02/
part-00000.parquet
event_date=2024-01-03/
part-00000.parquet
查询时,WHERE event_date = '2024-01-01' 会让 Hive 只扫描 event_date=2024-01-01/ 目录下的文件,这称为分区裁剪(Partition Pruning),可以大幅减少 I/O。
-- 添加分区(告诉 Metastore 新分区的位置)
ALTER TABLE transactions_ext
ADD PARTITION (event_date='2024-01-04')
LOCATION 's3://bucket/transactions/event_date=2024-01-04/';
-- 自动发现分区(扫描目录,自动注册所有分区)
MSCK REPAIR TABLE transactions_ext;
2.5 Hive ACID(有限的事务支持)
Hive 在 0.14 版本后引入了 ACID 支持,但有严苛的限制条件:只支持 ORC 格式、必须是内部表(不支持外部表)、必须开启事务特性、性能开销显著。这些限制使得 Hive ACID 在实践中很少被采用,这也是 Iceberg/Delta/Hudi 出现的直接原因之一。
3. Hive 的天花板
Hive 的核心问题不是 SQL 能力不够,而是它的数据管理模型太过原始。具体来说,有以下几个根本性缺陷:
3.1 没有真正的事务(No True ACID)
在标准 Hive(无 ACID 模式)下,写入只是把文件放到 S3 的某个目录里。没有任何机制保证"写入过程中读者看不到中间状态"。当 Spark 正在写入一批数据时,如果另一个查询正在读,它会读到部分文件——这是**脏读(Dirty Read)**问题。
3.2 分区级别的最小操作粒度
Hive 的 UPDATE/DELETE 实现方式是:重写整个分区。如果一个分区有 100GB 的 Parquet 文件,你要修改其中 10 行记录,Hive 需要读出全部 100GB,修改 10 行,再写回 100GB——代价极其昂贵,在 GDPR 删除场景下几乎不可接受。
3.3 没有时间旅行(No Time Travel)
数据被覆盖后,历史状态永久丢失。误删了一个分区?没有快照,没有版本,无法恢复。
3.4 小文件问题(Small Files Problem)
流式数据写入(Kafka → Spark Streaming → Hive)会产生大量小文件,因为每次微批次写入都产生新文件。大量小文件会导致 NameNode 内存压力过大,查询时打开文件的开销远大于真正读数据的开销。Hive 没有自动的小文件合并机制。
3.5 Schema 演进的脆弱性
Hive 的 Schema 信息存储在 Metastore 中,而实际文件里的 Schema 与 Metastore 记录的可能不一致。字段改名、字段类型变更、列顺序调整,都可能导致查询读到错误数据甚至报错,没有任何验证机制。
3.6 并发写入冲突
多个 Spark 作业同时写同一个 Hive 分区,后写入的会覆盖先写入的,没有任何并发控制机制(Optimistic Concurrency Control 或 Pessimistic Locking 都没有)。
4. 开放表格式革命
理解了 Hive 的局限,我们就能理解 2018 年后崛起的"开放表格式"(Open Table Format)技术。
4.1 核心设计理念
开放表格式的本质是在文件之上增加一个元数据管理层(Metadata Layer)。这个层不改变底层的 Parquet/ORC 文件,而是在文件旁边维护一套精确的目录,记录:
- 哪些文件构成了表的当前状态(文件列表快照)
- 每次写入/修改/删除操作形成了哪个版本(版本历史)
- 每个文件的统计信息(min/max值、null数量)用于高效的文件剪枝
- Schema 的变更历史(Schema Evolution Log)
通过维护这个元数据层,可以在普通的对象存储(S3、MinIO、ADLS)上实现:
- 原子提交(Atomic Commits):写入完成前,其他读者看不到新文件
- 快照隔离(Snapshot Isolation):每次读取都基于某个一致的快照
- 时间旅行(Time Travel):可以查询历史版本的数据
- 高效的行级 DELETE/UPDATE:通过位图或位置向量标记删除,而非重写整个分区
- Schema 演进(Schema Evolution):以向后兼容的方式修改表结构
4.2 三大格式的诞生背景
Apache Iceberg 诞生于 Netflix,2017 年开源,2020 年成为 Apache 顶级项目。Netflix 面临的问题是:跨 PB 级数据集的高效查询、复杂的 Schema 演进需求、多个计算引擎(Spark、Presto、Flink)访问同一张表。Iceberg 从一开始就被设计为"引擎无关"的格式。
Delta Lake 诞生于 Databricks,2019 年开源,基金会由 Linux Foundation 管理。Databricks 面临的问题是:他们的客户需要在 Spark + S3 环境下实现类似 ACID 数据库的行为,特别是流批一体(Lambda Architecture → Kappa Architecture)的统一。Delta Lake 与 Apache Spark 深度集成。
Apache Hudi(Hadoop Upserts Deletes and Incrementals)诞生于 Uber,2016 年开源,2019 年成为 Apache 顶级项目。Uber 面临的问题是:每天需要处理数十亿行的 Upsert(Update+Insert),用于司机收益、订单状态的增量更新,并需要从数据湖高效地抽取增量变更(CDC 场景)。
5. Apache Iceberg
5.1 Iceberg 的设计哲学
Iceberg 的设计有一个核心原则:表是文件的集合,表的状态由元数据文件完全描述,而不依赖任何中央服务(如 HMS)。
这意味着:即使没有 Hive Metastore,你也可以仅凭一个指向元数据文件的路径,完整地读取一张 Iceberg 表的全部历史版本。HMS 在 Iceberg 中变成了可选的(只需存储"当前元数据文件在哪里"这一条信息),而不是必需的。
5.2 Iceberg 的四层元数据架构
这是 Iceberg 最重要的概念,必须完全理解:
┌─────────────────────────────────────────────────────────────┐
│ Catalog(目录服务) │
│ 存储:表名 → 当前 metadata.json 的路径 │
│ 实现:HMS、Glue、Nessie、JDBC、REST Catalog │
└──────────────────────────┬──────────────────────────────────┘
│ 指向
▼
┌─────────────────────────────────────────────────────────────┐
│ Metadata File(元数据文件) │
│ 文件名:metadata/v3.metadata.json │
│ 内容: │
│ - 表的 Schema 历史(schema-id → 列定义列表) │
│ - 分区规格(Partition Spec) │
│ - 当前快照 ID(current-snapshot-id) │
│ - 快照历史列表(snapshot-log) │
│ - 排序顺序(Sort Order) │
└──────────────────────────┬──────────────────────────────────┘
│ 引用
▼
┌─────────────────────────────────────────────────────────────┐
│ Snapshot(快照) │
│ 文件名:metadata/snap-3519.avro (也叫 manifest list) │
│ 内容: │
│ - 该快照包含哪些 manifest 文件 │
│ - 每个 manifest 文件覆盖的分区范围(用于分区裁剪) │
│ - 每个 manifest 中增加/删除的文件数统计 │
└──────────────────────────┬──────────────────────────────────┘
│ 引用
▼
┌─────────────────────────────────────────────────────────────┐
│ Manifest File(清单文件) │
│ 文件名:metadata/xxxxx-m0.avro │
│ 内容: │
│ - 一组数据文件的路径列表 │
│ - 每个文件的状态(ADDED / EXISTING / DELETED) │
│ - 每个文件的列统计(min、max、null_count) │
│ - 每个文件的分区值 │
└──────────────────────────┬──────────────────────────────────┘
│ 引用
▼
┌─────────────────────────────────────────────────────────────┐
│ Data Files(数据文件) │
│ 格式:Parquet、ORC、Avro │
│ 实际存储用户数据 │
└─────────────────────────────────────────────────────────────┘
为什么是四层而不是两层? 这是一个精妙的工程设计。如果每次写入都修改一个中央大文件,那么并发写入时会有严重的锁竞争。通过将元数据分散为多个不可变(Immutable)的 Avro 文件,每次提交只需原子性地替换 metadata.json 中的当前快照指针,这个操作可以用对象存储的条件写入(Conditional Put)来实现乐观并发控制。
5.3 Iceberg 如何实现 ACID
以一次 INSERT 操作为例:
第一步(写数据文件):Spark 写 Job 把新数据写成若干 Parquet 文件,放到数据目录下。这些文件对任何读者都不可见,因为没有任何元数据指向它们。
第二步(写 Manifest 文件):创建一个新的 Manifest 文件,列出刚写的所有新 Parquet 文件,状态标记为 ADDED。
第三步(写 Snapshot):创建一个新的 Snapshot(Manifest List),引用上一步的 Manifest 文件(可能还引用已有的其他 Manifest 文件),并记录操作类型(append)。
第四步(原子提交):更新 metadata.json,将 current-snapshot-id 指向新的 Snapshot ID。这一步通过 Catalog 的原子操作完成(例如,HMS 的事务、S3 的条件写入)。在这一步完成之前,新写入的数据对任何并发读者完全不可见;完成之后,立即对所有读者可见——这就是原子性(Atomicity)。
如果第四步失败(机器宕机、网络中断),那么 Parquet 文件已经写到 S3 了,但 metadata.json 没有更新,所以这些"孤儿文件"对用户完全不可见。可以通过周期性运行 expireSnapshots 和 removeOrphanFiles 来清理它们。
5.4 Iceberg 的时间旅行与快照管理
由于每次写入都创建一个新快照,而旧快照不会立即删除,Iceberg 天然支持时间旅行:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
.getOrCreate()
# 查询最新版本
df_current = spark.read.table("db.transactions")
# 按时间戳查询历史版本(时间旅行)
df_yesterday = spark.read \
.option("as-of-timestamp", "2024-01-01T00:00:00.000+0000") \
.table("db.transactions")
# 按快照 ID 查询(更精确)
df_snapshot = spark.read \
.option("snapshot-id", 3519835977348516541) \
.table("db.transactions")
# 查看快照历史
spark.sql("SELECT * FROM db.transactions.history").show()
# 查看所有快照的文件列表
spark.sql("SELECT * FROM db.transactions.snapshots").show()
# 查看文件级别的元数据
spark.sql("SELECT * FROM db.transactions.files").show()
5.5 Iceberg 的 Schema 演进
Iceberg 使用**列 ID(Column ID)**而不是列名来追踪列,这使得 Schema 演进异常安全:
# 当前 Schema:id, amount, ts
# 以下所有操作都是向后兼容的
# 1. 增加新列(安全:读旧文件时该列返回 null)
spark.sql("""
ALTER TABLE db.transactions
ADD COLUMN currency STRING AFTER amount
""")
# 2. 重命名列(安全:内部 ID 不变,只是改了显示名)
spark.sql("""
ALTER TABLE db.transactions
RENAME COLUMN ts TO event_time
""")
# 3. 修改列类型(有限支持:只允许安全的类型提升,如 INT → BIGINT)
spark.sql("""
ALTER TABLE db.transactions
ALTER COLUMN amount TYPE DECIMAL(20, 4)
""")
# 4. 删除列(安全:读旧文件时直接忽略该列的数据)
spark.sql("""
ALTER TABLE db.transactions
DROP COLUMN old_column
""")
相比之下,如果你用普通 Hive 表重命名一列,HMS 中改了名字,但 Parquet 文件里仍然是旧名字,下次读取可能会把这列读成全 null。
5.6 Iceberg 的分区演进
这是 Iceberg 最独特的特性之一。在 Hive 中,你无法在不重写所有数据的情况下修改分区规则。而 Iceberg 支持分区演进(Partition Evolution):
# 最初按月分区
spark.sql("""
CREATE TABLE db.events (
id BIGINT,
ts TIMESTAMP,
payload STRING
)
USING iceberg
PARTITIONED BY (months(ts))
""")
# 一年后数据量增长,改为按天分区
# 旧文件保持月分区,新文件使用日分区——两者共存,查询自动路由
spark.sql("""
ALTER TABLE db.events
REPLACE PARTITION FIELD months(ts) WITH days(ts)
""")
Iceberg 支持的分区转换函数有:identity(col)、year(ts)、month(ts)、day(ts)、hour(ts)、bucket(N, col)、truncate(W, col)。
5.7 Iceberg 的行级 DELETE(Position Delete vs. Equality Delete)
Iceberg v2 格式引入了两种删除文件(Delete File)机制,使行级删除不需要重写数据文件:
Position Delete File(位置删除文件):记录"第 X 个文件的第 Y 行被删除"。在读取时,引擎将数据文件和删除文件合并,跳过被标记的行。适合少量行的删除。
Equality Delete File(等值删除文件):记录"满足条件 id = 12345 的行被删除"。适合基于业务键的删除,例如 GDPR 删除用户数据。
# 行级 DELETE(不触发数据文件重写,只写删除文件)
spark.sql("DELETE FROM db.transactions WHERE user_id = 99999")
# 行级 UPDATE(读取 + 写删除文件 + 写新数据文件)
spark.sql("""
UPDATE db.transactions
SET status = 'REVERSED'
WHERE id = 12345
""")
# MERGE INTO(Upsert 操作)
spark.sql("""
MERGE INTO db.transactions AS t
USING staging.new_transactions AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
6. Delta Lake
6.1 Delta Lake 的设计哲学
Delta Lake 的设计有一个核心原则:所有元数据都存储在数据目录旁边的 _delta_log/ 子目录中,以 JSON 和 Parquet 文件的形式存储事务日志。
相对于 Iceberg 的"多层分散元数据"设计,Delta Lake 采用的是"集中式事务日志"设计:所有操作(insert、update、delete、schema change)都按顺序记录在 _delta_log/ 中,表的当前状态是从初始状态"重放"所有日志得到的。这在理念上更接近关系型数据库的 WAL(Write-Ahead Log)。
6.2 Delta Log 结构
s3://bucket/table/
├── _delta_log/
│ ├── 00000000000000000000.json # 第 0 号事务:CREATE TABLE
│ ├── 00000000000000000001.json # 第 1 号事务:第一次 INSERT
│ ├── 00000000000000000002.json # 第 2 号事务:第二次 INSERT
│ ├── 00000000000000000003.json # 第 3 号事务:DELETE
│ ├── ...
│ ├── 00000000000000000009.json # 第 9 号事务
│ └── 00000000000000000010.checkpoint.parquet # 检查点(每10个事务生成)
├── part-00000-xxx.snappy.parquet # 数据文件
├── part-00001-xxx.snappy.parquet
└── ...
每个 JSON 日志文件记录了该事务的变更,包含多种"操作"(Action):
{
"commitInfo": {
"timestamp": 1704067200000,
"operation": "WRITE",
"operationParameters": {"mode": "Append"},
"isolationLevel": "Serializable"
}
}
{
"add": {
"path": "part-00000-abc.snappy.parquet",
"partitionValues": {"date": "2024-01-01"},
"size": 1024576,
"modificationTime": 1704067200000,
"dataChange": true,
"stats": "{\"numRecords\":5000,\"minValues\":{\"amount\":0.01},\"maxValues\":{\"amount\":9999.99}}"
}
}
{
"remove": {
"path": "part-00000-old.snappy.parquet",
"deletionTimestamp": 1704067200000,
"dataChange": true
}
}
Checkpoint 文件是 Delta Lake 的性能关键。每当日志达到 10 个文件(可配置),Delta 会生成一个 Parquet 格式的 Checkpoint 文件,它是前 N 个事务日志的"压缩摘要",记录了当前所有活跃文件的列表。查询时只需读取最新 Checkpoint + 之后的少量 JSON 日志,而不需要重放全部历史。
6.3 Delta Lake 的 ACID 实现
Delta Lake 的并发控制依赖于写入日志文件时的乐观并发控制(Optimistic Concurrency Control,OCC),结合对象存储的原子操作(如 S3 的 put-if-absent)来保证事务的序列化。
事务 A(写入): 事务 B(写入):
│ │
├─ 读取当前版本 = 5 ├─ 读取当前版本 = 5
│ │
├─ 写入数据文件 ├─ 写入数据文件
│ │
├─ 尝试写入 00006.json ├─ 尝试写入 00006.json
│ (成功,S3 原子写入) │ (失败!00006.json 已存在)
│ │
│ 提交成功 ├─ 检测到冲突,重试
│ │ (检查是否真的冲突,例如
│ │ 操作的分区不重叠则可以提交)
│ │
│ └─ 写入 00007.json(成功)
6.4 Delta Lake 的核心功能
时间旅行:Delta Lake 通过读取历史日志状态实现时间旅行,语法与 Iceberg 类似:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 按版本号查询
df_v3 = spark.read.format("delta") \
.option("versionAsOf", 3) \
.load("s3://bucket/transactions")
# 按时间戳查询
df_ts = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-01") \
.load("s3://bucket/transactions")
# SQL 语法
spark.sql("""
SELECT * FROM transactions TIMESTAMP AS OF '2024-01-01'
""")
spark.sql("""
SELECT * FROM transactions VERSION AS OF 3
""")
# 恢复到历史版本(Restore)
spark.sql("""
RESTORE TABLE transactions TO VERSION AS OF 5
""")
**MERGE INTO(Upsert)**是 Delta Lake 最常用的操作,特别是在 CDC(Change Data Capture)场景中:
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://bucket/transactions")
# CDC Upsert:有则更新,无则插入,标记为已删除的则删除
delta_table.alias("target").merge(
source=df_changes.alias("source"),
condition="target.id = source.id"
).whenMatchedUpdate(
condition="source.op = 'U'",
set={"amount": "source.amount", "status": "source.status"}
).whenMatchedDelete(
condition="source.op = 'D'"
).whenNotMatchedInsert(
condition="source.op = 'I'",
values={"id": "source.id", "amount": "source.amount", "status": "source.status"}
).execute()
**OPTIMIZE(文件压缩)**解决 Delta Lake 的小文件问题:
# 压缩小文件(将多个小 Parquet 文件合并为大文件)
spark.sql("OPTIMIZE transactions")
# 压缩 + Z-Order 索引(按多列排序,提高多维查询的文件剪枝效率)
spark.sql("""
OPTIMIZE transactions
ZORDER BY (user_id, event_date)
""")
# 清理过期快照(保留最近 7 天的历史,释放存储空间)
spark.sql("VACUUM transactions RETAIN 168 HOURS")
Schema 强制与演进:Delta Lake 默认开启 Schema 强制(Schema Enforcement),如果写入的 DataFrame Schema 与表 Schema 不兼容,写入会失败,防止数据污染。
# Schema 强制(默认行为):不兼容的 Schema 写入会报错,保护数据质量
df_new.write.format("delta").mode("append").save("s3://bucket/transactions")
# Schema 合并(演进):允许新增列
df_with_new_col.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("s3://bucket/transactions")
# 覆盖 Schema(危险操作,谨慎使用)
df_new_schema.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("s3://bucket/transactions")
**Change Data Feed(CDF)**是 Delta Lake 的一个独特功能,可以高效地查询两个版本之间的行级变更:
# 开启 CDF
spark.sql("""
ALTER TABLE transactions
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# 查询增量变更(非常适合 ETL 管道:只处理上次运行后的变化)
df_changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 5) \
.table("transactions")
# df_changes 包含额外列:_change_type(insert/update_preimage/update_postimage/delete)
# 非常适合下游系统的增量同步
6.5 Delta Lake 的隔离级别
Delta Lake 提供两种隔离级别(可以针对每张表单独设置):
**WriteSerializable(默认)**是大多数场景的最佳选择,保证写操作之间串行化,但允许读写并发(读操作看到一致的快照)。性能高于完全串行化。
**Serializable(完全串行化)**保证所有读写操作串行化,代价是更高的冲突概率(更多写入失败需要重试)。适用于对一致性要求极高的金融交易场景。
7. Apache Hudi
7.1 Hudi 的设计哲学
Hudi 的设计有一个鲜明的核心原则:为流式 Upsert 和增量消费而优化。如果说 Iceberg 在乎"大规模查询效率和引擎兼容性",Delta Lake 在乎"Spark 生态系统内的易用性和流批一体",那么 Hudi 在乎的是"高频写入(流式 Upsert)和下游的增量消费"。
Hudi 的核心抽象是Timeline(时间线),所有操作(commit、clean、compaction、rollback)都被记录在表的 .hoodie/ 目录下的 Timeline 中,以便追踪表的完整变更历史,并支持下游以增量方式消费数据("我只要 2024-01-01 10:00 之后的新增/变更数据")。
7.2 Hudi 的两种表类型
这是 Hudi 最重要的概念,也是 Hudi 区别于其他格式的核心设计:
Copy-on-Write(CoW,写时复制):每次 Upsert 操作,直接在最新版本中更新数据文件(将受影响的 Parquet 文件重写,合并新数据)。特点是读取性能好(标准 Parquet,无需合并),但写入开销大(每次写入都需要重写 Parquet 文件)。适合读多写少、批量写入的场景。
Merge-on-Read(MoR,读时合并):每次 Upsert 操作,将 Delta 数据(增量变更)写入行式格式的 Avro 文件(Delta Log),而不修改底层 Parquet 文件(Base File)。读取时,引擎实时合并 Base File 和 Delta Log。特点是写入性能极好(直接追加),但读取需要合并(有额外开销)。适合写多读少、流式写入的场景。需要定期运行 Compaction 将 Delta Log 合并回 Base File。
CoW 表结构:
s3://bucket/hudi_cow/
├── .hoodie/
│ ├── 20240101120000.commit # 提交元数据
│ ├── 20240101120000.commit.requested
│ └── hoodie.properties
├── date=2024-01-01/
│ ├── part-00000-xxx_20240101120000.parquet # 重写后的 Parquet
│ └── part-00001-xxx_20240101120000.parquet
└── date=2024-01-02/
└── part-00000-xxx_20240101120000.parquet
MoR 表结构:
s3://bucket/hudi_mor/
├── .hoodie/
│ ├── 20240101120000.deltacommit # Delta 提交(不是完整提交)
│ └── 20240101130000.compaction.requested # 压缩计划
├── date=2024-01-01/
│ ├── part-00000-xxx.parquet # Base File(历史数据)
│ └── part-00000-xxx_20240101120000.log # Delta Log(增量变更,Avro 格式)
└── date=2024-01-02/
└── part-00000-xxx.parquet
7.3 Hudi 的三种查询类型
Snapshot Query(快照查询):查询表的最新完整状态。对于 CoW 表,直接读 Parquet;对于 MoR 表,合并 Base + Delta Log。是最常用的查询类型。
Incremental Query(增量查询):只查询某个时间点之后的新增/变更记录。这是 Hudi 最独特的能力,极大地简化了增量 ETL 管道的构建:
# Hudi 增量查询
spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "20240101000000") \
.option("hoodie.datasource.read.end.instanttime", "20240102000000") \
.load("s3://bucket/hudi_transactions")
Read Optimized Query(只读优化查询,仅 MoR 表):只读 Base File(Parquet),忽略 Delta Log,牺牲数据新鲜度换取最佳读取性能。适合不需要最新数据的历史分析场景。
7.4 Hudi 的核心写入操作
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.getOrCreate()
# 写入 CoW 表(Upsert 操作)
df_new_data.write.format("hudi") \
.option("hoodie.table.name", "transactions") \
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") \
.option("hoodie.datasource.write.operation", "upsert") \
.option("hoodie.datasource.write.recordkey.field", "id") \
.option("hoodie.datasource.write.precombine.field", "ts") \
.option("hoodie.datasource.write.partitionpath.field", "event_date") \
.option("hoodie.datasource.hive_sync.enable", "true") \
.option("hoodie.datasource.hive_sync.table", "transactions") \
.option("hoodie.datasource.hive_sync.database", "finance_db") \
.mode("append") \
.save("s3://bucket/hudi_transactions")
# 写入 MoR 表(适合流式高频写入)
df_stream_data.write.format("hudi") \
.option("hoodie.table.name", "transactions_mor") \
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
.option("hoodie.datasource.write.operation", "upsert") \
.option("hoodie.datasource.write.recordkey.field", "id") \
.option("hoodie.datasource.write.precombine.field", "ts") \
.mode("append") \
.save("s3://bucket/hudi_transactions_mor")
# 删除操作
df_to_delete.write.format("hudi") \
.option("hoodie.datasource.write.operation", "delete") \
.option("hoodie.datasource.write.recordkey.field", "id") \
.mode("append") \
.save("s3://bucket/hudi_transactions")
7.5 Hudi 的 Timeline 和 Concurrency Control
Hudi 的 Timeline 存储在 .hoodie/ 目录下,记录表的完整操作历史。每个操作有三种状态:requested(计划阶段)、inflight(执行中)、completed(完成)。
Hudi 支持两种并发控制模式:单写多读(MVCC,默认),适合大多数批处理场景;多写多读(OCC,需要 ZooKeeper 或 HBase 支持),适合真正的多写并发场景(如多个 Flink Job 同时写入)。
8. 三者横向对比
8.1 核心特性对比矩阵
| 特性 | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| 诞生公司 | Netflix | Databricks | Uber |
| 主导引擎 | 引擎无关 | Apache Spark | Apache Spark |
| 元数据存储 | 分层 JSON/Avro 文件 | JSON 事务日志 + Checkpoint | Timeline(.hoodie/) |
| ACID 事务 | ✅ 完整支持 | ✅ 完整支持 | ✅ 完整支持 |
| 时间旅行 | ✅ 快照 ID / 时间戳 | ✅ 版本号 / 时间戳 | ✅ Instant Time |
| Schema 演进 | ✅ 最强(列 ID 追踪) | ✅ 强 | ✅ 中等 |
| 分区演进 | ✅ 无需重写数据 | ❌ 需重写数据 | ❌ 需重写数据 |
| 行级 DELETE | ✅ 删除文件机制 | ✅ 标记删除 | ✅ CoW/MoR |
| Upsert 性能 | 中等 | 中等 | ✅ 最强(MoR) |
| 增量查询 | 有限支持 | CDF(Change Data Feed) | ✅ 原生增量查询 |
| 小文件管理 | OPTIMIZE(手动/自动) | OPTIMIZE + Auto Optimize | 自动 Compaction |
| 多引擎支持 | ✅ 最佳(Spark/Flink/Trino/Hive/Presto) | ✅ 好(Spark优先,其他引擎逐步改善) | ✅ 好 |
| Z-Order 索引 | ✅ 排序顺序(Sort Order) | ✅ OPTIMIZE ZORDER BY | ✅ 多维索引 |
| 隐藏分区 | ✅ 支持(用户无感知分区) | ❌ 不支持 | ❌ 不支持 |
| 云原生程度 | ✅ 最佳(对 HMS 依赖最低) | ✅ 好 | ✅ 好 |
| Databricks 集成 | ✅ 支持 | ✅ 原生(内置) | ✅ 支持 |
| 社区生态 | 快速增长 | 最大(Databricks 驱动) | 中等 |
| 学习曲线 | 中等 | 低(文档最全) | 高(概念最多) |
8.2 选型指南
选 Apache Iceberg 的场景:
你需要最强的多引擎兼容性(如 Trino + Spark + Flink 同时读写同一张表),这在你的 SoloLakehouse 架构中尤为重要——Trino 作为查询引擎、Spark 作为处理引擎。你使用云原生架构,想减少对 Hive Metastore 的依赖(配合 REST Catalog 或 Nessie)。你的 Schema 经常变化,需要最安全的 Schema 演进(尤其是字段改名场景)。你需要隐藏分区(对下游用户透明地按时间字段分区)。你使用 AWS Glue 或 Nessie 作为 Catalog。
选 Delta Lake 的场景:
你深度使用 Databricks 平台(Delta Lake 在 Databricks 上是一等公民,有 Auto Optimize、Auto Compaction、Predictive I/O 等独家优化)。你需要 Change Data Feed(CDF)来驱动下游服务的实时同步。你的团队以 Spark 为核心技术栈,不需要用 Trino 直接查表。你在 Azure 生态(Azure Data Lake Storage + Databricks)。
选 Apache Hudi 的场景:
你有高频流式 Upsert 需求(如 Kafka → Hudi,每秒数千次 Upsert),MoR 表类型是最佳选择。你需要原生的增量查询 API(hoodie.datasource.query.type = incremental),驱动复杂的增量 ETL 管道。你在 AWS EMR 或 AWS S3 上构建(AWS 对 Hudi 有官方支持)。你的下游需要精确感知每一行数据的变更历史(比如构建准实时数据仓库)。
8.3 关于你的 SoloLakehouse 选型
在 SoloLakehouse 中选择 Apache Iceberg 在 Gold Layer 是一个经过深思熟虑的正确决策,理由如下:
首先,Iceberg 的多引擎支持是决定性因素。你的架构中 Trino 是 BI 查询引擎、Spark/Dagster 是处理引擎,未来可能还要加 Flink。Iceberg 是这个组合中唯一经过生产验证的通用格式,Delta Lake 的 Trino 支持虽然在改善,但远不如原生 Iceberg 稳定。
其次,Iceberg 与 REST Catalog 的结合让你可以逐步摆脱 Hive Metastore 的依赖,这是更云原生的演进方向,也是向面试官展示架构眼光的好素材。
第三,Gold Layer 是最终分析层,读多写少,Iceberg 的读性能和 Schema 演进能力在这里发挥最大价值。
你需要在 ADR 中记录这个决策:为什么在有 Delta Lake 的情况下选择 Iceberg,这是一个展示架构判断力的绝佳机会。
9. 与 Hive Metastore 的集成
理解 HMS 在现代 Lakehouse 中的角色至关重要,因为几乎所有公司都有 HMS 的历史遗产。
9.1 HMS 的现代角色
在现代 Lakehouse 中,HMS 不再负责存储完整的表元数据(那是 Iceberg/Delta/Hudi 的元数据层做的事情),而只需要存储一条记录:"表 X 的 Iceberg 元数据文件在哪里"(或对于 Delta:"这是一张 Delta 表,数据路径在哪里")。
现代架构中的 HMS 角色:
HMS(MySQL)存储:
表名 "db.transactions" →
table_type=EXTERNAL,
provider=ICEBERG,
metadata_location=s3://bucket/transactions/metadata/v5.metadata.json
Iceberg Catalog(HMS Backend):
当需要读 db.transactions 时:
1. 查 HMS:metadata_location = s3://bucket/...v5.metadata.json
2. 读 v5.metadata.json:current-snapshot-id = 3519
3. 读 snap-3519.avro(Manifest List)
4. 读相关的 Manifest 文件
5. 读数据文件
9.2 在 Spark 中使用 HMS 管理 Iceberg 表
spark = SparkSession.builder \
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
# 使用 HMS 作为 Iceberg Catalog 后端
.config("spark.sql.catalog.hive_prod",
"org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.hive_prod.type", "hive") \
.config("spark.sql.catalog.hive_prod.uri", "thrift://metastore:9083") \
# 默认 Catalog 也使用 HMS(兼容旧代码)
.config("spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.spark_catalog.type", "hive") \
.getOrCreate()
# 创建 Iceberg 表(HMS 记录 metadata_location)
spark.sql("""
CREATE TABLE hive_prod.finance_db.transactions (
id BIGINT,
amount DECIMAL(18, 2),
event_time TIMESTAMP,
event_date DATE
)
USING iceberg
PARTITIONED BY (days(event_time))
LOCATION 's3://bucket/iceberg/transactions'
""")
# 将旧 Hive 表就地迁移为 Iceberg(生产中的常见操作)
spark.sql("""
CALL hive_prod.system.migrate('finance_db.old_hive_table')
""")
9.3 Hive 表与 Iceberg 表的共存
在企业中,旧有 Hive 表(Parquet/ORC 格式,无 Iceberg 元数据)和新 Iceberg 表会长期共存。以下是 Trino 配置示例,演示如何同时处理两者:
# trino/catalog/hive.properties(处理旧 Hive 表)
connector.name=hive
hive.metastore.uri=thrift://metastore:9083
hive.non-managed-table-writes-enabled=true
hive.max-partitions-per-scan=1000000
# trino/catalog/iceberg.properties(处理 Iceberg 表)
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://metastore:9083
# 用户查询时按 catalog 区分:
# SELECT * FROM hive.finance_db.old_table -- 旧 Hive 表
# SELECT * FROM iceberg.finance_db.new_table -- Iceberg 表
10. 在 Medallion 架构中的应用
10.1 标准 Medallion 架构的表格式选择
在 Bronze → Silver → Gold 的 Medallion 架构中,三种格式的选择建议如下:
Bronze Layer(原始数据):通常使用普通 Parquet(不使用开放表格式),原因是 Bronze 层几乎只有 Append 操作,不需要 Upsert/Delete,开销越小越好。但如果需要模式追踪和精确的数据血缘,可以用 Iceberg(开销极小)。
Silver Layer(清洗/标准化数据):这是开放表格式价值最大的层。需要处理迟到数据(Late Arriving Data)的 Upsert,需要修正错误数据的 DELETE/UPDATE,需要 Schema 演进(新业务字段),需要时间旅行(数据质量审计)。三种格式都适合,选一个与你的整体生态最兼容的即可。
Gold Layer(聚合/报表数据):读多写少,优先考虑读取性能。Iceberg 在这里特别合适,因为它的统计信息(min/max/null)结合 Trino 的谓词下推效果极好。如果你在 Databricks,Delta Lake 在这里也是极佳选择。
10.2 FinLakehouse 中的具体应用
以你的 FinLakehouse 项目为例,处理 ECB 利率、Euribor 等宏观经济数据的完整流程:
# ===== Bronze Layer:原始数据落地 =====
# 直接存 Parquet,保留原始格式,不做任何转换
raw_ecb_data.write \
.mode("append") \
.partitionBy("ingestion_date") \
.parquet("s3://finlakehouse/bronze/ecb_rates/")
# ===== Silver Layer:用 Iceberg,支持迟到数据修正 =====
spark.sql("""
MERGE INTO iceberg.finance.ecb_rates AS target
USING bronze_new_data AS source
ON target.rate_date = source.rate_date
AND target.rate_type = source.rate_type
WHEN MATCHED AND source.value != target.value THEN
UPDATE SET value = source.value, updated_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (rate_date, rate_type, value, ingested_at)
VALUES (source.rate_date, source.rate_type, source.value, current_timestamp())
""")
# ===== Gold Layer:Iceberg + Trino 查询 =====
# 预计算 macro_state_daily 宏观状态表
spark.sql("""
CREATE OR REPLACE TABLE iceberg.gold.macro_state_daily
USING iceberg
PARTITIONED BY (days(calc_date))
AS
SELECT
calc_date,
euribor_3m,
ecb_deposit_rate,
hicp_yoy,
m3_growth_yoy,
CASE
WHEN ecb_deposit_rate > 3.0 AND hicp_yoy < 3.0 THEN 'TIGHT_EASING'
WHEN ecb_deposit_rate > 3.0 AND hicp_yoy > 3.0 THEN 'TIGHT_HIGH_INFLATION'
WHEN ecb_deposit_rate < 1.0 AND m3_growth_yoy > 8.0 THEN 'LOOSE_EXCESS_LIQUIDITY'
ELSE 'NEUTRAL'
END AS macro_regime
FROM iceberg.silver.macro_indicators_combined
""")
11. 真实代码示例
11.1 完整的本地开发环境搭建(MinIO + Iceberg + Trino)
以下代码对应 SoloLakehouse 架构,展示在 MinIO(S3 兼容)上使用 Iceberg:
# spark_session_factory.py
from pyspark.sql import SparkSession
def create_iceberg_spark_session(
app_name: str = "SoloLakehouse",
minio_endpoint: str = "http://minio:9000",
minio_access_key: str = "minioadmin",
minio_secret_key: str = "minioadmin",
metastore_uri: str = "thrift://hive-metastore:9083"
) -> SparkSession:
"""
创建支持 Iceberg + MinIO + Hive Metastore 的 SparkSession。
这对应 SoloLakehouse 的标准配置。
"""
return SparkSession.builder \
.appName(app_name) \
\
# Iceberg Spark 扩展(必须)
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
\
# 使用 HMS 作为 Iceberg Catalog 后端
.config("spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.spark_catalog.type", "hive") \
\
# 自定义 Catalog(用于 iceberg.db.table 三段式命名)
.config("spark.sql.catalog.iceberg",
"org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.iceberg.type", "hive") \
.config("spark.sql.catalog.iceberg.uri", metastore_uri) \
\
# S3/MinIO 配置
.config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
.config("spark.hadoop.fs.s3a.access.key", minio_access_key) \
.config("spark.hadoop.fs.s3a.secret.key", minio_secret_key) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
\
# Iceberg 默认使用 s3a
.config("spark.sql.catalog.iceberg.s3.endpoint", minio_endpoint) \
.config("spark.sql.catalog.iceberg.s3.path-style-access", "true") \
\
.enableHiveSupport() \
.getOrCreate()
11.2 Iceberg 表的 CRUD 完整示例
# iceberg_crud_demo.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DecimalType, TimestampType, StringType
import datetime
spark = create_iceberg_spark_session()
# ─────────────────────────────────────────
# 1. 建表
# ─────────────────────────────────────────
spark.sql("""
CREATE DATABASE IF NOT EXISTS iceberg.finance
LOCATION 's3a://sololakehouse/iceberg/finance/'
""")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg.finance.transactions (
id BIGINT NOT NULL COMMENT '交易唯一ID',
user_id BIGINT NOT NULL COMMENT '用户ID',
amount DECIMAL(18, 4) NOT NULL COMMENT '交易金额(EUR)',
currency STRING NOT NULL COMMENT '货币代码',
status STRING NOT NULL COMMENT '状态:PENDING/COMPLETED/REVERSED',
event_time TIMESTAMP NOT NULL COMMENT '交易发生时间',
created_at TIMESTAMP NOT NULL COMMENT '记录创建时间'
)
USING iceberg
PARTITIONED BY (days(event_time))
LOCATION 's3a://sololakehouse/iceberg/finance/transactions'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy',
'write.metadata.delete-after-commit.enabled' = 'true',
'write.metadata.previous-versions-max' = '10',
'history.expire.min-snapshots-to-keep' = '5'
)
""")
# ─────────────────────────────────────────
# 2. INSERT(批量写入)
# ─────────────────────────────────────────
schema = StructType([
StructField("id", LongType()),
StructField("user_id", LongType()),
StructField("amount", DecimalType(18, 4)),
StructField("currency", StringType()),
StructField("status", StringType()),
StructField("event_time", TimestampType()),
StructField("created_at", TimestampType()),
])
data = [
(1, 1001, 150.00, "EUR", "COMPLETED", datetime.datetime(2024, 1, 1, 10, 0), datetime.datetime.now()),
(2, 1002, 3500.00, "EUR", "PENDING", datetime.datetime(2024, 1, 1, 11, 0), datetime.datetime.now()),
(3, 1001, 75.50, "EUR", "COMPLETED", datetime.datetime(2024, 1, 2, 9, 0), datetime.datetime.now()),
]
df = spark.createDataFrame(data, schema)
df.writeTo("iceberg.finance.transactions").append()
# ─────────────────────────────────────────
# 3. UPDATE(修改状态)
# ─────────────────────────────────────────
spark.sql("""
UPDATE iceberg.finance.transactions
SET status = 'COMPLETED'
WHERE id = 2 AND status = 'PENDING'
""")
# ─────────────────────────────────────────
# 4. DELETE(GDPR 删除)
# ─────────────────────────────────────────
spark.sql("""
DELETE FROM iceberg.finance.transactions
WHERE user_id = 9999
""")
# ─────────────────────────────────────────
# 5. 时间旅行:查看删除之前的状态
# ─────────────────────────────────────────
# 首先查看快照历史
spark.sql("SELECT snapshot_id, committed_at, operation FROM iceberg.finance.transactions.history").show()
# 查询第一个快照的内容(插入后、删除前)
spark.sql("""
SELECT * FROM iceberg.finance.transactions
FOR VERSION AS OF 1 -- 替换为实际 snapshot_id
""").show()
# ─────────────────────────────────────────
# 6. 维护操作
# ─────────────────────────────────────────
# 合并小文件(生产环境中定期运行)
spark.sql("""
CALL iceberg.system.rewrite_data_files(
table => 'iceberg.finance.transactions',
strategy => 'sort',
sort_order => 'user_id ASC, event_time ASC',
options => map('target-file-size-bytes', '134217728') -- 128MB
)
""")
# 清理过期快照(保留最近 7 天)
spark.sql("""
CALL iceberg.system.expire_snapshots(
table => 'iceberg.finance.transactions',
older_than => TIMESTAMP '2024-01-01 00:00:00',
retain_last => 5
)
""")
# 删除孤儿文件(写入失败残留的文件)
spark.sql("""
CALL iceberg.system.remove_orphan_files(
table => 'iceberg.finance.transactions'
)
""")
11.3 Delta Lake Upsert + CDF 示例
# delta_cdc_pipeline.py
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 创建主表并开启 CDF
spark.sql("""
CREATE TABLE IF NOT EXISTS finance.positions (
position_id BIGINT,
portfolio_id BIGINT,
isin STRING,
quantity DECIMAL(18, 4),
avg_cost_price DECIMAL(18, 4),
last_updated TIMESTAMP
)
USING delta
PARTITIONED BY (portfolio_id)
TBLPROPERTIES (
'delta.enableChangeDataFeed' = 'true',
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true',
'delta.isolationLevel' = 'WriteSerializable'
)
""")
# 从交易数据 Upsert 到持仓表
def update_positions_from_trades(trades_df):
"""
根据新交易数据更新持仓。
典型场景:收到一批成交记录,更新各持仓的持有量和均价。
"""
# 计算每个持仓的净变化
position_changes = trades_df.groupBy("portfolio_id", "isin").agg(
F.sum("quantity").alias("qty_delta"),
F.last("price").alias("last_price"),
F.max("trade_time").alias("last_updated")
)
positions_table = DeltaTable.forName(spark, "finance.positions")
positions_table.alias("pos").merge(
source=position_changes.alias("chg"),
condition="pos.portfolio_id = chg.portfolio_id AND pos.isin = chg.isin"
).whenMatchedUpdate(set={
"quantity": "pos.quantity + chg.qty_delta",
"avg_cost_price": """
CASE
WHEN chg.qty_delta > 0
THEN (pos.quantity * pos.avg_cost_price + chg.qty_delta * chg.last_price)
/ (pos.quantity + chg.qty_delta)
ELSE pos.avg_cost_price
END
""",
"last_updated": "chg.last_updated"
}).whenNotMatchedInsert(values={
"position_id": "monotonically_increasing_id()",
"portfolio_id": "chg.portfolio_id",
"isin": "chg.isin",
"quantity": "chg.qty_delta",
"avg_cost_price": "chg.last_price",
"last_updated": "chg.last_updated"
}).execute()
# 下游:通过 CDF 读取持仓变化,推送到风控系统
def get_position_changes_since(start_version: int):
"""
增量读取自 start_version 以来的持仓变化。
这替代了轮询整张表的低效做法。
"""
return spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", start_version) \
.table("finance.positions") \
.filter(F.col("_change_type").isin(["insert", "update_postimage"]))
# _change_type 的取值:
# insert → 新增持仓
# update_preimage → 更新前的旧值(用于审计)
# update_postimage → 更新后的新值(用于下游同步)
# delete → 删除(平仓)
12. 面试高频问题与标准答案
Q1:Iceberg、Delta、Hudi 都支持 ACID,它们的实现机制有何不同?
标准答案:三者都使用**乐观并发控制(OCC)**作为核心并发机制,但元数据管理方式不同。Iceberg 通过"原子替换 metadata.json 中的 current-snapshot-id 指针"来实现原子提交,元数据是分散的不可变文件(metadata.json → manifest list → manifest files);Delta Lake 通过"向 _delta_log/ 目录写入新的编号 JSON 文件"来实现,日志文件的写入利用对象存储的条件写入保证原子性;Hudi 通过 Timeline(.hoodie/ 目录)来记录每个操作的三阶段状态(requested → inflight → completed),以最终文件的存在与否来判断事务是否完成。三者都依赖对象存储的强一致性(AWS S3、MinIO 都提供强一致性保证)。
Q2:为什么 Iceberg 的 Schema 演进比 Delta 和 Hudi 更安全?
标准答案:Iceberg 使用**列 ID(Column ID)**而非列名来追踪列的身份。在 Parquet 文件内部,每个字段有一个唯一的字段 ID;在 Iceberg 元数据中,列名是这个 ID 的"别名"。因此,重命名一列只需修改 metadata.json 中的 Schema 定义(改名字映射),不需要修改任何 Parquet 文件。读旧文件时,引擎通过字段 ID 找到对应数据,不受列名改变影响。Delta Lake 在一定程度上也支持 Schema 演进,但不如 Iceberg 强大(Delta 的重命名列实际上是删除旧列、添加新列,旧数据中该列变为 null)。Hudi 的 Schema 演进能力更有限,通常需要更多手动干预。
Q3:什么情况下你会在 Gold Layer 用 Hudi 而不是 Iceberg?
标准答案:如果 Gold Layer 表本质上是一个需要高频 Upsert 的宽表(比如实时更新的用户画像表、实时的交易明细表),并且下游系统需要按时间窗口精确消费增量变更(而不是全量扫描),那么 Hudi 的 MoR 表类型 + 增量查询 API 是更好的选择。Iceberg 在 Upsert 性能上比 Hudi 的 MoR 差,因为 Iceberg 的 MERGE INTO 仍然需要重写受影响的数据文件(尽管通过 Equality Delete 文件可以延迟这个操作)。但如果 Gold Layer 是读多写少的聚合/报表表,Iceberg 的查询性能和多引擎兼容性更有优势。
Q4:解释 Iceberg 的 Manifest 文件为什么可以提升查询性能?
标准答案:Manifest 文件中存储了每个数据文件的列级统计信息(min value、max value、null count)。当查询引擎(如 Trino)执行 WHERE amount > 1000 时,它首先读取 Manifest 文件,检查每个数据文件的 amount 字段的 min/max:如果某个文件的 max(amount) = 500,那么这个文件肯定没有满足条件的行,可以完全跳过(这叫做数据文件剪枝,Data Skipping)。这与 Parquet 自身的 Row Group 级别统计形成两层过滤:Manifest 级别过滤掉整个文件,Parquet Row Group 级别过滤掉文件内的行组。而在传统 Hive 表中,引擎必须打开每个文件才能读取 Row Group 统计,代价高得多。
Q5:VACUUM 和 expireSnapshots 有什么区别?
标准答案:这两个操作针对不同的对象。expireSnapshots(Iceberg)/VACUUM(Delta Lake)清理的是过期的快照元数据以及该快照独占的数据文件(即没有任何其他快照引用的文件)。运行后,超出保留期的历史版本将无法再被时间旅行查询。而"孤儿文件"(Orphan Files)是指写入失败后留在 S3 上、没有被任何元数据引用的数据文件,需要通过 removeOrphanFiles(Iceberg)单独清理。两者都需要定期运行,否则 S3 存储成本会持续增长。在生产中,通常将这两个操作配置为定期 Dagster Asset 或 Airflow Task 自动执行。
13. Platform Owner 思维:战略总结
读到这里,如果你只记住一件事,那就记住这一件:这些技术不是工具,而是商业问题的工程答案。
Hive 解决了"如何用 SQL 查 Hadoop 上的文件"这个工程问题,背后的商业问题是 Facebook 需要让数百名分析师用他们熟悉的 SQL 分析海量日志数据。
Iceberg 解决了"如何在廉价存储上实现企业级数据管理"这个工程问题,背后的商业问题是 Netflix 需要在不锁定任何单一云厂商的前提下,用多个计算引擎处理 PB 级数据,同时满足内容版权和用户隐私的合规要求。
Delta Lake 解决了"如何让 Spark 开发者以最低学习成本获得数据仓库的可靠性"这个工程问题,背后的商业问题是 Databricks 需要让其平台从"ETL 工具"升级为"完整的数据平台",以提高客户粘性和 ARPU。
Hudi 解决了"如何以最低延迟将实时业务变更同步到分析系统"这个工程问题,背后的商业问题是 Uber 的司机收益计算、订单状态更新需要在几分钟内反映到数据仓库,而传统的 T+1 批处理远远不够。
作为 Platform Owner,你的核心能力不是"会用这些工具",而是"能够在商业目标驱动下做出正确的架构选型,并能够向技术和非技术利益相关者清晰解释这个选型的逻辑"。
在你的 ADR(Architecture Decision Record)中,写清楚:选择 Iceberg 的理由是"多引擎兼容性(Trino + Spark)是非功能性需求,而不仅仅是偏好";写清楚"假如未来我们需要替换 Trino 为 DuckDB,Iceberg 的开放格式让这个迁移代价最小"。这才是让面试官眼前一亮、让你区别于普通候选人的思维高度。
文档版本:v1.0 | 作者:SoloLakehouse Knowledge Base | 最后更新:2024