๐Ÿ—๏ธ SmartPouch ML Platform - ๅŸบไบŽLakehouseๆžถๆž„

๐Ÿ—๏ธ SmartPouch ML Platform - ๅŸบไบŽLakehouseๆžถๆž„
Photo by SOHAM BANERJEE / Unsplash

Great! I'll first design a complete production-grade architecture for you; once you tell me your specific environment, I'll follow up with detailed implementation code.

Overall Architecture Diagram

┌──────────────────────────────────────────────────────────────────┐
│                      DATA SOURCES LAYER                          │
├──────────────────────────────────────────────────────────────────┤
│  IoT Devices    │   Mobile Apps   │   Historical CSV Files       │
│  (MQTT/Kafka)   │   (REST API)    │   (Batch Upload)             │
└────────┬─────────────────┬─────────────────┬─────────────────────┘
         │                 │                 │
         ▼                 ▼                 ▼
┌──────────────────────────────────────────────────────────────────┐
│                    INGESTION LAYER                               │
├──────────────────────────────────────────────────────────────────┤
│  Apache Kafka/Kinesis  │  REST API Gateway  │  S3/MinIO Bucket   │
│  (Real-time Stream)    │  (On-demand)       │  (Batch)           │
└────────┬───────────────────────────────────┬─────────────────────┘
         │                                   │
         ▼                                   ▼
┌──────────────────────────────────────────────────────────────────┐
│                  BRONZE LAYER (Raw Data)                         │
│                     Delta Lake / Iceberg                         │
├──────────────────────────────────────────────────────────────────┤
│  Location: s3://lakehouse/bronze/smartpouch/                     │
│  Format: Delta/Parquet (append-only, immutable)                  │
│  Schema: timestamp, device_id, aG, bG, cG, xG, yG, zG, metadata  │
│  Partitioning: year/month/day/hour                               │
└────────┬─────────────────────────────────────────────────────────┘
         │
         │  PySpark Streaming / Batch ETL
         ▼
┌──────────────────────────────────────────────────────────────────┐
│              SILVER LAYER (Cleaned & Enriched)                   │
│                     Delta Lake / Iceberg                         │
├──────────────────────────────────────────────────────────────────┤
│  - Data Quality Checks (Great Expectations)                      │
│  - Deduplication                                                 │
│  - Sensor Calibration & Normalization                            │
│  - Outlier Detection & Handling                                  │
│  Location: s3://lakehouse/silver/smartpouch/                     │
└────────┬─────────────────────────────────────────────────────────┘
         │
         │  Feature Engineering Pipeline
         ▼
┌──────────────────────────────────────────────────────────────────┐
│               GOLD LAYER (Feature Store)                         │
│                  Delta Lake + Feature Store                      │
├──────────────────────────────────────────────────────────────────┤
│  Time-series Features:                                           │
│  - Sliding Windows (5000 samples, 50% overlap)                   │
│  - Statistical Features (mean, std, skewness per channel)        │
│  - Frequency Domain Features (FFT)                               │
│  - Label Encoding with Quality Score                             │
│  Location: s3://lakehouse/gold/features/                         │
│  Registered in: Databricks Feature Store / Feast                 │
└────────┬─────────────────────────────────────────────────────────┘
         │
         ├─────────────────┬─────────────────┬──────────────────┐
         ▼                 ▼                 ▼                  ▼
┌─────────────┐   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│   TRAINING  │   │  INFERENCE  │   │  ANALYTICS  │   │   EXPORT    │
│   PIPELINE  │   │   PIPELINE  │   │  DASHBOARD  │   │   API       │
└─────────────┘   └─────────────┘   └─────────────┘   └─────────────┘

📋 Detailed Layer Design

1๏ธโƒฃ Bronze Layer - ๅŽŸๅง‹ๆ•ฐๆฎๆน–

Goal: retain all raw data to support backfills and audits

# Bronze layer data schema
bronze_schema = """
    timestamp TIMESTAMP,
    device_id STRING,
    session_id STRING,
    aG DOUBLE,
    bG DOUBLE,
    cG DOUBLE,
    xG DOUBLE,
    yG DOUBLE,
    zG DOUBLE,
    label STRING,
    data_quality_score DOUBLE,  -- signal quality score
    ingestion_time TIMESTAMP,
    source_system STRING,       -- kafka/api/batch
    _metadata STRUCT<...>       -- metadata
"""

# Partitioning strategy
partitioning = "year/month/day/hour"

# Data retention policy
retention_policy = "5 years (compliance requirement)"
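
The retention policy can be pinned at the table level so old versions stay queryable for audits. A minimal sketch, assuming a Delta table at the Bronze path shown in the diagram (the two property names are standard Delta configs; 1825 days ≈ 5 years):

# Sketch: keep Delta history around long enough for audit/time travel.
# The path is illustrative; adjust the intervals to your compliance needs.
spark.sql("""
    ALTER TABLE delta.`s3://lakehouse/bronze/smartpouch/`
    SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 1825 days',
        'delta.deletedFileRetentionDuration' = 'interval 1825 days'
    )
""")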

ไปฃ็ ๅฎž็Žฐ๏ผš

# bronze_ingestion.py
from pyspark.sql import SparkSession, functions as F

def ingest_to_bronze(spark, source_path, bronze_path):
    """
    Ingest from multiple data sources into the Bronze layer
    """
    # Read the raw CSVs (your existing data)
    raw_df = spark.read.csv(
        source_path,
        header=True,
        inferSchema=True
    )

    # Add metadata and partition columns (hour added to match the
    # year/month/day/hour partitioning strategy above)
    bronze_df = raw_df \
        .withColumn("ingestion_time", F.current_timestamp()) \
        .withColumn("source_system", F.lit("batch_upload")) \
        .withColumn("year", F.year("timestamp")) \
        .withColumn("month", F.month("timestamp")) \
        .withColumn("day", F.dayofmonth("timestamp")) \
        .withColumn("hour", F.hour("timestamp"))

    # Write to Delta Lake (ACID transactions)
    bronze_df.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("year", "month", "day", "hour") \
        .save(bronze_path)

    print(f"✅ Ingested {bronze_df.count()} records to Bronze layer")
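
A possible entry point for this function, assuming the `delta-spark` package is installed (`configure_spark_with_delta_pip` is the helper that package ships; the paths are illustrative):

# Hypothetical driver: build a Delta-enabled session and run the ingestion.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("bronze-ingestion")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaSparkSessionCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

ingest_to_bronze(
    spark,
    source_path="data/raw/*.csv",                     # illustrative path
    bronze_path="s3://lakehouse/bronze/smartpouch/",  # matches the diagram
)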

2๏ธโƒฃ Silver Layer - ๆธ…ๆด—ๅ’Œ้ชŒ่ฏ

Goal: production-grade data quality, directly usable for analytics

# silver_transformation.py
from great_expectations.dataset import SparkDFDataset
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

def clean_to_silver(spark, bronze_path, silver_path):
    """
    Data quality checks + cleaning
    """
    # Read the Bronze data
    bronze_df = spark.read.format("delta").load(bronze_path)

    # 1. Data quality checks (Great Expectations); each expectation
    # returns a validation result with a .success flag
    ge_df = SparkDFDataset(bronze_df)

    checks = [
        ge_df.expect_column_values_to_not_be_null("timestamp"),
        ge_df.expect_column_values_to_be_between("aG", -20, 20),
        # ... more checks
    ]
    all_checks_passed = all(result.success for result in checks)

    # 2. Deduplicate (on timestamp + device_id)
    deduped_df = bronze_df.dropDuplicates(["timestamp", "device_id"])

    # 3. Sensor calibration: null out readings outside the physical range
    calibrated_df = deduped_df \
        .withColumn("aG_calibrated",
            F.when(F.col("aG").between(-20, 20), F.col("aG"))
             .otherwise(F.lit(None))
        )

    # 4. Outlier detection (Z-score on the assembled sensor vector)
    assembler = VectorAssembler(
        inputCols=["aG", "bG", "cG", "xG", "yG", "zG"],
        outputCol="features",
        handleInvalid="keep"  # keep rows with nulls instead of failing
    )
    assembled_df = assembler.transform(calibrated_df)

    # `detect_outliers` is a UDF sketched after this listing
    silver_df = assembled_df \
        .withColumn("is_outlier", detect_outliers(F.col("features")))

    # 5. Write to the Silver layer (with schema evolution)
    silver_df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .partitionBy("year", "month", "day") \
        .save(silver_path)

    # 6. Data quality report (`calculate_null_rate` sketched below)
    quality_report = {
        "checks_passed": all_checks_passed,
        "total_records": bronze_df.count(),
        "after_dedup": deduped_df.count(),
        "outliers_detected": silver_df.filter("is_outlier = true").count(),
        "null_percentage": calculate_null_rate(silver_df)
    }

    return quality_report
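
The `detect_outliers` and `calculate_null_rate` helpers are referenced but not shown above. A minimal sketch of both, assuming fixed per-channel statistics and a Z-score threshold of 3 (in a real job you would compute the means/stds from the data first and close over them):

import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

SENSOR_COLS = ["aG", "bG", "cG", "xG", "yG", "zG"]

# Hypothetical channel statistics: replace with values computed from
# the Silver data (e.g. via F.mean / F.stddev) before running at scale.
CHANNEL_MEANS = np.zeros(len(SENSOR_COLS))
CHANNEL_STDS = np.ones(len(SENSOR_COLS))

@udf(returnType=BooleanType())
def detect_outliers(features):
    """Flag a reading whose largest per-channel |z-score| exceeds 3."""
    if features is None:
        return True
    z = np.abs((features.toArray() - CHANNEL_MEANS) / CHANNEL_STDS)
    return bool(np.nanmax(z) > 3.0)

def calculate_null_rate(df):
    """Share of null readings across all sensor channels."""
    total = df.count() * len(SENSOR_COLS)
    nulls = df.select(
        sum(F.sum(F.col(c).isNull().cast("int")) for c in SENSOR_COLS).alias("n")
    ).first()["n"]
    return nulls / total if total else 0.0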

3๏ธโƒฃ Gold Layer - ็‰นๅพๅทฅ็จ‹

Core change: convert your window-building logic into a distributed PySpark version

# gold_feature_engineering.py
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def build_windows_distributed(spark, silver_path, gold_path):
    """
    Distributed sliding-window feature engineering
    """
    silver_df = spark.read.format("delta").load(silver_path)

    # Key: use Spark window functions. Ranking functions such as
    # row_number() cannot take a row frame, so this window only
    # partitions and orders; window membership is derived below.
    order_window = Window \
        .partitionBy("device_id", "session_id") \
        .orderBy("timestamp")

    # 5000-sample windows with 50% overlap: each row belongs to its own
    # 2500-sample block and to the previous one, so window w spans rows
    # w*2500 .. w*2500 + 4999
    indexed_df = silver_df \
        .withColumn("row_num", F.row_number().over(order_window) - 1) \
        .withColumn("window_id", F.explode(F.array(
            (F.col("row_num") / 2500).cast("int"),
            (F.col("row_num") / 2500).cast("int") - 1
        ))) \
        .filter(F.col("window_id") >= 0)

    # Compute window features; timestamp leads the struct so sort_array
    # restores time order (collect_list alone gives no ordering guarantee)
    agg_df = indexed_df \
        .groupBy("device_id", "session_id", "window_id") \
        .agg(
            # Collect the in-window time series
            F.sort_array(F.collect_list(
                F.struct("timestamp", "aG", "bG", "cG", "xG", "yG", "zG")
            )).alias("sequence"),
            F.collect_list("label").alias("labels"),

            # Statistical features
            F.mean("aG").alias("aG_mean"),
            F.stddev("aG").alias("aG_std"),
            F.skewness("aG").alias("aG_skew"),
            F.kurtosis("aG").alias("aG_kurt"),
            # ... other channels
        )

    # Frequency-domain and label features (UDFs defined below); aliases
    # created inside agg() are only visible afterwards, hence withColumn
    windowed_df = agg_df \
        .withColumn("fft_features", compute_fft_features(F.col("sequence"))) \
        .withColumn("window_label", mode_label(F.col("labels"))) \
        .withColumn("label_purity", label_purity(F.col("labels"))) \
        .filter(F.col("label_purity") >= 0.8)  # your MIN_LABEL_RATIO

    # Write to the Gold layer
    windowed_df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("window_label") \
        .save(gold_path)

    # Register with the Feature Store (helper defined elsewhere)
    register_features_to_store(windowed_df, feature_table="smartpouch_windows_v1")

Key UDF implementations:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType, StringType
import numpy as np

@udf(returnType=ArrayType(DoubleType()))
def compute_fft_features(sequence):
    """Compute FFT features (frequency-domain analysis)"""
    # Fallback length must match the feature vector built below
    if not sequence or len(sequence) < 100:
        return [0.0] * 3

    # Extract a single channel
    signal = [s['aG'] for s in sequence]

    # FFT transform
    fft_vals = np.fft.fft(signal)
    power_spectrum = np.abs(fft_vals[:len(fft_vals) // 2])

    # Frequency-domain summary statistics
    features = [
        float(np.mean(power_spectrum)),
        float(np.std(power_spectrum)),
        float(np.max(power_spectrum)),
        # ... dominant frequency, energy distribution, etc.
    ]

    return features

@udf(returnType=StringType())
def mode_label(labels):
    """Majority vote"""
    from collections import Counter
    if not labels:
        return None
    return Counter(labels).most_common(1)[0][0]

@udf(returnType=DoubleType())
def label_purity(labels):
    """Label purity: share of the most frequent label in the window"""
    if not labels:
        return 0.0
    from collections import Counter
    counts = Counter(labels)
    return counts.most_common(1)[0][1] / len(labels)
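
Row-at-a-time Python UDFs like the ones above serialize every row through Py4J; for large Gold builds, a vectorized pandas UDF is usually noticeably faster. A minimal sketch of `label_purity` as a pandas UDF, same semantics but batch execution (this is an optional swap, not part of the pipeline above):

import pandas as pd
from collections import Counter
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def label_purity_vec(labels: pd.Series) -> pd.Series:
    """Vectorized variant: each element of `labels` is a list of strings."""
    def purity(ls):
        if ls is None or len(ls) == 0:
            return 0.0
        return Counter(ls).most_common(1)[0][1] / len(ls)
    return labels.apply(purity)

# Drop-in usage: .withColumn("label_purity", label_purity_vec(F.col("labels")))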

4๏ธโƒฃ Training Pipeline - ้›†ๆˆไฝ ็š„ๆจกๅž‹

# training_pipeline.py
import mlflow
import numpy as np
from pyspark.sql import SparkSession

class SmartPouchTrainingPipeline:
    def __init__(self, lakehouse_path, mlflow_uri):
        self.spark = SparkSession.builder \
            .appName("SmartPouch-Training") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .getOrCreate()

        self.lakehouse_path = lakehouse_path

        # MLflow experiment tracking
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("smartpouch-cbam-lstm")

    def load_features_from_gold(self):
        """Load features from the Gold layer"""
        gold_df = self.spark.read.format("delta").load(
            f"{self.lakehouse_path}/gold/features/"
        )

        # Convert to pandas (small datasets) or stay with Spark ML
        pdf = gold_df.toPandas()

        # Unpack the sequence data; each element is a Row (a tuple),
        # and the leading timestamp field is dropped
        X = np.stack([
            np.array([list(s)[1:] for s in seq])
            for seq in pdf['sequence']
        ])
        # self.label_encoder: label -> int mapping, defined elsewhere
        y = pdf['window_label'].map(self.label_encoder).values

        return X, y

    def train_with_mlflow(self):
        """Train and log to MLflow"""
        X, y = self.load_features_from_gold()

        with mlflow.start_run():
            # Log parameters
            mlflow.log_params({
                "window_length": 5000,
                "model_architecture": "CBAM-CNN-BiLSTM",
                "optimizer": "Adam",
                "learning_rate": 1e-3
            })

            # Train your model (keep your existing code)
            model = build_smartpouch_model(...)
            history = model.fit(X, y, ...)

            # Log metrics
            mlflow.log_metrics({
                "train_accuracy": history.history['accuracy'][-1],
                "val_accuracy": history.history['val_accuracy'][-1]
            })

            # Save the model
            mlflow.tensorflow.log_model(model, "model")

            # Register the model in the Model Registry
            mlflow.register_model(
                f"runs:/{mlflow.active_run().info.run_id}/model",
                "smartpouch-classifier"
            )
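
A possible invocation, assuming a locally running MLflow tracking server (the URI and root path are illustrative):

# Hypothetical driver for the training pipeline.
pipeline = SmartPouchTrainingPipeline(
    lakehouse_path="s3://lakehouse",     # illustrative lakehouse root
    mlflow_uri="http://localhost:5000",  # assumed MLflow server
)
pipeline.train_with_mlflow()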

5๏ธโƒฃ Inference Pipeline - ๅฎžๆ—ถๆŽจ็†

# inference_service.py
from datetime import datetime

import mlflow
from fastapi import FastAPI
from pyspark.sql import SparkSession

app = FastAPI()

# Load the Production-stage model from the Model Registry
model = mlflow.tensorflow.load_model("models:/smartpouch-classifier/Production")

# Spark instance (reuse one connection)
spark = SparkSession.builder.getOrCreate()
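
# The `SensorInput` request schema is not defined in the original; a
# minimal hypothetical Pydantic model matching the Bronze columns:
from pydantic import BaseModel

class SensorInput(BaseModel):
    timestamp: str
    device_id: str
    session_id: str
    aG: float
    bG: float
    cG: float
    xG: float
    yG: float
    zG: float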

@app.post("/predict")
async def predict(sensor_data: SensorInput):
    """
    Real-time inference API
    """
    # 1. Write to the Bronze layer (streaming)
    raw_df = spark.createDataFrame([sensor_data.dict()])
    raw_df.write.format("delta").mode("append").save("s3://bronze/")

    # 2. Real-time feature engineering (helper defined elsewhere)
    features = extract_features_realtime(sensor_data)

    # 3. Model prediction
    prediction = model.predict(features)

    # 4. Persist the prediction for monitoring (helper defined elsewhere)
    save_prediction_to_gold(prediction, sensor_data.device_id)

    return {
        "predicted_label": int(prediction.argmax()),
        "confidence": float(prediction.max()),
        "timestamp": datetime.now().isoformat()
    }

🎯 Key Technical Highlights (to emphasize in interviews)

1. A Lambda Architecture Variant

ๆ‰นๅค„็†่ทฏๅพ„๏ผˆๅކๅฒๆ•ฐๆฎ๏ผ‰๏ผšBronze โ†’ Silver โ†’ Gold โ†’ Training
             โ†“
ๅฎžๆ—ถ่ทฏๅพ„๏ผˆๆ–ฐๆ•ฐๆฎ๏ผ‰๏ผšStreaming โ†’ Feature Store โ†’ Inference
             โ†“
็ปŸไธ€ๆœๅŠกๅฑ‚๏ผšDelta Lakeๆไพ›ไธ€่‡ดๆ€ง่ง†ๅ›พ

2. ๆ•ฐๆฎ็‰ˆๆœฌๆŽงๅˆถ

# Delta Lake time travel
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load(...)
df_v2 = spark.read.format("delta").option("timestampAsOf", "2024-01-01").load(...)

# Schema evolution on read
spark.read.format("delta").option("mergeSchema", "true").load(...)
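
To find which version or timestamp to travel to, Delta exposes its commit log (`DESCRIBE HISTORY` is standard Delta SQL; the path is illustrative):

# Inspect the commit log to pick a version/timestamp for time travel
history = spark.sql(
    "DESCRIBE HISTORY delta.`s3://lakehouse/silver/smartpouch/`"
)
history.select("version", "timestamp", "operation").show()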

3. ๅฏ่ง‚ๆต‹ๆ€งๆ ˆ

Monitoring components:
  - Databricks Dashboard: data quality monitoring
  - MLflow: model performance tracking
  - Prometheus + Grafana: inference latency monitoring (see the sketch below)
  - Great Expectations: data contract validation
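
For the inference-latency piece, the FastAPI service can export metrics via the `prometheus_client` library. A minimal sketch, not wired into the service above (the metric name and port are illustrative):

import time
from prometheus_client import Histogram, start_http_server

# Hypothetical latency histogram, scraped by Prometheus and graphed in Grafana
PREDICT_LATENCY = Histogram(
    "smartpouch_predict_latency_seconds",
    "End-to-end /predict latency",
)

start_http_server(9100)  # metrics endpoint on an assumed port

def timed_predict(fn, *args, **kwargs):
    """Wrap a prediction call and record its duration."""
    start = time.perf_counter()
    try:
        return fn(*args, **kwargs)
    finally:
        PREDICT_LATENCY.observe(time.perf_counter() - start)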

📊 Cost & Performance Estimates

Layer              Data Volume   Processing Time   Cost (per month)
Bronze (raw)       1 TB          -                 €10 (S3 storage)
Silver (cleaned)   800 GB        30 min/batch      €8
Gold (features)    200 GB        2 h/batch         €5
Total              2 TB          < 3 h             €23/month

Now tell me about your SoloLakehouse environment, and I'll give you:

  1. Concrete deployment configs (Docker Compose / Terraform scripts)
  2. An end-to-end code implementation (ready to run)
  3. A résumé project-description template (highlighting architecture skills)

What this architecture demonstrates: ✅ Data engineering (a Lakehouse architecture) ✅ ML engineering (an end-to-end pipeline) ✅ Platform thinking (scalable, observable) ✅ Cost awareness (a hybrid compute strategy)

Value toward a 100k+ offer: ⭐⭐⭐⭐⭐ (this is the core competency of a platform architect)