Interview Question
Interview Context
Company: Uber / Google
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "How do you monitor Airflow in production? Explain metrics collection, SLA management, and alerting strategies. What tools would you use and why?"
Detailed Theory
Monitoring Fundamentals
# monitoring_fundamentals.py
"""
Airflow Monitoring Components:
1. Metrics:
- Task metrics (success, failure, duration)
- DAG metrics (run time, schedule delay)
- System metrics (CPU, memory, disk)
2. Logging:
- Task logs
- Scheduler logs
- Error logs
3. Alerting:
- Email alerts
- Slack/PagerDuty integration
- Custom webhooks
4. Dashboards:
- Grafana
- Airflow UI
- Custom dashboards
5. SLAs:
- Task SLAs
- DAG-wide SLA defaults (via default_args)
- Missed SLA alerts (sla_miss_callback)
"""
1. Metrics Collection
# metrics_collection.py
"""
Airflow Metrics Collection:
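Airflow emits metrics over StatsD (and, from Airflow 2.7, OpenTelemetry).
Prometheus is usually fed by a statsd_exporter that receives Airflow's StatsD
traffic and exposes it in Prometheus format for scraping.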
"""
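# StatsD configuration (airflow.cfg)
STATSD_CONFIG = """
[metrics]
# Enable StatsD metrics
statsd_on = True
# 9125 is statsd_exporter's default StatsD listening port;
# a plain StatsD or DogStatsD agent usually listens on 8125
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow
# Alternative: send to a Datadog agent via DogStatsD (needs the datadog package).
# Tags are a comma-separated list of key:value pairs, not a JSON object.
# statsd_datadog_enabled = True
# statsd_datadog_tags = env:production,team:data-engineering
"""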
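# Prometheus configuration: Airflow core has no built-in Prometheus endpoint,
# so Prometheus scrapes statsd_exporter, which serves the translated metrics
# on port 9102 by default
PROMETHEUS_CONFIG = """
# prometheus.yml
scrape_configs:
  - job_name: airflow
    static_configs:
      - targets: ['localhost:9102']
"""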
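# Custom metrics in tasks
import time
from datetime import timedelta

from airflow.stats import Stats


def execute_task() -> dict:
    """Hypothetical placeholder for the task's real work."""
    return {'records': 0}


def task_with_metrics():
    """Task with custom metrics"""
    start_time = time.monotonic()
    # Execute task
    result = execute_task()
    # Record metrics. StatsD timers are in milliseconds, so pass a timedelta
    # rather than raw seconds.
    duration = timedelta(seconds=time.monotonic() - start_time)
    Stats.timing('task.duration', duration)
    Stats.incr('task.success')
    Stats.gauge('task.records_processed', result['records'])
    return result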
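# Metric types
METRIC_TYPES = """
StatsD Metric Types (as exposed by airflow.stats.Stats):
1. Counter (Stats.incr / Stats.decr):
- Increment or decrement a counter
- Use for: Success/failure counts
2. Gauge (Stats.gauge):
- Set a gauge value
- Use for: Current values (queue depth)
3. Timing (Stats.timing):
- Record a duration (milliseconds or a timedelta)
- Use for: Duration measurements
4. Timer (Stats.timer):
- Context manager that times a block and records it as a timing
- Use for: Timing a code section without manual bookkeeping
(StatsD also defines a Set type for unique counts, but Stats does not expose it.)
"""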
Pro Tip
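Prometheus plus Grafana is a common choice for modern monitoring. Because Airflow speaks StatsD, Prometheus is usually fed through statsd_exporter (the official Airflow Helm chart can deploy one). PromQL then makes rates, ratios, and percentiles easy to query, and the stack integrates well with Kubernetes.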
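statsd_exporter turns Airflow's dotted names into Prometheus metrics with labels by applying glob-style mapping rules. The sketch below imitates that translation in plain Python to show the idea. The rule list and metric names are assumptions, not the exporter's own code or an official mapping file. Without a matching rule, this sketch falls back to replacing dots with underscores, which is roughly how unmapped names end up in Prometheus.

```python
import re
from typing import Dict, List, Tuple

# Hypothetical mapping rules in the spirit of statsd_exporter's glob matching:
# each '*' captures one dot-separated segment, referenced as $1, $2, ... in labels.
MAPPINGS: List[Tuple[str, str, Dict[str, str]]] = [
    ("airflow.dag.*.*.duration", "airflow_task_duration",
     {"dag_id": "$1", "task_id": "$2"}),
    ("airflow.dagrun.duration.success.*", "airflow_dagrun_duration_success",
     {"dag_id": "$1"}),
]


def glob_to_regex(pattern: str):
    """Escape the literal parts and let each '*' match one dot-free segment."""
    literal_parts = [re.escape(part) for part in pattern.split("*")]
    return re.compile("^" + "([^.]+)".join(literal_parts) + "$")


def map_metric(statsd_name: str) -> Tuple[str, Dict[str, str]]:
    """Return (prometheus_name, labels) for a dotted StatsD metric name."""
    for pattern, name, label_templates in MAPPINGS:
        match = glob_to_regex(pattern).match(statsd_name)
        if match:
            # '$1' -> first captured segment, '$2' -> second, ...
            labels = {key: match.group(int(template.lstrip("$")))
                      for key, template in label_templates.items()}
            return name, labels
    # No rule matched: fall back to replacing dots with underscores
    return statsd_name.replace(".", "_"), {}


print(map_metric("airflow.dag.etl.load.duration"))
# ('airflow_task_duration', {'dag_id': 'etl', 'task_id': 'load'})
print(map_metric("airflow.ti_successes"))
# ('airflow_ti_successes', {})
```

Mapping per-DAG names into labels matters for dashboards. Without it, each DAG and task gets its own metric name, and PromQL cannot group or filter by dag_id.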
2. SLA Management
# sla_management.py
# Airflow 2.x API: task-level SLAs and sla_miss_callback were removed in Airflow 3.0.
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.models import SlaMiss
from airflow.utils.email import send_email
from airflow.utils.session import provide_session


# SLA miss callback (must be defined before the DAG that references it)
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Handle SLA miss: email the team"""
    send_email(
        to=['team@company.com'],
        subject=f'SLA Missed: {dag.dag_id}',
        html_content=f"""
        <h2>SLA Missed</h2>
        <p>DAG: {dag.dag_id}</p>
        <p>Tasks: {task_list}</p>
        <p>Blocking Tasks: {blocking_task_list}</p>
        """,
    )


# DAG with SLA
@dag(
    dag_id='sla_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,  # avoid backfilling (and SLA-checking) every day since start_date
    sla_miss_callback=sla_miss_callback,
    default_args={
        'sla': timedelta(hours=2),  # 2 hour SLA for every task by default
    },
)
def sla_dag():
    @task(sla=timedelta(hours=1))  # 1 hour SLA for this task
    def critical_task() -> dict:
        """Critical task with short SLA"""
        return {'status': 'success'}

    @task(sla=timedelta(hours=4))  # 4 hour SLA overrides the default
    def non_critical_task() -> dict:
        """Non-critical task with longer SLA"""
        return {'status': 'success'}

    critical_task() >> non_critical_task()


sla_dag()


# Check SLA misses recorded in the metadata database
@provide_session
def check_sla_misses(session=None):
    """Check for SLA misses"""
    sla_misses = session.query(SlaMiss).filter(
        SlaMiss.execution_date >= datetime(2024, 1, 1)
    ).all()
    for sla_miss in sla_misses:
        print(f"SLA missed: {sla_miss.dag_id} - {sla_miss.task_id}")
3. Alerting Configuration
# alerting_configuration.py
"""
Airflow Alerting:
1. Email Alerts:
- DAG failure alerts
- Task failure alerts
- SLA miss alerts
2. Slack Alerts:
- Real-time notifications
- Rich formatting
- Channel routing
3. PagerDuty:
- Critical alerts
- On-call routing
- Incident management
4. Custom Webhooks:
- Integration with internal tools
- Custom formatting
- Flexible routing
"""
# Slack alerting
import os

import requests


def send_slack_alert(
    title: str,
    message: str,
    severity: str = "info",
    channel: str = "#airflow-alerts",
):
    """Send Slack alert via an incoming webhook"""
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook_url:
        raise RuntimeError("SLACK_WEBHOOK_URL is not set")
    colors = {
        "info": "#36a64f",
        "warning": "#ff9900",
        "critical": "#ff0000",
    }
    payload = {
        # App-based incoming webhooks post to the channel chosen when the
        # webhook was created and ignore this override
        "channel": channel,
        "attachments": [{
            "color": colors.get(severity, "#000000"),
            "title": title,
            "text": message,
            "footer": "Airflow Alert",
        }]
    }
    response = requests.post(webhook_url, json=payload, timeout=10)
    response.raise_for_status()


# PagerDuty alerting (Events API v2)
def send_pagerduty_alert(
    title: str,
    message: str,
    severity: str = "critical",  # one of: critical, error, warning, info
):
    """Send PagerDuty alert"""
    routing_key = os.environ.get('PAGERDUTY_ROUTING_KEY')
    if not routing_key:
        raise RuntimeError("PAGERDUTY_ROUTING_KEY is not set")
    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "payload": {
            "summary": f"{title}: {message}",
            "severity": severity,
            "source": "airflow",
        }
    }
    response = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload,
        timeout=10,
    )
    response.raise_for_status()


# Callback for task failures; attach with on_failure_callback=task_failure_callback
# on a task, or via default_args to cover every task in a DAG
def task_failure_callback(context):
    """Callback for task failures"""
    ti = context['task_instance']
    dag_id = context['dag'].dag_id
    exception = context.get('exception')
    send_slack_alert(
        title=f"Task Failed: {ti.task_id}",
        message=f"DAG: {dag_id}\nException: {exception}\nLog: {ti.log_url}",
        severity="critical",
    )
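The alerting list above mentions severity levels and channel routing, but the callback always posts to one place. A small routing table keyed by severity and team is one common way to decide the destination. The sketch below assumes a hypothetical team-to-channel map and policy. It returns destinations instead of sending anything, so a callback could then call send_slack_alert or send_pagerduty_alert based on the result.

```python
from typing import Dict, List

# Hypothetical routing policy: which destinations receive each severity.
SEVERITY_ROUTES: Dict[str, List[str]] = {
    "info": ["slack"],
    "warning": ["slack"],
    "critical": ["slack", "pagerduty"],
}

# Hypothetical team -> Slack channel map; unknown teams use the default.
TEAM_CHANNELS: Dict[str, str] = {
    "data-engineering": "#de-alerts",
    "analytics": "#analytics-alerts",
}
DEFAULT_CHANNEL = "#airflow-alerts"


def route_alert(severity: str, team: str) -> List[str]:
    """Return destinations such as ['slack:#de-alerts', 'pagerduty']."""
    destinations = []
    # Unknown severities are treated as 'warning' rather than silently dropped
    for target in SEVERITY_ROUTES.get(severity, SEVERITY_ROUTES["warning"]):
        if target == "slack":
            destinations.append(f"slack:{TEAM_CHANNELS.get(team, DEFAULT_CHANNEL)}")
        else:
            destinations.append(target)
    return destinations


print(route_alert("critical", "data-engineering"))  # ['slack:#de-alerts', 'pagerduty']
print(route_alert("info", "unknown-team"))          # ['slack:#airflow-alerts']
```

Keeping the policy in data rather than in if-statements scattered across callbacks makes it easy to review and change. A DAG's owning team could, for example, come from its tags.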
4. Grafana Dashboards
# grafana_dashboards.py
"""
Grafana Dashboard Configuration:
Create dashboards for Airflow monitoring.
"""
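# Dashboard JSON (simplified). Metric and label names are assumptions: they
# depend on the statsd_exporter mapping that turns Airflow's StatsD names
# (ti_successes, dag.<dag_id>.<task_id>.duration, ...) into Prometheus series.
GRAFANA_DASHBOARD = """
{
  "dashboard": {
    "title": "Airflow Monitoring",
    "panels": [
      {
        "title": "Task Success Rate",
        "type": "stat",
        "targets": [{
          "expr": "sum(rate(airflow_ti_successes[5m])) / (sum(rate(airflow_ti_successes[5m])) + sum(rate(airflow_ti_failures[5m])))"
        }]
      },
      {
        "title": "Task Duration (p95)",
        "type": "graph",
        "targets": [{
          "expr": "histogram_quantile(0.95, sum by (le, dag_id, task_id) (rate(airflow_task_duration_bucket[5m])))",
          "legendFormat": "{{dag_id}} - {{task_id}}"
        }]
      },
      {
        "title": "DAG Run Duration (average, successful runs)",
        "type": "graph",
        "targets": [{
          "expr": "rate(airflow_dagrun_duration_success_sum[1h]) / rate(airflow_dagrun_duration_success_count[1h])",
          "legendFormat": "{{dag_id}}"
        }]
      }
    ]
  }
}
"""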
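# Key metrics to monitor (Airflow's StatsD names, shown without the prefix;
# the Prometheus names depend on the statsd_exporter mapping)
KEY_METRICS = """
1. Task Metrics:
- ti_successes, ti_failures (counters)
- dag.<dag_id>.<task_id>.duration (timer)
2. DAG Metrics:
- dagrun.duration.success.<dag_id>, dagrun.duration.failed.<dag_id> (timers)
- dagrun.schedule_delay.<dag_id> (timer)
3. System Metrics:
- scheduler_heartbeat (counter)
- pool.open_slots.<pool_name> (gauge)
- executor.queued_tasks, executor.running_tasks (gauges)
4. Custom Metrics (your own Stats calls, e.g. Stats.gauge('custom.records_processed', n)):
- custom.records_processed
- custom.api_calls
"""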
Real-World Scenarios
Scenario 1: Uber-Style Monitoring Setup
# uber_monitoring.py
"""
Uber-style monitoring:
- Real-time dashboards
- Automated alerting
- SLA tracking
"""
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.stats import Stats

# Hypothetical shared module holding the sla_miss_callback from sla_management.py
from alerting_callbacks import sla_miss_callback


def perform_processing() -> dict:
    """Hypothetical placeholder for the real processing step."""
    return {'records': 0}


@dag(
    dag_id='uber_monitored_pipeline',
    schedule='*/5 * * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        'sla': timedelta(minutes=10),
    },
    sla_miss_callback=sla_miss_callback,
    tags=['uber', 'monitoring', 'production'],
)
def uber_monitored():
    @task
    def process_data() -> dict:
        """Process data with metrics"""
        import time
        start = time.monotonic()
        # Process
        result = perform_processing()
        # Record metrics (a timedelta avoids seconds-vs-milliseconds confusion)
        Stats.timing('uber.processing.duration', timedelta(seconds=time.monotonic() - start))
        Stats.gauge('uber.processing.records', result['records'])
        if result['records'] > 0:
            Stats.incr('uber.processing.success')
        else:
            Stats.incr('uber.processing.empty')
        return result

    process_data()


uber_monitored()
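For a pipeline scheduled every five minutes, a useful complement to per-task SLAs is a data-freshness alert. It pages when the last successful run is several intervals old. This check still works if the scheduler itself stops, because Airflow's SLA checks are run by the scheduler's own DAG processing. In the sketch below, the thresholds and function name are assumptions. The last_success timestamp would come from wherever you record run completion, which is also an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def freshness_status(last_success: Optional[datetime], now: datetime,
                     interval: timedelta = timedelta(minutes=5),
                     warn_after: int = 2, page_after: int = 4) -> str:
    """Classify freshness by how many schedule intervals have elapsed.

    The 2 and 4 interval thresholds are illustrative assumptions.
    """
    if last_success is None:
        return "critical"
    missed = (now - last_success) / interval  # timedelta / timedelta -> float
    if missed >= page_after:
        return "critical"
    if missed >= warn_after:
        return "warning"
    return "ok"


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(freshness_status(now - timedelta(minutes=7), now))   # ok
print(freshness_status(now - timedelta(minutes=12), now))  # warning
print(freshness_status(now - timedelta(minutes=25), now))  # critical
```

Expressing thresholds in intervals rather than minutes lets the same check be reused across DAGs with different schedules.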
Scenario 2: Google-Style SLA Management
# google_sla.py
"""
Google-style SLA management:
- Multi-level SLAs
- Automated escalation
- Performance tracking
"""
from datetime import datetime, timedelta

from airflow.decorators import dag, task

# Hypothetical shared module holding the sla_miss_callback from sla_management.py
from alerting_callbacks import sla_miss_callback


@dag(
    dag_id='google_sla_pipeline',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        'sla': timedelta(hours=1),
    },
    sla_miss_callback=sla_miss_callback,
    tags=['google', 'sla', 'production'],
)
def google_sla():
    @task(sla=timedelta(minutes=30))
    def critical_processing() -> dict:
        """Critical processing with tight SLA"""
        return {'status': 'success'}

    @task(sla=timedelta(hours=2))
    def batch_processing() -> dict:
        """Batch processing with relaxed SLA"""
        return {'status': 'success'}

    critical = critical_processing()
    batch = batch_processing()
    critical >> batch


google_sla()
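The docstring above promises automated escalation, which the DAG itself does not implement. One way to add it is for the SLA-miss callback to choose an escalation level based on which tasks missed and how often. The sketch below is plain Python so it can be unit-tested outside Airflow. The critical-task set, the repeat threshold, and the level names are assumptions.

```python
from typing import Dict, Iterable, Set

# Hypothetical policy: misses on these tasks page on-call immediately.
CRITICAL_TASKS: Set[str] = {"critical_processing"}
# Hypothetical threshold: any task that keeps missing escalates to paging.
REPEAT_THRESHOLD = 3


def escalation_level(missed_task_ids: Iterable[str],
                     miss_history: Dict[str, int]) -> str:
    """Return 'page', 'notify' or 'none', updating per-task miss counts.

    miss_history is mutated in place; in production it would have to live in
    shared storage (for example a database table), which is assumed here.
    """
    level = "none"
    for task_id in missed_task_ids:
        miss_history[task_id] = miss_history.get(task_id, 0) + 1
        if task_id in CRITICAL_TASKS or miss_history[task_id] >= REPEAT_THRESHOLD:
            level = "page"
        elif level == "none":
            level = "notify"
    return level


history: Dict[str, int] = {}
print(escalation_level(["batch_processing"], history))     # notify
print(escalation_level(["critical_processing"], history))  # page
print(escalation_level(["batch_processing"], history))     # notify (2nd miss)
print(escalation_level(["batch_processing"], history))     # page (3rd miss)
```

Inside a real sla_miss_callback, the task IDs could come from the task_id of each entry in slas. The returned level would then decide between send_slack_alert and send_pagerduty_alert.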
Edge Cases
Common Pitfalls
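- Alert Fatigue: Too many alerts can cause teams to ignore them.
- Missing Metrics: Gaps in coverage (for example, tracking failures but not durations, queue depth, or record counts) hide slow degradation.
- SLA Definition: SLAs that are too tight fire constantly and get ignored; SLAs that are too loose never fire.
- Dashboard Overload: Too many dashboards can be overwhelming.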
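One way to avoid SLAs that are too tight or too loose is to derive them from observed completion times instead of guessing. Take a high percentile of recent completion times, measured from run start, and add headroom. The sketch below uses the nearest-rank percentile. The 95th percentile, 20% headroom, and sample data are illustrative assumptions, not Airflow defaults.

```python
import math
from datetime import timedelta
from typing import List


def nearest_rank_percentile(values: List[float], pct: float) -> float:
    """Smallest observation with at least pct% of the data at or below it."""
    if not values:
        raise ValueError("need at least one observation")
    ordered = sorted(values)
    # Integer-friendly arithmetic keeps e.g. 95 * 20 / 100 exactly 19
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[max(rank, 1) - 1]


def suggest_sla(completion_minutes: List[float], pct: float = 95,
                headroom: float = 0.2) -> timedelta:
    """Suggest an SLA: a high percentile of past completion times plus headroom."""
    p = nearest_rank_percentile(completion_minutes, pct)
    return timedelta(minutes=p * (1 + headroom))


# Hypothetical minutes from DAG-run start to task completion, last 20 runs
completion_history = [31, 29, 35, 33, 30, 41, 32, 34, 36, 30,
                      28, 33, 38, 31, 29, 45, 34, 32, 37, 30]
print(nearest_rank_percentile(completion_history, 95))  # 41
print(suggest_sla(completion_history))
```

Recomputing the suggestion periodically, and reviewing it before changing the DAG, keeps SLAs aligned with how the pipeline actually behaves as data volumes grow.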
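# edge_cases.py
import time
from datetime import timedelta

from airflow.stats import Stats

# Alert fatigue issue: throttle repeated alerts for the same key.
# State must outlive a single call, so it lives at module level. Each Airflow
# task or callback typically runs in its own process, so in production this
# state belongs in shared storage (e.g. a database or Redis).
_last_alert_time: dict = {}
THROTTLE_SECONDS = 300  # 5 minutes


def should_alert(alert_key: str) -> bool:
    """Return True at most once per THROTTLE_SECONDS for a given key."""
    now = time.monotonic()
    last_time = _last_alert_time.get(alert_key)
    if last_time is None or now - last_time > THROTTLE_SECONDS:
        _last_alert_time[alert_key] = now
        return True
    return False


def alert_with_throttling(send_alert, message: str, alert_key: str = 'task_failure'):
    """Call send_alert (any callable taking the message text) unless throttled."""
    if should_alert(alert_key):
        send_alert(message)


# Missing metrics issue: record outcome, timing and business metrics.
# Airflow already emits DAG-run metrics (dagrun.duration.*, dagrun.schedule_delay.*),
# so they are not duplicated here.
def comprehensive_metrics(succeeded: bool, duration: timedelta,
                          records_processed: int, api_calls: int) -> None:
    """Record metrics for one task run"""
    # Task metrics: count only the outcome that actually happened
    Stats.incr('task.success' if succeeded else 'task.failure')
    Stats.timing('task.duration', duration)
    # Custom business metrics
    Stats.gauge('pipeline.records_processed', records_processed)
    Stats.incr('pipeline.api_calls', api_calls)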
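Throttling drops repeats. A complementary fatigue control is batching: alerts that arrive within a window are grouped into one digest message per alert key, so responders see how many occurred. The sketch below is a hypothetical in-memory batcher. It uses an injectable clock so it can be tested, and the five-minute default window is an assumption. Like any in-process state, it would need shared storage when callbacks run in separate worker processes.

```python
import time
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List


class AlertDigest:
    """Collect alerts and release one summary per key once its window closes."""

    def __init__(self, window_seconds: float = 300,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window_seconds = window_seconds
        self.clock = clock
        self._pending: DefaultDict[str, List[str]] = defaultdict(list)
        self._window_start: Dict[str, float] = {}

    def add(self, key: str, message: str) -> None:
        # The first alert for a key opens its window
        if key not in self._window_start:
            self._window_start[key] = self.clock()
        self._pending[key].append(message)

    def flush_due(self) -> List[str]:
        """Return digest texts for keys whose window has elapsed, and reset them."""
        now = self.clock()
        digests = []
        for key in list(self._window_start):
            if now - self._window_start[key] >= self.window_seconds:
                messages = self._pending.pop(key)
                del self._window_start[key]
                digests.append(f"{key}: {len(messages)} alert(s); first: {messages[0]}")
        return digests


fake_now = [0.0]
digest = AlertDigest(window_seconds=300, clock=lambda: fake_now[0])
digest.add("etl.load failed", "timeout at 00:01")
digest.add("etl.load failed", "timeout at 00:02")
print(digest.flush_due())  # [] (window still open)
fake_now[0] = 301.0
print(digest.flush_due())  # ['etl.load failed: 2 alert(s); first: timeout at 00:01']
```

flush_due would be called periodically, for example from a small scheduled job, and each digest sent through the normal alert channel.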
Best Practices
# best_practices.py
"""
Monitoring Best Practices:
1. Metrics Collection:
- Track task and DAG metrics
- Record custom business metrics
- Monitor system resources
2. Alerting:
- Implement alert throttling
- Use appropriate severity levels
- Route alerts to right teams
3. SLA Management:
- Define realistic SLAs
- Monitor SLA compliance
- Escalate missed SLAs
4. Dashboards:
- Create role-specific dashboards
- Keep dashboards focused
- Regular dashboard review
5. Continuous Improvement:
- Regular monitoring review
- Update metrics based on needs
- Optimize alert thresholds
"""
Uber Interview Tip
At Uber, they emphasize real-time monitoring and alerting. When discussing monitoring, highlight the importance of comprehensive metrics, alert throttling, and SLA management. Also mention how they use dashboards for operational visibility.
Summary
Monitoring is critical for production Airflow. Key takeaways:
- Metrics for visibility
- Alerting for proactive response
- SLAs for reliability
- Dashboards for operational insight
- Continuous improvement for optimization
For Uber and Google interviews, focus on:
- Comprehensive metrics collection
- Alert throttling and management
- SLA definition and tracking
- Dashboard design
- Operational excellence
This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.