Monitoring and Alerting
Architecture Diagram
Formal Definitions
Definition: Metrics Collection
Metrics Collection is the process of gathering quantitative data about system behavior. In Airflow, metrics include task counts, execution times, queue depths, and resource utilization. The metric function is $m: S \to \mathbb{R}$, mapping a system state $s \in S$ to a numeric value.
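Definition: Alert Rule
An Alert Rule defines a condition on a metric that triggers a notification when the condition is met. Formally, an alert rule is a tuple $A = (m, c, \theta, a)$, where $m$ is the monitored metric, $c$ is the condition, $\theta$ is the threshold, and $a$ is the action (notification).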
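The four parts of an alert rule named in the definition (metric, condition, threshold, action) can be pictured as a small data structure. This is a minimal sketch: the `AlertRule` class, its field names, and the sample metric values are illustrative assumptions, not something Airflow or Prometheus provides.

```python
import operator
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class AlertRule:
    """Hypothetical container for the four parts of an alert rule."""
    metric: str                                # name of the monitored metric
    condition: Callable[[float, float], bool]  # comparison, e.g. operator.gt
    threshold: float                           # value the metric is compared against
    action: Callable[[str], None]              # notification hook

    def evaluate(self, metrics: Dict[str, float]) -> bool:
        """Run the action and return True when the condition holds."""
        value = metrics.get(self.metric)
        if value is None:
            return False  # metric not reported, nothing to compare
        if self.condition(value, self.threshold):
            self.action(f"{self.metric}={value} violates threshold {self.threshold}")
            return True
        return False


# Collect notifications in a list instead of sending them anywhere
sent: List[str] = []
rule = AlertRule("scheduler_lag_seconds", operator.gt, 300.0, sent.append)
fired = rule.evaluate({"scheduler_lag_seconds": 420.0})
quiet = rule.evaluate({"scheduler_lag_seconds": 12.0})
```

Keeping the action as a plain callable means the same rule can notify Slack, email, or a test list without changing the evaluation logic.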
Definition: SLA Monitoring
SLA Monitoring tracks whether tasks and DAGs complete within defined time limits. The SLA violation condition is $t_{\text{actual}} > t_{\text{target}}$, where $t_{\text{actual}}$ is the actual completion time and $t_{\text{target}}$ is the target.
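The SLA violation condition compares an actual completion time against a target. The sketch below reads "completion time" as elapsed time from start to finish, which is an assumption; the function names and sample runs are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Tuple


def sla_violated(started: datetime, finished: datetime, target: timedelta) -> bool:
    """Return True when the elapsed time exceeds the target."""
    return finished - started > target


def sla_miss_rate(runs: Iterable[Tuple[datetime, datetime]], target: timedelta) -> float:
    """Fraction of (started, finished) runs that violated the target."""
    runs = list(runs)
    if not runs:
        return 0.0
    misses = sum(sla_violated(start, end, target) for start, end in runs)
    return misses / len(runs)


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
sample_runs = [
    (t0, t0 + timedelta(minutes=20)),  # within a 30-minute target
    (t0, t0 + timedelta(minutes=45)),  # misses the target
]
miss_rate = sla_miss_rate(sample_runs, timedelta(minutes=30))
```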
Detailed Explanation
Prometheus Configuration
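# airflow.cfg
[metrics]
# Emit StatsD metrics; Prometheus scrapes them through a StatsD exporter
statsd_on = True
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow
# Prometheus exporter
# Airflow core has no prometheus_enabled or prometheus_port option. Run a
# StatsD-to-Prometheus bridge such as statsd_exporter (it listens for StatsD
# traffic on port 9125 by default) and point Prometheus at the exporter.
# Custom metrics
# Entries are matched as metric-name prefixes, not glob patterns. Newer
# Airflow releases name this option metrics_allow_list.
statsd_allow_list = scheduler,executor,dagrun,ti,dag_runs,task,pool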
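With `statsd_on` enabled, metrics leave Airflow as StatsD datagrams addressed to `statsd_host:statsd_port`. The sketch below shows the plain-text StatsD line format for gauges, counters, and timers. The helpers `format_statsd_line` and `send_statsd` are hypothetical and are not Airflow's internal implementation.

```python
import socket


def format_statsd_line(prefix: str, name: str, value: float, kind: str) -> str:
    """Build one StatsD datagram: '<prefix>.<name>:<value>|<type>'."""
    types = {"gauge": "g", "counter": "c", "timer": "ms"}
    return f"{prefix}.{name}:{value}|{types[kind]}"


def send_statsd(line: str, host: str = "localhost", port: int = 9125) -> None:
    """Send one datagram over UDP; delivery is fire-and-forget."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("ascii"), (host, port))


gauge_line = format_statsd_line("airflow", "executor.queue_depth", 7, "gauge")
counter_line = format_statsd_line("airflow", "ti.failures", 1, "counter")
```

Calling `send_statsd(gauge_line)` would deliver the gauge to whatever listens on the configured port, for example a StatsD exporter.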
Custom Metrics Implementation
# plugins/custom_metrics.py
from datetime import timedelta

from sqlalchemy import func

from airflow.models import DagRun, Pool, TaskInstance
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.session import create_session


class AirflowMetrics:
    """Custom Airflow metrics for monitoring."""

    @staticmethod
    def emit_scheduler_metrics():
        """Emit scheduler performance metrics."""
        # create_session() closes the session when the block exits
        with create_session() as session:
            # Scheduler lag: time since the most recent scheduling decision
            last_decision = session.query(
                func.max(DagRun.last_scheduling_decision)
            ).scalar()
            if last_decision:
                # Airflow stores timezone-aware UTC datetimes
                lag = (timezone.utcnow() - last_decision).total_seconds()
                Stats.gauge('scheduler_lag_seconds', lag)

            # Active DAG runs
            active_runs = session.query(DagRun).filter(
                DagRun.state == 'running'
            ).count()
            Stats.gauge('dag_runs_active', active_runs)

            # Task counts by state
            for state in ['queued', 'running', 'success', 'failed']:
                count = session.query(TaskInstance).filter(
                    TaskInstance.state == state
                ).count()
                Stats.gauge(f'task_instances_{state}', count)

    @staticmethod
    def emit_task_metrics(dag_id, task_id, state, duration=None):
        """Emit task-level metrics; duration is in seconds and may be None."""
        if duration is not None:
            # StatsD clients read bare numbers as milliseconds, so pass a timedelta
            Stats.timing(
                f'task.duration.{dag_id}.{task_id}',
                timedelta(seconds=duration),
            )
        Stats.incr(f'task.count.{dag_id}.{task_id}.{state}')

    @staticmethod
    def emit_queue_metrics():
        """Emit queue depth metrics."""
        with create_session() as session:
            # Executor queue depth
            queued_tasks = session.query(TaskInstance).filter(
                TaskInstance.state == 'queued'
            ).count()
            Stats.gauge('executor.queue_depth', queued_tasks)

            # Pool usage; occupied_slots is a method in Airflow 2
            for pool in session.query(Pool).all():
                if pool.slots > 0:
                    utilization = pool.occupied_slots(session=session) / pool.slots
                else:
                    utilization = 0
                Stats.gauge(f'pool.{pool.pool}.utilization', utilization)


# Callbacks that emit metrics; attach them to tasks through
# on_success_callback and on_failure_callback
def task_success_callback(context):
    """Emit metrics on task success."""
    ti = context['task_instance']
    AirflowMetrics.emit_task_metrics(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        state='success',
        duration=ti.duration,
    )


def task_failure_callback(context):
    """Emit metrics on task failure."""
    ti = context['task_instance']
    AirflowMetrics.emit_task_metrics(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        state='failed',
        duration=ti.duration,  # may be None if the task never started
    )
Grafana Dashboard Configuration
{
  "dashboard": {
    "title": "Airflow Overview",
    "panels": [
      {
        "title": "Task Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "rate(airflow_task_success_total[5m]) / (rate(airflow_task_success_total[5m]) + rate(airflow_task_failure_total[5m]))",
            "legendFormat": "Success Rate"
          }
        ],
        "thresholds": [
          {"value": 0.95, "color": "green"},
          {"value": 0.9, "color": "yellow"},
          {"value": 0.8, "color": "red"}
        ]
      },
      {
        "title": "Task Duration",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(airflow_task_duration_seconds_bucket[5m]))",
            "legendFormat": "P95 Duration"
          }
        ]
      },
      {
        "title": "Queue Depth",
        "type": "graph",
        "targets": [
          {
            "expr": "airflow_executor_queue_depth",
            "legendFormat": "Queued Tasks"
          }
        ]
      },
      {
        "title": "Scheduler Lag",
        "type": "stat",
        "targets": [
          {
            "expr": "airflow_scheduler_lag_seconds",
            "legendFormat": "Lag (seconds)"
          }
        ],
        "thresholds": [
          {"value": 0, "color": "green"},
          {"value": 60, "color": "yellow"},
          {"value": 300, "color": "red"}
        ]
      }
    ]
  }
}
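The "Task Success Rate" panel divides the success rate by the sum of the success and failure rates. The sketch below applies the same arithmetic to two samples per counter. It is deliberately simplified: it ignores counter resets and the extrapolation that PromQL's `rate()` performs, and the function names are hypothetical.

```python
from typing import Optional, Tuple

Sample = Tuple[float, float]  # (unix timestamp in seconds, counter value)


def simple_rate(first: Sample, last: Sample) -> float:
    """Per-second increase between two samples of an increasing counter."""
    elapsed = last[0] - first[0]
    if elapsed <= 0:
        raise ValueError("samples must be ordered in time")
    return (last[1] - first[1]) / elapsed


def success_ratio(success: Tuple[Sample, Sample],
                  failure: Tuple[Sample, Sample]) -> Optional[float]:
    """success / (success + failure), or None when nothing ran."""
    ok = simple_rate(*success)
    bad = simple_rate(*failure)
    total = ok + bad
    return ok / total if total > 0 else None


ratio = success_ratio(
    success=((0.0, 100.0), (300.0, 290.0)),  # 190 successes in 5 minutes
    failure=((0.0, 5.0), (300.0, 15.0)),     # 10 failures in 5 minutes
)
```

Returning `None` for an idle window mirrors the fact that the PromQL expression has no defined value when both rates are zero.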
Alert Rate
$$\lambda = \frac{N}{T}$$
Here,
- $\lambda$ = Alert rate (alerts per time unit)
- $N$ = Number of alerts in period
- $T$ = Monitoring period duration
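The alert rate defined above is the number of alerts in a period divided by the period's duration. A minimal sketch, assuming alerts are available as a list of timestamps and that the rate is reported per hour; the function name and sample data are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Sequence


def alert_rate_per_hour(alert_times: Sequence[datetime],
                        period_start: datetime,
                        period: timedelta) -> float:
    """Alerts per hour inside [period_start, period_start + period)."""
    if period <= timedelta(0):
        raise ValueError("period must be positive")
    period_end = period_start + period
    count = sum(1 for t in alert_times if period_start <= t < period_end)
    return count / (period.total_seconds() / 3600)


day_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
# Six alerts spread over the day, one every four hours
alerts = [day_start + timedelta(hours=4 * i) for i in range(6)]
rate = alert_rate_per_hour(alerts, day_start, timedelta(hours=24))
```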
Mean Time to Resolution
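$$\text{MTTR} = \frac{1}{n} \sum_{i=1}^{n} \left( t_{\text{resolved},i} - t_{\text{alert},i} \right)$$
Here,
- $\text{MTTR}$ = Mean time to resolution
- $t_{\text{resolved},i}$ = Resolution time of incident $i$
- $t_{\text{alert},i}$ = Alert time of incident $i$
- $n$ = Number of resolved incidents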
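Mean time to resolution averages, over resolved incidents, the gap between each incident's alert time and its resolution time. A minimal sketch, assuming incidents are given as (alert time, resolution time) pairs; the function name and sample incidents are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional, Tuple


def mean_time_to_resolution(
    incidents: Iterable[Tuple[datetime, datetime]],
) -> Optional[timedelta]:
    """Average of (resolved - alerted) over resolved incidents, or None if empty."""
    durations = [resolved - alerted for alerted, resolved in incidents]
    if not durations:
        return None
    return sum(durations, timedelta(0)) / len(durations)


base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
resolved_incidents = [
    (base, base + timedelta(minutes=10)),
    (base, base + timedelta(minutes=20)),
    (base, base + timedelta(minutes=30)),
]
mttr = mean_time_to_resolution(resolved_incidents)
```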
Use StatsD or Prometheus for metrics collection. Grafana provides visualization dashboards. Alertmanager handles alert routing and deduplication.
Set up alerts for scheduler lag > 5 minutes, task failure rate > 5%, and queue depth > 100. These indicate potential system issues.
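Deduplication, which the text above attributes to Alertmanager, can be pictured as collapsing alerts that carry the same label set. The sketch below is not Alertmanager's implementation; the `fingerprint` and `deduplicate` helpers and the sample labels are hypothetical.

```python
import hashlib
import json
from typing import Dict, Iterable, List


def fingerprint(labels: Dict[str, str]) -> str:
    """Stable hash of a label set, independent of key order."""
    canonical = json.dumps(labels, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def deduplicate(alerts: Iterable[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep the first alert seen for each distinct label set."""
    seen = set()
    unique = []
    for labels in alerts:
        key = fingerprint(labels)
        if key not in seen:
            seen.add(key)
            unique.append(labels)
    return unique


incoming = [
    {"alertname": "AirflowQueueDepthHigh", "severity": "warning"},
    {"severity": "warning", "alertname": "AirflowQueueDepthHigh"},  # same labels, other order
    {"alertname": "AirflowSchedulerLagHigh", "severity": "warning"},
]
unique_alerts = deduplicate(incoming)
```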
Key Concepts Table
| Metric Category | Examples | Collection Method | Alert Threshold |
|---|---|---|---|
| Scheduler | Lag, parse time | StatsD/Prometheus | > 5min lag |
| Tasks | Success rate, duration | Callbacks | < 95% success |
| Queue | Depth, wait time | Database queries | > 100 queued |
| Resources | CPU, memory, disk | System metrics | > 85% utilization |
| Database | Query time, connections | SQLAlchemy | > 100ms query |
| SLA | Miss rate | SLA callbacks | Any SLA miss |
Code Examples
Alert Rules Configuration
# prometheus/alert_rules.yml
groups:
  - name: airflow_alerts
    rules:
      - alert: AirflowSchedulerLagHigh
        expr: airflow_scheduler_lag_seconds > 300
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Airflow scheduler lag is high"
          description: "Scheduler lag is {{ $value }} seconds"
      # Failures divided by all finished tasks (success + failure), using the
      # same counters as the dashboard's success-rate panel
      - alert: AirflowTaskFailureRateHigh
        expr: rate(airflow_task_failure_total[5m]) / (rate(airflow_task_success_total[5m]) + rate(airflow_task_failure_total[5m])) > 0.05
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "High task failure rate"
          description: "Task failure rate is {{ $value | humanizePercentage }}"
      - alert: AirflowQueueDepthHigh
        expr: airflow_executor_queue_depth > 100
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "High queue depth"
          description: "{{ $value }} tasks queued"
      - alert: AirflowDagRunStale
        expr: time() - airflow_dag_run_last_scheduling_decision > 3600
        for: 30m
        labels:
          severity: critical
        annotations:
          summary: "Stale DAG run detected"
          description: "DAG run has not been scheduled for {{ $value }} seconds"
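Each rule above carries a `for:` duration, which means the alert fires only after its expression has stayed true for that long; until then it is pending, and a single false evaluation resets it. The sketch below models that state machine. The `PendingAlert` class is hypothetical and ignores details of real Prometheus evaluation such as evaluation intervals.

```python
from typing import Optional


class PendingAlert:
    """Tracks whether a condition has held long enough to fire."""

    def __init__(self, hold_seconds: float) -> None:
        self.hold_seconds = hold_seconds
        self.active_since: Optional[float] = None  # when the condition became true

    def observe(self, now: float, condition_true: bool) -> str:
        """Record one evaluation and return 'inactive', 'pending', or 'firing'."""
        if not condition_true:
            self.active_since = None  # any false evaluation resets the timer
            return "inactive"
        if self.active_since is None:
            self.active_since = now
        if now - self.active_since >= self.hold_seconds:
            return "firing"
        return "pending"


alert = PendingAlert(hold_seconds=300)  # comparable to "for: 5m"
states = [
    alert.observe(0, True),     # condition just became true
    alert.observe(120, True),   # still within the hold period
    alert.observe(180, False),  # condition cleared, timer resets
    alert.observe(240, True),   # starts over
    alert.observe(540, True),   # true for 300 seconds since t=240
]
```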
Slack Alerting Integration
# alerting/slack_alert.py
from datetime import datetime

import requests


class SlackAlerter:
    """Send alerts to Slack through an incoming webhook."""

    def __init__(self, webhook_url, channel='#airflow-alerts', timeout=10):
        self.webhook_url = webhook_url
        self.channel = channel
        self.timeout = timeout  # seconds; keeps a slow Slack call from hanging the task

    def send_alert(self, title, message, severity='warning', details=None):
        """Send an alert to Slack; return True if Slack accepted it."""
        color_map = {
            'info': '#36a64f',
            'warning': '#ff9900',
            'critical': '#ff0000',
        }
        payload = {
            'channel': self.channel,
            'username': 'Airflow Monitor',
            'icon_emoji': ':airflow:',
            'attachments': [{
                'color': color_map.get(severity, '#999'),
                'title': title,
                'text': message,
                'fields': [
                    {'title': 'Severity', 'value': severity, 'short': True},
                    {'title': 'Time', 'value': datetime.now().isoformat(), 'short': True},
                ],
                'footer': 'Airflow Monitoring',
            }],
        }
        if details:
            payload['attachments'][0]['fields'].append({
                'title': 'Details',
                'value': details,
                'short': False,
            })
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,  # serializes the payload and sets Content-Type
                timeout=self.timeout,
            )
        except requests.RequestException:
            # A failed notification should not raise inside an Airflow callback
            return False
        return response.status_code == 200


# Usage
alerter = SlackAlerter(webhook_url='https://hooks.slack.com/services/xxx')


def task_failure_alert(context):
    """Send alert on task failure."""
    ti = context['task_instance']
    alerter.send_alert(
        title=f"Task Failed: {ti.task_id}",
        message=f"DAG: {ti.dag_id}\nTask: {ti.task_id}\nTry: {ti.try_number}",
        severity='critical',
        details=f"Exception: {context.get('exception', 'Unknown')}",
    )
Monitoring Dashboard Script
# monitoring/dashboard.py
from datetime import timedelta

from sqlalchemy import case, func

from airflow.models import DagModel, TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session


def get_dashboard_data():
    """Get data for operational dashboard."""
    now = timezone.utcnow()  # timezone-aware, matching Airflow's stored datetimes
    last_day = now - timedelta(hours=24)
    last_week = now - timedelta(days=7)

    # create_session() closes the session when the block exits
    with create_session() as session:
        # Overall metrics
        total_dags = session.query(DagModel).filter(
            DagModel.is_active.is_(True)
        ).count()

        # Task metrics (last 24h). start_date is used as the time filter because
        # execution_date is no longer a TaskInstance column since Airflow 2.2.
        task_stats = dict(session.query(
            TaskInstance.state,
            func.count(TaskInstance.task_id)
        ).filter(
            TaskInstance.start_date >= last_day
        ).group_by(TaskInstance.state).all())

        # Average duration of successful tasks (None when there are none)
        avg_duration = session.query(
            func.avg(TaskInstance.duration)
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= last_day
        ).scalar()

        # Slowest DAGs
        slow_dags = session.query(
            TaskInstance.dag_id,
            func.avg(TaskInstance.duration).label('avg_duration')
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= last_week
        ).group_by(TaskInstance.dag_id).order_by(
            func.avg(TaskInstance.duration).desc()
        ).limit(5).all()

        # Error rate by DAG. SUM(CASE ...) is portable across databases, unlike
        # an aggregate FILTER clause; this is SQLAlchemy 1.4+ case() syntax.
        error_rates = session.query(
            TaskInstance.dag_id,
            func.count(TaskInstance.task_id).label('total'),
            func.sum(
                case((TaskInstance.state == 'failed', 1), else_=0)
            ).label('failed')
        ).filter(
            TaskInstance.start_date >= last_day
        ).group_by(TaskInstance.dag_id).all()

    # Success rate
    total_tasks = sum(task_stats.values())
    successful_tasks = task_stats.get('success', 0)
    success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0

    return {
        'total_dags': total_dags,
        'task_stats': task_stats,
        'success_rate': success_rate,
        'avg_duration': avg_duration,
        'slowest_dags': [{'dag_id': d[0], 'avg_duration': d[1]} for d in slow_dags],
        'error_rates': [
            {'dag_id': e[0], 'error_rate': int(e[2] or 0) / e[1] if e[1] > 0 else 0}
            for e in error_rates
        ],
    }


if __name__ == "__main__":
    data = get_dashboard_data()
    print(f"Total DAGs: {data['total_dags']}")
    print(f"Success Rate: {data['success_rate']:.2%}")
    avg = data['avg_duration']
    # avg_duration is None when no task succeeded in the window
    print(f"Avg Duration: {avg:.2f}s" if avg is not None else "Avg Duration: n/a")
Performance Metrics
Key Performance Indicators
| KPI | Target | Warning | Critical |
|---|---|---|---|
| Task Success Rate | > 99% | 95-99% | < 95% |
| Scheduler Lag | < 60s | 60-300s | > 300s |
| Avg Task Duration | < 5min | 5-15min | > 15min |
| Queue Depth | < 50 | 50-100 | > 100 |
| MTTR | < 15min | 15-30min | > 30min |
| SLA Miss Rate | 0% | < 1% | > 1% |
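The target, warning, and critical bands in the KPI table can be applied mechanically. The sketch below classifies one value against a warning and a critical bound, with a flag for KPIs where lower values are worse (such as success rate). The function name and the boundary handling (a value exactly on the warning bound counts as warning) are assumptions.

```python
def classify_kpi(value: float, warning: float, critical: float,
                 higher_is_worse: bool = True) -> str:
    """Return 'target', 'warning', or 'critical' for one KPI value."""
    if not higher_is_worse:
        # Flip signs so the same comparisons work for "lower is worse" KPIs
        value, warning, critical = -value, -warning, -critical
    if value > critical:
        return "critical"
    if value >= warning:
        return "warning"
    return "target"


# Scheduler lag in seconds: < 60 target, 60-300 warning, > 300 critical
lag_status = classify_kpi(120, warning=60, critical=300)
# Task success rate: > 99% target, 95-99% warning, < 95% critical
success_status = classify_kpi(0.97, warning=0.99, critical=0.95, higher_is_worse=False)
```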
Alert Distribution
| Severity | Response Time | Escalation | Auto-resolve |
|---|---|---|---|
| Critical | 5min | Immediate | No |
| Warning | 30min | 1 hour | Possible |
| Info | Next business day | None | Yes |
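The severity table above can be encoded as a small policy lookup. This is a sketch under stated assumptions: the `POLICY` dictionary and `escalation_time` helper are hypothetical, "Immediate" is modeled as a zero delay, and "Next business day" and "None" are modeled as `None` because they have no fixed offset.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

# Hypothetical policy table transcribed from the severity table
POLICY: Dict[str, Dict[str, Optional[timedelta]]] = {
    "critical": {"respond_within": timedelta(minutes=5), "escalate_after": timedelta(0)},
    "warning": {"respond_within": timedelta(minutes=30), "escalate_after": timedelta(hours=1)},
    "info": {"respond_within": None, "escalate_after": None},
}


def escalation_time(severity: str, fired_at: datetime) -> Optional[datetime]:
    """Return when an unacknowledged alert escalates, or None if it never does."""
    delay = POLICY[severity.lower()]["escalate_after"]
    return None if delay is None else fired_at + delay


fired = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
warning_escalation = escalation_time("warning", fired)
```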
Key Takeaways:
- Use StatsD/Prometheus for metrics collection; Grafana for visualization
- Monitor scheduler lag, task success rates, queue depth, and resource utilization
- Set up alerts for critical thresholds: scheduler lag > 5min, failure rate > 5%
- Implement custom metrics for business-specific KPIs
- Use Alertmanager for alert routing and deduplication
- Track MTTR and SLA miss rates for operational excellence
See Also
- Error Handling — Error tracking and recovery
- Performance Tuning — Performance optimization
- Kubernetes Executor — K8s-specific monitoring
- Multi-Tenancy — Tenant-level monitoring