Monitoring and Alerting in Apache Airflow

Monitoring and Alerting

Formal Definitions

Metrics Collection

Metrics Collection is the process of gathering quantitative data about system behavior. In Airflow, metrics include task counts, execution times, queue depths, and resource utilization. The metric function is $m: S \rightarrow \mathbb{R}$, mapping a system state in $S$ to a numeric value.
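
To make $m: S \rightarrow \mathbb{R}$ concrete, the sketch below models one state snapshot as plain Python data and defines a few metric functions over it. `Snapshot`, the metric names, and `collect()` are hypothetical illustrations, not Airflow APIs; in a real deployment the state would come from the metadata database, as in the custom metrics code later in this lesson.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Snapshot:
    """Hypothetical system state s: task instance states plus pool slot usage."""
    task_states: List[str] = field(default_factory=list)
    pool_slots: Dict[str, int] = field(default_factory=dict)     # pool -> total slots
    pool_occupied: Dict[str, int] = field(default_factory=dict)  # pool -> occupied slots


# Each metric is a function m: S -> R
MetricFn = Callable[[Snapshot], float]


def queue_depth(s: Snapshot) -> float:
    return float(Counter(s.task_states)["queued"])


def running_tasks(s: Snapshot) -> float:
    return float(Counter(s.task_states)["running"])


def max_pool_utilization(s: Snapshot) -> float:
    # Ignore pools with no (or unlimited, negative) slots
    ratios = [s.pool_occupied.get(name, 0) / slots
              for name, slots in s.pool_slots.items() if slots > 0]
    return max(ratios, default=0.0)


METRICS: Dict[str, MetricFn] = {
    "executor.queue_depth": queue_depth,
    "tasks.running": running_tasks,
    "pool.max_utilization": max_pool_utilization,
}


def collect(s: Snapshot) -> Dict[str, float]:
    """Apply every metric function to one snapshot."""
    return {name: fn(s) for name, fn in METRICS.items()}


snapshot = Snapshot(
    task_states=["queued", "queued", "running", "success"],
    pool_slots={"default_pool": 128, "etl": 4},
    pool_occupied={"default_pool": 32, "etl": 3},
)
print(collect(snapshot))
```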

Alert Rule

An Alert Rule defines a condition on a metric that triggers a notification when it is met, for example when a threshold is exceeded. Formally, $R = (M, C, T, A)$, where $M$ is the monitored metric, $C$ is the condition, $T$ is the threshold, and $A$ is the action (notification).
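
The tuple $R = (M, C, T, A)$ maps naturally onto a small data structure. The sketch below is a hypothetical illustration (the `AlertRule` class and its fields are not part of Airflow or Prometheus): the condition $C$ is a comparison operator, and the action $A$ is any callable that receives a notification message. The thresholds are the ones recommended later in this lesson.

```python
import operator
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class AlertRule:
    """R = (M, C, T, A): metric name, condition, threshold, action."""
    metric: str                                # M
    condition: Callable[[float, float], bool]  # C, e.g. operator.gt
    threshold: float                           # T
    action: Callable[[str], None]              # A, e.g. send a notification

    def evaluate(self, metrics: Dict[str, float]) -> bool:
        """Run the action if the metric meets the condition; return whether it fired."""
        value = metrics.get(self.metric)
        if value is None or not self.condition(value, self.threshold):
            return False
        self.action(f"{self.metric}={value} crossed threshold {self.threshold}")
        return True


sent: List[str] = []  # stands in for a notification channel
rules = [
    AlertRule("scheduler_lag_seconds", operator.gt, 300, sent.append),
    AlertRule("task_failure_rate", operator.gt, 0.05, sent.append),
    AlertRule("executor.queue_depth", operator.gt, 100, sent.append),
]

current = {"scheduler_lag_seconds": 42.0, "task_failure_rate": 0.08, "executor.queue_depth": 12.0}
fired = [rule.metric for rule in rules if rule.evaluate(current)]
print(fired)
print(sent)
```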

SLA Monitoring

SLA Monitoring tracks whether tasks and DAGs complete within defined time limits. The SLA violation condition is $\text{violation} \iff T_{\text{actual}} > T_{\text{SLA}}$, where $T_{\text{actual}}$ is the actual completion time and $T_{\text{SLA}}$ is the target.
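
Applied to a single run, the condition is one comparison. The sketch below is a minimal illustration, assuming $T_{\text{actual}}$ is measured from the run's start time (the reference point is an assumption, not Airflow's SLA definition); a run that is still in progress past the target is treated as already violating it.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def sla_violated(started: datetime, sla: timedelta,
                 finished: Optional[datetime] = None,
                 now: Optional[datetime] = None) -> bool:
    """Return True iff T_actual > T_SLA.

    T_actual is measured from `started` (an assumed reference point). For a run
    that has not finished, the time elapsed so far is used, so an overdue run
    is flagged before it completes.
    """
    if finished is not None:
        end = finished
    elif now is not None:
        end = now
    else:
        end = datetime.now(timezone.utc)
    return (end - started) > sla


start = datetime(2024, 1, 1, 2, 0, tzinfo=timezone.utc)
sla = timedelta(hours=1)
print(sla_violated(start, sla, finished=start + timedelta(minutes=45)))  # False
print(sla_violated(start, sla, finished=start + timedelta(minutes=75)))  # True
print(sla_violated(start, sla, now=start + timedelta(minutes=90)))       # True: still running, already late
```

Finishing exactly at the target is not a violation, because the condition uses a strict inequality.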

Detailed Explanation

Prometheus Configuration

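# airflow.cfg
[metrics]
# Emit metrics over StatsD (UDP). Airflow has no built-in Prometheus
# endpoint: run prometheus/statsd_exporter, which listens for StatsD on
# port 9125 and serves Prometheus metrics on :9102/metrics by default.
statsd_on = True
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow

# Optional: only send metrics whose names (without the prefix) start with
# one of these comma-separated prefixes. This is a prefix match, not a glob.
# Renamed to metrics_allow_list in Airflow 2.6+.
statsd_allow_list = scheduler,executor,dagrun,dag_runs,ti,task,pool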
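
With StatsD enabled, each metric leaves Airflow as a small plain-text UDP datagram in the StatsD line format `<name>:<value>|<type>` (`c` for counters, `g` for gauges, `ms` for timers). The sketch below builds such datagrams by hand and approximates how statsd_exporter names an unmapped metric in Prometheus (characters other than letters, digits and underscores become underscores). It illustrates the protocol only and is not Airflow's own client; the host and port mirror the configuration above.

```python
import re
import socket


def statsd_line(name: str, value: float, metric_type: str, prefix: str = "airflow") -> str:
    """Format one StatsD datagram: <prefix>.<name>:<value>|<type>."""
    if metric_type not in {"c", "g", "ms"}:
        raise ValueError(f"unsupported StatsD type: {metric_type}")
    return f"{prefix}.{name}:{value}|{metric_type}"


def prometheus_name(statsd_name: str) -> str:
    """Approximate statsd_exporter's default naming for an unmapped metric."""
    escaped = re.sub(r"[^a-zA-Z0-9_]", "_", statsd_name)
    # Prometheus metric names may not start with a digit
    return "_" + escaped if escaped[:1].isdigit() else escaped


def send(line: str, host: str = "localhost", port: int = 9125) -> None:
    """Fire-and-forget UDP send, as StatsD clients do (no reply is expected)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("ascii"), (host, port))


line = statsd_line("executor.queue_depth", 12, "g")
print(line)                                 # airflow.executor.queue_depth:12|g
print(prometheus_name(line.split(":")[0]))  # airflow_executor_queue_depth
```

Because UDP is connectionless, a missing exporter never blocks Airflow; the trade-off is that metrics sent while the exporter is down are silently lost.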

Custom Metrics Implementation

# plugins/custom_metrics.py
from datetime import timedelta

from sqlalchemy import func

from airflow.models import DagRun, Pool, TaskInstance
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.session import create_session


class AirflowMetrics:
    """Custom Airflow metrics for monitoring."""

    @staticmethod
    def emit_scheduler_metrics():
        """Emit scheduler performance metrics."""
        with create_session() as session:
            # Approximate scheduler lag: time since the most recent scheduling
            # decision on any DAG run (it also grows when no runs are active)
            last_decision = session.query(
                func.max(DagRun.last_scheduling_decision)
            ).scalar()

            if last_decision:
                # Airflow stores timezone-aware UTC timestamps, so compare with
                # timezone.utcnow() rather than a naive datetime.now()
                lag = (timezone.utcnow() - last_decision).total_seconds()
                Stats.gauge('scheduler_lag_seconds', lag)

            # Active DAG runs
            active_runs = session.query(DagRun).filter(
                DagRun.state == 'running'
            ).count()
            Stats.gauge('dag_runs_active', active_runs)

            # Task counts by state (success/failed are all-time totals)
            for state in ['queued', 'running', 'success', 'failed']:
                count = session.query(TaskInstance).filter(
                    TaskInstance.state == state
                ).count()
                Stats.gauge(f'task_instances_{state}', count)

    @staticmethod
    def emit_task_metrics(dag_id, task_id, state, duration):
        """Emit task-level metrics."""
        if duration is not None:
            # StatsD timers are in milliseconds: a timedelta is converted
            # correctly, whereas a bare float of seconds would be read as ms
            Stats.timing(f'task.duration.{dag_id}.{task_id}', timedelta(seconds=duration))
        Stats.incr(f'task.count.{dag_id}.{task_id}.{state}')

    @staticmethod
    def emit_queue_metrics():
        """Emit queue depth metrics."""
        with create_session() as session:
            # Executor queue depth
            queued_tasks = session.query(TaskInstance).filter(
                TaskInstance.state == 'queued'
            ).count()
            Stats.gauge('executor.queue_depth', queued_tasks)

            # Pool usage; occupied_slots() is a method that takes a session
            for pool in session.query(Pool).all():
                occupied = pool.occupied_slots(session=session)
                utilization = occupied / pool.slots if pool.slots > 0 else 0
                Stats.gauge(f'pool.{pool.pool}.utilization', utilization)


# Attach the callbacks to tasks, e.g. through a DAG's default_args:
# default_args={'on_success_callback': task_success_callback,
#               'on_failure_callback': task_failure_callback}
def task_success_callback(context):
    """Emit metrics on task success."""
    ti = context['task_instance']

    AirflowMetrics.emit_task_metrics(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        state='success',
        duration=ti.duration,
    )


def task_failure_callback(context):
    """Emit metrics on task failure."""
    ti = context['task_instance']

    AirflowMetrics.emit_task_metrics(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        state='failed',
        duration=ti.duration,  # may be None; the timer is then skipped
    )
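
Because the callbacks only read `context['task_instance']`, their behavior can be checked without a running Airflow. The sketch below is a test-style illustration: `RecordingStats` is a hypothetical stand-in for `airflow.stats.Stats`, `SimpleNamespace` replaces a real TaskInstance, and `emit_task_metrics` is a local copy of the emit logic above with the stats client passed in and the timer skipped when no duration is available.

```python
from datetime import timedelta
from types import SimpleNamespace
from typing import Callable, List, Optional, Tuple


class RecordingStats:
    """Hypothetical stand-in for airflow.stats.Stats that records calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, object]] = []

    def timing(self, stat: str, dt: timedelta) -> None:
        self.calls.append(("timing", stat, dt))

    def incr(self, stat: str) -> None:
        self.calls.append(("incr", stat, 1))


def emit_task_metrics(stats, dag_id: str, task_id: str, state: str,
                      duration: Optional[float]) -> None:
    """Local copy of the emit logic, with the stats client passed in."""
    if duration is not None:
        stats.timing(f"task.duration.{dag_id}.{task_id}", timedelta(seconds=duration))
    stats.incr(f"task.count.{dag_id}.{task_id}.{state}")


def make_callback(stats, state: str) -> Callable[[dict], None]:
    """Build an on_success/on_failure-style callback bound to a stats client."""
    def callback(context: dict) -> None:
        ti = context["task_instance"]
        emit_task_metrics(stats, ti.dag_id, ti.task_id, state, ti.duration)
    return callback


stats = RecordingStats()
on_success = make_callback(stats, "success")
on_failure = make_callback(stats, "failed")

on_success({"task_instance": SimpleNamespace(dag_id="etl", task_id="load", duration=12.5)})
on_failure({"task_instance": SimpleNamespace(dag_id="etl", task_id="load", duration=None)})

for call in stats.calls:
    print(call)
```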

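Grafana Dashboard Configuration

The PromQL below assumes Airflow's StatsD metrics reach Prometheus through statsd_exporter. With its default naming (dots become underscores), the custom gauges above appear as airflow_executor_queue_depth and airflow_scheduler_lag_seconds. The names airflow_task_success_total, airflow_task_failure_total and airflow_task_duration_seconds_bucket are not Airflow defaults: they assume statsd_exporter mapping rules that rename Airflow's built-in ti_successes and ti_failures counters and export task durations as a histogram.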

{
  "dashboard": {
    "title": "Airflow Overview",
    "panels": [
      {
        "title": "Task Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(airflow_task_success_total[5m])) / (sum(rate(airflow_task_success_total[5m])) + sum(rate(airflow_task_failure_total[5m])))",
            "legendFormat": "Success Rate"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percentunit",
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"value": null, "color": "red"},
                {"value": 0.95, "color": "yellow"},
                {"value": 0.99, "color": "green"}
              ]
            }
          }
        }
      },
      {
        "title": "Task Duration",
        "type": "timeseries",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le) (rate(airflow_task_duration_seconds_bucket[5m])))",
            "legendFormat": "P95 Duration"
          }
        ]
      },
      {
        "title": "Queue Depth",
        "type": "timeseries",
        "targets": [
          {
            "expr": "airflow_executor_queue_depth",
            "legendFormat": "Queued Tasks"
          }
        ]
      },
      {
        "title": "Scheduler Lag",
        "type": "stat",
        "targets": [
          {
            "expr": "airflow_scheduler_lag_seconds",
            "legendFormat": "Lag (seconds)"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "s",
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"value": null, "color": "green"},
                {"value": 60, "color": "yellow"},
                {"value": 300, "color": "red"}
              ]
            }
          }
        }
      }
    ]
  }
}
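
Two details of such a dashboard are easy to get wrong. The success rate is a ratio of counter rates, so it is undefined when no task finished in the window, and Grafana applies each threshold step from its value upward, so steps must be listed in ascending order after a base color. The sketch below reproduces both calculations in plain Python, using the bands from the KPI table in this lesson (critical below 95%, warning from 95%, target above 99%) and the scheduler-lag bands (60 s and 300 s); the helper names and counter values are illustrative.

```python
from typing import List, Optional, Tuple

# Grafana-style threshold steps as (lower bound, color), in ascending order;
# the first entry (bound None) is the base color for everything below.
Steps = List[Tuple[Optional[float], str]]

SUCCESS_RATE_STEPS: Steps = [(None, "red"), (0.95, "yellow"), (0.99, "green")]
SCHEDULER_LAG_STEPS: Steps = [(None, "green"), (60, "yellow"), (300, "red")]


def success_rate(successes: float, failures: float) -> Optional[float]:
    """successes / (successes + failures) for one window; None if nothing finished."""
    finished = successes + failures
    return successes / finished if finished > 0 else None


def step_color(value: float, steps: Steps) -> str:
    """Color of the highest step whose lower bound is <= value."""
    color = steps[0][1]
    for bound, step in steps[1:]:
        if bound is not None and value >= bound:
            color = step
    return color


# Hypothetical counter increases over one 5-minute window
rate = success_rate(successes=970, failures=30)
print(rate, step_color(rate, SUCCESS_RATE_STEPS))                                 # 0.97 yellow
print(step_color(45, SCHEDULER_LAG_STEPS), step_color(300, SCHEDULER_LAG_STEPS))  # green red
```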

Alert Rate Calculation

$$R_{\text{alert}} = \frac{N_{\text{alerts}}}{T_{\text{period}}}$$

Here,

  • $R_{\text{alert}}$ = alert rate (alerts per unit time)
  • $N_{\text{alerts}}$ = number of alerts in the period
  • $T_{\text{period}}$ = duration of the monitoring period
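
As a worked example, 18 alerts in a 7-day period give $R_{\text{alert}} = 18 / 7 \approx 2.57$ alerts per day. The sketch below computes the rate from alert timestamps over a trailing window; the timestamps are made-up sample data, and counting alerts in the half-open window (end - period, end] is an assumption.

```python
from datetime import datetime, timedelta
from typing import List


def alert_rate(alert_times: List[datetime], end: datetime, period: timedelta,
               unit: timedelta = timedelta(days=1)) -> float:
    """R_alert = N_alerts / T_period, counting alerts in the window (end - period, end]."""
    start = end - period
    n_alerts = sum(1 for t in alert_times if start < t <= end)
    t_period = period / unit  # timedelta / timedelta -> float, e.g. 7.0 days
    return n_alerts / t_period


end = datetime(2024, 3, 8)
# Hypothetical sample: 18 alerts spread over the last week, plus one older alert.
alerts = [end - timedelta(hours=h) for h in range(0, 162, 9)]
alerts.append(end - timedelta(days=10))

print(round(alert_rate(alerts, end, timedelta(days=7)), 2))                           # 2.57 per day
print(round(alert_rate(alerts, end, timedelta(days=7), unit=timedelta(hours=1)), 3))  # 0.107 per hour
```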

Mean Time to Resolution

$$MTTR = \frac{\sum_{i=1}^{N} (T_{\text{resolve},i} - T_{\text{alert},i})}{N}$$

Here,

  • $MTTR$ = mean time to resolution
  • $T_{\text{resolve},i}$ = resolution time of incident $i$
  • $T_{\text{alert},i}$ = alert time of incident $i$
  • $N$ = number of resolved incidents
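
As a worked example, three incidents resolved 10, 25 and 40 minutes after their alerts give $MTTR = (10 + 25 + 40) / 3 = 25$ minutes. The sketch below computes MTTR from (alert time, resolution time) pairs; open incidents are skipped because $N$ counts only resolved ones. The data layout and sample incidents are illustrative.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Incident = Tuple[datetime, Optional[datetime]]  # (T_alert, T_resolve, or None if still open)


def mttr(incidents: List[Incident]) -> Optional[timedelta]:
    """Mean of (T_resolve,i - T_alert,i) over resolved incidents; None if none resolved."""
    durations = [resolved - alerted for alerted, resolved in incidents if resolved is not None]
    if not durations:
        return None
    return sum(durations, timedelta()) / len(durations)


t0 = datetime(2024, 3, 1, 9, 0)
incidents = [
    (t0, t0 + timedelta(minutes=10)),
    (t0 + timedelta(hours=2), t0 + timedelta(hours=2, minutes=25)),
    (t0 + timedelta(hours=5), t0 + timedelta(hours=5, minutes=40)),
    (t0 + timedelta(hours=8), None),  # still open, excluded from N
]
print(mttr(incidents))  # 0:25:00
```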

Use StatsD to emit Airflow metrics and Prometheus (through a StatsD bridge such as statsd_exporter) to collect and store them. Grafana provides visualization dashboards, and Alertmanager handles alert routing, grouping, and deduplication.
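
Alertmanager's deduplication keeps one notification per distinct alert, identified by its label set, and re-sends it only after a repeat interval. The sketch below illustrates that idea in a few lines; it is a simplified model, not Alertmanager's actual algorithm (which also groups alerts and batches them), and the `Deduplicator` class and four-hour interval are illustrative choices.

```python
from datetime import datetime, timedelta
from typing import Dict, FrozenSet, Tuple

LabelSet = FrozenSet[Tuple[str, str]]


class Deduplicator:
    """Suppress repeat notifications for the same label set within repeat_interval."""

    def __init__(self, repeat_interval: timedelta = timedelta(hours=4)) -> None:
        self.repeat_interval = repeat_interval
        self.last_sent: Dict[LabelSet, datetime] = {}

    def should_notify(self, labels: Dict[str, str], now: datetime) -> bool:
        key: LabelSet = frozenset(labels.items())  # label set identifies the alert
        last = self.last_sent.get(key)
        if last is not None and now - last < self.repeat_interval:
            return False  # duplicate within the window
        self.last_sent[key] = now
        return True


dedup = Deduplicator()
t = datetime(2024, 3, 1, 12, 0)
lag_alert = {"alertname": "AirflowSchedulerLagHigh", "severity": "warning"}
print(dedup.should_notify(lag_alert, t))                         # True: first occurrence
print(dedup.should_notify(lag_alert, t + timedelta(minutes=5)))  # False: duplicate
print(dedup.should_notify(lag_alert, t + timedelta(hours=4)))    # True: repeat interval elapsed
```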

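Set up alerts for scheduler lag > 5 minutes, task failure rate > 5%, and queue depth > 100. Each points to a different problem: a stalled or overloaded scheduler, a rising share of failing tasks, or too little executor capacity to drain the queue.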

Key Concepts Table

Metric Category | Examples | Collection Method | Alert Threshold
Scheduler | Lag, parse time | StatsD/Prometheus | > 5 min lag
Tasks | Success rate, duration | Callbacks | < 95% success
Queue | Depth, wait time | Database queries | > 100 queued
Resources | CPU, memory, disk | System metrics | > 85% utilization
Database | Query time, connections | SQLAlchemy | > 100 ms query
SLA | Miss rate | SLA callbacks | Any SLA miss

Code Examples

Alert Rules Configuration

# prometheus/alert_rules.yml
groups:
  - name: airflow_alerts
    rules:
      - alert: AirflowSchedulerLagHigh
        expr: airflow_scheduler_lag_seconds > 300
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Airflow scheduler lag is high"
          description: "Scheduler lag is {{ $value }} seconds"
      
      - alert: AirflowTaskFailureRateHigh
        expr: |
          sum(rate(airflow_task_failure_total[5m]))
            / (sum(rate(airflow_task_success_total[5m])) + sum(rate(airflow_task_failure_total[5m])))
            > 0.05
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "High task failure rate"
          description: "Task failure rate is {{ $value | humanizePercentage }}"
      
      - alert: AirflowQueueDepthHigh
        expr: airflow_executor_queue_depth > 100
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "High queue depth"
          description: "{{ $value }} tasks queued"
      
      - alert: AirflowDagRunStale
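        # Assumes a custom gauge holding the Unix timestamp of the latest
        # scheduling decision; none of the code in this lesson emits it.
        expr: time() - airflow_dag_run_last_scheduling_decision > 3600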
        for: 30m
        labels:
          severity: critical
        annotations:
          summary: "Stale DAG run detected"
          description: "DAG run has not been scheduled for {{ $value }} seconds"
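
The `for:` clause keeps a brief spike from paging anyone: when the expression first returns a result the alert becomes pending, and it fires only once the condition has held at every evaluation for the whole `for` duration; a single evaluation where it is false resets it. The sketch below simulates that state machine for the AirflowSchedulerLagHigh rule (threshold 300 s, `for: 5m`) with one evaluation per minute. It is a simplified model that ignores other Prometheus details, and the lag samples are invented.

```python
from typing import List, Optional


def alert_states(values: List[float], threshold: float, for_seconds: int,
                 eval_interval: int = 60) -> List[str]:
    """Return 'inactive' / 'pending' / 'firing' for each evaluation."""
    states: List[str] = []
    active_since: Optional[int] = None
    for i, value in enumerate(values):
        t = i * eval_interval
        if value > threshold:
            if active_since is None:
                active_since = t  # condition just became true
            held = t - active_since
            states.append("firing" if held >= for_seconds else "pending")
        else:
            active_since = None  # any false evaluation resets the alert
            states.append("inactive")
    return states


# Scheduler lag sampled once per minute: a short spike, then a sustained problem.
lag = [50, 400, 80, 350, 360, 380, 400, 420, 450, 90]
print(alert_states(lag, threshold=300, for_seconds=300))
```

The one-minute spike at the second sample never leaves the pending state; only the sustained run starting at the fourth sample fires, five minutes after it began.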

Slack Alerting Integration

# alerting/slack_alert.py
from datetime import datetime, timezone

import requests


class SlackAlerter:
    """Send alerts to a Slack incoming webhook."""

    def __init__(self, webhook_url, channel='#airflow-alerts', timeout=10):
        self.webhook_url = webhook_url
        self.channel = channel
        self.timeout = timeout  # seconds; never block a callback indefinitely

    def send_alert(self, title, message, severity='warning', details=None):
        """Send an alert to Slack; return True if Slack accepted it."""
        color_map = {
            'info': '#36a64f',
            'warning': '#ff9900',
            'critical': '#ff0000',
        }

        # channel, username and icon_emoji are honored by legacy webhooks only;
        # webhooks created by Slack apps post to the channel chosen at install time.
        payload = {
            'channel': self.channel,
            'username': 'Airflow Monitor',
            'icon_emoji': ':airflow:',
            'attachments': [{
                'color': color_map.get(severity, '#999'),
                'title': title,
                'text': message,
                'fields': [
                    {'title': 'Severity', 'value': severity, 'short': True},
                    {'title': 'Time', 'value': datetime.now(timezone.utc).isoformat(), 'short': True},
                ],
                'footer': 'Airflow Monitoring',
            }],
        }

        if details:
            payload['attachments'][0]['fields'].append({
                'title': 'Details',
                'value': details,
                'short': False,
            })

        try:
            # json= serializes the payload and sets the Content-Type header
            response = requests.post(self.webhook_url, json=payload, timeout=self.timeout)
        except requests.RequestException:
            # A notification failure should not raise inside an Airflow callback
            return False

        return response.status_code == 200


# Usage
alerter = SlackAlerter(webhook_url='https://hooks.slack.com/services/xxx')


def task_failure_alert(context):
    """Send alert on task failure; attach with on_failure_callback=task_failure_alert."""
    ti = context['task_instance']

    alerter.send_alert(
        title=f"Task Failed: {ti.task_id}",
        message=f"DAG: {ti.dag_id}\nTask: {ti.task_id}\nTry: {ti.try_number}",
        severity='critical',
        details=f"Exception: {context.get('exception', 'Unknown')}",
    )

Monitoring Dashboard Script

# monitoring/dashboard.py
from datetime import timedelta

from sqlalchemy import case, func

from airflow.models import DagModel, TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session


def get_dashboard_data():
    """Get data for operational dashboard."""
    now = timezone.utcnow()  # Airflow timestamps are timezone-aware UTC
    last_24h = now - timedelta(hours=24)
    last_7d = now - timedelta(days=7)

    with create_session() as session:
        # Overall metrics
        total_dags = session.query(DagModel).filter(DagModel.is_active.is_(True)).count()

        # Task metrics (last 24h). Filter on start_date, a real TaskInstance
        # column; execution_date moved to DagRun in Airflow 2.2.
        task_stats = dict(session.query(
            TaskInstance.state,
            func.count(TaskInstance.task_id),
        ).filter(
            TaskInstance.start_date >= last_24h
        ).group_by(TaskInstance.state).all())

        # Success rate
        total_tasks = sum(task_stats.values())
        successful_tasks = task_stats.get('success', 0)
        success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0

        # Average duration (None if no task succeeded in the window)
        avg_duration = session.query(
            func.avg(TaskInstance.duration)
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= last_24h,
        ).scalar()

        # Slowest DAGs (last 7 days)
        slow_dags = session.query(
            TaskInstance.dag_id,
            func.avg(TaskInstance.duration).label('avg_duration'),
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= last_7d,
        ).group_by(TaskInstance.dag_id).order_by(
            func.avg(TaskInstance.duration).desc()
        ).limit(5).all()

        # Error rate by DAG. SUM(CASE ...) is portable; COUNT(...) FILTER (WHERE ...)
        # is not supported by MySQL.
        error_rates = session.query(
            TaskInstance.dag_id,
            func.count(TaskInstance.task_id).label('total'),
            func.sum(case((TaskInstance.state == 'failed', 1), else_=0)).label('failed'),
        ).filter(
            TaskInstance.start_date >= last_24h
        ).group_by(TaskInstance.dag_id).all()

    return {
        'total_dags': total_dags,
        'task_stats': task_stats,
        'success_rate': success_rate,
        'avg_duration': avg_duration,
        'slowest_dags': [{'dag_id': d[0], 'avg_duration': d[1]} for d in slow_dags],
        'error_rates': [
            {'dag_id': e[0], 'error_rate': e[2] / e[1] if e[1] > 0 else 0}
            for e in error_rates
        ],
    }


if __name__ == "__main__":
    data = get_dashboard_data()
    print(f"Total DAGs: {data['total_dags']}")
    print(f"Success Rate: {data['success_rate']:.2%}")
    if data['avg_duration'] is not None:
        print(f"Avg Duration: {data['avg_duration']:.2f}s")
    else:
        print("Avg Duration: n/a")

Performance Metrics

Key Performance Indicators

KPI | Target | Warning | Critical
Task Success Rate | > 99% | 95-99% | < 95%
Scheduler Lag | < 60 s | 60-300 s | > 300 s
Avg Task Duration | < 5 min | 5-15 min | > 15 min
Queue Depth | < 50 | 50-100 | > 100
MTTR | < 15 min | 15-30 min | > 30 min
SLA Miss Rate | 0% | < 1% | > 1%
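
The KPI table can be encoded directly so that a script or report can label each measured value. The sketch below is a hypothetical helper (names and data layout are illustrative): each KPI stores its target and critical boundaries plus whether higher values are better, and the boundary values themselves count as "warning", matching ranges such as 95-99% and 60-300 s in the table. The SLA row is left out because its bands do not fit this two-boundary shape.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Kpi:
    target_bound: float    # better than this -> "target"
    critical_bound: float  # worse than this -> "critical"
    higher_is_better: bool


# Boundaries from the KPI table; values on a boundary count as "warning".
KPIS: Dict[str, Kpi] = {
    "task_success_rate_pct": Kpi(99, 95, higher_is_better=True),
    "scheduler_lag_s": Kpi(60, 300, higher_is_better=False),
    "avg_task_duration_min": Kpi(5, 15, higher_is_better=False),
    "queue_depth": Kpi(50, 100, higher_is_better=False),
    "mttr_min": Kpi(15, 30, higher_is_better=False),
}


def classify(name: str, value: float) -> str:
    kpi = KPIS[name]
    if kpi.higher_is_better:
        if value > kpi.target_bound:
            return "target"
        return "warning" if value >= kpi.critical_bound else "critical"
    if value < kpi.target_bound:
        return "target"
    return "warning" if value <= kpi.critical_bound else "critical"


measured = {"task_success_rate_pct": 97.2, "scheduler_lag_s": 45, "queue_depth": 130, "mttr_min": 25}
for name, value in measured.items():
    print(f"{name}: {value} -> {classify(name, value)}")
```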

Alert Distribution

Severity | Response Time | Escalation | Auto-resolve
Critical | 5 min | Immediate | No
Warning | 30 min | 1 hour | Possible
Info | Next business day | None | Yes
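
The distribution table can also drive a simple routing policy. The sketch below is illustrative only: it encodes the response times and escalation delays from the table, reading "Immediate" as a zero delay and "1 hour" as escalating an unacknowledged warning after one hour, and it approximates "next business day" as 09:00 on the next weekday. Those readings, and the `Policy` layout, are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional


@dataclass(frozen=True)
class Policy:
    response: Optional[timedelta]        # None = next business day
    escalate_after: Optional[timedelta]  # None = never escalate


# Values from the alert distribution table; "Immediate" is read as a zero delay.
POLICIES: Dict[str, Policy] = {
    "critical": Policy(response=timedelta(minutes=5), escalate_after=timedelta(0)),
    "warning": Policy(response=timedelta(minutes=30), escalate_after=timedelta(hours=1)),
    "info": Policy(response=None, escalate_after=None),
}


def next_business_day_9am(t: datetime) -> datetime:
    """Assumption: 'next business day' means 09:00 on the next Monday-Friday date."""
    day = t + timedelta(days=1)
    while day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        day += timedelta(days=1)
    return day.replace(hour=9, minute=0, second=0, microsecond=0)


def response_deadline(severity: str, alerted_at: datetime) -> datetime:
    policy = POLICIES[severity]
    if policy.response is None:
        return next_business_day_9am(alerted_at)
    return alerted_at + policy.response


def should_escalate(severity: str, alerted_at: datetime, now: datetime,
                    acknowledged: bool) -> bool:
    """Escalate an unacknowledged alert once its escalation delay has passed."""
    policy = POLICIES[severity]
    if acknowledged or policy.escalate_after is None:
        return False
    return now - alerted_at >= policy.escalate_after


friday_evening = datetime(2024, 3, 1, 17, 30)  # 1 March 2024 was a Friday
print(response_deadline("critical", friday_evening))  # 2024-03-01 17:35:00
print(response_deadline("info", friday_evening))      # 2024-03-04 09:00:00
```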

Key Takeaways:

  • Use StatsD/Prometheus for metrics collection; Grafana for visualization
  • Monitor scheduler lag, task success rates, queue depth, and resource utilization
  • Set up alerts for critical thresholds: scheduler lag > 5min, failure rate > 5%
  • Implement custom metrics for business-specific KPIs
  • Use Alertmanager for alert routing and deduplication
  • Track MTTR and SLA miss rates for operational excellence
