彻底搞懂 Apache Hive、Iceberg、Delta Lake 和 Hudi

——从数据仓库到现代 Lakehouse 的完整演化路线图

学习目标:读完本文后,你能够(1)清晰解释每种技术解决了什么问题;(2)在白板前画出每种格式的核心架构;(3)写出真实可运行的代码;(4)在面试中从架构师视角进行技术选型对比。

目录

  1. 为什么需要这些技术?—— 问题的根源
  2. Apache Hive —— 一切的起点
  3. Hive 的天花板 —— 为什么它不够用
  4. 开放表格式(Open Table Format)革命
  5. Apache Iceberg —— 深度解析
  6. Delta Lake —— 深度解析
  7. Apache Hudi —— 深度解析
  8. 三者横向对比 —— 特性矩阵与选型指南
  9. 与 Hive Metastore 的集成
  10. 在 Medallion 架构中的应用
  11. 真实代码示例
  12. 面试高频问题与标准答案
  13. Platform Owner 思维:战略总结

1. 为什么需要这些技术?

在理解这些技术之前,我们必须先理解它们在解决什么问题。这是工程师思维和架构师思维的根本区别:工程师问"怎么用",架构师问"为什么存在"

1.1 数据存储的两个世界

传统上,数据基础设施被一道看不见的墙分成两个世界:

数据仓库(Data Warehouse)世界,如 Teradata、Snowflake、Redshift,其特点是:支持 ACID 事务、支持 SQL、支持行级更新/删除、Schema 严格管理、查询性能极强。但代价是:存储和计算价格昂贵、厂商锁定(数据格式私有)、对非结构化数据支持差、难以支持机器学习工作负载(ML 框架需要直接读文件)。

数据湖(Data Lake)世界,如 HDFS + Parquet,其特点是:存储成本极低、支持任意格式(结构化、半结构化、非结构化)、计算引擎灵活(Spark、Flink、Presto)、ML 工作负载友好(直接读 Parquet/ORC)。但代价是:没有 ACID 事务(读脏数据问题)、没有行级更新/删除(只能覆盖整个分区)、没有 Schema 演进保障、没有数据版本管理(误操作无法回滚)。

1.2 现实业务中的痛点

一个具体的痛苦场景:假设你在金融公司,每天有一个 Spark 作业向数据湖写入交易数据。同时,分析师在用 Presto 查询同一张表。会发生什么?

  • Spark 写入中途,Presto 查到了半截数据(脏读)。
  • GDPR 要求删除某用户所有记录,你需要重写整个分区(几 TB 的操作)。
  • 两个 Spark 作业同时写同一个分区,数据互相覆盖(写写冲突)。
  • 数据工程师误删了一个分区,没有办法恢复(无版本管理)。
  • 业务字段增加了一列,旧数据和新数据 Schema 不兼容,查询报错。

这就是 Apache Iceberg、Delta Lake、Hudi 三种"开放表格式"要解决的核心问题集合:在廉价的对象存储(S3/MinIO/ADLS)上,实现数据仓库级别的数据管理能力


2. Apache Hive

2.1 Hive 是什么?

Apache Hive 诞生于 2008 年的 Facebook,其核心使命只有一个:让工程师用 SQL 操作 HDFS 上的文件

Hive 本身不存储数据,它只是一个翻译层:把 SQL 翻译成 MapReduce(后来是 Tez/Spark)任务,然后在 HDFS 上执行。

Hive 的核心组件有三个:

**HiveQL(查询语言)**是 Hive 提供的 SQL 方言,支持 SELECT、JOIN、GROUP BY、窗口函数等标准 SQL 操作,同时支持 CREATE TABLE、LOAD DATA 等 DDL 操作。

**Hive Metastore(HMS)**是整个架构中最重要的组件,也是对整个数据生态影响最深远的组件。HMS 是一个关系型数据库(默认 Derby,生产环境用 MySQL 或 PostgreSQL),它存储了所有表的元数据:表名、列名、列类型、分区信息、数据在 HDFS/S3 上的物理路径。HMS 已经成为整个数据湖生态的"目录服务"标准,Spark、Presto、Trino、Flink 都支持对接 HMS。

**SerDe(序列化/反序列化器)**是 Hive 读写文件的插件。通过 SerDe,Hive 可以读写 CSV、JSON、ORC、Parquet 等各种格式。

2.2 Hive 架构图

用户/BI工具
     │
     │ HiveQL
     ▼
┌─────────────────────────────────────┐
│          Hive Driver                │
│  ┌──────────┐  ┌─────────────────┐  │
│  │ Compiler │  │    Optimizer    │  │
│  └──────────┘  └─────────────────┘  │
└────────────────┬────────────────────┘
                 │
        ┌────────┴────────┐
        │                 │
        ▼                 ▼
┌──────────────┐  ┌──────────────────┐
│ Hive         │  │  Execution Engine│
│ Metastore    │  │  (Tez / Spark)   │
│ (MySQL/PG)   │  └────────┬─────────┘
│              │           │
│ 存储:        │           │ 读写
│ - 表定义     │           ▼
│ - 分区信息   │  ┌──────────────────┐
│ - 文件路径   │  │   HDFS / S3      │
└──────────────┘  │  (Parquet/ORC)   │
                  └──────────────────┘

2.3 Hive 表的类型

**内部表(Internal/Managed Table)**是 Hive 完全管理的表。执行 DROP TABLE 时,数据文件也会一并删除。适合完全由 Hive 管理的数据集。

**外部表(External Table)**是 Hive 只管理元数据、不管理数据文件的表。执行 DROP TABLE 时,只删除 HMS 中的记录,S3/HDFS 上的文件不受影响。在生产环境中,几乎所有表都应该建为外部表,因为这样可以防止误删数据,并且允许多个引擎访问同一份数据。

-- 内部表
CREATE TABLE transactions (
    id       BIGINT,
    amount   DECIMAL(18, 2),
    ts       TIMESTAMP
)
STORED AS PARQUET
LOCATION '/data/warehouse/transactions';

-- 外部表(生产推荐)
CREATE EXTERNAL TABLE transactions_ext (
    id       BIGINT,
    amount   DECIMAL(18, 2),
    ts       TIMESTAMP
)
PARTITIONED BY (event_date STRING)
STORED AS PARQUET
LOCATION 's3://my-bucket/data/transactions/';

2.4 Hive 分区

分区是 Hive 最重要的性能优化手段。Hive 的分区通过目录结构实现,例如按日期分区的表,其 S3 路径结构如下:

s3://bucket/transactions/
    event_date=2024-01-01/
        part-00000.parquet
        part-00001.parquet
    event_date=2024-01-02/
        part-00000.parquet
    event_date=2024-01-03/
        part-00000.parquet

查询时,WHERE event_date = '2024-01-01' 会让 Hive 只扫描 event_date=2024-01-01/ 目录下的文件,这称为分区裁剪(Partition Pruning),可以大幅减少 I/O。

-- 添加分区(告诉 Metastore 新分区的位置)
ALTER TABLE transactions_ext
ADD PARTITION (event_date='2024-01-04')
LOCATION 's3://bucket/transactions/event_date=2024-01-04/';

-- 自动发现分区(扫描目录,自动注册所有分区)
MSCK REPAIR TABLE transactions_ext;

2.5 Hive ACID(有限的事务支持)

Hive 在 0.14 版本后引入了 ACID 支持,但有严苛的限制条件:只支持 ORC 格式、必须是内部表(不支持外部表)、必须开启事务特性、性能开销显著。这些限制使得 Hive ACID 在实践中很少被采用,这也是 Iceberg/Delta/Hudi 出现的直接原因之一。


3. Hive 的天花板

Hive 的核心问题不是 SQL 能力不够,而是它的数据管理模型太过原始。具体来说,有以下几个根本性缺陷:

3.1 没有真正的事务(No True ACID)

在标准 Hive(无 ACID 模式)下,写入只是把文件放到 S3 的某个目录里。没有任何机制保证"写入过程中读者看不到中间状态"。当 Spark 正在写入一批数据时,如果另一个查询正在读,它会读到部分文件——这是**脏读(Dirty Read)**问题。

3.2 分区级别的最小操作粒度

Hive 的 UPDATE/DELETE 实现方式是:重写整个分区。如果一个分区有 100GB 的 Parquet 文件,你要修改其中 10 行记录,Hive 需要读出全部 100GB,修改 10 行,再写回 100GB——代价极其昂贵,在 GDPR 删除场景下几乎不可接受。

3.3 没有时间旅行(No Time Travel)

数据被覆盖后,历史状态永久丢失。误删了一个分区?没有快照,没有版本,无法恢复。

3.4 小文件问题(Small Files Problem)

流式数据写入(Kafka → Spark Streaming → Hive)会产生大量小文件,因为每次微批次写入都产生新文件。大量小文件会导致 NameNode 内存压力过大,查询时打开文件的开销远大于真正读数据的开销。Hive 没有自动的小文件合并机制。

3.5 Schema 演进的脆弱性

Hive 的 Schema 信息存储在 Metastore 中,而实际文件里的 Schema 与 Metastore 记录的可能不一致。字段改名、字段类型变更、列顺序调整,都可能导致查询读到错误数据甚至报错,没有任何验证机制。

3.6 并发写入冲突

多个 Spark 作业同时写同一个 Hive 分区,后写入的会覆盖先写入的,没有任何并发控制机制(Optimistic Concurrency Control 或 Pessimistic Locking 都没有)。


4. 开放表格式革命

理解了 Hive 的局限,我们就能理解 2018 年后崛起的"开放表格式"(Open Table Format)技术。

4.1 核心设计理念

开放表格式的本质是在文件之上增加一个元数据管理层(Metadata Layer)。这个层不改变底层的 Parquet/ORC 文件,而是在文件旁边维护一套精确的目录,记录:

  • 哪些文件构成了表的当前状态(文件列表快照)
  • 每次写入/修改/删除操作形成了哪个版本(版本历史)
  • 每个文件的统计信息(min/max值、null数量)用于高效的文件剪枝
  • Schema 的变更历史(Schema Evolution Log)

通过维护这个元数据层,可以在普通的对象存储(S3、MinIO、ADLS)上实现:

  • 原子提交(Atomic Commits):写入完成前,其他读者看不到新文件
  • 快照隔离(Snapshot Isolation):每次读取都基于某个一致的快照
  • 时间旅行(Time Travel):可以查询历史版本的数据
  • 高效的行级 DELETE/UPDATE:通过位图或位置向量标记删除,而非重写整个分区
  • Schema 演进(Schema Evolution):以向后兼容的方式修改表结构

4.2 三大格式的诞生背景

Apache Iceberg 诞生于 Netflix,2017 年开源,2020 年成为 Apache 顶级项目。Netflix 面临的问题是:跨 PB 级数据集的高效查询、复杂的 Schema 演进需求、多个计算引擎(Spark、Presto、Flink)访问同一张表。Iceberg 从一开始就被设计为"引擎无关"的格式。

Delta Lake 诞生于 Databricks,2019 年开源,基金会由 Linux Foundation 管理。Databricks 面临的问题是:他们的客户需要在 Spark + S3 环境下实现类似 ACID 数据库的行为,特别是流批一体(Lambda Architecture → Kappa Architecture)的统一。Delta Lake 与 Apache Spark 深度集成。

Apache Hudi(Hadoop Upserts Deletes and Incrementals)诞生于 Uber,2016 年开源,2019 年成为 Apache 顶级项目。Uber 面临的问题是:每天需要处理数十亿行的 Upsert(Update+Insert),用于司机收益、订单状态的增量更新,并需要从数据湖高效地抽取增量变更(CDC 场景)。


5. Apache Iceberg

5.1 Iceberg 的设计哲学

Iceberg 的设计有一个核心原则:表是文件的集合,表的状态由元数据文件完全描述,而不依赖任何中央服务(如 HMS)

这意味着:即使没有 Hive Metastore,你也可以仅凭一个指向元数据文件的路径,完整地读取一张 Iceberg 表的全部历史版本。HMS 在 Iceberg 中变成了可选的(只需存储"当前元数据文件在哪里"这一条信息),而不是必需的。

5.2 Iceberg 的四层元数据架构

这是 Iceberg 最重要的概念,必须完全理解:

┌─────────────────────────────────────────────────────────────┐
│                   Catalog(目录服务)                         │
│  存储:表名 → 当前 metadata.json 的路径                       │
│  实现:HMS、Glue、Nessie、JDBC、REST Catalog                  │
└──────────────────────────┬──────────────────────────────────┘
                           │ 指向
                           ▼
┌─────────────────────────────────────────────────────────────┐
│              Metadata File(元数据文件)                      │
│  文件名:metadata/v3.metadata.json                           │
│  内容:                                                       │
│  - 表的 Schema 历史(schema-id → 列定义列表)                 │
│  - 分区规格(Partition Spec)                                 │
│  - 当前快照 ID(current-snapshot-id)                        │
│  - 快照历史列表(snapshot-log)                              │
│  - 排序顺序(Sort Order)                                    │
└──────────────────────────┬──────────────────────────────────┘
                           │ 引用
                           ▼
┌─────────────────────────────────────────────────────────────┐
│              Snapshot(快照)                                 │
│  文件名:metadata/snap-3519.avro (也叫 manifest list)        │
│  内容:                                                       │
│  - 该快照包含哪些 manifest 文件                               │
│  - 每个 manifest 文件覆盖的分区范围(用于分区裁剪)           │
│  - 每个 manifest 中增加/删除的文件数统计                     │
└──────────────────────────┬──────────────────────────────────┘
                           │ 引用
                           ▼
┌─────────────────────────────────────────────────────────────┐
│              Manifest File(清单文件)                        │
│  文件名:metadata/xxxxx-m0.avro                              │
│  内容:                                                       │
│  - 一组数据文件的路径列表                                    │
│  - 每个文件的状态(ADDED / EXISTING / DELETED)              │
│  - 每个文件的列统计(min、max、null_count)                  │
│  - 每个文件的分区值                                          │
└──────────────────────────┬──────────────────────────────────┘
                           │ 引用
                           ▼
┌─────────────────────────────────────────────────────────────┐
│              Data Files(数据文件)                           │
│  格式:Parquet、ORC、Avro                                    │
│  实际存储用户数据                                            │
└─────────────────────────────────────────────────────────────┘

为什么是四层而不是两层? 这是一个精妙的工程设计。如果每次写入都修改一个中央大文件,那么并发写入时会有严重的锁竞争。通过将元数据分散为多个不可变(Immutable)的 Avro 文件,每次提交只需原子性地替换 metadata.json 中的当前快照指针,这个操作可以用对象存储的条件写入(Conditional Put)来实现乐观并发控制。

5.3 Iceberg 如何实现 ACID

以一次 INSERT 操作为例:

第一步(写数据文件):Spark 写 Job 把新数据写成若干 Parquet 文件,放到数据目录下。这些文件对任何读者都不可见,因为没有任何元数据指向它们。

第二步(写 Manifest 文件):创建一个新的 Manifest 文件,列出刚写的所有新 Parquet 文件,状态标记为 ADDED

第三步(写 Snapshot):创建一个新的 Snapshot(Manifest List),引用上一步的 Manifest 文件(可能还引用已有的其他 Manifest 文件),并记录操作类型(append)。

第四步(原子提交):更新 metadata.json,将 current-snapshot-id 指向新的 Snapshot ID。这一步通过 Catalog 的原子操作完成(例如,HMS 的事务、S3 的条件写入)。在这一步完成之前,新写入的数据对任何并发读者完全不可见;完成之后,立即对所有读者可见——这就是原子性(Atomicity)

如果第四步失败(机器宕机、网络中断),那么 Parquet 文件已经写到 S3 了,但 metadata.json 没有更新,所以这些"孤儿文件"对用户完全不可见。可以通过周期性运行 expireSnapshotsremoveOrphanFiles 来清理它们。

5.4 Iceberg 的时间旅行与快照管理

由于每次写入都创建一个新快照,而旧快照不会立即删除,Iceberg 天然支持时间旅行:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .getOrCreate()

# 查询最新版本
df_current = spark.read.table("db.transactions")

# 按时间戳查询历史版本(时间旅行)
df_yesterday = spark.read \
    .option("as-of-timestamp", "2024-01-01T00:00:00.000+0000") \
    .table("db.transactions")

# 按快照 ID 查询(更精确)
df_snapshot = spark.read \
    .option("snapshot-id", 3519835977348516541) \
    .table("db.transactions")

# 查看快照历史
spark.sql("SELECT * FROM db.transactions.history").show()

# 查看所有快照的文件列表
spark.sql("SELECT * FROM db.transactions.snapshots").show()

# 查看文件级别的元数据
spark.sql("SELECT * FROM db.transactions.files").show()

5.5 Iceberg 的 Schema 演进

Iceberg 使用**列 ID(Column ID)**而不是列名来追踪列,这使得 Schema 演进异常安全:

# 当前 Schema:id, amount, ts
# 以下所有操作都是向后兼容的

# 1. 增加新列(安全:读旧文件时该列返回 null)
spark.sql("""
    ALTER TABLE db.transactions 
    ADD COLUMN currency STRING AFTER amount
""")

# 2. 重命名列(安全:内部 ID 不变,只是改了显示名)
spark.sql("""
    ALTER TABLE db.transactions 
    RENAME COLUMN ts TO event_time
""")

# 3. 修改列类型(有限支持:只允许安全的类型提升,如 INT → BIGINT)
spark.sql("""
    ALTER TABLE db.transactions 
    ALTER COLUMN amount TYPE DECIMAL(20, 4)
""")

# 4. 删除列(安全:读旧文件时直接忽略该列的数据)
spark.sql("""
    ALTER TABLE db.transactions 
    DROP COLUMN old_column
""")

相比之下,如果你用普通 Hive 表重命名一列,HMS 中改了名字,但 Parquet 文件里仍然是旧名字,下次读取可能会把这列读成全 null。

5.6 Iceberg 的分区演进

这是 Iceberg 最独特的特性之一。在 Hive 中,你无法在不重写所有数据的情况下修改分区规则。而 Iceberg 支持分区演进(Partition Evolution)

# 最初按月分区
spark.sql("""
    CREATE TABLE db.events (
        id BIGINT,
        ts TIMESTAMP,
        payload STRING
    )
    USING iceberg
    PARTITIONED BY (months(ts))
""")

# 一年后数据量增长,改为按天分区
# 旧文件保持月分区,新文件使用日分区——两者共存,查询自动路由
spark.sql("""
    ALTER TABLE db.events 
    REPLACE PARTITION FIELD months(ts) WITH days(ts)
""")

Iceberg 支持的分区转换函数有:identity(col)year(ts)month(ts)day(ts)hour(ts)bucket(N, col)truncate(W, col)

5.7 Iceberg 的行级 DELETE(Position Delete vs. Equality Delete)

Iceberg v2 格式引入了两种删除文件(Delete File)机制,使行级删除不需要重写数据文件:

Position Delete File(位置删除文件):记录"第 X 个文件的第 Y 行被删除"。在读取时,引擎将数据文件和删除文件合并,跳过被标记的行。适合少量行的删除。

Equality Delete File(等值删除文件):记录"满足条件 id = 12345 的行被删除"。适合基于业务键的删除,例如 GDPR 删除用户数据。

# 行级 DELETE(不触发数据文件重写,只写删除文件)
spark.sql("DELETE FROM db.transactions WHERE user_id = 99999")

# 行级 UPDATE(读取 + 写删除文件 + 写新数据文件)
spark.sql("""
    UPDATE db.transactions 
    SET status = 'REVERSED' 
    WHERE id = 12345
""")

# MERGE INTO(Upsert 操作)
spark.sql("""
    MERGE INTO db.transactions AS t
    USING staging.new_transactions AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

6. Delta Lake

6.1 Delta Lake 的设计哲学

Delta Lake 的设计有一个核心原则:所有元数据都存储在数据目录旁边的 _delta_log/ 子目录中,以 JSON 和 Parquet 文件的形式存储事务日志

相对于 Iceberg 的"多层分散元数据"设计,Delta Lake 采用的是"集中式事务日志"设计:所有操作(insert、update、delete、schema change)都按顺序记录在 _delta_log/ 中,表的当前状态是从初始状态"重放"所有日志得到的。这在理念上更接近关系型数据库的 WAL(Write-Ahead Log)。

6.2 Delta Log 结构

s3://bucket/table/
├── _delta_log/
│   ├── 00000000000000000000.json    # 第 0 号事务:CREATE TABLE
│   ├── 00000000000000000001.json    # 第 1 号事务:第一次 INSERT
│   ├── 00000000000000000002.json    # 第 2 号事务:第二次 INSERT
│   ├── 00000000000000000003.json    # 第 3 号事务:DELETE
│   ├── ...
│   ├── 00000000000000000009.json    # 第 9 号事务
│   └── 00000000000000000010.checkpoint.parquet  # 检查点(每10个事务生成)
├── part-00000-xxx.snappy.parquet    # 数据文件
├── part-00001-xxx.snappy.parquet
└── ...

每个 JSON 日志文件记录了该事务的变更,包含多种"操作"(Action):

{
  "commitInfo": {
    "timestamp": 1704067200000,
    "operation": "WRITE",
    "operationParameters": {"mode": "Append"},
    "isolationLevel": "Serializable"
  }
}
{
  "add": {
    "path": "part-00000-abc.snappy.parquet",
    "partitionValues": {"date": "2024-01-01"},
    "size": 1024576,
    "modificationTime": 1704067200000,
    "dataChange": true,
    "stats": "{\"numRecords\":5000,\"minValues\":{\"amount\":0.01},\"maxValues\":{\"amount\":9999.99}}"
  }
}
{
  "remove": {
    "path": "part-00000-old.snappy.parquet",
    "deletionTimestamp": 1704067200000,
    "dataChange": true
  }
}

Checkpoint 文件是 Delta Lake 的性能关键。每当日志达到 10 个文件(可配置),Delta 会生成一个 Parquet 格式的 Checkpoint 文件,它是前 N 个事务日志的"压缩摘要",记录了当前所有活跃文件的列表。查询时只需读取最新 Checkpoint + 之后的少量 JSON 日志,而不需要重放全部历史。

6.3 Delta Lake 的 ACID 实现

Delta Lake 的并发控制依赖于写入日志文件时的乐观并发控制(Optimistic Concurrency Control,OCC),结合对象存储的原子操作(如 S3 的 put-if-absent)来保证事务的序列化。

事务 A(写入):                 事务 B(写入):
│                                │
├─ 读取当前版本 = 5              ├─ 读取当前版本 = 5
│                                │
├─ 写入数据文件                  ├─ 写入数据文件
│                                │
├─ 尝试写入 00006.json           ├─ 尝试写入 00006.json
│  (成功,S3 原子写入)          │  (失败!00006.json 已存在)
│                                │
│  提交成功                      ├─ 检测到冲突,重试
│                                │  (检查是否真的冲突,例如
│                                │   操作的分区不重叠则可以提交)
│                                │
│                                └─ 写入 00007.json(成功)

6.4 Delta Lake 的核心功能

时间旅行:Delta Lake 通过读取历史日志状态实现时间旅行,语法与 Iceberg 类似:

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 按版本号查询
df_v3 = spark.read.format("delta") \
    .option("versionAsOf", 3) \
    .load("s3://bucket/transactions")

# 按时间戳查询
df_ts = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .load("s3://bucket/transactions")

# SQL 语法
spark.sql("""
    SELECT * FROM transactions TIMESTAMP AS OF '2024-01-01'
""")

spark.sql("""
    SELECT * FROM transactions VERSION AS OF 3
""")

# 恢复到历史版本(Restore)
spark.sql("""
    RESTORE TABLE transactions TO VERSION AS OF 5
""")

**MERGE INTO(Upsert)**是 Delta Lake 最常用的操作,特别是在 CDC(Change Data Capture)场景中:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://bucket/transactions")

# CDC Upsert:有则更新,无则插入,标记为已删除的则删除
delta_table.alias("target").merge(
    source=df_changes.alias("source"),
    condition="target.id = source.id"
).whenMatchedUpdate(
    condition="source.op = 'U'",
    set={"amount": "source.amount", "status": "source.status"}
).whenMatchedDelete(
    condition="source.op = 'D'"
).whenNotMatchedInsert(
    condition="source.op = 'I'",
    values={"id": "source.id", "amount": "source.amount", "status": "source.status"}
).execute()

**OPTIMIZE(文件压缩)**解决 Delta Lake 的小文件问题:

# 压缩小文件(将多个小 Parquet 文件合并为大文件)
spark.sql("OPTIMIZE transactions")

# 压缩 + Z-Order 索引(按多列排序,提高多维查询的文件剪枝效率)
spark.sql("""
    OPTIMIZE transactions 
    ZORDER BY (user_id, event_date)
""")

# 清理过期快照(保留最近 7 天的历史,释放存储空间)
spark.sql("VACUUM transactions RETAIN 168 HOURS")

Schema 强制与演进:Delta Lake 默认开启 Schema 强制(Schema Enforcement),如果写入的 DataFrame Schema 与表 Schema 不兼容,写入会失败,防止数据污染。

# Schema 强制(默认行为):不兼容的 Schema 写入会报错,保护数据质量
df_new.write.format("delta").mode("append").save("s3://bucket/transactions")

# Schema 合并(演进):允许新增列
df_with_new_col.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://bucket/transactions")

# 覆盖 Schema(危险操作,谨慎使用)
df_new_schema.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("s3://bucket/transactions")

**Change Data Feed(CDF)**是 Delta Lake 的一个独特功能,可以高效地查询两个版本之间的行级变更:

# 开启 CDF
spark.sql("""
    ALTER TABLE transactions 
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# 查询增量变更(非常适合 ETL 管道:只处理上次运行后的变化)
df_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .table("transactions")

# df_changes 包含额外列:_change_type(insert/update_preimage/update_postimage/delete)
# 非常适合下游系统的增量同步

6.5 Delta Lake 的隔离级别

Delta Lake 提供两种隔离级别(可以针对每张表单独设置):

**WriteSerializable(默认)**是大多数场景的最佳选择,保证写操作之间串行化,但允许读写并发(读操作看到一致的快照)。性能高于完全串行化。

**Serializable(完全串行化)**保证所有读写操作串行化,代价是更高的冲突概率(更多写入失败需要重试)。适用于对一致性要求极高的金融交易场景。


7. Apache Hudi

7.1 Hudi 的设计哲学

Hudi 的设计有一个鲜明的核心原则:为流式 Upsert 和增量消费而优化。如果说 Iceberg 在乎"大规模查询效率和引擎兼容性",Delta Lake 在乎"Spark 生态系统内的易用性和流批一体",那么 Hudi 在乎的是"高频写入(流式 Upsert)和下游的增量消费"。

Hudi 的核心抽象是Timeline(时间线),所有操作(commit、clean、compaction、rollback)都被记录在表的 .hoodie/ 目录下的 Timeline 中,以便追踪表的完整变更历史,并支持下游以增量方式消费数据("我只要 2024-01-01 10:00 之后的新增/变更数据")。

7.2 Hudi 的两种表类型

这是 Hudi 最重要的概念,也是 Hudi 区别于其他格式的核心设计:

Copy-on-Write(CoW,写时复制):每次 Upsert 操作,直接在最新版本中更新数据文件(将受影响的 Parquet 文件重写,合并新数据)。特点是读取性能好(标准 Parquet,无需合并),但写入开销大(每次写入都需要重写 Parquet 文件)。适合读多写少、批量写入的场景。

Merge-on-Read(MoR,读时合并):每次 Upsert 操作,将 Delta 数据(增量变更)写入行式格式的 Avro 文件(Delta Log),而不修改底层 Parquet 文件(Base File)。读取时,引擎实时合并 Base File 和 Delta Log。特点是写入性能极好(直接追加),但读取需要合并(有额外开销)。适合写多读少、流式写入的场景。需要定期运行 Compaction 将 Delta Log 合并回 Base File。

CoW 表结构:
s3://bucket/hudi_cow/
├── .hoodie/
│   ├── 20240101120000.commit        # 提交元数据
│   ├── 20240101120000.commit.requested
│   └── hoodie.properties
├── date=2024-01-01/
│   ├── part-00000-xxx_20240101120000.parquet   # 重写后的 Parquet
│   └── part-00001-xxx_20240101120000.parquet
└── date=2024-01-02/
    └── part-00000-xxx_20240101120000.parquet

MoR 表结构:
s3://bucket/hudi_mor/
├── .hoodie/
│   ├── 20240101120000.deltacommit   # Delta 提交(不是完整提交)
│   └── 20240101130000.compaction.requested  # 压缩计划
├── date=2024-01-01/
│   ├── part-00000-xxx.parquet             # Base File(历史数据)
│   └── part-00000-xxx_20240101120000.log  # Delta Log(增量变更,Avro 格式)
└── date=2024-01-02/
    └── part-00000-xxx.parquet

7.3 Hudi 的三种查询类型

Snapshot Query(快照查询):查询表的最新完整状态。对于 CoW 表,直接读 Parquet;对于 MoR 表,合并 Base + Delta Log。是最常用的查询类型。

Incremental Query(增量查询):只查询某个时间点之后的新增/变更记录。这是 Hudi 最独特的能力,极大地简化了增量 ETL 管道的构建:

# Hudi 增量查询
spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20240101000000") \
    .option("hoodie.datasource.read.end.instanttime", "20240102000000") \
    .load("s3://bucket/hudi_transactions")

Read Optimized Query(只读优化查询,仅 MoR 表):只读 Base File(Parquet),忽略 Delta Log,牺牲数据新鲜度换取最佳读取性能。适合不需要最新数据的历史分析场景。

7.4 Hudi 的核心写入操作

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .getOrCreate()

# 写入 CoW 表(Upsert 操作)
df_new_data.write.format("hudi") \
    .option("hoodie.table.name", "transactions") \
    .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.recordkey.field", "id") \
    .option("hoodie.datasource.write.precombine.field", "ts") \
    .option("hoodie.datasource.write.partitionpath.field", "event_date") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.table", "transactions") \
    .option("hoodie.datasource.hive_sync.database", "finance_db") \
    .mode("append") \
    .save("s3://bucket/hudi_transactions")

# 写入 MoR 表(适合流式高频写入)
df_stream_data.write.format("hudi") \
    .option("hoodie.table.name", "transactions_mor") \
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.recordkey.field", "id") \
    .option("hoodie.datasource.write.precombine.field", "ts") \
    .mode("append") \
    .save("s3://bucket/hudi_transactions_mor")

# 删除操作
df_to_delete.write.format("hudi") \
    .option("hoodie.datasource.write.operation", "delete") \
    .option("hoodie.datasource.write.recordkey.field", "id") \
    .mode("append") \
    .save("s3://bucket/hudi_transactions")

7.5 Hudi 的 Timeline 和 Concurrency Control

Hudi 的 Timeline 存储在 .hoodie/ 目录下,记录表的完整操作历史。每个操作有三种状态:requested(计划阶段)、inflight(执行中)、completed(完成)。

Hudi 支持两种并发控制模式:单写多读(MVCC,默认),适合大多数批处理场景;多写多读(OCC,需要 ZooKeeper 或 HBase 支持),适合真正的多写并发场景(如多个 Flink Job 同时写入)。


8. 三者横向对比

8.1 核心特性对比矩阵

特性 Apache Iceberg Delta Lake Apache Hudi
诞生公司 Netflix Databricks Uber
主导引擎 引擎无关 Apache Spark Apache Spark
元数据存储 分层 JSON/Avro 文件 JSON 事务日志 + Checkpoint Timeline(.hoodie/
ACID 事务 ✅ 完整支持 ✅ 完整支持 ✅ 完整支持
时间旅行 ✅ 快照 ID / 时间戳 ✅ 版本号 / 时间戳 ✅ Instant Time
Schema 演进 ✅ 最强(列 ID 追踪) ✅ 强 ✅ 中等
分区演进 ✅ 无需重写数据 ❌ 需重写数据 ❌ 需重写数据
行级 DELETE ✅ 删除文件机制 ✅ 标记删除 ✅ CoW/MoR
Upsert 性能 中等 中等 ✅ 最强(MoR)
增量查询 有限支持 CDF(Change Data Feed) ✅ 原生增量查询
小文件管理 OPTIMIZE(手动/自动) OPTIMIZE + Auto Optimize 自动 Compaction
多引擎支持 ✅ 最佳(Spark/Flink/Trino/Hive/Presto) ✅ 好(Spark优先,其他引擎逐步改善) ✅ 好
Z-Order 索引 ✅ 排序顺序(Sort Order) ✅ OPTIMIZE ZORDER BY ✅ 多维索引
隐藏分区 ✅ 支持(用户无感知分区) ❌ 不支持 ❌ 不支持
云原生程度 ✅ 最佳(对 HMS 依赖最低) ✅ 好 ✅ 好
Databricks 集成 ✅ 支持 ✅ 原生(内置) ✅ 支持
社区生态 快速增长 最大(Databricks 驱动) 中等
学习曲线 中等 低(文档最全) 高(概念最多)

8.2 选型指南

选 Apache Iceberg 的场景:

你需要最强的多引擎兼容性(如 Trino + Spark + Flink 同时读写同一张表),这在你的 SoloLakehouse 架构中尤为重要——Trino 作为查询引擎、Spark 作为处理引擎。你使用云原生架构,想减少对 Hive Metastore 的依赖(配合 REST Catalog 或 Nessie)。你的 Schema 经常变化,需要最安全的 Schema 演进(尤其是字段改名场景)。你需要隐藏分区(对下游用户透明地按时间字段分区)。你使用 AWS Glue 或 Nessie 作为 Catalog。

选 Delta Lake 的场景:

你深度使用 Databricks 平台(Delta Lake 在 Databricks 上是一等公民,有 Auto Optimize、Auto Compaction、Predictive I/O 等独家优化)。你需要 Change Data Feed(CDF)来驱动下游服务的实时同步。你的团队以 Spark 为核心技术栈,不需要用 Trino 直接查表。你在 Azure 生态(Azure Data Lake Storage + Databricks)。

选 Apache Hudi 的场景:

你有高频流式 Upsert 需求(如 Kafka → Hudi,每秒数千次 Upsert),MoR 表类型是最佳选择。你需要原生的增量查询 API(hoodie.datasource.query.type = incremental),驱动复杂的增量 ETL 管道。你在 AWS EMR 或 AWS S3 上构建(AWS 对 Hudi 有官方支持)。你的下游需要精确感知每一行数据的变更历史(比如构建准实时数据仓库)。

8.3 关于你的 SoloLakehouse 选型

在 SoloLakehouse 中选择 Apache Iceberg 在 Gold Layer 是一个经过深思熟虑的正确决策,理由如下:

首先,Iceberg 的多引擎支持是决定性因素。你的架构中 Trino 是 BI 查询引擎、Spark/Dagster 是处理引擎,未来可能还要加 Flink。Iceberg 是这个组合中唯一经过生产验证的通用格式,Delta Lake 的 Trino 支持虽然在改善,但远不如原生 Iceberg 稳定。

其次,Iceberg 与 REST Catalog 的结合让你可以逐步摆脱 Hive Metastore 的依赖,这是更云原生的演进方向,也是向面试官展示架构眼光的好素材。

第三,Gold Layer 是最终分析层,读多写少,Iceberg 的读性能和 Schema 演进能力在这里发挥最大价值。

你需要在 ADR 中记录这个决策:为什么在有 Delta Lake 的情况下选择 Iceberg,这是一个展示架构判断力的绝佳机会。


9. 与 Hive Metastore 的集成

理解 HMS 在现代 Lakehouse 中的角色至关重要,因为几乎所有公司都有 HMS 的历史遗产。

9.1 HMS 的现代角色

在现代 Lakehouse 中,HMS 不再负责存储完整的表元数据(那是 Iceberg/Delta/Hudi 的元数据层做的事情),而只需要存储一条记录:"表 X 的 Iceberg 元数据文件在哪里"(或对于 Delta:"这是一张 Delta 表,数据路径在哪里")。

现代架构中的 HMS 角色:

HMS(MySQL)存储:
  表名 "db.transactions" → 
      table_type=EXTERNAL, 
      provider=ICEBERG,
      metadata_location=s3://bucket/transactions/metadata/v5.metadata.json

Iceberg Catalog(HMS Backend):
  当需要读 db.transactions 时:
  1. 查 HMS:metadata_location = s3://bucket/...v5.metadata.json
  2. 读 v5.metadata.json:current-snapshot-id = 3519
  3. 读 snap-3519.avro(Manifest List)
  4. 读相关的 Manifest 文件
  5. 读数据文件

9.2 在 Spark 中使用 HMS 管理 Iceberg 表

spark = SparkSession.builder \
    .config("spark.sql.extensions", 
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    # 使用 HMS 作为 Iceberg Catalog 后端
    .config("spark.sql.catalog.hive_prod", 
            "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.hive_prod.type", "hive") \
    .config("spark.sql.catalog.hive_prod.uri", "thrift://metastore:9083") \
    # 默认 Catalog 也使用 HMS(兼容旧代码)
    .config("spark.sql.catalog.spark_catalog", 
            "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .getOrCreate()

# 创建 Iceberg 表(HMS 记录 metadata_location)
spark.sql("""
    CREATE TABLE hive_prod.finance_db.transactions (
        id BIGINT,
        amount DECIMAL(18, 2),
        event_time TIMESTAMP,
        event_date DATE
    )
    USING iceberg
    PARTITIONED BY (days(event_time))
    LOCATION 's3://bucket/iceberg/transactions'
""")

# 将旧 Hive 表就地迁移为 Iceberg(生产中的常见操作)
spark.sql("""
    CALL hive_prod.system.migrate('finance_db.old_hive_table')
""")

9.3 Hive 表与 Iceberg 表的共存

在企业中,旧有 Hive 表(Parquet/ORC 格式,无 Iceberg 元数据)和新 Iceberg 表会长期共存。以下是 Trino 配置示例,演示如何同时处理两者:

# trino/catalog/hive.properties(处理旧 Hive 表)
connector.name=hive
hive.metastore.uri=thrift://metastore:9083
hive.non-managed-table-writes-enabled=true
hive.max-partitions-per-scan=1000000

# trino/catalog/iceberg.properties(处理 Iceberg 表)
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://metastore:9083

# 用户查询时按 catalog 区分:
# SELECT * FROM hive.finance_db.old_table      -- 旧 Hive 表
# SELECT * FROM iceberg.finance_db.new_table   -- Iceberg 表

10. 在 Medallion 架构中的应用

10.1 标准 Medallion 架构的表格式选择

在 Bronze → Silver → Gold 的 Medallion 架构中,三种格式的选择建议如下:

Bronze Layer(原始数据):通常使用普通 Parquet(不使用开放表格式),原因是 Bronze 层几乎只有 Append 操作,不需要 Upsert/Delete,开销越小越好。但如果需要模式追踪和精确的数据血缘,可以用 Iceberg(开销极小)。

Silver Layer(清洗/标准化数据):这是开放表格式价值最大的层。需要处理迟到数据(Late Arriving Data)的 Upsert,需要修正错误数据的 DELETE/UPDATE,需要 Schema 演进(新业务字段),需要时间旅行(数据质量审计)。三种格式都适合,选一个与你的整体生态最兼容的即可。

Gold Layer(聚合/报表数据):读多写少,优先考虑读取性能。Iceberg 在这里特别合适,因为它的统计信息(min/max/null)结合 Trino 的谓词下推效果极好。如果你在 Databricks,Delta Lake 在这里也是极佳选择。

10.2 FinLakehouse 中的具体应用

以你的 FinLakehouse 项目为例,处理 ECB 利率、Euribor 等宏观经济数据的完整流程:

# ===== Bronze Layer:原始数据落地 =====
# 直接存 Parquet,保留原始格式,不做任何转换
raw_ecb_data.write \
    .mode("append") \
    .partitionBy("ingestion_date") \
    .parquet("s3://finlakehouse/bronze/ecb_rates/")

# ===== Silver Layer:用 Iceberg,支持迟到数据修正 =====
spark.sql("""
    MERGE INTO iceberg.finance.ecb_rates AS target
    USING bronze_new_data AS source
    ON target.rate_date = source.rate_date 
    AND target.rate_type = source.rate_type
    WHEN MATCHED AND source.value != target.value THEN
        UPDATE SET value = source.value, updated_at = current_timestamp()
    WHEN NOT MATCHED THEN
        INSERT (rate_date, rate_type, value, ingested_at)
        VALUES (source.rate_date, source.rate_type, source.value, current_timestamp())
""")

# ===== Gold Layer:Iceberg + Trino 查询 =====
# 预计算 macro_state_daily 宏观状态表
spark.sql("""
    CREATE OR REPLACE TABLE iceberg.gold.macro_state_daily
    USING iceberg
    PARTITIONED BY (days(calc_date))
    AS
    SELECT 
        calc_date,
        euribor_3m,
        ecb_deposit_rate,
        hicp_yoy,
        m3_growth_yoy,
        CASE 
            WHEN ecb_deposit_rate > 3.0 AND hicp_yoy < 3.0 THEN 'TIGHT_EASING'
            WHEN ecb_deposit_rate > 3.0 AND hicp_yoy > 3.0 THEN 'TIGHT_HIGH_INFLATION'
            WHEN ecb_deposit_rate < 1.0 AND m3_growth_yoy > 8.0 THEN 'LOOSE_EXCESS_LIQUIDITY'
            ELSE 'NEUTRAL'
        END AS macro_regime
    FROM iceberg.silver.macro_indicators_combined
""")

11. 真实代码示例

11.1 完整的本地开发环境搭建(MinIO + Iceberg + Trino)

以下代码对应 SoloLakehouse 架构,展示在 MinIO(S3 兼容)上使用 Iceberg:

# spark_session_factory.py
from pyspark.sql import SparkSession

def create_iceberg_spark_session(
    app_name: str = "SoloLakehouse",
    minio_endpoint: str = "http://minio:9000",
    minio_access_key: str = "minioadmin",
    minio_secret_key: str = "minioadmin",
    metastore_uri: str = "thrift://hive-metastore:9083"
) -> SparkSession:
    """
    创建支持 Iceberg + MinIO + Hive Metastore 的 SparkSession。
    这对应 SoloLakehouse 的标准配置。
    """
    return SparkSession.builder \
        .appName(app_name) \
        \
        # Iceberg Spark 扩展(必须)
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        \
        # 使用 HMS 作为 Iceberg Catalog 后端
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("spark.sql.catalog.spark_catalog.type", "hive") \
        \
        # 自定义 Catalog(用于 iceberg.db.table 三段式命名)
        .config("spark.sql.catalog.iceberg",
                "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.iceberg.type", "hive") \
        .config("spark.sql.catalog.iceberg.uri", metastore_uri) \
        \
        # S3/MinIO 配置
        .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
        .config("spark.hadoop.fs.s3a.access.key", minio_access_key) \
        .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl",
                "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        \
        # Iceberg 默认使用 s3a
        .config("spark.sql.catalog.iceberg.s3.endpoint", minio_endpoint) \
        .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") \
        \
        .enableHiveSupport() \
        .getOrCreate()

11.2 Iceberg 表的 CRUD 完整示例

# iceberg_crud_demo.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DecimalType, TimestampType, StringType
import datetime

spark = create_iceberg_spark_session()

# ─────────────────────────────────────────
# 1. 建表
# ─────────────────────────────────────────
spark.sql("""
    CREATE DATABASE IF NOT EXISTS iceberg.finance
    LOCATION 's3a://sololakehouse/iceberg/finance/'
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS iceberg.finance.transactions (
        id          BIGINT      NOT NULL COMMENT '交易唯一ID',
        user_id     BIGINT      NOT NULL COMMENT '用户ID',
        amount      DECIMAL(18, 4) NOT NULL COMMENT '交易金额(EUR)',
        currency    STRING      NOT NULL COMMENT '货币代码',
        status      STRING      NOT NULL COMMENT '状态:PENDING/COMPLETED/REVERSED',
        event_time  TIMESTAMP   NOT NULL COMMENT '交易发生时间',
        created_at  TIMESTAMP   NOT NULL COMMENT '记录创建时间'
    )
    USING iceberg
    PARTITIONED BY (days(event_time))
    LOCATION 's3a://sololakehouse/iceberg/finance/transactions'
    TBLPROPERTIES (
        'write.format.default'          = 'parquet',
        'write.parquet.compression-codec' = 'snappy',
        'write.metadata.delete-after-commit.enabled' = 'true',
        'write.metadata.previous-versions-max'       = '10',
        'history.expire.min-snapshots-to-keep'       = '5'
    )
""")

# ─────────────────────────────────────────
# 2. INSERT(批量写入)
# ─────────────────────────────────────────
schema = StructType([
    StructField("id", LongType()),
    StructField("user_id", LongType()),
    StructField("amount", DecimalType(18, 4)),
    StructField("currency", StringType()),
    StructField("status", StringType()),
    StructField("event_time", TimestampType()),
    StructField("created_at", TimestampType()),
])

data = [
    (1, 1001, 150.00, "EUR", "COMPLETED", datetime.datetime(2024, 1, 1, 10, 0), datetime.datetime.now()),
    (2, 1002, 3500.00, "EUR", "PENDING",   datetime.datetime(2024, 1, 1, 11, 0), datetime.datetime.now()),
    (3, 1001, 75.50,  "EUR", "COMPLETED", datetime.datetime(2024, 1, 2, 9, 0),  datetime.datetime.now()),
]

df = spark.createDataFrame(data, schema)
df.writeTo("iceberg.finance.transactions").append()

# ─────────────────────────────────────────
# 3. UPDATE(修改状态)
# ─────────────────────────────────────────
spark.sql("""
    UPDATE iceberg.finance.transactions
    SET status = 'COMPLETED'
    WHERE id = 2 AND status = 'PENDING'
""")

# ─────────────────────────────────────────
# 4. DELETE(GDPR 删除)
# ─────────────────────────────────────────
spark.sql("""
    DELETE FROM iceberg.finance.transactions
    WHERE user_id = 9999
""")

# ─────────────────────────────────────────
# 5. 时间旅行:查看删除之前的状态
# ─────────────────────────────────────────
# 首先查看快照历史
spark.sql("SELECT snapshot_id, committed_at, operation FROM iceberg.finance.transactions.history").show()

# 查询第一个快照的内容(插入后、删除前)
spark.sql("""
    SELECT * FROM iceberg.finance.transactions 
    FOR VERSION AS OF 1  -- 替换为实际 snapshot_id
""").show()

# ─────────────────────────────────────────
# 6. 维护操作
# ─────────────────────────────────────────
# 合并小文件(生产环境中定期运行)
spark.sql("""
    CALL iceberg.system.rewrite_data_files(
        table => 'iceberg.finance.transactions',
        strategy => 'sort',
        sort_order => 'user_id ASC, event_time ASC',
        options => map('target-file-size-bytes', '134217728')  -- 128MB
    )
""")

# 清理过期快照(保留最近 7 天)
spark.sql("""
    CALL iceberg.system.expire_snapshots(
        table => 'iceberg.finance.transactions',
        older_than => TIMESTAMP '2024-01-01 00:00:00',
        retain_last => 5
    )
""")

# 删除孤儿文件(写入失败残留的文件)
spark.sql("""
    CALL iceberg.system.remove_orphan_files(
        table => 'iceberg.finance.transactions'
    )
""")

11.3 Delta Lake Upsert + CDF 示例

# delta_cdc_pipeline.py
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 创建主表并开启 CDF
spark.sql("""
    CREATE TABLE IF NOT EXISTS finance.positions (
        position_id     BIGINT,
        portfolio_id    BIGINT,
        isin            STRING,
        quantity        DECIMAL(18, 4),
        avg_cost_price  DECIMAL(18, 4),
        last_updated    TIMESTAMP
    )
    USING delta
    PARTITIONED BY (portfolio_id)
    TBLPROPERTIES (
        'delta.enableChangeDataFeed'            = 'true',
        'delta.autoOptimize.optimizeWrite'      = 'true',
        'delta.autoOptimize.autoCompact'        = 'true',
        'delta.isolationLevel'                  = 'WriteSerializable'
    )
""")

# 从交易数据 Upsert 到持仓表
def update_positions_from_trades(trades_df):
    """
    根据新交易数据更新持仓。
    典型场景:收到一批成交记录,更新各持仓的持有量和均价。
    """
    # 计算每个持仓的净变化
    position_changes = trades_df.groupBy("portfolio_id", "isin").agg(
        F.sum("quantity").alias("qty_delta"),
        F.last("price").alias("last_price"),
        F.max("trade_time").alias("last_updated")
    )
    
    positions_table = DeltaTable.forName(spark, "finance.positions")
    
    positions_table.alias("pos").merge(
        source=position_changes.alias("chg"),
        condition="pos.portfolio_id = chg.portfolio_id AND pos.isin = chg.isin"
    ).whenMatchedUpdate(set={
        "quantity": "pos.quantity + chg.qty_delta",
        "avg_cost_price": """
            CASE 
                WHEN chg.qty_delta > 0 
                THEN (pos.quantity * pos.avg_cost_price + chg.qty_delta * chg.last_price) 
                     / (pos.quantity + chg.qty_delta)
                ELSE pos.avg_cost_price
            END
        """,
        "last_updated": "chg.last_updated"
    }).whenNotMatchedInsert(values={
        "position_id": "monotonically_increasing_id()",
        "portfolio_id": "chg.portfolio_id",
        "isin": "chg.isin",
        "quantity": "chg.qty_delta",
        "avg_cost_price": "chg.last_price",
        "last_updated": "chg.last_updated"
    }).execute()

# 下游:通过 CDF 读取持仓变化,推送到风控系统
def get_position_changes_since(start_version: int):
    """
    增量读取自 start_version 以来的持仓变化。
    这替代了轮询整张表的低效做法。
    """
    return spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", start_version) \
        .table("finance.positions") \
        .filter(F.col("_change_type").isin(["insert", "update_postimage"]))
        # _change_type 的取值:
        # insert → 新增持仓
        # update_preimage  → 更新前的旧值(用于审计)
        # update_postimage → 更新后的新值(用于下游同步)
        # delete → 删除(平仓)

12. 面试高频问题与标准答案

Q1:Iceberg、Delta、Hudi 都支持 ACID,它们的实现机制有何不同?

标准答案:三者都使用**乐观并发控制(OCC)**作为核心并发机制,但元数据管理方式不同。Iceberg 通过"原子替换 metadata.json 中的 current-snapshot-id 指针"来实现原子提交,元数据是分散的不可变文件(metadata.json → manifest list → manifest files);Delta Lake 通过"向 _delta_log/ 目录写入新的编号 JSON 文件"来实现,日志文件的写入利用对象存储的条件写入保证原子性;Hudi 通过 Timeline(.hoodie/ 目录)来记录每个操作的三阶段状态(requested → inflight → completed),以最终文件的存在与否来判断事务是否完成。三者都依赖对象存储的强一致性(AWS S3、MinIO 都提供强一致性保证)。

Q2:为什么 Iceberg 的 Schema 演进比 Delta 和 Hudi 更安全?

标准答案:Iceberg 使用**列 ID(Column ID)**而非列名来追踪列的身份。在 Parquet 文件内部,每个字段有一个唯一的字段 ID;在 Iceberg 元数据中,列名是这个 ID 的"别名"。因此,重命名一列只需修改 metadata.json 中的 Schema 定义(改名字映射),不需要修改任何 Parquet 文件。读旧文件时,引擎通过字段 ID 找到对应数据,不受列名改变影响。Delta Lake 在一定程度上也支持 Schema 演进,但不如 Iceberg 强大(Delta 的重命名列实际上是删除旧列、添加新列,旧数据中该列变为 null)。Hudi 的 Schema 演进能力更有限,通常需要更多手动干预。

Q3:什么情况下你会在 Gold Layer 用 Hudi 而不是 Iceberg?

标准答案:如果 Gold Layer 表本质上是一个需要高频 Upsert 的宽表(比如实时更新的用户画像表、实时的交易明细表),并且下游系统需要按时间窗口精确消费增量变更(而不是全量扫描),那么 Hudi 的 MoR 表类型 + 增量查询 API 是更好的选择。Iceberg 在 Upsert 性能上比 Hudi 的 MoR 差,因为 Iceberg 的 MERGE INTO 仍然需要重写受影响的数据文件(尽管通过 Equality Delete 文件可以延迟这个操作)。但如果 Gold Layer 是读多写少的聚合/报表表,Iceberg 的查询性能和多引擎兼容性更有优势。

Q4:解释 Iceberg 的 Manifest 文件为什么可以提升查询性能?

标准答案:Manifest 文件中存储了每个数据文件的列级统计信息(min value、max value、null count)。当查询引擎(如 Trino)执行 WHERE amount > 1000 时,它首先读取 Manifest 文件,检查每个数据文件的 amount 字段的 min/max:如果某个文件的 max(amount) = 500,那么这个文件肯定没有满足条件的行,可以完全跳过(这叫做数据文件剪枝,Data Skipping)。这与 Parquet 自身的 Row Group 级别统计形成两层过滤:Manifest 级别过滤掉整个文件,Parquet Row Group 级别过滤掉文件内的行组。而在传统 Hive 表中,引擎必须打开每个文件才能读取 Row Group 统计,代价高得多。

Q5:VACUUM 和 expireSnapshots 有什么区别?

标准答案:这两个操作针对不同的对象。expireSnapshots(Iceberg)/VACUUM(Delta Lake)清理的是过期的快照元数据以及该快照独占的数据文件(即没有任何其他快照引用的文件)。运行后,超出保留期的历史版本将无法再被时间旅行查询。而"孤儿文件"(Orphan Files)是指写入失败后留在 S3 上、没有被任何元数据引用的数据文件,需要通过 removeOrphanFiles(Iceberg)单独清理。两者都需要定期运行,否则 S3 存储成本会持续增长。在生产中,通常将这两个操作配置为定期 Dagster Asset 或 Airflow Task 自动执行。


13. Platform Owner 思维:战略总结

读到这里,如果你只记住一件事,那就记住这一件:这些技术不是工具,而是商业问题的工程答案

Hive 解决了"如何用 SQL 查 Hadoop 上的文件"这个工程问题,背后的商业问题是 Facebook 需要让数百名分析师用他们熟悉的 SQL 分析海量日志数据。

Iceberg 解决了"如何在廉价存储上实现企业级数据管理"这个工程问题,背后的商业问题是 Netflix 需要在不锁定任何单一云厂商的前提下,用多个计算引擎处理 PB 级数据,同时满足内容版权和用户隐私的合规要求。

Delta Lake 解决了"如何让 Spark 开发者以最低学习成本获得数据仓库的可靠性"这个工程问题,背后的商业问题是 Databricks 需要让其平台从"ETL 工具"升级为"完整的数据平台",以提高客户粘性和 ARPU。

Hudi 解决了"如何以最低延迟将实时业务变更同步到分析系统"这个工程问题,背后的商业问题是 Uber 的司机收益计算、订单状态更新需要在几分钟内反映到数据仓库,而传统的 T+1 批处理远远不够。

作为 Platform Owner,你的核心能力不是"会用这些工具",而是"能够在商业目标驱动下做出正确的架构选型,并能够向技术和非技术利益相关者清晰解释这个选型的逻辑"。

在你的 ADR(Architecture Decision Record)中,写清楚:选择 Iceberg 的理由是"多引擎兼容性(Trino + Spark)是非功能性需求,而不仅仅是偏好";写清楚"假如未来我们需要替换 Trino 为 DuckDB,Iceberg 的开放格式让这个迁移代价最小"。这才是让面试官眼前一亮、让你区别于普通候选人的思维高度。


文档版本:v1.0 | 作者:SoloLakehouse Knowledge Base | 最后更新:2024