
Real-Time ML Pipelines: Streaming Feature Engineering



Difficulty: Senior Level | Companies: Uber, Netflix, Stripe, DoorDash, Instacart

1. Batch vs Real-Time ML Pipelines

Architecture Diagram
Batch ML Pipeline (traditional):
Source → Batch ETL → Feature Store → Model Training → Batch Prediction
(Hourly/Daily)

Real-Time ML Pipeline:
Source → Stream Processing → Feature Store → Model Serving → Online Prediction
(Millisecond latency, < 100ms)

Latency Requirements

Use Case            Latency   Example
Recommendation      < 100ms   Product recommendations
Fraud Detection     < 50ms    Transaction scoring
Ad Bidding          < 10ms    Real-time CTR prediction
Search Ranking      < 50ms    Query result ranking
Content Moderation  < 5s      User-generated content

ℹ️

Key Insight: Real-time ML isn't just about streaming data; it's about streaming FEATURES. Feature computation is often the bottleneck, not model inference.
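To make the bottleneck claim concrete, here is a minimal sketch of a per-request latency budget for the < 50ms fraud detection target in the table. The stage names and every per-stage number are illustrative assumptions, not measurements.

```python
# Illustrative latency budget for one fraud-scoring request.
# All per-stage numbers below are assumptions chosen for the example.
BUDGET_MS = 50.0

stage_latency_ms = {
    "network_and_gateway": 8.0,
    "feature_fetch": 5.0,
    "on_request_feature_compute": 20.0,
    "model_inference": 10.0,
}


def summarize(stages, budget_ms):
    """Return total latency, remaining headroom, and the slowest stage."""
    total = sum(stages.values())
    slowest = max(stages, key=stages.get)
    return {
        "total_ms": total,
        "headroom_ms": budget_ms - total,
        "slowest_stage": slowest,
    }


summary = summarize(stage_latency_ms, BUDGET_MS)
print(summary)
```

Under these assumed numbers, feature computation consumes more of the budget than inference. This is the usual argument for precomputing features in the stream and leaving only a lookup on the request path.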

2. Streaming Feature Engineering

Kafka → Spark Structured Streaming → Feature Store

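from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DoubleType, LongType, StringType, StructField, StructType, TimestampType
)

# Assumed event schema, mirroring the fields used in the aggregation below
event_schema = StructType([
    StructField("user_id", LongType()),
    StructField("session_id", StringType()),
    StructField("event_type", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_timestamp", TimestampType()),
])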

spark = SparkSession.builder \
    .appName("RealTimeFeatures") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Read from Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user_events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON events
events = raw_stream \
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*")

# Compute windowed features
windowed_features = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        F.col("user_id"),
        F.window("event_timestamp", "1 hour", "5 minutes")
    ).agg(
        F.count("*").alias("events_last_hour"),
        F.sum(F.when(F.col("event_type") == "PURCHASE", F.col("amount")).otherwise(0)).alias("spend_last_hour"),
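        # Exact distinct counts are not supported in streaming aggregations,
        # so use the approximate version
        F.approx_count_distinct("session_id").alias("sessions_last_hour"),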
    )

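# Write to feature store (online)
# Spark has no built-in "redis" streaming sink. This sketch assumes the
# spark-redis connector and writes each micro-batch through foreachBatch.
def write_to_redis(batch_df, batch_id):
    # Flatten the window struct so each row maps onto a Redis hash
    batch_df \
        .withColumn("window_end", F.col("window.end").cast("string")) \
        .drop("window") \
        .write \
        .format("org.apache.spark.sql.redis") \
        .option("table", "user_features") \
        .option("key.column", "user_id") \
        .mode("append") \
        .save()

query = windowed_features.writeStream \
    .foreachBatch(write_to_redis) \
    .option("checkpointLocation", "s3://checkpoints/user-features") \
    .outputMode("update") \
    .start()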

# Write to feature store (offline)
offline_query = windowed_features.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/user-features-offline") \
    .trigger(processingTime="5 minutes") \
    .start("s3://feature-store/user_features_realtime")
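The sliding window above (`"1 hour"` size, `"5 minutes"` slide) places every event in 12 overlapping windows. The pure-Python sketch below illustrates that assignment; `sliding_windows` is a hypothetical helper written for this explanation, not a Spark API. It assumes epoch-aligned, half-open `[start, end)` windows, which is Spark's default.

```python
from datetime import datetime, timedelta


def sliding_windows(event_time, size, slide):
    """Return (start, end) for every sliding window containing event_time.

    Windows are aligned to the Unix epoch and are half-open: [start, end).
    """
    epoch = datetime(1970, 1, 1)
    # Start of the most recent window that begins at or before the event
    last_start = event_time - (event_time - epoch) % slide
    windows = []
    start = last_start
    # Walk backwards while the window still covers the event
    while start > event_time - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


windows = sliding_windows(
    datetime(2024, 1, 1, 10, 7),
    size=timedelta(hours=1),
    slide=timedelta(minutes=5),
)
print(len(windows), windows[0], windows[-1])
```

Each event updates `size / slide` windows, so state size and write volume to the online store grow by the same factor. A longer slide interval costs less but makes the feature less fresh.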

Flink Feature Engineering

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
tenv = StreamTableEnvironment.create(env)

# Define source table from Kafka
tenv.execute_sql("""
    CREATE TABLE user_events (
        event_id STRING,
        user_id BIGINT,
        event_type STRING,
        amount DOUBLE,
        event_timestamp TIMESTAMP(3),
        WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

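# Windowed aggregation
# CREATE TABLE ... AS SELECT (Flink 1.16+) needs connector options for the
# target table. The 'print' connector is a placeholder sink for this sketch;
# swap in the connector for your feature store.
tenv.execute_sql("""
    CREATE TABLE user_features_realtime
    WITH (
        'connector' = 'print'
    ) AS
    SELECT
        user_id,
        TUMBLE_START(event_timestamp, INTERVAL '1' HOUR) AS window_start,
        COUNT(*) AS events_last_hour,
        SUM(CASE WHEN event_type = 'PURCHASE' THEN amount ELSE 0 END) AS spend_last_hour,
        COUNT(DISTINCT event_id) AS unique_events
    FROM user_events
    GROUP BY user_id, TUMBLE(event_timestamp, INTERVAL '1' HOUR)
""")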

3. Online Model Serving

Real-Time Inference Architecture

Real-Time Inference Request Flow:
Client (App) → API GW (FastAPI) → Feature Fetch (Redis) → Model Serving
↓ Prediction Response
↓ Feature Store (Online)

FastAPI Model Server

from fastapi import FastAPI
import numpy as np
import redis
import joblib
from pydantic import BaseModel
from typing import Dict, List

app = FastAPI()
redis_client = redis.Redis(host="redis", port=6379)
model = joblib.load("model.pkl")

class PredictionRequest(BaseModel):
    user_id: int
    context: Dict

class PredictionResponse(BaseModel):
    user_id: int
    prediction: float
    features_used: Dict[str, float]

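@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    # Plain `def`: FastAPI runs it in a threadpool, so the blocking Redis and
    # model calls below do not stall the event loop.
    # 1. Fetch features from online store (< 5ms)
    features = fetch_features(request.user_id)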
    
    # 2. Create feature vector
    feature_vector = prepare_feature_vector(features, request.context)
    
    # 3. Run model inference (< 10ms); cast the NumPy scalar to a Python float
    prediction = float(model.predict_proba(feature_vector.reshape(1, -1))[0][1])
    
    return PredictionResponse(
        user_id=request.user_id,
        prediction=prediction,
        features_used=features
    )

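FEATURE_NAMES = ["avg_purchase_30d", "purchase_count_7d", "days_since_last"]

def fetch_features(user_id: int) -> Dict[str, float]:
    """Fetch from online feature store"""
    # One pipelined round trip for all fields of the user's hash
    pipe = redis_client.pipeline()
    for name in FEATURE_NAMES:
        pipe.hget(f"user_features:{user_id}", name)
    values = pipe.execute()
    # Missing fields come back as None and default to 0.0
    return {name: float(v) if v else 0.0 for name, v in zip(FEATURE_NAMES, values)}

def prepare_feature_vector(features: Dict[str, float], context: Dict) -> np.ndarray:
    """Illustrative helper: fixed feature order; request context is ignored in this sketch."""
    return np.array([features[name] for name in FEATURE_NAMES], dtype=np.float64)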
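Defaulting every missing feature to `0.0` makes a missing value indistinguishable from a real zero. The sketch below shows one possible alternative: per-feature defaults plus a list of what was missing, so the caller can log or alert on it. `decode_features` and the values in `DEFAULTS` are hypothetical. The raw values are `bytes` or `None`, which is what a Redis client returns from `HGET` by default.

```python
from typing import Dict, List, Optional, Sequence, Tuple

FEATURE_NAMES = ["avg_purchase_30d", "purchase_count_7d", "days_since_last"]

# Hypothetical per-feature defaults; choose values that are neutral for your model
DEFAULTS = {
    "avg_purchase_30d": 0.0,
    "purchase_count_7d": 0.0,
    "days_since_last": 365.0,
}


def decode_features(
    names: Sequence[str],
    raw_values: Sequence[Optional[bytes]],
    defaults: Dict[str, float],
) -> Tuple[Dict[str, float], List[str]]:
    """Convert raw hash values to floats and report which features were missing."""
    features: Dict[str, float] = {}
    missing: List[str] = []
    for name, raw in zip(names, raw_values):
        if raw is None:
            features[name] = defaults[name]
            missing.append(name)
        else:
            features[name] = float(raw)  # float() accepts bytes such as b"42.5"
    return features, missing


features, missing = decode_features(FEATURE_NAMES, [b"42.5", None, b"0"], DEFAULTS)
print(features, missing)
```

Returning the `missing` list lets the serving path emit a coverage metric per request, which complements the offline coverage check in the next section.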

4. Feature Freshness Monitoring

from datetime import datetime, timedelta
from typing import List

class FeatureFreshnessMonitor:
    def __init__(self, feature_store):
        self.feature_store = feature_store
        self.alerts = []
    
    def check_freshness(self, feature_group: str, max_age_minutes: int):
        """Check if features are fresh enough"""
        latest_timestamp = self.feature_store.get_latest_timestamp(feature_group)
        age = datetime.now() - latest_timestamp
        
        if age > timedelta(minutes=max_age_minutes):
            self.alerts.append({
                "feature_group": feature_group,
                "age_minutes": age.total_seconds() / 60,
                "max_age_minutes": max_age_minutes,
                "severity": "CRITICAL" if age > timedelta(hours=1) else "WARNING"
            })
            return False
        return True
    
    def check_feature_coverage(self, feature_group: str, entity_ids: List[int]):
        """Check what percentage of entities have features"""
        total = len(entity_ids)
        if total == 0:
            return 1.0  # nothing to check; avoids division by zero
        present = sum(1 for eid in entity_ids 
                     if self.feature_store.has_features(eid, feature_group))
        
        coverage = present / total
        if coverage < 0.95:
            self.alerts.append({
                "feature_group": feature_group,
                "coverage": coverage,
                "missing_count": total - present,
                "severity": "HIGH"
            })
        
        return coverage
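The monitor only needs two methods from its `feature_store` argument: `get_latest_timestamp(feature_group)` and `has_features(entity_id, feature_group)`. For local testing, a stub like the following could stand in for a real store. `InMemoryFeatureStore` and its `record` method are hypothetical names introduced for this sketch.

```python
from datetime import datetime, timedelta
from typing import Dict, Set


class InMemoryFeatureStore:
    """Minimal stand-in exposing the two methods the monitor calls."""

    def __init__(self) -> None:
        self._latest: Dict[str, datetime] = {}
        self._entities: Dict[str, Set[int]] = {}

    def record(self, feature_group: str, entity_id: int, written_at: datetime) -> None:
        """Register a feature write for an entity at a given time."""
        previous = self._latest.get(feature_group)
        if previous is None or written_at > previous:
            self._latest[feature_group] = written_at
        self._entities.setdefault(feature_group, set()).add(entity_id)

    def get_latest_timestamp(self, feature_group: str) -> datetime:
        return self._latest[feature_group]

    def has_features(self, entity_id: int, feature_group: str) -> bool:
        return entity_id in self._entities.get(feature_group, set())


now = datetime(2024, 1, 1, 12, 0)
store = InMemoryFeatureStore()
store.record("user_features", 1, now - timedelta(minutes=90))
store.record("user_features", 2, now - timedelta(minutes=20))

age = now - store.get_latest_timestamp("user_features")
coverage = sum(store.has_features(eid, "user_features") for eid in [1, 2, 3, 4]) / 4
print(age, coverage)
```

Passing `store` to `FeatureFreshnessMonitor` exercises both checks without a live feature store. With this data, a 15-minute freshness limit would be violated (age 20 minutes) and coverage would be 50%, below the 0.95 threshold.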

5. Lambda vs Kappa Architecture for ML

Architecture Diagram
Lambda Architecture (pragmatic):
┌───────────────────────────────────────────────┐
│              Batch Layer                      │
│   (Historical features, model training)       │
├───────────────────────────────────────────────┤
│              Speed Layer                      │
│   (Real-time features, online prediction)     │
├───────────────────────────────────────────────┤
│              Serving Layer                    │
│   (Merged batch + real-time features)         │
└───────────────────────────────────────────────┘

Kappa Architecture (simpler):
┌───────────────────────────────────────────────┐
│           Stream Processing                   │
│   (All computation through event stream)      │
├───────────────────────────────────────────────┤
│           Serving Layer                       │
│   (Features + Predictions)                    │
└───────────────────────────────────────────────┘

ℹ️

Best Practice: Use Kappa for new ML systems (simpler ops). Use Lambda when you need batch replay for model retraining. The feature store is the bridge between both.
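In the Lambda diagram, the serving layer merges batch and real-time features. One possible merge rule is sketched below: real-time values override batch values for the same feature, but only while the real-time record is fresh enough. `merge_features`, the record layout, and the 600-second limit are assumptions made for this illustration.

```python
from typing import Dict


def merge_features(
    batch: Dict,
    realtime: Dict,
    now_s: float,
    realtime_max_age_s: float,
) -> Dict[str, float]:
    """Overlay fresh real-time values on batch values; ignore a stale speed layer."""
    merged = dict(batch["values"])
    if now_s - realtime["updated_at_s"] <= realtime_max_age_s:
        merged.update(realtime["values"])
    return merged


batch_record = {
    "values": {"avg_purchase_30d": 42.5, "events_last_hour": 3.0},
    "updated_at_s": 0.0,
}
realtime_record = {
    "values": {"events_last_hour": 7.0, "spend_last_hour": 19.99},
    "updated_at_s": 1000.0,
}

fresh = merge_features(batch_record, realtime_record, now_s=1060.0, realtime_max_age_s=600.0)
stale = merge_features(batch_record, realtime_record, now_s=5000.0, realtime_max_age_s=600.0)
print(fresh)
print(stale)
```

Falling back to batch values when the speed layer is stale keeps predictions available during a streaming outage, at the cost of less fresh features.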

Follow-Up Questions

  1. How do you handle feature staleness in real-time predictions?
  2. Design a real-time fraud detection pipeline with < 50ms latency.
  3. How do you A/B test between batch and real-time ML models?
  4. How would you handle model retraining with streaming features?
  5. Design a real-time recommendation system for 100M+ users.
