🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Production Monitoring: Latency, Throughput, Accuracy Decay

MLOps › ML Monitoring ⭐ Premium

Advertisement

Interview Question (Hard) — Asked at: Google, Netflix, Uber, Amazon, Stripe

"Design a comprehensive ML monitoring system that tracks latency, throughput, accuracy decay, and data drift. How do you implement alerting, dashboards, and automated remediation?"

ML Monitoring Architecture

ML monitoring is critical for maintaining model performance in production. It encompasses infrastructure monitoring, model performance monitoring, and business metrics tracking.

Monitoring Architecture Diagram

Architecture Diagram
┌───────────────────────────────────────────────────────────────────┐
│                    ML Monitoring Architecture                     │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌───────────┐    ┌───────────┐    ┌───────────┐    ┌───────────┐ │
│  │ Data      │───▶│ Model     │───▶│ Business  │───▶│ Alert     │ │
│  │ Collection│    │ Metrics   │    │ Metrics   │    │ Manager   │ │
│  └───────────┘    └───────────┘    └───────────┘    └───────────┘ │
│        │                │                │                │       │
│        ▼                ▼                ▼                ▼       │
│  ┌───────────┐    ┌───────────┐    ┌───────────┐    ┌───────────┐ │
│  │ Prometheus│    │ Grafana   │    │ PagerDuty │    │ Auto      │ │
│  │ Thanos    │    │ Dashboards│    │ Slack     │    │Remediation│ │
│  └───────────┘    └───────────┘    └───────────┘    └───────────┘ │
│                                                                   │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                    Time Series Database                     │  │
│  │            (Prometheus / InfluxDB / CloudWatch)             │  │
│  └─────────────────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────────────┘

Infrastructure Monitoring

Prometheus Metrics Collection

from prometheus_client import (
    Counter, Histogram, Gauge, Summary,
    start_http_server, REGISTRY
)
from prometheus_client.core import GaugeMetricFamily
import time
import numpy as np
from typing import Dict, Optional
from datetime import datetime

class MLPrometheusMetrics:
    """Prometheus metrics for ML systems.

    Creates one set of labelled Histogram/Counter/Gauge collectors whose
    names all start with ``prefix`` and offers thin helpers that write
    values into them.
    """

    # Bucket edges 0.0, 0.1, ..., 1.0 built from integers.  Using
    # ``np.arange(0, 1.1, 0.1)`` yields edges such as 0.30000000000000004,
    # which end up verbatim in the exported ``le`` bucket labels.
    PREDICTION_BUCKETS = tuple(round(step / 10, 1) for step in range(11))

    def __init__(self, prefix: str = "ml"):
        """Create every collector.

        Args:
            prefix: String prepended to each metric name.
        """
        self.prefix = prefix

        # Inference metrics
        self.inference_latency = Histogram(
            f'{prefix}_inference_latency_seconds',
            'Inference latency in seconds',
            ['model_name', 'model_version', 'endpoint'],
            buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]
        )

        self.inference_requests = Counter(
            f'{prefix}_inference_requests_total',
            'Total inference requests',
            ['model_name', 'model_version', 'status']
        )

        self.inference_batch_size = Histogram(
            f'{prefix}_inference_batch_size',
            'Batch size distribution',
            ['model_name', 'model_version'],
            buckets=[1, 2, 4, 8, 16, 32, 64, 128, 256]
        )

        # Model performance metrics
        self.model_accuracy = Gauge(
            f'{prefix}_model_accuracy',
            'Current model accuracy',
            ['model_name', 'model_version', 'metric_type']
        )

        self.prediction_distribution = Histogram(
            f'{prefix}_prediction_distribution',
            'Distribution of predictions',
            ['model_name', 'model_version'],
            buckets=list(self.PREDICTION_BUCKETS)
        )

        # Data drift metrics
        self.feature_drift_score = Gauge(
            f'{prefix}_feature_drift_score',
            'Feature drift score (KS statistic)',
            ['model_name', 'feature_name']
        )

        self.data_quality_score = Gauge(
            f'{prefix}_data_quality_score',
            'Data quality score',
            ['model_name', 'quality_check']
        )

        # Resource metrics
        self.gpu_utilization = Gauge(
            f'{prefix}_gpu_utilization_percent',
            'GPU utilization percentage',
            ['model_name', 'gpu_id']
        )

        self.memory_usage = Gauge(
            f'{prefix}_memory_usage_bytes',
            'Memory usage in bytes',
            ['model_name', 'container']
        )

        self.queue_depth = Gauge(
            f'{prefix}_queue_depth',
            'Request queue depth',
            ['model_name', 'queue_name']
        )

    def record_inference(self, model_name: str, model_version: str,
                        latency: float, batch_size: int,
                        predictions: np.ndarray, status: str = "success",
                        endpoint: str = "predict"):
        """Record inference metrics for one request.

        Args:
            model_name: Value of the ``model_name`` label.
            model_version: Value of the ``model_version`` label.
            latency: Observed latency, fed to the ``*_latency_seconds`` histogram.
            batch_size: Number of items in the request.
            predictions: Prediction scores; any shape, flattened before use.
            status: Value of the ``status`` label on the request counter.
            endpoint: Value of the ``endpoint`` label on the latency histogram.
        """
        self.inference_latency.labels(
            model_name=model_name,
            model_version=model_version,
            endpoint=endpoint
        ).observe(latency)

        self.inference_requests.labels(
            model_name=model_name,
            model_version=model_version,
            status=status
        ).inc()

        self.inference_batch_size.labels(
            model_name=model_name,
            model_version=model_version
        ).observe(batch_size)

        # Record prediction distribution.  Flatten so (n, 1)-shaped outputs
        # work, and resolve the labelled child once instead of per value.
        scores = np.asarray(predictions, dtype=float).ravel()
        if scores.size:
            distribution = self.prediction_distribution.labels(
                model_name=model_name,
                model_version=model_version
            )
            for score in scores.tolist():
                distribution.observe(score)

    def update_model_performance(self, model_name: str,
                                model_version: str,
                                metrics: Dict[str, float]):
        """Set one accuracy gauge per entry of ``metrics``.

        The dictionary key becomes the ``metric_type`` label.
        """
        for metric_name, value in metrics.items():
            self.model_accuracy.labels(
                model_name=model_name,
                model_version=model_version,
                metric_type=metric_name
            ).set(value)

    def update_drift_scores(self, model_name: str,
                           drift_scores: Dict[str, float]):
        """Set the drift gauge for every feature in ``drift_scores``."""
        for feature_name, score in drift_scores.items():
            self.feature_drift_score.labels(
                model_name=model_name,
                feature_name=feature_name
            ).set(score)

    def update_resource_metrics(self, model_name: str,
                               gpu_util: float,
                               memory_bytes: int,
                               queue_depth: int,
                               gpu_id: str = "0",
                               container: str = "serving",
                               queue_name: str = "inference"):
        """Update resource utilization gauges.

        Args:
            model_name: Value of the ``model_name`` label on all three gauges.
            gpu_util: GPU utilization percentage.
            memory_bytes: Memory usage in bytes.
            queue_depth: Current request queue depth.
            gpu_id: ``gpu_id`` label value (defaults to the previous fixed "0").
            container: ``container`` label value (previous fixed "serving").
            queue_name: ``queue_name`` label value (previous fixed "inference").
        """
        self.gpu_utilization.labels(
            model_name=model_name,
            gpu_id=gpu_id
        ).set(gpu_util)

        self.memory_usage.labels(
            model_name=model_name,
            container=container
        ).set(memory_bytes)

        self.queue_depth.labels(
            model_name=model_name,
            queue_name=queue_name
        ).set(queue_depth)

class CustomCollector:
    """Custom Prometheus collector for ML-specific metrics.

    Reads model records from ``model_registry.get_all_models()`` and emits
    an info-style gauge plus a staleness gauge for each of them.
    """

    def __init__(self, model_registry):
        """Keep a reference to the registry queried on every ``collect``."""
        self.model_registry = model_registry

    def collect(self):
        """Yield the ``ml_model_info`` and ``ml_model_staleness_hours`` families.

        Each model record is indexed with the keys ``name``, ``version``,
        ``framework``, ``created_at`` and ``last_retrained`` (an ISO-8601
        string parsed with ``datetime.fromisoformat``).
        """
        # Materialize once: the records are walked twice below, which would
        # leave the second loop empty if the registry returned a generator.
        models = list(self.model_registry.get_all_models())

        # Model registry metrics
        model_info = GaugeMetricFamily(
            'ml_model_info',
            'Model metadata',
            labels=['model_name', 'version', 'framework', 'created_at']
        )

        for model in models:
            # Label values are coerced to text; a no-op for strings.
            model_info.add_metric(
                [str(model['name']), str(model['version']),
                 str(model['framework']), str(model['created_at'])],
                1
            )

        yield model_info

        # Model staleness
        staleness = GaugeMetricFamily(
            'ml_model_staleness_hours',
            'Hours since model was last retrained',
            labels=['model_name', 'version']
        )

        for model in models:
            last_retrained = datetime.fromisoformat(model['last_retrained'])
            # Take "now" in the timestamp's own timezone (None -> naive local
            # time, as before) so aware timestamps don't raise TypeError on
            # subtraction.
            elapsed = datetime.now(last_retrained.tzinfo) - last_retrained

            staleness.add_metric(
                [str(model['name']), str(model['version'])],
                elapsed.total_seconds() / 3600
            )

        yield staleness

# Start metrics server
def start_metrics_server(port: int = 8000):
    """Expose the Prometheus metrics endpoint on ``port`` and announce it.

    Delegates to ``start_http_server`` and then prints a one-line
    confirmation to stdout.
    """
    start_http_server(port)
    announcement = f"Metrics server started on port {port}"
    print(announcement)

ℹ️

Use Prometheus for time-series metrics collection and Grafana for visualization. Implement custom collectors for ML-specific metrics like model staleness and feature drift.

Latency Monitoring

Latency Tracking Implementation

import time
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import threading
from collections import deque

@dataclass
class LatencyMeasurement:
    """A single inference latency sample as stored by ``LatencyMonitor``."""
    # Moment the sample was recorded (``LatencyMonitor.record_latency``
    # fills this with a naive ``datetime.now()``).
    timestamp: datetime
    # Observed latency; the field name indicates milliseconds.
    latency_ms: float
    # Name of the model that served the request.
    model_name: str
    # Version identifier of that model.
    model_version: str
    # Number of items in the request batch.
    batch_size: int

class LatencyMonitor:
    """Monitor and analyze inference latency.

    Keeps the most recent ``window_size`` measurements in a bounded deque
    guarded by a lock, and derives statistics, anomaly flags and a simple
    SLA forecast from the samples inside a trailing time window.
    """

    def __init__(self, window_size: int = 1000):
        """Create an empty monitor holding at most ``window_size`` samples."""
        self.window_size = window_size
        self.measurements = deque(maxlen=window_size)
        self.lock = threading.Lock()

        # Latency percentiles reported by get_latency_statistics
        self.percentiles = [50, 90, 95, 99, 99.9]

    def record_latency(self, latency_ms: float, model_name: str,
                      model_version: str, batch_size: int = 1):
        """Record a latency measurement stamped with the current time."""
        measurement = LatencyMeasurement(
            timestamp=datetime.now(),
            latency_ms=latency_ms,
            model_name=model_name,
            model_version=model_version,
            batch_size=batch_size
        )

        with self.lock:
            self.measurements.append(measurement)

    def _recent_measurements(self, model_name: Optional[str],
                             window_minutes: float) -> list:
        """Return samples newer than the window, optionally for one model."""
        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)

        with self.lock:
            return [
                m for m in self.measurements
                if m.timestamp > cutoff_time and
                (model_name is None or m.model_name == model_name)
            ]

    def get_latency_statistics(self, model_name: Optional[str] = None,
                              window_minutes: int = 5,
                              slow_threshold_ms: float = 1000) -> Dict:
        """Get latency statistics for a time window.

        Args:
            model_name: Restrict to this model; ``None`` means all models.
            window_minutes: Length of the trailing window.
            slow_threshold_ms: Latency above which a request counts as slow
                (default 1000 ms, the previously fixed value).

        Returns:
            ``{'status': 'no_data'}`` when the window is empty; otherwise a
            dict with ``count``, ``mean``, ``std``, ``min``, ``max``, one
            ``p<percentile>`` entry per configured percentile,
            ``slow_requests``, ``slow_request_rate`` and, when the samples
            span a positive duration, ``throughput_rps``.
        """
        recent_measurements = self._recent_measurements(
            model_name, window_minutes
        )

        if not recent_measurements:
            return {'status': 'no_data'}

        latencies = [m.latency_ms for m in recent_measurements]

        stats = {
            'count': len(latencies),
            'mean': np.mean(latencies),
            'std': np.std(latencies),
            'min': np.min(latencies),
            'max': np.max(latencies),
        }

        # Calculate percentiles
        for p in self.percentiles:
            stats[f'p{p}'] = np.percentile(latencies, p)

        # Calculate throughput over the span between first and last sample
        time_span = (
            recent_measurements[-1].timestamp -
            recent_measurements[0].timestamp
        ).total_seconds()

        if time_span > 0:
            stats['throughput_rps'] = len(latencies) / time_span

        # Count requests slower than the threshold
        stats['slow_requests'] = sum(
            1 for latency in latencies if latency > slow_threshold_ms
        )
        stats['slow_request_rate'] = (
            stats['slow_requests'] / len(latencies)
        )

        return stats

    def detect_latency_anomalies(self, model_name: str,
                                 window_minutes: int = 5,
                                 threshold_std: float = 3.0) -> List[Dict]:
        """Flag the newest sample when it deviates strongly from the window.

        The newest sample *of the requested model* is compared with the
        mean/std of that same model's window.  (Comparing the newest sample
        of any model would raise false alarms whenever another model had
        just logged an unusual latency.)

        Returns:
            A list with zero or one ``latency_spike`` dicts.
        """
        recent = self._recent_measurements(model_name, window_minutes)

        if not recent:
            return []

        latencies = [m.latency_ms for m in recent]
        mean = np.mean(latencies)
        std = np.std(latencies)
        latest = recent[-1]

        # A zero spread means every sample is identical: nothing to flag.
        z_score = (latest.latency_ms - mean) / std if std > 0 else 0

        if abs(z_score) <= threshold_std:
            return []

        return [{
            'type': 'latency_spike',
            'timestamp': latest.timestamp,
            'latency_ms': latest.latency_ms,
            'z_score': z_score,
            'mean': mean,
            'std': std,
            # Only upward deviations are escalated to 'high'.
            'severity': 'high' if z_score > 4 else 'medium'
        }]

    def predict_sla_breach(self, model_name: str,
                          sla_latency_ms: float,
                          prediction_window_minutes: int = 30) -> Dict:
        """Predict potential SLA breaches.

        Extrapolates the current p99 linearly using the latency trend of the
        last five minutes and maps the relative overshoot of the SLA onto a
        heuristic score clamped to [0, 1].
        """
        stats = self.get_latency_statistics(model_name, window_minutes=5)

        if 'status' in stats:
            return {'prediction': 'insufficient_data'}

        # Simple linear extrapolation: trend is in ms per minute
        recent_trend = self._calculate_trend(model_name, window_minutes=5)

        predicted_latency = stats['p99'] + (recent_trend * prediction_window_minutes)

        breach_probability = min(1.0, max(0.0,
            (predicted_latency - sla_latency_ms) / sla_latency_ms
        ))

        return {
            'current_p99': stats['p99'],
            'predicted_latency': predicted_latency,
            'sla_latency': sla_latency_ms,
            'breach_probability': breach_probability,
            'prediction_window_minutes': prediction_window_minutes,
            'recommendation': self._get_recommendation(breach_probability)
        }

    def _calculate_trend(self, model_name: str,
                        window_minutes: int) -> float:
        """Calculate latency trend (ms per minute).

        Fits a line through latency versus elapsed minutes since the first
        sample in the window, so the slope really is in ms per minute as the
        caller assumes.  Returns 0.0 with fewer than 10 samples or when all
        samples share one timestamp.
        """
        recent = self._recent_measurements(model_name, window_minutes)

        if len(recent) < 10:
            return 0.0

        start = recent[0].timestamp
        elapsed_minutes = np.array([
            (m.timestamp - start).total_seconds() / 60.0 for m in recent
        ])

        # No time spread -> slope is undefined; report a flat trend.
        if elapsed_minutes.max() == elapsed_minutes.min():
            return 0.0

        latencies = [m.latency_ms for m in recent]

        return float(np.polyfit(elapsed_minutes, latencies, 1)[0])

    def _get_recommendation(self, breach_probability: float) -> str:
        """Get recommendation based on breach probability."""
        if breach_probability > 0.8:
            return "URGENT: Immediate action required. Scale up or rollback."
        elif breach_probability > 0.5:
            return "WARNING: High risk of SLA breach. Consider scaling up."
        elif breach_probability > 0.2:
            return "MONITOR: Moderate risk. Continue monitoring closely."
        else:
            return "OK: Low risk of SLA breach."

class LatencyAlertManager:
    """Turn latency statistics into threshold alerts.

    Thresholds come from ``config``; each rule falls back to a built-in
    default when its key is missing.
    """

    def __init__(self, config: Dict):
        """Store the threshold configuration and start an empty history."""
        self.alert_history = []
        self.config = config

    def check_latency_alerts(self, latency_stats: Dict) -> List[Dict]:
        """Evaluate every rule whose statistic is present in ``latency_stats``.

        Returns:
            One alert dict per breached rule, in rule order: p99 latency,
            throughput, slow-request rate.
        """
        # (stat key, config key, default limit, fires when value is above the
        #  limit?, alert type, severity, metric name, message template)
        rules = (
            ('p99', 'p99_threshold_ms', 200, True,
             'high_p99_latency', 'high', 'p99_latency',
             "P99 latency {value:.2f}ms exceeds threshold {limit}ms"),
            ('throughput_rps', 'min_throughput_rps', 100, False,
             'low_throughput', 'medium', 'throughput_rps',
             "Throughput {value:.2f} rps below threshold {limit} rps"),
            ('slow_request_rate', 'max_slow_request_rate', 0.01, True,
             'high_slow_request_rate', 'medium', 'slow_request_rate',
             "Slow request rate {value:.4f} exceeds threshold {limit}"),
        )

        triggered = []

        for (stat_key, config_key, default_limit, fires_above,
             alert_type, severity, metric, template) in rules:
            if stat_key not in latency_stats:
                continue

            observed = latency_stats[stat_key]
            limit = self.config.get(config_key, default_limit)

            if fires_above:
                breached = observed > limit
            else:
                breached = observed < limit

            if not breached:
                continue

            triggered.append({
                'type': alert_type,
                'severity': severity,
                'metric': metric,
                'value': observed,
                'threshold': limit,
                'message': template.format(value=observed, limit=limit)
            })

        return triggered

⚠️

Monitor latency at multiple percentiles (p50, p90, p95, p99). Mean latency can be misleading - focus on tail latencies for SLA compliance.

Accuracy Decay Detection

Model Performance Monitoring

import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
from scipy import stats
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score,
    f1_score, confusion_matrix
)

class AccuracyDecayDetector:
    """Detect model accuracy decay in production.

    Each call to ``add_observation`` scores one labelled batch, appends the
    result to ``metric_history`` and re-runs the decay check, which compares
    the recent history with the supplied baseline metrics.
    """

    def __init__(self, baseline_metrics: Dict[str, float],
                 decay_thresholds: Optional[Dict[str, float]] = None):
        """
        Args:
            baseline_metrics: Baseline performance metrics
            decay_thresholds: Thresholds for each metric to trigger alert
                (absolute drop below the baseline). Falls back to built-in
                defaults when omitted or empty.
        """
        self.baseline_metrics = baseline_metrics
        self.decay_thresholds = decay_thresholds or {
            'auc_roc': 0.05,  # 5% absolute drop
            'precision': 0.10,
            'recall': 0.10,
            'f1_score': 0.10
        }

        self.metric_history = []
        self.alerts = []

    def add_observation(self, y_true: np.ndarray, y_pred: np.ndarray,
                       y_proba: Optional[np.ndarray] = None,
                       timestamp: Optional[datetime] = None):
        """Add a batch of observations for tracking.

        Args:
            y_true: Ground-truth labels of the batch.
            y_pred: Predicted labels of the batch.
            y_proba: Optional predicted scores; enables ``auc_roc``.
            timestamp: Time of the batch; defaults to ``datetime.now()``.

        Returns:
            The metrics dict that was appended to ``metric_history``.
        """
        if timestamp is None:
            timestamp = datetime.now()

        # Calculate metrics
        metrics = {
            'timestamp': timestamp,
            'n_samples': len(y_true),
            'positive_rate': float(np.mean(y_true)),
            'prediction_rate': float(np.mean(y_pred))
        }

        # ROC AUC is undefined for a single-class batch and roc_auc_score
        # raises ValueError in that case, so the metric is simply omitted.
        if y_proba is not None and np.unique(y_true).size > 1:
            metrics['auc_roc'] = float(roc_auc_score(y_true, y_proba))

        metrics['precision'] = float(precision_score(y_true, y_pred, zero_division=0))
        metrics['recall'] = float(recall_score(y_true, y_pred, zero_division=0))
        metrics['f1_score'] = float(f1_score(y_true, y_pred, zero_division=0))

        # Confusion matrix (cells stored only for the binary 2x2 case)
        cm = confusion_matrix(y_true, y_pred)
        if cm.shape == (2, 2):
            tn, fp, fn, tp = cm.ravel()
            metrics['true_positives'] = int(tp)
            metrics['false_positives'] = int(fp)
            metrics['true_negatives'] = int(tn)
            metrics['false_negatives'] = int(fn)

        self.metric_history.append(metrics)

        # Run the decay check for its side effect of recording alerts
        self.check_decay()

        return metrics

    def check_decay(self) -> bool:
        """Check for accuracy decay.

        For every thresholded metric that has a baseline, the mean of up to
        the last 100 recorded values is compared with the baseline.  An alert
        is appended to ``self.alerts`` when the drop exceeds the threshold
        and a one-sample t-test against the baseline gives p < 0.05.

        Returns:
            True when at least one metric triggered an alert; always False
            with fewer than 10 observations in the history.
        """
        if len(self.metric_history) < 10:
            return False

        decay_detected = False

        for metric_name, threshold in self.decay_thresholds.items():
            if metric_name not in self.baseline_metrics:
                continue

            # Get recent values
            recent_values = [
                m.get(metric_name) for m in self.metric_history[-100:]
                if m.get(metric_name) is not None
            ]

            if len(recent_values) < 10:
                continue

            baseline = self.baseline_metrics[metric_name]
            current_mean = np.mean(recent_values)

            # Calculate decay (positive means worse than baseline)
            decay = baseline - current_mean

            # Statistical significance test
            _, p_value = stats.ttest_1samp(recent_values, baseline)

            if decay > threshold and p_value < 0.05:
                decay_detected = True

                self.alerts.append({
                    'type': 'accuracy_decay',
                    'timestamp': datetime.now(),
                    'metric': metric_name,
                    'baseline_value': baseline,
                    'current_value': current_mean,
                    'decay': decay,
                    'threshold': threshold,
                    'p_value': p_value,
                    'severity': 'high' if decay > threshold * 2 else 'medium'
                })

        return decay_detected

    def get_performance_trend(self, metric_name: str,
                             window_hours: int = 24) -> Dict:
        """Get performance trend for a metric.

        Fits a straight line through the metric values (indexed by
        observation order) recorded within the last ``window_hours``.

        Returns:
            ``{'status': 'insufficient_data'}`` with fewer than two
            observations; otherwise summary statistics, the fitted ``slope``
            (per observation) and ``intercept``, and ``predicted_1h``.
        """
        cutoff_time = datetime.now() - timedelta(hours=window_hours)

        recent_metrics = [
            m for m in self.metric_history
            if m['timestamp'] > cutoff_time and metric_name in m
        ]

        if len(recent_metrics) < 2:
            return {'status': 'insufficient_data'}

        values = [m[metric_name] for m in recent_metrics]
        timestamps = [m['timestamp'] for m in recent_metrics]

        # Calculate trend
        x = np.arange(len(values))
        slope, intercept = np.polyfit(x, values, 1)

        # Calculate statistics
        stats_dict = {
            'mean': np.mean(values),
            'std': np.std(values),
            'min': np.min(values),
            'max': np.max(values),
            'slope': slope,
            'intercept': intercept,
            'n_observations': len(values),
            'window_hours': window_hours
        }

        # Predict future value (1 hour ahead).  The slope is per observation,
        # so convert one hour into a number of observation steps using the
        # observed cadence instead of assuming one observation per minute.
        last_index = len(values) - 1
        span_hours = (max(timestamps) - min(timestamps)).total_seconds() / 3600
        # Without a time spread the cadence is unknown: no extrapolation.
        steps_per_hour = last_index / span_hours if span_hours > 0 else 0.0
        stats_dict['predicted_1h'] = (
            slope * (last_index + steps_per_hour) + intercept
        )

        return stats_dict

    def generate_decay_report(self) -> Dict:
        """Generate comprehensive decay report.

        Combines the baseline, the latest metrics, a 24-hour trend per
        baseline metric, the ten most recent alerts and a recommendation
        derived from the five most recent alerts.
        """
        report = {
            'timestamp': datetime.now().isoformat(),
            'total_observations': len(self.metric_history),
            'baseline_metrics': self.baseline_metrics,
            'current_metrics': self._get_current_metrics(),
            'decay_analysis': {},
            'alerts': self.alerts[-10:],  # Last 10 alerts
            'recommendations': []
        }

        for metric_name, baseline in self.baseline_metrics.items():
            trend = self.get_performance_trend(metric_name, window_hours=24)

            if 'status' in trend:
                continue

            report['decay_analysis'][metric_name] = {
                'baseline': baseline,
                'current_mean': trend['mean'],
                'decay': baseline - trend['mean'],
                'trend_slope': trend['slope'],
                'predicted_1h': trend['predicted_1h']
            }

        # Generate recommendations
        if self.alerts:
            recent_alerts = self.alerts[-5:]

            if any(a['severity'] == 'high' for a in recent_alerts):
                report['recommendations'].append(
                    "URGENT: Multiple high-severity accuracy decay alerts. "
                    "Consider immediate model rollback or retraining."
                )
            else:
                report['recommendations'].append(
                    "Monitor accuracy trends closely. "
                    "Schedule retraining if decay continues."
                )

        return report

    def _get_current_metrics(self) -> Dict:
        """Get current (most recent) metrics, or an empty dict if none."""
        if self.metric_history:
            return self.metric_history[-1]
        return {}

class ProductionAccuracyTracker:
    """Track accuracy using ground truth labels as they become available.

    Predictions are logged first; ground truth is attached later by
    prediction id, and accuracy is computed over the labelled subset.
    """

    def __init__(self, lag_hours: int = 24):
        """Create an empty tracker.

        Args:
            lag_hours: Stored on the instance; not used by the methods of
                this class.
        """
        self.lag_hours = lag_hours
        self.predictions = []
        self.ground_truth = []
        # prediction_id -> record in ``self.predictions``; lets ground truth
        # be attached without scanning the whole list on every call.
        self._predictions_by_id = {}

    def log_prediction(self, prediction_id: str,
                      prediction: float,
                      features: Dict,
                      timestamp: datetime):
        """Log a prediction for later comparison."""
        record = {
            'prediction_id': prediction_id,
            'prediction': prediction,
            'features': features,
            'timestamp': timestamp,
            'ground_truth_received': False
        }
        self.predictions.append(record)
        # First record wins for a repeated id, matching the first-match
        # behaviour of a front-to-back scan.
        self._predictions_by_id.setdefault(prediction_id, record)

    def log_ground_truth(self, prediction_id: str,
                        actual_value: float,
                        timestamp: datetime) -> bool:
        """Log ground truth for a prediction.

        Returns:
            True when a logged prediction with this id was updated, False
            when the id is unknown (nothing is changed in that case).
        """
        record = self._predictions_by_id.get(prediction_id)
        if record is None:
            return False

        record['actual_value'] = actual_value
        record['ground_truth_timestamp'] = timestamp
        record['ground_truth_received'] = True
        return True

    def calculate_accuracy(self, window_hours: int = 24) -> Dict:
        """Calculate accuracy for predictions with ground truth.

        Only predictions logged within the last ``window_hours`` that have
        received ground truth are used.  Scores above 0.5 count as the
        positive class.

        Returns:
            ``{'status': 'no_labeled_data'}`` when nothing qualifies;
            otherwise ``n_samples``, ``accuracy``, ``positive_rate`` and
            ``prediction_rate``, plus ``auc_roc``, ``precision``, ``recall``
            and ``f1_score`` when exactly two classes occur in the labels.
        """
        cutoff_time = datetime.now() - timedelta(hours=window_hours)

        # Get predictions with ground truth
        labeled_predictions = [
            p for p in self.predictions
            if p['ground_truth_received'] and
            p['timestamp'] > cutoff_time
        ]

        if not labeled_predictions:
            return {'status': 'no_labeled_data'}

        y_true = np.array([p['actual_value'] for p in labeled_predictions])
        y_pred = np.array([p['prediction'] for p in labeled_predictions])

        # Threshold once and reuse for every label-based metric
        is_positive = y_pred > 0.5
        y_label = is_positive.astype(int)

        # Calculate metrics
        metrics = {
            'n_samples': len(y_true),
            'accuracy': float(np.mean(y_true == y_label)),
            'positive_rate': float(np.mean(y_true)),
            'prediction_rate': float(np.mean(is_positive)),
        }

        if len(np.unique(y_true)) == 2:
            metrics['auc_roc'] = float(roc_auc_score(y_true, y_pred))
            # zero_division=0 keeps the 0.0 result for an undefined ratio
            # but without emitting a warning.
            metrics['precision'] = float(precision_score(y_true, y_label, zero_division=0))
            metrics['recall'] = float(recall_score(y_true, y_label, zero_division=0))
            metrics['f1_score'] = float(f1_score(y_true, y_label, zero_division=0))

        return metrics

ℹ️

Accuracy decay detection requires ground truth labels, which may have a lag. Implement both proxy metrics (prediction distribution) and actual accuracy tracking for comprehensive monitoring.

Alerting System

Multi-Channel Alerting

import smtplib
import json
from email.mime.text import MIMEText
from typing import Dict, List, Optional
from datetime import datetime
import requests

class MLAlertManager:
    """Create, dispatch, and resolve ML alerts across multiple channels.

    Channels are enabled by the presence of these ``config`` keys:

    * ``slack_webhook``  - incoming-webhook URL (``slack_channel`` optionally
      overrides the default ``#ml-alerts`` channel).
    * ``pagerduty_key``  - routing key for the PagerDuty Events API v2.
    * ``email_config``   - dict with ``from``, ``to`` (list of addresses),
      ``smtp_host`` and, optionally, ``smtp_port``.

    ``request_timeout`` (seconds, default ``DEFAULT_TIMEOUT``) bounds every
    outbound HTTP/SMTP call so a stalled endpoint cannot hang the caller.
    """

    # Fallback timeout, in seconds, for outbound HTTP and SMTP calls.
    DEFAULT_TIMEOUT = 10

    _logger = logging.getLogger(__name__)

    def __init__(self, config: Dict):
        self.config = config
        self.alert_history = []

        # Initialize channels; a missing/empty value disables that channel.
        self.slack_webhook = config.get('slack_webhook')
        self.pagerduty_key = config.get('pagerduty_key')
        self.email_config = config.get('email_config')
        self.request_timeout = config.get('request_timeout', self.DEFAULT_TIMEOUT)

    def create_alert(self, alert_type: str, severity: str,
                    message: str, metrics: Optional[Dict] = None,
                    recommendations: Optional[List[str]] = None) -> Dict:
        """Create an alert, record it in the history, and send it out.

        Routing by severity: ``critical``/``high`` go to PagerDuty, Slack and
        email; ``medium`` goes to Slack; anything else goes to the Slack
        ``info`` channel.

        Delivery is best-effort: a failure in one channel is logged and does
        not prevent the remaining channels from being tried.

        Returns:
            The alert dict that was stored in ``alert_history``.
        """
        # Take a single clock reading so the id and timestamp always agree.
        now = datetime.now()

        alert = {
            # The random suffix keeps ids unique when several alerts are
            # created within the same second.
            'id': f"alert_{now:%Y%m%d%H%M%S}_{uuid.uuid4().hex[:8]}",
            'type': alert_type,
            'severity': severity,
            'message': message,
            'metrics': metrics or {},
            'recommendations': recommendations or [],
            'timestamp': now.isoformat(),
            'status': 'active'
        }

        # Store alert before dispatching so it is tracked even if delivery fails.
        self.alert_history.append(alert)

        # Send to appropriate channels
        if severity in ('critical', 'high'):
            self._dispatch('pagerduty', self._send_pagerduty, alert)
            self._dispatch('slack', self._send_slack, alert)
            self._dispatch('email', self._send_email, alert)
        elif severity == 'medium':
            self._dispatch('slack', self._send_slack, alert)
        else:
            self._dispatch('slack', self._send_slack, alert, channel='info')

        return alert

    def _dispatch(self, channel_name: str, sender, alert: Dict, **kwargs) -> None:
        """Run one channel sender, logging (not raising) any failure."""
        try:
            sender(alert, **kwargs)
        except Exception:
            # Boundary handler: one broken channel must not block the others.
            self._logger.exception(
                "Failed to deliver alert %s via %s", alert['id'], channel_name
            )

    def _send_slack(self, alert: Dict, channel: Optional[str] = None):
        """Post the alert to Slack; no-op when no webhook is configured.

        Raises on network errors, timeouts, or a non-2xx response.
        """
        if not self.slack_webhook:
            return

        channel = channel or self.config.get('slack_channel', '#ml-alerts')

        color = {
            'critical': '#FF0000',
            'high': '#FF6600',
            'medium': '#FFCC00',
            'low': '#00CC00'
        }.get(alert['severity'], '#808080')

        payload = {
            'channel': channel,
            'attachments': [{
                'color': color,
                'title': f"ML Alert: {alert['type']}",
                'text': alert['message'],
                'fields': [
                    {'title': 'Severity', 'value': alert['severity'], 'short': True},
                    {'title': 'Time', 'value': alert['timestamp'], 'short': True},
                ],
                'footer': 'ML Monitoring System'
            }]
        }

        if alert['recommendations']:
            # Only the first three recommendations are shown to keep it short.
            payload['attachments'][0]['fields'].append({
                'title': 'Recommendations',
                'value': '\n'.join(alert['recommendations'][:3]),
                'short': False
            })

        response = requests.post(
            self.slack_webhook, json=payload, timeout=self.request_timeout
        )
        response.raise_for_status()

    def _send_pagerduty(self, alert: Dict):
        """Trigger a PagerDuty event; no-op when no routing key is configured.

        Raises on network errors, timeouts, or a non-2xx response.
        """
        if not self.pagerduty_key:
            return

        severity_map = {
            'critical': 'critical',
            'high': 'error',
            'medium': 'warning',
            'low': 'info'
        }

        payload = {
            'routing_key': self.pagerduty_key,
            'event_action': 'trigger',
            'payload': {
                'summary': f"{alert['type']}: {alert['message'][:100]}",
                'severity': severity_map.get(alert['severity'], 'info'),
                'source': 'ml-monitoring',
                'component': alert['type'],
                'group': 'ml-production',
                # Use the real alert type; a fixed label would mislabel
                # e.g. latency or drift alerts.
                'class': alert['type'],
                'custom_details': {
                    'alert_id': alert['id'],
                    'metrics': alert['metrics'],
                    'recommendations': alert['recommendations']
                }
            }
        }

        response = requests.post(
            'https://events.pagerduty.com/v2/enqueue',
            json=payload,
            timeout=self.request_timeout
        )
        response.raise_for_status()

    def _send_email(self, alert: Dict):
        """Email the alert; no-op when no email config is present.

        Raises ``KeyError`` if a required ``email_config`` key is missing and
        propagates SMTP/socket errors from ``smtplib``.
        """
        if not self.email_config:
            return

        # default=str: this is a human-readable dump, so render values that
        # are not JSON-native (e.g. numpy scalars) instead of failing.
        metrics_text = json.dumps(alert['metrics'], indent=2, default=str)
        recommendations_text = '\n'.join(f"- {r}" for r in alert['recommendations'])

        msg = MIMEText(f"""
ML Alert: {alert['type']}

Severity: {alert['severity']}
Time: {alert['timestamp']}

Message:
{alert['message']}

Metrics:
{metrics_text}

Recommendations:
{recommendations_text}

Alert ID: {alert['id']}
        """)

        msg['Subject'] = f"[ML Alert - {alert['severity'].upper()}] {alert['type']}"
        msg['From'] = self.email_config['from']
        msg['To'] = ', '.join(self.email_config['to'])

        # Port 0 lets smtplib fall back to its default SMTP port.
        with smtplib.SMTP(
            self.email_config['smtp_host'],
            self.email_config.get('smtp_port', 0),
            timeout=self.request_timeout,
        ) as server:
            server.send_message(msg)

    def resolve_alert(self, alert_id: str, resolution: str = "") -> bool:
        """Mark the alert with ``alert_id`` as resolved.

        Returns:
            ``True`` if a matching alert was found and updated, ``False`` if
            no alert in the history has that id.
        """
        for alert in self.alert_history:
            if alert['id'] == alert_id:
                alert['status'] = 'resolved'
                alert['resolved_at'] = datetime.now().isoformat()
                alert['resolution'] = resolution
                return True
        return False

⚠️

Implement alert escalation policies. Start with low-severity alerts and escalate if issues persist. Use deduplication and grouping to avoid alert fatigue.

Dashboards

Grafana Dashboard Configuration

{
  "dashboard": {
    "title": "ML Model Monitoring",
    "tags": ["ml", "production", "monitoring"],
    "timezone": "browser",
    "panels": [
      {
        "title": "Inference Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(ml_inference_latency_seconds_bucket[5m]))",
            "legendFormat": "P99 Latency",
            "refId": "A"
          },
          {
            "expr": "histogram_quantile(0.95, rate(ml_inference_latency_seconds_bucket[5m]))",
            "legendFormat": "P95 Latency",
            "refId": "B"
          },
          {
            "expr": "histogram_quantile(0.50, rate(ml_inference_latency_seconds_bucket[5m]))",
            "legendFormat": "P50 Latency",
            "refId": "C"
          }
        ],
        "yaxes": [
          {"label": "Latency (seconds)", "format": "s"},
          {"show": false}
        ],
        "thresholds": [
          {"value": 0.5, "colorMode": "warning", "op": "gt"},
          {"value": 1.0, "colorMode": "critical", "op": "gt"}
        ]
      },
      {
        "title": "Throughput",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(ml_inference_requests_total[5m])",
            "legendFormat": "Requests/second",
            "refId": "A"
          }
        ],
        "yaxes": [
          {"label": "Requests/second", "format": "reqps"},
          {"show": false}
        ]
      },
      {
        "title": "Model Accuracy",
        "type": "gauge",
        "targets": [
          {
            "expr": "ml_model_accuracy{metric_type='auc_roc'}",
            "legendFormat": "AUC-ROC",
            "refId": "A"
          }
        ],
        "options": {
          "thresholds": [
            {"value": 0, "color": "red"},
            {"value": 0.85, "color": "yellow"},
            {"value": 0.90, "color": "green"}
          ],
          "min": 0,
          "max": 1
        }
      },
      {
        "title": "Feature Drift Scores",
        "type": "heatmap",
        "targets": [
          {
            "expr": "ml_feature_drift_score",
            "legendFormat": "{{feature_name}}",
            "refId": "A"
          }
        ],
        "options": {
          "calculate": false,
          "dataFormat": "tsbuckets",
          "yAxis": {
            "unit": "short"
          }
        }
      },
      {
        "title": "GPU Utilization",
        "type": "graph",
        "targets": [
          {
            "expr": "ml_gpu_utilization_percent",
            "legendFormat": "GPU {{gpu_id}}",
            "refId": "A"
          }
        ],
        "yaxes": [
          {"label": "Utilization %", "format": "percent"},
          {"show": false}
        ],
        "thresholds": [
          {"value": 80, "colorMode": "warning", "op": "gt"},
          {"value": 95, "colorMode": "critical", "op": "gt"}
        ]
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "30s"
  }
}

Summary

ML production monitoring requires:

  1. Infrastructure Monitoring: GPU, CPU, memory, network
  2. Latency Monitoring: P50, P90, P95, P99 percentiles
  3. Accuracy Decay Detection: Statistical tests for performance degradation
  4. Alerting: Multi-channel alerts with escalation policies
  5. Dashboards: Real-time visualization of key metrics

Implement comprehensive monitoring to maintain model reliability in production.

Advertisement