Airflow DAG Design Patterns & Error Handling
Building production-ready data pipelines with Airflow
Interview Question
"Design an Airflow DAG that: (1) runs daily at 2 AM, (2) extracts data from 3 sources (API, database, S3), (3) validates data quality, (4) transforms and loads into a warehouse, (5) sends alerts on failure, (6) handles late-arriving data. Include error handling, retry logic, and testing strategy."
Difficulty: Medium-Hard | Frequently asked at Airbnb, Spotify, Lyft, DoorDash
Theoretical Foundation
Airflow Architecture
┌─────────────────────────────────────────────────────────────┐
│                    Airflow Architecture                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐        │
│  │ Web Server  │   │  Scheduler  │   │  Metadata   │        │
│  │    (UI)     │   │             │   │  Database   │        │
│  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘        │
│         │                 │                 │               │
│         └─────────────────┼─────────────────┘               │
│                           │                                 │
│                  ┌────────┴────────┐                        │
│                  │    Executor     │                        │
│                  │  (Celery/K8s)   │                        │
│                  └────────┬────────┘                        │
│                           │                                 │
│         ┌─────────────────┼─────────────────┐               │
│         │                 │                 │               │
│  ┌──────┴──────┐   ┌──────┴──────┐   ┌──────┴──────┐        │
│  │  Worker 1   │   │  Worker 2   │   │  Worker 3   │        │
│  │   (Task)    │   │   (Task)    │   │   (Task)    │        │
│  └─────────────┘   └─────────────┘   └─────────────┘        │
│                                                             │
└─────────────────────────────────────────────────────────────┘
DAG (Directed Acyclic Graph)
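A DAG defines the workflow structure: tasks and their dependencies.
Key properties:
- Directed: each dependency points from an upstream task to a downstream task
- Acyclic: no circular dependencies, so no task can depend (even indirectly) on itself
- Graph: tasks are the nodes and dependencies are the edges, which the Airflow UI renders visually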
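Airflow checks for cycles when it parses a DAG file and reports any it finds as an import error. The framework-free sketch below makes the "acyclic" property concrete. It orders tasks with Kahn's algorithm and rejects cycles. It is not Airflow's implementation: the `topological_order` name and the `(upstream, downstream)` edge-list input are assumptions for illustration.

```python
from collections import deque


def topological_order(edges):
    """Return task IDs in a valid execution order, or raise if the graph has a cycle.

    edges: iterable of (upstream, downstream) pairs, e.g. ("extract", "validate").
    """
    downstream = {}
    indegree = {}
    for up, down in edges:
        downstream.setdefault(up, []).append(down)
        downstream.setdefault(down, [])
        indegree.setdefault(up, 0)
        indegree[down] = indegree.get(down, 0) + 1

    # Start with tasks that have no upstream dependencies (sorted for a stable order)
    ready = deque(sorted(t for t, deg in indegree.items() if deg == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in downstream[task]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Any task that never reached indegree 0 sits on (or behind) a cycle
    if len(order) != len(indegree):
        raise ValueError("cycle detected: graph is not a DAG")
    return order


print(topological_order([("extract", "validate"), ("validate", "load")]))
```

A two-task loop such as `[("a", "b"), ("b", "a")]` leaves no task with zero upstream dependencies, so the function raises instead of returning an order.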
Task Dependencies
# Dependency patterns
task_a >> task_b # A must complete before B
task_a >> [task_b, task_c] # A before B and C (parallel)
[task_a, task_b] >> task_c # A and B before C
task_a >> task_b >> task_c # Sequential chain
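The arrows above are ordinary Python operator overloads (`__rshift__` and `__rrshift__`). The hypothetical `Task` class below is a simplified stand-in, not Airflow's `BaseOperator`. It shows how each pattern records edges, including why `[task_a, task_b] >> task_c` works even though lists do not define `>>`.

```python
class Task:
    """Hypothetical stand-in for an operator, used only to show how >> builds edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_ids = set()

    def __rshift__(self, other):
        # task >> other  or  task >> [t1, t2]
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream_ids.add(target.task_id)
        return other  # returning the right operand is what makes a >> b >> c work

    def __rrshift__(self, other):
        # [t1, t2] >> task: lists have no __rshift__, so Python calls task.__rrshift__(list)
        for upstream in other:
            upstream >> self
        return self


a, b, c, d = Task("a"), Task("b"), Task("c"), Task("d")
a >> [b, c]   # fan-out
[b, c] >> d   # fan-in
print(sorted(a.downstream_ids), sorted(b.downstream_ids))
```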
Operators
Operators define what work to do:
| Operator | Use Case |
|---|---|
| PythonOperator | Python functions |
| BashOperator | Shell commands |
| SparkSubmitOperator | Spark jobs |
| S3ToRedshiftOperator | S3 to Redshift |
| PostgresOperator | PostgreSQL queries |
| HttpSensor | Wait for API |
| S3KeySensor | Wait for S3 file |
Executors
| Executor | Use Case |
|---|---|
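| SequentialExecutor | One task at a time on a single machine; local testing only |
| LocalExecutor | Parallel tasks as local processes on one multi-core machine |
| CeleryExecutor | Distributed workers pulling tasks from a message broker (e.g., Redis, RabbitMQ) |
| KubernetesExecutor | One pod per task, scaling dynamically with load |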
Error Handling Patterns
1. Retries with Exponential Backoff
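from datetime import timedelta
from airflow.operators.python import PythonOperator

task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_func,
    retries=3,                           # up to 3 retries after the first failure
    retry_delay=timedelta(minutes=5),    # base delay before the first retry
    retry_exponential_backoff=True,      # grow the delay on each subsequent retry
    max_retry_delay=timedelta(hours=1),  # cap on any single delay
)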
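With `retry_exponential_backoff=True`, the wait between retries grows roughly geometrically from `retry_delay` and is capped by `max_retry_delay`. The hypothetical `backoff_schedule` helper below shows only this doubling-and-cap idea. Airflow's own formula also adds jitter, and its exact form has varied between versions, so treat the numbers as approximate.

```python
from datetime import timedelta


def backoff_schedule(retries, retry_delay, max_retry_delay):
    """Approximate wait before each retry: retry_delay * 2**n, capped at max_retry_delay."""
    return [min(retry_delay * (2 ** n), max_retry_delay) for n in range(retries)]


delays = backoff_schedule(3, timedelta(minutes=5), timedelta(hours=1))
print([int(d.total_seconds() // 60) for d in delays])  # [5, 10, 20]
```

With more retries the cap takes over. Six retries give 5, 10, 20, 40, 60 and 60 minutes.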
2. Callbacks
task = PythonOperator(
task_id='process_data',
python_callable=process_func,
on_success_callback=success_alert,
on_failure_callback=failure_alert,
on_retry_callback=retry_alert,
)
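Each callback is a plain function that Airflow calls with the task's context dictionary. For task failures, that dictionary includes `task_instance`, `dag` and `exception`. The sketch below shows one possible `failure_alert` that builds a message from those keys. `SimpleNamespace` objects stand in for the real TaskInstance and DAG, so the callback can be exercised without a scheduler. In production you would post the message to Slack or PagerDuty rather than return it.

```python
from types import SimpleNamespace


def failure_alert(context):
    """Build a failure message from the callback context (sketch)."""
    ti = context['task_instance']
    return (
        f"Task {ti.task_id} in DAG {context['dag'].dag_id} failed: "
        f"{context.get('exception')}"
    )


# Stand-ins for the objects Airflow would put in the context
ctx = {
    'task_instance': SimpleNamespace(task_id='process_data'),
    'dag': SimpleNamespace(dag_id='daily_etl_pipeline'),
    'exception': ValueError('bad row'),
}
print(failure_alert(ctx))
# Task process_data in DAG daily_etl_pipeline failed: bad row
```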
3. Trigger Rules
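from airflow.utils.trigger_rule import TriggerRule
# Default: ALL_SUCCESS, so C runs only if B succeeds
task_b >> task_c
# Equivalent explicit form: task_c.set_upstream(task_b)
# Override per task (normally via the operator's trigger_rule=... argument)
# Run only if every upstream task failed (e.g., a fallback task)
task_c.trigger_rule = TriggerRule.ALL_FAILED
# Run as soon as at least one upstream task succeeds
task_c.trigger_rule = TriggerRule.ONE_SUCCESS
# Run if no upstream task failed (each succeeded or was skipped)
task_c.trigger_rule = TriggerRule.NONE_FAILED
# Always run once all upstream tasks finish, whatever their state (e.g., cleanup)
task_c.trigger_rule = TriggerRule.ALL_DONE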
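The rules above can be modelled as a predicate over upstream task states. This is a simplified sketch, assuming all upstream tasks have already finished. The real scheduler evaluates rules as upstream tasks complete, so `one_success` can fire early. It also marks a non-triggered task `skipped` or `upstream_failed` rather than just "not run". The `should_run` name is hypothetical; the rule strings match the `TriggerRule` values.

```python
FAILED_STATES = {'failed', 'upstream_failed'}


def should_run(trigger_rule, upstream_states):
    """Simplified check of whether a task fires, given its finished upstream states."""
    if trigger_rule == 'all_success':
        return all(s == 'success' for s in upstream_states)
    if trigger_rule == 'all_failed':
        return all(s in FAILED_STATES for s in upstream_states)
    if trigger_rule == 'one_success':
        return any(s == 'success' for s in upstream_states)
    if trigger_rule == 'none_failed':
        return all(s in ('success', 'skipped') for s in upstream_states)
    if trigger_rule == 'all_done':
        return True  # every upstream finished, whatever its state
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


print(should_run('none_failed', ['success', 'skipped']))  # True
```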
4. Branching
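from airflow.operators.python import BranchPythonOperator, PythonOperator

def choose_branch(**context):
    """Return the task_id of the branch to follow; the other branch is skipped"""
    conf = context['dag_run'].conf or {}  # conf is empty for scheduled runs
    if conf.get('full_refresh'):
        return 'full_load'
    return 'incremental_load'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
)
full_load = PythonOperator(task_id='full_load', ...)
# task_id must match the string returned by choose_branch
incremental = PythonOperator(task_id='incremental_load', ...)
branch >> [full_load, incremental]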
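The sketch below models branching without Airflow: the chosen task IDs run and the other direct downstream tasks are skipped. `choose_branch` here takes the run's `conf` dict directly so it can be tested in isolation. `apply_branch` is a hypothetical helper that raises on an ID that is not a downstream task. That is exactly the mismatch to guard against, since a returned string must equal a real `task_id`.

```python
def choose_branch(conf):
    """Same decision as the DAG's callable, taking dag_run.conf directly."""
    return 'full_load' if (conf or {}).get('full_refresh') else 'incremental_load'


def apply_branch(chosen, downstream_ids):
    """Model of branching: chosen task IDs run, other direct downstream tasks are skipped."""
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)
    unknown = chosen_ids - set(downstream_ids)
    if unknown:
        raise ValueError(f"branch returned unknown task_ids: {sorted(unknown)}")
    return {tid: ('run' if tid in chosen_ids else 'skipped') for tid in downstream_ids}


print(apply_branch(choose_branch({}), ['full_load', 'incremental_load']))
# {'full_load': 'skipped', 'incremental_load': 'run'}
```

Because one branch is always skipped, a join task placed after both branches needs a trigger rule such as `none_failed_min_one_success` (available in recent Airflow 2 releases). Otherwise the default `all_success` rule skips the join as well.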
TaskFlow API (Airflow 2.0+)
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),  # static start date; avoid dynamic values like days_ago()
    catchup=False,
    tags=['etl', 'production'],
)
def etl_pipeline():
    @task
    def extract_api():
        """Extract data from API"""
        data = call_api()  # placeholder for your own helper
        return data  # return value is pushed to XCom automatically (keep it small)

    @task
    def extract_db():
        """Extract data from database"""
        data = query_database()  # placeholder for your own helper
        return data

    @task
    def validate(api_data, db_data):
        """Validate data quality"""
        # Raise explicitly: assert statements are stripped when Python runs with -O
        if not api_data:
            raise ValueError("API extract is empty")
        if not db_data:
            raise ValueError("Database extract is empty")
        return {'valid': True}

    @task
    def transform(api_data, db_data, validation):
        """Transform data (taking `validation` makes this task wait for validate)"""
        combined = combine(api_data, db_data)
        return combined

    @task
    def load(transformed_data):
        """Load into warehouse"""
        write_to_warehouse(transformed_data)

    # Passing task outputs as arguments defines the dependencies
    api_data = extract_api()
    db_data = extract_db()
    validation = validate(api_data, db_data)
    transformed = transform(api_data, db_data, validation)
    load(transformed)

etl_dag = etl_pipeline()
Dynamic DAG Generation
import importlib
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Load configuration from external source
def load_config():
    """Load DAG configuration from JSON file"""
    with open('/opt/airflow/config/dags.json') as f:
        return json.load(f)

def resolve_callable(dotted_path):
    """Import a function from a dotted path like 'etl.tasks.extract' (safer than eval)"""
    module_path, func_name = dotted_path.rsplit('.', 1)
    return getattr(importlib.import_module(module_path), func_name)

# Generate DAGs from configuration
for dag_config in load_config():
    dag = DAG(
        dag_id=dag_config['name'],
        schedule_interval=dag_config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=dag_config.get('tags', []),
    )
    tasks = {}
    for task_config in dag_config['tasks']:
        tasks[task_config['name']] = PythonOperator(
            task_id=task_config['name'],
            python_callable=resolve_callable(task_config['callable']),
            dag=dag,
        )
    # Set dependencies: each name in 'dependencies' is an upstream task
    for task_config in dag_config['tasks']:
        for dep in task_config.get('dependencies', []):
            tasks[dep] >> tasks[task_config['name']]
    # Expose the DAG at module level so the DagBag discovers it
    globals()[dag_config['name']] = dag
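The config-handling logic can be unit-tested without Airflow. This sketch uses hypothetical helpers. `resolve_callable` turns a dotted path into a function with `importlib` instead of `eval()`. `build_edges` treats each `dependencies` entry as an upstream task and rejects names that do not exist. The sample config uses standard-library functions only so the example runs anywhere.

```python
import importlib


def resolve_callable(dotted_path):
    """Turn 'package.module.func' into the function object without eval()."""
    module_path, sep, func_name = dotted_path.rpartition('.')
    if not sep:
        raise ValueError(f"expected a dotted path, got {dotted_path!r}")
    return getattr(importlib.import_module(module_path), func_name)


def build_edges(task_configs):
    """Return (upstream, downstream) pairs; 'dependencies' lists upstream task names."""
    names = {t['name'] for t in task_configs}
    edges = []
    for t in task_configs:
        for dep in t.get('dependencies', []):
            if dep not in names:
                raise ValueError(f"{t['name']} depends on unknown task {dep!r}")
            edges.append((dep, t['name']))
    return edges


config = [
    {'name': 'extract', 'callable': 'json.dumps'},
    {'name': 'load', 'callable': 'json.loads', 'dependencies': ['extract']},
]
print(build_edges(config))  # [('extract', 'load')]
```

Validating the config in a test catches typos in dependency names before the scheduler ever parses the generated DAGs.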
Testing Strategy
# ============================================================
# UNIT TESTING DAGs
# ============================================================
from datetime import timedelta

import pytest
from airflow.models import DagBag

@pytest.fixture(scope='module')
def dag_bag():
    """Parse all DAGs once per test module"""
    return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

def test_dag_import(dag_bag):
    """Test that all DAGs can be imported"""
    assert len(dag_bag.import_errors) == 0, \
        f"DAG import errors: {dag_bag.import_errors}"

def test_dag_structure(dag_bag):
    """Test DAG structure"""
    dag = dag_bag.get_dag('daily_etl_pipeline')
    assert dag is not None, "daily_etl_pipeline not found"
    # Check schedule (2 AM daily)
    assert dag.schedule_interval == '0 2 * * *'
    # Check tasks exist
    expected = {'extract_api', 'extract_db', 'extract_s3',
                'validate_api', 'validate_db', 'validate_s3',
                'transform', 'load'}
    assert expected.issubset(set(dag.task_ids))

def test_task_dependencies(dag_bag):
    """Test task dependencies"""
    dag = dag_bag.get_dag('daily_etl_pipeline')
    extract_api = dag.get_task('extract_api')
    # validate_api must run after extract_api
    assert 'validate_api' in extract_api.downstream_task_ids

def test_retry_config(dag_bag):
    """Test retry configuration"""
    dag = dag_bag.get_dag('daily_etl_pipeline')
    for task in dag.tasks:
        if task.task_id.startswith('extract'):
            assert task.retries == 3
            assert task.retry_delay == timedelta(minutes=5)
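DagBag tests check structure. The logic inside task callables can also be tested directly by passing a fake context. In this sketch, `FakeTaskInstance` is a hypothetical stub that answers `xcom_pull` with canned data. `check_api_rows` is a simplified validator in the style of the production DAG's `validate_api_data`. The tests use plain `assert`, so pytest collects them, but they also run without it.

```python
class FakeTaskInstance:
    """Hypothetical stub that returns canned XCom values keyed by task_id."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def check_api_rows(**context):
    """Simplified validator in the style of validate_api_data."""
    rows = context['task_instance'].xcom_pull(task_ids='extract_api')
    if not rows:
        raise ValueError("API returned no data")
    missing = [field for field in ('id', 'name') if field not in rows[0]]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    return {'valid': True, 'row_count': len(rows)}


def test_accepts_complete_rows():
    ti = FakeTaskInstance({'extract_api': [{'id': 1, 'name': 'a'}]})
    assert check_api_rows(task_instance=ti) == {'valid': True, 'row_count': 1}


def test_rejects_missing_fields():
    ti = FakeTaskInstance({'extract_api': [{'id': 1}]})
    try:
        check_api_rows(task_instance=ti)
    except ValueError as exc:
        assert "name" in str(exc)
    else:
        raise AssertionError("expected ValueError")
```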
Production DAG Example
from datetime import timedelta
import logging

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLCheckOperator
# Newer postgres provider versions favor SQLExecuteQueryOperator over PostgresOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago

# call_api, query_database and transform_data are placeholders for your own helpers

logger = logging.getLogger(__name__)

# ============================================================
# CALLBACK FUNCTIONS (defined first so default_args can reference them)
# ============================================================
def alert_failure(context):
    """Send an alert when a task fails (after its retries are exhausted)"""
    ti = context['task_instance']
    logger.error(f"Task {ti.task_id} failed")
    # Send to Slack (or PagerDuty)
    import requests
    requests.post(
        Variable.get('slack_webhook_url'),
        json={
            'text': f"🚨 ETL Pipeline Failed\n"
                    f"Task: {ti.task_id}\n"
                    f"DAG: {context['dag'].dag_id}\n"
                    f"Execution: {context['execution_date']}\n"
                    f"Log: {ti.log_url}"
        },
        timeout=10,  # don't let a slow webhook hang the callback
    )

def alert_success(context):
    """Log DAG success (extend to notify a channel if needed)"""
    logger.info(f"DAG {context['dag'].dag_id} completed successfully")

# ============================================================
# DAG CONFIGURATION
# ============================================================
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
    'execution_timeout': timedelta(hours=2),
    'on_failure_callback': alert_failure,  # task-level: fires for each failed task
}

# ============================================================
# DATA QUALITY CHECKS
# ============================================================
def validate_api_data(**context):
    """Validate API data quality"""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_api')
    # Check row count
    if not data:
        raise ValueError("API returned no data")
    # Check required fields (first row only, as a cheap schema check)
    required_fields = ['id', 'name', 'timestamp']
    for field in required_fields:
        if field not in data[0]:
            raise ValueError(f"Missing required field: {field}")
    # Check data freshness (assumes 'timestamp' values are timezone-aware datetimes)
    latest_timestamp = max(row['timestamp'] for row in data)
    if latest_timestamp < context['execution_date'] - timedelta(days=1):
        raise ValueError("API data is stale")
    return {'valid': True, 'row_count': len(data)}

def validate_db_data(**context):
    """Validate database data quality"""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract_db') or []
    # Check for nulls in critical columns
    for row in data:
        if row.get('user_id') is None:
            raise ValueError(f"Null user_id found: {row}")
    return {'valid': True, 'row_count': len(data)}

# ============================================================
# DAG DEFINITION
# ============================================================
with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline with data quality checks',
    schedule_interval='0 2 * * *',  # 2 AM daily (DAG timezone, UTC by default)
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production', 'daily'],
    on_success_callback=alert_success,  # DAG-level: fires once per successful run
) as dag:
    # ============================================================
    # EXTRACT TASKS (external sources get one extra retry)
    # ============================================================
    # Airflow 2 passes the context automatically and rejects the old provide_context argument.
    # Return values go to XCom: for large extracts, return a storage path instead of rows.
    extract_api = PythonOperator(
        task_id='extract_api',
        python_callable=lambda: call_api(
            url=Variable.get('api_url'),
            api_key=Variable.get('api_key'),
        ),
        retries=3,
    )

    extract_db = PythonOperator(
        task_id='extract_db',
        # Incremental pull: bind the run's interval start to the %s placeholder
        python_callable=lambda **context: query_database(
            connection_id='postgres_source',
            query="SELECT * FROM users WHERE updated_at > %s",
            params=(context['data_interval_start'],),  # hypothetical helper signature
        ),
        retries=3,
    )

    extract_s3 = BashOperator(
        task_id='extract_s3',
        # /tmp is local to one worker: with Celery/Kubernetes executors, stage to shared
        # storage or load straight into stg.s3_data instead of relying on /tmp
        bash_command="""
        aws s3 cp s3://source-bucket/data/{{ ds }}/data.csv \
            /tmp/extracted_data.csv
        """,
        retries=3,
    )

    # ============================================================
    # VALIDATE TASKS
    # ============================================================
    validate_api = PythonOperator(
        task_id='validate_api',
        python_callable=validate_api_data,
    )

    validate_db = PythonOperator(
        task_id='validate_db',
        python_callable=validate_db_data,
    )

    # SQLCheckOperator fails if any value in the first result row is falsy,
    # so a count of 0 fails the task (a plain PostgresOperator would succeed)
    validate_s3 = SQLCheckOperator(
        task_id='validate_s3',
        conn_id='postgres_warehouse',
        sql="""
        SELECT COUNT(*) FROM stg.s3_data
        WHERE date = '{{ ds }}'
        """,
    )

    # ============================================================
    # TRANSFORM TASKS
    # ============================================================
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        execution_timeout=timedelta(hours=1),  # overrides the 2-hour default
    )

    # ============================================================
    # LOAD TASKS
    # ============================================================
    # Upsert keeps the load idempotent: re-running a date updates rows instead of duplicating them
    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='postgres_warehouse',
        sql="""
        INSERT INTO warehouse.users (id, name, email, updated_at)
        SELECT id, name, email, updated_at
        FROM stg.users_staging
        ON CONFLICT (id) DO UPDATE
        SET name = EXCLUDED.name,
            email = EXCLUDED.email,
            updated_at = EXCLUDED.updated_at;
        """,
    )

    # ============================================================
    # DEPENDENCIES
    # ============================================================
    # Extract in parallel; each validator waits only for its own source
    extract_api >> validate_api
    extract_db >> validate_db
    extract_s3 >> validate_s3
    # All validations must pass before transforming
    [validate_api, validate_db, validate_s3] >> transform
    # Transform then load
    transform >> load
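The `load` task stays idempotent because of `INSERT ... ON CONFLICT`: retries and backfills overwrite rows instead of duplicating them. Below is a minimal sketch of the same upsert, using Python's built-in `sqlite3` as a stand-in for the Postgres warehouse (SQLite 3.24+ accepts this syntax). The extra `WHERE` guard is an addition that is not in the DAG's SQL. It stops an older, late-arriving record from overwriting a newer one, which is one way to approach requirement (6). The `upsert_users` name and the sample rows are hypothetical.

```python
import sqlite3

UPSERT = """
INSERT INTO users (id, name, email, updated_at) VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
    name = excluded.name,
    email = excluded.email,
    updated_at = excluded.updated_at
WHERE excluded.updated_at > users.updated_at
"""


def upsert_users(conn, rows):
    """Upsert rows; safe to re-run for the same batch."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT, updated_at TEXT)")
batch = [(1, 'Ana', 'ana@example.com', '2024-01-02T00:00:00'),
         (2, 'Bo', 'bo@example.com', '2024-01-02T00:00:00')]
upsert_users(conn, batch)
upsert_users(conn, batch)  # a retry of the same day: no duplicates
upsert_users(conn, [(1, 'Ana Old', 'old@example.com', '2024-01-01T00:00:00')])  # late, older record
print(conn.execute("SELECT id, name FROM users ORDER BY id").fetchall())
# [(1, 'Ana'), (2, 'Bo')]
```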
Advanced Patterns
TaskGroups (the replacement for deprecated SubDAGs)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

def create_extraction_tasks():
    """Create extraction task group (call inside a DAG context)"""
    with TaskGroup("extraction") as extraction:
        PythonOperator(task_id='extract_api', ...)
        PythonOperator(task_id='extract_db', ...)
        PythonOperator(task_id='extract_s3', ...)
    # Return the group itself: a list cannot be chained to another list with >>
    return extraction

def create_validation_tasks():
    """Create validation task group (call inside a DAG context)"""
    with TaskGroup("validation") as validation:
        PythonOperator(task_id='validate_api', ...)
        PythonOperator(task_id='validate_db', ...)
        PythonOperator(task_id='validate_s3', ...)
    return validation

# Main DAG
with DAG('etl_pipeline', ...) as dag:
    extraction = create_extraction_tasks()
    validation = create_validation_tasks()
    transform = PythonOperator(task_id='transform', ...)
    load = PythonOperator(task_id='load', ...)
    # Task IDs get the group prefix, e.g. 'extraction.extract_api';
    # group >> group makes every extract task upstream of every validate task
    extraction >> validation >> transform >> load
Sensor Pattern
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.sensors.http import HttpSensor
# Wait for data availability
wait_for_s3 = S3KeySensor(
    task_id='wait_for_s3',
    bucket_name='source-bucket',
    bucket_key='data/{{ ds }}/data.csv',
    timeout=3600,       # seconds: give up after 1 hour
    poke_interval=60,   # seconds between checks
    mode='reschedule',  # release the worker slot between checks
)
# Wait for API availability
wait_for_api = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='api_endpoint',
    endpoint='/health',
    timeout=300,
    poke_interval=30,
)
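Both sensors follow the same loop: check, wait `poke_interval` seconds, and fail once `timeout` seconds have passed. `reschedule` mode frees the worker slot between checks, while the default `poke` mode holds it. Below is a framework-free sketch of the timing logic only. `wait_for`, `SensorTimeout` and the injectable `clock`/`sleep` parameters are hypothetical, added so the loop can be tested without real waiting.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is still false once `timeout` seconds have passed."""


def wait_for(condition, poke_interval, timeout, clock=time.monotonic, sleep=time.sleep):
    """Poke `condition` every `poke_interval` seconds; return the number of pokes made."""
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        # poke mode would hold a worker slot here; reschedule mode frees it until the next poke
        sleep(poke_interval)
```

In tests, a fake clock whose `sleep` simply advances a counter makes a one-hour wait run instantly.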
Monitoring and Observability
# ============================================================
# CUSTOM METRICS
# ============================================================
# Emitted only when StatsD metrics are enabled ([metrics] statsd_on = True in airflow.cfg)
import time
from datetime import timedelta

from airflow.stats import Stats

def extract_data(**context):
    """Extract data with metrics"""
    # Increment counter
    Stats.incr('etl.extract.api.calls')
    # Time the operation
    start = time.monotonic()
    data = call_api()  # placeholder for your own helper
    Stats.timing('etl.extract.api.duration', timedelta(seconds=time.monotonic() - start))
    # Record row count
    Stats.gauge('etl.extract.api.rows', len(data))
    return data

# ============================================================
# LOGGING
# ============================================================
import logging

logger = logging.getLogger(__name__)

def log_pipeline_status(**context):
    """Log pipeline status from inside a running task"""
    ti = context['task_instance']
    # While the task runs, state is 'running' and duration is not set yet (None);
    # final values are available in on_success/on_failure callbacks
    logger.info(f"""
    Pipeline Status:
    - DAG: {context['dag'].dag_id}
    - Task: {ti.task_id}
    - Execution Date: {context['execution_date']}
    - State: {ti.state}
    - Duration: {ti.duration}
    - Try Number: {ti.try_number}
    """)
Production Tip: Use Airflow Variables and Connections for sensitive configuration. Never hardcode credentials in DAGs. Use environment variables or a secrets backend (HashiCorp Vault, AWS Secrets Manager).
Common Follow-Up Questions
Q1: How do you handle backfills in Airflow?
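# Option 1: catchup=True with a fixed start_date
from datetime import datetime
from airflow import DAG

dag = DAG(
    'backfill_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Scheduler creates a run for every missed interval since start_date
)
# Option 2: Manual backfill of a date range (Airflow 2 CLI, run in a shell)
airflow dags backfill -s 2024-01-01 -e 2024-01-31 daily_etl_pipeline
# Option 3: Re-run existing runs by clearing their task instances (shell)
airflow tasks clear -s 2024-01-01 -e 2024-01-31 daily_etl_pipeline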
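For a daily schedule, each run's logical date marks the start of the interval it processes, and the run is launched only after that interval ends. The hypothetical `daily_runs` helper below lists the runs that a `-s 2024-01-01 -e 2024-01-31` backfill of a daily DAG covers. It treats both ends as inclusive and ignores cron offsets (such as the 2 AM schedule) and timezones.

```python
from datetime import date, timedelta


def daily_runs(start, end):
    """(logical_date, interval_end) pairs for a daily schedule, start..end inclusive."""
    runs = []
    day = start
    while day <= end:
        runs.append((day, day + timedelta(days=1)))
        day += timedelta(days=1)
    return runs


runs = daily_runs(date(2024, 1, 1), date(2024, 1, 31))
print(len(runs), runs[-1])  # 31 (datetime.date(2024, 1, 31), datetime.date(2024, 2, 1))
```

Since each run reprocesses one interval, the load step must be idempotent for backfills to be safe to repeat.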
Q2: How do you handle data dependencies?
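# Use ExternalTaskSensor
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',
    external_task_id='load_task',
    # Match the upstream run whose logical date is 1 hour earlier than this run's
    execution_delta=timedelta(hours=1),
    timeout=3600,  # fail after an hour instead of waiting indefinitely
    mode='reschedule',
)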
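`execution_delta` is subtracted from this run's logical date to find the upstream run to wait for. If the two DAGs' schedules do not line up this way, the sensor waits for a run that never exists. Below is a small arithmetic sketch using a hypothetical `upstream_logical_date` helper. It assumes this DAG runs at 02:00 and `upstream_dag` at 01:00 on the same day.

```python
from datetime import datetime, timedelta


def upstream_logical_date(this_logical_date, execution_delta):
    """Logical date of the upstream run that ExternalTaskSensor looks for."""
    return this_logical_date - execution_delta


print(upstream_logical_date(datetime(2024, 1, 15, 2, 0), timedelta(hours=1)))
# 2024-01-15 01:00:00
```

For schedules that do not differ by a fixed offset, the sensor's `execution_date_fn` argument computes the upstream date with a function instead.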
Q3: How do you manage DAG versioning?
# Use Git for version control
# Use tags for releases
dag = DAG(
'etl_pipeline',
tags=['v1.2.3', 'production'],
)
# Use an MD5 hash of the DAG file for change detection (not for security)
import hashlib
with open(__file__, 'rb') as f:
    dag_hash = hashlib.md5(f.read()).hexdigest()
Q4: How do you handle secrets?
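from airflow.hooks.base import BaseHook
from airflow.models import Variable

# Use Airflow Variables (call inside task code, not at DAG top level, which runs on every parse)
api_key = Variable.get('api_key')
# Use Connections
conn = BaseHook.get_connection('my_connection')
# Use a Secrets Backend: configure it in airflow.cfg ([secrets]) or env vars, not in DAG code
# AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'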
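By default, Airflow looks up a Variable in this order: the configured secrets backend, then an `AIRFLOW_VAR_<KEY>` environment variable, then the metastore database. The sketch below models that order with plain dicts standing in for the backend and the database. `get_variable` is a hypothetical helper, not Airflow's code; it only shows why a value in the secrets backend wins over one set in the UI.

```python
import os


def get_variable(key, secrets_backend=None, environ=None, metastore=None):
    """Model of the default lookup order: secrets backend, env var, then metastore DB."""
    environ = os.environ if environ is None else environ
    if secrets_backend is not None and key in secrets_backend:
        return secrets_backend[key]
    env_name = f"AIRFLOW_VAR_{key.upper()}"
    if env_name in environ:
        return environ[env_name]
    if metastore is not None and key in metastore:
        return metastore[key]
    raise KeyError(f"Variable {key} does not exist")


backend = {'api_key': 'from-backend'}
env = {'AIRFLOW_VAR_API_KEY': 'from-env', 'AIRFLOW_VAR_API_URL': 'https://env.example'}
db = {'api_url': 'https://db.example', 'region': 'us-east-1'}
print(get_variable('api_key', backend, env, db))  # from-backend
```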
Critical Consideration: Airflow is not designed for real-time streaming. Use it for batch workflows and orchestration. For streaming, use Kafka, Flink, or Spark Streaming. Airflow can trigger streaming jobs but shouldn't process events itself.
Company-Specific Tips
Airbnb Interview Tips
- Discuss TaskGroups for complex workflows (and why SubDAGs were deprecated)
- Explain data quality with Great Expectations
- Mention cost optimization with spot instances
- Talk about multi-tenancy with role-based access
Spotify Interview Tips
- Focus on data product delivery pipelines
- Discuss A/B testing data pipelines
- Mention ML pipeline orchestration
- Talk about data freshness monitoring
Lyft Interview Tips
- Discuss geospatial data processing pipelines
- Explain how Airflow supports near-real-time analytics by orchestrating frequent micro-batches or triggering streaming jobs
- Mention cost optimization for ride data
- Talk about multi-region deployment
Final Takeaway: Airflow is the de facto standard for batch data pipeline orchestration. Master DAG design patterns, error handling, and testing to build production-ready pipelines. Always consider: (1) idempotency, (2) retry logic, (3) monitoring, and (4) cost optimization.