Databricks 完全指南:从架构原理到生产实践

版本: 2025年完整版
适用认证: Databricks Data Engineer Associate/Professional · AI Engineer Associate · ML Professional
定位: 不只是"怎么用",更是"为什么这样设计"——Platform Engineer视角的深度解读

目录

  1. Databricks 是什么:一个架构级的理解
  2. Workspace:你的工作台
  3. Clusters:计算引擎的核心
  4. Databricks Runtime(DBR)
  5. Apache Spark on Databricks
  6. Delta Lake:Lakehouse的地基
  7. Unity Catalog:数据治理的全局大脑
  8. Delta Live Tables(DLT):声明式数据管道
  9. Databricks Workflows:编排与调度
  10. Databricks SQL:分析师的战场
  11. MLflow:ML生命周期管理
  12. Databricks AutoML
  13. Feature Store:特征工程的工厂
  14. Model Serving:模型上线的最后一公里
  15. Databricks Asset Bundles(DAB):基础设施即代码
  16. Repos & Git 集成
  17. Secrets & Security 安全体系
  18. Databricks CLI & REST API
  19. Databricks Connect:本地开发的桥梁
  20. Partner Connect & 生态集成
  21. Databricks on 多云:AWS/Azure/GCP差异
  22. 完整架构总结:Platform Engineer视角

1. Databricks 是什么:架构级的理解

1.1 三句话的本质

很多人对Databricks的认知停留在"云上的Spark平台",但这是极大的误解。Databricks的本质可以用三句话描述:

第一,Databricks是一个统一的数据智能平台(Unified Data Intelligence Platform),它将数据工程(ETL/ELT)、数据仓库(SQL Analytics)、机器学习(ML/AI)三个传统上完全分离的工作负载,统一在一个平台上。这个统一不是简单的功能堆叠,而是共享同一套底层存储(Delta Lake)和治理层(Unity Catalog)。

第二,Databricks实现了"计算与存储分离"的现代Lakehouse架构。你的数据存在对象存储(S3/ADLS/GCS)上,计算资源(Clusters)按需启动、按需关闭。这带来了弹性伸缩能力和极低的存储成本。

第三,Databricks是一家云服务公司,不是开源软件。它构建在Apache Spark、MLflow、Delta Lake等开源项目之上,但提供了大量企业级的商业化能力(Unity Catalog、DLT、Model Serving等),并以按需计费(DBU)的方式收费。

1.2 Lakehouse架构的诞生逻辑

要真正理解Databricks,你必须理解Lakehouse架构为什么出现。传统架构有两个极端:

**数据湖(Data Lake)**将海量原始数据低成本存储在S3/ADLS上,但它没有ACID事务、没有Schema enforcement、没有索引,导致"数据沼泽"问题——数据堆进去了,但查询慢、质量差、难以治理。

**数据仓库(Data Warehouse)**如Snowflake/Redshift,有强Schema、高性能SQL查询,但存储成本高昂(专有格式),且不适合存储非结构化数据(图像、文本),也不适合直接跑ML训练(数据要先导出到S3再训练)。

Lakehouse的设计思想是:在廉价的对象存储(S3)上,通过一个开放的表格式(Delta Lake/Iceberg/Hudi)来提供Data Warehouse级别的功能——ACID事务、Schema管理、高性能查询,同时保持Data Lake的低成本和对ML工作负载的直接支持。

传统架构(双栈,数据需要复制):
                                  ┌─────────────┐
                                  │  Data Lake  │  ← ML训练
                                  │  (S3/ADLS)  │
                                  └──────┬──────┘
                                         │ ETL复制数据
                                  ┌──────▼──────┐
                                  │  Data       │  ← SQL分析
                                  │  Warehouse  │
                                  └─────────────┘

Lakehouse架构(统一存储):
                        ┌─────────────────────────────┐
                        │   对象存储 (S3/ADLS/GCS)    │
                        │   ┌─────────────────────┐   │
                        │   │    Delta Lake        │   │
                        │   │  (开放表格式 + ACID)  │   │
                        │   └─────────────────────┘   │
                        └─────────────┬───────────────┘
                                      │  同一份数据
              ┌───────────────────────┼────────────────────┐
              │                       │                    │
       ┌──────▼──────┐      ┌─────────▼──────┐    ┌───────▼──────┐
       │ Data Eng.   │      │ SQL Analytics  │    │  ML/AI       │
       │ (Spark/DLT) │      │ (SQL Warehouse)│    │  (MLflow)    │
       └─────────────┘      └────────────────┘    └──────────────┘

1.3 Databricks的核心抽象层次

在深入各个组件之前,你需要先建立这个层次模型:

┌────────────────────────────────────────────────────────┐
│              应用层 Applications                        │
│  Notebooks · SQL Editor · ML Experiments · Dashboards  │
├────────────────────────────────────────────────────────┤
│              服务层 Services                            │
│  DLT · Workflows · Model Serving · AutoML · FS         │
├────────────────────────────────────────────────────────┤
│              计算层 Compute                             │
│   All-Purpose Clusters · Job Clusters · SQL Warehouse  │
│   Databricks Runtime (DBR) · Photon                    │
├────────────────────────────────────────────────────────┤
│              存储与治理层 Storage & Governance          │
│   Delta Lake · Unity Catalog · Delta Sharing           │
├────────────────────────────────────────────────────────┤
│              基础设施层 Infrastructure                  │
│   Cloud Object Storage (S3/ADLS/GCS)                   │
│   Cloud VMs (EC2/Azure VM/GCE) ← Databricks管理       │
└────────────────────────────────────────────────────────┘

这个层次模型非常关键:下层为上层服务,每一层都可以独立扩展。这正是Platform Engineer要理解和设计的内容。


2. Workspace:你的工作台

2.1 Workspace的本质

Databricks Workspace是你与平台交互的统一入口,本质上是一个托管在云上的Web应用,但它管理着背后极其复杂的云资源。你可以把Workspace理解为"操作系统的桌面"——所有的工作都从这里开始。

每个Workspace对应一个独立的Databricks实例,有唯一的URL(如https://adb-1234567890.1.azuredatabricks.net)。在企业里,通常会有多个Workspace对应不同环境(开发/测试/生产)。

2.2 Workspace的主要区域

Workspace浏览器(左侧边栏) 是组织你所有资产的地方。它的目录结构分为两类:用户个人目录(/Users/your_email/)和共享目录(/Shared/)。在Unity Catalog启用之前,Workspace浏览器也是管理旧版Hive Metastore数据库的地方。

Catalog(数据目录) 是访问Unity Catalog的主入口,你可以在这里浏览所有的数据表、视图、卷(Volumes)、模型,并查看其Schema和样本数据。这是现代Databricks工作流的核心。

Compute(计算管理) 页面让你管理所有的Clusters和SQL Warehouses,包括创建、配置、启动和终止。

Workflows 页面是Job的管理中心,你可以创建、监控、调试所有的数据管道和自动化任务。

SQL(SQL Editor) 是面向分析师的界面,提供SQL编辑器、查询历史、结果可视化和Dashboard功能。

Machine Learning 是ML工作流的入口,整合了MLflow Experiments、Feature Store、Model Registry的访问。

2.3 Workspace的访问控制

Workspace有两种顶层角色:Admin(管理整个Workspace的配置、用户、计算资源)和User(使用资源进行开发)。在Unity Catalog启用后,数据对象的权限管理从Workspace级别下沉到了Catalog级别,形成了更细粒度的控制。


3. Clusters:计算引擎的核心

3.1 Cluster是什么

Cluster(集群)是Databricks中运行代码的计算单元。它本质上是一组云虚拟机(Driver节点 + 若干Worker节点),运行着Apache Spark、Databricks Runtime等软件。你的每一行Python/Scala/SQL代码,最终都在某个Cluster上执行。

这里有一个关键认知:Cluster是按时间计费的资源(DBU/小时 × 运行时间)。理解何时启动Cluster、如何配置Cluster、何时可以共享Cluster,是控制Databricks成本的关键能力,也是Platform Engineer的核心职责之一。

3.2 三种Cluster类型详解

All-Purpose Cluster(通用集群) 是交互式开发用的集群。你在Notebook里写代码时,就是附加到一个All-Purpose Cluster上的。它的特点是:可以被多个用户和多个Notebook同时共享,可以长时间运行(但会产生持续费用),支持自动终止(Autotermination)来节省成本。由于是共享资源,All-Purpose Cluster上任意用户的代码都可能相互影响。

# 在Notebook中,你通过附加(Attach)到一个Cluster来运行代码
# 集群选择在Notebook右上角的下拉菜单中

# 检查当前Spark版本和集群信息
print(spark.version)
print(sc.getConf().getAll())

Job Cluster(作业集群) 是为自动化任务(Job/Workflow)专门启动的集群。它在Job开始时自动创建,Job结束时自动销毁。由于生命周期与任务绑定,Job Cluster有几个重要优势:隔离性好(不会因其他任务影响性能)、成本精确(只在任务运行时产生费用)、配置可以为特定任务精确调优。在生产环境中,几乎所有数据管道都应使用Job Cluster而不是All-Purpose Cluster。

SQL Warehouse(SQL仓库) 是专门为SQL查询优化的计算资源,内部运行Photon引擎(C++写的向量化执行引擎,比Java版Spark快2-12倍)。SQL Warehouse有Classic和Serverless两种模式:Classic模式在你的云账号VPC中启动VM;Serverless模式在Databricks管理的基础设施上运行,启动时间从分钟级降到秒级。

3.3 Cluster配置详解

一个完整的Cluster配置包含以下关键参数:

Cluster Mode(集群模式) 决定了资源如何在多用户间分配。Single Node模式只有Driver没有Worker,适合小数据集的探索和ML模型开发(省钱);Standard模式是最常见的多节点集群;High Concurrency模式(旧版)为多用户并发做了优化,现在已被Shared Access Mode取代。

Access Mode(访问模式) 是Unity Catalog引入后的新概念,控制集群的安全隔离级别。Single User模式只允许一个用户使用,支持所有语言(Python/Scala/R/SQL),并且支持Unity Catalog的所有功能;Shared模式允许多用户共享但通过进程隔离保证安全,支持Python和SQL,支持Unity Catalog;No Isolation Shared模式(遗留)没有用户隔离,不推荐在生产中使用。

Node Type(节点类型) 就是选择底层的云VM规格。r系列(内存优化型)适合大型DataFrame操作和复杂Join;c系列(计算优化型)适合CPU密集型的ML训练;g系列(GPU型)适合深度学习。Driver节点通常选择内存较大的机型,因为它需要协调所有Worker并收集结果。

Autoscaling(自动伸缩) 允许集群根据负载在Min Workers和Max Workers之间自动调整Worker数量。开启Autoscaling可以显著节省成本,但对于流式处理(Structured Streaming)等场景,固定Worker数量可能表现更稳定。

// 典型的Job Cluster配置(通过API或DAB定义)
{
  "cluster_name": "etl-production-cluster",
  "spark_version": "14.3.x-scala2.12",
  "node_type_id": "i3.2xlarge",
  "driver_node_type_id": "i3.2xlarge",
  "num_workers": 4,
  "autoscale": {
    "min_workers": 2,
    "max_workers": 8
  },
  "data_security_mode": "SINGLE_USER",
  "spark_conf": {
    "spark.sql.shuffle.partitions": "200",
    "spark.databricks.delta.optimizeWrite.enabled": "true"
  },
  "init_scripts": [
    {
      "workspace": {
        "destination": "/init_scripts/install_dependencies.sh"
      }
    }
  ]
}

Init Scripts(初始化脚本) 在Cluster启动时每个节点上执行,用于安装系统级依赖、配置环境变量、安装自定义库等。有三种类型:Workspace Path(存在Workspace中)、DBFS Path(存在DBFS中)和Volume Path(存在Unity Catalog Volume中,推荐)。Init Script的执行在DBR加载之后、用户代码运行之前,所以可以放心安装Python包。

Cluster Libraries(集群库) 是安装在整个Cluster上、所有用户都可以使用的Python/Maven/CRAN包。可以通过UI或API指定,也可以在Notebook中用%pip install临时安装(但这个安装只在当前Notebook Session有效)。

3.4 Cluster Events与健康监控

Cluster页面的Events标签页是诊断问题的重要工具。它记录了集群生命周期中的所有事件:STARTINGRUNNINGRESIZING(自动伸缩触发)→ TERMINATING。当集群意外终止或Job失败时,Events日志是你第一个要看的地方。

Spark UI是另一个关键的调试工具,在Cluster详情页可以直接打开。它展示了所有Stage的执行情况、每个Task的耗时、数据倾斜(Skew)的迹象。在生产环境中,能熟练使用Spark UI诊断性能问题,是区分初级工程师和高级工程师的重要标志。


4. Databricks Runtime(DBR)

4.1 DBR的本质

Databricks Runtime(DBR)是运行在每个Cluster节点上的软件栈。你可以把它理解为一个预配置好的"大数据操作系统"——它打包了Apache Spark、Delta Lake、Python、Scala、R、各种优化后的库,以及Databricks自研的优化(如Photon)。

当你创建Cluster时选择的"Databricks Runtime Version"(如14.3 LTS),决定了这个Cluster上运行的软件版本。这个选择影响极大:不同DBR版本对应不同的Spark版本、Python版本、Delta Lake版本,以及不同的性能优化和功能支持。

4.2 DBR的主要变体

Databricks Runtime (Standard) 是最基础的运行时,包含Spark、Delta Lake、Python、Scala的核心依赖。适用于大多数数据工程任务。

Databricks Runtime ML 在Standard基础上,预安装了机器学习生态系统:TensorFlow、PyTorch、Scikit-learn、XGBoost、HuggingFace Transformers、MLflow等。如果你要做ML训练,直接选这个版本,省去了手动安装的麻烦和版本冲突的烦恼。

Databricks Runtime with Photon 内置了Photon引擎——Databricks用C++重写的向量化查询引擎,专为SQL查询和DataFrame操作优化。Photon对Spark API完全兼容(你不需要修改任何代码),但在列式扫描、聚合、Join操作上有2-12倍的性能提升。SQL Warehouse默认使用Photon。

Databricks Runtime for Genomics 专为基因组学工作负载优化,内置了Glow等专业库。这是一个较小众的变体,但展示了Databricks针对垂直领域的定制化能力。

4.3 LTS版本策略

Databricks标记为LTS(Long-Term Support)的版本,提供两年以上的维护支持。在生产环境中,永远使用LTS版本,避免使用非LTS版本(它们的维护周期短,升级频繁)。例如13.3 LTS14.3 LTS都是稳定的生产选择。

版本升级是一个需要谨慎管理的过程:新版本可能带来Spark行为变化(如默认的spark.sql.ansi.enabled从false变true),需要在测试环境充分验证后再推向生产。


5. Apache Spark on Databricks

5.1 Spark的架构回顾

Apache Spark是Databricks的计算核心。在这里不做基础介绍,而是聚焦Databricks环境中Spark的特殊行为和最佳实践。

Spark采用Master-Worker架构:Driver Program是主程序,负责解析你写的代码、构建执行计划(DAG)、将任务分发给各个Worker;Executor(Worker节点上的进程)负责实际执行任务、读写数据。在Databricks中,Cluster的Driver节点运行Driver Program,Worker节点运行Executors。

# SparkSession在Databricks中已经预创建,直接使用spark变量
# 不需要也不应该自己创建SparkSession

# 读取Delta表(Databricks最常见的操作)
df = spark.table("catalog_name.schema_name.table_name")

# 或者读取文件
df = spark.read.format("delta").load("/path/to/delta/table")

# 基本的DataFrame操作
from pyspark.sql import functions as F
from pyspark.sql.window import Window

result = (df
    .filter(F.col("event_date") >= "2024-01-01")
    .groupBy("user_id", "event_date")
    .agg(
        F.count("event_id").alias("event_count"),
        F.sum("revenue").alias("total_revenue")
    )
    .withColumn("revenue_rank",
        F.rank().over(Window.partitionBy("event_date")
                           .orderBy(F.desc("total_revenue")))
    )
)

result.write.mode("overwrite").saveAsTable("my_schema.daily_user_summary")

5.2 Spark的懒执行(Lazy Evaluation)

Spark最重要的概念之一是懒执行:Transformations(转换操作)不会立即执行,只有Actions(动作操作)才会触发实际计算

Transformations包括:filter()select()groupBy()join()withColumn()等。这些操作只是在构建执行计划(Logical Plan),不触发任何计算。

Actions包括:show()count()collect()write()save()等。每个Action触发一次Job,Spark编译之前积累的所有Transformations为一个优化后的物理执行计划,然后分发到集群执行。

这个设计的好处是:Spark的Catalyst优化器可以看到完整的计划,进行谓词下推(Predicate Pushdown)、列剪裁(Column Pruning)、Join重排等优化,比逐步执行效率高得多。

# 这三行Transformation都不执行任何计算
filtered_df = df.filter(F.col("status") == "active")  # 懒执行
projected_df = filtered_df.select("user_id", "created_at", "revenue")  # 懒执行
grouped_df = projected_df.groupBy("user_id").agg(F.sum("revenue"))  # 懒执行

# 直到这个Action,Spark才开始执行上面所有操作的优化后计划
grouped_df.show()  # 触发执行!

# 使用explain()查看执行计划(不触发执行)
grouped_df.explain(extended=True)  # 显示从Parsed到Physical Plan的完整计划

5.3 Shuffle:性能的核心瓶颈

当Spark执行需要跨节点重新分配数据的操作时(如groupByjoinorderBy),就会发生Shuffle——数据从每个Executor写到磁盘,然后通过网络传输到目标Executor。Shuffle是分布式计算中最昂贵的操作,是性能调优的核心关注点。

spark.sql.shuffle.partitions控制Shuffle后的分区数,默认值是200。这个值需要根据数据量调整:对于小数据集(<100GB),200可能导致每个分区太小("小文件问题");对于大数据集(>10TB),200个分区可能导致每个分区太大(处理慢)。

在Databricks中,可以使用Adaptive Query Execution(AQE,Spark 3.x默认开启)让Spark在运行时动态调整分区数,大大减少了手动调优的需要。

# 查看/设置shuffle分区数
print(spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", "100")

# AQE在Databricks中默认启用(Spark 3.0+)
# 它会自动处理:
# 1. 动态合并小的Shuffle分区
# 2. 自动处理数据倾斜(Skew Join)
# 3. 动态切换Join策略

# 检查AQE是否启用
print(spark.conf.get("spark.sql.adaptive.enabled"))  # 应该是 true

5.4 广播Join(Broadcast Join)

当一个大表和一个小表Join时,可以使用广播Join将小表复制到每个Executor,避免大规模Shuffle。Spark会在小表小于spark.sql.autoBroadcastJoinThreshold(默认10MB)时自动使用广播Join,也可以手动Hint。

from pyspark.sql.functions import broadcast

# 手动广播小表(比如维度表)
result = large_fact_df.join(
    broadcast(small_dim_df),  # 强制广播
    on="product_id",
    how="left"
)

5.5 Partitioning与数据组织

DataFrame的分区(Partitions)决定了并行度——每个Partition由一个Task处理。理想状态下,每个Partition大小应在100MB-200MB之间,这样既保证了并行度,又不会产生过多的调度开销。

repartition(n)会触发Full Shuffle,重新均匀分配数据到n个分区;coalesce(n)只做本地合并(不Shuffle),只能减少分区数,但更高效。

# 检查当前分区数
print(df.rdd.getNumPartitions())

# 增加分区(触发Shuffle)
df_repartitioned = df.repartition(100)

# 按列分区(用于优化特定的Filter或Join)
df_repartitioned = df.repartition(100, F.col("date"))

# 减少分区(不触发Shuffle,更高效)
df_coalesced = df.coalesce(10)

# 在写入文件时,分区数决定了输出文件数量
# Delta Lake的OPTIMIZE命令可以事后整合小文件

6. Delta Lake:Lakehouse的地基

6.1 Delta Lake解决的根本问题

Delta Lake是Databricks最核心的创新之一(后来开源)。它本质上是一个在对象存储上实现ACID事务的开放表格式,由以下三个核心部分组成:

数据文件(Parquet格式)存储在对象存储上,这部分没有魔法——就是普通的Parquet文件。

事务日志(Transaction Log)是Delta Lake的灵魂,存储在数据目录下的_delta_log/子目录中。每次对表的操作(写入、更新、删除、Schema变更)都会在事务日志中追加一条JSON记录,这条记录描述了"发生了什么"(添加了哪些文件、删除了哪些文件)。

Checkpoints是事务日志的快照,每10个事务会创建一个Parquet格式的Checkpoint,用于加速元数据读取(不需要回放所有历史日志)。

Delta Table目录结构:
my_table/
├── _delta_log/
│   ├── 00000000000000000000.json    ← 事务0(建表)
│   ├── 00000000000000000001.json    ← 事务1(第一次写入)
│   ├── 00000000000000000002.json    ← 事务2(第二次写入)
│   ├── ...
│   ├── 00000000000000000010.checkpoint.parquet  ← 第10次事务的快照
│   └── _last_checkpoint                          ← 指向最新checkpoint
├── part-00000-xxx.snappy.parquet    ← 实际数据文件
├── part-00001-xxx.snappy.parquet
└── ...

6.2 ACID事务的实现

Delta Lake通过**乐观并发控制(Optimistic Concurrency Control)**实现ACID事务。当两个写入操作同时进行时,Delta Lake使用条件写入(Conditional Write)到对象存储来解决冲突:先完成的操作成功写入事务日志,后完成的操作检测到冲突,会重新读取最新状态、重试执行,若无法自动解决则抛出ConcurrentModificationException

这个设计的聪明之处在于:对象存储本身不支持事务,但通过这种日志+条件写入的机制,Delta Lake在它之上构建了事务语义。

# ACID的原子性:要么全部成功,要么全部失败
# 即使写入过程中集群崩溃,Delta Lake也能保证数据一致性

# 原子性写入
df.write.format("delta").mode("overwrite").saveAsTable("my_table")

# 多步骤操作的事务性(使用DeltaTable API)
from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "catalog.schema.my_table")

# MERGE操作:原子地执行插入/更新/删除
delta_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()

6.3 Time Travel(时间旅行)

Time Travel是Delta Lake最受用户喜爱的功能之一。由于所有历史版本的数据文件都被保留(直到VACUUM操作清理),你可以查询任何历史版本的数据。

# 按版本号查询
df_v1 = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/table")

# 按时间戳查询
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15 12:00:00") \
    .load("/path/to/table")

# 使用SQL
spark.sql("""
    SELECT * FROM my_table VERSION AS OF 5
""")

spark.sql("""
    SELECT * FROM my_table TIMESTAMP AS OF '2024-01-15 12:00:00'
""")

# 查看表的历史记录
delta_table.history().show()
# 输出包含:version, timestamp, operation, operationParameters, etc.

# 回滚到历史版本(使用RESTORE)
spark.sql("RESTORE TABLE my_table TO VERSION AS OF 3")

Time Travel的用途包括:数据审计(谁在什么时候改了什么)、事故恢复(误删数据后回滚)、A/B测试的可重现性(使用相同的历史数据重新跑模型训练)。

注意:Time Travel依赖于数据文件没有被VACUUM清理。VACUUM默认保留7天的历史,之后旧文件会被删除,就无法时间旅行到更早的版本了。

6.4 Schema Evolution与Schema Enforcement

Delta Lake默认开启Schema Enforcement:当你试图写入Schema不匹配的数据(如多了一列、少了一列、列类型不兼容),写入操作会失败并抛出AnalysisException。这比传统数据湖的"写什么都行"行为要安全得多,防止了"坏数据悄悄写进去"的问题。

当你确实需要添加新列或做其他Schema变化时,可以启用Schema Evolution:

# 方式1:合并Schema(自动添加新列)
df_with_new_column.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path/to/table")

# 方式2:覆盖Schema(完全替换)
df_new_schema.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/path/to/table")

# 通过ALTER TABLE添加列(DDL方式)
spark.sql("ALTER TABLE my_table ADD COLUMNS (new_col STRING COMMENT 'New field')")

# 修改列(有限制:可以放宽类型,不能缩窄)
spark.sql("ALTER TABLE my_table ALTER COLUMN amount TYPE DOUBLE")

6.5 Change Data Feed(CDF)

Change Data Feed(也叫Change Data Capture for Delta)允许你高效地追踪Delta表的变更,而不需要全量扫描。开启CDF后,每次对表的DML操作(INSERT/UPDATE/DELETE/MERGE)都会记录"变更行",包含一个特殊的_change_type列(insert/update_preimage/update_postimage/delete)。

# 在创建表时启用CDF
spark.sql("""
    CREATE TABLE my_cdc_table (
        id BIGINT,
        value STRING,
        updated_at TIMESTAMP
    )
    TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")

# 也可以在已有表上启用
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")

# 读取变更数据(增量处理的核心)
changes = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 5)          # 从版本5开始读
    # .option("startingTimestamp", "2024-01-01")  # 或从时间戳开始
    .table("my_cdc_table"))

changes.show()
# 输出:id | value | updated_at | _change_type | _commit_version | _commit_timestamp

CDF在构建增量ETL管道时极其有价值——下游表可以只处理"上游表的变化部分",而不需要每次重新扫描全量数据。

6.6 OPTIMIZE与Z-Order

随着数据不断写入,Delta表会积累大量小文件(每次写操作都可能产生新的Parquet文件)。小文件问题会导致查询时需要打开大量文件,显著降低性能。

OPTIMIZE命令将小文件合并成更大的文件(默认目标文件大小1GB),这个过程不影响数据内容,只是重新整合了物理存储。

-- 基础OPTIMIZE(合并小文件)
OPTIMIZE catalog.schema.my_table;

-- OPTIMIZE + Z-Order(合并文件同时按列排序,提升过滤性能)
OPTIMIZE catalog.schema.my_table ZORDER BY (user_id, event_date);

Z-Order是一种多维数据排序技术,它把多个列的值"交织"排列,使得在这些列上过滤时,可以跳过更多的不相关数据文件(Data Skipping)。规则是:将最常用于过滤(WHERE子句)的列放入Z-Order,通常是时间列和高基数的业务ID列。

Liquid Clustering是Databricks在2023年推出的Z-Order替代方案,它使用增量聚类(不需要重写所有文件),并且可以随时更改聚类列而不需要重建表,是新表的推荐方式。

-- 创建Liquid Clustering表
CREATE TABLE my_table (
    id BIGINT,
    user_id STRING,
    event_date DATE,
    value DOUBLE
)
CLUSTER BY (user_id, event_date);  -- 指定聚类列

-- 对已有表应用Liquid Clustering
ALTER TABLE existing_table CLUSTER BY (user_id, event_date);

-- 手动触发聚类(也可以让OPTIMIZE自动处理)
OPTIMIZE my_table;

6.7 VACUUM

VACUUM命令清理不再被任何Delta表版本引用的旧数据文件。默认保留7天(168小时)的历史数据,这对应了默认的Time Travel窗口。

-- 查看会被清理的文件(DRY RUN不实际删除)
VACUUM catalog.schema.my_table DRY RUN;

-- 执行清理(保留默认7天的历史)
VACUUM catalog.schema.my_table;

-- 自定义保留时间(最少1小时,低于1小时需要关闭安全检查)
VACUUM catalog.schema.my_table RETAIN 72 HOURS;

重要:VACUUM操作是不可逆的。执行前,确保没有长时间运行的读取操作(它们依赖旧文件),也确保理解缩短保留时间后Time Travel的限制。

6.8 Delta Lake Table Properties

通过TBLPROPERTIES可以精细控制Delta表的行为:

ALTER TABLE my_table SET TBLPROPERTIES (
    -- 启用增量优化写(自动合并小文件到适当大小)
    'delta.autoOptimize.optimizeWrite' = 'true',
    -- 启用自动压缩
    'delta.autoOptimize.autoCompact' = 'true',
    -- 设置数据保留时间
    'delta.deletedFileRetentionDuration' = 'interval 30 days',
    -- 启用Change Data Feed
    'delta.enableChangeDataFeed' = 'true',
    -- 设置目标文件大小(Liquid Clustering中控制文件大小)
    'delta.targetFileSize' = '134217728'  -- 128MB
);

7. Unity Catalog:数据治理的全局大脑

7.1 为什么需要Unity Catalog

在Unity Catalog出现之前,Databricks的数据治理是混乱的:每个Workspace有自己独立的Hive Metastore,跨Workspace共享数据困难;权限管理分散在Workspace级别,缺乏统一视图;没有列级权限控制;数据Lineage不完整。

Unity Catalog(UC)解决了所有这些问题,它是一个跨Workspace、跨云的统一数据治理层,提供:

  • 三层命名空间(Catalog → Schema → Table)
  • 细粒度的权限控制(表级、列级、行级)
  • 自动化数据Lineage追踪(列级Lineage)
  • 统一的数据审计日志
  • 安全的数据共享(Delta Sharing)

7.2 三层命名空间

Unity Catalog引入了三层命名空间,相比旧的两层(Database.Table)多了一层Catalog:

Metastore(最顶层,一个Region一个)
├── Catalog(相当于一个数据库实例)
│   ├── Schema(相当于数据库中的schema/namespace)
│   │   ├── Table(Delta表、外部表、视图)
│   │   ├── View
│   │   ├── Volume(存储非表格数据的文件)
│   │   ├── Function(UDF)
│   │   └── Model(MLflow模型)
│   └── ...
└── ...

完整的三段式引用:catalog_name.schema_name.table_name
例如:main.finance.transactions

这个设计让不同的团队/环境/项目可以有独立的Catalog,而共享同一个Metastore(和同一套权限管理框架)。典型的Catalog划分方式:devstagingprod(环境隔离),或financemarketingoperations(业务域隔离)。

7.3 权限模型(GRANT/REVOKE)

UC使用标准SQL的GRANT/REVOKE语法管理权限,支持层次继承:在Catalog上授予的权限会自动传递给其下的所有Schema和Table。

-- 创建一个服务账号principal
-- (通常由Admin通过UI/API完成)

-- 给用户授予Catalog的使用权
GRANT USE CATALOG ON CATALOG main TO `user@company.com`;

-- 给组授予Schema的使用权
GRANT USE SCHEMA ON SCHEMA main.finance TO `group:analysts`;

-- 给服务账号授予表的SELECT权限
GRANT SELECT ON TABLE main.finance.transactions TO `service-account@project.iam.gserviceaccount.com`;

-- 授予表的写权限
GRANT MODIFY ON TABLE main.finance.staging_data TO `etl-service-principal`;

-- 授予创建表的权限(给数据工程师)
GRANT CREATE TABLE ON SCHEMA main.finance TO `group:data-engineers`;

-- 列级权限:屏蔽特定列
-- (通过Row Filter和Column Mask实现)
CREATE FUNCTION mask_email(email STRING)
RETURNS STRING
RETURN CASE 
    WHEN is_account_group_member('pii-access') THEN email
    ELSE REGEXP_REPLACE(email, '(.)(.*)(@.*)', '$1***$3')
END;

ALTER TABLE main.finance.users ALTER COLUMN email SET MASK mask_email;

-- 查看当前权限
SHOW GRANTS ON TABLE main.finance.transactions;

7.4 Volumes:非表格数据的统一管理

Volumes是Unity Catalog中管理非结构化和半结构化数据(文件、图像、脚本等)的对象,它让文件存储也纳入了UC的权限体系。

有两种类型:Managed Volume(数据文件存储在Databricks管理的存储位置,由UC完全控制)和External Volume(指向你自己云存储中的一个路径,UC管理访问权限但不管理数据本身)。

-- 创建Managed Volume
CREATE VOLUME main.raw_data.uploads;

-- 创建External Volume(指向你的S3/ADLS路径)
CREATE EXTERNAL VOLUME main.raw_data.external_files
LOCATION 's3://my-bucket/raw-files/';

-- 通过Volume路径访问文件(统一通过UC路径访问)
-- /Volumes/catalog_name/schema_name/volume_name/file.csv
LIST '/Volumes/main/raw_data/uploads/';

-- 在代码中读取Volume中的文件
df = spark.read.csv("/Volumes/main/raw_data/uploads/data.csv", header=True)

7.5 Data Lineage(数据血缘)

Unity Catalog自动追踪数据从源表到目标表的流转路径,包括列级Lineage。当你执行CREATE TABLE AS SELECTINSERT INTO ... SELECT、或通过Spark读写表时,UC会自动记录这些操作中的列与列之间的关系。

在Catalog的Table详情页,你可以看到"Lineage"标签,展示:

  • Upstream:这张表的数据来自哪些表的哪些列
  • Downstream:这张表的数据被哪些表/视图/Dashboard消费

这对数据影响分析(Impact Analysis)极其有价值:当你要修改一张表的Schema时,可以立即看到下游哪些表、哪些报表会受到影响,从而制定安全的迁移策略。

7.6 Delta Sharing:安全的跨组织数据共享

Delta Sharing是一个开放协议,允许你在不移动数据、不复制数据的前提下,安全地将Delta表共享给外部合作方(其他公司、其他云平台、其他工具)。接收方不需要使用Databricks,只要支持Delta Sharing协议就可以读取数据。

-- 创建一个Share
CREATE SHARE my_partner_share;

-- 向Share中添加表
ALTER SHARE my_partner_share ADD TABLE main.public.monthly_summary;

-- 创建Recipient(合作方)
CREATE RECIPIENT partner_company;

-- 将Share授予Recipient
GRANT SELECT ON SHARE my_partner_share TO RECIPIENT partner_company;

-- 获取激活链接(发给合作方)
DESC RECIPIENT partner_company;

8. Delta Live Tables(DLT):声明式数据管道

8.1 DLT的设计哲学

Delta Live Tables(DLT)是Databricks对"声明式数据管道"的实现。传统的ETL管道需要你命令式地指定:先读这个表,然后做这个转换,再写到那个表,如果失败了怎么办,依赖顺序怎么排。DLT让你只需声明"这张表应该是什么",由框架自动推断执行顺序、管理依赖、处理错误、自动优化。

DLT的核心思想与React的声明式UI有相似之处:你不描述"如何"更新UI,你描述"UI应该是什么样",框架负责diff和更新。DLT里,你不描述"如何"生成数据,你描述"每张表的定义",DLT负责执行计划、依赖管理和增量更新。

8.2 DLT的三种表类型

Streaming Table(流式表)是为增量数据处理设计的,使用APPEND ONLY语义——每次运行只处理新数据(基于Structured Streaming的微批或连续处理)。适用于从Kafka、Event Hubs、Kinesis摄入数据,或处理每天新增的日志文件。

Materialized View(物化视图)在每次更新时完全重新计算结果,适用于需要聚合历史数据的场景(如每日汇总表),DLT会自动判断是否需要全量刷新。

View是临时的、不物化存储的视图,只在管道内部用于逻辑封装,不会创建实际的Delta表。

8.3 DLT Pipeline的编写

# DLT管道代码(在Notebook中编写,由DLT框架执行)
import dlt
from pyspark.sql import functions as F

# Bronze层:原始数据摄入(Streaming Table)
@dlt.table(
    name="raw_events",
    comment="原始事件数据,从Kafka摄入",
    table_properties={
        "quality": "bronze",
        "pipelines.reset.allowed": "false"  # 防止意外重置
    }
)
def raw_events():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")
        .option("subscribe", "events_topic")
        .option("startingOffsets", "latest")
        .load()
        .select(
            F.col("timestamp").alias("kafka_timestamp"),
            F.from_json(F.col("value").cast("string"), events_schema).alias("data")
        )
        .select("kafka_timestamp", "data.*")
    )

# Silver层:清洗和标准化(Streaming Table,从Bronze增量读取)
@dlt.table(
    name="cleaned_events",
    comment="清洗后的事件数据"
)
@dlt.expect("valid_user_id", "user_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_fail("valid_event_type", "event_type IN ('click', 'view', 'purchase')")
def cleaned_events():
    return (
        dlt.read_stream("raw_events")  # 增量读取Bronze表
        .withColumn("event_date", F.to_date("event_timestamp"))
        .withColumn("amount_usd", 
            F.when(F.col("currency") == "EUR", F.col("amount") * 1.1)
            .otherwise(F.col("amount")))
        .dropDuplicates(["event_id"])  # 去重
    )

# Gold层:业务指标聚合(Materialized View,每次全量计算)
@dlt.table(
    name="daily_revenue",
    comment="每日收入汇总"
)
def daily_revenue():
    return (
        dlt.read("cleaned_events")  # 读取整张表(非流式)
        .filter(F.col("event_type") == "purchase")
        .groupBy("event_date", "product_category")
        .agg(
            F.sum("amount_usd").alias("total_revenue"),
            F.count("event_id").alias("transaction_count"),
            F.countDistinct("user_id").alias("unique_buyers")
        )
    )

8.4 DLT的数据质量:Expectations

DLT的Expectations(期望)是声明式的数据质量规则,相当于内置的数据断言机制:

@dlt.expect(name, constraint) — 记录违规行但继续处理(软约束,用于监控)。

@dlt.expect_or_drop(name, constraint) — 删除违规行,其余数据继续流入目标表。

@dlt.expect_or_fail(name, constraint) — 任何违规都导致管道失败(硬约束,用于Critical数据质量要求)。

@dlt.expect_all(expectations_dict) — 一次定义多个期望。

所有Expectations的违规情况都会被自动记录,在DLT Pipeline的Event Log中可以查询,也可以通过event_log()函数进行分析。这就是"内置的数据质量监控",无需额外工具。

8.5 DLT的两种模式

Triggered模式:管道按计划触发,处理完新数据后停止集群。适合批处理场景(每天或每小时运行一次)。

Continuous模式:管道持续运行,像Structured Streaming一样实时处理新数据。适合低延迟的实时场景(分钟级或秒级延迟)。集群持续运行,费用较高。


9. Databricks Workflows:编排与调度

9.1 Workflow的本质

Databricks Workflows(以前叫Jobs)是平台内置的工作流编排系统,用于自动化执行数据管道、ML训练、数据处理等任务。它是Databricks生态中对Airflow、Prefect等外部编排器的原生替代。

一个Workflow由一个或多个Tasks组成,Tasks之间可以定义依赖关系,形成DAG(有向无环图)。支持多种Task类型,能够将不同技术栈的任务组织在一个Workflow中。

9.2 Task类型

Notebook Task:运行一个Databricks Notebook,是最常见的Task类型。可以传递参数(通过dbutils.widgets读取),可以使用dbutils.jobs.taskValues.set()向后续Task传递值。

Python Script Task:运行一个Python脚本文件(从Repo或DBFS读取),比Notebook更适合生产化的代码(可以用pytest测试、版本控制更友好)。

Spark Submit Task:使用spark-submit风格运行Spark作业(Jar或Python),适合已有的Spark应用迁移到Databricks。

Delta Live Tables Task:触发一个DLT Pipeline的运行,将DLT管道集成到更大的Workflow中。

dbt Task:原生集成dbt(Data Build Tool),运行dbt models,使得Databricks + dbt的组合工作流非常流畅。

SQL Task:运行SQL查询(在SQL Warehouse上),适合触发数据刷新或执行DDL操作。

Condition Task 和 For Each Task:控制流Task,实现条件分支和循环逻辑,使Workflow能处理动态的执行路径。

Run Job Task:将另一个Workflow作为子任务调用,实现Workflow的模块化和复用。

9.3 完整Workflow配置示例

# Workflow配置(通过Databricks Asset Bundles的YAML定义)
resources:
  jobs:
    daily_etl_pipeline:
      name: "Daily ETL Pipeline"
      schedule:
        quartz_cron_expression: "0 0 6 * * ?"  # 每天UTC 06:00执行
        timezone_id: "Europe/Berlin"
      
      # 邮件通知配置
      email_notifications:
        on_failure:
          - "data-team@company.com"
        on_success:
          - "data-team@company.com"
      
      # Job级别的集群配置(所有Task共享)
      job_clusters:
        - job_cluster_key: "etl_cluster"
          new_cluster:
            spark_version: "14.3.x-scala2.12"
            node_type_id: "i3.2xlarge"
            num_workers: 4
            spark_conf:
              "spark.sql.shuffle.partitions": "200"
      
      tasks:
        # Task 1: 数据摄入
        - task_key: "ingest_raw_data"
          job_cluster_key: "etl_cluster"
          python_wheel_task:
            package_name: "my_etl_package"
            entry_point: "ingest"
            parameters: ["--date", "{{job.start_time.iso_date}}"]
          libraries:
            - whl: "/Volumes/main/packages/my_etl_package-1.0.0-py3-none-any.whl"
        
        # Task 2: 数据转换(依赖Task 1完成)
        - task_key: "transform_data"
          depends_on:
            - task_key: "ingest_raw_data"
          job_cluster_key: "etl_cluster"
          notebook_task:
            notebook_path: "/Repos/main/etl/notebooks/transform"
            base_parameters:
              date: "{{job.start_time.iso_date}}"
        
        # Task 3: DLT管道(依赖Task 2)
        - task_key: "run_dlt_pipeline"
          depends_on:
            - task_key: "transform_data"
          pipeline_task:
            pipeline_id: "{{var.dlt_pipeline_id}}"
            full_refresh: false
        
        # Task 4: 数据质量检查(依赖Task 3)
        - task_key: "data_quality_check"
          depends_on:
            - task_key: "run_dlt_pipeline"
          job_cluster_key: "etl_cluster"
          notebook_task:
            notebook_path: "/Repos/main/etl/notebooks/quality_check"
          
        # Task 5: 通知(无论成功失败都执行)
        - task_key: "send_notification"
          depends_on:
            - task_key: "data_quality_check"
              outcome: "FAILED"  # 只在失败时触发
          run_if: "AT_LEAST_ONE_FAILED"
          notebook_task:
            notebook_path: "/Repos/main/etl/notebooks/alert_notification"

9.4 动态值引用(Task Values)

Workflows允许Task之间通过Task Values传递动态数据:

# Task 1: 设置值供后续Task使用
from databricks.sdk.runtime import dbutils

processed_count = 1500000
dbutils.jobs.taskValues.set(key="processed_records", value=processed_count)
dbutils.jobs.taskValues.set(key="output_path", value="/delta/processed/2024-01-15")

# Task 2(Task 1的下游): 读取Task 1设置的值
record_count = dbutils.jobs.taskValues.get(
    taskKey="ingest_raw_data",
    key="processed_records"
)
output_path = dbutils.jobs.taskValues.get(
    taskKey="ingest_raw_data", 
    key="output_path"
)

print(f"Processing {record_count} records from {output_path}")

10. Databricks SQL:分析师的战场

10.1 SQL Warehouse深度解析

SQL Warehouse是专为SQL分析工作负载优化的计算资源,有别于通用Cluster的几个关键特性:

Photon引擎:SQL Warehouse默认使用Photon(C++向量化执行引擎),对列式扫描、聚合、Hash Join等操作的加速幅度从2倍到12倍不等。Photon对SQL语义完全兼容,不需要修改查询。

自动缩放(Auto-Stop和Scaling):SQL Warehouse在不使用时自动暂停(默认10分钟无查询后),在有查询时自动重启。对于多并发查询,可以配置扩展策略(Scaling Policy),在繁忙时自动增加Cluster数量(注意:一个"SQL Warehouse"可以包含多个底层Cluster)。

Serverless SQL Warehouse:启动时间从2-5分钟(Classic)降到几秒,计费粒度更细(精确到秒级),运行在Databricks管理的基础设施上(不占用你的云账号VPC资源)。这是现代Databricks最推荐的SQL分析计算形式。

-- SQL Warehouse中可以执行的典型操作

-- 查询Delta表
SELECT 
    date_trunc('month', transaction_date) AS month,
    product_category,
    SUM(amount) AS total_revenue,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM main.finance.transactions
WHERE transaction_date >= '2024-01-01'
GROUP BY ALL  -- Databricks SQL支持GROUP BY ALL(自动推断分组列)
ORDER BY month DESC, total_revenue DESC;

-- 使用PIVOT进行数据透视
SELECT *
FROM (
    SELECT product_category, month(transaction_date) as month, amount
    FROM main.finance.transactions
)
PIVOT (
    SUM(amount) FOR month IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
);

-- Databricks SQL扩展语法
-- QUALIFY(窗口函数过滤,避免嵌套子查询)
SELECT user_id, event_date, revenue,
       ROW_NUMBER() OVER (PARTITION BY event_date ORDER BY revenue DESC) AS rank
FROM events
QUALIFY rank <= 10;  -- 直接过滤窗口函数结果

-- ILIKE(大小写不敏感的LIKE)
SELECT * FROM products WHERE name ILIKE '%coffee%';

10.2 Query Editor与可视化

SQL Editor提供了完整的SQL开发体验:语法高亮、自动补全(表名、列名)、查询历史记录。每次查询结果都可以直接生成多种可视化图表(柱状图、折线图、散点图、热力图等),无需Python或额外工具。

重要特性是Query Parameters(查询参数),允许你在SQL中使用{{parameter_name}}占位符,用于创建动态查询和交互式Dashboard:

-- 使用参数化查询(双大括号语法)
SELECT *
FROM main.finance.transactions
WHERE 
    transaction_date BETWEEN {{start_date}} AND {{end_date}}
    AND product_category = '{{category}}'
    AND amount >= {{min_amount}};

10.3 Databricks SQL Dashboards

Dashboard是将多个可视化组件(图表、数字、文本)组织在一个页面上,并可以共享给非技术用户的功能。新版的AI/BI Dashboards(Lakeview)在2024年推出,提供了更现代的设计和AI辅助功能(自然语言生成SQL/图表)。

Dashboard可以设置自动刷新频率,并嵌入到外部系统(通过iframe),是面向业务用户的数据展示层。


11. MLflow:ML生命周期管理

11.1 MLflow的四个核心模块

MLflow是Databricks开源并主导的ML生命周期管理平台,由四个核心组件构成,在Databricks中深度集成(无需任何配置即可使用)。

MLflow Tracking解决的问题是:如何记录、比较和重现ML实验。每次训练运行(Run)可以记录:Parameters(超参数,如learning_rate、max_depth)、Metrics(模型评估指标,如accuracy、AUC,支持记录多个Epoch的历史曲线)、Artifacts(模型文件、图表、特征重要性图、任何文件)、Tags(自由标签,如团队、项目、数据版本)。

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score
import pandas as pd

# 在Databricks中,MLflow自动连接到Workspace的Tracking Server
# 设置实验(如果不设置,默认使用Notebook路径)
mlflow.set_experiment("/Users/bill@company.com/fraud_detection")

# 使用context manager自动管理Run生命周期
with mlflow.start_run(run_name="rf_baseline_v1") as run:
    # 记录参数
    params = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5}
    mlflow.log_params(params)
    
    # 添加标签
    mlflow.set_tags({
        "model_type": "random_forest",
        "data_version": "2024-01-15",
        "team": "fraud-detection"
    })
    
    # 训练模型
    model = RandomForestClassifier(**params, random_state=42)
    model.fit(X_train, y_train)
    
    # 评估并记录Metrics
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    
    accuracy = accuracy_score(y_test, y_pred)
    auc = roc_auc_score(y_test, y_prob)
    
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("auc", auc)
    
    # 记录图表(特征重要性)
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots(figsize=(10, 6))
    feature_importances = pd.Series(model.feature_importances_, index=feature_names)
    feature_importances.nlargest(20).plot(kind='barh', ax=ax)
    mlflow.log_figure(fig, "feature_importance.png")
    plt.close()
    
    # 记录模型(最重要的步骤)
    # input_example帮助MLflow推断模型的输入Schema
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        input_example=X_train.head(5),
        signature=mlflow.models.infer_signature(X_train, y_pred)
    )
    
    print(f"Run ID: {run.info.run_id}")
    print(f"Accuracy: {accuracy:.4f}, AUC: {auc:.4f}")

11.2 MLflow Model Registry

Model Registry是ML模型的版本控制和部署管理系统。一个模型在Registry中经历三个阶段:Staging(测试中)、Production(生产运行)、Archived(已归档)。

在Unity Catalog启用后,模型也遵循三层命名空间(catalog.schema.model_name),并受UC权限管理。

import mlflow

# 将一个Run的模型注册到Model Registry
run_id = "your_run_id_here"
model_uri = f"runs:/{run_id}/model"

# 注册模型(如果不存在则创建,如果存在则创建新版本)
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name="main.ml_models.fraud_detector"  # UC三层命名
)

print(f"Model Version: {registered_model.version}")

# 使用MlflowClient进行更精细的管理
from mlflow.tracking import MlflowClient
client = MlflowClient()

# 将新版本设置为Champion(Production)
client.set_registered_model_alias(
    name="main.ml_models.fraud_detector",
    alias="champion",
    version=registered_model.version
)

# 通过别名加载生产模型(推荐方式,解耦版本号)
production_model = mlflow.sklearn.load_model(
    "models:/main.ml_models.fraud_detector@champion"
)

# 或者按版本加载
specific_version_model = mlflow.sklearn.load_model(
    "models:/main.ml_models.fraud_detector/3"
)

11.3 MLflow的pyfunc:模型的通用接口

mlflow.pyfunc是MLflow的通用模型接口,任何模型(sklearn、PyTorch、XGBoost甚至自定义预测逻辑)都可以被包装成pyfunc格式。这个格式是Databricks Model Serving的统一接口。

# 创建自定义pyfunc模型(包含预处理+模型推断+后处理)
class FraudDetectionModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        """加载模型和预处理器(在加载时调用一次)"""
        import joblib
        self.model = joblib.load(context.artifacts["model_path"])
        self.preprocessor = joblib.load(context.artifacts["preprocessor_path"])
        self.threshold = 0.65  # 决策阈值
    
    def predict(self, context, model_input):
        """预测逻辑"""
        import pandas as pd
        
        # 预处理
        X = self.preprocessor.transform(model_input)
        
        # 获取概率
        proba = self.model.predict_proba(X)[:, 1]
        
        # 返回包含概率和二元预测的DataFrame
        return pd.DataFrame({
            "fraud_probability": proba,
            "is_fraud": (proba >= self.threshold).astype(int),
            "risk_level": pd.cut(proba, 
                bins=[0, 0.3, 0.65, 1.0], 
                labels=["low", "medium", "high"])
        })

# 保存自定义pyfunc模型
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="fraud_model",
        python_model=FraudDetectionModel(),
        artifacts={
            "model_path": "path/to/model.pkl",
            "preprocessor_path": "path/to/preprocessor.pkl"
        },
        pip_requirements=["scikit-learn==1.3.0", "joblib"]
    )

11.4 MLflow Projects

MLflow Projects是ML代码的标准化打包格式,通过MLproject文件定义项目的入口点和依赖:

# MLproject文件
name: fraud_detection_training

conda_env: conda.yaml  # 或 python_env.yaml

entry_points:
  train:
    parameters:
      learning_rate: {type: float, default: 0.01}
      max_depth: {type: int, default: 6}
      n_estimators: {type: int, default: 100}
      data_path: {type: str}
    command: "python train.py --lr {learning_rate} --max-depth {max_depth} --n-estimators {n_estimators} --data {data_path}"
  
  evaluate:
    parameters:
      model_uri: {type: str}
      test_data: {type: str}
    command: "python evaluate.py --model-uri {model_uri} --test-data {test_data}"

12. Databricks AutoML

12.1 AutoML的定位

Databricks AutoML是为数据科学家设计的自动化建模工具,而非替代数据科学家的工具。它的核心价值在于:快速建立性能基准(Baseline),并自动生成可编辑的Notebook代码,作为后续手动调优的起点。

AutoML支持三类任务:分类(Classification)、回归(Regression)、预测(Forecasting/Time Series)。它会自动:探索性数据分析(EDA)、特征工程(处理缺失值、编码分类变量、日期特征提取)、超参数调优(基于MLflow Hyperopt)、模型比较(训练多种算法并比较)。

12.2 使用AutoML

from databricks import automl

# 加载训练数据(Spark DataFrame或pandas DataFrame)
train_df = spark.table("main.ml_data.fraud_training_set")

# 运行AutoML分类任务
summary = automl.classify(
    dataset=train_df,
    target_col="is_fraud",
    
    # 可选:指定时间列(用于时序切分,避免数据泄露)
    time_col="transaction_timestamp",
    
    # 可选:排除不应作为特征的列
    exclude_cols=["transaction_id", "customer_ssn"],
    
    # 可选:自定义主要评估指标
    primary_metric="f1",
    
    # 最大运行时间(分钟)
    timeout_minutes=30,
    
    # 可选:指定数据分割方式
    split_col="split",  # 数据中预定义了train/val/test列
    
    # MLflow实验名
    experiment_dir="/Users/bill@company.com/automl_experiments"
)

# 查看最佳模型
best_model = summary.best_trial
print(f"Best model: {best_model.model_path}")
print(f"Best F1: {best_model.metrics['test_f1_score']:.4f}")

# 获取生成的Notebook路径(可以打开并修改)
print(f"Generated Notebook: {summary.trials[0].notebook_path}")

# 加载最佳模型并进行预测
import mlflow
best_model_uri = best_model.model_path
loaded_model = mlflow.pyfunc.load_model(best_model_uri)
predictions = loaded_model.predict(test_data)

AutoML最有价值的输出是生成的Notebook——它包含了完整的特征工程、模型训练、评估代码,是一个可直接修改的模板,让数据科学家从"0-1"的时间从几天压缩到几分钟。


13. Feature Store:特征工程的工厂

13.1 Feature Store解决的核心问题

Feature Store(特征存储)解决了ML工程中一个经典问题:训练-服务不一致(Training-Serving Skew)

在没有Feature Store的情况下,训练时的特征计算代码(Python/Spark)和服务时的特征计算代码(Python/Java/REST API)是分开维护的。随着时间推移,两套代码出现细微差异(不同的空值处理、不同的归一化公式),导致线上模型的实际输入与训练时不同,产生难以发现的性能下降。

Feature Store通过统一存储和统一计算逻辑解决这个问题:特征只定义一次,训练和服务都从同一个Feature Store读取。

13.2 使用Feature Store

from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# 步骤1:创建Feature Table
# 先定义特征计算函数(返回Spark DataFrame)
def compute_user_features(df):
    return (df
        .groupBy("user_id")
        .agg(
            F.count("transaction_id").alias("transaction_count_30d"),
            F.sum("amount").alias("total_spend_30d"),
            F.avg("amount").alias("avg_transaction_amount"),
            F.stddev("amount").alias("std_transaction_amount"),
            F.max("fraud_flag").alias("has_fraud_history")
        )
        .withColumn("spend_velocity", 
            F.col("total_spend_30d") / F.lit(30))
    )

# 计算特征(用过去30天数据)
user_features_df = compute_user_features(transactions_df)

# 将特征写入Feature Store(创建Feature Table)
fe.create_table(
    name="main.features.user_transaction_features",
    primary_keys=["user_id"],
    timestamp_keys=["feature_date"],  # 时序特征用时间戳键
    df=user_features_df,
    description="用户过去30天的交易特征,每日更新"
)

# 步骤2:在训练时使用Feature Store
from databricks.feature_engineering import FeatureLookup

# 定义特征查找(从Feature Table自动join特征)
feature_lookups = [
    FeatureLookup(
        table_name="main.features.user_transaction_features",
        feature_names=["transaction_count_30d", "total_spend_30d", 
                       "avg_transaction_amount", "spend_velocity"],
        lookup_key="user_id",
        timestamp_lookup_key="transaction_timestamp"  # Point-in-time correct join
    ),
    FeatureLookup(
        table_name="main.features.merchant_risk_features",
        feature_names=["merchant_risk_score", "merchant_category_fraud_rate"],
        lookup_key="merchant_id"
    )
]

# 创建训练数据集(自动Join特征)
training_set = fe.create_training_set(
    df=labels_df,  # 只包含主键和label的DataFrame
    feature_lookups=feature_lookups,
    label="is_fraud",
    exclude_columns=["transaction_timestamp"]  # 不作为模型输入的列
)

# 转换为pandas DataFrame用于训练
training_df = training_set.load_df().toPandas()

# 步骤3:用Feature Store记录模型(关键!记录特征依赖)
import mlflow
with mlflow.start_run():
    model = train_model(training_df)
    
    # 使用fe.log_model代替mlflow.sklearn.log_model
    # 这样模型和Feature Store的关联关系会被记录
    fe.log_model(
        model=model,
        artifact_path="fraud_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="main.ml_models.fraud_detector"
    )

14. Model Serving:模型上线的最后一公里

14.1 Model Serving的架构

Databricks Model Serving提供了托管的REST API端点,将MLflow中注册的模型暴露为HTTP API,用于实时推断(Real-time Inference)。它是Serverless的,按调用量和使用时间计费,无需管理任何Kubernetes/容器基础设施。

Model Serving支持多种模型类型:MLflow Python模型(sklearn/PyTorch/XGBoost等)、外部模型(通过统一API代理OpenAI/Anthropic/Azure OpenAI,便于模型切换和成本控制)、基础模型API(Databricks托管的Llama、DBRX、Mistral等开源LLM)。

14.2 创建和使用Model Serving Endpoint

import mlflow
from mlflow.deployments import get_deploy_client

# 方式1:通过Databricks SDK创建Endpoint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ServedModelInput, ServedModelInputWorkloadSize

w = WorkspaceClient()

endpoint = w.serving_endpoints.create_and_wait(
    name="fraud-detector-endpoint",
    config=w.serving_endpoints.EndpointCoreConfigInput(
        served_models=[
            ServedModelInput(
                name="fraud-detector-v1",
                model_name="main.ml_models.fraud_detector",
                model_version="3",
                workload_size=ServedModelInputWorkloadSize.SMALL,  # SMALL/MEDIUM/LARGE
                scale_to_zero_enabled=True  # 没有流量时自动缩容到0
            )
        ]
    )
)

# 方式2:A/B测试(流量切分)
endpoint_with_ab = w.serving_endpoints.create_and_wait(
    name="fraud-detector-ab-test",
    config=w.serving_endpoints.EndpointCoreConfigInput(
        served_models=[
            ServedModelInput(
                name="champion",
                model_name="main.ml_models.fraud_detector",
                model_version="3",
                traffic_percentage=80  # 80%流量
            ),
            ServedModelInput(
                name="challenger",
                model_name="main.ml_models.fraud_detector_v2",
                model_version="1",
                traffic_percentage=20  # 20%流量用于测试新模型
            )
        ]
    )
)

# 调用Model Serving Endpoint
import requests

endpoint_url = "https://your-workspace.azuredatabricks.net/serving-endpoints/fraud-detector-endpoint/invocations"
token = "your_databricks_token"

response = requests.post(
    url=endpoint_url,
    headers={"Authorization": f"Bearer {token}"},
    json={
        "dataframe_records": [
            {
                "user_id": "user_123",
                "merchant_id": "merch_456",
                "amount": 1500.00,
                "transaction_hour": 2,
                "country_code": "DE"
            }
        ]
    }
)

result = response.json()
print(result["predictions"])
# 输出: [{"fraud_probability": 0.87, "is_fraud": 1, "risk_level": "high"}]

14.3 Foundation Models API

Databricks提供了对开源大语言模型(如Meta Llama 3, Mistral, DBRX)的托管API,遵循OpenAI API格式,便于从OpenAI迁移:

from openai import OpenAI

# 使用Databricks Foundation Models API(OpenAI兼容接口)
client = OpenAI(
    api_key="your-databricks-token",
    base_url="https://your-workspace.azuredatabricks.net/serving-endpoints"
)

# 使用Llama 3进行Chat Completion
response = client.chat.completions.create(
    model="databricks-meta-llama-3-1-70b-instruct",
    messages=[
        {"role": "system", "content": "You are a financial analysis assistant."},
        {"role": "user", "content": "Analyze the key risks in the following transaction: ..."}
    ],
    max_tokens=500,
    temperature=0.1
)

print(response.choices[0].message.content)

15. Databricks Asset Bundles(DAB):基础设施即代码

15.1 DAB的核心价值

Databricks Asset Bundles(DAB)是Databricks对"基础设施即代码"的官方实现。它允许你用YAML文件定义所有的Databricks资源(Workflows、Clusters、DLT Pipelines、Model Serving Endpoints、Jobs),并通过CLI将这些定义部署到不同的环境(dev/staging/prod)。

DAB对于Platform Engineer来说极其重要,因为它解决了Databricks在CI/CD流程中的关键问题:如何将Databricks资源的变更纳入版本控制和自动化部署。没有DAB之前,只能手动通过UI点击来配置和部署,无法实现基础设施的可重现性。

15.2 DAB的项目结构

my_databricks_project/
├── databricks.yml          ← 主配置文件(定义bundle名、workspace连接、环境)
├── resources/
│   ├── jobs.yml            ← Workflow/Job定义
│   ├── pipelines.yml       ← DLT Pipeline定义
│   └── serving_endpoints.yml ← Model Serving Endpoint定义
├── src/
│   ├── notebooks/          ← Notebook文件
│   ├── python/             ← Python源代码
│   └── sql/                ← SQL文件
└── tests/
    └── test_pipeline.py    ← 测试代码

15.3 完整的DAB配置示例

# databricks.yml(主配置文件)
bundle:
  name: finlakehouse_etl

# 变量定义(可被各环境覆盖)
variables:
  catalog_name:
    default: "dev"
    description: "Unity Catalog名称"
  dlt_pipeline_id:
    description: "DLT Pipeline ID(部署后填入)"

# 工作区连接(不同环境连接不同Workspace)
workspace:
  host: https://your-workspace.azuredatabricks.net
  auth_type: pat

# 多环境配置
targets:
  # 开发环境
  dev:
    mode: development  # 开发模式:所有资源名称加dev前缀,避免污染生产
    workspace:
      host: https://dev-workspace.azuredatabricks.net
    variables:
      catalog_name: "dev"
    
  # 生产环境
  prod:
    mode: production
    workspace:
      host: https://prod-workspace.azuredatabricks.net
    variables:
      catalog_name: "prod"
    run_as:
      service_principal_name: etl-service-principal@project.com

# 引入其他YAML资源文件
include:
  - resources/*.yml
# resources/jobs.yml
resources:
  jobs:
    macro_etl_daily:
      name: "FinLakehouse - Macro ETL Daily"
      description: "Daily macro economic data ingestion and processing"
      
      schedule:
        quartz_cron_expression: "0 30 7 * * ?"
        timezone_id: "Europe/Berlin"
        pause_status: UNPAUSED
      
      # 在开发环境中,jobs.run_as可以是当前用户
      # 在生产环境中,使用Service Principal
      
      job_clusters:
        - job_cluster_key: "default_cluster"
          new_cluster:
            spark_version: "14.3.x-scala2.12"
            node_type_id: "i3.xlarge"
            autoscale:
              min_workers: 1
              max_workers: 4
            spark_conf:
              "spark.databricks.delta.preview.enabled": "true"
            data_security_mode: SINGLE_USER
      
      tasks:
        - task_key: "ingest_ecb_rates"
          job_cluster_key: "default_cluster"
          python_wheel_task:
            package_name: "finlakehouse"
            entry_point: "ingest_ecb"
            parameters:
              - "--catalog"
              - "${var.catalog_name}"
              - "--env"
              - "${bundle.target}"
          
        - task_key: "process_bronze_to_silver"
          depends_on:
            - task_key: "ingest_ecb_rates"
          job_cluster_key: "default_cluster"
          notebook_task:
            notebook_path: ./src/notebooks/bronze_to_silver
            base_parameters:
              catalog: "${var.catalog_name}"
      
      email_notifications:
        on_failure:
          - "bill@company.com"
      
      # 健康检查
      health:
        rules:
          - metric: RUN_DURATION_SECONDS
            op: GREATER_THAN
            value: 3600  # 如果超过1小时,发出警告

15.4 DAB的常用CLI命令

# 安装Databricks CLI(v0.200+,Go版本,支持DAB)
curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh

# 配置认证
databricks configure --token

# 验证bundle配置(不实际部署,只做验证和渲染)
databricks bundle validate

# 部署到开发环境(上传代码+创建/更新Databricks资源)
databricks bundle deploy --target dev

# 部署到生产环境
databricks bundle deploy --target prod

# 手动触发一次Job运行(不等待完成)
databricks bundle run macro_etl_daily --target dev

# 查看部署状态
databricks bundle summary --target prod

# 销毁部署(删除所有Bundle创建的资源)
databricks bundle destroy --target dev

# 在GitHub Actions中的典型CI/CD配置
# .github/workflows/deploy.yml
# on: push to main → databricks bundle deploy --target prod

16. Repos & Git 集成

16.1 Repos的作用

Databricks Repos允许你将Git仓库(GitHub、GitLab、Bitbucket、Azure DevOps)克隆到Databricks Workspace,直接在Workspace中编辑、提交、推送代码。这解决了传统Notebook只能存在Workspace中、无法被Git有效版本控制的问题。

通过Repos,你的工作流程可以是:本地IDE开发 → Git Push → GitHub Actions触发CI → Databricks Bundle Deploy → 生产部署。Notebook文件在Repos中以.py格式(使用# COMMAND ----------分隔符)存储,可以被pytest测试,可以被代码审查。

# 在Workspace UI中添加Repo
# 1. 左侧导航栏 → Repos → Add Repo
# 2. 输入GitHub仓库URL
# 3. 选择分支

# 或者通过API/CLI添加
databricks repos create \
    --url https://github.com/your-org/finlakehouse \
    --provider gitHub

16.2 与Git的集成最佳实践

在生产环境中,应该通过CI/CD管道(而非手动操作)来更新Repos。典型的工作流是:开发分支合并到main → GitHub Actions触发 → databricks bundle deploy --target prod → 生产Repo自动更新到最新代码并重新部署Job。

不要在生产Workspace中直接编辑代码——这是一个常见的反模式,会导致"谁改了什么、什么时候改的"无法追踪。所有生产代码变更都应该通过Git PR流程


17. Secrets & Security 安全体系

17.1 Secrets管理

Databricks Secrets提供了安全存储敏感信息(API密钥、数据库密码、访问令牌)的机制。Secrets存储在加密的后端,在Notebook中引用时会被自动隐藏(显示为[REDACTED])。

# 通过CLI创建Secret Scope(两种类型)
# 1. Databricks-backed(存在Databricks内部)
databricks secrets create-scope --scope my-secrets

# 2. Azure Key Vault-backed(存在Azure Key Vault)
databricks secrets create-scope \
    --scope azure-kv-scope \
    --scope-backend-type AZURE_KEYVAULT \
    --resource-id /subscriptions/.../vaults/my-key-vault \
    --dns-name https://my-key-vault.vault.azure.net/

# 添加Secret
databricks secrets put-secret my-secrets kafka-api-key --string-value "your-api-key"
databricks secrets put-secret my-secrets db-password --string-value "your-password"

# 列出Scope中的所有Secret(只能看key名,不能看值)
databricks secrets list-secrets my-secrets
# 在Notebook/脚本中使用Secrets
api_key = dbutils.secrets.get(scope="my-secrets", key="kafka-api-key")
db_password = dbutils.secrets.get(scope="my-secrets", key="db-password")

# 在Spark配置中使用(用于存储访问)
spark.conf.set(
    "fs.azure.account.key.mystorageaccount.blob.core.windows.net",
    dbutils.secrets.get(scope="azure-storage", key="storage-account-key")
)

17.2 IP访问列表与网络安全

在企业Databricks部署中,网络安全是关键考量:

Private Link / VNet Injection:将Databricks的Control Plane(API/UI)和Data Plane(Clusters)部署在你的VPC/VNet内,避免流量经过公网。

IP Access Lists:限制只有特定IP段才能访问Databricks Workspace的Web UI和REST API,防止未授权访问。

Customer-Managed Keys(CMK):使用你自己的KMS密钥加密Databricks的存储(Notebook内容、Job运行日志等),满足金融监管(DORA/BaFin)对数据加密的要求。


18. Databricks CLI & REST API

18.1 CLI全览

Databricks CLI(v0.200+,基于Go重写)是与Databricks平台交互的命令行工具,覆盖了几乎所有的API功能:

# 认证配置
databricks configure                    # 交互式配置
databricks configure --token            # 使用PAT(Personal Access Token)
databricks auth login --host https://... # OAuth登录

# Workspace操作
databricks workspace ls /Users/bill@company.com
databricks workspace import ./local_notebook.py /Repos/main/notebook.py
databricks workspace export /path/to/notebook ./local.py

# Cluster管理
databricks clusters list
databricks clusters create --json @cluster_config.json
databricks clusters start --cluster-id 1234-567890-abc123
databricks clusters delete --cluster-id 1234-567890-abc123

# Job/Workflow管理
databricks jobs list
databricks jobs run-now --job-id 123456
databricks runs get --run-id 789012
databricks runs cancel --run-id 789012

# DBFS(Databricks文件系统)
databricks fs ls dbfs:/path/to/data
databricks fs cp ./local_file.csv dbfs:/path/to/destination/
databricks fs rm dbfs:/path/to/file

# Secrets
databricks secrets list-scopes
databricks secrets list-secrets --scope my-scope
databricks secrets put-secret --scope my-scope --key my-key --string-value "value"

# Unity Catalog
databricks catalogs list
databricks schemas list --catalog-name main
databricks tables list --catalog-name main --schema-name finance

18.2 REST API

Databricks的所有功能都通过REST API暴露,是自动化和集成的基础:

import requests

DATABRICKS_HOST = "https://your-workspace.azuredatabricks.net"
TOKEN = "your-personal-access-token"

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json"
}

# 列出所有Clusters
response = requests.get(
    f"{DATABRICKS_HOST}/api/2.0/clusters/list",
    headers=headers
)
clusters = response.json()["clusters"]

# 触发Job运行
job_run = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/run-now",
    headers=headers,
    json={
        "job_id": 123456,
        "job_parameters": {
            "date": "2024-01-15",
            "env": "prod"
        }
    }
)
run_id = job_run.json()["run_id"]

# 轮询运行状态
import time
while True:
    run_status = requests.get(
        f"{DATABRICKS_HOST}/api/2.1/jobs/runs/get",
        headers=headers,
        params={"run_id": run_id}
    )
    state = run_status.json()["state"]["life_cycle_state"]
    
    if state in ["TERMINATED", "SKIPPED", "INTERNAL_ERROR"]:
        result_state = run_status.json()["state"].get("result_state")
        print(f"Run completed: {result_state}")
        break
    
    print(f"Run state: {state}, waiting...")
    time.sleep(30)

18.3 Databricks Python SDK

比直接调用REST API更方便的是使用官方Python SDK:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import RunNow

# 使用环境变量认证(推荐)
# DATABRICKS_HOST 和 DATABRICKS_TOKEN 环境变量
w = WorkspaceClient()

# 列出所有集群
for cluster in w.clusters.list():
    print(f"{cluster.cluster_id}: {cluster.cluster_name} ({cluster.state.value})")

# 触发Job并等待完成
run = w.jobs.run_now_and_wait(
    job_id=123456,
    job_parameters={"date": "2024-01-15"}
)
print(f"Job completed with state: {run.state.result_state.value}")

# 上传文件到Unity Catalog Volume
with open("./data.csv", "rb") as f:
    w.files.upload("/Volumes/main/data/uploads/data.csv", f)

19. Databricks Connect:本地开发的桥梁

19.1 Databricks Connect的价值

Databricks Connect允许你在本地IDE(VS Code、PyCharm)中运行代码,但计算在远程Databricks Cluster上执行。你可以在本地用断点调试Spark代码,代码直接操作Databricks上的Delta表,完全不需要将数据下载到本地。

这对大型项目的开发效率提升巨大:本地IDE的代码补全、调试器、Git集成,配合Databricks的计算和存储能力,是最理想的开发体验。

# 安装(需要与远程DBR版本匹配)
pip install databricks-connect==14.3.*  # 与远程DBR 14.3版本对应
# 在本地Python脚本中使用(与Notebook代码几乎相同)
from databricks.connect import DatabricksSession

# 连接到远程Cluster(使用.databrickscfg中的配置)
spark = DatabricksSession.builder \
    .profile("DEFAULT") \
    .getOrCreate()

# 读取远程Databricks上的Delta表(数据不下载到本地,计算在远端)
df = spark.table("main.finance.transactions")
result = df.filter("amount > 1000").groupBy("user_id").count()

# 只有collect()/toPandas()等Action会触发远程计算并将结果传回本地
result.show()  # 触发远程计算,结果打印到本地控制台

# 可以在VS Code/PyCharm中设置断点调试!

20. Partner Connect & 生态集成

20.1 主要集成类别

Databricks拥有广泛的生态系统,Partner Connect页面(Workspace左侧导航)提供了一键集成常见第三方工具的功能:

数据摄入(Data Ingestion):Fivetran、Airbyte(将SaaS应用数据同步到Delta Lake);Kafka + Confluent(实时流数据摄入);dbt Cloud(SQL转换层,与Databricks SQL深度集成)。

BI与可视化:Tableau、Power BI、Looker(通过JDBC/ODBC连接SQL Warehouse);这些工具连接SQL Warehouse后,可以直接查询Delta Lake中的数据,无需额外的数据复制。

数据质量:Great Expectations(Python数据质量框架,可与DLT的Expectations配合使用);Monte Carlo、Bigeye(数据可观测性平台)。

AI/ML:Weights & Biases(实验追踪,与MLflow互补);Hugging Face(模型下载和微调);LangChain(LLM应用开发,与Model Serving集成)。

数据目录:Alation、Collibra(企业级数据目录,可与Unity Catalog的Lineage集成)。


21. Databricks on 多云:AWS/Azure/GCP差异

21.1 架构上的统一与差异

Databricks的设计目标之一是跨云一致性——同样的Python代码、同样的SQL、同样的ML API,在AWS/Azure/GCP上行为一致。但在基础设施层面,有一些重要的云特定差异需要了解:

存储:在AWS上,底层存储是S3(s3://路径);在Azure上是ADLS Gen2(abfss://路径);在GCP上是GCS(gs://路径)。通过Unity Catalog,这些差异被抽象掉,代码中只使用catalog.schema.table三段式引用。

身份认证:在Azure上,使用Azure Active Directory(AAD)和Service Principals进行身份认证,与Azure AD深度集成,支持SSO(单点登录);在AWS上,使用IAM Roles和Instance Profiles;在GCP上,使用Service Accounts。

网络:Azure上叫VNet Injection,AWS上叫VPC Peering/PrivateLink,GCP上叫VPC Peering。原理相同,实现细节不同。

Serverless:Serverless SQL Warehouse和Serverless Job Compute(无需管理Cluster的Job运行方式)在各云的可用性不同,Azure和AWS支持较完整,GCP某些功能仍在推进中。


22. 完整架构总结:Platform Engineer视角

22.1 所有组件的关系图

                        ┌────────────────────────────────────────────────────────────┐
                        │                   DATABRICKS PLATFORM                       │
                        │                                                              │
                        │  ┌──────────────────────────────────────────────────────┐  │
                        │  │              Unity Catalog (治理层)                   │  │
                        │  │  Catalogs · Schemas · Tables · Volumes · Models       │  │
                        │  │  Lineage · Audit Log · Delta Sharing · Row/Col Policy │  │
                        │  └──────────────────────────────────────────────────────┘  │
                        │                                                              │
                        │  ┌─────────────┐  ┌──────────────┐  ┌──────────────────┐  │
                        │  │  Data Eng.  │  │  SQL/BI      │  │  ML/AI           │  │
                        │  │             │  │              │  │                  │  │
                        │  │ DLT         │  │ SQL Warehouse│  │ MLflow           │  │
                        │  │ Workflows   │  │ Dashboards   │  │ AutoML           │  │
                        │  │ Spark       │  │ Queries      │  │ Feature Store    │  │
                        │  └──────┬──────┘  └──────┬───────┘  │ Model Serving    │  │
                        │         │                │           └────────┬─────────┘  │
                        │  ┌──────▼──────────────────────────────────▼──────────┐   │
                        │  │                    Compute Layer                    │   │
                        │  │   All-Purpose Clusters  │  Job Clusters             │   │
                        │  │   SQL Warehouses (Photon)│  Serverless Compute      │   │
                        │  │         Databricks Runtime (DBR / ML / Photon)      │   │
                        │  └─────────────────────────────────────────────────────┘  │
                        │                                                              │
                        │  ┌──────────────────────────────────────────────────────┐  │
                        │  │           Storage & Open Formats (存储层)             │  │
                        │  │    Delta Lake · Apache Iceberg · Parquet · JSON       │  │
                        │  │    Object Storage: S3 / ADLS Gen2 / GCS               │  │
                        │  └──────────────────────────────────────────────────────┘  │
                        │                                                              │
                        │  ┌─────────────────────────────────────────────────────┐   │
                        │  │      开发者工具 & DevOps (开发层)                     │   │
                        │  │   Repos/Git · DAB (IaC) · CLI · REST API · SDK       │   │
                        │  │   Notebooks · Databricks Connect (Local Dev)          │   │
                        │  └─────────────────────────────────────────────────────┘   │
                        │                                                              │
                        └────────────────────────────────────────────────────────────┘

22.2 典型的端到端数据流

下面展示一个金融数据平台(FinLakehouse类场景)的端到端数据流,体现所有组件如何协作:

外部数据源(ECB API、Euribor、市场数据)
        │
        ▼
[摄入层] Python脚本/Notebook (Job Cluster)
  → 写入 Delta Lake Bronze (Raw数据,完整保留原始格式)
  → 通过Unity Catalog Volume暂存文件
        │
        ▼
[转换层] DLT Pipeline (Job Cluster,自动管理)
  Bronze → Silver(清洗、标准化、数据质量Expectations)
  Silver → Gold(业务指标聚合,如macro_state_daily)
  → 所有层的表都在Unity Catalog中注册
        │
        ├──────────────────────────┬─────────────────────────
        ▼                          ▼                         ▼
[分析层]                    [ML层]                     [服务层]
SQL Warehouse (Photon)       Feature Store              Model Serving
  → Dashboards/BI            → MLflow Experiments        → REST API
  → Tableau/Power BI连接     → MLflow Model Registry     → 实时预测
  → 自助查询                 → Model Training            → A/B测试
        │
        ▼
[编排层] Databricks Workflows
  → 调度所有上述任务的执行顺序
  → 监控、告警、重试
  → 通过DAB部署,纳入CI/CD

[治理层] Unity Catalog(横跨所有层)
  → 自动记录数据Lineage(从Bronze到Gold到Dashboard)
  → 列级权限控制(PII字段脱敏)
  → 审计日志(谁在什么时候读取了哪张表)

22.3 Platform Engineer的核心能力矩阵

作为一个Platform Engineer,你对Databricks的掌握程度应该体现在以下维度:

架构设计能力:能够根据业务需求选择合适的组件组合(DLT vs 手写Spark、Job Cluster vs SQL Warehouse、Managed vs External Table),并能解释取舍理由(Trade-off Analysis)。

成本优化能力:理解DBU(Databricks Unit)的计费模型,知道如何通过Auto-Terminate、Spot实例、Cluster共享、Serverless等手段控制成本。能够通过System Tables(system.billingsystem.compute)分析和优化成本。

治理设计能力:能够设计Unity Catalog的Catalog层次结构、权限模型、数据分类策略,满足GDPR/DORA等合规要求。

DevOps集成能力:使用DAB将Databricks资源纳入版本控制和CI/CD(GitHub Actions/Azure DevOps),实现可重现、可审计的部署流程。

性能调优能力:能够通过Spark UI诊断性能问题(数据倾斜、Shuffle过多、小文件问题),并通过Z-Order/Liquid Clustering、OPTIMIZE、AQE、广播Join等手段解决。

ML工程能力:能够设计完整的ML平台——从Feature Store到Model Training到Registry到Serving,确保训练-服务一致性,并集成监控和再训练流程。


附录A:关键配置速查

# 常用的Spark配置(在Cluster创建时设置)
spark_conf = {
    # 性能优化
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    
    # Delta Lake优化
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
    
    # 内存管理
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.3",
    
    # 广播Join阈值(10MB是默认值,可适当提高)
    "spark.sql.autoBroadcastJoinThreshold": "20971520",  # 20MB
}

附录B:常用dbutils命令速查

# 文件系统操作
dbutils.fs.ls("/mnt/data")           # 列出目录
dbutils.fs.cp("src", "dst")         # 复制文件
dbutils.fs.mv("src", "dst")         # 移动文件
dbutils.fs.rm("/path", recurse=True) # 删除
dbutils.fs.mkdirs("/new/path")       # 创建目录

# Notebook控制
dbutils.notebook.run("/path/to/nb", timeout_seconds=600, 
                     arguments={"param": "value"})
dbutils.notebook.exit("success")    # 退出当前Notebook并返回值

# Secrets
dbutils.secrets.listScopes()        # 列出所有Scope
dbutils.secrets.list("scope-name")  # 列出Scope中的Key
dbutils.secrets.get("scope", "key") # 获取Secret值

# Widgets(Notebook参数)
dbutils.widgets.text("date", "2024-01-01", "Processing Date")
dbutils.widgets.dropdown("env", "dev", ["dev", "staging", "prod"])
date = dbutils.widgets.get("date")  # 读取参数值
dbutils.widgets.removeAll()         # 清除所有Widgets

# Task Values(Workflow中跨Task传值)
dbutils.jobs.taskValues.set(key="output_count", value=1000)
dbutils.jobs.taskValues.get(taskKey="previous_task", key="output_count")

本文档涵盖了Databricks平台的所有主要组件。每个组件都是一个深度领域——建议以本文档为地图,根据当前项目需要深入某个具体方向。Delta Lake和Unity Catalog是优先级最高的深入方向,它们是平台的地基。