🏗️ SmartPouch ML Platform - Lakehouse-Based Architecture
Great! I'll start by designing a complete, production-grade architecture for you. Once you tell me the specifics of your environment, I'll follow up with detailed implementation code.
Overall Architecture Diagram
┌──────────────────────────────────────────────────────────────────┐
│                        DATA SOURCES LAYER                        │
├──────────────────────────────────────────────────────────────────┤
│  IoT Devices        │  Mobile Apps        │  Historical CSV Files│
│  (MQTT/Kafka)       │  (REST API)         │  (Batch Upload)      │
└─────────┬───────────┴──────────┬──────────┴───────────┬──────────┘
          │                      │                      │
          ▼                      ▼                      ▼
┌──────────────────────────────────────────────────────────────────┐
│                         INGESTION LAYER                          │
├──────────────────────────────────────────────────────────────────┤
│  Apache Kafka/Kinesis  │  REST API Gateway  │  S3/MinIO Bucket   │
│  (Real-time Stream)    │  (On-demand)       │  (Batch)           │
└─────────┬──────────────┴────────────────────┴───────┬────────────┘
          │                                           │
          ▼                                           ▼
┌──────────────────────────────────────────────────────────────────┐
│                     BRONZE LAYER (Raw Data)                      │
│                       Delta Lake / Iceberg                       │
├──────────────────────────────────────────────────────────────────┤
│  Location: s3://lakehouse/bronze/smartpouch/                     │
│  Format: Delta/Parquet (append-only, immutable)                  │
│  Schema: timestamp, device_id, aG, bG, cG, xG, yG, zG, metadata  │
│  Partitioning: year/month/day/hour                               │
└─────────┬────────────────────────────────────────────────────────┘
          │
          │  PySpark Streaming / Batch ETL
          ▼
┌──────────────────────────────────────────────────────────────────┐
│                SILVER LAYER (Cleaned & Enriched)                 │
│                       Delta Lake / Iceberg                       │
├──────────────────────────────────────────────────────────────────┤
│  - Data Quality Checks (Great Expectations)                      │
│  - Deduplication                                                 │
│  - Sensor Calibration & Normalization                            │
│  - Outlier Detection & Handling                                  │
│  Location: s3://lakehouse/silver/smartpouch/                     │
└─────────┬────────────────────────────────────────────────────────┘
          │
          │  Feature Engineering Pipeline
          ▼
┌──────────────────────────────────────────────────────────────────┐
│                    GOLD LAYER (Feature Store)                    │
│                    Delta Lake + Feature Store                    │
├──────────────────────────────────────────────────────────────────┤
│  Time-series Features:                                           │
│  - Sliding Windows (5000 samples, 50% overlap)                   │
│  - Statistical Features (mean, std, skewness per channel)        │
│  - Frequency Domain Features (FFT)                               │
│  - Label Encoding with Quality Score                             │
│  Location: s3://lakehouse/gold/features/                         │
│  Registered in: Databricks Feature Store / Feast                 │
└─────────┬────────────────────────────────────────────────────────┘
          │
          ├──────────────────┬──────────────────┬──────────────────┐
          ▼                  ▼                  ▼                  ▼
   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
   │  TRAINING   │    │  INFERENCE  │    │  ANALYTICS  │    │   EXPORT    │
   │  PIPELINE   │    │  PIPELINE   │    │  DASHBOARD  │    │     API     │
   └─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
📊 Detailed Layer Design
1️⃣ Bronze Layer - Raw Data
Goal: retain all raw data to support replay/backfill and auditing.
# Bronze layer data schema
bronze_schema = """
    timestamp TIMESTAMP,
    device_id STRING,
    session_id STRING,
    aG DOUBLE,
    bG DOUBLE,
    cG DOUBLE,
    xG DOUBLE,
    yG DOUBLE,
    zG DOUBLE,
    label STRING,
    data_quality_score DOUBLE,   -- signal quality score
    ingestion_time TIMESTAMP,
    source_system STRING,        -- kafka/api/batch
    _metadata STRUCT<...>        -- ingestion metadata
"""

# Partitioning strategy
partitioning = "year/month/day/hour"

# Data retention policy
retention_policy = "5 years (compliance requirement)"
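The retention policy itself has to be enforced by a maintenance job. A minimal sketch, assuming a scheduled task built on Delta Lake's `DeltaTable` API and the Bronze path from the diagram:

```python
# Hypothetical retention job: drop records older than the compliance window,
# then clean up files no longer referenced by the Delta log.
from delta.tables import DeltaTable

def enforce_bronze_retention(spark, bronze_path, years=5):
    table = DeltaTable.forPath(spark, bronze_path)
    # Delete rows outside the 5-year compliance window
    table.delete(f"timestamp < add_months(current_date(), -{years * 12})")
    # Physically remove unreferenced data files (subject to Delta's retention check)
    table.vacuum(retentionHours=7 * 24)
```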
Ingestion implementation:
# bronze_ingestion.py
from pyspark.sql import SparkSession, functions as F
from delta import *

def ingest_to_bronze(spark, source_path, bronze_path):
    """
    Ingest data from multiple sources into the Bronze layer.
    """
    # Read the raw CSV files (your existing data)
    raw_df = spark.read.csv(
        source_path,
        header=True,
        inferSchema=True
    )

    # Add ingestion metadata and partition columns
    bronze_df = raw_df \
        .withColumn("ingestion_time", F.current_timestamp()) \
        .withColumn("source_system", F.lit("batch_upload")) \
        .withColumn("year", F.year("timestamp")) \
        .withColumn("month", F.month("timestamp")) \
        .withColumn("day", F.dayofmonth("timestamp"))

    # Write to Delta Lake (ACID transactions, append-only)
    bronze_df.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("year", "month", "day") \
        .save(bronze_path)

    print(f"✅ Ingested {bronze_df.count()} records to Bronze layer")
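The batch function above covers the CSV path; the Kafka/MQTT path from the diagram would land in the same Bronze table via Structured Streaming. A minimal sketch, where the topic name `smartpouch-sensor`, the broker address, and the JSON payload layout are assumptions:

```python
# Hypothetical streaming ingestion into the same Bronze Delta table.
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, TimestampType,
                               StringType, DoubleType)

sensor_schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("device_id", StringType()),
    StructField("aG", DoubleType()), StructField("bG", DoubleType()),
    StructField("cG", DoubleType()), StructField("xG", DoubleType()),
    StructField("yG", DoubleType()), StructField("zG", DoubleType()),
])

def stream_to_bronze(spark, bronze_path, checkpoint_path):
    stream_df = (
        spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "kafka:9092")
             .option("subscribe", "smartpouch-sensor")
             .load()
             # Kafka delivers bytes; parse the JSON payload into typed columns
             .select(F.from_json(F.col("value").cast("string"), sensor_schema).alias("r"))
             .select("r.*")
             .withColumn("ingestion_time", F.current_timestamp())
             .withColumn("source_system", F.lit("kafka"))
             .withColumn("year", F.year("timestamp"))
             .withColumn("month", F.month("timestamp"))
             .withColumn("day", F.dayofmonth("timestamp"))
    )
    return (
        stream_df.writeStream
                 .format("delta")
                 .outputMode("append")
                 .option("checkpointLocation", checkpoint_path)
                 .partitionBy("year", "month", "day")
                 .start(bronze_path)
    )
```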
2️⃣ Silver Layer - Cleaning & Validation
Goal: production-grade data quality, directly usable for analysis.
# silver_transformation.py
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from great_expectations.dataset import SparkDFDataset

def clean_to_silver(spark, bronze_path, silver_path):
    """
    Data quality checks + cleaning.
    """
    # Read Bronze data
    bronze_df = spark.read.format("delta").load(bronze_path)

    # 1. Data quality checks (Great Expectations)
    ge_df = SparkDFDataset(bronze_df)
    validation_results = [
        ge_df.expect_column_values_to_not_be_null("timestamp"),
        ge_df.expect_column_values_to_be_between("aG", -20, 20),
        # ... more checks
    ]

    # 2. Deduplicate (based on timestamp + device_id)
    deduped_df = bronze_df.dropDuplicates(["timestamp", "device_id"])

    # 3. Sensor calibration
    calibrated_df = deduped_df \
        .withColumn("aG_calibrated",
            F.when(F.col("aG").between(-20, 20), F.col("aG"))
             .otherwise(F.lit(None))
        )

    # 4. Outlier detection
    assembler = VectorAssembler(
        inputCols=["aG", "bG", "cG", "xG", "yG", "zG"],
        outputCol="features"
    )
    silver_df = assembler.transform(calibrated_df) \
        .withColumn("is_outlier", detect_outliers(F.col("features")))  # detect_outliers: UDF, sketched below

    # 5. Write to the Silver layer (with schema evolution)
    silver_df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .partitionBy("year", "month", "day") \
        .save(silver_path)

    # 6. Data quality report
    quality_report = {
        "total_records": bronze_df.count(),
        "after_dedup": deduped_df.count(),
        "outliers_detected": silver_df.filter("is_outlier = true").count(),
        "null_percentage": calculate_null_rate(silver_df)  # helper defined elsewhere
    }
    return quality_report
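The `detect_outliers` UDF referenced in step 4 is not defined in the snippet. A minimal sketch that flags a sample when any assembled channel leaves a fixed plausibility bound (mirroring the ±20 check above); a true Z-score variant would need per-batch statistics instead:

```python
# Hypothetical outlier flag on the assembled feature vector.
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

ABS_LIMIT = 20.0  # physically plausible bound, matching the quality check

@udf(returnType=BooleanType())
def detect_outliers(features):
    """Flag a sample if any channel in the assembled vector exceeds ABS_LIMIT."""
    if features is None:
        return True
    return bool(max(abs(float(v)) for v in features.toArray()) > ABS_LIMIT)
```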
3️⃣ Gold Layer - Feature Engineering
Core refactor: convert your window-building logic into a distributed PySpark version.
# gold_feature_engineering.py
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def build_windows_distributed(spark, silver_path, gold_path):
    """
    Distributed sliding-window feature engineering.
    """
    silver_df = spark.read.format("delta").load(silver_path)

    # Key idea: use Spark window functions to number samples per device/session
    window_spec = Window \
        .partitionBy("device_id", "session_id") \
        .orderBy("timestamp")

    # Bucket samples into windows (2500-sample stride, i.e. the 50%-overlap step of a 5000-sample window)
    windowed_df = silver_df \
        .withColumn("window_id",
            (F.row_number().over(window_spec) / 2500).cast("int")
        ) \
        .groupBy("device_id", "session_id", "window_id") \
        .agg(
            # Collect the time-series data inside the window
            F.collect_list(F.struct("aG", "bG", "cG", "xG", "yG", "zG")).alias("sequence"),
            F.collect_list("label").alias("labels"),
            # Statistical features
            F.mean("aG").alias("aG_mean"),
            F.stddev("aG").alias("aG_std"),
            F.skewness("aG").alias("aG_skew"),
            F.kurtosis("aG").alias("aG_kurt"),
            # ... other channels
            # Frequency-domain features (UDF)
            compute_fft_features(
                F.collect_list(F.struct("aG", "bG", "cG", "xG", "yG", "zG"))
            ).alias("fft_features"),
            # Label voting
            mode_label(F.collect_list("label")).alias("window_label"),
            label_purity(F.collect_list("label")).alias("label_purity")
        ) \
        .filter(F.col("label_purity") >= 0.8)  # your MIN_LABEL_RATIO

    # Write to the Gold layer
    windowed_df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("window_label") \
        .save(gold_path)

    # Register in the Feature Store (sketched below)
    register_features_to_store(windowed_df, feature_table="smartpouch_windows_v1")
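`register_features_to_store` is left abstract above. One way to fill it in, as a sketch assuming a Databricks workspace with the `databricks-feature-store` client (Feast would be the open-source alternative); the three-level table name is illustrative:

```python
# Hypothetical Feature Store registration of the windowed feature table.
from databricks.feature_store import FeatureStoreClient

def register_features_to_store(windowed_df, feature_table):
    fs = FeatureStoreClient()
    fs.create_table(
        name=f"ml.smartpouch.{feature_table}",  # illustrative catalog.schema.table
        primary_keys=["device_id", "session_id", "window_id"],
        df=windowed_df,
        description="Windowed SmartPouch sensor features (statistical + FFT)",
    )
```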
Key UDF implementations:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType, StringType
import numpy as np

@udf(returnType=ArrayType(DoubleType()))
def compute_fft_features(sequence):
    """Compute FFT features (frequency-domain analysis)."""
    if not sequence or len(sequence) < 100:
        return [0.0] * 10

    # Extract a single channel
    signal = [s['aG'] for s in sequence]

    # FFT transform
    fft_vals = np.fft.fft(signal)
    power_spectrum = np.abs(fft_vals[:len(fft_vals)//2])

    # Frequency-domain statistical features
    features = [
        float(np.mean(power_spectrum)),
        float(np.std(power_spectrum)),
        float(np.max(power_spectrum)),
        # ... dominant frequency, energy distribution, etc.
    ]
    return features

@udf(returnType=StringType())
def mode_label(labels):
    """Majority vote over window labels."""
    from collections import Counter
    if not labels:
        return None
    return Counter(labels).most_common(1)[0][0]

@udf(returnType=DoubleType())
def label_purity(labels):
    """Fraction of the window occupied by the most common label."""
    if not labels:
        return 0.0
    from collections import Counter
    counts = Counter(labels)
    return counts.most_common(1)[0][1] / len(labels)
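Optional performance note: row-at-a-time Python UDFs serialize each row between the JVM and the Python worker, which gets slow for large windows. A vectorized variant of `label_purity`, as a sketch (the name `label_purity_vec` is illustrative):

```python
# Hypothetical pandas_udf version: processes a whole batch of label arrays at once.
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def label_purity_vec(labels: pd.Series) -> pd.Series:
    # Each element of the series is the array of labels for one window
    return labels.apply(
        lambda ls: 0.0 if ls is None or len(ls) == 0
        else pd.Series(ls).value_counts().iloc[0] / len(ls)
    )
```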
4️⃣ Training Pipeline - Integrating Your Model
# training_pipeline.py
import mlflow
import numpy as np
from pyspark.sql import SparkSession

class SmartPouchTrainingPipeline:
    def __init__(self, lakehouse_path, mlflow_uri):
        self.spark = SparkSession.builder \
            .appName("SmartPouch-Training") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .getOrCreate()
        self.lakehouse_path = lakehouse_path

        # MLflow experiment tracking
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("smartpouch-cbam-lstm")

    def load_features_from_gold(self):
        """Load features from the Gold layer."""
        gold_df = self.spark.read.format("delta").load(
            f"{self.lakehouse_path}/gold/features/"
        )

        # Convert to Pandas (small datasets), or keep using Spark ML
        pdf = gold_df.toPandas()

        # Unpack the sequence data
        X = np.stack([
            np.array([list(s.values()) for s in seq])
            for seq in pdf['sequence']
        ])
        y = pdf['window_label'].map(self.label_encoder).values  # label_encoder: mapping defined elsewhere
        return X, y

    def train_with_mlflow(self):
        """Train and log the run to MLflow."""
        X, y = self.load_features_from_gold()

        with mlflow.start_run():
            # Log parameters
            mlflow.log_params({
                "window_length": 5000,
                "model_architecture": "CBAM-CNN-BiLSTM",
                "optimizer": "Adam",
                "learning_rate": 1e-3
            })

            # Train your model (keep your existing code)
            model = build_smartpouch_model(...)
            history = model.fit(X, y, ...)

            # Log metrics
            mlflow.log_metrics({
                "train_accuracy": history.history['accuracy'][-1],
                "val_accuracy": history.history['val_accuracy'][-1]
            })

            # Save the model
            mlflow.tensorflow.log_model(model, "model")

            # Register the model in the Model Registry
            mlflow.register_model(
                f"runs:/{mlflow.active_run().info.run_id}/model",
                "smartpouch-classifier"
            )
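A possible invocation, with placeholder paths and URIs for your environment:

```python
# Example usage of the training pipeline (the path and URI are placeholders).
pipeline = SmartPouchTrainingPipeline(
    lakehouse_path="s3://lakehouse",
    mlflow_uri="http://mlflow:5000",
)
pipeline.train_with_mlflow()
```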
5️⃣ Inference Pipeline - Real-time Inference
# inference_service.py
from datetime import datetime
from fastapi import FastAPI
from pyspark.sql import SparkSession
import mlflow

app = FastAPI()

# Load the model from the registry
model = mlflow.tensorflow.load_model("models:/smartpouch-classifier/Production")

# Spark session (reused connection)
spark = SparkSession.builder.getOrCreate()

@app.post("/predict")
async def predict(sensor_data: SensorInput):
    """
    Real-time inference API.
    """
    # 1. Write to the Bronze layer (streaming)
    raw_df = spark.createDataFrame([sensor_data.dict()])
    raw_df.write.format("delta").mode("append").save("s3://bronze/")

    # 2. Real-time feature engineering (helper defined elsewhere)
    features = extract_features_realtime(sensor_data)

    # 3. Model prediction
    prediction = model.predict(features)

    # 4. Persist the prediction for monitoring (helper defined elsewhere)
    save_prediction_to_gold(prediction, sensor_data.device_id)

    return {
        "predicted_label": prediction,
        "confidence": float(prediction.max()),
        "timestamp": datetime.now()
    }
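`SensorInput` is referenced but not defined above; a minimal Pydantic sketch matching the Bronze schema (field names follow the sensor channels, `session_id` is optional here):

```python
# Hypothetical request model for the /predict endpoint.
from datetime import datetime
from typing import Optional
from pydantic import BaseModel

class SensorInput(BaseModel):
    timestamp: datetime
    device_id: str
    session_id: Optional[str] = None
    aG: float
    bG: float
    cG: float
    xG: float
    yG: float
    zG: float
```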
🎯 Key Technical Highlights (emphasize these in interviews)
1. Lambda Architecture Variant
Batch path (historical data): Bronze → Silver → Gold → Training
Real-time path (new data): Streaming → Feature Store → Inference
Unified serving layer: Delta Lake provides a consistent view of both paths (see the sketch below)
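Because both paths land in the same Delta tables, training and serving read the same data. A minimal sketch of that unified view, assuming an active SparkSession named `spark` and the Gold path from the diagram:

```python
# The same Delta table backs both the batch and the real-time path.
gold_path = "s3://lakehouse/gold/features/"

# Batch path: training reads a consistent snapshot of the features
train_df = spark.read.format("delta").load(gold_path)

# Real-time path: a streaming job can subscribe to newly appended feature rows
feature_stream = (
    spark.readStream
         .format("delta")
         .load(gold_path)
)
```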
2. Data Versioning
# Delta Lake time travel
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load(...)
df_v2 = spark.read.format("delta").option("timestampAsOf", "2024-01-01").load(...)

# Schema evolution
spark.read.format("delta").option("mergeSchema", "true").load(...)
3. Observability Stack
Monitoring components:
- Databricks Dashboard: data quality monitoring
- MLflow: model performance tracking
- Prometheus + Grafana: inference latency monitoring (see the sketch after this list)
- Great Expectations: data contract validation
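To illustrate the Prometheus piece, a minimal latency-metric sketch using `prometheus_client`; the metric name and port are assumptions, and `model` is the classifier loaded in the inference service above:

```python
# Minimal sketch: record /predict latency as a Prometheus histogram.
from prometheus_client import Histogram, start_http_server

PREDICT_LATENCY = Histogram(
    "smartpouch_predict_latency_seconds",
    "End-to-end latency of the /predict endpoint",
)

start_http_server(9100)  # exposes /metrics for Prometheus to scrape

def timed_predict(features):
    # Times the body of the with-block and observes it in the histogram
    with PREDICT_LATENCY.time():
        return model.predict(features)
```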
💰 Cost and Performance Estimates
| Layer | Data Volume | Processing Time | Cost (per month) |
|---|---|---|---|
| Bronze (raw) | 1TB | - | €10 (S3 storage) |
| Silver (cleaned) | 800GB | 30 min/batch | €8 |
| Gold (features) | 200GB | 2 h/batch | €5 |
| Total | 2TB | <3 h | €23/month |
Now tell me about your SoloLakehouse environment, and I'll give you:
- Concrete deployment configuration (Docker Compose / Terraform scripts)
- An end-to-end code implementation (ready to run)
- A resume project description template (highlighting architecture skills)

This architecture demonstrates:
✅ Data engineering (Lakehouse architecture)
✅ ML engineering (end-to-end pipeline)
✅ Platform mindset (scalable, observable)
✅ Cost awareness (hybrid compute strategy)

Value for a 100k+ offer: ⭐⭐⭐⭐⭐ (this is a core competency for a platform architect)