

Airflow Monitoring & Observability

Metrics, SLAs, and Alerting


Interview Question

Interview Context

Company: Uber / Google
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes

Question: "How do you monitor Airflow in production? Explain metrics collection, SLA management, and alerting strategies. What tools would you use and why?"


Detailed Theory

Monitoring Fundamentals

# monitoring_fundamentals.py
"""
Airflow Monitoring Components:

1. Metrics:
   - Task metrics (success, failure, duration)
   - DAG metrics (run time, schedule delay)
   - System metrics (CPU, memory, disk)

2. Logging:
   - Task logs
   - Scheduler logs
   - Error logs

3. Alerting:
   - Email alerts
   - Slack/PagerDuty integration
   - Custom webhooks

4. Dashboards:
   - Grafana
   - Airflow UI
   - Custom dashboards

5. SLAs:
   - Task SLAs
   - DAG SLAs
   - Missed SLA alerts
"""

1. Metrics Collection

# metrics_collection.py
"""
Airflow Metrics Collection:

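Airflow emits metrics over StatsD (recent 2.x releases can also export
them through OpenTelemetry). Prometheus does not read them directly; it
scrapes them through a bridge such as statsd_exporter.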
"""

# StatsD Configuration
STATSD_CONFIG = """
[metrics]
# Enable metrics
statsd_on = True
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow

# StatsD datadog (uses the DogStatsD client; tags are comma-separated key:value pairs)
statsd_datadog_enabled = True
statsd_datadog_tags = env:production,team:data-engineering
"""

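# Prometheus Configuration
# Airflow has no built-in Prometheus endpoint and no prometheus_* settings.
# A common route is to send StatsD metrics to prometheus/statsd_exporter,
# which listens for StatsD traffic (port 9125 by default) and serves
# /metrics for Prometheus to scrape (port 9102 by default).
PROMETHEUS_CONFIG = """
[metrics]
# Point StatsD output at the statsd_exporter process
statsd_on = True
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow
"""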

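# Custom metrics in tasks
import time
from datetime import timedelta

from airflow.stats import Stats

def task_with_metrics():
    """Task with custom metrics"""
    start_time = time.monotonic()
    
    # Execute task (execute_task is a placeholder for your own logic)
    result = execute_task()
    
    # Record metrics. Stats.timing expects a timedelta or a number of
    # milliseconds, so wrap the elapsed seconds in a timedelta.
    duration = timedelta(seconds=time.monotonic() - start_time)
    
    Stats.timing('task.duration', duration)
    Stats.incr('task.success')
    Stats.gauge('task.records_processed', result['records'])
    
    return result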

# Metric types
METRIC_TYPES = """
StatsD Metric Types:

1. Counter (Stats.incr):
   - Increment a counter
   - Use for: Success/failure counts

2. Gauge (Stats.gauge):
   - Set a gauge value
   - Use for: Current values (queue depth)

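3. Timing (Stats.timing):
   - Record a duration (a timedelta, or a number in milliseconds)
   - Use for: Duration measurements

4. Timer (Stats.timer):
   - Context manager that times a block and reports it as a timing
   - Use for: Timing a section of code without manual bookkeeping

Note: StatsD also defines a set type for counting unique values, but
Airflow's Stats interface does not expose it.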
"""

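Pro Tip

Prometheus with Grafana dashboards is a common choice for modern monitoring: PromQL offers flexible queries, and the stack integrates well with Kubernetes. Airflow does not serve Prometheus metrics itself, so route its StatsD output through statsd_exporter or use the OpenTelemetry integration.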
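
The metric types listed in this section correspond to a simple text format on the wire. As a rough illustration (not Airflow's own client code), the sketch below renders counters, gauges, and timings in the StatsD line format, `name:value|type`. The `format_statsd` helper and its default `airflow` prefix are assumptions made for this example.

```python
def format_statsd(name: str, value: float, metric_type: str, prefix: str = "airflow") -> str:
    """Render one metric in the plain-text StatsD line format."""
    suffixes = {"counter": "c", "gauge": "g", "timing": "ms"}
    if metric_type not in suffixes:
        raise ValueError(f"unsupported metric type: {metric_type}")
    # Whole numbers are printed without a trailing ".0" to keep lines compact.
    rendered = int(value) if float(value).is_integer() else value
    return f"{prefix}.{name}:{rendered}|{suffixes[metric_type]}"


print(format_statsd("task.success", 1, "counter"))              # airflow.task.success:1|c
print(format_statsd("task.records_processed", 1500, "gauge"))   # airflow.task.records_processed:1500|g
print(format_statsd("task.duration", 2340.5, "timing"))         # airflow.task.duration:2340.5|ms
```

The `ms` suffix is why timings are reported in milliseconds: a value passed in seconds would be read as a thousand times shorter than intended.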

2. SLA Management

# sla_management.py
# Note: task SLAs are an Airflow 2.x feature; they were removed in Airflow 3.
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.models import SlaMiss
from airflow.utils.email import send_email
from airflow.utils.session import provide_session

# SLA miss callback (defined before the DAG that references it)
def sla_miss_callback(
    dag,
    task_list,
    blocking_task_list,
    slas,
    blocking_tis,
):
    """Handle SLA miss"""
    # Send email through Airflow's configured email backend.
    # (An EmailOperator would need a task_id and a real task context here.)
    send_email(
        to=['team@company.com'],
        subject=f'SLA Missed: {dag.dag_id}',
        html_content=f"""
        <h2>SLA Missed</h2>
        <p>DAG: {dag.dag_id}</p>
        <p>Tasks: {task_list}</p>
        <p>Blocking Tasks: {blocking_task_list}</p>
        """,
    )

# DAG with SLA
@dag(
    dag_id='sla_example',
    schedule_interval='@daily',  # on Airflow 2.4+ prefer schedule='@daily'
    start_date=datetime(2024, 1, 1),
    sla_miss_callback=sla_miss_callback,
    default_args={
        'sla': timedelta(hours=2),  # 2 hour SLA for all tasks
    },
)
def sla_dag():
    @task(sla=timedelta(hours=1))  # 1 hour SLA for this task
    def critical_task() -> dict:
        """Critical task with short SLA"""
        return {'status': 'success'}
    
    @task(sla=timedelta(hours=4))  # 4 hour SLA
    def non_critical_task() -> dict:
        """Non-critical task with longer SLA"""
        return {'status': 'success'}
    
    critical_task() >> non_critical_task()

sla_dag()

# Check SLA misses
@provide_session
def check_sla_misses(session=None):
    """Check for SLA misses recorded in the metadata database"""
    sla_misses = session.query(SlaMiss).filter(
        SlaMiss.execution_date >= datetime(2024, 1, 1)
    ).all()
    
    for sla_miss in sla_misses:
        print(f"SLA missed: {sla_miss.dag_id} - {sla_miss.task_id}")
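
A detail that often comes up in interviews: in Airflow 2.x a task's `sla` is measured against the DAG run's schedule, not against the moment the task itself started. The sketch below models that idea in plain Python under a simplifying assumption, namely that the deadline is the end of the run's data interval plus the SLA. The helper names `sla_deadline` and `is_sla_missed` are hypothetical, and the scheduler's real check has more moving parts.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def sla_deadline(interval_end: datetime, sla: timedelta) -> datetime:
    """Deadline is anchored to the schedule, not to when the task started."""
    return interval_end + sla


def is_sla_missed(
    interval_end: datetime,
    sla: timedelta,
    finished_at: Optional[datetime],
    now: datetime,
) -> bool:
    """Simplified check: late if finished after the deadline, or still unfinished past it."""
    deadline = sla_deadline(interval_end, sla)
    if finished_at is not None:
        return finished_at > deadline
    return now > deadline


# Daily run whose data interval ends at midnight UTC, with a 1 hour SLA.
interval_end = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)
sla = timedelta(hours=1)

# The task ran for only 20 minutes, but it started late (00:50) and
# finished at 01:10, which is after the 01:00 deadline.
finished = datetime(2024, 1, 2, 1, 10, tzinfo=timezone.utc)
print(is_sla_missed(interval_end, sla, finished, now=finished))  # True
```

The practical consequence is that a downstream task's SLA has to budget for upstream run time and queueing, not just its own duration.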

3. Alerting Configuration

# alerting_configuration.py
"""
Airflow Alerting:

1. Email Alerts:
   - DAG failure alerts
   - Task failure alerts
   - SLA miss alerts

2. Slack Alerts:
   - Real-time notifications
   - Rich formatting
   - Channel routing

3. PagerDuty:
   - Critical alerts
   - On-call routing
   - Incident management

4. Custom Webhooks:
   - Integration with internal tools
   - Custom formatting
   - Flexible routing
"""

# Slack alerting
def send_slack_alert(
    title: str,
    message: str,
    severity: str = "info",
    channel: str = "#airflow-alerts",
):
    """Send Slack alert"""
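    import requests
    import os
    
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook_url:
        raise RuntimeError('SLACK_WEBHOOK_URL is not set')
    
    colors = {
        "info": "#36a64f",
        "warning": "#ff9900",
        "critical": "#ff0000",
    }
    
    # Note: app-based incoming webhooks post to the channel they were
    # created for and may ignore the "channel" field.
    payload = {
        "channel": channel,
        "attachments": [{
            "color": colors.get(severity, "#000000"),
            "title": title,
            "text": message,
            "footer": "Airflow Alert",
        }]
    }
    
    response = requests.post(webhook_url, json=payload, timeout=10)
    # Raise on HTTP errors so a failed alert does not go unnoticed
    response.raise_for_status()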

# PagerDuty alerting
def send_pagerduty_alert(
    title: str,
    message: str,
    severity: str = "critical",
):
    """Send PagerDuty alert"""
    import requests
    import os
    
    routing_key = os.environ.get('PAGERDUTY_ROUTING_KEY')
    
    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "payload": {
            "summary": f"{title}: {message}",
            "severity": severity,
            "source": "airflow",
        }
    }
    
    response = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload,
        timeout=10,
    )
    # Surface HTTP errors (for example, an invalid routing key)
    response.raise_for_status()

# Callback for task failures
def task_failure_callback(context):
    """Callback for task failures"""
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    exception = context.get('exception')
    
    send_slack_alert(
        title=f"Task Failed: {task_id}",
        message=f"DAG: {dag_id}\nException: {exception}",
        severity="critical",
    )
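
The alerting list above mentions severity levels and routing, but the failure callback always posts to Slack as critical. One way to separate the two decisions is a small routing table, sketched here in plain Python. The table contents, the `route_alert` and `severity_for` helpers, and the "critical only when retries are exhausted" policy are all assumptions for illustration, not Airflow features.

```python
from typing import Dict, List

# Hypothetical routing table: which destinations receive each severity.
SEVERITY_ROUTES: Dict[str, List[str]] = {
    "info": ["slack"],
    "warning": ["slack"],
    "critical": ["slack", "pagerduty"],
}


def route_alert(severity: str) -> List[str]:
    """Return the destinations for a severity level."""
    # Unknown severities fall back to Slack only, so a typo never pages on-call.
    return SEVERITY_ROUTES.get(severity, ["slack"])


def severity_for(try_number: int, max_tries: int) -> str:
    """Treat a failure as critical only once retries are exhausted."""
    return "critical" if try_number >= max_tries else "warning"


print(route_alert(severity_for(try_number=1, max_tries=3)))  # ['slack']
print(route_alert(severity_for(try_number=3, max_tries=3)))  # ['slack', 'pagerduty']
```

A callback could then loop over the returned destinations and call the matching sender, which keeps paging policy in one place instead of scattered across callbacks.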

4. Grafana Dashboards

# grafana_dashboards.py
"""
Grafana Dashboard Configuration:

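Create dashboards for Airflow monitoring. The metric names used below are
illustrative: the names Prometheus actually sees depend on how your
StatsD-to-Prometheus mapping renames Airflow's metrics.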
"""

# Dashboard JSON (simplified)
GRAFANA_DASHBOARD = """
{
  "dashboard": {
    "title": "Airflow Monitoring",
    "panels": [
      {
        "title": "Task Success Rate",
        "type": "stat",
        "targets": [{
          "expr": "sum(rate(airflow_task_succeeded_total[5m])) / (sum(rate(airflow_task_succeeded_total[5m])) + sum(rate(airflow_task_failed_total[5m])))",
          "legendFormat": "success rate"
        }]
      },
      {
        "title": "Task Duration",
        "type": "timeseries",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(airflow_task_duration_seconds_bucket[5m]))",
          "legendFormat": "{{dag_id}} - {{task_id}}"
        }]
      },
      {
        "title": "DAG Run Duration",
        "type": "timeseries",
        "targets": [{
          "expr": "airflow_dag_run_duration_seconds",
          "legendFormat": "{{dag_id}}"
        }]
      }
    ]
  }
}
"""

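# Key metrics to monitor
# The names below are Prometheus-side names and depend on your mapping;
# the StatsD name Airflow emits is shown in parentheses where one exists.
KEY_METRICS = """
1. Task Metrics:
   - airflow_task_succeeded_total (ti_successes)
   - airflow_task_failed_total (ti_failures)
   - airflow_task_duration_seconds (dag.<dag_id>.<task_id>.duration)

2. DAG Metrics:
   - airflow_dag_run_duration_seconds (dagrun.duration.success.<dag_id>)
   - airflow_dag_run_scheduler_delay_seconds (dagrun.schedule_delay.<dag_id>)

3. System Metrics:
   - airflow_scheduler_heartbeat (scheduler_heartbeat)
   - airflow_pool_available_slots (pool.open_slots.<pool_name>)

4. Custom Metrics:
   - airflow_custom_records_processed
   - airflow_custom_api_calls
"""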
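
Airflow's StatsD names embed the DAG or pool name in the metric path (for example `dagrun.duration.success.<dag_id>`), whereas Prometheus queries work best when those parts are labels. A StatsD-to-Prometheus bridge is normally configured with mapping rules for this. The sketch below imitates that translation in plain Python so the idea is concrete. The two rules, the resulting Prometheus names, and the `to_prometheus` helper are assumptions for illustration, not the exporter's actual configuration syntax.

```python
import re
from typing import Dict, Tuple

# Hypothetical mapping rules: (pattern, Prometheus name). Named groups become labels.
MAPPING_RULES = [
    (
        re.compile(r"^airflow\.dagrun\.duration\.(?P<state>success|failed)\.(?P<dag_id>[^.]+)$"),
        "airflow_dagrun_duration",
    ),
    (
        re.compile(r"^airflow\.pool\.open_slots\.(?P<pool>[^.]+)$"),
        "airflow_pool_open_slots",
    ),
]


def to_prometheus(statsd_name: str) -> Tuple[str, Dict[str, str]]:
    """Translate a dotted StatsD name into a Prometheus name plus labels."""
    for pattern, prom_name in MAPPING_RULES:
        match = pattern.match(statsd_name)
        if match:
            return prom_name, match.groupdict()
    # No rule matched: replace dots with underscores and attach no labels.
    return statsd_name.replace(".", "_"), {}


print(to_prometheus("airflow.dagrun.duration.success.daily_sales"))
print(to_prometheus("airflow.scheduler_heartbeat"))
```

With labels in place, a single query can aggregate across all DAGs or filter to one, instead of matching on metric names.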

Real-World Scenarios

Scenario 1: Uber's Monitoring Setup

# uber_monitoring.py
"""
Uber-style monitoring:
- Real-time dashboards
- Automated alerting
- SLA tracking
"""

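from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.stats import Stats

# Reuse the callback defined in sla_management.py above
# (assumes that file is importable from your dags folder)
from sla_management import sla_miss_callback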

@dag(
    dag_id='uber_monitored_pipeline',
    schedule_interval='*/5 * * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        'sla': timedelta(minutes=10),
    },
    sla_miss_callback=sla_miss_callback,
    tags=['uber', 'monitoring', 'production'],
)
def uber_monitored():
    @task
    def process_data() -> dict:
        """Process data with metrics"""
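        import time
        
        start = time.monotonic()
        
        # Process (perform_processing is a placeholder for your own logic)
        result = perform_processing()
        
        # Record metrics; Stats.timing takes a timedelta or milliseconds
        duration = timedelta(seconds=time.monotonic() - start)
        Stats.timing('uber.processing.duration', duration)
        Stats.gauge('uber.processing.records', result['records'])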
        
        if result['records'] > 0:
            Stats.incr('uber.processing.success')
        else:
            Stats.incr('uber.processing.empty')
        
        return result
    
    process_data()

uber_monitored()

Scenario 2: Google's SLA Management

# google_sla.py
"""
Google-style SLA management:
- Multi-level SLAs
- Automated escalation
- Performance tracking
"""

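from datetime import datetime, timedelta

from airflow.decorators import dag, task

# Reuse the callback defined in sla_management.py above
# (assumes that file is importable from your dags folder)
from sla_management import sla_miss_callback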

@dag(
    dag_id='google_sla_pipeline',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        'sla': timedelta(hours=1),
    },
    sla_miss_callback=sla_miss_callback,
    tags=['google', 'sla', 'production'],
)
def google_sla():
    @task(sla=timedelta(minutes=30))
    def critical_processing() -> dict:
        """Critical processing with tight SLA"""
        return {'status': 'success'}
    
    @task(sla=timedelta(hours=2))
    def batch_processing() -> dict:
        """Batch processing with relaxed SLA"""
        return {'status': 'success'}
    
    critical = critical_processing()
    batch = batch_processing()
    
    critical >> batch

google_sla()
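
The docstring above lists automated escalation, which the DAG itself does not show. A minimal way to express it is a table of thresholds: the longer an SLA has been overdue, the more targets are notified. The thresholds, target names, and the `escalation_targets` helper below are hypothetical values chosen for this sketch.

```python
from datetime import timedelta
from typing import List, Tuple

# Hypothetical escalation ladder: (how long overdue, who gets notified).
ESCALATION_LEVELS: List[Tuple[timedelta, str]] = [
    (timedelta(minutes=0), "slack"),
    (timedelta(minutes=30), "team-lead"),
    (timedelta(hours=2), "pagerduty"),
]


def escalation_targets(overdue: timedelta) -> List[str]:
    """Return every target whose threshold has been reached."""
    return [target for threshold, target in ESCALATION_LEVELS if overdue >= threshold]


print(escalation_targets(timedelta(minutes=45)))  # ['slack', 'team-lead']
print(escalation_targets(timedelta(hours=3)))     # ['slack', 'team-lead', 'pagerduty']
```

An SLA miss callback or a periodic check could compute the overdue time and notify each returned target.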

Edge Cases

⚠️Common Pitfalls

  1. Alert Fatigue: Too many alerts can cause teams to ignore them.

  2. Missing Metrics: Not tracking all important metrics.

  3. SLA Definition: SLAs that are too tight or too loose.

  4. Dashboard Overload: Too many dashboards can be overwhelming.

# edge_cases.py
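import time

from airflow.stats import Stats

# Alert fatigue issue
# Throttle state lives at module level so it survives between calls.
# It is still per-process: callbacks that run in different worker
# processes need a shared store (a database or cache) instead.
_last_alert_time = {}
THROTTLE_SECONDS = 300  # 5 minutes

def should_alert(alert_key: str) -> bool:
    """Return True at most once per THROTTLE_SECONDS for each alert key"""
    now = time.time()
    last_time = _last_alert_time.get(alert_key, 0)
    
    if now - last_time > THROTTLE_SECONDS:
        _last_alert_time[alert_key] = now
        return True
    return False

def alert_with_throttling():
    """Alert with throttling to prevent fatigue"""
    # send_alert is a placeholder for send_slack_alert or a similar sender
    if should_alert('task_failure'):
        send_alert('Task failed')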

# Missing metrics issue
def comprehensive_metrics(succeeded, task_duration, run_duration,
                          schedule_delay, records, api_calls):
    """Track all important metrics (durations are timedeltas)"""
    # Task metrics: count exactly one outcome per task run
    Stats.incr('task.success' if succeeded else 'task.failure')
    Stats.timing('task.duration', task_duration)
    
    # DAG metrics
    Stats.timing('dag.run_duration', run_duration)
    Stats.gauge('dag.schedule_delay', schedule_delay)
    
    # Custom metrics
    Stats.gauge('pipeline.records_processed', records)
    Stats.gauge('pipeline.api_calls', api_calls)
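
Throttling logic is easy to get subtly wrong, so it helps to make it testable. The sketch below keeps the throttle state in an object and accepts the clock as a parameter, which lets a test advance time without sleeping. `AlertThrottler` and its parameters are hypothetical names for this example, and the state is in-memory only, so it carries the same single-process limitation as any local dictionary.

```python
import time
from typing import Callable, Dict


class AlertThrottler:
    """Allow at most one alert per key within a throttle window."""

    def __init__(self, throttle_seconds: float = 300,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.throttle_seconds = throttle_seconds
        self._clock = clock
        self._last_sent: Dict[str, float] = {}

    def should_alert(self, key: str) -> bool:
        now = self._clock()
        last = self._last_sent.get(key)
        # First alert for this key, or the window has fully elapsed.
        if last is None or now - last >= self.throttle_seconds:
            self._last_sent[key] = now
            return True
        return False


# Drive the throttler with a fake clock to show the behavior deterministically.
fake_now = [0.0]
throttler = AlertThrottler(throttle_seconds=300, clock=lambda: fake_now[0])

print(throttler.should_alert("etl.load"))   # True: first alert for this key
fake_now[0] = 100.0
print(throttler.should_alert("etl.load"))   # False: still inside the window
print(throttler.should_alert("etl.clean"))  # True: a different key
fake_now[0] = 300.0
print(throttler.should_alert("etl.load"))   # True: window has elapsed
```

Keying on something like the DAG and task name keeps one noisy task from suppressing alerts for unrelated ones.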



Best Practices

# best_practices.py
"""
Monitoring Best Practices:

1. Metrics Collection:
   - Track task and DAG metrics
   - Record custom business metrics
   - Monitor system resources

2. Alerting:
   - Implement alert throttling
   - Use appropriate severity levels
   - Route alerts to right teams

3. SLA Management:
   - Define realistic SLAs
   - Monitor SLA compliance
   - Escalate missed SLAs

4. Dashboards:
   - Create role-specific dashboards
   - Keep dashboards focused
   - Regular dashboard review

5. Continuous Improvement:
   - Regular monitoring review
   - Update metrics based on needs
   - Optimize alert thresholds
"""

Uber Interview Tip

Uber emphasizes real-time monitoring and alerting. When discussing monitoring, cover comprehensive metrics, alert throttling, and SLA management, and mention how dashboards are used for operational visibility.
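
"Define realistic SLAs" is easier to defend in an interview with a concrete method. One simple approach, sketched below, is to derive the SLA from historical completion times: take a high percentile and add headroom. The `suggest_sla` helper, the choice of the 95th percentile, and the 1.5x headroom factor are assumptions for illustration. The input should be measured from the run's scheduled time to task completion, since that is what an Airflow 2.x SLA is compared against.

```python
import statistics
from datetime import timedelta
from typing import List


def suggest_sla(completion_seconds: List[float], headroom: float = 1.5) -> timedelta:
    """Suggest an SLA as the 95th percentile of past completion times, plus headroom."""
    if len(completion_seconds) < 2:
        raise ValueError("need at least two historical samples")
    # quantiles(n=20) returns 19 cut points; the last one is the 95th percentile.
    p95 = statistics.quantiles(completion_seconds, n=20)[-1]
    return timedelta(seconds=p95 * headroom)


# Seconds from scheduled time to completion for recent runs (made-up sample data).
history = [1800, 1750, 1900, 2100, 1820, 1780, 2400, 1850, 1790, 1810]
print(suggest_sla(history))
```

Recomputing the suggestion periodically is one way to act on the "optimize alert thresholds" practice rather than leaving SLAs at their initial guess.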


Summary

Monitoring is critical for production Airflow. Key takeaways:

  1. Metrics for visibility
  2. Alerting for proactive response
  3. SLAs for reliability
  4. Dashboards for operational insight
  5. Continuous improvement for optimization

For Uber and Google interviews, focus on:

  • Comprehensive metrics collection
  • Alert throttling and management
  • SLA definition and tracking
  • Dashboard design
  • Operational excellence

This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.
