Databricks 完全指南:从架构原理到生产实践
版本: 2025年完整版
适用认证: Databricks Data Engineer Associate/Professional · AI Engineer Associate · ML Professional
定位: 不只是"怎么用",更是"为什么这样设计"——Platform Engineer视角的深度解读
目录
- Databricks 是什么:一个架构级的理解
- Workspace:你的工作台
- Clusters:计算引擎的核心
- Databricks Runtime(DBR)
- Apache Spark on Databricks
- Delta Lake:Lakehouse的地基
- Unity Catalog:数据治理的全局大脑
- Delta Live Tables(DLT):声明式数据管道
- Databricks Workflows:编排与调度
- Databricks SQL:分析师的战场
- MLflow:ML生命周期管理
- Databricks AutoML
- Feature Store:特征工程的工厂
- Model Serving:模型上线的最后一公里
- Databricks Asset Bundles(DAB):基础设施即代码
- Repos & Git 集成
- Secrets & Security 安全体系
- Databricks CLI & REST API
- Databricks Connect:本地开发的桥梁
- Partner Connect & 生态集成
- Databricks on 多云:AWS/Azure/GCP差异
- 完整架构总结:Platform Engineer视角
1. Databricks 是什么:架构级的理解
1.1 三句话的本质
很多人对Databricks的认知停留在"云上的Spark平台",但这是极大的误解。Databricks的本质可以用三句话描述:
第一,Databricks是一个统一的数据智能平台(Unified Data Intelligence Platform),它将数据工程(ETL/ELT)、数据仓库(SQL Analytics)、机器学习(ML/AI)三个传统上完全分离的工作负载,统一在一个平台上。这个统一不是简单的功能堆叠,而是共享同一套底层存储(Delta Lake)和治理层(Unity Catalog)。
第二,Databricks实现了"计算与存储分离"的现代Lakehouse架构。你的数据存在对象存储(S3/ADLS/GCS)上,计算资源(Clusters)按需启动、按需关闭。这带来了弹性伸缩能力和极低的存储成本。
第三,Databricks是一家云服务公司,不是开源软件。它构建在Apache Spark、MLflow、Delta Lake等开源项目之上,但提供了大量企业级的商业化能力(Unity Catalog、DLT、Model Serving等),并以按需计费(DBU)的方式收费。
1.2 Lakehouse架构的诞生逻辑
要真正理解Databricks,你必须理解Lakehouse架构为什么出现。传统架构有两个极端:
**数据湖(Data Lake)**将海量原始数据低成本存储在S3/ADLS上,但它没有ACID事务、没有Schema enforcement、没有索引,导致"数据沼泽"问题——数据堆进去了,但查询慢、质量差、难以治理。
**数据仓库(Data Warehouse)**如Snowflake/Redshift,有强Schema、高性能SQL查询,但存储成本高昂(专有格式),且不适合存储非结构化数据(图像、文本),也不适合直接跑ML训练(数据要先导出到S3再训练)。
Lakehouse的设计思想是:在廉价的对象存储(S3)上,通过一个开放的表格式(Delta Lake/Iceberg/Hudi)来提供Data Warehouse级别的功能——ACID事务、Schema管理、高性能查询,同时保持Data Lake的低成本和对ML工作负载的直接支持。
传统架构(双栈,数据需要复制):
┌─────────────┐
│ Data Lake │ ← ML训练
│ (S3/ADLS) │
└──────┬──────┘
│ ETL复制数据
┌──────▼──────┐
│ Data │ ← SQL分析
│ Warehouse │
└─────────────┘
Lakehouse架构(统一存储):
┌─────────────────────────────┐
│ 对象存储 (S3/ADLS/GCS) │
│ ┌─────────────────────┐ │
│ │ Delta Lake │ │
│ │ (开放表格式 + ACID) │ │
│ └─────────────────────┘ │
└─────────────┬───────────────┘
│ 同一份数据
┌───────────────────────┼────────────────────┐
│ │ │
┌──────▼──────┐ ┌─────────▼──────┐ ┌───────▼──────┐
│ Data Eng. │ │ SQL Analytics │ │ ML/AI │
│ (Spark/DLT) │ │ (SQL Warehouse)│ │ (MLflow) │
└─────────────┘ └────────────────┘ └──────────────┘
1.3 Databricks的核心抽象层次
在深入各个组件之前,你需要先建立这个层次模型:
┌────────────────────────────────────────────────────────┐
│ 应用层 Applications │
│ Notebooks · SQL Editor · ML Experiments · Dashboards │
├────────────────────────────────────────────────────────┤
│ 服务层 Services │
│ DLT · Workflows · Model Serving · AutoML · FS │
├────────────────────────────────────────────────────────┤
│ 计算层 Compute │
│ All-Purpose Clusters · Job Clusters · SQL Warehouse │
│ Databricks Runtime (DBR) · Photon │
├────────────────────────────────────────────────────────┤
│ 存储与治理层 Storage & Governance │
│ Delta Lake · Unity Catalog · Delta Sharing │
├────────────────────────────────────────────────────────┤
│ 基础设施层 Infrastructure │
│ Cloud Object Storage (S3/ADLS/GCS) │
│ Cloud VMs (EC2/Azure VM/GCE) ← Databricks管理 │
└────────────────────────────────────────────────────────┘
这个层次模型非常关键:下层为上层服务,每一层都可以独立扩展。这正是Platform Engineer要理解和设计的内容。
2. Workspace:你的工作台
2.1 Workspace的本质
Databricks Workspace是你与平台交互的统一入口,本质上是一个托管在云上的Web应用,但它管理着背后极其复杂的云资源。你可以把Workspace理解为"操作系统的桌面"——所有的工作都从这里开始。
每个Workspace对应一个独立的Databricks实例,有唯一的URL(如https://adb-1234567890.1.azuredatabricks.net)。在企业里,通常会有多个Workspace对应不同环境(开发/测试/生产)。
2.2 Workspace的主要区域
Workspace浏览器(左侧边栏) 是组织你所有资产的地方。它的目录结构分为两类:用户个人目录(/Users/your_email/)和共享目录(/Shared/)。在Unity Catalog启用之前,Workspace浏览器也是管理旧版Hive Metastore数据库的地方。
Catalog(数据目录) 是访问Unity Catalog的主入口,你可以在这里浏览所有的数据表、视图、卷(Volumes)、模型,并查看其Schema和样本数据。这是现代Databricks工作流的核心。
Compute(计算管理) 页面让你管理所有的Clusters和SQL Warehouses,包括创建、配置、启动和终止。
Workflows 页面是Job的管理中心,你可以创建、监控、调试所有的数据管道和自动化任务。
SQL(SQL Editor) 是面向分析师的界面,提供SQL编辑器、查询历史、结果可视化和Dashboard功能。
Machine Learning 是ML工作流的入口,整合了MLflow Experiments、Feature Store、Model Registry的访问。
2.3 Workspace的访问控制
Workspace有两种顶层角色:Admin(管理整个Workspace的配置、用户、计算资源)和User(使用资源进行开发)。在Unity Catalog启用后,数据对象的权限管理从Workspace级别下沉到了Catalog级别,形成了更细粒度的控制。
3. Clusters:计算引擎的核心
3.1 Cluster是什么
Cluster(集群)是Databricks中运行代码的计算单元。它本质上是一组云虚拟机(Driver节点 + 若干Worker节点),运行着Apache Spark、Databricks Runtime等软件。你的每一行Python/Scala/SQL代码,最终都在某个Cluster上执行。
这里有一个关键认知:Cluster是按时间计费的资源(DBU/小时 × 运行时间)。理解何时启动Cluster、如何配置Cluster、何时可以共享Cluster,是控制Databricks成本的关键能力,也是Platform Engineer的核心职责之一。
3.2 三种Cluster类型详解
All-Purpose Cluster(通用集群) 是交互式开发用的集群。你在Notebook里写代码时,就是附加到一个All-Purpose Cluster上的。它的特点是:可以被多个用户和多个Notebook同时共享,可以长时间运行(但会产生持续费用),支持自动终止(Autotermination)来节省成本。由于是共享资源,All-Purpose Cluster上任意用户的代码都可能相互影响。
# 在Notebook中,你通过附加(Attach)到一个Cluster来运行代码
# 集群选择在Notebook右上角的下拉菜单中
# 检查当前Spark版本和集群信息
print(spark.version)
print(sc.getConf().getAll())
Job Cluster(作业集群) 是为自动化任务(Job/Workflow)专门启动的集群。它在Job开始时自动创建,Job结束时自动销毁。由于生命周期与任务绑定,Job Cluster有几个重要优势:隔离性好(不会因其他任务影响性能)、成本精确(只在任务运行时产生费用)、配置可以为特定任务精确调优。在生产环境中,几乎所有数据管道都应使用Job Cluster而不是All-Purpose Cluster。
SQL Warehouse(SQL仓库) 是专门为SQL查询优化的计算资源,内部运行Photon引擎(C++写的向量化执行引擎,比Java版Spark快2-12倍)。SQL Warehouse有Classic和Serverless两种模式:Classic模式在你的云账号VPC中启动VM;Serverless模式在Databricks管理的基础设施上运行,启动时间从分钟级降到秒级。
3.3 Cluster配置详解
一个完整的Cluster配置包含以下关键参数:
Cluster Mode(集群模式) 决定了资源如何在多用户间分配。Single Node模式只有Driver没有Worker,适合小数据集的探索和ML模型开发(省钱);Standard模式是最常见的多节点集群;High Concurrency模式(旧版)为多用户并发做了优化,现在已被Shared Access Mode取代。
Access Mode(访问模式) 是Unity Catalog引入后的新概念,控制集群的安全隔离级别。Single User模式只允许一个用户使用,支持所有语言(Python/Scala/R/SQL),并且支持Unity Catalog的所有功能;Shared模式允许多用户共享但通过进程隔离保证安全,支持Python和SQL,支持Unity Catalog;No Isolation Shared模式(遗留)没有用户隔离,不推荐在生产中使用。
Node Type(节点类型) 就是选择底层的云VM规格。r系列(内存优化型)适合大型DataFrame操作和复杂Join;c系列(计算优化型)适合CPU密集型的ML训练;g系列(GPU型)适合深度学习。Driver节点通常选择内存较大的机型,因为它需要协调所有Worker并收集结果。
Autoscaling(自动伸缩) 允许集群根据负载在Min Workers和Max Workers之间自动调整Worker数量。开启Autoscaling可以显著节省成本,但对于流式处理(Structured Streaming)等场景,固定Worker数量可能表现更稳定。
// 典型的Job Cluster配置(通过API或DAB定义)
{
"cluster_name": "etl-production-cluster",
"spark_version": "14.3.x-scala2.12",
"node_type_id": "i3.2xlarge",
"driver_node_type_id": "i3.2xlarge",
"num_workers": 4,
"autoscale": {
"min_workers": 2,
"max_workers": 8
},
"data_security_mode": "SINGLE_USER",
"spark_conf": {
"spark.sql.shuffle.partitions": "200",
"spark.databricks.delta.optimizeWrite.enabled": "true"
},
"init_scripts": [
{
"workspace": {
"destination": "/init_scripts/install_dependencies.sh"
}
}
]
}
Init Scripts(初始化脚本) 在Cluster启动时每个节点上执行,用于安装系统级依赖、配置环境变量、安装自定义库等。有三种类型:Workspace Path(存在Workspace中)、DBFS Path(存在DBFS中)和Volume Path(存在Unity Catalog Volume中,推荐)。Init Script的执行在DBR加载之后、用户代码运行之前,所以可以放心安装Python包。
Cluster Libraries(集群库) 是安装在整个Cluster上、所有用户都可以使用的Python/Maven/CRAN包。可以通过UI或API指定,也可以在Notebook中用%pip install临时安装(但这个安装只在当前Notebook Session有效)。
3.4 Cluster Events与健康监控
Cluster页面的Events标签页是诊断问题的重要工具。它记录了集群生命周期中的所有事件:STARTING → RUNNING → RESIZING(自动伸缩触发)→ TERMINATING。当集群意外终止或Job失败时,Events日志是你第一个要看的地方。
Spark UI是另一个关键的调试工具,在Cluster详情页可以直接打开。它展示了所有Stage的执行情况、每个Task的耗时、数据倾斜(Skew)的迹象。在生产环境中,能熟练使用Spark UI诊断性能问题,是区分初级工程师和高级工程师的重要标志。
4. Databricks Runtime(DBR)
4.1 DBR的本质
Databricks Runtime(DBR)是运行在每个Cluster节点上的软件栈。你可以把它理解为一个预配置好的"大数据操作系统"——它打包了Apache Spark、Delta Lake、Python、Scala、R、各种优化后的库,以及Databricks自研的优化(如Photon)。
当你创建Cluster时选择的"Databricks Runtime Version"(如14.3 LTS),决定了这个Cluster上运行的软件版本。这个选择影响极大:不同DBR版本对应不同的Spark版本、Python版本、Delta Lake版本,以及不同的性能优化和功能支持。
4.2 DBR的主要变体
Databricks Runtime (Standard) 是最基础的运行时,包含Spark、Delta Lake、Python、Scala的核心依赖。适用于大多数数据工程任务。
Databricks Runtime ML 在Standard基础上,预安装了机器学习生态系统:TensorFlow、PyTorch、Scikit-learn、XGBoost、HuggingFace Transformers、MLflow等。如果你要做ML训练,直接选这个版本,省去了手动安装的麻烦和版本冲突的烦恼。
Databricks Runtime with Photon 内置了Photon引擎——Databricks用C++重写的向量化查询引擎,专为SQL查询和DataFrame操作优化。Photon对Spark API完全兼容(你不需要修改任何代码),但在列式扫描、聚合、Join操作上有2-12倍的性能提升。SQL Warehouse默认使用Photon。
Databricks Runtime for Genomics 专为基因组学工作负载优化,内置了Glow等专业库。这是一个较小众的变体,但展示了Databricks针对垂直领域的定制化能力。
4.3 LTS版本策略
Databricks标记为LTS(Long-Term Support)的版本,提供两年以上的维护支持。在生产环境中,永远使用LTS版本,避免使用非LTS版本(它们的维护周期短,升级频繁)。例如13.3 LTS和14.3 LTS都是稳定的生产选择。
版本升级是一个需要谨慎管理的过程:新版本可能带来Spark行为变化(如默认的spark.sql.ansi.enabled从false变true),需要在测试环境充分验证后再推向生产。
5. Apache Spark on Databricks
5.1 Spark的架构回顾
Apache Spark是Databricks的计算核心。在这里不做基础介绍,而是聚焦Databricks环境中Spark的特殊行为和最佳实践。
Spark采用Master-Worker架构:Driver Program是主程序,负责解析你写的代码、构建执行计划(DAG)、将任务分发给各个Worker;Executor(Worker节点上的进程)负责实际执行任务、读写数据。在Databricks中,Cluster的Driver节点运行Driver Program,Worker节点运行Executors。
# SparkSession在Databricks中已经预创建,直接使用spark变量
# 不需要也不应该自己创建SparkSession
# 读取Delta表(Databricks最常见的操作)
df = spark.table("catalog_name.schema_name.table_name")
# 或者读取文件
df = spark.read.format("delta").load("/path/to/delta/table")
# 基本的DataFrame操作
from pyspark.sql import functions as F
from pyspark.sql.window import Window
result = (df
.filter(F.col("event_date") >= "2024-01-01")
.groupBy("user_id", "event_date")
.agg(
F.count("event_id").alias("event_count"),
F.sum("revenue").alias("total_revenue")
)
.withColumn("revenue_rank",
F.rank().over(Window.partitionBy("event_date")
.orderBy(F.desc("total_revenue")))
)
)
result.write.mode("overwrite").saveAsTable("my_schema.daily_user_summary")
5.2 Spark的懒执行(Lazy Evaluation)
Spark最重要的概念之一是懒执行:Transformations(转换操作)不会立即执行,只有Actions(动作操作)才会触发实际计算。
Transformations包括:filter()、select()、groupBy()、join()、withColumn()等。这些操作只是在构建执行计划(Logical Plan),不触发任何计算。
Actions包括:show()、count()、collect()、write()、save()等。每个Action触发一次Job,Spark编译之前积累的所有Transformations为一个优化后的物理执行计划,然后分发到集群执行。
这个设计的好处是:Spark的Catalyst优化器可以看到完整的计划,进行谓词下推(Predicate Pushdown)、列剪裁(Column Pruning)、Join重排等优化,比逐步执行效率高得多。
# 这三行Transformation都不执行任何计算
filtered_df = df.filter(F.col("status") == "active") # 懒执行
projected_df = filtered_df.select("user_id", "created_at", "revenue") # 懒执行
grouped_df = projected_df.groupBy("user_id").agg(F.sum("revenue")) # 懒执行
# 直到这个Action,Spark才开始执行上面所有操作的优化后计划
grouped_df.show() # 触发执行!
# 使用explain()查看执行计划(不触发执行)
grouped_df.explain(extended=True) # 显示从Parsed到Physical Plan的完整计划
5.3 Shuffle:性能的核心瓶颈
当Spark执行需要跨节点重新分配数据的操作时(如groupBy、join、orderBy),就会发生Shuffle——数据从每个Executor写到磁盘,然后通过网络传输到目标Executor。Shuffle是分布式计算中最昂贵的操作,是性能调优的核心关注点。
spark.sql.shuffle.partitions控制Shuffle后的分区数,默认值是200。这个值需要根据数据量调整:对于小数据集(<100GB),200可能导致每个分区太小("小文件问题");对于大数据集(>10TB),200个分区可能导致每个分区太大(处理慢)。
在Databricks中,可以使用Adaptive Query Execution(AQE,Spark 3.x默认开启)让Spark在运行时动态调整分区数,大大减少了手动调优的需要。
# 查看/设置shuffle分区数
print(spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", "100")
# AQE在Databricks中默认启用(Spark 3.0+)
# 它会自动处理:
# 1. 动态合并小的Shuffle分区
# 2. 自动处理数据倾斜(Skew Join)
# 3. 动态切换Join策略
# 检查AQE是否启用
print(spark.conf.get("spark.sql.adaptive.enabled")) # 应该是 true
5.4 广播Join(Broadcast Join)
当一个大表和一个小表Join时,可以使用广播Join将小表复制到每个Executor,避免大规模Shuffle。Spark会在小表小于spark.sql.autoBroadcastJoinThreshold(默认10MB)时自动使用广播Join,也可以手动Hint。
from pyspark.sql.functions import broadcast
# 手动广播小表(比如维度表)
result = large_fact_df.join(
broadcast(small_dim_df), # 强制广播
on="product_id",
how="left"
)
5.5 Partitioning与数据组织
DataFrame的分区(Partitions)决定了并行度——每个Partition由一个Task处理。理想状态下,每个Partition大小应在100MB-200MB之间,这样既保证了并行度,又不会产生过多的调度开销。
repartition(n)会触发Full Shuffle,重新均匀分配数据到n个分区;coalesce(n)只做本地合并(不Shuffle),只能减少分区数,但更高效。
# 检查当前分区数
print(df.rdd.getNumPartitions())
# 增加分区(触发Shuffle)
df_repartitioned = df.repartition(100)
# 按列分区(用于优化特定的Filter或Join)
df_repartitioned = df.repartition(100, F.col("date"))
# 减少分区(不触发Shuffle,更高效)
df_coalesced = df.coalesce(10)
# 在写入文件时,分区数决定了输出文件数量
# Delta Lake的OPTIMIZE命令可以事后整合小文件
6. Delta Lake:Lakehouse的地基
6.1 Delta Lake解决的根本问题
Delta Lake是Databricks最核心的创新之一(后来开源)。它本质上是一个在对象存储上实现ACID事务的开放表格式,由以下三个核心部分组成:
数据文件(Parquet格式)存储在对象存储上,这部分没有魔法——就是普通的Parquet文件。
事务日志(Transaction Log)是Delta Lake的灵魂,存储在数据目录下的_delta_log/子目录中。每次对表的操作(写入、更新、删除、Schema变更)都会在事务日志中追加一条JSON记录,这条记录描述了"发生了什么"(添加了哪些文件、删除了哪些文件)。
Checkpoints是事务日志的快照,每10个事务会创建一个Parquet格式的Checkpoint,用于加速元数据读取(不需要回放所有历史日志)。
Delta Table目录结构:
my_table/
├── _delta_log/
│ ├── 00000000000000000000.json ← 事务0(建表)
│ ├── 00000000000000000001.json ← 事务1(第一次写入)
│ ├── 00000000000000000002.json ← 事务2(第二次写入)
│ ├── ...
│ ├── 00000000000000000010.checkpoint.parquet ← 第10次事务的快照
│ └── _last_checkpoint ← 指向最新checkpoint
├── part-00000-xxx.snappy.parquet ← 实际数据文件
├── part-00001-xxx.snappy.parquet
└── ...
6.2 ACID事务的实现
Delta Lake通过**乐观并发控制(Optimistic Concurrency Control)**实现ACID事务。当两个写入操作同时进行时,Delta Lake使用条件写入(Conditional Write)到对象存储来解决冲突:先完成的操作成功写入事务日志,后完成的操作检测到冲突,会重新读取最新状态、重试执行,若无法自动解决则抛出ConcurrentModificationException。
这个设计的聪明之处在于:对象存储本身不支持事务,但通过这种日志+条件写入的机制,Delta Lake在它之上构建了事务语义。
# ACID的原子性:要么全部成功,要么全部失败
# 即使写入过程中集群崩溃,Delta Lake也能保证数据一致性
# 原子性写入
df.write.format("delta").mode("overwrite").saveAsTable("my_table")
# 多步骤操作的事务性(使用DeltaTable API)
from delta.tables import DeltaTable
delta_table = DeltaTable.forName(spark, "catalog.schema.my_table")
# MERGE操作:原子地执行插入/更新/删除
delta_table.alias("target").merge(
source_df.alias("source"),
"target.id = source.id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()
6.3 Time Travel(时间旅行)
Time Travel是Delta Lake最受用户喜爱的功能之一。由于所有历史版本的数据文件都被保留(直到VACUUM操作清理),你可以查询任何历史版本的数据。
# 按版本号查询
df_v1 = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/table")
# 按时间戳查询
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-15 12:00:00") \
.load("/path/to/table")
# 使用SQL
spark.sql("""
SELECT * FROM my_table VERSION AS OF 5
""")
spark.sql("""
SELECT * FROM my_table TIMESTAMP AS OF '2024-01-15 12:00:00'
""")
# 查看表的历史记录
delta_table.history().show()
# 输出包含:version, timestamp, operation, operationParameters, etc.
# 回滚到历史版本(使用RESTORE)
spark.sql("RESTORE TABLE my_table TO VERSION AS OF 3")
Time Travel的用途包括:数据审计(谁在什么时候改了什么)、事故恢复(误删数据后回滚)、A/B测试的可重现性(使用相同的历史数据重新跑模型训练)。
注意:Time Travel依赖于数据文件没有被VACUUM清理。VACUUM默认保留7天的历史,之后旧文件会被删除,就无法时间旅行到更早的版本了。
6.4 Schema Evolution与Schema Enforcement
Delta Lake默认开启Schema Enforcement:当你试图写入Schema不匹配的数据(如多了一列、少了一列、列类型不兼容),写入操作会失败并抛出AnalysisException。这比传统数据湖的"写什么都行"行为要安全得多,防止了"坏数据悄悄写进去"的问题。
当你确实需要添加新列或做其他Schema变化时,可以启用Schema Evolution:
# 方式1:合并Schema(自动添加新列)
df_with_new_column.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/path/to/table")
# 方式2:覆盖Schema(完全替换)
df_new_schema.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/path/to/table")
# 通过ALTER TABLE添加列(DDL方式)
spark.sql("ALTER TABLE my_table ADD COLUMNS (new_col STRING COMMENT 'New field')")
# 修改列(有限制:可以放宽类型,不能缩窄)
spark.sql("ALTER TABLE my_table ALTER COLUMN amount TYPE DOUBLE")
6.5 Change Data Feed(CDF)
Change Data Feed(也叫Change Data Capture for Delta)允许你高效地追踪Delta表的变更,而不需要全量扫描。开启CDF后,每次对表的DML操作(INSERT/UPDATE/DELETE/MERGE)都会记录"变更行",包含一个特殊的_change_type列(insert/update_preimage/update_postimage/delete)。
# 在创建表时启用CDF
spark.sql("""
CREATE TABLE my_cdc_table (
id BIGINT,
value STRING,
updated_at TIMESTAMP
)
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
# 也可以在已有表上启用
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")
# 读取变更数据(增量处理的核心)
changes = (spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 5) # 从版本5开始读
# .option("startingTimestamp", "2024-01-01") # 或从时间戳开始
.table("my_cdc_table"))
changes.show()
# 输出:id | value | updated_at | _change_type | _commit_version | _commit_timestamp
CDF在构建增量ETL管道时极其有价值——下游表可以只处理"上游表的变化部分",而不需要每次重新扫描全量数据。
6.6 OPTIMIZE与Z-Order
随着数据不断写入,Delta表会积累大量小文件(每次写操作都可能产生新的Parquet文件)。小文件问题会导致查询时需要打开大量文件,显著降低性能。
OPTIMIZE命令将小文件合并成更大的文件(默认目标文件大小1GB),这个过程不影响数据内容,只是重新整合了物理存储。
-- 基础OPTIMIZE(合并小文件)
OPTIMIZE catalog.schema.my_table;
-- OPTIMIZE + Z-Order(合并文件同时按列排序,提升过滤性能)
OPTIMIZE catalog.schema.my_table ZORDER BY (user_id, event_date);
Z-Order是一种多维数据排序技术,它把多个列的值"交织"排列,使得在这些列上过滤时,可以跳过更多的不相关数据文件(Data Skipping)。规则是:将最常用于过滤(WHERE子句)的列放入Z-Order,通常是时间列和高基数的业务ID列。
Liquid Clustering是Databricks在2023年推出的Z-Order替代方案,它使用增量聚类(不需要重写所有文件),并且可以随时更改聚类列而不需要重建表,是新表的推荐方式。
-- 创建Liquid Clustering表
CREATE TABLE my_table (
id BIGINT,
user_id STRING,
event_date DATE,
value DOUBLE
)
CLUSTER BY (user_id, event_date); -- 指定聚类列
-- 对已有表应用Liquid Clustering
ALTER TABLE existing_table CLUSTER BY (user_id, event_date);
-- 手动触发聚类(也可以让OPTIMIZE自动处理)
OPTIMIZE my_table;
6.7 VACUUM
VACUUM命令清理不再被任何Delta表版本引用的旧数据文件。默认保留7天(168小时)的历史数据,这对应了默认的Time Travel窗口。
-- 查看会被清理的文件(DRY RUN不实际删除)
VACUUM catalog.schema.my_table DRY RUN;
-- 执行清理(保留默认7天的历史)
VACUUM catalog.schema.my_table;
-- 自定义保留时间(最少1小时,低于1小时需要关闭安全检查)
VACUUM catalog.schema.my_table RETAIN 72 HOURS;
重要:VACUUM操作是不可逆的。执行前,确保没有长时间运行的读取操作(它们依赖旧文件),也确保理解缩短保留时间后Time Travel的限制。
6.8 Delta Lake Table Properties
通过TBLPROPERTIES可以精细控制Delta表的行为:
ALTER TABLE my_table SET TBLPROPERTIES (
-- 启用增量优化写(自动合并小文件到适当大小)
'delta.autoOptimize.optimizeWrite' = 'true',
-- 启用自动压缩
'delta.autoOptimize.autoCompact' = 'true',
-- 设置数据保留时间
'delta.deletedFileRetentionDuration' = 'interval 30 days',
-- 启用Change Data Feed
'delta.enableChangeDataFeed' = 'true',
-- 设置目标文件大小(Liquid Clustering中控制文件大小)
'delta.targetFileSize' = '134217728' -- 128MB
);
7. Unity Catalog:数据治理的全局大脑
7.1 为什么需要Unity Catalog
在Unity Catalog出现之前,Databricks的数据治理是混乱的:每个Workspace有自己独立的Hive Metastore,跨Workspace共享数据困难;权限管理分散在Workspace级别,缺乏统一视图;没有列级权限控制;数据Lineage不完整。
Unity Catalog(UC)解决了所有这些问题,它是一个跨Workspace、跨云的统一数据治理层,提供:
- 三层命名空间(Catalog → Schema → Table)
- 细粒度的权限控制(表级、列级、行级)
- 自动化数据Lineage追踪(列级Lineage)
- 统一的数据审计日志
- 安全的数据共享(Delta Sharing)
7.2 三层命名空间
Unity Catalog引入了三层命名空间,相比旧的两层(Database.Table)多了一层Catalog:
Metastore(最顶层,一个Region一个)
├── Catalog(相当于一个数据库实例)
│ ├── Schema(相当于数据库中的schema/namespace)
│ │ ├── Table(Delta表、外部表、视图)
│ │ ├── View
│ │ ├── Volume(存储非表格数据的文件)
│ │ ├── Function(UDF)
│ │ └── Model(MLflow模型)
│ └── ...
└── ...
完整的三段式引用:catalog_name.schema_name.table_name
例如:main.finance.transactions
这个设计让不同的团队/环境/项目可以有独立的Catalog,而共享同一个Metastore(和同一套权限管理框架)。典型的Catalog划分方式:dev、staging、prod(环境隔离),或finance、marketing、operations(业务域隔离)。
7.3 权限模型(GRANT/REVOKE)
UC使用标准SQL的GRANT/REVOKE语法管理权限,支持层次继承:在Catalog上授予的权限会自动传递给其下的所有Schema和Table。
-- 创建一个服务账号principal
-- (通常由Admin通过UI/API完成)
-- 给用户授予Catalog的使用权
GRANT USE CATALOG ON CATALOG main TO `user@company.com`;
-- 给组授予Schema的使用权
GRANT USE SCHEMA ON SCHEMA main.finance TO `group:analysts`;
-- 给服务账号授予表的SELECT权限
GRANT SELECT ON TABLE main.finance.transactions TO `service-account@project.iam.gserviceaccount.com`;
-- 授予表的写权限
GRANT MODIFY ON TABLE main.finance.staging_data TO `etl-service-principal`;
-- 授予创建表的权限(给数据工程师)
GRANT CREATE TABLE ON SCHEMA main.finance TO `group:data-engineers`;
-- 列级权限:屏蔽特定列
-- (通过Row Filter和Column Mask实现)
CREATE FUNCTION mask_email(email STRING)
RETURNS STRING
RETURN CASE
WHEN is_account_group_member('pii-access') THEN email
ELSE REGEXP_REPLACE(email, '(.)(.*)(@.*)', '$1***$3')
END;
ALTER TABLE main.finance.users ALTER COLUMN email SET MASK mask_email;
-- 查看当前权限
SHOW GRANTS ON TABLE main.finance.transactions;
7.4 Volumes:非表格数据的统一管理
Volumes是Unity Catalog中管理非结构化和半结构化数据(文件、图像、脚本等)的对象,它让文件存储也纳入了UC的权限体系。
有两种类型:Managed Volume(数据文件存储在Databricks管理的存储位置,由UC完全控制)和External Volume(指向你自己云存储中的一个路径,UC管理访问权限但不管理数据本身)。
-- 创建Managed Volume
CREATE VOLUME main.raw_data.uploads;
-- 创建External Volume(指向你的S3/ADLS路径)
CREATE EXTERNAL VOLUME main.raw_data.external_files
LOCATION 's3://my-bucket/raw-files/';
-- 通过Volume路径访问文件(统一通过UC路径访问)
-- /Volumes/catalog_name/schema_name/volume_name/file.csv
LIST '/Volumes/main/raw_data/uploads/';
-- 在代码中读取Volume中的文件
df = spark.read.csv("/Volumes/main/raw_data/uploads/data.csv", header=True)
7.5 Data Lineage(数据血缘)
Unity Catalog自动追踪数据从源表到目标表的流转路径,包括列级Lineage。当你执行CREATE TABLE AS SELECT、INSERT INTO ... SELECT、或通过Spark读写表时,UC会自动记录这些操作中的列与列之间的关系。
在Catalog的Table详情页,你可以看到"Lineage"标签,展示:
- Upstream:这张表的数据来自哪些表的哪些列
- Downstream:这张表的数据被哪些表/视图/Dashboard消费
这对数据影响分析(Impact Analysis)极其有价值:当你要修改一张表的Schema时,可以立即看到下游哪些表、哪些报表会受到影响,从而制定安全的迁移策略。
7.6 Delta Sharing:安全的跨组织数据共享
Delta Sharing是一个开放协议,允许你在不移动数据、不复制数据的前提下,安全地将Delta表共享给外部合作方(其他公司、其他云平台、其他工具)。接收方不需要使用Databricks,只要支持Delta Sharing协议就可以读取数据。
-- 创建一个Share
CREATE SHARE my_partner_share;
-- 向Share中添加表
ALTER SHARE my_partner_share ADD TABLE main.public.monthly_summary;
-- 创建Recipient(合作方)
CREATE RECIPIENT partner_company;
-- 将Share授予Recipient
GRANT SELECT ON SHARE my_partner_share TO RECIPIENT partner_company;
-- 获取激活链接(发给合作方)
DESC RECIPIENT partner_company;
8. Delta Live Tables(DLT):声明式数据管道
8.1 DLT的设计哲学
Delta Live Tables(DLT)是Databricks对"声明式数据管道"的实现。传统的ETL管道需要你命令式地指定:先读这个表,然后做这个转换,再写到那个表,如果失败了怎么办,依赖顺序怎么排。DLT让你只需声明"这张表应该是什么",由框架自动推断执行顺序、管理依赖、处理错误、自动优化。
DLT的核心思想与React的声明式UI有相似之处:你不描述"如何"更新UI,你描述"UI应该是什么样",框架负责diff和更新。DLT里,你不描述"如何"生成数据,你描述"每张表的定义",DLT负责执行计划、依赖管理和增量更新。
8.2 DLT的三种表类型
Streaming Table(流式表)是为增量数据处理设计的,使用APPEND ONLY语义——每次运行只处理新数据(基于Structured Streaming的微批或连续处理)。适用于从Kafka、Event Hubs、Kinesis摄入数据,或处理每天新增的日志文件。
Materialized View(物化视图)在每次更新时完全重新计算结果,适用于需要聚合历史数据的场景(如每日汇总表),DLT会自动判断是否需要全量刷新。
View是临时的、不物化存储的视图,只在管道内部用于逻辑封装,不会创建实际的Delta表。
8.3 DLT Pipeline的编写
# DLT管道代码(在Notebook中编写,由DLT框架执行)
import dlt
from pyspark.sql import functions as F
# Bronze层:原始数据摄入(Streaming Table)
@dlt.table(
name="raw_events",
comment="原始事件数据,从Kafka摄入",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # 防止意外重置
}
)
def raw_events():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092")
.option("subscribe", "events_topic")
.option("startingOffsets", "latest")
.load()
.select(
F.col("timestamp").alias("kafka_timestamp"),
F.from_json(F.col("value").cast("string"), events_schema).alias("data")
)
.select("kafka_timestamp", "data.*")
)
# Silver层:清洗和标准化(Streaming Table,从Bronze增量读取)
@dlt.table(
name="cleaned_events",
comment="清洗后的事件数据"
)
@dlt.expect("valid_user_id", "user_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_fail("valid_event_type", "event_type IN ('click', 'view', 'purchase')")
def cleaned_events():
return (
dlt.read_stream("raw_events") # 增量读取Bronze表
.withColumn("event_date", F.to_date("event_timestamp"))
.withColumn("amount_usd",
F.when(F.col("currency") == "EUR", F.col("amount") * 1.1)
.otherwise(F.col("amount")))
.dropDuplicates(["event_id"]) # 去重
)
# Gold层:业务指标聚合(Materialized View,每次全量计算)
@dlt.table(
name="daily_revenue",
comment="每日收入汇总"
)
def daily_revenue():
return (
dlt.read("cleaned_events") # 读取整张表(非流式)
.filter(F.col("event_type") == "purchase")
.groupBy("event_date", "product_category")
.agg(
F.sum("amount_usd").alias("total_revenue"),
F.count("event_id").alias("transaction_count"),
F.countDistinct("user_id").alias("unique_buyers")
)
)
8.4 DLT的数据质量:Expectations
DLT的Expectations(期望)是声明式的数据质量规则,相当于内置的数据断言机制:
@dlt.expect(name, constraint) — 记录违规行但继续处理(软约束,用于监控)。
@dlt.expect_or_drop(name, constraint) — 删除违规行,其余数据继续流入目标表。
@dlt.expect_or_fail(name, constraint) — 任何违规都导致管道失败(硬约束,用于Critical数据质量要求)。
@dlt.expect_all(expectations_dict) — 一次定义多个期望。
所有Expectations的违规情况都会被自动记录,在DLT Pipeline的Event Log中可以查询,也可以通过event_log()函数进行分析。这就是"内置的数据质量监控",无需额外工具。
8.5 DLT的两种模式
Triggered模式:管道按计划触发,处理完新数据后停止集群。适合批处理场景(每天或每小时运行一次)。
Continuous模式:管道持续运行,像Structured Streaming一样实时处理新数据。适合低延迟的实时场景(分钟级或秒级延迟)。集群持续运行,费用较高。
9. Databricks Workflows:编排与调度
9.1 Workflow的本质
Databricks Workflows(以前叫Jobs)是平台内置的工作流编排系统,用于自动化执行数据管道、ML训练、数据处理等任务。它是Databricks生态中对Airflow、Prefect等外部编排器的原生替代。
一个Workflow由一个或多个Tasks组成,Tasks之间可以定义依赖关系,形成DAG(有向无环图)。支持多种Task类型,能够将不同技术栈的任务组织在一个Workflow中。
9.2 Task类型
Notebook Task:运行一个Databricks Notebook,是最常见的Task类型。可以传递参数(通过dbutils.widgets读取),可以使用dbutils.jobs.taskValues.set()向后续Task传递值。
Python Script Task:运行一个Python脚本文件(从Repo或DBFS读取),比Notebook更适合生产化的代码(可以用pytest测试、版本控制更友好)。
Spark Submit Task:使用spark-submit风格运行Spark作业(Jar或Python),适合已有的Spark应用迁移到Databricks。
Delta Live Tables Task:触发一个DLT Pipeline的运行,将DLT管道集成到更大的Workflow中。
dbt Task:原生集成dbt(Data Build Tool),运行dbt models,使得Databricks + dbt的组合工作流非常流畅。
SQL Task:运行SQL查询(在SQL Warehouse上),适合触发数据刷新或执行DDL操作。
Condition Task 和 For Each Task:控制流Task,实现条件分支和循环逻辑,使Workflow能处理动态的执行路径。
Run Job Task:将另一个Workflow作为子任务调用,实现Workflow的模块化和复用。
9.3 完整Workflow配置示例
# Workflow配置(通过Databricks Asset Bundles的YAML定义)
resources:
jobs:
daily_etl_pipeline:
name: "Daily ETL Pipeline"
schedule:
quartz_cron_expression: "0 0 6 * * ?" # 每天UTC 06:00执行
timezone_id: "Europe/Berlin"
# 邮件通知配置
email_notifications:
on_failure:
- "data-team@company.com"
on_success:
- "data-team@company.com"
# Job级别的集群配置(所有Task共享)
job_clusters:
- job_cluster_key: "etl_cluster"
new_cluster:
spark_version: "14.3.x-scala2.12"
node_type_id: "i3.2xlarge"
num_workers: 4
spark_conf:
"spark.sql.shuffle.partitions": "200"
tasks:
# Task 1: 数据摄入
- task_key: "ingest_raw_data"
job_cluster_key: "etl_cluster"
python_wheel_task:
package_name: "my_etl_package"
entry_point: "ingest"
parameters: ["--date", "{{job.start_time.iso_date}}"]
libraries:
- whl: "/Volumes/main/packages/my_etl_package-1.0.0-py3-none-any.whl"
# Task 2: 数据转换(依赖Task 1完成)
- task_key: "transform_data"
depends_on:
- task_key: "ingest_raw_data"
job_cluster_key: "etl_cluster"
notebook_task:
notebook_path: "/Repos/main/etl/notebooks/transform"
base_parameters:
date: "{{job.start_time.iso_date}}"
# Task 3: DLT管道(依赖Task 2)
- task_key: "run_dlt_pipeline"
depends_on:
- task_key: "transform_data"
pipeline_task:
pipeline_id: "{{var.dlt_pipeline_id}}"
full_refresh: false
# Task 4: 数据质量检查(依赖Task 3)
- task_key: "data_quality_check"
depends_on:
- task_key: "run_dlt_pipeline"
job_cluster_key: "etl_cluster"
notebook_task:
notebook_path: "/Repos/main/etl/notebooks/quality_check"
# Task 5: 通知(无论成功失败都执行)
- task_key: "send_notification"
depends_on:
- task_key: "data_quality_check"
outcome: "FAILED" # 只在失败时触发
run_if: "AT_LEAST_ONE_FAILED"
notebook_task:
notebook_path: "/Repos/main/etl/notebooks/alert_notification"
9.4 动态值引用(Task Values)
Workflows允许Task之间通过Task Values传递动态数据:
# Task 1: 设置值供后续Task使用
from databricks.sdk.runtime import dbutils
processed_count = 1500000
dbutils.jobs.taskValues.set(key="processed_records", value=processed_count)
dbutils.jobs.taskValues.set(key="output_path", value="/delta/processed/2024-01-15")
# Task 2(Task 1的下游): 读取Task 1设置的值
record_count = dbutils.jobs.taskValues.get(
taskKey="ingest_raw_data",
key="processed_records"
)
output_path = dbutils.jobs.taskValues.get(
taskKey="ingest_raw_data",
key="output_path"
)
print(f"Processing {record_count} records from {output_path}")
10. Databricks SQL:分析师的战场
10.1 SQL Warehouse深度解析
SQL Warehouse是专为SQL分析工作负载优化的计算资源,有别于通用Cluster的几个关键特性:
Photon引擎:SQL Warehouse默认使用Photon(C++向量化执行引擎),对列式扫描、聚合、Hash Join等操作的加速幅度从2倍到12倍不等。Photon对SQL语义完全兼容,不需要修改查询。
自动缩放(Auto-Stop和Scaling):SQL Warehouse在不使用时自动暂停(默认10分钟无查询后),在有查询时自动重启。对于多并发查询,可以配置扩展策略(Scaling Policy),在繁忙时自动增加Cluster数量(注意:一个"SQL Warehouse"可以包含多个底层Cluster)。
Serverless SQL Warehouse:启动时间从2-5分钟(Classic)降到几秒,计费粒度更细(精确到秒级),运行在Databricks管理的基础设施上(不占用你的云账号VPC资源)。这是现代Databricks最推荐的SQL分析计算形式。
-- SQL Warehouse中可以执行的典型操作
-- 查询Delta表
SELECT
date_trunc('month', transaction_date) AS month,
product_category,
SUM(amount) AS total_revenue,
COUNT(DISTINCT customer_id) AS unique_customers
FROM main.finance.transactions
WHERE transaction_date >= '2024-01-01'
GROUP BY ALL -- Databricks SQL支持GROUP BY ALL(自动推断分组列)
ORDER BY month DESC, total_revenue DESC;
-- 使用PIVOT进行数据透视
SELECT *
FROM (
SELECT product_category, month(transaction_date) as month, amount
FROM main.finance.transactions
)
PIVOT (
SUM(amount) FOR month IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
);
-- Databricks SQL扩展语法
-- QUALIFY(窗口函数过滤,避免嵌套子查询)
SELECT user_id, event_date, revenue,
ROW_NUMBER() OVER (PARTITION BY event_date ORDER BY revenue DESC) AS rank
FROM events
QUALIFY rank <= 10; -- 直接过滤窗口函数结果
-- ILIKE(大小写不敏感的LIKE)
SELECT * FROM products WHERE name ILIKE '%coffee%';
10.2 Query Editor与可视化
SQL Editor提供了完整的SQL开发体验:语法高亮、自动补全(表名、列名)、查询历史记录。每次查询结果都可以直接生成多种可视化图表(柱状图、折线图、散点图、热力图等),无需Python或额外工具。
重要特性是Query Parameters(查询参数),允许你在SQL中使用{{parameter_name}}占位符,用于创建动态查询和交互式Dashboard:
-- 使用参数化查询(双大括号语法)
SELECT *
FROM main.finance.transactions
WHERE
transaction_date BETWEEN {{start_date}} AND {{end_date}}
AND product_category = '{{category}}'
AND amount >= {{min_amount}};
10.3 Databricks SQL Dashboards
Dashboard是将多个可视化组件(图表、数字、文本)组织在一个页面上,并可以共享给非技术用户的功能。新版的AI/BI Dashboards(Lakeview)在2024年推出,提供了更现代的设计和AI辅助功能(自然语言生成SQL/图表)。
Dashboard可以设置自动刷新频率,并嵌入到外部系统(通过iframe),是面向业务用户的数据展示层。
11. MLflow:ML生命周期管理
11.1 MLflow的四个核心模块
MLflow是Databricks开源并主导的ML生命周期管理平台,由四个核心组件构成,在Databricks中深度集成(无需任何配置即可使用)。
MLflow Tracking解决的问题是:如何记录、比较和重现ML实验。每次训练运行(Run)可以记录:Parameters(超参数,如learning_rate、max_depth)、Metrics(模型评估指标,如accuracy、AUC,支持记录多个Epoch的历史曲线)、Artifacts(模型文件、图表、特征重要性图、任何文件)、Tags(自由标签,如团队、项目、数据版本)。
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score
import pandas as pd
# 在Databricks中,MLflow自动连接到Workspace的Tracking Server
# 设置实验(如果不设置,默认使用Notebook路径)
mlflow.set_experiment("/Users/bill@company.com/fraud_detection")
# 使用context manager自动管理Run生命周期
with mlflow.start_run(run_name="rf_baseline_v1") as run:
# 记录参数
params = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5}
mlflow.log_params(params)
# 添加标签
mlflow.set_tags({
"model_type": "random_forest",
"data_version": "2024-01-15",
"team": "fraud-detection"
})
# 训练模型
model = RandomForestClassifier(**params, random_state=42)
model.fit(X_train, y_train)
# 评估并记录Metrics
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]
accuracy = accuracy_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_prob)
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("auc", auc)
# 记录图表(特征重要性)
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(10, 6))
feature_importances = pd.Series(model.feature_importances_, index=feature_names)
feature_importances.nlargest(20).plot(kind='barh', ax=ax)
mlflow.log_figure(fig, "feature_importance.png")
plt.close()
# 记录模型(最重要的步骤)
# input_example帮助MLflow推断模型的输入Schema
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
input_example=X_train.head(5),
signature=mlflow.models.infer_signature(X_train, y_pred)
)
print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {accuracy:.4f}, AUC: {auc:.4f}")
11.2 MLflow Model Registry
Model Registry是ML模型的版本控制和部署管理系统。一个模型在Registry中经历三个阶段:Staging(测试中)、Production(生产运行)、Archived(已归档)。
在Unity Catalog启用后,模型也遵循三层命名空间(catalog.schema.model_name),并受UC权限管理。
import mlflow
# 将一个Run的模型注册到Model Registry
run_id = "your_run_id_here"
model_uri = f"runs:/{run_id}/model"
# 注册模型(如果不存在则创建,如果存在则创建新版本)
registered_model = mlflow.register_model(
model_uri=model_uri,
name="main.ml_models.fraud_detector" # UC三层命名
)
print(f"Model Version: {registered_model.version}")
# 使用MlflowClient进行更精细的管理
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 将新版本设置为Champion(Production)
client.set_registered_model_alias(
name="main.ml_models.fraud_detector",
alias="champion",
version=registered_model.version
)
# 通过别名加载生产模型(推荐方式,解耦版本号)
production_model = mlflow.sklearn.load_model(
"models:/main.ml_models.fraud_detector@champion"
)
# 或者按版本加载
specific_version_model = mlflow.sklearn.load_model(
"models:/main.ml_models.fraud_detector/3"
)
11.3 MLflow的pyfunc:模型的通用接口
mlflow.pyfunc是MLflow的通用模型接口,任何模型(sklearn、PyTorch、XGBoost甚至自定义预测逻辑)都可以被包装成pyfunc格式。这个格式是Databricks Model Serving的统一接口。
# 创建自定义pyfunc模型(包含预处理+模型推断+后处理)
class FraudDetectionModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
"""加载模型和预处理器(在加载时调用一次)"""
import joblib
self.model = joblib.load(context.artifacts["model_path"])
self.preprocessor = joblib.load(context.artifacts["preprocessor_path"])
self.threshold = 0.65 # 决策阈值
def predict(self, context, model_input):
"""预测逻辑"""
import pandas as pd
# 预处理
X = self.preprocessor.transform(model_input)
# 获取概率
proba = self.model.predict_proba(X)[:, 1]
# 返回包含概率和二元预测的DataFrame
return pd.DataFrame({
"fraud_probability": proba,
"is_fraud": (proba >= self.threshold).astype(int),
"risk_level": pd.cut(proba,
bins=[0, 0.3, 0.65, 1.0],
labels=["low", "medium", "high"])
})
# 保存自定义pyfunc模型
with mlflow.start_run():
mlflow.pyfunc.log_model(
artifact_path="fraud_model",
python_model=FraudDetectionModel(),
artifacts={
"model_path": "path/to/model.pkl",
"preprocessor_path": "path/to/preprocessor.pkl"
},
pip_requirements=["scikit-learn==1.3.0", "joblib"]
)
11.4 MLflow Projects
MLflow Projects是ML代码的标准化打包格式,通过MLproject文件定义项目的入口点和依赖:
# MLproject文件
name: fraud_detection_training
conda_env: conda.yaml # 或 python_env.yaml
entry_points:
train:
parameters:
learning_rate: {type: float, default: 0.01}
max_depth: {type: int, default: 6}
n_estimators: {type: int, default: 100}
data_path: {type: str}
command: "python train.py --lr {learning_rate} --max-depth {max_depth} --n-estimators {n_estimators} --data {data_path}"
evaluate:
parameters:
model_uri: {type: str}
test_data: {type: str}
command: "python evaluate.py --model-uri {model_uri} --test-data {test_data}"
12. Databricks AutoML
12.1 AutoML的定位
Databricks AutoML是为数据科学家设计的自动化建模工具,而非替代数据科学家的工具。它的核心价值在于:快速建立性能基准(Baseline),并自动生成可编辑的Notebook代码,作为后续手动调优的起点。
AutoML支持三类任务:分类(Classification)、回归(Regression)、预测(Forecasting/Time Series)。它会自动:探索性数据分析(EDA)、特征工程(处理缺失值、编码分类变量、日期特征提取)、超参数调优(基于MLflow Hyperopt)、模型比较(训练多种算法并比较)。
12.2 使用AutoML
from databricks import automl
# 加载训练数据(Spark DataFrame或pandas DataFrame)
train_df = spark.table("main.ml_data.fraud_training_set")
# 运行AutoML分类任务
summary = automl.classify(
dataset=train_df,
target_col="is_fraud",
# 可选:指定时间列(用于时序切分,避免数据泄露)
time_col="transaction_timestamp",
# 可选:排除不应作为特征的列
exclude_cols=["transaction_id", "customer_ssn"],
# 可选:自定义主要评估指标
primary_metric="f1",
# 最大运行时间(分钟)
timeout_minutes=30,
# 可选:指定数据分割方式
split_col="split", # 数据中预定义了train/val/test列
# MLflow实验名
experiment_dir="/Users/bill@company.com/automl_experiments"
)
# 查看最佳模型
best_model = summary.best_trial
print(f"Best model: {best_model.model_path}")
print(f"Best F1: {best_model.metrics['test_f1_score']:.4f}")
# 获取生成的Notebook路径(可以打开并修改)
print(f"Generated Notebook: {summary.trials[0].notebook_path}")
# 加载最佳模型并进行预测
import mlflow
best_model_uri = best_model.model_path
loaded_model = mlflow.pyfunc.load_model(best_model_uri)
predictions = loaded_model.predict(test_data)
AutoML最有价值的输出是生成的Notebook——它包含了完整的特征工程、模型训练、评估代码,是一个可直接修改的模板,让数据科学家从"0-1"的时间从几天压缩到几分钟。
13. Feature Store:特征工程的工厂
13.1 Feature Store解决的核心问题
Feature Store(特征存储)解决了ML工程中一个经典问题:训练-服务不一致(Training-Serving Skew)。
在没有Feature Store的情况下,训练时的特征计算代码(Python/Spark)和服务时的特征计算代码(Python/Java/REST API)是分开维护的。随着时间推移,两套代码出现细微差异(不同的空值处理、不同的归一化公式),导致线上模型的实际输入与训练时不同,产生难以发现的性能下降。
Feature Store通过统一存储和统一计算逻辑解决这个问题:特征只定义一次,训练和服务都从同一个Feature Store读取。
13.2 使用Feature Store
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
# 步骤1:创建Feature Table
# 先定义特征计算函数(返回Spark DataFrame)
def compute_user_features(df):
return (df
.groupBy("user_id")
.agg(
F.count("transaction_id").alias("transaction_count_30d"),
F.sum("amount").alias("total_spend_30d"),
F.avg("amount").alias("avg_transaction_amount"),
F.stddev("amount").alias("std_transaction_amount"),
F.max("fraud_flag").alias("has_fraud_history")
)
.withColumn("spend_velocity",
F.col("total_spend_30d") / F.lit(30))
)
# 计算特征(用过去30天数据)
user_features_df = compute_user_features(transactions_df)
# 将特征写入Feature Store(创建Feature Table)
fe.create_table(
name="main.features.user_transaction_features",
primary_keys=["user_id"],
timestamp_keys=["feature_date"], # 时序特征用时间戳键
df=user_features_df,
description="用户过去30天的交易特征,每日更新"
)
# 步骤2:在训练时使用Feature Store
from databricks.feature_engineering import FeatureLookup
# 定义特征查找(从Feature Table自动join特征)
feature_lookups = [
FeatureLookup(
table_name="main.features.user_transaction_features",
feature_names=["transaction_count_30d", "total_spend_30d",
"avg_transaction_amount", "spend_velocity"],
lookup_key="user_id",
timestamp_lookup_key="transaction_timestamp" # Point-in-time correct join
),
FeatureLookup(
table_name="main.features.merchant_risk_features",
feature_names=["merchant_risk_score", "merchant_category_fraud_rate"],
lookup_key="merchant_id"
)
]
# 创建训练数据集(自动Join特征)
training_set = fe.create_training_set(
df=labels_df, # 只包含主键和label的DataFrame
feature_lookups=feature_lookups,
label="is_fraud",
exclude_columns=["transaction_timestamp"] # 不作为模型输入的列
)
# 转换为pandas DataFrame用于训练
training_df = training_set.load_df().toPandas()
# 步骤3:用Feature Store记录模型(关键!记录特征依赖)
import mlflow
with mlflow.start_run():
model = train_model(training_df)
# 使用fe.log_model代替mlflow.sklearn.log_model
# 这样模型和Feature Store的关联关系会被记录
fe.log_model(
model=model,
artifact_path="fraud_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="main.ml_models.fraud_detector"
)
14. Model Serving:模型上线的最后一公里
14.1 Model Serving的架构
Databricks Model Serving提供了托管的REST API端点,将MLflow中注册的模型暴露为HTTP API,用于实时推断(Real-time Inference)。它是Serverless的,按调用量和使用时间计费,无需管理任何Kubernetes/容器基础设施。
Model Serving支持多种模型类型:MLflow Python模型(sklearn/PyTorch/XGBoost等)、外部模型(通过统一API代理OpenAI/Anthropic/Azure OpenAI,便于模型切换和成本控制)、基础模型API(Databricks托管的Llama、DBRX、Mistral等开源LLM)。
14.2 创建和使用Model Serving Endpoint
import mlflow
from mlflow.deployments import get_deploy_client
# 方式1:通过Databricks SDK创建Endpoint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ServedModelInput, ServedModelInputWorkloadSize
w = WorkspaceClient()
endpoint = w.serving_endpoints.create_and_wait(
name="fraud-detector-endpoint",
config=w.serving_endpoints.EndpointCoreConfigInput(
served_models=[
ServedModelInput(
name="fraud-detector-v1",
model_name="main.ml_models.fraud_detector",
model_version="3",
workload_size=ServedModelInputWorkloadSize.SMALL, # SMALL/MEDIUM/LARGE
scale_to_zero_enabled=True # 没有流量时自动缩容到0
)
]
)
)
# 方式2:A/B测试(流量切分)
endpoint_with_ab = w.serving_endpoints.create_and_wait(
name="fraud-detector-ab-test",
config=w.serving_endpoints.EndpointCoreConfigInput(
served_models=[
ServedModelInput(
name="champion",
model_name="main.ml_models.fraud_detector",
model_version="3",
traffic_percentage=80 # 80%流量
),
ServedModelInput(
name="challenger",
model_name="main.ml_models.fraud_detector_v2",
model_version="1",
traffic_percentage=20 # 20%流量用于测试新模型
)
]
)
)
# 调用Model Serving Endpoint
import requests
endpoint_url = "https://your-workspace.azuredatabricks.net/serving-endpoints/fraud-detector-endpoint/invocations"
token = "your_databricks_token"
response = requests.post(
url=endpoint_url,
headers={"Authorization": f"Bearer {token}"},
json={
"dataframe_records": [
{
"user_id": "user_123",
"merchant_id": "merch_456",
"amount": 1500.00,
"transaction_hour": 2,
"country_code": "DE"
}
]
}
)
result = response.json()
print(result["predictions"])
# 输出: [{"fraud_probability": 0.87, "is_fraud": 1, "risk_level": "high"}]
14.3 Foundation Models API
Databricks提供了对开源大语言模型(如Meta Llama 3, Mistral, DBRX)的托管API,遵循OpenAI API格式,便于从OpenAI迁移:
from openai import OpenAI
# 使用Databricks Foundation Models API(OpenAI兼容接口)
client = OpenAI(
api_key="your-databricks-token",
base_url="https://your-workspace.azuredatabricks.net/serving-endpoints"
)
# 使用Llama 3进行Chat Completion
response = client.chat.completions.create(
model="databricks-meta-llama-3-1-70b-instruct",
messages=[
{"role": "system", "content": "You are a financial analysis assistant."},
{"role": "user", "content": "Analyze the key risks in the following transaction: ..."}
],
max_tokens=500,
temperature=0.1
)
print(response.choices[0].message.content)
15. Databricks Asset Bundles(DAB):基础设施即代码
15.1 DAB的核心价值
Databricks Asset Bundles(DAB)是Databricks对"基础设施即代码"的官方实现。它允许你用YAML文件定义所有的Databricks资源(Workflows、Clusters、DLT Pipelines、Model Serving Endpoints、Jobs),并通过CLI将这些定义部署到不同的环境(dev/staging/prod)。
DAB对于Platform Engineer来说极其重要,因为它解决了Databricks在CI/CD流程中的关键问题:如何将Databricks资源的变更纳入版本控制和自动化部署。没有DAB之前,只能手动通过UI点击来配置和部署,无法实现基础设施的可重现性。
15.2 DAB的项目结构
my_databricks_project/
├── databricks.yml ← 主配置文件(定义bundle名、workspace连接、环境)
├── resources/
│ ├── jobs.yml ← Workflow/Job定义
│ ├── pipelines.yml ← DLT Pipeline定义
│ └── serving_endpoints.yml ← Model Serving Endpoint定义
├── src/
│ ├── notebooks/ ← Notebook文件
│ ├── python/ ← Python源代码
│ └── sql/ ← SQL文件
└── tests/
└── test_pipeline.py ← 测试代码
15.3 完整的DAB配置示例
# databricks.yml(主配置文件)
bundle:
name: finlakehouse_etl
# 变量定义(可被各环境覆盖)
variables:
catalog_name:
default: "dev"
description: "Unity Catalog名称"
dlt_pipeline_id:
description: "DLT Pipeline ID(部署后填入)"
# 工作区连接(不同环境连接不同Workspace)
workspace:
host: https://your-workspace.azuredatabricks.net
auth_type: pat
# 多环境配置
targets:
# 开发环境
dev:
mode: development # 开发模式:所有资源名称加dev前缀,避免污染生产
workspace:
host: https://dev-workspace.azuredatabricks.net
variables:
catalog_name: "dev"
# 生产环境
prod:
mode: production
workspace:
host: https://prod-workspace.azuredatabricks.net
variables:
catalog_name: "prod"
run_as:
service_principal_name: etl-service-principal@project.com
# 引入其他YAML资源文件
include:
- resources/*.yml
# resources/jobs.yml
resources:
jobs:
macro_etl_daily:
name: "FinLakehouse - Macro ETL Daily"
description: "Daily macro economic data ingestion and processing"
schedule:
quartz_cron_expression: "0 30 7 * * ?"
timezone_id: "Europe/Berlin"
pause_status: UNPAUSED
# 在开发环境中,jobs.run_as可以是当前用户
# 在生产环境中,使用Service Principal
job_clusters:
- job_cluster_key: "default_cluster"
new_cluster:
spark_version: "14.3.x-scala2.12"
node_type_id: "i3.xlarge"
autoscale:
min_workers: 1
max_workers: 4
spark_conf:
"spark.databricks.delta.preview.enabled": "true"
data_security_mode: SINGLE_USER
tasks:
- task_key: "ingest_ecb_rates"
job_cluster_key: "default_cluster"
python_wheel_task:
package_name: "finlakehouse"
entry_point: "ingest_ecb"
parameters:
- "--catalog"
- "${var.catalog_name}"
- "--env"
- "${bundle.target}"
- task_key: "process_bronze_to_silver"
depends_on:
- task_key: "ingest_ecb_rates"
job_cluster_key: "default_cluster"
notebook_task:
notebook_path: ./src/notebooks/bronze_to_silver
base_parameters:
catalog: "${var.catalog_name}"
email_notifications:
on_failure:
- "bill@company.com"
# 健康检查
health:
rules:
- metric: RUN_DURATION_SECONDS
op: GREATER_THAN
value: 3600 # 如果超过1小时,发出警告
15.4 DAB的常用CLI命令
# 安装Databricks CLI(v0.200+,Go版本,支持DAB)
curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
# 配置认证
databricks configure --token
# 验证bundle配置(不实际部署,只做验证和渲染)
databricks bundle validate
# 部署到开发环境(上传代码+创建/更新Databricks资源)
databricks bundle deploy --target dev
# 部署到生产环境
databricks bundle deploy --target prod
# 手动触发一次Job运行(不等待完成)
databricks bundle run macro_etl_daily --target dev
# 查看部署状态
databricks bundle summary --target prod
# 销毁部署(删除所有Bundle创建的资源)
databricks bundle destroy --target dev
# 在GitHub Actions中的典型CI/CD配置
# .github/workflows/deploy.yml
# on: push to main → databricks bundle deploy --target prod
16. Repos & Git 集成
16.1 Repos的作用
Databricks Repos允许你将Git仓库(GitHub、GitLab、Bitbucket、Azure DevOps)克隆到Databricks Workspace,直接在Workspace中编辑、提交、推送代码。这解决了传统Notebook只能存在Workspace中、无法被Git有效版本控制的问题。
通过Repos,你的工作流程可以是:本地IDE开发 → Git Push → GitHub Actions触发CI → Databricks Bundle Deploy → 生产部署。Notebook文件在Repos中以.py格式(使用# COMMAND ----------分隔符)存储,可以被pytest测试,可以被代码审查。
# 在Workspace UI中添加Repo
# 1. 左侧导航栏 → Repos → Add Repo
# 2. 输入GitHub仓库URL
# 3. 选择分支
# 或者通过API/CLI添加
databricks repos create \
--url https://github.com/your-org/finlakehouse \
--provider gitHub
16.2 与Git的集成最佳实践
在生产环境中,应该通过CI/CD管道(而非手动操作)来更新Repos。典型的工作流是:开发分支合并到main → GitHub Actions触发 → databricks bundle deploy --target prod → 生产Repo自动更新到最新代码并重新部署Job。
不要在生产Workspace中直接编辑代码——这是一个常见的反模式,会导致"谁改了什么、什么时候改的"无法追踪。所有生产代码变更都应该通过Git PR流程。
17. Secrets & Security 安全体系
17.1 Secrets管理
Databricks Secrets提供了安全存储敏感信息(API密钥、数据库密码、访问令牌)的机制。Secrets存储在加密的后端,在Notebook中引用时会被自动隐藏(显示为[REDACTED])。
# 通过CLI创建Secret Scope(两种类型)
# 1. Databricks-backed(存在Databricks内部)
databricks secrets create-scope --scope my-secrets
# 2. Azure Key Vault-backed(存在Azure Key Vault)
databricks secrets create-scope \
--scope azure-kv-scope \
--scope-backend-type AZURE_KEYVAULT \
--resource-id /subscriptions/.../vaults/my-key-vault \
--dns-name https://my-key-vault.vault.azure.net/
# 添加Secret
databricks secrets put-secret my-secrets kafka-api-key --string-value "your-api-key"
databricks secrets put-secret my-secrets db-password --string-value "your-password"
# 列出Scope中的所有Secret(只能看key名,不能看值)
databricks secrets list-secrets my-secrets
# 在Notebook/脚本中使用Secrets
api_key = dbutils.secrets.get(scope="my-secrets", key="kafka-api-key")
db_password = dbutils.secrets.get(scope="my-secrets", key="db-password")
# 在Spark配置中使用(用于存储访问)
spark.conf.set(
"fs.azure.account.key.mystorageaccount.blob.core.windows.net",
dbutils.secrets.get(scope="azure-storage", key="storage-account-key")
)
17.2 IP访问列表与网络安全
在企业Databricks部署中,网络安全是关键考量:
Private Link / VNet Injection:将Databricks的Control Plane(API/UI)和Data Plane(Clusters)部署在你的VPC/VNet内,避免流量经过公网。
IP Access Lists:限制只有特定IP段才能访问Databricks Workspace的Web UI和REST API,防止未授权访问。
Customer-Managed Keys(CMK):使用你自己的KMS密钥加密Databricks的存储(Notebook内容、Job运行日志等),满足金融监管(DORA/BaFin)对数据加密的要求。
18. Databricks CLI & REST API
18.1 CLI全览
Databricks CLI(v0.200+,基于Go重写)是与Databricks平台交互的命令行工具,覆盖了几乎所有的API功能:
# 认证配置
databricks configure # 交互式配置
databricks configure --token # 使用PAT(Personal Access Token)
databricks auth login --host https://... # OAuth登录
# Workspace操作
databricks workspace ls /Users/bill@company.com
databricks workspace import ./local_notebook.py /Repos/main/notebook.py
databricks workspace export /path/to/notebook ./local.py
# Cluster管理
databricks clusters list
databricks clusters create --json @cluster_config.json
databricks clusters start --cluster-id 1234-567890-abc123
databricks clusters delete --cluster-id 1234-567890-abc123
# Job/Workflow管理
databricks jobs list
databricks jobs run-now --job-id 123456
databricks runs get --run-id 789012
databricks runs cancel --run-id 789012
# DBFS(Databricks文件系统)
databricks fs ls dbfs:/path/to/data
databricks fs cp ./local_file.csv dbfs:/path/to/destination/
databricks fs rm dbfs:/path/to/file
# Secrets
databricks secrets list-scopes
databricks secrets list-secrets --scope my-scope
databricks secrets put-secret --scope my-scope --key my-key --string-value "value"
# Unity Catalog
databricks catalogs list
databricks schemas list --catalog-name main
databricks tables list --catalog-name main --schema-name finance
18.2 REST API
Databricks的所有功能都通过REST API暴露,是自动化和集成的基础:
import requests
DATABRICKS_HOST = "https://your-workspace.azuredatabricks.net"
TOKEN = "your-personal-access-token"
headers = {
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json"
}
# 列出所有Clusters
response = requests.get(
f"{DATABRICKS_HOST}/api/2.0/clusters/list",
headers=headers
)
clusters = response.json()["clusters"]
# 触发Job运行
job_run = requests.post(
f"{DATABRICKS_HOST}/api/2.1/jobs/run-now",
headers=headers,
json={
"job_id": 123456,
"job_parameters": {
"date": "2024-01-15",
"env": "prod"
}
}
)
run_id = job_run.json()["run_id"]
# 轮询运行状态
import time
while True:
run_status = requests.get(
f"{DATABRICKS_HOST}/api/2.1/jobs/runs/get",
headers=headers,
params={"run_id": run_id}
)
state = run_status.json()["state"]["life_cycle_state"]
if state in ["TERMINATED", "SKIPPED", "INTERNAL_ERROR"]:
result_state = run_status.json()["state"].get("result_state")
print(f"Run completed: {result_state}")
break
print(f"Run state: {state}, waiting...")
time.sleep(30)
18.3 Databricks Python SDK
比直接调用REST API更方便的是使用官方Python SDK:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import RunNow
# 使用环境变量认证(推荐)
# DATABRICKS_HOST 和 DATABRICKS_TOKEN 环境变量
w = WorkspaceClient()
# 列出所有集群
for cluster in w.clusters.list():
print(f"{cluster.cluster_id}: {cluster.cluster_name} ({cluster.state.value})")
# 触发Job并等待完成
run = w.jobs.run_now_and_wait(
job_id=123456,
job_parameters={"date": "2024-01-15"}
)
print(f"Job completed with state: {run.state.result_state.value}")
# 上传文件到Unity Catalog Volume
with open("./data.csv", "rb") as f:
w.files.upload("/Volumes/main/data/uploads/data.csv", f)
19. Databricks Connect:本地开发的桥梁
19.1 Databricks Connect的价值
Databricks Connect允许你在本地IDE(VS Code、PyCharm)中运行代码,但计算在远程Databricks Cluster上执行。你可以在本地用断点调试Spark代码,代码直接操作Databricks上的Delta表,完全不需要将数据下载到本地。
这对大型项目的开发效率提升巨大:本地IDE的代码补全、调试器、Git集成,配合Databricks的计算和存储能力,是最理想的开发体验。
# 安装(需要与远程DBR版本匹配)
pip install databricks-connect==14.3.* # 与远程DBR 14.3版本对应
# 在本地Python脚本中使用(与Notebook代码几乎相同)
from databricks.connect import DatabricksSession
# 连接到远程Cluster(使用.databrickscfg中的配置)
spark = DatabricksSession.builder \
.profile("DEFAULT") \
.getOrCreate()
# 读取远程Databricks上的Delta表(数据不下载到本地,计算在远端)
df = spark.table("main.finance.transactions")
result = df.filter("amount > 1000").groupBy("user_id").count()
# 只有collect()/toPandas()等Action会触发远程计算并将结果传回本地
result.show() # 触发远程计算,结果打印到本地控制台
# 可以在VS Code/PyCharm中设置断点调试!
20. Partner Connect & 生态集成
20.1 主要集成类别
Databricks拥有广泛的生态系统,Partner Connect页面(Workspace左侧导航)提供了一键集成常见第三方工具的功能:
数据摄入(Data Ingestion):Fivetran、Airbyte(将SaaS应用数据同步到Delta Lake);Kafka + Confluent(实时流数据摄入);dbt Cloud(SQL转换层,与Databricks SQL深度集成)。
BI与可视化:Tableau、Power BI、Looker(通过JDBC/ODBC连接SQL Warehouse);这些工具连接SQL Warehouse后,可以直接查询Delta Lake中的数据,无需额外的数据复制。
数据质量:Great Expectations(Python数据质量框架,可与DLT的Expectations配合使用);Monte Carlo、Bigeye(数据可观测性平台)。
AI/ML:Weights & Biases(实验追踪,与MLflow互补);Hugging Face(模型下载和微调);LangChain(LLM应用开发,与Model Serving集成)。
数据目录:Alation、Collibra(企业级数据目录,可与Unity Catalog的Lineage集成)。
21. Databricks on 多云:AWS/Azure/GCP差异
21.1 架构上的统一与差异
Databricks的设计目标之一是跨云一致性——同样的Python代码、同样的SQL、同样的ML API,在AWS/Azure/GCP上行为一致。但在基础设施层面,有一些重要的云特定差异需要了解:
存储:在AWS上,底层存储是S3(s3://路径);在Azure上是ADLS Gen2(abfss://路径);在GCP上是GCS(gs://路径)。通过Unity Catalog,这些差异被抽象掉,代码中只使用catalog.schema.table三段式引用。
身份认证:在Azure上,使用Azure Active Directory(AAD)和Service Principals进行身份认证,与Azure AD深度集成,支持SSO(单点登录);在AWS上,使用IAM Roles和Instance Profiles;在GCP上,使用Service Accounts。
网络:Azure上叫VNet Injection,AWS上叫VPC Peering/PrivateLink,GCP上叫VPC Peering。原理相同,实现细节不同。
Serverless:Serverless SQL Warehouse和Serverless Job Compute(无需管理Cluster的Job运行方式)在各云的可用性不同,Azure和AWS支持较完整,GCP某些功能仍在推进中。
22. 完整架构总结:Platform Engineer视角
22.1 所有组件的关系图
┌────────────────────────────────────────────────────────────┐
│ DATABRICKS PLATFORM │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Unity Catalog (治理层) │ │
│ │ Catalogs · Schemas · Tables · Volumes · Models │ │
│ │ Lineage · Audit Log · Delta Sharing · Row/Col Policy │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Data Eng. │ │ SQL/BI │ │ ML/AI │ │
│ │ │ │ │ │ │ │
│ │ DLT │ │ SQL Warehouse│ │ MLflow │ │
│ │ Workflows │ │ Dashboards │ │ AutoML │ │
│ │ Spark │ │ Queries │ │ Feature Store │ │
│ └──────┬──────┘ └──────┬───────┘ │ Model Serving │ │
│ │ │ └────────┬─────────┘ │
│ ┌──────▼──────────────────────────────────▼──────────┐ │
│ │ Compute Layer │ │
│ │ All-Purpose Clusters │ Job Clusters │ │
│ │ SQL Warehouses (Photon)│ Serverless Compute │ │
│ │ Databricks Runtime (DBR / ML / Photon) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Storage & Open Formats (存储层) │ │
│ │ Delta Lake · Apache Iceberg · Parquet · JSON │ │
│ │ Object Storage: S3 / ADLS Gen2 / GCS │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 开发者工具 & DevOps (开发层) │ │
│ │ Repos/Git · DAB (IaC) · CLI · REST API · SDK │ │
│ │ Notebooks · Databricks Connect (Local Dev) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
22.2 典型的端到端数据流
下面展示一个金融数据平台(FinLakehouse类场景)的端到端数据流,体现所有组件如何协作:
外部数据源(ECB API、Euribor、市场数据)
│
▼
[摄入层] Python脚本/Notebook (Job Cluster)
→ 写入 Delta Lake Bronze (Raw数据,完整保留原始格式)
→ 通过Unity Catalog Volume暂存文件
│
▼
[转换层] DLT Pipeline (Job Cluster,自动管理)
Bronze → Silver(清洗、标准化、数据质量Expectations)
Silver → Gold(业务指标聚合,如macro_state_daily)
→ 所有层的表都在Unity Catalog中注册
│
├──────────────────────────┬─────────────────────────
▼ ▼ ▼
[分析层] [ML层] [服务层]
SQL Warehouse (Photon) Feature Store Model Serving
→ Dashboards/BI → MLflow Experiments → REST API
→ Tableau/Power BI连接 → MLflow Model Registry → 实时预测
→ 自助查询 → Model Training → A/B测试
│
▼
[编排层] Databricks Workflows
→ 调度所有上述任务的执行顺序
→ 监控、告警、重试
→ 通过DAB部署,纳入CI/CD
[治理层] Unity Catalog(横跨所有层)
→ 自动记录数据Lineage(从Bronze到Gold到Dashboard)
→ 列级权限控制(PII字段脱敏)
→ 审计日志(谁在什么时候读取了哪张表)
22.3 Platform Engineer的核心能力矩阵
作为一个Platform Engineer,你对Databricks的掌握程度应该体现在以下维度:
架构设计能力:能够根据业务需求选择合适的组件组合(DLT vs 手写Spark、Job Cluster vs SQL Warehouse、Managed vs External Table),并能解释取舍理由(Trade-off Analysis)。
成本优化能力:理解DBU(Databricks Unit)的计费模型,知道如何通过Auto-Terminate、Spot实例、Cluster共享、Serverless等手段控制成本。能够通过System Tables(system.billing、system.compute)分析和优化成本。
治理设计能力:能够设计Unity Catalog的Catalog层次结构、权限模型、数据分类策略,满足GDPR/DORA等合规要求。
DevOps集成能力:使用DAB将Databricks资源纳入版本控制和CI/CD(GitHub Actions/Azure DevOps),实现可重现、可审计的部署流程。
性能调优能力:能够通过Spark UI诊断性能问题(数据倾斜、Shuffle过多、小文件问题),并通过Z-Order/Liquid Clustering、OPTIMIZE、AQE、广播Join等手段解决。
ML工程能力:能够设计完整的ML平台——从Feature Store到Model Training到Registry到Serving,确保训练-服务一致性,并集成监控和再训练流程。
附录A:关键配置速查
# 常用的Spark配置(在Cluster创建时设置)
spark_conf = {
# 性能优化
"spark.sql.shuffle.partitions": "200",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.skewJoin.enabled": "true",
# Delta Lake优化
"spark.databricks.delta.optimizeWrite.enabled": "true",
"spark.databricks.delta.autoCompact.enabled": "true",
# 内存管理
"spark.memory.fraction": "0.8",
"spark.memory.storageFraction": "0.3",
# 广播Join阈值(10MB是默认值,可适当提高)
"spark.sql.autoBroadcastJoinThreshold": "20971520", # 20MB
}
附录B:常用dbutils命令速查
# 文件系统操作
dbutils.fs.ls("/mnt/data") # 列出目录
dbutils.fs.cp("src", "dst") # 复制文件
dbutils.fs.mv("src", "dst") # 移动文件
dbutils.fs.rm("/path", recurse=True) # 删除
dbutils.fs.mkdirs("/new/path") # 创建目录
# Notebook控制
dbutils.notebook.run("/path/to/nb", timeout_seconds=600,
arguments={"param": "value"})
dbutils.notebook.exit("success") # 退出当前Notebook并返回值
# Secrets
dbutils.secrets.listScopes() # 列出所有Scope
dbutils.secrets.list("scope-name") # 列出Scope中的Key
dbutils.secrets.get("scope", "key") # 获取Secret值
# Widgets(Notebook参数)
dbutils.widgets.text("date", "2024-01-01", "Processing Date")
dbutils.widgets.dropdown("env", "dev", ["dev", "staging", "prod"])
date = dbutils.widgets.get("date") # 读取参数值
dbutils.widgets.removeAll() # 清除所有Widgets
# Task Values(Workflow中跨Task传值)
dbutils.jobs.taskValues.set(key="output_count", value=1000)
dbutils.jobs.taskValues.get(taskKey="previous_task", key="output_count")
本文档涵盖了Databricks平台的所有主要组件。每个组件都是一个深度领域——建议以本文档为地图,根据当前项目需要深入某个具体方向。Delta Lake和Unity Catalog是优先级最高的深入方向,它们是平台的地基。