🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Model Monitoring & Drift Detection

⭐ Premium

Advertisement

Model Monitoring & Drift Detection

A model that worked perfectly in staging can silently fail in production. Learn to detect when your model is degrading and implement safe deployment strategies.

Types of Model Degradation

Model Drift Detection Flow: Production Data (input distribution) → Statistical Tests (KS, PSI; compare to reference) → Drift Detection (p-value < 0.05?) → Alert System (notify team) → Retrain Model (update reference)

Statistical Drift Detection

import numpy as np
from scipy import stats
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime

@dataclass
class DriftResult:
    """Outcome of one drift test applied to a single feature."""

    # Label of the tested feature (e.g. "feature_3").
    feature: str
    # Test statistic reported by the drift test.
    statistic: float
    # p-value reported by the drift test.
    p_value: float
    # Verdict flag: True when the producer of this result considers the
    # feature drifted.
    drifted: bool
    # Name of the test that produced this result (e.g. "ks_test").
    method: str

class DriftDetector:
    """Detect distribution drift between a reference sample and new data.

    ``reference_data`` and every ``current_data`` argument are indexed as
    ``data[:, feature_idx]``, i.e. they are expected to be 2-D arrays with
    one column per feature.
    """

    def __init__(self, reference_data, significance_level=0.05):
        """Store the reference sample and the p-value cut-off for KS tests."""
        self.reference = reference_data
        self.alpha = significance_level

    def ks_test(self, current_data: np.ndarray, feature_idx: int) -> "DriftResult":
        """Run a two-sample Kolmogorov-Smirnov test on one feature column.

        Returns a ``DriftResult`` whose ``drifted`` flag is True when the
        p-value is below ``self.alpha``.
        """
        stat, p_value = stats.ks_2samp(
            self.reference[:, feature_idx],
            current_data[:, feature_idx],
        )
        # Cast NumPy scalars to builtins so results compare and serialise
        # like ordinary Python values.
        return DriftResult(
            feature=f"feature_{feature_idx}",
            statistic=float(stat),
            p_value=float(p_value),
            drifted=bool(p_value < self.alpha),
            method="ks_test",
        )

    def psi(self, expected, actual, buckets=10) -> float:
        """Return the Population Stability Index between two samples.

        Both samples are binned with the SAME bucket edges, spanning the
        pooled range of ``expected`` and ``actual``. Binning each sample over
        its own range would make bucket *i* mean different value intervals in
        the two histograms, and a pure shift of the data would score 0.

        Args:
            expected: Reference sample (any array-like of numbers).
            actual: Current sample (any array-like of numbers).
            buckets: Number of equal-width buckets, or explicit bin edges.

        Raises:
            ValueError: If either sample is empty.
        """
        expected = np.asarray(expected, dtype=float).ravel()
        actual = np.asarray(actual, dtype=float).ravel()
        if expected.size == 0 or actual.size == 0:
            raise ValueError("psi() requires non-empty expected and actual samples")

        edges = np.histogram_bin_edges(
            np.concatenate((expected, actual)), bins=buckets
        )
        # The small constant keeps empty buckets from producing log(0) or a
        # division by zero.
        expected_pct = np.histogram(expected, bins=edges)[0] / expected.size + 1e-6
        actual_pct = np.histogram(actual, bins=edges)[0] / actual.size + 1e-6

        return float(
            np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
        )

    def wasserstein_distance(self, current_data: np.ndarray, feature_idx: int) -> float:
        """Return the Wasserstein distance between reference and current
        values of one feature column."""
        return stats.wasserstein_distance(
            self.reference[:, feature_idx],
            current_data[:, feature_idx],
        )

    def detect_all(self, current_data: np.ndarray) -> List["DriftResult"]:
        """Run ``ks_test`` on every column of ``current_data``."""
        return [self.ks_test(current_data, i) for i in range(current_data.shape[1])]

    def should_alert(self, results: List["DriftResult"], threshold: float = 0.3) -> bool:
        """Return True when strictly more than ``threshold`` (a fraction,
        default 0.3) of the results are flagged as drifted.

        An empty result list never alerts.
        """
        drifted_count = sum(1 for r in results if r.drifted)
        return drifted_count > len(results) * threshold

Concept Drift Detection

Concept drift means the relationship between features and target changes.

from sklearn.linear_model import SGDClassifier
from collections import deque
import numpy as np

class ADWIN:
    """Adaptive Windowing for concept drift detection.

    Keeps a growing window of observed values. After each update every
    split of the window into an older (left) and newer (right) part is
    examined; when the two means differ by more than a size-dependent bound
    the older part is dropped and drift is reported.
    """

    def __init__(self, delta=0.002):
        """``delta`` enters the bound as ``log(2 * n / delta)``; smaller
        values make the bound larger and detection more conservative."""
        self.delta = delta
        self.window = deque()

    def update(self, value):
        """Add ``value`` to the window and report whether drift was detected.

        Returns:
            bool: True if this update triggered a window shrink, else False.
            Windows of 10 or fewer values are never tested.
        """
        self.window.append(value)
        if len(self.window) > 10:
            # Propagate the verdict: callers rely on the return value.
            return self._check_drift()
        return False

    def _check_drift(self):
        """Test every split point; on the first violation keep only the
        newer part of the window and return True, otherwise return False."""
        items = list(self.window)
        n = len(items)
        if n < 2:
            return False

        # Prefix sums give the left/right means of every split in one pass
        # instead of re-slicing and re-averaging the window per split.
        running = np.cumsum(np.asarray(items, dtype=float))
        total = running[-1]
        left_sums = running[:-1]

        n_left = np.arange(1, n)
        n_right = n - n_left

        mean_left = left_sums / n_left
        mean_right = (total - left_sums) / n_right

        epsilon = np.sqrt(
            (1.0 / (2 * n_left) + 1.0 / (2 * n_right)) *
            2 * np.log(2 * n / self.delta)
        )

        exceeded = np.abs(mean_left - mean_right) > epsilon
        if not exceeded.any():
            return False

        # argmax returns the first True; position k is the split at k + 1.
        cut = int(np.argmax(exceeded)) + 1
        # Drift detected, shrink window to the newer values.
        self.window = deque(items[cut:])
        return True

class ConceptDriftDetector:
    """Track prediction/label pairs and feed the 0/1 error signal to ADWIN."""

    def __init__(self, window_size=1000):
        """Create bounded histories holding at most ``window_size`` pairs."""
        self.window_size = window_size
        self.predictions = deque(maxlen=window_size)
        self.actuals = deque(maxlen=window_size)
        self.adwin = ADWIN()

    def update(self, prediction, actual):
        """Record one labelled prediction.

        The pair is stored and an error value (1.0 for a mismatch, 0.0 for a
        match) is passed to the ADWIN instance; whatever ADWIN's ``update``
        returns is handed back to the caller.
        """
        mismatch = prediction != actual
        error = 1.0 if mismatch else 0.0
        self.predictions.append(prediction)
        self.actuals.append(actual)
        return self.adwin.update(error)

    def get_error_rate(self):
        """Return the fraction of stored pairs whose prediction differs from
        the label, or 0.0 when nothing has been stored yet."""
        total = len(self.actuals)
        if total == 0:
            return 0.0
        wrong = 0
        for predicted, observed in zip(self.predictions, self.actuals):
            if predicted != observed:
                wrong += 1
        return wrong / total

class OnlineLearningDetector:
    """Buffer incoming samples and train a model incrementally in batches."""

    def __init__(self, model=None, classes=None):
        """
        Args:
            model: Object exposing ``partial_fit(X, y, classes=...)``.
                Defaults to ``SGDClassifier(loss='log_loss')``.
            classes: Optional full list of class labels. When omitted, the
                labels seen in the first flushed batch are used and then
                kept fixed for every later call.
        """
        # Compare against None explicitly: some estimators define __len__,
        # so a truthiness test could discard (or fail on) a supplied model.
        self.model = model if model is not None else SGDClassifier(loss='log_loss')
        self.classes = None if classes is None else np.asarray(classes)
        self.batch_X = []
        self.batch_y = []
        self.batch_size = 100

    def partial_fit(self, X, y):
        """Queue samples; once ``batch_size`` rows are buffered, train on the
        whole buffer and clear it."""
        self.batch_X.extend(X)
        self.batch_y.extend(y)

        if len(self.batch_X) < self.batch_size:
            return

        if self.classes is None:
            # Pin the label set once. Recomputing it per batch passes a
            # different `classes` whenever a batch happens to miss a label,
            # which scikit-learn's partial_fit rejects.
            self.classes = np.unique(self.batch_y)

        self.model.partial_fit(
            np.array(self.batch_X),
            np.array(self.batch_y),
            classes=self.classes,
        )
        self.batch_X = []
        self.batch_y = []

Shadow Deployments

Run the new model alongside production without affecting users.

import time
import json
from datetime import datetime
from typing import Dict, Any

class ShadowDeployment:
    """Serve the production model while silently exercising a candidate.

    Every request is scored by both models. Only the production prediction
    is returned; latencies (milliseconds) and predictions of both models are
    collected in ``self.metrics`` for later comparison.

    Note: the candidate runs synchronously in ``predict``, so its latency is
    added to the caller's wall-clock time.
    """

    def __init__(self, production_model, candidate_model):
        """Both models must expose ``predict(features)``."""
        self.production = production_model
        self.candidate = candidate_model
        self.metrics = {
            "production": {"latencies": [], "predictions": []},
            # "errors" holds repr() of every exception raised by the candidate.
            "candidate": {"latencies": [], "predictions": [], "errors": []},
        }

    def predict(self, features):
        """Return the production prediction and record shadow metrics.

        A failing candidate never affects the response: its exception is
        recorded and a ``None`` placeholder is stored as its prediction so
        both prediction lists stay aligned request-by-request.
        """
        # perf_counter is monotonic, unlike time.time(), so clock
        # adjustments cannot produce negative or skewed latencies.
        start = time.perf_counter()
        prod_pred = self.production.predict(features)
        prod_latency = (time.perf_counter() - start) * 1000

        self.metrics["production"]["latencies"].append(prod_latency)
        self.metrics["production"]["predictions"].append(prod_pred)

        shadow = self.metrics["candidate"]
        start = time.perf_counter()
        try:
            cand_pred = self.candidate.predict(features)
        except Exception as exc:  # shadow traffic must never break serving
            shadow.setdefault("errors", []).append(repr(exc))
            shadow["predictions"].append(None)
        else:
            shadow["latencies"].append((time.perf_counter() - start) * 1000)
            shadow["predictions"].append(cand_pred)

        return prod_pred  # Only production goes to user

    @staticmethod
    def _percentile(values, q):
        """Return the q-th percentile of ``values`` or NaN when empty."""
        return float(np.percentile(values, q)) if values else float("nan")

    def get_comparison(self):
        """Summarise latency percentiles and prediction agreement.

        ``prediction_agreement`` is the element-wise fraction of matching
        predictions over all requests the candidate answered. Metrics with
        no data are reported as NaN. ``candidate_errors`` counts requests
        on which the candidate raised.
        """
        prod = self.metrics["production"]
        cand = self.metrics["candidate"]

        answered = [
            (p, c)
            for p, c in zip(prod["predictions"], cand["predictions"])
            if c is not None
        ]
        if answered:
            # Flatten each prediction so scalar outputs and batch outputs of
            # varying length are compared uniformly.
            prod_flat = np.concatenate([np.ravel(p) for p, _ in answered])
            cand_flat = np.concatenate([np.ravel(c) for _, c in answered])
            agreement = float(np.mean(prod_flat == cand_flat))
        else:
            agreement = float("nan")

        return {
            "production_latency_p50": self._percentile(prod["latencies"], 50),
            "production_latency_p99": self._percentile(prod["latencies"], 99),
            "candidate_latency_p50": self._percentile(cand["latencies"], 50),
            "candidate_latency_p99": self._percentile(cand["latencies"], 99),
            "prediction_agreement": agreement,
            "candidate_errors": len(cand.get("errors", [])),
        }

Canary Releases

Gradually shift traffic to the new model.

import random
from dataclasses import dataclass

@dataclass
class CanaryConfig:
    """Tunable parameters for a canary rollout.

    Traffic values are fractions in [0, 1]. Invalid combinations are
    rejected at construction time with ``ValueError``.
    """

    initial_traffic_pct: float = 0.05   # share of traffic sent to the canary at start
    increment_pct: float = 0.05         # share added on every promotion step
    max_traffic_pct: float = 1.0        # upper bound for the canary's share
    evaluation_window_hours: float = 24
    min_samples: int = 1000             # canary outcomes required before judging
    rollback_threshold: float = 0.05    # Rollback if canary success rate is >0.05 (absolute) below production

    def __post_init__(self):
        """Validate ranges so a bad config fails fast instead of mis-routing."""
        for name in ("initial_traffic_pct", "max_traffic_pct"):
            value = getattr(self, name)
            if not 0.0 <= value <= 1.0:
                raise ValueError(f"{name} must be within [0, 1], got {value!r}")
        if self.initial_traffic_pct > self.max_traffic_pct:
            raise ValueError("initial_traffic_pct must not exceed max_traffic_pct")
        if self.increment_pct <= 0:
            raise ValueError(f"increment_pct must be positive, got {self.increment_pct!r}")
        # A window of 0 would make `metrics[-0:]` select the whole history
        # and disable the sample-count guard.
        if self.min_samples < 1:
            raise ValueError(f"min_samples must be at least 1, got {self.min_samples!r}")
        if self.rollback_threshold < 0:
            raise ValueError(
                f"rollback_threshold must be non-negative, got {self.rollback_threshold!r}"
            )

class CanaryRelease:
    """Route a growing share of traffic to a canary model and judge it.

    Outcomes are stored as booleans (prediction == actual) per variant in
    ``production_metrics`` and ``canary_metrics``.
    """

    def __init__(self, production_model, canary_model, config: "CanaryConfig"):
        self.production = production_model
        self.canary = canary_model
        self.config = config
        self.current_traffic_pct = config.initial_traffic_pct
        self.production_metrics = []
        self.canary_metrics = []
        # Number of canary outcomes already consumed by a promotion; each
        # traffic step must be earned with fresh samples.
        self._canary_samples_evaluated = 0

    def route_request(self, request_id):
        """Pick a variant at random according to ``current_traffic_pct``.

        Returns a ``(variant_name, model)`` tuple. ``request_id`` is accepted
        but not used for the decision, so routing is not sticky per request.
        """
        if random.random() < self.current_traffic_pct:
            return "canary", self.canary
        return "production", self.production

    def record_outcome(self, variant, prediction, actual):
        """Store whether ``prediction`` matched ``actual`` for ``variant``.

        Any variant other than "production" is counted as canary.
        """
        correct = prediction == actual
        if variant == "production":
            self.production_metrics.append(correct)
        else:
            self.canary_metrics.append(correct)

    def should_promote(self):
        """Evaluate the canary and adjust its traffic share.

        Returns:
            None: Not enough data yet (fewer than ``min_samples`` canary
                outcomes since the last promotion, or no production
                outcomes to compare against).
            "rollback": Canary success rate is more than
                ``rollback_threshold`` below production; canary traffic is
                set to 0.
            "promote": Canary is at least as good as production and traffic
                was increased by ``increment_pct`` (capped at
                ``max_traffic_pct``).
            "maintain": Otherwise; traffic is left unchanged.
        """
        window = self.config.min_samples
        fresh = len(self.canary_metrics) - self._canary_samples_evaluated
        # Without production outcomes the baseline mean would be NaN and
        # every comparison below would silently be False.
        if fresh < window or not self.production_metrics:
            return None

        prod_rate = np.mean(self.production_metrics[-window:])
        canary_rate = np.mean(self.canary_metrics[-window:])

        if canary_rate < prod_rate - self.config.rollback_threshold:
            # Actually stop routing to the canary, not just report it.
            self.current_traffic_pct = 0.0
            return "rollback"

        if canary_rate >= prod_rate and self.current_traffic_pct < self.config.max_traffic_pct:
            self.current_traffic_pct = min(
                self.current_traffic_pct + self.config.increment_pct,
                self.config.max_traffic_pct,
            )
            # Require a new batch of outcomes before the next step so that
            # repeated calls cannot ramp to 100% on the same evidence.
            self._canary_samples_evaluated = len(self.canary_metrics)
            return "promote"

        return "maintain"

Key Takeaways

  • Monitor both data distributions and model performance metrics
  • Use ADWIN or Page-Hinkley tests for automated concept drift detection
  • Shadow deployments let you validate models without user impact
  • Canary releases provide gradual, reversible rollouts with automatic rollback triggers

Advertisement