Performance Tuning and Optimization
Formal Definitions
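Scheduler Throughput
Scheduler Throughput is the number of tasks the scheduler can queue per unit time. It is bounded by the parsing rate $R_{\text{parse}}$, the database query rate $R_{\text{db}}$, and the executor dispatch rate $R_{\text{exec}}$, so the effective throughput is set by the slowest stage: $T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{exec}})$.
Task Latency
Task Latency is the time from task queuing to execution start. It includes the executor queue time $t_{\text{queue}}$, the worker assignment time $t_{\text{assign}}$, and the task startup time $t_{\text{startup}}$. The total latency is $L = t_{\text{queue}} + t_{\text{assign}} + t_{\text{startup}}$.
Database Connection Pool Efficiency
Connection Pool Efficiency measures how effectively database connections are reused. The efficiency is $E = C_{\text{reused}} / C_{\text{total}}$, where $C_{\text{reused}}$ is the number of connections reused from the pool and $C_{\text{total}}$ is the total number of connection attempts.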
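These three definitions reduce to simple arithmetic: throughput is the minimum of the stage rates, latency is the sum of its components, and pool efficiency is a ratio of reused connections to all connection attempts. A minimal sketch, with hypothetical function names and made-up example numbers rather than measurements:

```python
def effective_throughput(parse_rate: float, db_rate: float, dispatch_rate: float) -> float:
    """Throughput is capped by the slowest stage (all rates in tasks/second)."""
    return min(parse_rate, db_rate, dispatch_rate)


def task_latency(queue_s: float, assign_s: float, startup_s: float) -> float:
    """Queue-to-start latency is the sum of its components (seconds)."""
    return queue_s + assign_s + startup_s


def pool_efficiency(reused: int, total_attempts: int) -> float:
    """Fraction of connection requests served by an already-open pooled connection."""
    if total_attempts == 0:
        return 0.0
    return reused / total_attempts


if __name__ == "__main__":
    # Hypothetical inputs: the database stage is the bottleneck here
    print(effective_throughput(parse_rate=50.0, db_rate=20.0, dispatch_rate=35.0))
    print(task_latency(queue_s=1.5, assign_s=0.5, startup_s=2.0))
    print(pool_efficiency(reused=950, total_attempts=1000))
```

Raising any rate other than the minimum leaves throughput unchanged, which is why tuning should start with whichever stage is slowest.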
Detailed Explanation
Scheduler Optimization
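```ini
# airflow.cfg - Scheduler performance tuning (Airflow 2.x layout)
[core]
# Maximum number of task instances that can run concurrently per scheduler,
# across all DAGs
parallelism = 32
# Maximum number of task instances allowed to run concurrently within one DAG,
# summed over its active runs (named dag_concurrency before Airflow 2.2)
max_active_tasks_per_dag = 16
# Maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
# Timeout for importing a single DAG file (seconds)
dagbag_import_timeout = 30
# Store DAG source code in the metadata DB so the webserver can display it
# without reading the DAGs folder
store_dag_code = True
# DAG serialization: set store_serialized_dags = True on Airflow 1.10.10+;
# it is always enabled in Airflow 2.x
# store_serialized_dags = True

[scheduler]
# Minimum interval before the same DAG file is re-parsed (seconds)
min_file_process_interval = 30
# How often to scan the DAGs folder for new files (seconds)
dag_dir_list_interval = 300
# Number of processes used to parse DAG files
parsing_processes = 2
# Scheduler heartbeat interval (seconds)
scheduler_heartbeat_sec = 5
```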
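Several of these limits interact. If `max_active_tasks_per_dag` is larger than `parallelism`, the global cap is always reached first and the per-DAG value never takes effect. The sketch below assumes the Airflow 2.x layout (concurrency limits in `[core]`, `parsing_processes` in `[scheduler]`), reads a config fragment with Python's `configparser`, and flags such contradictions. The function name and the "more parsers than CPUs" rule are illustrative heuristics, not checks Airflow itself performs.

```python
import configparser
from typing import List

# Trimmed airflow.cfg fragment mirroring the values above (Airflow 2.x layout)
CFG_TEXT = """
[core]
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16

[scheduler]
parsing_processes = 2
"""


def check_concurrency_limits(cfg_text: str, cpu_count: int) -> List[str]:
    """Return warnings for settings that contradict each other (heuristic checks)."""
    cfg = configparser.ConfigParser()
    cfg.read_string(cfg_text)
    parallelism = cfg.getint("core", "parallelism")
    per_dag = cfg.getint("core", "max_active_tasks_per_dag")
    parsers = cfg.getint("scheduler", "parsing_processes")

    warnings = []
    if per_dag > parallelism:
        # parallelism caps running task instances globally, so the per-DAG cap is unreachable
        warnings.append(f"max_active_tasks_per_dag={per_dag} exceeds parallelism={parallelism}")
    if parsers > cpu_count:
        # More parser processes than cores mostly adds context switching
        warnings.append(f"parsing_processes={parsers} exceeds CPU count={cpu_count}")
    return warnings


if __name__ == "__main__":
    for warning in check_concurrency_limits(CFG_TEXT, cpu_count=4) or ["no conflicts found"]:
        print(warning)
```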
Database Optimization
```python
# database_optimization.py
# Assumes a PostgreSQL metadata DB with the pg_stat_statements extension enabled.
from airflow import settings
from sqlalchemy import text


def optimize_database():
    """Report slow queries and index usage, then add candidate indexes."""
    session = settings.Session()
    try:
        # Analyze query performance (the column is mean_time before PostgreSQL 13)
        analysis = session.execute(text("""
            SELECT query, mean_exec_time, calls
            FROM pg_stat_statements
            WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database())
            ORDER BY mean_exec_time DESC
            LIMIT 10;
        """))
        print("Slow queries:")
        for row in analysis:
            print(f"  {row[0][:50]}... - {row[1]:.2f}ms ({row[2]} calls)")

        # Check index usage
        indexes = session.execute(text("""
            SELECT indexrelname, idx_scan, idx_tup_read, idx_tup_fetch
            FROM pg_stat_user_indexes
            WHERE schemaname = 'public'
            ORDER BY idx_scan DESC;
        """))
        print("\nIndex usage:")
        for row in indexes:
            print(f"  {row[0]}: {row[1]} scans, {row[2]} tuples read")

        # Candidate indexes. Recent Airflow 2.x migrations already create
        # equivalents on task_instance (e.g. ti_dag_run, ti_state), so check
        # pg_indexes first to avoid maintaining duplicate indexes.
        candidate_indexes = [
            "CREATE INDEX IF NOT EXISTS idx_task_instance_dag_run "
            "ON task_instance (dag_id, run_id);",
            "CREATE INDEX IF NOT EXISTS idx_task_instance_state "
            "ON task_instance (state);",
            "CREATE INDEX IF NOT EXISTS idx_dag_run_state "
            "ON dag_run (state);",
        ]
        for idx_sql in candidate_indexes:
            try:
                session.execute(text(idx_sql))
                session.commit()
            except Exception as e:
                print(f"Index creation failed: {e}")
                session.rollback()
    finally:
        session.close()


# Connection pool optimization
def configure_connection_pool():
    """Return example SQLAlchemy pool settings.

    Airflow builds its engine at startup, so these values take effect through
    airflow.cfg ([database] section in Airflow 2.3+, [core] before), e.g.
    sql_alchemy_pool_size, sql_alchemy_max_overflow, sql_alchemy_pool_recycle
    and sql_alchemy_pool_pre_ping, not by modifying settings.engine at runtime.
    """
    # Example starting point for ~100 concurrent tasks; every Airflow process
    # keeps its own pool, so size it against the database's max_connections.
    pool_config = {
        'pool_size': 20,        # Connections kept open in the pool
        'max_overflow': 30,     # Extra connections allowed during peaks
        'pool_timeout': 30,     # Seconds to wait for a free connection
        'pool_recycle': 1800,   # Recycle connections after 30 minutes
        'pool_pre_ping': True,  # Test connections before use
    }
    return pool_config
```
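The pool dictionary only describes settings. As a hedged sketch, assuming Airflow 2.3+ (where pool options live in `[database]`) and Airflow's `AIRFLOW__{SECTION}__{KEY}` environment-variable convention, the helpers below turn it into environment variables and estimate a worst-case connection count. The estimate assumes each process holding a SQLAlchemy `QueuePool` can open at most `pool_size + max_overflow` connections. `pool_timeout` is not mapped because this sketch does not assume a matching Airflow option exists. Both function names are hypothetical.

```python
# Hypothetical helpers; assumes Airflow 2.3+ where pool options live in [database].
POOL_CONFIG = {
    "pool_size": 20,
    "max_overflow": 30,
    "pool_timeout": 30,
    "pool_recycle": 1800,
    "pool_pre_ping": True,
}

# SQLAlchemy pool argument -> corresponding airflow.cfg key
AIRFLOW_KEYS = {
    "pool_size": "sql_alchemy_pool_size",
    "max_overflow": "sql_alchemy_max_overflow",
    "pool_recycle": "sql_alchemy_pool_recycle",
    "pool_pre_ping": "sql_alchemy_pool_pre_ping",
}


def to_env_vars(pool_config, section="database"):
    """Build AIRFLOW__{SECTION}__{KEY} environment variables for the mapped keys."""
    env = {}
    for arg, key in AIRFLOW_KEYS.items():
        if arg in pool_config:
            env[f"AIRFLOW__{section.upper()}__{key.upper()}"] = str(pool_config[arg])
    return env


def worst_case_connections(processes, pool_size, max_overflow):
    """Connections open if every process exhausts its pool (pool_size + max_overflow)."""
    return processes * (pool_size + max_overflow)


if __name__ == "__main__":
    for name, value in to_env_vars(POOL_CONFIG).items():
        print(f"{name}={value}")
    print(worst_case_connections(4, POOL_CONFIG["pool_size"], POOL_CONFIG["max_overflow"]))
```

With just four such processes the worst case is 4 × (20 + 30) = 200 connections, twice PostgreSQL's default `max_connections` of 100, which is why a connection pooler such as PgBouncer is often placed in front of the metadata database.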
Worker Optimization
```python
# worker_optimization.py
import psutil


def get_worker_recommendations(io_bound=False):
    """Suggest Celery worker settings based on this machine's resources."""
    cpu_count = psutil.cpu_count() or 1
    memory = psutil.virtual_memory()
    # Concurrency = CPU cores (CPU-bound) or 2 * CPU cores (I/O-bound), capped at 16
    concurrency = min(cpu_count * (2 if io_bound else 1), 16)
    # Keys are Celery setting names; worker_concurrency is also exposed in
    # Airflow's [celery] config section.
    worker_config = {
        'worker_concurrency': concurrency,
        # How many tasks each worker process reserves ahead of time
        'worker_prefetch_multiplier': 1,
        # Replace a worker child process after this many tasks
        'worker_max_tasks_per_child': 200,
        # Per-child resident memory limit; Celery expects kilobytes
        'worker_max_memory_per_child': int(memory.total * 0.8 / concurrency / 1024),
        # Hard time limit (seconds): the child process is killed
        'task_time_limit': 3600,
        # Soft time limit (seconds): raises SoftTimeLimitExceeded in the task
        'task_soft_time_limit': 3000,
    }
    return worker_config


def monitor_worker_health():
    """Collect worker health metrics and flag values above alert thresholds."""
    proc = psutil.Process()
    # psutil 6.0 renamed Process.connections() to net_connections()
    net_conns = proc.net_connections() if hasattr(proc, 'net_connections') else proc.connections()
    metrics = {
        'cpu_percent': psutil.cpu_percent(interval=1),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_usage': psutil.disk_usage('/').percent,
        'open_files': len(proc.open_files()),
        'connections': len(net_conns),
    }
    # Alert thresholds
    alerts = []
    if metrics['cpu_percent'] > 90:
        alerts.append(f"High CPU: {metrics['cpu_percent']}%")
    if metrics['memory_percent'] > 85:
        alerts.append(f"High Memory: {metrics['memory_percent']}%")
    if metrics['disk_usage'] > 90:
        alerts.append(f"High Disk: {metrics['disk_usage']}%")
    return {
        'metrics': metrics,
        'alerts': alerts,
        'healthy': len(alerts) == 0,
    }
```
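The effective scheduler throughput is limited by the slowest stage of the pipeline, once each rate is expressed as the number of tasks it can sustain per second:
$$T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{exec}})$$
Here,
- $T_{\text{eff}}$ = effective scheduler throughput (tasks/second)
- $R_{\text{parse}}$ = DAG parsing rate (DAGs/second)
- $R_{\text{db}}$ = database query rate (queries/second)
- $R_{\text{exec}}$ = executor dispatch rate (tasks/second)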
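Task End-to-End Latency
$$L_{\text{e2e}} = t_{\text{queue}} + t_{\text{assign}} + t_{\text{startup}} + t_{\text{exec}}$$
Here,
- $L_{\text{e2e}}$ = end-to-end task latency
- $t_{\text{queue}}$ = time in executor queue
- $t_{\text{assign}}$ = worker assignment time
- $t_{\text{startup}}$ = task process startup time
- $t_{\text{exec}}$ = actual task execution time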
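Airflow records `queued_dttm`, `start_date` and `end_date` on each task instance, which gives a rough split of end-to-end latency. Airflow does not store assignment and startup times as separate fields, so in this sketch the "wait" figure lumps queue time, assignment time and part of the startup time together. The function name is hypothetical and the timestamps are invented.

```python
from datetime import datetime, timedelta, timezone


def latency_breakdown(queued_at, started_at, ended_at):
    """Split a task's lifetime using Airflow-style timestamps.

    queued_at  ~ TaskInstance.queued_dttm
    started_at ~ TaskInstance.start_date
    ended_at   ~ TaskInstance.end_date
    """
    wait_s = (started_at - queued_at).total_seconds()  # queue + assignment + part of startup
    run_s = (ended_at - started_at).total_seconds()    # execution
    return {"wait_s": wait_s, "run_s": run_s, "end_to_end_s": wait_s + run_s}


if __name__ == "__main__":
    queued = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
    started = queued + timedelta(seconds=4)
    ended = started + timedelta(seconds=60)
    print(latency_breakdown(queued, started, ended))
```

If `wait_s` dominates, the bottleneck is scheduling or worker capacity rather than the task code itself.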
Database Query Optimization Score
$$H = \frac{Q_{\text{idx}}}{S_{\text{total}}}$$
Here,
- $H$ = index hit ratio (higher is better)
- $Q_{\text{idx}}$ = queries using indexes
- $S_{\text{total}}$ = total table scans
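In PostgreSQL, the `pg_stat_user_tables` view exposes cumulative `idx_scan` and `seq_scan` counters per table, so the index hit ratio can be estimated as index scans divided by all scans. A minimal sketch, with a hypothetical helper name and invented sample rows standing in for the query result:

```python
# Query for the inputs (PostgreSQL statistics view; counters are cumulative
# since the last statistics reset)
INDEX_RATIO_SQL = """
SELECT relname, idx_scan, seq_scan
FROM pg_stat_user_tables
WHERE schemaname = 'public';
"""


def index_hit_ratios(rows):
    """rows: iterable of (table, idx_scan, seq_scan). Returns {table: ratio or None}."""
    ratios = {}
    for table, idx_scan, seq_scan in rows:
        idx = idx_scan or 0  # idx_scan may be NULL for tables without indexes
        total = idx + (seq_scan or 0)
        ratios[table] = idx / total if total else None  # None: table never scanned
    return ratios


if __name__ == "__main__":
    sample = [("task_instance", 900, 100), ("log", 0, 50), ("new_table", None, 0)]
    print(index_hit_ratios(sample))
```

A low ratio on a small table is not necessarily a problem, since PostgreSQL often prefers a sequential scan when a table fits in a few pages.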
For CPU-bound tasks, set worker concurrency equal to CPU cores. For I/O-bound tasks, set concurrency to 2-4x CPU cores. Monitor worker memory usage to prevent OOM kills.
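The 2-4x range for I/O-bound work follows from a widely used sizing rule of thumb: concurrency ≈ cores × target utilization × (1 + wait time / compute time). A task that waits on I/O as long as it computes gives 2x; one that waits three times as long gives 4x. The sketch below implements that heuristic; it is not an Airflow or Celery formula, and the function name, the cap of 64, and the example timings are assumptions.

```python
import math


def suggested_concurrency(cpu_cores, wait_s, compute_s, target_util=1.0, cap=64):
    """Rule of thumb: cores * target utilization * (1 + wait/compute), clamped to [1, cap]."""
    if compute_s <= 0:
        raise ValueError("compute_s must be positive")
    estimate = cpu_cores * target_util * (1 + wait_s / compute_s)
    return max(1, min(cap, math.floor(estimate)))


if __name__ == "__main__":
    print(suggested_concurrency(8, wait_s=0.0, compute_s=1.0))  # CPU-bound
    print(suggested_concurrency(8, wait_s=1.0, compute_s=1.0))  # waits as long as it computes
    print(suggested_concurrency(8, wait_s=3.0, compute_s=1.0))  # mostly waiting on I/O
```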
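Enable DAG serialization (store_serialized_dags = True on Airflow 1.10.10+; it is always on in Airflow 2.x) to reduce parsing work. Serialized DAG definitions are stored in the metadata database, so the webserver (and, in Airflow 2.x, the scheduler) reads them from there instead of parsing DAG files itself. The DAG file processor still parses the files at the cadence set by min_file_process_interval.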
Key Concepts Table
| Optimization Area | Metric | Target | Impact |
|---|---|---|---|
| DAG Parsing | Parse time | < 1s per DAG | High |
| Task Latency | Queue to start | < 5s | High |
| DB Query Time | Average query | < 100ms | High |
| Worker Memory | Per-worker | < 4GB | Medium |
| XCom Size | Per operation | < 48KB | Medium |
| Log Storage | Daily volume | < 10GB/day | Low |
| Scheduler Heartbeat | Interval | 5s | Low |
Code Examples
Performance Monitoring Dashboard
```python
# performance_monitoring.py
from datetime import timedelta

from sqlalchemy import func

from airflow.models import DagModel, DagRun, TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session


def get_performance_metrics():
    """Collect scheduler and task metrics from the metadata database."""
    now = timezone.utcnow()  # Airflow stores timestamps in UTC
    with create_session() as session:
        # Scheduler metrics
        scheduler_metrics = {
            'active_dags': session.query(DagModel).filter(
                DagModel.is_active.is_(True)
            ).count(),
            'total_dag_runs': session.query(DagRun).count(),
            # Runs that actually started in the last hour (execution_date is the
            # logical date, which can lag the real start by a full interval)
            'recent_runs_1h': session.query(DagRun).filter(
                DagRun.start_date >= now - timedelta(hours=1)
            ).count(),
        }
        # Task counts per state
        task_metrics = dict(session.query(
            TaskInstance.state,
            func.count(TaskInstance.task_id),
        ).group_by(TaskInstance.state).all())

        # Tasks that finished in the last 24 hours
        last_24h = TaskInstance.end_date >= now - timedelta(hours=24)
        avg_task_duration = session.query(
            func.avg(TaskInstance.duration)
        ).filter(TaskInstance.state == 'success', last_24h).scalar()

        # Error rate
        total_tasks_24h = session.query(TaskInstance).filter(last_24h).count()
        failed_tasks_24h = session.query(TaskInstance).filter(
            TaskInstance.state == 'failed', last_24h
        ).count()

    error_rate = failed_tasks_24h / total_tasks_24h if total_tasks_24h > 0 else 0
    return {
        'scheduler': scheduler_metrics,
        'tasks': task_metrics,
        'avg_duration_seconds': avg_task_duration,  # None if nothing succeeded
        'error_rate_24h': error_rate,
        'timestamp': now.isoformat(),
    }


def identify_slow_dags(top_n=10):
    """Identify the slowest DAGs by average successful task duration over 7 days."""
    since = timezone.utcnow() - timedelta(days=7)
    with create_session() as session:
        slow_dags = session.query(
            TaskInstance.dag_id,
            func.avg(TaskInstance.duration).label('avg_duration'),
            func.count(TaskInstance.task_id).label('task_count'),
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.end_date >= since,
        ).group_by(
            TaskInstance.dag_id
        ).order_by(
            func.avg(TaskInstance.duration).desc()
        ).limit(top_n).all()
    return [
        {'dag_id': row[0], 'avg_duration': row[1], 'task_count': row[2]}
        for row in slow_dags
    ]


if __name__ == "__main__":
    metrics = get_performance_metrics()
    slow_dags = identify_slow_dags()
    print("Performance Metrics:")
    print(f"  Active DAGs: {metrics['scheduler']['active_dags']}")
    avg = metrics['avg_duration_seconds']
    print(f"  Avg task duration: {avg:.2f}s" if avg is not None else "  Avg task duration: n/a")
    print(f"  Error rate (24h): {metrics['error_rate_24h']:.2%}")
    print("\nSlowest DAGs:")
    for dag in slow_dags:
        print(f"  {dag['dag_id']}: {dag['avg_duration']:.2f}s avg ({dag['task_count']} tasks)")
```
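Averages can hide a handful of very long tasks. As a sketch, assuming you fetch raw `TaskInstance.duration` values with a query like the ones in the monitoring script, Python's `statistics` module can report the median and 95th percentile alongside the mean. The helper name and the sample durations are hypothetical.

```python
import statistics


def duration_summary(durations):
    """Mean, median and 95th percentile of task durations (seconds)."""
    clean = [d for d in durations if d is not None]  # unfinished tasks have no duration
    if len(clean) < 2:
        raise ValueError("need at least two durations")
    cut_points = statistics.quantiles(clean, n=20)  # 19 cut points; the last is p95
    return {
        "mean": statistics.fmean(clean),
        "median": statistics.median(clean),
        "p95": cut_points[-1],
    }


if __name__ == "__main__":
    # Invented sample: mostly 10 s tasks with two slow outliers
    print(duration_summary([10.0] * 18 + [300.0, 600.0, None]))
```

In the sample, two outliers pull the mean far above the median, a gap that an average-only report would not reveal.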
DAG Optimization Patterns
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule="@daily",  # use schedule_interval on Airflow < 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'optimization'],
)
def optimized_dag():
    @task
    def batch_processing():
        """Process data in batches to bound memory use."""
        import pandas as pd  # imported inside the task to keep DAG parsing fast

        chunk_size = 10000
        total_processed = 0
        # Read the file in chunks instead of loading it all at once
        for chunk in pd.read_csv('/data/large_file.csv', chunksize=chunk_size):
            # Duplicates are only removed within each chunk
            processed = chunk.dropna().drop_duplicates()
            total_processed += len(processed)
        return {'processed': total_processed}

    @task
    def parallel_processing():
        """Process independent items concurrently."""
        import concurrent.futures

        def process_item(item):
            # Placeholder for an I/O-bound call (HTTP request, DB lookup, ...)
            return item * 2

        items = list(range(1000))
        # Threads help when items wait on I/O; use processes for CPU-bound work
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(process_item, items))
        return {'processed': len(results)}

    @task
    def cached_computation():
        """Cache an expensive result in an Airflow Variable."""
        import json
        from airflow.models import Variable

        cache_key = "expensive_computation_result"
        cached = Variable.get(cache_key, default_var=None)
        if cached:
            return json.loads(cached)  # this cache entry never expires
        result = sum(i ** 2 for i in range(100000))
        Variable.set(cache_key, json.dumps({'result': result}))
        return {'result': result}

    batch_processing() >> parallel_processing() >> cached_computation()


optimized_dag()
```
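`cached_computation` uses a fixed key and never invalidates its entry. A hedged sketch of a variant: derive the key from a hash of the inputs and store a timestamp so entries expire after a TTL. The `get`/`put` callables abstract the storage; here a plain dict stands in for Airflow Variables. The function names are hypothetical.

```python
import hashlib
import json
import time


def cache_key(name, params):
    """Stable key: computation name plus a hash of its JSON-serializable inputs."""
    payload = json.dumps(params, sort_keys=True).encode("utf-8")
    return f"{name}:{hashlib.sha256(payload).hexdigest()[:16]}"


def get_or_compute(get, put, name, params, compute, ttl_s, now=time.time):
    """Return a cached value younger than ttl_s; otherwise compute and store it.

    get(key) -> str or None and put(key, str) abstract the storage backend.
    """
    key = cache_key(name, params)
    raw = get(key)
    if raw is not None:
        entry = json.loads(raw)
        if now() - entry["stored_at"] < ttl_s:
            return entry["value"]
    value = compute(**params)
    put(key, json.dumps({"stored_at": now(), "value": value}))
    return value


if __name__ == "__main__":
    store = {}  # in-memory stand-in for Airflow Variables
    result = get_or_compute(store.get, store.__setitem__, "squares", {"n": 100000},
                            lambda n: sum(i ** 2 for i in range(n)), ttl_s=3600)
    print(result)
```

Inside a task, `get` could be `lambda k: Variable.get(k, default_var=None)` and `put` could be `Variable.set`, keeping the caching logic independent of the storage backend.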
Resource-Aware Task Scheduling
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule="@hourly",  # use schedule_interval on Airflow < 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'resource-aware'],
)
def resource_aware_dag():
    @task
    def check_resources():
        """Check available resources on the worker running this task."""
        import psutil  # imported inside the task to keep DAG parsing fast

        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        return {
            'cpu_available': 100 - cpu_percent,
            'memory_available_percent': 100 - memory.percent,
            'should_process': cpu_percent < 80 and memory.percent < 85,
        }

    @task
    def adaptive_processing(resources: dict):
        """Adapt processing based on the measured resources.

        With distributed executors this task may run on a different worker
        than check_resources, so the measurement is only a hint.
        """
        if not resources['should_process']:
            return {'status': 'skipped', 'reason': 'insufficient_resources'}
        # Heuristic: 100 rows per percentage point of free memory
        batch_size = int(resources['memory_available_percent'] * 100)
        return {'status': 'processing', 'batch_size': batch_size}

    resources = check_resources()
    adaptive_processing(resources)


resource_aware_dag()
```
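Scaling the batch size by the percentage of free memory is an arbitrary heuristic. An alternative sketch sizes batches from absolute free memory, such as `psutil.virtual_memory().available`, assuming you can estimate bytes per row (for example from a sample with pandas' `DataFrame.memory_usage(deep=True)`). The helper name, safety fraction, and bounds are assumptions.

```python
def batch_size_for_memory(available_bytes, bytes_per_row, safety_fraction=0.5,
                          min_rows=1_000, max_rows=1_000_000):
    """Rows that fit in a fraction of free memory, clamped to [min_rows, max_rows]."""
    if bytes_per_row <= 0:
        raise ValueError("bytes_per_row must be positive")
    rows = int(available_bytes * safety_fraction) // bytes_per_row
    return max(min_rows, min(max_rows, rows))


if __name__ == "__main__":
    # In a task, available_bytes could come from psutil.virtual_memory().available
    print(batch_size_for_memory(256 * 1024**2, bytes_per_row=1024))  # 256 MiB free
    print(batch_size_for_memory(2 * 1024**3, bytes_per_row=1024))    # 2 GiB free
```

The safety fraction leaves headroom for pandas' intermediate copies, which can briefly exceed the size of the data itself.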
Performance Metrics
Optimization Impact
| Optimization | Before | After | Improvement |
|---|---|---|---|
| DAG Serialization | 10s parse | 2s parse | 80% faster |
| DB Indexing | 500ms query | 50ms query | 90% faster |
| Connection Pooling | 100ms connect | 10ms connect | 90% faster |
| Worker Concurrency | 4 tasks | 16 tasks | 4x throughput |
| XCom Backend | 500ms push | 50ms push | 90% faster |
Resource Utilization
| Resource | Recommended | Warning | Critical |
|---|---|---|---|
| CPU | < 70% | 70-85% | > 85% |
| Memory | < 70% | 70-85% | > 85% |
| Disk I/O | < 70% | 70-85% | > 85% |
| Network | < 50% | 50-80% | > 80% |
| DB Connections | < 70% | 70-85% | > 85% |
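The bands in this table can be encoded directly for alerting. A minimal sketch, with a hypothetical function name, that treats both bounds as exclusive so that exactly 70% (or 50% for network) and exactly 85% (or 80%) fall in the Warning band, matching the table's ranges:

```python
# (recommended_below, critical_above) percentages from the Resource Utilization table
THRESHOLDS = {
    "cpu": (70, 85),
    "memory": (70, 85),
    "disk_io": (70, 85),
    "network": (50, 80),
    "db_connections": (70, 85),
}


def classify(resource, percent):
    """Return 'recommended', 'warning' or 'critical' for a utilization percentage."""
    ok_below, critical_above = THRESHOLDS[resource]
    if percent < ok_below:
        return "recommended"
    if percent > critical_above:
        return "critical"
    return "warning"


if __name__ == "__main__":
    for resource, value in [("cpu", 50.0), ("memory", 85.0), ("network", 81.0)]:
        print(resource, value, classify(resource, value))
```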
Key Takeaways:
- Tune scheduler parameters: min_file_process_interval, parsing_processes, parallelism
- Optimize the database with proper indexing and connection pooling
- Set worker concurrency based on task type (CPU vs I/O bound)
- Use DAG serialization to reduce scheduler memory usage
- Monitor resource utilization and set alert thresholds
- Batch large operations and cache expensive computations
See Also
- Airflow Architecture — Core architecture components
- Executors Comparison — Choosing the right executor
- Kubernetes Executor — Dynamic scaling with K8s
- Monitoring and Alerting — Performance monitoring