
Performance Tuning and Optimization in Apache Airflow

Formal Definitions

Definition: Scheduler Throughput

Scheduler Throughput is the number of tasks the scheduler can queue per unit time. It is bounded by the parsing rate $R_{\text{parse}}$, the database query rate $R_{\text{db}}$, and the executor dispatch rate $R_{\text{dispatch}}$. The effective throughput is $T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{dispatch}})$.
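As a quick illustration of the bottleneck formula, the sketch below takes three measured rates and reports which stage limits throughput. The function name and the sample numbers are hypothetical, and all three rates are assumed to be already expressed in tasks per second.

```python
def effective_throughput(r_parse: float, r_db: float, r_dispatch: float):
    """Return (T_eff, limiting stage) for rates given in tasks/second."""
    rates = {"parse": r_parse, "db": r_db, "dispatch": r_dispatch}
    # The slowest stage bounds the whole pipeline: T_eff = min(...)
    bottleneck = min(rates, key=rates.get)
    return rates[bottleneck], bottleneck


# Hypothetical measurements for illustration, not benchmarks.
t_eff, stage = effective_throughput(r_parse=40.0, r_db=25.0, r_dispatch=60.0)
print(f"T_eff = {t_eff} tasks/s, limited by {stage}")
```

Raising either of the two non-limiting rates would not change the result here; only the limiting stage is worth tuning first.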

Definition: Task Latency

Task Latency is the time from task queuing to execution start. It includes executor queue time $T_{\text{queue}}$, worker assignment time $T_{\text{assign}}$, and task startup time $T_{\text{startup}}$. The total latency is $L_{\text{task}} = T_{\text{queue}} + T_{\text{assign}} + T_{\text{startup}}$.

Definition: Database Connection Pool Efficiency

Connection Pool Efficiency measures how effectively database connections are reused. The efficiency is $\eta = \frac{N_{\text{reused}}}{N_{\text{total}}} \times 100\%$, where $N_{\text{reused}}$ is the number of connections reused from the pool and $N_{\text{total}}$ is the total number of connection attempts.
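A minimal sketch of the efficiency calculation, assuming you already have the two counters from your own instrumentation (the function name and the sample counts are hypothetical):

```python
def pool_efficiency(n_reused: int, n_total: int) -> float:
    """Percentage of connection attempts served by an existing pooled connection."""
    if n_total <= 0:
        # No attempts recorded yet; report 0 rather than dividing by zero.
        return 0.0
    return 100.0 * n_reused / n_total


# Hypothetical counters: 950 of 1000 checkouts reused a pooled connection.
print(f"eta = {pool_efficiency(950, 1000):.1f}%")
```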

Detailed Explanation

Scheduler Optimization

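# airflow.cfg - Scheduler performance tuning
# Option names follow Airflow 2.2+; check the configuration reference
# for the version you run.
[core]
# Maximum number of task instances that can run concurrently
# per scheduler, across all DAGs
parallelism = 32

# Maximum number of task instances allowed to run concurrently
# within each DAG
max_active_tasks_per_dag = 16

# Maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

# Timeout for importing a single DAG file (seconds)
dagbag_import_timeout = 30

[scheduler]
# Minimum interval between re-parses of the same DAG file (seconds)
min_file_process_interval = 30

# How often to scan the DAGs folder for new files (seconds)
dag_dir_list_interval = 300

# Number of processes used to parse DAG files
parsing_processes = 2

# Scheduler heartbeat interval (seconds)
scheduler_heartbeat_sec = 5

# Airflow 1.10.x only. From Airflow 2.0 onward DAG serialization is
# always enabled, so these [core] options are not needed there.
# Serialize DAGs to the metadata database:
# store_serialized_dags = True
# Store DAG source code in the metadata database:
# store_dag_code = True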
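To get a feel for how parsing_processes interacts with the number of DAG files, the sketch below estimates how long one full pass over the DAGs folder takes. It is a deliberately simple model and rests on assumptions: files are spread evenly across processes, every file takes the same time to parse, and process startup overhead is ignored. The function name and sample numbers are hypothetical.

```python
import math


def full_parse_pass_seconds(n_files: int, avg_parse_seconds: float,
                            parsing_processes: int) -> float:
    """Rough time for one pass over all DAG files under the simple model above."""
    if parsing_processes < 1:
        raise ValueError("parsing_processes must be at least 1")
    # Each process handles ceil(n_files / parsing_processes) files in sequence.
    files_per_process = math.ceil(n_files / parsing_processes)
    return files_per_process * avg_parse_seconds


# Hypothetical deployment: 300 DAG files at 0.5 s each.
for processes in (2, 4):
    seconds = full_parse_pass_seconds(300, 0.5, processes)
    print(f"parsing_processes={processes}: about {seconds:.1f}s per pass")
```

If the estimated pass time is well above min_file_process_interval, files cannot be re-parsed as often as that setting asks, which suggests either more parsing processes or cheaper top-level DAG code.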

Database Optimization

# database_optimization.py
from airflow import settings
from sqlalchemy import text

def optimize_database():
    """Apply database optimizations."""
    session = settings.Session()
    
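    # Analyze query performance (requires the pg_stat_statements extension;
    # on PostgreSQL 12 and earlier the column is mean_time, not mean_exec_time)
    analysis = session.execute(text("""
        SELECT query, mean_exec_time, calls
        FROM pg_stat_statements
        WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database())
        ORDER BY mean_exec_time DESC
        LIMIT 10;
    """))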
    
    print("Slow queries:")
    for row in analysis:
        print(f"  {row[0][:50]}... - {row[1]:.2f}ms ({row[2]} calls)")
    
    # Check index usage
    indexes = session.execute(text("""
        SELECT indexrelname, idx_scan, idx_tup_read, idx_tup_fetch
        FROM pg_stat_user_indexes
        WHERE schemaname = 'public'
        ORDER BY idx_scan DESC;
    """))
    
    print("\nIndex usage:")
    for row in indexes:
        print(f"  {row[0]}: {row[1]} scans, {row[2]} tuples read")
    
    # Create indexes on frequently filtered columns if they do not exist yet.
    # Airflow's own migrations may already provide equivalent indexes, so
    # check pg_indexes before adding duplicates.
    missing_indexes = [
        """
        CREATE INDEX IF NOT EXISTS idx_task_instance_dag_run 
        ON task_instance(dag_id, run_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_task_instance_state 
        ON task_instance(state);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_dag_run_state 
        ON dag_run(state);
        """,
    ]
    
    for idx_sql in missing_indexes:
        try:
            session.execute(text(idx_sql))
            session.commit()
        except Exception as e:
            print(f"Index creation failed: {e}")
            session.rollback()
    
    # Return the connection to the pool
    session.close()

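# Connection pool optimization
def configure_connection_pool():
    """Return recommended SQLAlchemy connection pool settings.

    This only builds the dictionary; it does not modify Airflow's engine.
    Apply the values through the sql_alchemy_pool_size,
    sql_alchemy_max_overflow, sql_alchemy_pool_recycle and
    sql_alchemy_pool_pre_ping options in airflow.cfg.
    """
    # Recommended settings for 100 concurrent tasks
    pool_config = {
        'pool_size': 20,           # Base connections
        'max_overflow': 30,        # Extra connections for peaks
        'pool_timeout': 30,        # Wait time for connection (seconds)
        'pool_recycle': 1800,      # Recycle after 30 minutes
        'pool_pre_ping': True,     # Verify connections before use
    }
    
    return pool_config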
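
Before adopting pool settings like these, it helps to check them against the database's connection limit. With SQLAlchemy's queue pool, each engine can open at most pool_size + max_overflow connections, and every Airflow process that creates its own engine gets its own pool. The sketch below does that arithmetic; the function name and the process count of 4 are hypothetical.

```python
def max_db_connections(pool_size: int, max_overflow: int, n_processes: int) -> int:
    """Upper bound on connections opened by n_processes independent pools."""
    return (pool_size + max_overflow) * n_processes


# Hypothetical deployment: 4 processes, each with its own pool.
needed = max_db_connections(pool_size=20, max_overflow=30, n_processes=4)
postgres_default_limit = 100  # PostgreSQL's default max_connections
print(f"worst case: {needed} connections (default limit: {postgres_default_limit})")
if needed > postgres_default_limit:
    print("Raise max_connections, shrink the pools, or add a pooler such as PgBouncer.")
```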

Worker Optimization

# worker_optimization.py
import psutil

def get_worker_recommendations():
    """Get resource recommendations based on system specs."""
    # cpu_count() can return None when the count cannot be determined
    cpu_count = psutil.cpu_count() or 1
    memory = psutil.virtual_memory()
    
    # Celery worker configuration
    worker_config = {
        # Concurrency = CPU cores (for CPU-bound tasks)
        # Concurrency = 2 * CPU cores (for I/O-bound tasks)
        'concurrency': min(cpu_count, 16),
        
        # Prefetch multiplier - how many tasks to prefetch
        'prefetch_multiplier': 1,
        
        # Maximum tasks per child before worker restart
        'max_tasks_per_child': 200,
        
        # Worker memory limit per child process, in KiB (the unit Celery
        # expects); memory.total is reported in bytes
        'max_memory_per_child': int(memory.total * 0.8 / cpu_count / 1024),
        
        # Task time limit (seconds)
        'task_time_limit': 3600,
        
        # Soft time limit (seconds) - raises SoftTimeLimitExceeded
        'task_soft_time_limit': 3000,
    }
    
    return worker_config

def monitor_worker_health():
    """Monitor worker health metrics."""
    
    metrics = {
        'cpu_percent': psutil.cpu_percent(interval=1),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_usage': psutil.disk_usage('/').percent,
        'open_files': len(psutil.Process().open_files()),
        'connections': len(psutil.Process().connections()),
    }
    
    # Alert thresholds
    alerts = []
    if metrics['cpu_percent'] > 90:
        alerts.append(f"High CPU: {metrics['cpu_percent']}%")
    if metrics['memory_percent'] > 85:
        alerts.append(f"High Memory: {metrics['memory_percent']}%")
    if metrics['disk_usage'] > 90:
        alerts.append(f"High Disk: {metrics['disk_usage']}%")
    
    return {
        'metrics': metrics,
        'alerts': alerts,
        'healthy': len(alerts) == 0,
    }

Scheduler Throughput

$$T_{\text{eff}} = \min(R_{\text{parse}}, R_{\text{db}}, R_{\text{dispatch}})$$

Here,

  • $T_{\text{eff}}$ = Effective scheduler throughput
  • $R_{\text{parse}}$ = DAG parsing rate (DAGs/second)
  • $R_{\text{db}}$ = Database query rate (queries/second)
  • $R_{\text{dispatch}}$ = Executor dispatch rate (tasks/second)

Task End-to-End Latency

$$L_{\text{e2e}} = L_{\text{queue}} + L_{\text{assign}} + L_{\text{startup}} + T_{\text{exec}}$$

Here,

  • $L_{\text{e2e}}$ = End-to-end task latency
  • $L_{\text{queue}}$ = Time in executor queue
  • $L_{\text{assign}}$ = Worker assignment time
  • $L_{\text{startup}}$ = Task process startup time
  • $T_{\text{exec}}$ = Actual task execution time
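
The end-to-end formula is most useful when it shows how much of a task's wall-clock time is scheduling overhead rather than real work. A minimal sketch, with a hypothetical function name and made-up timings in seconds:

```python
def latency_breakdown(l_queue: float, l_assign: float,
                      l_startup: float, t_exec: float) -> dict:
    """Compute L_e2e and the fraction spent before execution starts."""
    overhead = l_queue + l_assign + l_startup
    total = overhead + t_exec
    return {
        "l_e2e": total,
        "overhead": overhead,
        # Guard against an all-zero input
        "overhead_fraction": overhead / total if total > 0 else 0.0,
    }


# Hypothetical task: 2 s queued, 0.5 s assignment, 1.5 s startup, 16 s of work.
result = latency_breakdown(2.0, 0.5, 1.5, 16.0)
print(f"L_e2e = {result['l_e2e']}s, overhead = {result['overhead_fraction']:.0%}")
```

For short tasks the overhead fraction can dominate, which is one reason to batch many tiny tasks into fewer larger ones.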

Database Query Optimization Score

$$S_{\text{db}} = \frac{N_{\text{index\_hits}}}{N_{\text{total\_scans}}} \times 100\%$$

Here,

  • $S_{\text{db}}$ = Index hit ratio (higher is better)
  • $N_{\text{index\_hits}}$ = Scans served by an index
  • $N_{\text{total\_scans}}$ = Total scans (index scans plus sequential scans)
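
A small sketch of the score, assuming the denominator is the sum of index scans and sequential scans (for example the idx_scan and seq_scan counters that PostgreSQL exposes in pg_stat_user_tables). The function name and the counts are hypothetical.

```python
def index_hit_score(index_scans: int, seq_scans: int) -> float:
    """S_db as a percentage: share of all scans that used an index."""
    total = index_scans + seq_scans
    if total == 0:
        # Table has not been scanned yet
        return 0.0
    return 100.0 * index_scans / total


# Hypothetical counters for one table.
print(f"S_db = {index_hit_score(index_scans=9000, seq_scans=1000):.1f}%")
```

A low score on a large, frequently queried table is a hint to inspect its query plans; on small tables sequential scans are often the cheaper choice, so a low score there is not necessarily a problem.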

For CPU-bound tasks, set worker concurrency equal to CPU cores. For I/O-bound tasks, set concurrency to 2-4x CPU cores. Monitor worker memory usage to prevent OOM kills.
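
The rule of thumb above can be written as a small helper. This is only a sketch: the function name, the "cpu"/"io" labels and the default multiplier of 2 are assumptions, and the right multiplier within the 2-4x range depends on how long your tasks spend waiting on I/O.

```python
import os
from typing import Optional


def recommend_concurrency(task_type: str, cpu_cores: Optional[int] = None,
                          io_multiplier: int = 2) -> int:
    """Suggest a worker concurrency from the task profile and core count."""
    cores = cpu_cores or os.cpu_count() or 1
    if task_type == "cpu":
        # CPU-bound: one task per core avoids contention
        return cores
    if task_type == "io":
        if not 2 <= io_multiplier <= 4:
            raise ValueError("io_multiplier should be between 2 and 4")
        # I/O-bound: tasks mostly wait, so oversubscribe the cores
        return cores * io_multiplier
    raise ValueError(f"unknown task_type: {task_type!r}")


print(recommend_concurrency("cpu", cpu_cores=8))
print(recommend_concurrency("io", cpu_cores=8, io_multiplier=4))
```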

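DAG serialization stores parsed DAG definitions in the metadata database, so components such as the webserver read them from there instead of re-parsing DAG files, which reduces memory usage. From Airflow 2.0 onward serialization is always enabled; setting store_serialized_dags = True is only needed on Airflow 1.10.x.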

Key Concepts Table

| Optimization Area | Metric | Target | Impact |
|---|---|---|---|
| DAG Parsing | Parse time | < 1s per DAG | High |
| Task Latency | Queue to start | < 5s | High |
| DB Query Time | Average query | < 100ms | High |
| Worker Memory | Per-worker | < 4GB | Medium |
| XCom Size | Per operation | < 48KB | Medium |
| Log Storage | Daily volume | < 10GB/day | Low |
| Scheduler Heartbeat | Interval | 5s | Low |
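
The targets in this table can be turned into an automated check. The sketch below compares measured values against the upper bounds listed above and returns the metrics that miss their target. The metric key names and the sample measurements are hypothetical; only the thresholds come from the table.

```python
# Upper bounds taken from the table above (a value must stay below its bound).
TARGETS = {
    "dag_parse_seconds": 1.0,
    "task_queue_to_start_seconds": 5.0,
    "db_query_ms": 100.0,
    "worker_memory_gb": 4.0,
    "xcom_size_kb": 48.0,
    "log_volume_gb_per_day": 10.0,
}


def find_violations(measured: dict) -> list:
    """Return the sorted names of measured metrics at or above their target."""
    return sorted(
        name for name, value in measured.items()
        if name in TARGETS and value >= TARGETS[name]
    )


# Hypothetical measurements from a monitoring job.
sample = {"dag_parse_seconds": 0.4, "db_query_ms": 180.0, "xcom_size_kb": 64.0}
print(find_violations(sample))
```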

Code Examples

Performance Monitoring Dashboard

# performance_monitoring.py
# Written against the Airflow 2.x ORM models.
from datetime import timedelta

from airflow import settings
from airflow.models import DagRun, TaskInstance, DagModel
from airflow.utils import timezone
from sqlalchemy import func

def get_performance_metrics():
    """Collect comprehensive performance metrics."""
    session = settings.Session()
    # Airflow stores timezone-aware UTC timestamps, so compare against
    # an aware datetime rather than a naive datetime.now()
    now = timezone.utcnow()
    
    try:
        # Scheduler metrics
        scheduler_metrics = {
            'active_dags': session.query(DagModel).filter(
                DagModel.is_active == True
            ).count(),
            'total_dag_runs': session.query(DagRun).count(),
            # Filter on start_date (wall-clock start), not the logical date,
            # which can lag the actual run by a full schedule interval
            'recent_runs_1h': session.query(DagRun).filter(
                DagRun.start_date >= now - timedelta(hours=1)
            ).count(),
        }
        
        # Task metrics: number of task instances per state
        task_stats = session.query(
            TaskInstance.state,
            func.count(TaskInstance.task_id)
        ).group_by(TaskInstance.state).all()
        
        task_metrics = dict(task_stats)
        
        # Performance metrics; avg() yields None when no rows match
        avg_task_duration = session.query(
            func.avg(TaskInstance.duration)
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= now - timedelta(hours=24)
        ).scalar() or 0.0
        
        # Error rate
        total_tasks_24h = session.query(TaskInstance).filter(
            TaskInstance.start_date >= now - timedelta(hours=24)
        ).count()
        
        failed_tasks_24h = session.query(TaskInstance).filter(
            TaskInstance.state == 'failed',
            TaskInstance.start_date >= now - timedelta(hours=24)
        ).count()
    finally:
        session.close()
    
    error_rate = failed_tasks_24h / total_tasks_24h if total_tasks_24h > 0 else 0
    
    return {
        'scheduler': scheduler_metrics,
        'tasks': task_metrics,
        'avg_duration_seconds': avg_task_duration,
        'error_rate_24h': error_rate,
        'timestamp': now.isoformat(),
    }

def identify_slow_dags(top_n=10):
    """Identify slowest DAGs by average task duration."""
    session = settings.Session()
    
    try:
        slow_dags = session.query(
            TaskInstance.dag_id,
            func.avg(TaskInstance.duration).label('avg_duration'),
            func.count(TaskInstance.task_id).label('task_count')
        ).filter(
            TaskInstance.state == 'success',
            TaskInstance.start_date >= timezone.utcnow() - timedelta(days=7)
        ).group_by(
            TaskInstance.dag_id
        ).order_by(
            func.avg(TaskInstance.duration).desc()
        ).limit(top_n).all()
    finally:
        session.close()
    
    return [
        {
            'dag_id': row[0],
            'avg_duration': row[1],
            'task_count': row[2],
        }
        for row in slow_dags
    ]

if __name__ == "__main__":
    metrics = get_performance_metrics()
    slow_dags = identify_slow_dags()
    
    print("Performance Metrics:")
    print(f"  Active DAGs: {metrics['scheduler']['active_dags']}")
    print(f"  Avg task duration: {metrics['avg_duration_seconds']:.2f}s")
    print(f"  Error rate (24h): {metrics['error_rate_24h']:.2%}")
    
    print("\nSlowest DAGs:")
    for dag in slow_dags:
        print(f"  {dag['dag_id']}: {dag['avg_duration']:.2f}s avg ({dag['task_count']} tasks)")

DAG Optimization Patterns

from airflow.decorators import task, dag
from datetime import datetime

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'optimization'],
)
def optimized_dag():
    
    @task
    def batch_processing():
        """Process data in batches for better performance."""
        import pandas as pd
        
        # Read data in chunks
        chunk_size = 10000
        total_processed = 0
        
        for chunk in pd.read_csv('/data/large_file.csv', chunksize=chunk_size):
            # Process chunk
            processed = chunk.dropna().drop_duplicates()
            total_processed += len(processed)
        
        return {'processed': total_processed}
    
    @task
    def parallel_processing():
        """Process independent tasks in parallel."""
        import concurrent.futures
        
        def process_item(item):
            # Simulate processing
            return item * 2
        
        items = list(range(1000))
        
        # Use thread pool for I/O-bound tasks
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(process_item, items))
        
        return {'processed': len(results)}
    
    @task
    def cached_computation():
        """Cache expensive computations."""
        from airflow.models import Variable
        import json
        
        # Check cache
        cache_key = "expensive_computation_result"
        cached = Variable.get(cache_key, default_var=None)
        
        if cached:
            return json.loads(cached)
        
        # Perform expensive computation
        result = sum(i ** 2 for i in range(100000))
        
        # Cache result
        Variable.set(cache_key, json.dumps({'result': result}))
        
        return {'result': result}
    
    batch_processing() >> parallel_processing() >> cached_computation()

optimized_dag()

Resource-Aware Task Scheduling

from airflow.decorators import task, dag
from datetime import datetime
import psutil

@dag(
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['performance', 'resource-aware'],
)
def resource_aware_dag():
    
    @task
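    def check_resources():
        """Check available resources before processing.

        The readings describe the worker that runs this task; with a
        distributed executor the downstream task may run on another machine.
        """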
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        
        return {
            'cpu_available': 100 - cpu_percent,
            'memory_available_percent': 100 - memory.percent,
            'should_process': cpu_percent < 80 and memory.percent < 85,
        }
    
    @task
    def adaptive_processing(resources: dict):
        """Adapt processing based on available resources."""
        if not resources['should_process']:
            return {'status': 'skipped', 'reason': 'insufficient_resources'}
        
        # Adjust batch size based on available memory
        batch_size = int(resources['memory_available_percent'] * 100)
        
        return {
            'status': 'processing',
            'batch_size': batch_size,
        }
    
    resources = check_resources()
    adaptive_processing(resources)

resource_aware_dag()

Performance Metrics

Optimization Impact

| Optimization | Before | After | Improvement |
|---|---|---|---|
| DAG Serialization | 10s parse | 2s parse | 80% faster |
| DB Indexing | 500ms query | 50ms query | 90% faster |
| Connection Pooling | 100ms connect | 10ms connect | 90% faster |
| Worker Concurrency | 4 tasks | 16 tasks | 4x throughput |
| XCom Backend | 500ms push | 50ms push | 90% faster |

Resource Utilization

| Resource | Recommended | Warning | Critical |
|---|---|---|---|
| CPU | < 70% | 70-85% | > 85% |
| Memory | < 70% | 70-85% | > 85% |
| Disk I/O | < 70% | 70-85% | > 85% |
| Network | < 50% | 50-80% | > 80% |
| DB Connections | < 70% | 70-85% | > 85% |
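
These bands map directly onto an alerting helper. The sketch below classifies a utilization percentage using the thresholds from the table; the resource keys and the function name are hypothetical.

```python
# (warning_from, critical_above) per resource, taken from the table above.
BANDS = {
    "cpu": (70, 85),
    "memory": (70, 85),
    "disk_io": (70, 85),
    "network": (50, 80),
    "db_connections": (70, 85),
}


def classify(resource: str, percent: float) -> str:
    """Return 'recommended', 'warning' or 'critical' for a utilization value."""
    warning_from, critical_above = BANDS[resource]
    if percent > critical_above:
        return "critical"
    if percent >= warning_from:
        return "warning"
    return "recommended"


for name, value in [("cpu", 64.0), ("network", 60.0), ("db_connections", 91.0)]:
    print(f"{name}: {value}% is {classify(name, value)}")
```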

Key Takeaways:

  • Tune scheduler parameters: min_file_process_interval, parsing_processes, parallelism
  • Optimize database with proper indexing and connection pooling
  • Set worker concurrency based on task type (CPU vs I/O bound)
  • Use DAG serialization to reduce scheduler memory usage
  • Monitor resource utilization and set alert thresholds
  • Batch large operations and cache expensive computations
