Apache Airflow Architecture
Architecture Diagram
[Figure: Apache Airflow architecture, shown as four layers from top to bottom.]
- Services: Web Server (Flask), Scheduler (core), and Triggerer (async). All three read from and write to the metadata database.
- Metadata Database (PostgreSQL): dag_run, task_instance, log, slot_pool, and serialized DAG (pickle) tables.
- Executor Layer: Sequential Executor, Local Executor, Celery Executor, Kubernetes Executor.
- Worker Nodes: Celery workers 1 through N, and Kubernetes pods.
[Figure: Scheduler internal workflow, three stages in sequence.]
- DAG File Processor: the DAG File Processor Manager parses File 1 through File N in parallel and writes the results to the serialized DAGs cache.
- Scheduler Heartbeat: on each heartbeat the scheduler performs these steps:
  1. Check for new DAG files
  2. Update DAG parsing statistics
  3. Create DagRuns for scheduled DAGs
  4. Queue task instances for execution
  5. Update task instance states
  6. Process callbacks
  7. Emit heartbeats
- Task Queuing: queued task instances are ordered by priority (Priority 1 -> Priority 2 -> Priority 3 -> ... -> Priority N).
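The task-queuing stage of the scheduler workflow can be illustrated with a small priority queue. This is only a sketch, not Airflow's scheduler code: the function name `drain_by_priority` and the sample task names are hypothetical, and it assumes that, as with Airflow's `priority_weight`, a larger weight is dispatched first.

```python
import heapq
from itertools import count


def drain_by_priority(tasks, open_slots):
    """Return up to `open_slots` task ids, highest priority weight first.

    `tasks` is an iterable of (task_id, priority_weight) pairs.
    """
    order = count()  # tie-breaker: keeps insertion order for equal weights
    heap = []
    for task_id, weight in tasks:
        # heapq is a min-heap, so push the negated weight to pop the largest first.
        heapq.heappush(heap, (-weight, next(order), task_id))

    picked = []
    while heap and len(picked) < open_slots:
        _, _, task_id = heapq.heappop(heap)
        picked.append(task_id)
    return picked


ready = [("extract", 3), ("load", 1), ("transform", 2), ("audit", 3)]
print(drain_by_priority(ready, open_slots=3))
```

With three open slots, the two weight-3 tasks are picked first (in insertion order), followed by the weight-2 task; the weight-1 task waits for the next pass.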
[Figure: Metadata database schema. The diagram links dag.dag_id to dag_run.dag_id, and dag_run to task_instance.]

| Table | Columns |
|---|---|
| dag | dag_id (PK), root_dag_id, parent_dag_id, file_token, fileloc, owners, is_active, is_paused, is_subdag, schedule_interval, timetable_slug, tags |
| dag_run | id (PK), dag_id (FK), run_id (PK), execution_date, logical_date, start_date, end_date, state, run_type, conf, external_trigger, last_scheduling_decision |
| task_instance | task_id (PK), dag_id (PK), run_id (PK), execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname |
| log | id (PK), dag_id, task_id, execution_date, event, timestamp, message, level |
| slot_pool | id (PK), name, slots, occupied_slots, open_slots |
| serialized_dag | dag_id (PK), file_token, data, processor_subdir, created_at, updated_at |
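To make the role of the dag_run and task_instance tables concrete, here is a heavily simplified sketch. It is an illustration under stated assumptions: it uses an in-memory SQLite database rather than PostgreSQL, keeps only a handful of columns, and the helper `mark_running` is hypothetical rather than part of Airflow. The point it shows is that a state transition is a single conditional UPDATE inside a transaction, so only one caller can claim a queued task.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_run (
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        state  TEXT NOT NULL,
        PRIMARY KEY (dag_id, run_id)
    );
    CREATE TABLE task_instance (
        dag_id     TEXT NOT NULL,
        run_id     TEXT NOT NULL,
        task_id    TEXT NOT NULL,
        state      TEXT,
        try_number INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (dag_id, run_id, task_id),
        FOREIGN KEY (dag_id, run_id) REFERENCES dag_run (dag_id, run_id)
    );
    """
)


def mark_running(conn, dag_id, run_id, task_id):
    """Move one queued task instance to 'running'; return True if it was claimed."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "UPDATE task_instance "
            "SET state = 'running', try_number = try_number + 1 "
            "WHERE dag_id = ? AND run_id = ? AND task_id = ? AND state = 'queued'",
            (dag_id, run_id, task_id),
        )
        return cur.rowcount == 1


with conn:
    conn.execute("INSERT INTO dag_run VALUES ('demo', 'run_1', 'running')")
    conn.execute(
        "INSERT INTO task_instance (dag_id, run_id, task_id, state) "
        "VALUES ('demo', 'run_1', 'extract', 'queued')"
    )

first_claim = mark_running(conn, "demo", "run_1", "extract")
second_claim = mark_running(conn, "demo", "run_1", "extract")
print(first_claim, second_claim)
```

The second call finds no row still in the queued state, so it changes nothing and reports that the claim failed.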
Formal Definitions
Definition: Directed Acyclic Graph (DAG)
A DAG is a finite directed graph with no directed cycles. In Airflow, a DAG is a collection of tasks with organized dependencies, defining the execution order and schedule of a workflow. Formally, $G = (V, E)$, where $V$ is the set of tasks and $E \subseteq V \times V$ is the set of dependency edges, with no cycle $v_1 \to v_2 \to \dots \to v_k \to v_1$.
Definition: Scheduler
The scheduler is the core orchestrator component responsible for triggering scheduled workflows, monitoring task instances, and managing dependencies. It operates on a heartbeat model with a configurable interval, creating DagRuns and queuing task instances for execution on each heartbeat.
Definition: Executor
An executor determines how task instances are executed. It abstracts the execution mechanism, mapping tasks to compute resources. The parallelism is bounded by $P_{\max} = \min(S, R)$, where $S$ is the number of available executor slots and $R$ is the count of tasks ready for execution.
Definition: Metadata Database
The metadata database (typically PostgreSQL) stores all state information about DAGs, task instances, connections, variables, and other Airflow objects. It serves as the central coordination point ensuring consistency across all distributed components via ACID transactions.
Detailed Explanation
Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. At its core, Airflow follows a modular architecture with several key components that work together to provide reliable workflow orchestration.
Core Components
The Web Server provides the user interface for monitoring and managing DAGs. Built on Flask, it offers a RESTful API and a rich UI that displays DAG runs, task instances, logs, and other metadata. The web server connects to the metadata database to fetch information about workflow execution and presents it in an intuitive dashboard.
The Scheduler is the heart of Airflow. It's responsible for triggering scheduled workflows, monitoring task instances, and managing dependencies. The scheduler uses the metadata database to track state and maintain coordination between different components. It operates on a heartbeat model, periodically checking for new DAG files, creating DagRuns, and queuing task instances for execution.
The Metadata Database stores all state information about DAGs, task instances, users, connections, variables, and other Airflow objects. PostgreSQL is recommended for production deployments due to its ACID compliance and performance characteristics. The database schema is carefully designed to support concurrent operations and maintain data integrity across distributed components.
The Executor determines how task instances are executed. Airflow supports several executors with different trade-offs in terms of complexity, scalability, and resource utilization. The executor layer abstracts the execution mechanism, allowing users to switch between different execution strategies without modifying their DAGs.
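The idea that DAGs stay unchanged while the execution strategy is swapped can be sketched in plain Python. The classes below are hypothetical and deliberately tiny; they are not Airflow's executor interface. They only show one task set being run through two interchangeable back ends, one sequential and one bounded by a slot count.

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class SketchExecutor(ABC):
    """Hypothetical interface: run callables and return results keyed by task id."""

    @abstractmethod
    def run_all(self, tasks):
        ...


class InlineExecutor(SketchExecutor):
    """Runs tasks one at a time in the calling process."""

    def run_all(self, tasks):
        return {task_id: fn() for task_id, fn in tasks.items()}


class ThreadedExecutor(SketchExecutor):
    """Runs tasks concurrently, bounded by `slots` worker threads."""

    def __init__(self, slots):
        self.slots = slots

    def run_all(self, tasks):
        with ThreadPoolExecutor(max_workers=self.slots) as pool:
            futures = {task_id: pool.submit(fn) for task_id, fn in tasks.items()}
            return {task_id: fut.result() for task_id, fut in futures.items()}


# The task definitions never change; only the executor does.
tasks = {"a": lambda: 1 + 1, "b": lambda: "done"}
for executor in (InlineExecutor(), ThreadedExecutor(slots=2)):
    print(type(executor).__name__, executor.run_all(tasks))
```

Both executors return the same results, which is the property that lets a deployment move between execution strategies without touching DAG code.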
Total Task Execution Time
$$T_{\text{total}} = \sum_{i=1}^{n} \left( t_i + o_i \right)$$
Here,
- $n$ = Number of tasks in the DAG
- $t_i$ = Execution time of task $i$
- $o_i$ = Scheduling and queuing overhead for task $i$
Maximum Parallelism
$$P_{\max} = \min(S, R)$$
Here,
- $S$ = Number of available executor slots
- $R$ = Total tasks ready for execution
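A short worked example may help with the two quantities above. It assumes the total is a plain sum, over all tasks, of execution time plus scheduling overhead (so tasks are counted as if run back to back), and that the parallelism bound is the smaller of the slot count and the ready-task count. The function names and sample numbers are illustrative only.

```python
def total_execution_time(exec_times, overheads):
    """Sum of per-task execution time plus per-task scheduling overhead."""
    if len(exec_times) != len(overheads):
        raise ValueError("need one overhead value per task")
    return sum(t + o for t, o in zip(exec_times, overheads))


def max_parallelism(executor_slots, ready_tasks):
    """Tasks that can run at once: limited by slots and by ready tasks."""
    return min(executor_slots, ready_tasks)


# Three tasks taking 30 s, 45 s and 10 s, with 2 s, 2 s and 1 s of overhead.
print(total_execution_time([30.0, 45.0, 10.0], [2.0, 2.0, 1.0]))
# 32 slots are available, but only 5 tasks are ready to run.
print(max_parallelism(executor_slots=32, ready_tasks=5))
```

In the second call the ready-task count is the binding limit, so adding executor slots would not increase concurrency.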
Theorem: DAG Correctness (Acyclicity Invariant)
A valid Airflow DAG must be a directed acyclic graph. If the dependency graph contains a cycle, the scheduler will reject the DAG during parsing. Formally, $\nexists$ a path $v_1 \to v_2 \to \dots \to v_k \to v_1$ for any subset of tasks $\{v_1, \dots, v_k\} \subseteq V$.
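The acyclicity requirement can be checked with a topological sort. The sketch below uses Python's standard-library `graphlib` (Python 3.9+) rather than Airflow's own cycle check, and the helper name `execution_order` is hypothetical. It returns a valid execution order for an acyclic graph and raises an error that names the offending tasks otherwise.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies):
    """Return a valid task order, or raise ValueError naming the cycle.

    `dependencies` maps each task id to the set of task ids it depends on.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        cycle = exc.args[1]  # list of nodes forming the cycle
        raise ValueError(f"not a DAG, cycle found: {' -> '.join(cycle)}") from exc


valid = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
print(execution_order(valid))

cyclic = {"a": {"b"}, "b": {"a"}}
try:
    execution_order(cyclic)
except ValueError as err:
    print(err)
```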
The scheduler processes DAG files at a configurable interval (min_file_process_interval). Reducing this interval improves responsiveness but increases CPU overhead. For most production deployments, 30 seconds is a reasonable default.
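Besides editing airflow.cfg, Airflow options can be overridden with environment variables named `AIRFLOW__{SECTION}__{KEY}`. The helper below is a small illustrative sketch (the function name is hypothetical) that builds such a name; it assumes the option sits in the [scheduler] section, as in Airflow 2.x, and the value of 60 seconds is only an example.

```python
def airflow_env_var(section, key):
    """Build the environment-variable name that overrides an airflow.cfg option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


name = airflow_env_var("scheduler", "min_file_process_interval")
# Example value only: re-parse each DAG file at most once every 60 seconds.
print(f"export {name}=60")
```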
Component Interactions
When a DAG file is parsed, the scheduler creates serialized DAG objects and stores them in the metadata database. The scheduler then monitors for scheduled intervals and creates DagRuns when appropriate. Task instances are queued based on their dependencies and available resources. The executor picks up queued tasks and dispatches them to worker nodes or processes.
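The interplay of dependencies and available resources described above can be simulated in a few lines. This is a toy model, not the scheduler's algorithm: the function `simulate_rounds` and the task names are hypothetical, each round stands in for one scheduling pass, and every task is assumed to finish within its round.

```python
def simulate_rounds(upstream, slots):
    """Group tasks into scheduling rounds.

    `upstream` maps each task id to the set of task ids that must finish first.
    Each round runs at most `slots` tasks whose upstream tasks are all done.
    """
    if slots < 1:
        raise ValueError("slots must be at least 1")
    done, rounds = set(), []
    pending = set(upstream)
    while pending:
        # A task is ready once every one of its upstream tasks has completed.
        ready = sorted(t for t in pending if upstream[t] <= done)
        if not ready:
            raise ValueError("no runnable task: dependency cycle or unknown upstream")
        batch = ready[:slots]
        rounds.append(batch)
        done.update(batch)
        pending.difference_update(batch)
    return rounds


upstream = {
    "extract_a": set(),
    "extract_b": set(),
    "extract_c": set(),
    "transform": {"extract_a", "extract_b", "extract_c"},
    "load": {"transform"},
}
for i, batch in enumerate(simulate_rounds(upstream, slots=2), start=1):
    print(f"round {i}: {batch}")
```

With two slots the three extract tasks need two rounds, so the transform task cannot start until round three; with three slots the whole workflow would finish one round earlier.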
The metadata database serves as the central coordination point, ensuring consistency across all components. It tracks task states, manages concurrency limits, and provides the information needed for the web server to display current status. The database also stores connection details, variables, and other configuration that tasks may need during execution.
Scalability Considerations
Airflow's architecture supports horizontal scaling through the executor layer. The Celery executor allows distributing tasks across multiple worker nodes, while the Kubernetes executor dynamically provisions pods for task execution. The scheduler can be configured to parse DAG files in parallel, and the metadata database can be optimized with proper indexing and connection pooling.
The web server is stateless and can be scaled horizontally behind a load balancer. Many deployments run a single scheduler, but since Airflow 2.0 several schedulers can run concurrently for high availability. They coordinate through row-level locks in the metadata database, so this mode needs a database that supports SELECT ... FOR UPDATE SKIP LOCKED, such as PostgreSQL or MySQL 8.
Reliability and Fault Tolerance
Airflow provides several mechanisms for ensuring reliability. The scheduler uses heartbeats to detect and recover from failures. Task instances can be configured with retry logic to handle transient failures. The metadata database stores enough information to resume execution after component failures. DAG runs are tracked with unique identifiers, ensuring that workflows can be properly restarted or resumed.
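Retry handling for transient failures can be sketched as a small wrapper. This is not Airflow's implementation; the function `run_with_retries` is hypothetical and only mirrors the meaning of the `retries` and `retry_delay` task settings: one initial attempt plus up to `retries` further attempts, with a pause between them.

```python
import time


def run_with_retries(fn, retries=1, retry_delay=0.0, sleep=time.sleep):
    """Call `fn` until it succeeds or `retries` extra attempts are used up.

    Returns (result, try_number). Re-raises the last error when all tries fail.
    """
    max_tries = retries + 1
    for try_number in range(1, max_tries + 1):
        try:
            return fn(), try_number
        except Exception:
            if try_number == max_tries:
                raise
            sleep(retry_delay)


attempts = []


def flaky():
    """Fail on the first call, succeed on the second."""
    attempts.append(1)
    if len(attempts) < 2:
        raise ConnectionError("transient failure")
    return "ok"


print(run_with_retries(flaky, retries=1, retry_delay=0.0))
```

The first attempt fails, the single retry succeeds, and the returned try number records that two attempts were needed.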
The system also provides comprehensive logging, with task logs stored centrally and accessible through the web server. This enables debugging and monitoring of workflow execution across distributed environments. Airflow's callback system allows integration with external monitoring tools for proactive alerting on failures or performance issues.
Key Concepts Table
| Component | Purpose | Configuration | Scalability |
|---|---|---|---|
| Web Server | User interface and API | webserver_config.py | Horizontal scaling with load balancer |
| Scheduler | Workflow orchestration | airflow.cfg [scheduler] | Single instance by default; multiple schedulers for high availability |
| Metadata DB | State management | airflow.cfg [database] | Read replicas for queries |
| Executor | Task execution | airflow.cfg [core] | Depends on executor type |
| Worker | Task processing | Celery/K8s config | Horizontal scaling |
| Triggerer | Async trigger handling | airflow.cfg [triggerer] | Horizontal scaling |
| DAG Processor | File parsing | airflow.cfg [scheduler] (parsing_processes) | Parallel parsing |
Code Examples
Custom Executor Configuration
```python
# dags/custom_executor_example.py
import os
import platform
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def scheduler_info(**context):
    """Display scheduler information and context."""
    # Imported inside the task so DAG parsing stays lightweight
    from airflow.models import TaskInstance
    from airflow.utils.session import create_session

    # Get current DAG run information
    dag_run = context['dag_run']
    logical_date = context['logical_date']  # replaces the deprecated execution_date
    print(f"Scheduler triggered DAG run: {dag_run.run_id}")
    print(f"Logical date: {logical_date}")
    print(f"Run type: {dag_run.run_type}")

    # Access the metadata database through the ORM; the session is closed on exit
    with create_session() as session:
        task_instances = session.query(TaskInstance).filter(
            TaskInstance.dag_id == dag_run.dag_id,
            TaskInstance.run_id == dag_run.run_id,
        ).all()
        for ti in task_instances:
            print(f"Task: {ti.task_id}, State: {ti.state}")


def executor_demo(**context):
    """Demonstrate executor-specific behavior."""
    from airflow.configuration import conf

    ti = context['ti']
    executor_info = {
        'hostname': platform.node(),
        'pid': os.getpid(),
        # TaskInstance.executor is only present in newer Airflow releases;
        # fall back to the executor configured in airflow.cfg.
        'executor': getattr(ti, 'executor', None) or conf.get('core', 'executor'),
        'worker_id': os.environ.get('HOSTNAME', 'unknown'),
    }
    print(f"Executing on: {executor_info['hostname']}")
    print(f"Process ID: {executor_info['pid']}")
    print(f"Executor: {executor_info['executor']}")

    # Simulate work
    time.sleep(2)
    return executor_info


# Create the DAG
with DAG(
    'architecture_demo_dag',
    default_args=default_args,
    description='Demonstration of Airflow architecture concepts',
    schedule=timedelta(hours=1),  # `schedule` replaces `schedule_interval` (Airflow 2.4+)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['architecture', 'demo'],
) as dag:
    # Define tasks
    task1 = PythonOperator(
        task_id='scheduler_info',
        python_callable=scheduler_info,
    )
    task2 = PythonOperator(
        task_id='executor_demo',
        python_callable=executor_demo,
    )
    task3 = BashOperator(
        task_id='bash_task',
        # BashOperator takes `bash_command`, not `command`
        bash_command='echo "Task executed on $(hostname) at $(date)"',
    )

    # Set task dependencies
    task1 >> task2 >> task3
```
Metadata Database Optimization
```python
# metadata_db_optimization.py
from airflow.configuration import conf
from airflow.utils.session import create_session
from sqlalchemy import text

# Diagnostic queries (PostgreSQL only)
ANALYSIS_QUERIES = [
    # Find slow queries. Requires the pg_stat_statements extension; the column
    # is mean_exec_time on PostgreSQL 13+ (mean_time on older versions).
    """
    SELECT query, mean_exec_time, calls
    FROM pg_stat_statements
    WHERE query LIKE '%task_instance%'
    ORDER BY mean_exec_time DESC
    LIMIT 10;
    """,
    # Check index usage
    """
    SELECT indexrelname, idx_scan, idx_tup_read, idx_tup_fetch
    FROM pg_stat_user_indexes
    WHERE schemaname = 'public'
    ORDER BY idx_scan DESC;
    """,
    # Table statistics
    """
    SELECT relname, n_tup_ins, n_tup_upd, n_tup_del, n_live_tup, n_dead_tup
    FROM pg_stat_user_tables
    WHERE schemaname = 'public'
    ORDER BY n_live_tup DESC;
    """,
]

# Candidate indexes. Airflow's own migrations already index several of these
# columns, so check the index-usage output above before creating duplicates.
ADDITIONAL_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_task_instance_dag_run ON task_instance(dag_id, run_id);",
    "CREATE INDEX IF NOT EXISTS idx_task_instance_state ON task_instance(state);",
    "CREATE INDEX IF NOT EXISTS idx_dag_run_state ON dag_run(state);",
]


def report_connection_pool():
    """Print the SQLAlchemy pool settings Airflow is configured with.

    The pool of a live engine cannot be resized from here; change these
    options in the [database] section of airflow.cfg and restart instead.
    """
    print("pool_size:", conf.getint('database', 'sql_alchemy_pool_size', fallback=5))
    print("max_overflow:", conf.getint('database', 'sql_alchemy_max_overflow', fallback=10))
    print("pool_recycle:", conf.getint('database', 'sql_alchemy_pool_recycle', fallback=1800))
    print("pool_pre_ping:", conf.getboolean('database', 'sql_alchemy_pool_pre_ping', fallback=True))


def optimize_metadata_db():
    """Optimize metadata database for better performance."""
    with create_session() as session:
        # Analyze query performance
        for query in ANALYSIS_QUERIES:
            try:
                rows = session.execute(text(query)).fetchall()
            except Exception as e:
                # A failed statement aborts the transaction, so roll back first
                print(f"Analysis query failed: {e}")
                session.rollback()
                continue
            print("Query Results:")
            for row in rows:
                print(row)

        report_connection_pool()

        # Create additional indexes for better query performance
        for index_sql in ADDITIONAL_INDEXES:
            try:
                session.execute(text(index_sql))
                session.commit()
            except Exception as e:
                print(f"Index creation failed: {e}")
                session.rollback()


if __name__ == "__main__":
    optimize_metadata_db()
```
Web Server Customization
```python
# web_server_customization.py
from airflow import __version__ as airflow_version
from airflow.models import DagRun, TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.www.app import create_app
from flask import Blueprint, jsonify

# Custom blueprint for additional endpoints
custom_bp = Blueprint(
    'custom_bp',
    __name__,
    template_folder='templates',
    static_folder='static',
)


@custom_bp.route('/api/v1/custom/metrics')
def get_custom_metrics():
    """Custom API endpoint for metrics."""
    # The session is committed and closed when the block exits
    with create_session() as session:
        total_dag_runs = session.query(DagRun).count()
        running_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == State.RUNNING
        ).count()
        failed_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == State.FAILED
        ).count()
        succeeded_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == State.SUCCESS
        ).count()

    finished_tasks = succeeded_tasks + failed_tasks
    metrics = {
        'total_dag_runs': total_dag_runs,
        'running_tasks': running_tasks,
        'failed_tasks': failed_tasks,
        # Percentage of finished task instances that succeeded
        'success_rate': succeeded_tasks / finished_tasks * 100 if finished_tasks else 0,
    }
    return jsonify(metrics)


def create_custom_app(config=None):
    """Create Airflow web app with customizations."""
    app = create_app(config)

    # Register custom blueprint
    app.register_blueprint(custom_bp, url_prefix='/airflow')

    # Add custom template filters
    @app.template_filter('datetime_format')
    def datetime_format(value, fmt='%Y-%m-%d %H:%M:%S'):
        return value.strftime(fmt) if value else ''

    # Add custom context processor
    @app.context_processor
    def inject_custom_variables():
        return {
            'custom_title': 'Airflow Dashboard',
            'version': airflow_version,  # installed version instead of a hard-coded string
        }

    return app
```
Performance Metrics
| Metric | Value | Optimization Strategy |
|---|---|---|
| Scheduler Heartbeat Interval | 5 seconds | Adjust based on workload |
| DAG Parsing Time | 1-10 seconds per file | Use dag_bag caching |
| Database Query Time | < 100ms | Proper indexing, connection pooling |
| Task Queue Latency | < 5 seconds | Optimized executor configuration |
| Web Server Response Time | < 500ms | Caching, load balancing |
| Metadata DB Connections | 50-100 | Connection pooling, read replicas |
| Worker Memory Usage | 1-4 GB | Resource limits, monitoring |
| Log Storage | 1-10 GB/day | Log rotation, external storage |
Best Practices
- Database Optimization: Use PostgreSQL with proper indexing. Monitor query performance and add indexes for frequently accessed tables. Consider read replicas for heavy read workloads.
- Executor Selection: Choose the executor based on your scale requirements. Start with LocalExecutor for development, CeleryExecutor for medium scale, and KubernetesExecutor for large-scale deployments.
- Scheduler Configuration: Tune min_file_process_interval and dag_dir_list_interval based on your DAG file count and parsing complexity. Use parallelism to control concurrent task execution.
- Connection Management: Implement connection pooling for database connections. Monitor connection usage and adjust pool sizes based on workload patterns.
- Monitoring and Alerting: Set up comprehensive monitoring for all components. Track scheduler heartbeat, task success rates, and database performance metrics.
- Security: Implement role-based access control. Use encrypted connections for metadata database and worker communication. Regularly rotate credentials and secrets.
- Backup Strategy: Implement regular backups of the metadata database. Test restoration procedures to ensure data integrity. Consider point-in-time recovery for critical deployments.
- Resource Planning: Estimate resource requirements based on DAG count, task frequency, and concurrency limits. Monitor resource utilization and scale proactively.
- Logging Configuration: Centralize logs for debugging and compliance. Implement log rotation and retention policies. Consider using external logging systems for large deployments.
- High Availability: For production deployments, consider redundant components where possible. Implement health checks and automatic failover for critical services.
Key Takeaways:
- Airflow follows a modular architecture: Scheduler, Executor, Web Server, Metadata DB, Workers
- The DAG correctness theorem requires acyclic dependency graphs for valid scheduling
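- Maximum parallelism is bounded by $\min(S, R)$, the smaller of the number of available executor slots $S$ and the number of ready tasks $R$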
- Total execution time includes both task computation and scheduling overhead
- The metadata database is the central coordination point for all distributed components
- Heartbeat-based scheduling ensures fault tolerance and consistency
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)