SoloLakehouse 实验路线图:从零到 Databricks 全功能复刻

目标:通过 25 个渐进式实验,用自建开源组件完整复刻 Databricks 平台的核心功能。最终整合为一个端到端的 Frankfurt Airport Taxi Time Prediction 项目,展示 Platform Architect 级别的系统设计能力。

总览:Databricks 功能 → SoloLakehouse 映射

Databricks 功能 SoloLakehouse 替代 实验编号 难度
DBFS (文件系统) MinIO (S3-compatible) EXP-01 ⭐
Databricks Notebooks JupyterLab + PySpark EXP-02 ⭐
Delta Lake 基础 Apache Iceberg on MinIO EXP-03 ⭐
Medallion Architecture Spark + Iceberg (Bronze/Silver/Gold) EXP-04 ⭐⭐
Databricks SQL Trino + CloudBeaver EXP-05 ⭐⭐
Workflows / Jobs Apache Airflow DAGs EXP-06 ⭐⭐
Auto Loader Spark + MinIO Event Detection EXP-07 ⭐⭐
Schema Evolution Iceberg Schema Evolution API EXP-08 ⭐⭐⭐
Time Travel Iceberg Snapshots + Rollback EXP-09 ⭐⭐⭐
Data Quality (Expectations) Great Expectations + Soda EXP-10 ⭐⭐⭐
Structured Streaming Spark SS + Redpanda EXP-11 ⭐⭐⭐
Delta Live Tables (DLT) Declarative ETL Framework EXP-12 ⭐⭐⭐
Unity Catalog Apache Gravitino EXP-13 ⭐⭐⭐⭐
Data Lineage OpenLineage + Marquez EXP-14 ⭐⭐⭐⭐
Secrets Management HashiCorp Vault EXP-15 ⭐⭐⭐⭐
Experiment Tracking MLflow Tracking Server EXP-16 ⭐⭐
Feature Store Feast (Offline + Online) EXP-17 ⭐⭐⭐
Model Registry MLflow Model Registry EXP-18 ⭐⭐⭐
Model Serving MLflow Serve + BentoML EXP-19 ⭐⭐⭐⭐
A/B Testing & Monitoring Evidently AI + Custom EXP-20 ⭐⭐⭐⭐
Dashboards Grafana + Superset EXP-21 ⭐⭐⭐
Platform Monitoring Prometheus + Grafana Stack EXP-22 ⭐⭐⭐
CI/CD for Pipelines GitHub Actions + dbt-style EXP-23 ⭐⭐⭐⭐
Delta Sharing delta-sharing-server EXP-24 ⭐⭐⭐⭐
Vector Search / RAG Qdrant + LangChain EXP-25 ⭐⭐⭐⭐⭐

实验路线图

实验按照从易到难分为六个等级(Level),每个 Level 内部的实验可以并行完成,但 Level 之间存在依赖关系。

Level 1 (基础层)          Level 2 (数据工程)         Level 3 (高级数据工程)
┌──────────────┐        ┌──────────────┐          ┌──────────────┐
│ EXP-01 MinIO │──┐     │ EXP-04 Medal.│──┐       │ EXP-08 Schema│
│ EXP-02 Spark │──┼────►│ EXP-05 Trino │  ├──────►│ EXP-09 TimeT.│
│ EXP-03 Iceb. │──┘     │ EXP-06 Airfl.│──┘       │ EXP-10 DQ    │
                         │ EXP-07 AutoL.│          │ EXP-11 Stream│
                         └──────────────┘          │ EXP-12 DLT   │
                                                   └──────┬───────┘
                                                          │
Level 4 (治理层)          Level 5 (ML平台)           Level 6 (运维层)
┌──────────────┐        ┌──────────────┐          ┌──────────────┐
│ EXP-13 Catal.│        │ EXP-16 Track.│          │ EXP-21 Dashb.│
│ EXP-14 Linea.│◄───────│ EXP-17 Feast │          │ EXP-22 Monit.│
│ EXP-15 Vault │        │ EXP-18 Regis.│──────────│ EXP-23 CI/CD │
└──────────────┘        │ EXP-19 Serve │          │ EXP-24 Share │
                         │ EXP-20 A/B   │          │ EXP-25 RAG   │
                         └──────────────┘          └──────────────┘

                         ╔═══════════════════════════╗
                         ║  FINAL: End-to-End 整合   ║
                         ║  Frankfurt Airport Project ║
                         ╚═══════════════════════════╝

Level 1:基础层 — 平台地基 (Week 1-2)

架构师思维:这一层对应 Databricks 的 Data Plane。你不是在"装软件",而是在设计一个存算分离的平台基础。每个组件的选型都要写 ADR (Architecture Decision Record)。面试时你说"我选择 Iceberg 因为..."比"我用了 Iceberg"有价值 10 倍。

EXP-01:对象存储层 — MinIO 作为 DBFS 替代

对标 Databricks 功能:DBFS (Databricks File System)

学习目标:理解存算分离架构中,为什么对象存储是 Lakehouse 的基石。Databricks 用 DBFS 抽象了 S3/ADLS/GCS,你用 MinIO 提供完全相同的 S3-compatible 接口。

实验步骤

Step 1 — 创建标准化 Bucket 结构

Databricks 的 DBFS 内部也是按 catalog/schema/table 的层次组织数据。你需要在 MinIO 中建立同样的逻辑分层。这不仅仅是"创建几个 bucket",而是在定义你的平台数据合约。

# 使用 mc (MinIO Client) 创建标准化存储层
# 注意:bucket 命名采用 <zone>-<purpose> 模式,这是云原生最佳实践
mc alias set solo http://minio:9000 $MINIO_ACCESS_KEY $MINIO_SECRET_KEY

# 数据湖核心 Buckets — 对应 Medallion Architecture 的物理隔离
mc mb solo/raw-landing          # 原始数据着陆区(外部数据进入的唯一入口)
mc mb solo/bronze-warehouse     # Bronze 层:原始数据 + 元数据增强
mc mb solo/silver-warehouse     # Silver 层:清洗、标准化后的数据
mc mb solo/gold-warehouse       # Gold 层:业务聚合,可直接消费

# 平台服务 Buckets — 支撑 ML 和治理
mc mb solo/mlflow-artifacts     # MLflow 模型和实验 artifacts
mc mb solo/checkpoints          # Spark Structured Streaming checkpoints
mc mb solo/tmp-processing       # 临时处理区(设置生命周期自动清理)

Step 2 — 配置 Bucket 策略和生命周期

企业级平台不仅要存数据,还要管理数据的生命周期。这是很多候选人忽略的点。

# 为 raw-landing 设置 30 天生命周期(数据被处理到 Bronze 后就可以清理)
mc ilm rule add solo/raw-landing --expire-days 30

# 为 tmp-processing 设置 7 天生命周期
mc ilm rule add solo/tmp-processing --expire-days 7

# 配置版本控制(对象级别的"Time Travel")
mc version enable solo/bronze-warehouse
mc version enable solo/silver-warehouse
mc version enable solo/gold-warehouse

Step 3 — 验证 S3 API 兼容性

# test_minio_s3.py — 验证 MinIO 的 S3 API 兼容性
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://minio:9000",
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key",
)

# 列出所有 buckets — 相当于 Databricks 里 dbutils.fs.ls("/")
for bucket in s3.list_buckets()["Buckets"]:
    print(f"  {bucket['Name']} (created: {bucket['CreationDate']})")

# 上传测试文件 — 相当于 dbutils.fs.put()
s3.put_object(
    Bucket="raw-landing",
    Key="test/hello.txt",
    Body=b"SoloLakehouse is alive!",
)

# 读取验证 — 相当于 dbutils.fs.head()
response = s3.get_object(Bucket="raw-landing", Key="test/hello.txt")
print(response["Body"].read().decode())

交付物

  • MinIO 存储拓扑图(bucket 结构 + 生命周期策略)
  • S3 兼容性测试脚本 + 通过截图
  • ADR-001: 为什么选择 MinIO 而不是直接用本地文件系统

100K+ 信号:⭐⭐ — 存储层设计是基础能力,单独不能体现高级技能,但如果你能说清楚"存算分离"和"生命周期管理"的设计理念,会加分。


EXP-02:计算引擎层 — PySpark + JupyterLab 交互开发

对标 Databricks 功能:Databricks Notebooks + Spark Clusters

学习目标:Databricks 的 Notebook 本质就是连接到 Spark 集群的 Jupyter。这个实验让你理解 Notebook → Spark Driver → Executor 的连接模型。更重要的是,你要能像 Databricks 一样让 Spark "感知" MinIO 存储。

实验步骤

Step 1 — 在 JupyterLab 中初始化 SparkSession(带完整 Iceberg + S3 配置)

这段代码是你整个平台的"启动器"。Databricks 隐藏了这些配置,但作为 Platform Engineer,你必须理解每一行。

# spark_session_factory.py — 可复用的 SparkSession 工厂
# 设计原则:一个 Notebook 一个 SparkSession,配置集中管理
from pyspark.sql import SparkSession

def create_spark_session(app_name: str = "SoloLakehouse") -> SparkSession:
    """
    创建连接到 SoloLakehouse 完整基础设施的 SparkSession。
    
    等价于 Databricks 的 spark = SparkSession.builder.getOrCreate()
    区别在于:Databricks 自动注入了所有配置,你需要显式声明。
    这恰恰是你作为 Platform Engineer 的价值 — 你理解这些配置的含义。
    """
    return (
        SparkSession.builder
        .appName(app_name)
        .master("spark://spark-master:7077")  # 连接 Spark Standalone 集群
        
        # === S3/MinIO 连接配置 ===
        # Databricks 用 dbfs:/ 抽象,你用 s3a:// 直连
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        .config("spark.hadoop.fs.s3a.access.key", "your-access-key")
        .config("spark.hadoop.fs.s3a.secret.key", "your-secret-key")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        
        # === Iceberg Catalog 配置 ===
        # 等价于 Databricks Unity Catalog 的表注册功能
        .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.lakehouse.type", "hive")
        .config("spark.sql.catalog.lakehouse.uri", "thrift://hive-metastore:9083")
        .config("spark.sql.catalog.lakehouse.warehouse", "s3a://bronze-warehouse/")
        
        # === Iceberg 扩展 ===
        # 启用 Iceberg 的 SQL 扩展(MERGE INTO, CALL 系统过程等)
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        )
        
        # === 性能调优(适合单机环境)===
        .config("spark.sql.adaptive.enabled", "true")  # AQE 自适应查询
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.driver.memory", "4g")
        .config("spark.executor.memory", "4g")
        
        .getOrCreate()
    )

# 使用方式 — 在任何 Notebook 中只需一行
spark = create_spark_session("EXP-02-Test")
spark.sql("SELECT 'SoloLakehouse Spark is ready!' AS status").show()

Step 2 — 验证 Spark ↔ MinIO ↔ Iceberg 完整链路

# 创建第一个 Iceberg 表 — 等价于 Databricks 的 CREATE TABLE
spark.sql("""
    CREATE DATABASE IF NOT EXISTS lakehouse.test_db
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS lakehouse.test_db.hello_world (
        id INT,
        message STRING,
        created_at TIMESTAMP
    )
    USING iceberg
    LOCATION 's3a://bronze-warehouse/test_db/hello_world'
""")

# 写入数据
from pyspark.sql.functions import current_timestamp, lit
test_data = spark.createDataFrame(
    [(1, "Hello from SoloLakehouse!"), (2, "Iceberg is working!")],
    ["id", "message"]
).withColumn("created_at", current_timestamp())

test_data.writeTo("lakehouse.test_db.hello_world").append()

# 读取验证 — 同时测试 Iceberg 元数据
spark.table("lakehouse.test_db.hello_world").show()
spark.sql("SELECT * FROM lakehouse.test_db.hello_world.snapshots").show()

交付物

  • spark_session_factory.py — 可复用的 SparkSession 配置模块
  • Notebook: 01_spark_connectivity_test.ipynb
  • 验证截图:Spark UI → MinIO → Iceberg 完整链路

100K+ 信号:⭐⭐⭐ — 能清晰解释 SparkSession 配置的每个参数,说明你理解平台内部机制,而不只是"会用 Databricks"。


EXP-03:表格式层 — Apache Iceberg 核心操作

对标 Databricks 功能:Delta Lake (Delta Tables)

学习目标:Databricks 的核心差异化就在于 Delta Lake。Iceberg 提供完全等价的能力(ACID、Schema Evolution、Time Travel),而且引擎兼容性更好。这个实验要让你像操作 Delta Table 一样操作 Iceberg Table。

实验步骤

Step 1 — CRUD 全套操作

# === INSERT (追加写入) ===
# Databricks: df.write.format("delta").mode("append").saveAsTable("...")
# SoloLakehouse: 使用 Iceberg 的 writeTo API
df.writeTo("lakehouse.test_db.flights").append()

# === MERGE INTO (Upsert) ===
# 这是 Lakehouse 的杀手级功能 — CDC 场景必备
# Databricks 和 SoloLakehouse 语法完全一致
spark.sql("""
    MERGE INTO lakehouse.test_db.flights AS target
    USING updates AS source
    ON target.flight_id = source.flight_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# === DELETE (条件删除) ===
spark.sql("""
    DELETE FROM lakehouse.test_db.flights
    WHERE departure_date < '2024-01-01'
""")

# === UPDATE (条件更新) ===
spark.sql("""
    UPDATE lakehouse.test_db.flights
    SET status = 'CANCELLED'
    WHERE delay_minutes > 180
""")

Step 2 — 分区策略(Iceberg 的隐式分区比 Delta Lake 更优雅)

# Iceberg 的隐式分区 — 不需要像 Hive/Delta 那样手动管理分区列
# 这是你面试时可以展示的 Iceberg 优势
spark.sql("""
    CREATE TABLE lakehouse.silver.flights_partitioned (
        flight_id STRING,
        airline STRING,
        departure_time TIMESTAMP,
        arrival_airport STRING,
        delay_minutes INT
    )
    USING iceberg
    PARTITIONED BY (
        days(departure_time),    -- 按天自动分区(隐式转换!)
        bucket(16, airline)      -- 航空公司按 hash bucket 分区
    )
""")
# 写入时不需要关心分区列 — Iceberg 自动处理
# 对比 Delta Lake:需要显式 .partitionBy("date_column")

交付物

  • Notebook: 02_iceberg_fundamentals.ipynb — 完整 CRUD 演示
  • 性能对比测试:Iceberg 隐式分区 vs 无分区 的查询性能差异
  • ADR-002: Iceberg vs Delta Lake 选型决策

100K+ 信号:⭐⭐⭐ — 能对比 Delta Lake 和 Iceberg 的技术差异(特别是分区策略),说明你不是只会用一个工具,而是理解底层原理。


Level 2:数据工程层 — 构建生产级 Pipeline (Week 3-5)

架构师思维:这一层对应 Databricks 的 Data Engineering 工作流。你不是在"写 ETL 脚本",而是在设计一个可观测、可恢复、可审计的数据处理平台。每个 Pipeline 都要有 SLA、数据质量门控、和失败重试策略。

EXP-04:Medallion Architecture — Bronze/Silver/Gold 数据分层

对标 Databricks 功能:Medallion Architecture (Databricks 最佳实践)

学习目标:Medallion 不仅仅是"三个文件夹"。它是一个数据质量逐层提升的契约体系。Bronze 层的"不丢数据"和 Gold 层的"业务可信赖"之间,有严格的转换规则。

实验步骤

Step 1 — Bronze Layer(原始数据保真入库)

Bronze 层的设计原则是:原始数据原封不动保存,只增加审计元数据。这样即使下游处理逻辑有 bug,你也可以从 Bronze 重新处理。

from pyspark.sql.functions import (
    current_timestamp, input_file_name, lit, col
)

def ingest_to_bronze(
    spark,
    source_path: str,
    table_name: str,
    source_format: str = "csv",
    source_system: str = "unknown",
):
    """
    Bronze 层通用摄入函数。
    
    设计原则(对标 Databricks Auto Loader 理念):
    1. 原始数据不做任何转换
    2. 自动添加审计字段(什么时候、从哪里、谁摄入的)
    3. 幂等性依赖下游保障:Bronze 只做 append,去重由 Silver 层的 MERGE/dropDuplicates 处理
    """
    # 读取原始数据 — 注意 header/inferSchema 的选择
    raw_df = (
        spark.read
        .format(source_format)
        .option("header", "true")
        .option("inferSchema", "true")
        .load(source_path)
    )
    
    # 添加 Bronze 审计元数据 — 这些字段在 Databricks 中由 Auto Loader 自动注入
    bronze_df = (
        raw_df
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_source_file", input_file_name())
        .withColumn("_source_system", lit(source_system))
        .withColumn("_bronze_version", lit(1))
    )
    
    # 写入 Iceberg 表 — append 模式保证不丢数据
    bronze_df.writeTo(f"lakehouse.bronze.{table_name}").append()
    
    print(f"✅ Ingested {bronze_df.count()} rows to bronze.{table_name}")
    return bronze_df

# 使用示例 — 摄入 Frankfurt Airport 航班数据
ingest_to_bronze(
    spark,
    source_path="s3a://raw-landing/flights/2025/*.csv",
    table_name="flights_raw",
    source_format="csv",
    source_system="eurocontrol",
)

Step 2 — Silver Layer(清洗 + 标准化 + 去重)

Silver 层是数据工程的核心。这里的每一个转换规则都应该是可测试、可审计的。

from pyspark.sql.functions import (
    col, when, trim, upper, to_timestamp, regexp_replace,
    current_timestamp, lit
)

def bronze_to_silver_flights(spark):
    """
    Bronze → Silver 转换:航班数据清洗。
    
    Silver 层设计原则(对标 Databricks 最佳实践):
    1. 数据类型标准化(string → timestamp, 编码统一)
    2. 去重(按业务主键)
    3. 空值处理(有明确策略,不是随意填充)
    4. 添加 Silver 层审计字段
    """
    bronze_df = spark.table("lakehouse.bronze.flights_raw")
    
    silver_df = (
        bronze_df
        # === 数据类型标准化 ===
        .withColumn("departure_time",
            to_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss"))
        .withColumn("arrival_time",
            to_timestamp(col("arrival_time"), "yyyy-MM-dd HH:mm:ss"))
        .withColumn("airline_code", upper(trim(col("airline_code"))))
        .withColumn("airport_icao", upper(trim(col("airport_icao"))))
        
        # === 空值处理(策略:关键字段为空则标记,非关键字段给默认值)===
        .withColumn("delay_minutes",
            when(col("delay_minutes").isNull(), 0)
            .otherwise(col("delay_minutes").cast("int")))
        .withColumn("is_valid",
            when(
                col("flight_id").isNull() | col("departure_time").isNull(),
                False
            ).otherwise(True))
        
        # === 去重:按 flight_id + departure_time 取最新记录 ===
        .dropDuplicates(["flight_id", "departure_time"])
        
        # === Silver 审计字段 ===
        .withColumn("_silver_timestamp", current_timestamp())
        .withColumn("_silver_version", lit(1))
    )
    
    # 使用 MERGE INTO 实现幂等写入(Upsert)
    silver_df.createOrReplaceTempView("silver_updates")
    
    spark.sql("""
        MERGE INTO lakehouse.silver.flights AS target
        USING silver_updates AS source
        ON target.flight_id = source.flight_id
           AND target.departure_time = source.departure_time
        WHEN MATCHED AND source._ingestion_timestamp > target._ingestion_timestamp
            THEN UPDATE SET *
        WHEN NOT MATCHED
            THEN INSERT *
    """)
    
    print(f"✅ Silver layer updated: {silver_df.count()} records processed")

bronze_to_silver_flights(spark)

Step 3 — Gold Layer(业务聚合 + 特征工程)

Gold 层直接服务于业务分析和 ML 模型。这里的表设计要和业务问题对齐。

def silver_to_gold_airport_metrics(spark):
    """
    Silver → Gold:机场运营指标聚合表。
    
    Gold 层设计原则:
    1. 面向特定业务问题(不是通用数据集)
    2. 预计算聚合(减少下游查询成本)
    3. 维度建模思维(事实表 + 维度表)
    """
    spark.sql("""
        CREATE OR REPLACE TABLE lakehouse.gold.airport_daily_metrics AS
        SELECT
            airport_icao,
            DATE(departure_time) AS report_date,
            
            -- 航班量统计
            COUNT(*) AS total_flights,
            COUNT(CASE WHEN is_valid THEN 1 END) AS valid_flights,
            
            -- 延误分析
            AVG(delay_minutes) AS avg_delay_min,
            PERCENTILE_APPROX(delay_minutes, 0.5) AS median_delay_min,
            PERCENTILE_APPROX(delay_minutes, 0.95) AS p95_delay_min,
            COUNT(CASE WHEN delay_minutes > 15 THEN 1 END) AS delayed_flights,
            
            -- 运营效率指标
            ROUND(
                COUNT(CASE WHEN delay_minutes > 15 THEN 1 END) * 100.0 / COUNT(*),
                2
            ) AS delay_rate_pct,
            
            -- 元数据
            CURRENT_TIMESTAMP() AS _gold_timestamp
        FROM lakehouse.silver.flights
        WHERE is_valid = TRUE
        GROUP BY airport_icao, DATE(departure_time)
    """)
    
    print("✅ Gold layer: airport_daily_metrics updated")

silver_to_gold_airport_metrics(spark)

交付物

  • Notebook: 03_medallion_architecture.ipynb — 完整 Bronze → Silver → Gold 流程
  • 数据流图:展示每层的输入/输出/转换规则
  • 数据质量报告:每层的记录数、空值率、去重率

100K+ 信号:⭐⭐⭐⭐ — Medallion Architecture 是面试必问。能说清 "为什么分三层" 以及 "每层的设计原则是什么" 是高级工程师的标志。


EXP-05:SQL 分析层 — Trino 作为 Databricks SQL 替代

对标 Databricks 功能:Databricks SQL Warehouse

学习目标:Databricks SQL 的核心是让分析师不需要写 Spark 代码就能查询 Lakehouse 数据。Trino 提供了完全相同的能力——它直接读取 Iceberg 表,提供毫秒级交互式查询。

实验步骤

Step 1 — Trino 连接 Iceberg Catalog

-- trino-cli 或 CloudBeaver 中执行
-- 验证 Trino 能看到 Spark 写入的 Iceberg 表
SHOW CATALOGS;        -- 应该看到 'iceberg' catalog
SHOW SCHEMAS IN iceberg;
SHOW TABLES IN iceberg.silver;

-- 直接查询 Silver 层数据 — 无需 Spark!
SELECT
    airport_icao,
    COUNT(*) AS flight_count,
    AVG(delay_minutes) AS avg_delay
FROM iceberg.silver.flights
WHERE departure_time >= TIMESTAMP '2025-01-01'
GROUP BY airport_icao
ORDER BY avg_delay DESC
LIMIT 20;
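
上面的查询能跑通的前提,是 Trino 侧已经注册了名为 iceberg 的 catalog。下面是一个参考配置草案(属性名以你所用 Trino 版本的文档为准;这里假设走 Hive Metastore,并使用 legacy 的 hive.s3.* 属性访问 MinIO,新版本可能改用 fs.native-s3.* 系列配置):

# etc/catalog/iceberg.properties — Trino Iceberg connector 配置草案
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083

# MinIO / S3 访问(legacy hive.s3.* 属性,具体以 Trino 版本文档为准)
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
hive.s3.aws-access-key=your-access-key
hive.s3.aws-secret-key=your-secret-key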

Step 2 — 跨层 JOIN 查询(Gold + Silver)

-- Databricks SQL 的典型用法:分析师写 SQL 查询 Gold 层
-- 当 Gold 层不够时,JOIN 回 Silver 层获取明细
SELECT
    g.airport_icao,
    g.report_date,
    g.avg_delay_min,
    g.delay_rate_pct,
    s.airline_code,
    s.flight_id
FROM iceberg.gold.airport_daily_metrics g
JOIN iceberg.silver.flights s
    ON g.airport_icao = s.airport_icao
    AND g.report_date = DATE(s.departure_time)
WHERE g.delay_rate_pct > 30  -- 延误率超过30%的日期
ORDER BY g.delay_rate_pct DESC;

Step 3 — 创建 Trino View(等价于 Databricks SQL View)

-- 为分析师创建业务友好的视图
CREATE OR REPLACE VIEW iceberg.gold.v_airport_performance AS
SELECT
    airport_icao AS airport,
    report_date,
    total_flights,
    delayed_flights,
    delay_rate_pct,
    ROUND(100 - delay_rate_pct, 2) AS on_time_performance_pct,
    avg_delay_min,
    CASE
        WHEN delay_rate_pct < 10 THEN 'EXCELLENT'
        WHEN delay_rate_pct < 20 THEN 'GOOD'
        WHEN delay_rate_pct < 30 THEN 'FAIR'
        ELSE 'POOR'
    END AS performance_grade
FROM iceberg.gold.airport_daily_metrics;

交付物

  • SQL 文件: trino_analytics_queries.sql
  • CloudBeaver 查询截图
  • 性能对比:同一查询在 Spark SQL vs Trino 的延迟对比

100K+ 信号:⭐⭐⭐ — 展示你理解 "SQL Warehouse" 和 "Spark Cluster" 的使用场景差异。Trino 用于交互式分析(秒级),Spark 用于批处理和 ML(分钟级)。


EXP-06:工作流编排 — Airflow 作为 Databricks Workflows 替代

对标 Databricks 功能:Databricks Workflows / Jobs

学习目标:Databricks Workflows 让你调度 Notebook 和 Spark 作业。Airflow 提供更灵活的 DAG 编排,支持跨系统的依赖管理。这个实验要设计一个生产级的 Medallion Pipeline DAG。

实验步骤

Step 1 — Medallion Pipeline DAG

# dags/medallion_pipeline.py
"""
SoloLakehouse Medallion Pipeline — 等价于 Databricks Workflow

设计原则:
1. 每个 Task 是幂等的(可安全重试)
2. 依赖关系明确(Bronze → Silver → Gold)
3. 失败通知 + 自动重试
4. SLA 监控
"""
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    "owner": "sololakehouse",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["bill@sololakehouse.dev"],
    "sla": timedelta(hours=2),  # 整个 Pipeline 必须在 2 小时内完成
}

with DAG(
    dag_id="medallion_pipeline",
    default_args=default_args,
    description="Bronze → Silver → Gold ETL Pipeline",
    schedule_interval="@daily",       # 每天运行一次
    start_date=days_ago(1),
    catchup=False,
    tags=["etl", "medallion", "production"],
) as dag:

    # Task 1: Raw → Bronze(数据摄入)
    ingest_to_bronze = SparkSubmitOperator(
        task_id="ingest_to_bronze",
        application="/opt/spark-jobs/bronze_ingest.py",
        conn_id="spark_default",
        conf={
            "spark.sql.catalog.lakehouse": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.lakehouse.type": "hive",
        },
        application_args=["--date", "{{ ds }}"],  # 传递执行日期
    )

    # Task 2: Bronze → Silver(清洗 + 标准化)
    transform_to_silver = SparkSubmitOperator(
        task_id="transform_to_silver",
        application="/opt/spark-jobs/silver_transform.py",
        conn_id="spark_default",
        application_args=["--date", "{{ ds }}"],
    )

    # Task 3: Silver → Gold(业务聚合)
    aggregate_to_gold = SparkSubmitOperator(
        task_id="aggregate_to_gold",
        application="/opt/spark-jobs/gold_aggregate.py",
        conn_id="spark_default",
        application_args=["--date", "{{ ds }}"],
    )

    # Task 4: 数据质量检查(门控)
    quality_check = PythonOperator(
        task_id="data_quality_check",
        python_callable=run_quality_checks,  # 见 EXP-10
    )

    # 依赖链
    ingest_to_bronze >> transform_to_silver >> aggregate_to_gold >> quality_check
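
DAG 中引用的三个 Spark 作业脚本需要自行实现。下面是 bronze_ingest.py 的一个最小草案,展示如何消费 Airflow 传入的 --date 参数(复用 EXP-02 的 create_spark_session 和 EXP-04 的 ingest_to_bronze;bronze_utils 模块名为假设,表示你把 EXP-04 的函数抽取成了可导入的模块):

# spark-jobs/bronze_ingest.py — 最小草案,配合上面 DAG 的 application_args 使用
import argparse
from spark_session_factory import create_spark_session
from bronze_utils import ingest_to_bronze  # 假设:EXP-04 的函数已抽取为模块

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", required=True, help="Airflow 执行日期,格式 YYYY-MM-DD")
    parser.add_argument("--mode", default="full", choices=["full", "incremental"])
    args = parser.parse_args()

    spark = create_spark_session(f"bronze-ingest-{args.date}")

    # 按执行日期只摄入当天的文件,任务可安全重跑(幂等由按日期隔离的路径保证)
    ingest_to_bronze(
        spark,
        source_path=f"s3a://raw-landing/flights/{args.date}/*.csv",
        table_name="flights_raw",
        source_system="eurocontrol",
    )
    spark.stop()

if __name__ == "__main__":
    main()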

交付物

  • Airflow DAG: medallion_pipeline.py
  • Spark 作业脚本: bronze_ingest.py, silver_transform.py, gold_aggregate.py
  • Airflow UI 截图:DAG 依赖图 + 执行历史

100K+ 信号:⭐⭐⭐⭐ — 能设计带 SLA、重试策略、质量门控的 DAG,说明你理解生产级工程标准。


EXP-07:自动化数据摄入 — Auto Loader 模式

对标 Databricks 功能:Auto Loader (cloudFiles)

学习目标:Databricks Auto Loader 监控云存储中的新文件并自动摄入。你需要用 MinIO 事件通知 + Airflow 传感器实现相同效果。

实验步骤

Step 1 — MinIO 事件通知配置

# 配置 MinIO 在 raw-landing 有新文件时发送 Webhook
mc event add solo/raw-landing arn:minio:sqs::1:webhook \
    --event put --suffix .csv
# 注意:S3 风格的事件通知过滤规则每条只支持一个 suffix,
# 如需监听 .parquet / .json,请为每种后缀分别添加一条规则
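
补充:命令中的 webhook target(arn:minio:sqs::1:webhook)需要先在 MinIO 服务端注册,否则 mc event add 会失败。一个参考做法如下(endpoint 仅为示意,指向你实际接收事件的服务):

# 先注册 ID 为 1 的 webhook notification target,再重启 MinIO 使配置生效
mc admin config set solo notify_webhook:1 endpoint="http://your-event-receiver:8000/minio-events" queue_limit="0"
mc admin service restart solo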

Step 2 — Airflow File Sensor(轮询方案,更稳定)

# dags/auto_loader_dag.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

with DAG(
    "auto_loader",
    schedule_interval="*/5 * * * *",
    start_date=days_ago(1),
    catchup=False,
) as dag:
    
    # 每 5 分钟检查 raw-landing 是否有新文件
    check_new_files = S3KeySensor(
        task_id="check_new_files",
        bucket_name="raw-landing",
        bucket_key="flights/incoming/*.csv",
        aws_conn_id="minio_s3",
        wildcard_match=True,
        poke_interval=60,       # 每分钟检查一次
        timeout=300,            # 5 分钟超时
        mode="reschedule",      # 释放 Worker 资源
    )
    
    # 检测到新文件后触发 Bronze 摄入
    trigger_bronze = SparkSubmitOperator(
        task_id="auto_ingest_bronze",
        application="/opt/spark-jobs/bronze_ingest.py",
        application_args=["--mode", "incremental"],
    )
    
    # 摄入后移动文件到已处理目录(防止重复处理)
    archive_files = PythonOperator(
        task_id="archive_processed",
        python_callable=move_to_archive,
    )
    
    check_new_files >> trigger_bronze >> archive_files
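
DAG 中引用的 move_to_archive 需要自己实现。下面是一个基于 boto3 的最小草案(bucket 内先 copy 到 processed/ 前缀再删除原对象;incoming/ 与 processed/ 的目录划分为假设),这样同一个文件不会被重复摄入:

# 归档已处理文件 — copy 到 processed/ 前缀后删除原对象
import boto3

def move_to_archive(**context):
    s3 = boto3.client(
        "s3",
        endpoint_url="http://minio:9000",
        aws_access_key_id="your-access-key",
        aws_secret_access_key="your-secret-key",
    )
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket="raw-landing", Prefix="flights/incoming/"):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith(".csv"):
                continue
            archive_key = key.replace("flights/incoming/", "flights/processed/", 1)
            s3.copy_object(
                Bucket="raw-landing",
                CopySource={"Bucket": "raw-landing", "Key": key},
                Key=archive_key,
            )
            s3.delete_object(Bucket="raw-landing", Key=key)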

交付物

  • DAG: auto_loader_dag.py
  • MinIO 事件通知配置文档
  • 端到端演示:上传文件 → 自动检测 → 自动摄入 → 归档

100K+ 信号:⭐⭐⭐ — Auto Loader 是 Databricks 的高频功能。能解释为什么需要幂等性和文件归档策略,说明你有生产经验意识。


Level 3:高级数据工程 — 进阶能力 (Week 6-9)

架构师思维:这一层开始涉及"平台能力"而不仅仅是"数据处理"。Schema Evolution 和 Time Travel 是 Lakehouse 的核心卖点,Streaming 是实时数据平台的门槛,DLT 是声明式 ETL 的未来方向。你能在面试中对比"命令式 ETL vs 声明式 ETL"的优劣,就已经超越了 90% 的候选人。

EXP-08:Schema Evolution — 数据表结构无损变更

对标 Databricks 功能:Delta Lake Schema Evolution

学习目标:真实业务中,数据源的 schema 会不断变化(新增字段、修改类型等)。Databricks 用 Delta Lake 的 schema evolution 处理这类问题。Iceberg 提供了更强大的 schema evolution 能力。

实验步骤

# === 场景 1:安全添加新列 ===
# 上游数据突然多了一个 "weather_code" 字段
spark.sql("""
    ALTER TABLE lakehouse.silver.flights
    ADD COLUMN weather_code STRING AFTER arrival_airport
""")

# 新数据带 weather_code,旧数据自动填 NULL — 零停机!
# 在 Databricks 中需要 .option("mergeSchema", "true")
# Iceberg 原生支持,无需额外配置

# === 场景 2:列重命名(Iceberg 原生支持;Delta Lake 需启用 column mapping 才能支持)===
spark.sql("""
    ALTER TABLE lakehouse.silver.flights
    RENAME COLUMN delay_minutes TO delay_min
""")
# Iceberg 通过列 ID 追踪(不是列名),所以重命名不影响历史数据

# === 场景 3:类型提升 ===
spark.sql("""
    ALTER TABLE lakehouse.silver.flights
    ALTER COLUMN delay_min TYPE BIGINT  -- INT → BIGINT,安全提升
""")

# === 验证 Schema 变更历史 ===
spark.sql("""
    SELECT * FROM lakehouse.silver.flights.metadata_log_entries
""").show(truncate=False)

交付物

  • Notebook: 04_schema_evolution.ipynb — 完整场景演示
  • Schema 变更时间线图

100K+ 信号:⭐⭐⭐⭐ — 能对比 Iceberg vs Delta Lake 的 schema evolution 能力差异(特别是列重命名),说明你深入理解了表格式的内部实现。


EXP-09:Time Travel — 数据版本回溯

对标 Databricks 功能:Delta Lake Time Travel

实验步骤

# === 查看表的所有快照(版本历史)===
snapshots = spark.sql("""
    SELECT 
        snapshot_id,
        committed_at,
        operation,
        summary
    FROM lakehouse.silver.flights.snapshots
    ORDER BY committed_at DESC
""")
snapshots.show(truncate=False)

# === 按时间回溯 ===
# Databricks: spark.read.format("delta").option("timestampAsOf", ...).load(...)
# SoloLakehouse(Iceberg 等价语法):
df_yesterday = spark.sql("""
    SELECT * FROM lakehouse.silver.flights
    FOR SYSTEM_TIME AS OF TIMESTAMP '2025-07-15 00:00:00'
""")

# === 按快照 ID 回溯 ===
df_v3 = spark.sql("""
    SELECT * FROM lakehouse.silver.flights
    FOR SYSTEM_VERSION AS OF 3
""")

# === 数据回滚(Iceberg 独有的过程调用)===
# 场景:发现今天的 Silver 处理有 bug,需要回滚到昨天的状态
spark.sql("""
    CALL lakehouse.system.rollback_to_snapshot(
        'silver.flights', 
        <snapshot_id_from_yesterday>
    )
""")

# === 增量读取(Change Data Feed)===
# Databricks: spark.readStream.format("delta").option("readChangeFeed", "true")
# Iceberg: 使用 incremental scan
spark.read.format("iceberg") \
    .option("start-snapshot-id", "123") \
    .option("end-snapshot-id", "456") \
    .load("lakehouse.silver.flights") \
    .show()

交付物

  • Notebook: 05_time_travel.ipynb
  • 回滚演练文档(模拟线上事故 → 发现 → 回滚 → 验证)

100K+ 信号:⭐⭐⭐⭐ — 数据回滚是生产环境的救命稻草。能演示完整的回滚流程说明你有运维意识。


EXP-10:数据质量框架 — Data Contracts & Quality Gates

对标 Databricks 功能:Databricks Expectations / Lakehouse Monitoring

实验步骤

# === 方案 1: 使用 Great Expectations ===
import great_expectations as gx

context = gx.get_context()

# 定义数据质量期望 — 这就是你的"数据契约"
validator = context.sources.pandas_default.read_dataframe(silver_df.toPandas())
validator.expect_column_to_exist("flight_id")
validator.expect_column_values_to_not_be_null("flight_id")
validator.expect_column_values_to_not_be_null("departure_time")
validator.expect_column_values_to_be_between(
    "delay_minutes", min_value=-60, max_value=1440
)
validator.expect_column_values_to_match_regex(
    "airport_icao", r"^[A-Z]{4}$"
)

results = validator.validate()
if not results.success:
    raise ValueError(f"Data quality check FAILED: {results}")

# === 方案 2: 轻量级自定义质量框架 ===
from pyspark.sql.functions import col

class DataQualityCheck:
    """
    轻量级数据质量检查器。
    设计原则:每个检查返回 pass/fail + 指标,集成到 Airflow DAG 中。
    """
    def __init__(self, spark, table_name):
        self.df = spark.table(table_name)
        self.results = []
    
    def check_not_null(self, column, threshold=0.95):
        """列非空率必须高于阈值"""
        total = self.df.count()
        non_null = self.df.filter(col(column).isNotNull()).count()
        rate = non_null / total if total > 0 else 0
        passed = rate >= threshold
        self.results.append({
            "check": f"not_null({column})",
            "rate": round(rate, 4),
            "threshold": threshold,
            "passed": passed,
        })
        return self
    
    def check_uniqueness(self, columns):
        """组合键唯一性检查"""
        total = self.df.count()
        distinct = self.df.dropDuplicates(columns).count()
        rate = distinct / total if total > 0 else 0
        self.results.append({
            "check": f"unique({','.join(columns)})",
            "rate": round(rate, 4),
            "threshold": 1.0,
            "passed": rate == 1.0,
        })
        return self
    
    def report(self):
        """生成质量报告"""
        import json
        all_passed = all(r["passed"] for r in self.results)
        print(json.dumps(self.results, indent=2))
        if not all_passed:
            failed = [r for r in self.results if not r["passed"]]
            raise ValueError(f"Quality gate FAILED: {failed}")
        print("✅ All quality checks passed")

# 使用
(DataQualityCheck(spark, "lakehouse.silver.flights")
    .check_not_null("flight_id")
    .check_not_null("departure_time")
    .check_not_null("delay_minutes", threshold=0.90)
    .check_uniqueness(["flight_id", "departure_time"])
    .report())

交付物

  • 数据质量框架代码 + 集成到 Airflow DAG
  • 质量报告仪表板(Grafana)

100K+ 信号:⭐⭐⭐⭐⭐ — 数据质量是区分 "数据工程师" 和 "数据平台架构师" 的分水岭。在 Frankfurt 金融行业,数据质量更是监管合规的硬性要求。


EXP-11:流处理 — Structured Streaming + Redpanda

对标 Databricks 功能:Structured Streaming + Kafka 连接器

实验步骤

# === 从 Redpanda 读取实时事件流 ===
# Redpanda 是 Kafka 的替代品,API 完全兼容
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "redpanda:9092")
    .option("subscribe", "flight-events")
    .option("startingOffsets", "latest")
    .load()
)

# === 解析 JSON 消息 ===
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, to_timestamp

flight_schema = StructType([
    StructField("flight_id", StringType()),
    StructField("airport_icao", StringType()),
    StructField("event_type", StringType()),  # DEPARTURE, ARRIVAL, DELAY
    StructField("delay_minutes", IntegerType()),
    StructField("event_time", StringType()),
])

parsed_stream = (
    stream_df
    .selectExpr("CAST(value AS STRING) as json_str")
    .select(from_json("json_str", flight_schema).alias("data"))
    .select("data.*")
    .withColumn("event_time", to_timestamp("event_time"))
)

# === 窗口聚合:每 5 分钟的机场延误统计 ===
from pyspark.sql.functions import window

windowed_delays = (
    parsed_stream
    .withWatermark("event_time", "10 minutes")  # 允许 10 分钟迟到数据
    .groupBy(
        window("event_time", "5 minutes"),
        "airport_icao"
    )
    .agg(
        {"delay_minutes": "avg", "*": "count"}
    )
)

# === 写入 Iceberg(流式写入 Bronze 层)===
query = (
    parsed_stream
    .writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://checkpoints/flight-events/")
    .toTable("lakehouse.bronze.flight_events_stream")
)

query.awaitTermination()
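
上面的流式作业需要一个持续产生事件的 producer。下面是一个最小草案(使用 kafka-python,事件字段与 flight_schema 对齐;数据为随机模拟,仅供端到端联调):

# producers/flight_event_producer.py — 模拟航班事件,持续写入 flight-events topic
import json
import random
import time
from datetime import datetime, timezone
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="redpanda:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

EVENT_TYPES = ["DEPARTURE", "ARRIVAL", "DELAY"]

while True:
    event = {
        "flight_id": f"LH{random.randint(100, 999)}",
        "airport_icao": "EDDF",
        "event_type": random.choice(EVENT_TYPES),
        "delay_minutes": random.randint(0, 120),
        "event_time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
    }
    producer.send("flight-events", value=event)
    time.sleep(1)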

交付物

  • Streaming DAG + Redpanda producer 脚本
  • Notebook: 06_structured_streaming.ipynb
  • Grafana 实时延误仪表板

100K+ 信号:⭐⭐⭐⭐ — 流处理是从"数据工程师"升级到"数据平台工程师"的关键技能。Lambda/Kappa 架构的理解更是加分项。


EXP-12:声明式 ETL 框架 — Delta Live Tables (DLT) 模式

对标 Databricks 功能:Delta Live Tables

学习目标:DLT 是 Databricks 的声明式 ETL 框架——你只定义"数据应该是什么样",而不是"如何处理数据"。这是 ETL 的未来方向。你需要用 Python 实现一个简化版的声明式 Pipeline 框架。

实验步骤

# declarative_etl/pipeline.py
"""
SoloLakehouse Declarative ETL Framework — DLT-Inspired

核心理念:用装饰器声明数据转换关系,框架自动处理:
1. 依赖解析和执行顺序
2. 增量处理 vs 全量刷新
3. 数据质量期望(Expectations)
4. 自动表创建和 Schema 管理
"""
from functools import wraps
from typing import List, Callable, Optional
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_timestamp

# 全局 Pipeline 注册表
_pipeline_registry = {}

def table(
    name: str,
    comment: str = "",
    expectations: Optional[dict] = None,
    mode: str = "append",  # append | overwrite | merge
):
    """
    声明式表定义装饰器 — 等价于 @dlt.table() in Databricks DLT
    
    Usage:
        @table("silver.flights_clean", 
               expectations={"valid_id": "flight_id IS NOT NULL"})
        def clean_flights(spark):
            return spark.table("bronze.flights_raw").filter(...)
    """
    def decorator(func: Callable):
        _pipeline_registry[name] = {
            "func": func,
            "comment": comment,
            "expectations": expectations or {},
            "mode": mode,
            "dependencies": [],  # 自动检测
        }
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    return decorator


def expect(name: str, condition: str, action: str = "warn"):
    """
    数据质量期望 — 等价于 @dlt.expect() in Databricks DLT
    
    action: "warn" (记录但不阻止), "drop" (过滤掉), "fail" (终止 Pipeline)
    """
    return {name: {"condition": condition, "action": action}}


def run_pipeline(spark, pipeline_name: str = "default"):
    """执行声明式 Pipeline"""
    for table_name, config in _pipeline_registry.items():
        print(f"📊 Processing: {table_name}")
        
        # 1. 执行转换函数
        result_df: DataFrame = config["func"](spark)
        
        # 2. 应用数据质量期望
        for exp_name, exp_config in config["expectations"].items():
            condition = exp_config["condition"]
            action = exp_config["action"]
            
            violations = result_df.filter(f"NOT ({condition})")
            violation_count = violations.count()
            
            if violation_count > 0:
                print(f"  ⚠️ Expectation '{exp_name}': {violation_count} violations")
                if action == "drop":
                    result_df = result_df.filter(condition)
                elif action == "fail":
                    raise ValueError(f"Expectation '{exp_name}' failed")
        
        # 3. 写入目标表
        full_table = f"lakehouse.{table_name}"
        if config["mode"] == "overwrite":
            result_df.writeTo(full_table).createOrReplace()
        else:
            result_df.writeTo(full_table).append()
        
        print(f"  ✅ {table_name}: {result_df.count()} rows written")


# === 使用示例:声明式定义 Medallion Pipeline ===

@table("bronze.flights_raw", mode="append")
def raw_flights(spark):
    return (spark.read.format("csv")
        .option("header", "true")
        .load("s3a://raw-landing/flights/incoming/*.csv"))


@table(
    "silver.flights_clean",
    comment="Cleaned and validated flight records",
    expectations={
        "valid_flight_id": {"condition": "flight_id IS NOT NULL", "action": "drop"},
        "valid_delay": {"condition": "delay_minutes BETWEEN -60 AND 1440", "action": "warn"},
    },
    mode="overwrite",
)
def clean_flights(spark):
    return (spark.table("lakehouse.bronze.flights_raw")
        .withColumn("departure_time", to_timestamp("departure_time"))
        .dropDuplicates(["flight_id", "departure_time"]))


@table("gold.airport_kpis", mode="overwrite")
def airport_kpis(spark):
    return spark.sql("""
        SELECT airport_icao, DATE(departure_time) AS dt,
               COUNT(*) AS flights, AVG(delay_minutes) AS avg_delay
        FROM lakehouse.silver.flights_clean
        GROUP BY 1, 2
    """)


# 一键运行整个 Pipeline
# run_pipeline(spark)

交付物

  • 声明式 ETL 框架代码
  • Notebook: 07_declarative_etl.ipynb
  • 对比文档:命令式 ETL vs 声明式 ETL 的优劣分析

100K+ 信号:⭐⭐⭐⭐⭐ — 自己设计 ETL 框架展示的不是"会写 Pipeline",而是"会设计 Pipeline 平台"。这是 Platform Engineer 的核心能力。


Level 4:数据治理层 — Governance & Security (Week 10-12)

架构师思维:Frankfurt 金融行业受 DORA、MiFID II、BaFin 监管。数据治理不是"nice to have",而是合规刚需。这一层对应 Databricks Unity Catalog 的能力。能讲清楚 "数据血缘 → 审计 → 合规" 的闭环,是拿到金融科技 100K+ offer 的关键差异化。

EXP-13:统一数据目录 — Unity Catalog 替代

对标 Databricks 功能:Unity Catalog

实验步骤

# docker-compose.gravitino.yml
# Apache Gravitino — 统一元数据管理,等价于 Unity Catalog
services:
  gravitino:
    image: apache/gravitino:0.6.0
    ports:
      - "8090:8090"
    environment:
      - GRAVITINO_HOME=/opt/gravitino
    volumes:
      - ./gravitino/conf:/opt/gravitino/conf

# 通过 Gravitino API 注册 Iceberg Catalog
import requests

# 创建 Metalake(等价于 Unity Catalog 的 Metastore)
requests.post("http://gravitino:8090/api/metalakes", json={
    "name": "sololakehouse",
    "comment": "SoloLakehouse unified data catalog",
})

# 注册 Iceberg Catalog(等价于 Unity Catalog 的 External Catalog)
requests.post("http://gravitino:8090/api/metalakes/sololakehouse/catalogs", json={
    "name": "lakehouse",
    "type": "RELATIONAL",
    "provider": "lakehouse-iceberg",
    "comment": "Main data lakehouse",
    "properties": {
        "catalog-backend": "hive",
        "uri": "thrift://hive-metastore:9083",
        "warehouse": "s3a://bronze-warehouse/",
    },
})

# 设置 Tag(用于数据分类,类似 Unity Catalog 的 Tags)
requests.post(
    "http://gravitino:8090/api/metalakes/sololakehouse/tags",
    json={"name": "PII", "comment": "Contains Personally Identifiable Information"},
)

交付物

  • Gravitino 部署配置 + Docker Compose
  • Catalog 注册脚本
  • 数据分类标签方案(PII, Confidential, Public)

100K+ 信号:⭐⭐⭐⭐⭐ — Unity Catalog 是 Databricks 面试的高频话题。能自建等价物说明你理解元数据管理的底层逻辑。


EXP-14:数据血缘 — OpenLineage 集成

对标 Databricks 功能:Unity Catalog Lineage

实验步骤

# docker-compose.lineage.yml
services:
  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000"     # API
      - "5001:5001"     # Web UI
    environment:
      - MARQUEZ_DB_HOST=postgres
# Airflow 集成 OpenLineage — 自动追踪数据血缘
# airflow.cfg
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = "sololakehouse"

# 每个 Airflow Task 自动发送血缘事件到 Marquez
# 你可以在 Marquez UI 中看到:
# raw-landing/*.csv → bronze.flights_raw → silver.flights → gold.airport_kpis
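
Airflow 之外的作业(例如独立 Python 脚本)也可以手工上报血缘。下面是直接向 Marquez 的 OpenLineage 接口 POST 事件的草案(只列出最小字段集合,完整字段以 OpenLineage 规范为准;job/dataset 的 namespace 与名称均为示意):

# 手工发送一条 OpenLineage RunEvent 到 Marquez
import uuid
import requests
from datetime import datetime, timezone

event = {
    "eventType": "COMPLETE",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "producer": "https://github.com/your-repo/sololakehouse",
    "run": {"runId": str(uuid.uuid4())},
    "job": {"namespace": "sololakehouse", "name": "adhoc.bronze_ingest"},
    "inputs": [{"namespace": "s3a://raw-landing", "name": "flights/2025"}],
    "outputs": [{"namespace": "lakehouse", "name": "bronze.flights_raw"}],
}
requests.post("http://marquez:5000/api/v1/lineage", json=event, timeout=10)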

交付物

  • Marquez + OpenLineage 部署
  • Lineage 可视化截图
  • 血缘追踪覆盖率报告

100K+ 信号:⭐⭐⭐⭐⭐ — 数据血缘在金融监管(DORA Article 15)中是强制要求。这是 Frankfurt 金融科技岗位的杀手级技能。


EXP-15:密钥管理 — HashiCorp Vault

对标 Databricks 功能:Databricks Secrets

实验步骤

# docker-compose.vault.yml
services:
  vault:
    image: hashicorp/vault:1.15
    ports:
      - "8200:8200"
    cap_add:
      - IPC_LOCK
    environment:
      - VAULT_DEV_ROOT_TOKEN_ID=dev-token

# vault_integration.py — 集成到 Spark 和 Airflow
import hvac

client = hvac.Client(url="http://vault:8200", token="dev-token")

# 存储 MinIO 凭证(等价于 Databricks dbutils.secrets.put)
client.secrets.kv.v2.create_or_update_secret(
    path="minio",
    secret={"access_key": "...", "secret_key": "..."},
    mount_point="secret",
)

# 读取(等价于 dbutils.secrets.get("scope", "key"))
secret = client.secrets.kv.v2.read_secret_version(
    path="minio", mount_point="secret"
)
minio_key = secret["data"]["data"]["access_key"]
minio_secret = secret["data"]["data"]["secret_key"]

# 在 SparkSession 中使用
spark_conf = {
    "spark.hadoop.fs.s3a.access.key": minio_key,
    "spark.hadoop.fs.s3a.secret.key": minio_secret,
}

交付物

  • Vault 部署 + 初始化脚本
  • Spark/Airflow Vault 集成代码
  • 密钥轮换流程文档

100K+ 信号:⭐⭐⭐⭐ — 密钥管理是企业级安全的基本要求。能演示密钥轮换流程说明你有安全工程意识。


Level 5:ML 平台层 — MLOps 全生命周期 (Week 13-17)

架构师思维:这一层对应 Databricks 的 Mosaic AI 和 MLflow 托管服务。你不是在"训练模型",而是在设计"模型从实验到生产的完整生命周期管理系统"。能讲清楚 "Feature Store → Training → Registry → Serving → Monitoring" 的端到端闭环,是拿到 AI Platform Engineer 职位的核心要求。

EXP-16:实验追踪 — MLflow Tracking Server

对标 Databricks 功能:MLflow Managed Tracking

实验步骤

import mlflow
from mlflow.models import infer_signature
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# 连接 SoloLakehouse 的 MLflow Server
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("frankfurt-airport-taxi-time")

with mlflow.start_run(run_name="xgboost-v1") as run:
    # === 记录参数 ===
    mlflow.log_params({
        "model_type": "xgboost",
        "n_estimators": 200,
        "max_depth": 6,
        "learning_rate": 0.1,
        "feature_set": "v3-with-weather",
    })
    
    # === 训练模型 ===
    import xgboost as xgb
    model = xgb.XGBRegressor(n_estimators=200, max_depth=6, learning_rate=0.1)
    model.fit(X_train, y_train)
    
    # === 记录指标 ===
    predictions = model.predict(X_test)
    mlflow.log_metrics({
        "rmse": mean_squared_error(y_test, predictions, squared=False),
        "mae": mean_absolute_error(y_test, predictions),
        "r2": r2_score(y_test, predictions),
    })
    
    # === 记录模型 + Signature(用于后续 Serving)===
    signature = infer_signature(X_test, predictions)
    mlflow.sklearn.log_model(
        model,
        "taxi_time_predictor",
        signature=signature,
        registered_model_name="frankfurt-taxi-time",  # 自动注册到 Model Registry
    )
    
    # === 记录数据集信息(可追溯性)===
    mlflow.log_params({
        "training_table": "lakehouse.gold.taxi_time_features",
        "training_rows": len(X_train),
        "feature_count": X_train.shape[1],
    })

交付物

  • MLflow 实验对比截图
  • 模型版本管理流程文档

100K+ 信号:⭐⭐⭐ — MLflow 是基础能力。但如果你能展示"实验管理策略"(命名规范、参数记录标准、模型签名),就是进阶能力。


EXP-17:特征存储 — Feast Feature Store

对标 Databricks 功能:Databricks Feature Store

实验步骤

# feature_repo/feature_definitions.py
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from datetime import timedelta

# 实体定义 — "特征属于谁"
airport = Entity(
    name="airport_icao",
    value_type=ValueType.STRING,
    description="ICAO airport code",
)

# 离线特征源(从 Iceberg/Parquet 读取)
airport_features_source = FileSource(
    path="s3://gold-warehouse/features/airport_hourly_stats/",
    timestamp_field="event_timestamp",
    s3_endpoint_override="http://minio:9000",
)

# 特征视图定义
airport_hourly_stats = FeatureView(
    name="airport_hourly_stats",
    entities=[airport],
    ttl=timedelta(hours=24),
    features=[
        Feature(name="avg_delay_1h", dtype=ValueType.FLOAT),
        Feature(name="flight_count_1h", dtype=ValueType.INT64),
        Feature(name="delay_rate_1h", dtype=ValueType.FLOAT),
        Feature(name="avg_taxi_time_1h", dtype=ValueType.FLOAT),
        Feature(name="runway_utilization_1h", dtype=ValueType.FLOAT),
    ],
    source=airport_features_source,
)

# 在训练中使用 Feature Store(point-in-time join)
from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")

# Point-in-time 正确的特征获取 — 防止数据泄露!
training_df = store.get_historical_features(
    entity_df=entity_df,  # 包含 airport_icao + event_timestamp
    features=[
        "airport_hourly_stats:avg_delay_1h",
        "airport_hourly_stats:flight_count_1h",
        "airport_hourly_stats:delay_rate_1h",
    ],
).to_df()

# 在线推理时获取最新特征
online_features = store.get_online_features(
    features=["airport_hourly_stats:avg_delay_1h"],
    entity_rows=[{"airport_icao": "EDDF"}],  # Frankfurt Airport
).to_dict()
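
离线特征需要先物化(materialize)到在线存储,get_online_features 才能取到值。下面是一个最小示意(假设 feature_store.yaml 中已配置好 online store,例如 Redis 或 SQLite):

# 把最近 1 天的离线特征物化到 online store
from datetime import datetime, timedelta
from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)
# 也可以用增量物化:store.materialize_incremental(end_date=datetime.utcnow())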

交付物

  • Feast Feature Store 完整配置
  • 特征定义文件 + 物化脚本
  • Point-in-time Join 正确性验证

100K+ 信号:⭐⭐⭐⭐⭐ — Feature Store 是区分 "ML Engineer" 和 "ML Platform Engineer" 的关键。能解释 point-in-time join 防止数据泄露,是高级 ML 工程能力的标志。


EXP-18:模型注册与版本管理 — MLflow Model Registry

对标 Databricks 功能:MLflow Model Registry (Managed)

实验步骤

from mlflow import MlflowClient

client = MlflowClient()

# === 模型阶段管理(Staging → Production)===
# 等价于 Databricks UI 中的"Transition to Production"
client.transition_model_version_stage(
    name="frankfurt-taxi-time",
    version=3,
    stage="Production",
    archive_existing_versions=True,  # 自动归档旧版本
)

# === 模型审批流程(自定义标签模拟)===
client.set_model_version_tag(
    name="frankfurt-taxi-time",
    version=3,
    key="approval_status",
    value="approved_by:bill",
)
client.set_model_version_tag(
    name="frankfurt-taxi-time",
    version=3,
    key="compliance_check",
    value="passed:DORA-Article-12",
)

# === 加载 Production 模型 ===
# Databricks: mlflow.pyfunc.load_model("models:/model-name/Production")
# SoloLakehouse: 完全相同的 API
model = mlflow.pyfunc.load_model("models:/frankfurt-taxi-time/Production")
predictions = model.predict(new_data)

交付物

  • 模型生命周期管理流程图
  • 模型审批 + 合规标签方案

100K+ 信号:⭐⭐⭐⭐ — 模型治理(审批、合规标签)在金融监管场景下是硬性要求。


EXP-19:模型服务 — Model Serving Endpoint

对标 Databricks 功能:Databricks Model Serving / Mosaic AI

实验步骤

# === 方案 1: MLflow Serve(快速部署)===
# 命令行一键启动
# mlflow models serve -m "models:/frankfurt-taxi-time/Production" -p 8501

# === 方案 2: BentoML(生产级,支持批处理 + 自动缩放)===
# serving/service.py
import bentoml
import mlflow
from datetime import datetime

# 从 MLflow 导入模型到 BentoML
model_uri = "models:/frankfurt-taxi-time/Production"
model = mlflow.pyfunc.load_model(model_uri)
bento_model = bentoml.mlflow.import_model("taxi-time-predictor", model_uri)

@bentoml.service(
    resources={"cpu": "2", "memory": "4Gi"},
    traffic={"timeout": 30},
)
class TaxiTimeService:
    
    def __init__(self):
        self.model = bentoml.mlflow.load_model(bento_model)
    
    @bentoml.api
    def predict(self, input_data: dict) -> dict:
        """
        预测 Frankfurt Airport Taxi Time
        
        Input: {"airport_icao": "EDDF", "airline": "LH", "hour": 14, ...}
        Output: {"predicted_taxi_time_min": 12.5, "confidence": 0.85}
        """
        import pandas as pd
        df = pd.DataFrame([input_data])
        prediction = self.model.predict(df)
        return {
            "predicted_taxi_time_min": float(prediction[0]),
            "model_version": "v3",
            "served_at": datetime.now().isoformat(),
        }

# docker-compose.serving.yml
services:
  taxi-time-service:
    build: ./serving
    ports:
      - "8501:3000"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
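
端点部署后建议做一轮负载测试。下面是一个 locust 最小脚本草案(端点路径 /predict 与上面的 BentoML 服务对应,请求体字段为假设的特征输入):

# load_test/locustfile.py — 运行:locust -f locustfile.py --host http://localhost:8501
from locust import HttpUser, task, between

class TaxiTimeUser(HttpUser):
    wait_time = between(0.1, 0.5)

    @task
    def predict(self):
        self.client.post("/predict", json={
            "input_data": {
                "airport_icao": "EDDF",
                "airline": "LH",
                "hour": 14,
            }
        })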

交付物

  • Model Serving 端点(BentoML 或 MLflow Serve)
  • API 文档(OpenAPI spec)
  • 负载测试结果(locust)

100K+ 信号:⭐⭐⭐⭐ — 能部署模型到 REST API 并做负载测试,说明你理解"模型是产品"的思维。


EXP-20:模型监控与 A/B 测试

对标 Databricks 功能:Lakehouse Monitoring for ML / Inference Tables

实验步骤

# === 使用 Evidently AI 做模型监控 ===
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, RegressionPreset

# 1. 数据漂移检测
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(
    reference_data=training_data,    # 训练时的数据分布
    current_data=production_data,    # 最近 24 小时的推理数据
)
drift_report.save_html("reports/drift_report.html")

# 2. 模型性能衰减检测
performance_report = Report(metrics=[RegressionPreset()])
performance_report.run(
    reference_data=training_predictions,
    current_data=production_predictions,
)

# 3. 自动告警(集成 Grafana)
if drift_report.as_dict()["metrics"][0]["result"]["dataset_drift"]:
    # 发送告警到 Grafana/Slack
    alert_model_drift("frankfurt-taxi-time", drift_score=0.35)

# === A/B 测试框架 ===
import random
import mlflow

class ABRouter:
    """简单的 A/B 测试路由器"""
    def __init__(self, model_a_uri, model_b_uri, traffic_split=0.9):
        self.model_a = mlflow.pyfunc.load_model(model_a_uri)  # Champion
        self.model_b = mlflow.pyfunc.load_model(model_b_uri)  # Challenger
        self.split = traffic_split
    
    def predict(self, input_data):
        if random.random() < self.split:
            pred = self.model_a.predict(input_data)
            variant = "champion"
        else:
            pred = self.model_b.predict(input_data)
            variant = "challenger"
        
        # 记录到推理日志表(用于后续分析)
        log_inference(input_data, pred, variant)
        return pred, variant

交付物

  • Evidently 监控报告
  • A/B 测试框架 + 分析 Notebook
  • Grafana 模型监控仪表板

100K+ 信号:⭐⭐⭐⭐⭐ — 模型监控和 A/B 测试是 MLOps 成熟度的最高标准。在 Frankfurt 金融行业,模型漂移检测是 DORA 合规要求。


Level 6:平台运维层 — Production Operations (Week 18-22)

架构师思维:你现在不是在"运行一个项目",而是在"运营一个平台"。可观测性、CI/CD、数据共享是平台成熟度的最终标志。能讲清楚 "Observe → Alert → Respond → Prevent" 的运维闭环,是 Platform Architect 的核心能力。

EXP-21:数据可视化 — Business Dashboards

对标 Databricks 功能:Databricks SQL Dashboards / Lakeview

实验步骤:使用 Grafana 连接 Trino,创建 Frankfurt Airport 运营仪表板。

-- Grafana Data Source: Trino(通过 Grafana 的 Trino 数据源插件连接)
-- Dashboard: Frankfurt Airport Operations

-- Panel 1: 实时航班延误率(过去 24 小时)
SELECT
    date_trunc('hour', departure_time) AS hour,
    COUNT(*) AS total_flights,
    COUNT(CASE WHEN delay_minutes > 15 THEN 1 END) AS delayed,
    ROUND(COUNT(CASE WHEN delay_minutes > 15 THEN 1 END) * 100.0 / COUNT(*), 1) AS delay_rate
FROM iceberg.silver.flights
WHERE departure_time > CURRENT_TIMESTAMP - INTERVAL '24' HOUR
GROUP BY 1
ORDER BY 1;

-- Panel 2: Taxi Time 预测 vs 实际 对比
SELECT
    prediction_time,
    predicted_taxi_min,
    actual_taxi_min,
    ABS(predicted_taxi_min - actual_taxi_min) AS error_min
FROM iceberg.gold.taxi_time_predictions
WHERE prediction_time > CURRENT_TIMESTAMP - INTERVAL '7' DAY;

交付物

  • Grafana Dashboard JSON
  • 业务仪表板截图

EXP-22:平台可观测性 — Prometheus + Grafana 全栈监控

对标 Databricks 功能:Cluster Monitoring / System Tables

实验步骤

# prometheus/prometheus.yml — 全栈监控配置
scrape_configs:
  # Spark 集群指标
  - job_name: "spark-master"
    static_configs:
      - targets: ["spark-master:8080"]
  
  # MinIO 存储指标
  - job_name: "minio"
    metrics_path: /minio/v2/metrics/cluster
    static_configs:
      - targets: ["minio:9000"]
  
  # Trino 查询指标
  - job_name: "trino"
    static_configs:
      - targets: ["trino:8080"]
  
  # Airflow 调度器指标
  - job_name: "airflow"
    static_configs:
      - targets: ["airflow-webserver:8080"]
  
  # MLflow 自定义指标
  - job_name: "mlflow-custom"
    static_configs:
      - targets: ["mlflow-exporter:9090"]

关键监控仪表板

仪表板 关键指标 告警阈值
Spark Cluster Active executors, Task failures, GC time Executor < 1: Critical
MinIO Storage Bucket size, Request rate, Error rate Error rate > 1%: Warning
Trino Queries Query latency P99, Failed queries P99 > 30s: Warning
Airflow DAGs DAG success rate, Task duration Success rate < 95%: Critical
ML Models Inference latency, Prediction drift Drift score > 0.3: Warning
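
这些告警阈值最终要落成 Prometheus alert rules。下面是一个格式示例(只用通用的 up 指标演示写法;各组件的具体指标名请以其 exporter 文档为准,不要直接照抄):

# prometheus/alert_rules.yml — 告警规则示例
groups:
  - name: sololakehouse-platform
    rules:
      - alert: ServiceDown
        expr: up == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "{{ $labels.job }} 已宕机超过 2 分钟"
      - alert: MinioScrapeFailed
        expr: up{job="minio"} == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MinIO 指标抓取失败,请检查服务与网络"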

交付物

  • Prometheus 配置 + Grafana Dashboard
  • 告警规则文件
  • 运维 Runbook(故障排查手册)

100K+ 信号:⭐⭐⭐⭐ — 可观测性是 Platform Engineer 的核心能力。


EXP-23:CI/CD for Data Pipelines

对标 Databricks 功能:Databricks Asset Bundles / Repos

实验步骤

# .github/workflows/data-pipeline-ci.yml
name: Data Pipeline CI/CD

on:
  push:
    paths:
      - "spark-jobs/**"
      - "dags/**"
      - "feature_repo/**"
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      # 单元测试(PySpark 本地模式)
      - name: Run Pipeline Tests
        run: |
          pip install pyspark pytest great-expectations
          pytest tests/ -v --tb=short
      
      # 数据质量契约测试
      - name: Validate Data Contracts
        run: python scripts/validate_contracts.py
      
      # Schema 兼容性检查
      - name: Check Schema Compatibility
        run: python scripts/check_schema_compat.py
  
  deploy:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      # 部署 DAG 到 Airflow
      - name: Deploy Airflow DAGs
        run: rsync -avz dags/ $AIRFLOW_HOST:/opt/airflow/dags/
      
      # 部署 Spark Jobs
      - name: Deploy Spark Jobs
        run: |
          mc cp spark-jobs/*.py solo/spark-jobs/
      
      # 部署 Feature Definitions
      - name: Apply Feature Store
        run: feast apply
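
工作流中引用的 scripts/check_schema_compat.py 需要自行实现。下面是一个基于 PySpark 的最小草案(契约文件 contracts/silver_flights.yaml 的格式为假设:列名到类型的映射),核心思路是"契约中声明的列必须存在且类型一致,允许表里出现额外的新列":

# scripts/check_schema_compat.py — Schema 兼容性检查草案
import sys
import yaml
from spark_session_factory import create_spark_session

def main():
    spark = create_spark_session("schema-compat-check")

    # 假设的契约格式:{table: "lakehouse.silver.flights", columns: {flight_id: string, ...}}
    with open("contracts/silver_flights.yaml") as f:
        contract = yaml.safe_load(f)

    actual = {f.name: f.dataType.simpleString()
              for f in spark.table(contract["table"]).schema.fields}

    errors = []
    for col_name, expected_type in contract["columns"].items():
        if col_name not in actual:
            errors.append(f"缺少契约列: {col_name}")
        elif actual[col_name] != expected_type:
            errors.append(f"类型不兼容: {col_name} 期望 {expected_type}, 实际 {actual[col_name]}")

    if errors:
        print("\n".join(errors))
        sys.exit(1)
    print("✅ Schema compatible")

if __name__ == "__main__":
    main()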

交付物

  • CI/CD Pipeline 配置
  • 测试套件(单元测试 + 集成测试 + 契约测试)
  • 部署流程文档

100K+ 信号:⭐⭐⭐⭐⭐ — Data Pipeline 的 CI/CD 是从"手动运维"到"平台工程"的关键跨越。


EXP-24:数据共享 — Delta Sharing

对标 Databricks 功能:Delta Sharing

# 使用 delta-sharing-server 实现跨组织数据共享
# 场景:将 Gold 层航班延误数据共享给合作伙伴

100K+ 信号:⭐⭐⭐ — 了解即可,不是面试重点。


EXP-25:向量搜索与 RAG — AI 平台扩展

对标 Databricks 功能:Mosaic AI Vector Search / Agent Framework

实验步骤

# === Qdrant 向量数据库 + LangChain RAG ===
from langchain_community.vectorstores import Qdrant
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.chains import RetrievalQA

# 嵌入航班运营文档
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# 创建向量索引
vectorstore = Qdrant.from_documents(
    documents=operations_docs,  # 航班运营 SOP 文档
    embedding=embeddings,
    url="http://qdrant:6333",
    collection_name="airport_ops",
)

# RAG 问答
qa_chain = RetrievalQA.from_chain_type(
    llm=local_llm,  # 本地/自托管 LLM(例如通过 Ollama 加载),需自行初始化
    retriever=vectorstore.as_retriever(),
)
answer = qa_chain.run("What is the standard taxi procedure at EDDF during peak hours?")

交付物

  • Qdrant 部署 + 文档索引脚本
  • RAG 演示 Notebook

100K+ 信号:⭐⭐⭐⭐ — RAG 是 2025-2026 最热门的 AI 应用模式。能在自己的平台上跑 RAG 是强信号。


🏆 FINAL:端到端整合项目 — Frankfurt Airport Intelligent Operations Platform

这是所有实验的大融合。 一个完整的项目,整合 25 个实验的所有组件,构建一个 "法兰克福机场智能运营平台"。这就是你简历上的旗舰项目,面试时用 15 分钟讲清楚就能拿到 offer。

项目概述

业务问题:预测 Frankfurt Airport (EDDF) 的航班 Taxi Time,帮助机场运营团队优化跑道调度和地面交通管理。

技术亮点:一个人搭建了等价于 Databricks 全功能平台的开源替代方案,并在此平台上完成了从数据摄入到模型服务的完整 AI 工程闭环。

架构图

                    ┌─────────────────────────────────────────────┐
                    │         Frankfurt Airport Ops Platform       │
                    │            (SoloLakehouse v2.0)              │
                    └─────────────────────────────────────────────┘

┌───────────────┐   ┌──────────────────┐   ┌──────────────────────┐
│  Data Sources │   │   Ingestion      │   │   Storage (MinIO)    │
│               │   │                  │   │                      │
│ Eurocontrol   │──►│ Auto Loader      │──►│ raw-landing/         │
│ Weather API   │   │ (Airflow Sensor) │   │ bronze-warehouse/    │
│ ATC Feeds     │   │ Spark Streaming  │   │ silver-warehouse/    │
│ Airport AODB  │   │ + Redpanda       │   │ gold-warehouse/      │
└───────────────┘   └──────────────────┘   └──────────┬───────────┘
                                                       │
                    ┌──────────────────────────────────┘
                    │
         ┌──────────▼──────────┐
         │   Table Format      │
         │   Apache Iceberg    │
         │   (ACID + TTL +     │
         │    Schema Evolution)│
         └──────────┬──────────┘
                    │
    ┌───────────────┼───────────────┐
    │               │               │
┌───▼───┐    ┌──────▼──────┐   ┌───▼───────────┐
│ Spark │    │    Trino    │   │  Feature Store │
│ (ETL) │    │ (Analytics) │   │    (Feast)     │
└───┬───┘    └──────┬──────┘   └───┬───────────┘
    │               │               │
    │    ┌──────────▼──────┐        │
    │    │   Dashboards    │        │
    │    │   (Grafana)     │        │
    │    └─────────────────┘        │
    │                               │
    └───────────┬───────────────────┘
                │
    ┌───────────▼───────────┐
    │     ML Pipeline       │
    │                       │
    │  Training (Spark ML)  │
    │  Tracking (MLflow)    │
    │  Registry (MLflow)    │
    │  Serving (BentoML)    │
    │  Monitoring(Evidently)│
    └───────────┬───────────┘
                │
    ┌───────────▼───────────┐
    │     Governance        │
    │                       │
    │  Catalog (Gravitino)  │
    │  Lineage (Marquez)    │
    │  Secrets (Vault)      │
    │  Quality (GE + Soda)  │
    └───────────────────────┘

搭建步骤(端到端教程)

Phase 1: 基础设施搭建 (Day 1-2)

# Step 1: 克隆项目
git clone https://github.com/your-repo/sololakehouse.git
cd sololakehouse

# Step 2: 一键启动所有基础服务
make setup   # 生成密钥、下载 JAR、创建目录
make up      # 启动 Docker Compose(~15 个容器)

# Step 3: 验证所有服务健康
make status  # 检查每个服务的健康状态

# 预期输出:
# ✅ MinIO         - http://localhost:9001
# ✅ Spark Master  - http://localhost:8080
# ✅ JupyterLab    - http://localhost:8888
# ✅ Trino         - http://localhost:8090
# ✅ CloudBeaver   - http://localhost:8978
# ✅ Airflow       - http://localhost:8180
# ✅ MLflow        - http://localhost:5000
# ✅ Grafana       - http://localhost:3000
# ✅ Redpanda      - http://localhost:8082
# ✅ Prometheus    - http://localhost:9090

# Step 4: 初始化存储和 Catalog
python scripts/init_platform.py

# scripts/init_platform.py
"""平台初始化脚本 — 创建 Bucket、数据库、初始表"""
from spark_session_factory import create_spark_session

spark = create_spark_session("Platform-Init")

# 创建 Medallion 命名空间
for ns in ["bronze", "silver", "gold", "features", "ml"]:
    spark.sql(f"CREATE DATABASE IF NOT EXISTS lakehouse.{ns}")

print("✅ Platform initialized successfully")

Phase 2: 数据摄入 + Medallion Pipeline (Day 3-5)

# notebooks/01_data_ingestion.ipynb
"""
端到端数据摄入:外部数据 → Raw → Bronze → Silver → Gold
"""
spark = create_spark_session("Data-Ingestion")

# === Step 1: 加载原始航班数据到 Raw Landing ===
# 数据来源:Eurocontrol 公开航班数据 + Frankfurt 机场历史数据
import requests
import os

# 下载示例数据集(或使用你已有的 81,119 条航班记录)
DATA_URL = "https://your-data-source/frankfurt_flights_2025.csv"
LOCAL_PATH = "/tmp/flights.csv"

with open(LOCAL_PATH, "wb") as f:
    f.write(requests.get(DATA_URL, timeout=60).content)

# 上传到 MinIO raw-landing(s3 为 EXP-01 中创建的 boto3 client)
s3.upload_file(LOCAL_PATH, "raw-landing", "flights/2025/flights.csv")

# === Step 2: Bronze 摄入 ===
ingest_to_bronze(
    spark,
    source_path="s3a://raw-landing/flights/2025/*.csv",
    table_name="flights_raw",
    source_system="eurocontrol",
)

# === Step 3: Silver 清洗 ===
bronze_to_silver_flights(spark)

# === Step 4: Gold 聚合 ===
silver_to_gold_airport_metrics(spark)

# === Step 5: 验证全链路 ===
for layer in ["bronze.flights_raw", "silver.flights", "gold.airport_daily_metrics"]:
    count = spark.table(f"lakehouse.{layer}").count()
    print(f"  {layer}: {count:,} rows")

Phase 3: 特征工程 + Feature Store (Day 6-7)

# notebooks/02_feature_engineering.ipynb
"""
为 Taxi Time 预测构建特征。

关键特征(基于你之前的分析):
- taxi_in_length_m (correlation: 0.56) — 最强预测因子
- 时间特征(小时、星期、月份)
- 机场运营指标(每小时航班量、延误率)
- 天气特征(如可获取)
"""
spark = create_spark_session("Feature-Engineering")

# === 构建 Taxi Time 特征表 ===
spark.sql("""
    CREATE OR REPLACE TABLE lakehouse.features.taxi_time_features AS
    WITH hourly_stats AS (
        SELECT
            airport_icao,
            DATE_TRUNC('hour', departure_time) AS hour_bucket,
            COUNT(*) AS flights_per_hour,
            AVG(delay_minutes) AS avg_delay_1h,
            STDDEV(delay_minutes) AS std_delay_1h
        FROM lakehouse.silver.flights
        GROUP BY 1, 2
    )
    SELECT
        f.flight_id,
        f.airport_icao,
        f.departure_time,
        f.airline_code,
        
        -- 目标变量
        f.taxi_time_minutes AS target_taxi_time,
        
        -- 核心特征
        f.taxi_in_length_m,           -- 最强预测因子 (r=0.56)
        f.runway_id,
        f.aircraft_type,
        
        -- 时间特征
        HOUR(f.departure_time) AS dep_hour,
        DAYOFWEEK(f.departure_time) AS dep_dow,
        MONTH(f.departure_time) AS dep_month,
        
        -- 机场实时状态特征(来自小时聚合)
        h.flights_per_hour,
        h.avg_delay_1h,
        h.std_delay_1h,
        
        -- 用于 Feast point-in-time join 的时间戳
        f.departure_time AS event_timestamp
        
    FROM lakehouse.silver.flights f
    LEFT JOIN hourly_stats h
        ON f.airport_icao = h.airport_icao
        AND DATE_TRUNC('hour', f.departure_time) = h.hour_bucket
    WHERE f.taxi_time_minutes IS NOT NULL
""")

# 注册到 Feast Feature Store
# feast apply  (命令行)

Phase 4: 模型训练 + MLflow 实验管理 (Day 8-9)

# notebooks/03_model_training.ipynb
"""
Taxi Time 预测模型训练 — 完整 MLOps 流程
"""
import mlflow
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("frankfurt-taxi-time-prediction")

# 加载特征
features_df = spark.table("lakehouse.features.taxi_time_features").toPandas()

# 特征列
FEATURE_COLS = [
    "taxi_in_length_m", "dep_hour", "dep_dow", "dep_month",
    "flights_per_hour", "avg_delay_1h", "std_delay_1h",
]
TARGET = "target_taxi_time"

X = features_df[FEATURE_COLS]
y = features_df[TARGET]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# === 实验 1: XGBoost Baseline ===
with mlflow.start_run(run_name="xgboost-baseline"):
    params = {"n_estimators": 200, "max_depth": 6, "learning_rate": 0.1}
    mlflow.log_params(params)
    mlflow.log_param("feature_set", "v1-core")
    mlflow.log_param("training_samples", len(X_train))
    
    model = xgb.XGBRegressor(**params)
    model.fit(X_train, y_train)
    
    preds = model.predict(X_test)
    metrics = {
        "rmse": mean_squared_error(y_test, preds, squared=False),
        "mae": mean_absolute_error(y_test, preds),
        "r2": r2_score(y_test, preds),
    }
    mlflow.log_metrics(metrics)
    
    # 记录特征重要性
    importance = dict(zip(FEATURE_COLS, model.feature_importances_))
    mlflow.log_dict(importance, "feature_importance.json")
    
    # 注册模型
    mlflow.xgboost.log_model(
        model, "model",
        registered_model_name="frankfurt-taxi-time",
    )
    
    print(f"  RMSE: {metrics['rmse']:.2f} min | R²: {metrics['r2']:.3f}")

# === 实验 2: XGBoost + Hyperparameter Tuning ===
# (类似流程,调参后对比)

# === 实验 3: LightGBM 对比 ===
# (换模型类型,MLflow 自动记录对比)

Phase 5: 模型部署 + 在线推理 (Day 10-11)

# === 部署 Production 模型 ===
client = MlflowClient()

# 审批 + 提升到 Production
client.transition_model_version_stage(
    name="frankfurt-taxi-time",
    version=2,  # 最佳实验的版本号
    stage="Production",
)

# === 启动推理服务 ===
# docker-compose up -d taxi-time-service

# === 测试推理端点 ===
import requests

response = requests.post(
    "http://localhost:8501/predict",
    json={
        "taxi_in_length_m": 2500,
        "dep_hour": 14,
        "dep_dow": 3,
        "dep_month": 7,
        "flights_per_hour": 45,
        "avg_delay_1h": 8.5,
        "std_delay_1h": 12.3,
    },
)
print(response.json())
# {"predicted_taxi_time_min": 12.3, "model_version": "v2", "confidence": 0.87}

Phase 6: 监控 + 治理 + 文档 (Day 12-14)

# 启动监控和治理服务
docker-compose -f docker-compose.monitoring.yml up -d
docker-compose -f docker-compose.governance.yml up -d

# 部署 Grafana 仪表板
python scripts/deploy_dashboards.py

# 生成项目文档
cd docs && mkdocs build && mkdocs serve

项目完成后的交付清单

类别 交付物 文件/链接
基础设施 Docker Compose 全栈配置 docker-compose.yml
基础设施 Makefile 一键操作 Makefile
基础设施 平台初始化脚本 scripts/init_platform.py
数据工程 Medallion Pipeline DAG dags/medallion_pipeline.py
数据工程 Bronze/Silver/Gold 转换脚本 spark-jobs/*.py
数据工程 Auto Loader DAG dags/auto_loader_dag.py
数据工程 声明式 ETL 框架 declarative_etl/
数据质量 质量检查框架 quality/checks.py
数据质量 数据契约定义 contracts/*.yaml
ML 特征工程 Notebook notebooks/02_feature_engineering.ipynb
ML 模型训练 Notebook notebooks/03_model_training.ipynb
ML Feast Feature Store 定义 feature_repo/
ML 模型服务代码 serving/service.py
ML 模型监控报告 reports/
治理 Gravitino Catalog 配置 gravitino/
治理 OpenLineage 集成 lineage/
治理 Vault 密钥管理 vault/
监控 Prometheus 配置 monitoring/prometheus/
监控 Grafana 仪表板 monitoring/grafana/dashboards/
CI/CD GitHub Actions Pipeline .github/workflows/
文档 MkDocs 项目文档 docs/
文档 Architecture Decision Records docs/adr/
文档 README + ARCHITECTURE.md 项目根目录

简历项目描述(模板)

Frankfurt Airport Intelligent Operations Platform | Solo Architect & Engineer

Designed and built a self-hosted Databricks-equivalent data platform using 15+ open-source components (Spark, Iceberg, Trino, Airflow, MLflow, Feast, Redpanda, Grafana), demonstrating full-stack data and AI platform capabilities:

  • Implemented Medallion Architecture (Bronze/Silver/Gold) processing 81K+ flight records with Apache Iceberg on MinIO, achieving ACID compliance and time travel capabilities
  • Built end-to-end ML pipeline for taxi time prediction (RMSE: X min, R²: 0.XX) with MLflow experiment tracking, Feast feature store, and BentoML model serving
  • Designed declarative ETL framework inspired by Delta Live Tables with data quality expectations and automated schema evolution
  • Established data governance layer with Apache Gravitino (Unity Catalog alternative), OpenLineage for lineage tracking, and HashiCorp Vault for secrets management
  • Created comprehensive CI/CD pipeline for data pipelines with schema compatibility checks and data contract testing
  • Deployed full observability stack (Prometheus + Grafana) with 5 operational dashboards and automated alerting

Tech Stack: PySpark, Apache Iceberg, Trino, Airflow, MLflow, Feast, Redpanda, MinIO, Docker, Grafana, XGBoost

Relevance: Demonstrates platform architecture thinking, regulatory awareness (DORA/BaFin), and ability to independently design and operate enterprise-grade data infrastructure

时间规划总览

阶段 周数 实验 关键里程碑
Level 1: 基础层 Week 1-2 EXP-01 ~ 03 Spark + Iceberg + MinIO 全链路打通
Level 2: 数据工程 Week 3-5 EXP-04 ~ 07 生产级 Medallion Pipeline 运行
Level 3: 高级数据工程 Week 6-9 EXP-08 ~ 12 流处理 + 声明式 ETL 完成
Level 4: 数据治理 Week 10-12 EXP-13 ~ 15 Catalog + Lineage + Secrets 就绪
Level 5: ML 平台 Week 13-17 EXP-16 ~ 20 模型从训练到服务完整闭环
Level 6: 平台运维 Week 18-22 EXP-21 ~ 25 监控 + CI/CD + 文档完善
Final 整合 Week 23-26 端到端项目 Frankfurt Airport 平台完整交付
总计:约 6 个月 — 每周投入 15-20 小时,完成 25 个实验 + 1 个端到端整合项目。完成后你将拥有一个比绝大多数候选人都更有深度的 Portfolio 项目。

附录:与 Databricks 认证的对应关系

Databricks 认证考点 对应实验 覆盖度
Data Engineering Associate EXP-01~07 ✅ 100%
Data Engineering Professional EXP-08~12 ✅ 95%
ML Associate EXP-16~18 ✅ 100%
ML Professional EXP-17~20 ✅ 90%
Data Analyst Associate EXP-05, EXP-21 ✅ 85%
Platform Administrator EXP-13~15, EXP-22 ✅ 80%
完成全部实验后,你不仅具备了 Databricks 全系列认证的知识基础,更重要的是你拥有了超越认证的实操经验——因为你不只是"使用"平台,而是"构建"了平台。