Observability in Data Pipelines: Monitoring & Alerting
Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Spotify, Stripe
1. The Three Pillars of Data Observability
Architecture Diagram
Data Observability
├── Metrics (Quantitative)
│   ├── Row counts, null rates, freshness
│   ├── Pipeline duration, throughput
│   └── Resource utilization (CPU, memory, storage)
├── Logs (Discrete Events)
│   ├── Pipeline start/end events
│   ├── Schema changes, data quality failures
│   └── Error traces, stack traces
└── Lineage (Provenance)
    ├── Upstream dependencies
    ├── Downstream impact
    └── Column-level transformations
ℹ️
Key Insight: Data observability is NOT just monitoring. It's the ability to understand, diagnose, and manage data health across the entire pipeline lifecycle.
2. Key Data Metrics (SLIs)
Freshness Metrics
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
@dataclass
class FreshnessSLI:
    """Freshness service-level indicator for a single table.

    Attributes:
        table_name: Table being measured.
        last_updated: When the table last received data. May be naive or
            timezone-aware; "now" is taken in the same timezone so the
            subtraction never mixes naive and aware datetimes.
        expected_frequency: Maximum acceptable data age before the table
            counts as stale.
        timestamp_column: Column freshness is read from (informational only;
            not used by the calculations below).
    """

    table_name: str
    last_updated: datetime
    expected_frequency: timedelta
    timestamp_column: str = "updated_at"

    @property
    def delay(self) -> timedelta:
        """Age of the data: current time minus ``last_updated``."""
        # datetime.now(None) is naive local time (as before); for an aware
        # last_updated this yields an aware "now" instead of raising TypeError.
        return datetime.now(self.last_updated.tzinfo) - self.last_updated

    @property
    def is_healthy(self) -> bool:
        """True while the data is no older than ``expected_frequency``."""
        return self.delay <= self.expected_frequency

    @property
    def sli_value(self) -> float:
        """Score in [0.0, 1.0]: 1.0 = on time, 0.0 = badly overdue.

        Within ``expected_frequency`` the score is 1.0. Past it, the score
        drops by 0.5 for every additional ``expected_frequency`` of delay,
        reaching 0.0 at three times the expected frequency.
        """
        delay = self.delay  # read the clock once so both checks agree
        if delay <= self.expected_frequency:
            return 1.0
        expected_seconds = self.expected_frequency.total_seconds()
        if expected_seconds <= 0:
            # A zero/negative window means any delay is fully overdue; this
            # also avoids a ZeroDivisionError.
            return 0.0
        overdue_ratio = delay.total_seconds() / expected_seconds
        return max(0.0, 1.0 - (overdue_ratio - 1) * 0.5)
# Usage
sli = FreshnessSLI(
    table_name="fact_orders",
    last_updated=datetime.now() - timedelta(minutes=45),
    expected_frequency=timedelta(hours=1),
)
# 45 minutes old against a 1-hour window is still on time.
print(f"Freshness: {sli.sli_value:.2f}")  # 1.00

stale = FreshnessSLI(
    table_name="fact_orders",
    last_updated=datetime.now() - timedelta(hours=2),
    expected_frequency=timedelta(hours=1),
)
# One full window overdue loses half the score.
print(f"Freshness: {stale.sli_value:.2f}")  # 0.50
Volume Metrics
@dataclass
class VolumeSLI:
    """Row-count anomaly indicator based on a z-score against history.

    Attributes:
        table_name: Table being measured.
        current_count: Rows observed in the current window.
        historical_avg: Mean row count of past windows.
        historical_std: Standard deviation of past windows' row counts.
        window_hours: Window length (informational; not used below).
        threshold: Absolute z-score above which volume is anomalous.
    """

    table_name: str
    current_count: int
    historical_avg: float
    historical_std: float
    window_hours: int = 24
    threshold: float = 3.0

    @property
    def z_score(self) -> float:
        """Standard deviations between the current count and the mean."""
        deviation = self.current_count - self.historical_avg
        if self.historical_std == 0:
            # Perfectly flat history: no deviation is normal, but any
            # deviation at all is infinitely unusual (must not read as 0).
            if deviation == 0:
                return 0.0
            return float("inf") if deviation > 0 else float("-inf")
        return deviation / self.historical_std

    @property
    def is_healthy(self) -> bool:
        """True while the count is within ``threshold`` standard deviations."""
        return abs(self.z_score) <= self.threshold

    @property
    def anomaly_type(self) -> Optional[str]:
        """``"VOLUME_SPIKE"``, ``"VOLUME_DROP"``, or None when healthy."""
        z = self.z_score
        if z > self.threshold:
            return "VOLUME_SPIKE"
        if z < -self.threshold:
            return "VOLUME_DROP"
        return None
Quality Metrics
@dataclass
class QualityMetrics:
    """Basic data-quality counters for one table or batch.

    Attributes:
        total_rows: Number of rows in the batch.
        null_counts: Mapping of column name -> number of nulls in that column.
        duplicate_count: Number of duplicate rows.
        schema_changes: Detected schema changes (stored; not used below).
    """

    total_rows: int
    null_counts: dict  # column -> null_count
    duplicate_count: int
    schema_changes: list  # list of detected changes

    @property
    def null_ratio(self) -> float:
        """Fraction of null cells across all tracked columns."""
        total_cells = self.total_rows * len(self.null_counts)
        if total_cells <= 0:
            # No rows or no tracked columns: nothing can be null (and this
            # avoids dividing by zero when null_counts is empty).
            return 0.0
        return sum(self.null_counts.values()) / total_cells

    @property
    def duplicate_ratio(self) -> float:
        """Fraction of rows that are duplicates (0.0 for an empty batch)."""
        return self.duplicate_count / self.total_rows if self.total_rows > 0 else 0.0

    def column_null_rates(self) -> dict:
        """Per-column null fraction; every column maps to 0.0 for an empty batch."""
        if self.total_rows <= 0:
            return {col: 0.0 for col in self.null_counts}
        return {col: count / self.total_rows for col, count in self.null_counts.items()}
3. Pipeline Health Dashboard
Metrics Collection
import time
from functools import wraps
from typing import Callable
from prometheus_client import Counter, Histogram, Gauge
# Prometheus metrics
#
# prometheus_client's default Histogram buckets stop at 10 seconds, which
# would put every multi-minute batch run into the +Inf bucket and make the
# histogram useless. These buckets span 1 second to 8 hours.
PIPELINE_DURATION_BUCKETS = (
    1, 5, 15, 30, 60, 120, 300, 600, 1200, 1800, 3600, 7200, 14400, 28800,
)

PIPELINE_DURATION = Histogram(
    'pipeline_duration_seconds',
    'Pipeline execution duration',
    ['pipeline_name', 'stage'],
    buckets=PIPELINE_DURATION_BUCKETS,
)

# prometheus_client exposes this counter as pipeline_rows_processed_total.
PIPELINE_ROWS = Counter(
    'pipeline_rows_processed',
    'Total rows processed',
    ['pipeline_name', 'stage', 'status'],
)

PIPELINE_ERRORS = Counter(
    'pipeline_errors_total',
    'Total pipeline errors',
    ['pipeline_name', 'error_type'],
)

# Must be updated by a freshness check, e.g.
# DATA_FRESHNESS.labels(table_name=...).set(age_minutes); the DataStale
# alert reads this series.
DATA_FRESHNESS = Gauge(
    'data_freshness_minutes',
    'Data freshness in minutes',
    ['table_name'],
)
def monitor_pipeline(pipeline_name: str, row_counter: Optional[Callable[[object], int]] = None):
    """Instrument a pipeline entry point with Prometheus metrics.

    Args:
        pipeline_name: Value of the ``pipeline_name`` label on every metric.
        row_counter: Optional function that maps the wrapped function's
            return value to the number of rows it processed. When omitted,
            the rows counter is incremented by 1 per successful run (that
            is, it counts runs).

    On success, the run duration is observed with ``stage="complete"`` and
    the rows counter is incremented with ``status="success"``. On failure,
    the duration is observed with ``stage="failed"`` and the error counter
    is incremented with the exception's class name. The exception is then
    re-raised.
    """
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so wall-clock adjustments cannot
            # produce negative or inflated durations.
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                PIPELINE_DURATION.labels(pipeline_name=pipeline_name, stage='failed').observe(
                    time.perf_counter() - start
                )
                PIPELINE_ERRORS.labels(pipeline_name=pipeline_name, error_type=type(e).__name__).inc()
                raise
            # Metrics are recorded outside the try so a metrics problem is
            # never miscounted as a pipeline error.
            PIPELINE_DURATION.labels(pipeline_name=pipeline_name, stage='complete').observe(
                time.perf_counter() - start
            )
            rows = row_counter(result) if row_counter is not None else 1
            PIPELINE_ROWS.labels(pipeline_name=pipeline_name, stage='complete', status='success').inc(rows)
            return result
        return wrapper
    return decorator
# Usage: every call to run_daily_sales() is timed and counted under
# pipeline_name="daily_sales_etl".
@monitor_pipeline("daily_sales_etl")
def run_daily_sales():
    """Entry point for the daily sales ETL; the pipeline code goes here."""
Alert Rules
# prometheus/alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      # A Histogram exposes only _bucket/_sum/_count series; there is no bare
      # pipeline_duration_seconds series to compare. This alert uses the mean
      # duration of runs completed in the last hour. NOTE: it fires only after
      # a slow run finishes. Catching a run that is still in progress needs a
      # separate "run started" timestamp gauge.
      - alert: PipelineDelayed
        expr: >
          increase(pipeline_duration_seconds_sum{stage="complete"}[1h])
            / increase(pipeline_duration_seconds_count{stage="complete"}[1h])
            > 3600
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} runs completed in the last hour averaged >1 hour"

      # Requires data_freshness_minutes to be kept up to date by a freshness check.
      - alert: DataStale
        expr: data_freshness_minutes > 120
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Table {{ $labels.table_name }} is stale for >2 hours"

      # increase()/rate() need two samples, so the first error of a brand-new
      # (pipeline_name, error_type) series would be missed. The `unless ... offset`
      # branch catches error series that did not exist 5 minutes ago.
      - alert: PipelineFailing
        expr: >
          increase(pipeline_errors_total[5m]) > 0
            or (pipeline_errors_total unless pipeline_errors_total offset 5m)
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} has errors"

      # The Python counter 'pipeline_rows_processed' is exported as
      # pipeline_rows_processed_total; no pipeline_rows series exists. A counter
      # only grows, so compare its increase rather than its raw value. This
      # compares rows in the last 24h with the 24h before that, which also
      # works for once-a-day batch pipelines.
      - alert: VolumeAnomaly
        expr: >
          abs(
            increase(pipeline_rows_processed_total{status="success"}[24h])
              - increase(pipeline_rows_processed_total{status="success"}[24h] offset 1d)
          )
            / increase(pipeline_rows_processed_total{status="success"}[24h] offset 1d)
            > 0.3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Volume anomaly detected for {{ $labels.pipeline_name }}"
4. Distributed Tracing for Data Pipelines
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
# The batching span *processor* is BatchSpanProcessor;
# opentelemetry.sdk.trace.export has no BatchSpanExporter.
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# NOTE: the Jaeger Thrift exporter is deprecated upstream (Jaeger accepts OTLP
# natively); prefer an OTLP exporter for new deployments.
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing: buffer finished spans in-process and send them in batches
# to the Jaeger agent.
provider = TracerProvider()
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("data-pipeline")
class PipelineTracer:
    """Wrap pipeline stage functions in OpenTelemetry spans.

    Each decorated call runs inside a span named ``"<pipeline>.<stage>"``
    with ``pipeline.name`` and ``stage.name`` attributes. The span status is
    set to OK on return. When the call raises, the span is marked ERROR, the
    exception is recorded once, and the exception is re-raised.
    """

    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.tracer = trace.get_tracer(pipeline_name)

    def trace_stage(self, stage_name: str):
        """Return a decorator that traces one stage of this pipeline."""
        from functools import wraps

        def decorator(func):
            @wraps(func)  # keep the stage function's __name__/__doc__
            def wrapper(*args, **kwargs):
                # Status and exception recording are handled explicitly
                # below. Disabling the context manager's automatic handling
                # stops each exception from being recorded twice.
                with self.tracer.start_as_current_span(
                    f"{self.pipeline_name}.{stage_name}",
                    attributes={
                        "pipeline.name": self.pipeline_name,
                        "stage.name": stage_name,
                    },
                    record_exception=False,
                    set_status_on_exception=False,
                ) as span:
                    try:
                        result = func(*args, **kwargs)
                    except Exception as e:
                        span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                        span.record_exception(e)
                        raise
                    span.set_status(trace.Status(trace.StatusCode.OK))
                    return result
            return wrapper
        return decorator
# Usage: each decorated stage runs inside its own "daily_etl.<stage>" span.
tracer = PipelineTracer("daily_etl")


@tracer.trace_stage("extract")
def extract_data():
    """Extraction logic goes here."""


@tracer.trace_stage("transform")
def transform_data():
    """Transformation logic goes here."""
5. Incident Response for Data Issues
Automated Triage
from enum import Enum
from dataclasses import dataclass
class Severity(Enum):
    """Incident severity levels, from most to least urgent."""

    P1 = "critical"  # revenue impact or a broken dashboard
    P2 = "high"      # data delayed; partial impact
    P3 = "medium"    # quality problem without immediate impact
    P4 = "low"       # cosmetic; the data itself is unaffected
@dataclass
class DataIncident:
    """Record describing a single triaged data incident."""

    incident_id: str        # unique identifier for the incident
    severity: Severity      # triaged urgency level
    description: str        # short human-readable summary
    affected_tables: list   # names of the tables involved
    root_cause: str         # diagnosis, or a placeholder while investigating
    mitigation: str         # remediation taken, or a placeholder
    owner: str              # team or person responsible
class IncidentTriager:
    """Map alert types to severities and build DataIncident records."""

    SEVERITY_RULES = {
        "PIPELINE_FAILURE": Severity.P1,
        "SCHEMA_CHANGE": Severity.P2,
        "VOLUME_DROP": Severity.P2,
        "FRESHNESS_BREACH": Severity.P2,
        "NULL_SPIKE": Severity.P3,
        "DUPLICATE_ROWS": Severity.P3,
    }
    # Incidents affecting more downstream consumers than this escalate to P1.
    ESCALATION_DOWNSTREAM_THRESHOLD = 10

    def triage(self, alert_type: str, context: dict) -> DataIncident:
        """Create a DataIncident for an alert.

        Args:
            alert_type: Alert key looked up in SEVERITY_RULES; unknown types
                default to P4.
            context: Optional keys read here: ``table``, ``downstream_count``
                and ``owner`` (defaults to ``"data-platform"``).

        Returns:
            A new DataIncident. Its root cause and mitigation are placeholders.
        """
        import time
        import uuid

        severity = self.SEVERITY_RULES.get(alert_type, Severity.P4)

        # Escalate if many downstream consumers are affected (None counts as 0).
        if (context.get("downstream_count") or 0) > self.ESCALATION_DOWNSTREAM_THRESHOLD:
            severity = Severity.P1

        table = context.get("table")
        return DataIncident(
            # A random suffix keeps IDs unique when several alerts are
            # triaged within the same second.
            incident_id=f"INC-{int(time.time())}-{uuid.uuid4().hex[:8]}",
            severity=severity,
            description=f"{alert_type} on {table or 'unknown'}",
            # Leave the list empty rather than recording a blank table name.
            affected_tables=[table] if table else [],
            root_cause="Under investigation",
            mitigation="Pending",
            owner=context.get('owner', 'data-platform'),
        )
6. Observability Stack Architecture
ℹ️
Best Practice: Start with freshness and volume monitoring — they catch most data issues (a figure of roughly 80% is often cited). Add quality checks and lineage tracking incrementally.
Follow-Up Questions
- How would you implement data observability for real-time streaming pipelines?
- Design an incident response runbook for data quality issues.
- How do you correlate metrics across pipeline stages?
- How would you reduce alert fatigue in a data platform?
- Design a data health scorecard for executive reporting.