🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Topic: Apache Airflow Architecture Deep Dive

Apache Airflow AdvancedAirflow Architecture⭐ Premium

Advertisement

Apache Airflow Architecture

Deep Dive into Airflow's Core Components

AmazonGoogleDifficulty: Advanced

Interview Question

ℹ️Interview Context

Company: Amazon AWS / Google Cloud Role: Senior Data Engineer / Data Platform Engineer Difficulty: Advanced Time: 45-60 minutes

Question: "Explain the complete architecture of Apache Airflow. How do the Scheduler, Worker, Webserver, and Metadata DB interact? What happens when a DAG file is parsed, and how does the Scheduler determine which tasks to execute?"


Detailed Theory

Core Architecture Overview

Apache Airflow follows a distributed architecture with four primary components that work together to orchestrate workflows:

AIRFLOW ARCHITECTUREWebserver(Flask)Metadata DB(PostgreSQL)Scheduler(Daemon)Nginx(Optional)Workers(Celery / K8s)DAG Files(Filesystem)

1. Metadata Database

The Metadata Database is the central nervous system of Airflow:

# Metadata DB Schema Key Tables
"""
Tables and their purposes:
- dag: Stores DAG metadata (dag_id, file_token, owners, etc.)
- dag_run: Records each DAG execution
- task_instance: Tracks individual task execution status
- xcom: Stores cross-task communication data
- connection: Stores connection details for hooks/operators
- variable: Key-value store for Airflow variables
- slas: Tracks SLA miss information
- log: Stores task execution logs
"""

# Database Connection Configuration
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN = "postgresql+psycopg2://airflow:airflow@localhost/airflow"

# Important: Use connection pooling
AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE = 5
AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW = 10
AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_RECYCLE = 1800

⚠️Production Tip

Never use SQLite in production! SQLite doesn't support concurrent writes and will cause issues with multiple Scheduler/Worker processes. Always use PostgreSQL or MySQL.

2. Scheduler

The Scheduler is the brain of Airflow that orchestrates everything:

# scheduler_config.py
from airflow.configuration import conf

# Scheduler Configuration
SCHEDULER_CONFIG = {
    # How often the scheduler checks for new DAG runs
    "scheduler_heartbeat_sec": 5,
    
    # Number of times to try to schedule DAG runs
    "max_tis_per_query": 512,
    
    # Number of DAG files to parse per loop
    "parsing_processes": 2,
    
    # Min files to process before starting
    "min_file_process_interval": 30,
    
    # DagBag size limit
    "dag_bag_import_timeout": 30,
    
    # How often to clean up old task instances
    "clean_tis_interval": 300,
    
    # Scheduler mode: sequential, local, or distributed
    "scheduler_mode": "local",
}

Scheduler Lifecycle:

# scheduler_lifecycle.py
"""
The Scheduler performs these operations in a loop:

1. PARSE PHASE:
   - Scans DAG folder for Python files
   - Imports DAGs and builds DagBag
   - Detects changes in DAG files (via file hash)
   - Updates dag table in metadata DB

2. SCHEDULE PHASE:
   - Checks for active DAGs
   - Creates DagRun entries based on schedule_interval
   - Respects start_date, end_date, and catchup settings

3. EXECUTE PHASE:
   - Identifies tasks ready for execution
   - Checks pool slots availability
   - Respects task dependencies
   - Queues tasks to executor

4. MONITOR PHASE:
   - Tracks running task instances
   - Handles task timeouts
   - Processes callbacks
   - Cleans up old records
"""

# Example: Scheduler health check
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.session import create_session

def check_scheduler_health():
    with create_session() as session:
        scheduler = session.query(SchedulerJob).filter(
            SchedulerJob.state == 'running'
        ).first()
        
        if scheduler:
            print(f"Scheduler is running with PID: {scheduler.pid}")
            print(f"Last heartbeat: {scheduler.latest_heartbeat}")
        else:
            print("WARNING: No active scheduler found!")

3. Webserver

The Webserver provides the user interface and API:

# webserver_config.py
"""
Webserver Components:
- Flask application serving the UI
- Role-Based Access Control (RBAC)
- REST API endpoints
- Authentication backends
"""

# RBAC Configuration
ROLES = [
    {
        'role': 'Admin',
        'perms': [
            ('can_read', 'Dag'),
            ('can_write', 'Dag'),
            ('can_read', 'Connection'),
            ('can_write', 'Connection'),
            ('can_read', 'Variable'),
            ('can_write', 'Variable'),
            ('menu_access', 'Admin'),
        ]
    },
    {
        'role': 'Op',
        'perms': [
            ('can_read', 'Dag'),
            ('can_write', 'Dag'),
            ('can_read', 'Connection'),
            ('menu_access', 'DAGs'),
        ]
    },
    {
        'role': 'Viewer',
        'perms': [
            ('can_read', 'Dag'),
            ('menu_access', 'DAGs'),
        ]
    },
]

# Authentication Backend
AUTH_TYPE = AUTH_LDAP  # or AUTH_DB, AUTH_OAUTH, AUTH_REMOTE_USER

# API Configuration
AIRFLOW__API__AUTH_BACKENDS = [
    'airflow.api.auth.backend.basic_auth',
    'airflow.api.auth.backend.session_auth',
]

4. Executors

Executors determine how tasks are run:

# executor_comparison.py
"""
Executor Types and Use Cases:

1. SequentialExecutor:
   - Runs tasks one at a time
   - Uses metadata DB for state
   - Good for: Testing, small deployments
   - Limitation: Single task at a time

2. LocalExecutor:
   - Runs tasks in parallel using processes
   - Uses metadata DB for state
   - Good for: Single-machine production
   - Limitation: Limited by machine resources

3. CeleryExecutor:
   - Distributes tasks across multiple workers
   - Uses Redis/RabbitMQ as broker
   - Good for: Multi-machine production
   - Limitation: Requires message broker setup

4. KubernetesExecutor:
   - Runs each task in a separate Pod
   - Dynamic resource allocation
   - Good for: Cloud-native, elastic scaling
   - Limitation: Kubernetes overhead
"""

# Celery Executor Configuration
AIRFLOW__CORE__EXECUTOR = 'airflow.executors.celery_executor.CeleryExecutor'
AIRFLOW__CELERY__BROKER_URL = 'redis://localhost:6379/0'
AIRFLOW__CELERY__RESULT_BACKEND = 'db+postgresql://airflow:airflow@localhost/airflow'

# Kubernetes Executor Configuration
AIRFLOW__CORE__EXECUTOR = 'airflow.executors.kubernetes_executor.KubernetesExecutor'
AIRFLOW__KUBERNETES__NAMESPACE = 'airflow'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_IMAGE = 'apache/airflow:latest'
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS = True

ℹ️Pro Tip

At Amazon scale, they use CeleryExecutor with Redis cluster for broker. The key insight is to size your Celery workers based on task complexity, not just task count. CPU-intensive tasks need fewer concurrent tasks per worker.

5. DAG Parsing Mechanism

# dag_parsing.py
"""
DAG Parsing Flow:

1. Scheduler starts parsing process
2. For each DAG file in dags_folder:
   a. Check file modification time (or hash)
   b. If changed, import the module
   c. Execute module-level code to find DAG objects
   d. Register DAG in DagBag
   e. Update metadata DB

3. Parsed DAGs are cached
4. Re-parsing happens every min_file_process_interval

DagBag is an in-memory dictionary of DAG objects.
"""

from airflow.models.dagbag import DagBag
import os

# Example: Manual DagBag inspection
def inspect_dagbag():
    dag_bag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
    
    print(f"Total DAGs loaded: {len(dag_bag.dags)}")
    print(f"Import errors: {dag_bag.import_errors}")
    
    for dag_id, dag in dag_bag.dags.items():
        print(f"\nDAG: {dag_id}")
        print(f"  Schedule: {dag.schedule_interval}")
        print(f"  Tasks: {len(dag.tasks)}")
        print(f"  Owners: {dag.owner}")

Real-World Scenarios

Scenario 1: High-Availability Scheduler Setup

# high_availability_scheduler.py
"""
Problem: You need to set up Airflow with high availability
for a mission-critical pipeline at Amazon.

Solution: Active-Passive Scheduler with Celery Executor
"""

# airflow.cfg for HA setup
HA_CONFIG = """
[scheduler]
# Active scheduler
scheduler_heartbeat_sec = 5
max_tis_per_query = 512
parsing_processes = 4
child_process_connection_timeout = 300

[celery]
# Use Redis Cluster for HA broker
broker_url = redis://redis-cluster:6379/0
result_backend = db+postgresql://airflow:airflow@postgres-cluster/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1

[core]
# Use PostgreSQL with replication
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@pgbouncer/airflow
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 30
"""

# Health check script
def check_scheduler_ha():
    """
    In HA setup:
    1. Multiple scheduler instances can run
    2. Database locking prevents conflicts
    3. One scheduler becomes leader
    4. If leader dies, another takes over
    """
    pass

Scenario 2: Scheduler Performance Issue

# scheduler_performance.py
"""
Problem: Scheduler is falling behind, tasks aren't being scheduled
on time at Google's data platform.

Diagnosis Steps:
1. Check scheduler logs for delays
2. Monitor metadata DB performance
3. Review DAG complexity
4. Check file processing time
"""

from datetime import datetime, timedelta

def diagnose_scheduler_issue():
    # Check 1: DAG file processing time
    """
    If DAG files take too long to parse:
    - Reduce DAG complexity
    - Use dynamic DAG generation sparingly
    - Increase min_file_process_interval
    - Use @dag decorator for cleaner code
    """
    
    # Check 2: Task queue depth
    """
    If too many tasks are queued:
    - Increase worker concurrency
    - Add more workers
    - Optimize task dependencies
    - Use pools for resource management
    """
    
    # Check 3: Database bottlenecks
    """
    If DB is slow:
    - Add connection pooling (PgBouncer)
    - Increase pool_size
    - Check for long-running transactions
    - Add indexes on task_instance table
    """
    pass

Scenario 3: Multi-Environment Deployment

# multi_environment.py
"""
Problem: Deploy Airflow across dev, staging, and production
environments with different configurations.
"""

# Environment-specific configs
ENVIRONMENTS = {
    "dev": {
        "executor": "SequentialExecutor",
        "database": "sqlite:///airflow.db",
        "workers": 1,
        "parallelism": 2,
    },
    "staging": {
        "executor": "LocalExecutor",
        "database": "postgresql+psycopg2://airflow:airflow@staging-pg/airflow",
        "workers": 4,
        "parallelism": 32,
    },
    "production": {
        "executor": "CeleryExecutor",
        "database": "postgresql+psycopg2://airflow:airflow@prod-pg/airflow",
        "workers": 16,
        "parallelism": 128,
    },
}

# Dynamic configuration loader
def load_config(environment: str):
    config = ENVIRONMENTS.get(environment)
    if not config:
        raise ValueError(f"Unknown environment: {environment}")
    
    # Set Airflow configurations
    import os
    os.environ["AIRFLOW__CORE__EXECUTOR"] = config["executor"]
    os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = config["database"]
    os.environ["AIRFLOW__CORE__PARALLELISM"] = str(config["parallelism"])
    
    return config

Edge Cases and Gotchas

⚠️Critical Edge Cases

  1. DAG Serialization: In Airflow 2.x, DAGs are serialized and stored in the DB. If serialization fails, the Webserver won't show DAGs.

  2. DAG Picking: The Scheduler uses file hashes to detect changes. If you have symbolic links, changes might not be detected.

  3. Task Serialization: XCom values must be JSON-serializable. Large payloads will cause performance issues.

  4. Connection Pool Exhaustion: Default pool size (5) is too small for production. Monitor and increase as needed.

# edge_cases.py
from airflow.models.dag import DAG
from datetime import datetime

# Edge Case 1: DAG with no schedule
with DAG(
    dag_id='manual_only_dag',
    schedule_interval=None,  # Manual trigger only
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    pass

# Edge Case 2: DAG with complex dependencies
with DAG(
    dag_id='complex_deps_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
) as dag:
    # Cross-DAG dependencies via ExternalTaskSensor
    from airflow.operators.external_task import ExternalTaskSensor
    
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        poke_interval=60,
        timeout=3600,
    )

QuizBox


Best Practices

Architecture Best Practices

# best_practices.py
"""
1. Database Configuration:
   - Use PostgreSQL or MySQL (never SQLite in production)
   - Enable connection pooling (PgBouncer)
   - Set appropriate pool_size (20+) and max_overflow (30+)
   - Enable statement timeout to prevent long queries

2. Scheduler Configuration:
   - Use multiple parsing_processes (match CPU cores)
   - Set appropriate max_tis_per_query (512-1024)
   - Monitor scheduler heartbeat
   - Use min_file_process_interval wisely

3. Executor Selection:
   - CeleryExecutor for most production use cases
   - KubernetesExecutor for cloud-native deployments
   - LocalExecutor for single-machine setups
   - SequentialExecutor only for testing

4. Webserver Configuration:
   - Use Nginx as reverse proxy
   - Enable RBAC
   - Set appropriate session timeout
   - Use HTTPS in production
"""

Monitoring Checklist

# monitoring_checklist.py
"""
Key Metrics to Monitor:

1. Scheduler:
   - scheduler_heartbeat (should be < 5 seconds)
   - parsing_time (should be < 10 seconds)
   - dag_file_processing_time
   - scheduler_loop_duration

2. Database:
   - connection_pool_active
   - connection_pool_idle
   - query_duration
   - deadlock_count

3. Workers:
   - worker_heartbeat
   - task_execution_time
   - task_failure_rate
   - queue_depth

4. Webserver:
   - request_duration
   - response_time
   - error_rate
   - active_sessions
"""

ℹ️Amazon Interview Tip

When discussing Airflow architecture at Amazon, emphasize the importance of the Metadata Database as the single source of truth. Mention how they handle database failures with automatic failover and read replicas. Also discuss the trade-offs between different executors based on workload characteristics.


Summary

Understanding Airflow's architecture is fundamental for any data engineering role. The key components—Scheduler, Webserver, Workers, and Metadata DB—work together in a distributed system that requires careful configuration and monitoring. For interviews at Amazon and Google, focus on:

  1. How each component interacts
  2. Trade-offs between different executors
  3. Database configuration and performance
  4. High-availability patterns
  5. Monitoring and troubleshooting approaches

This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts out loud before your interview.

Advertisement