Pipeline Orchestration Patterns: DAGs, Error Handling & Idempotency
Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Lyft, DoorDash
1. DAG Design Patterns
Fan-Out / Fan-In
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def fan_out_fan_in_dag(regions=('us-east', 'us-west', 'eu-west', 'ap-south')):
    """Build a DAG that processes each region in parallel, then merges the results.

    Args:
        regions: Region names to fan out over; one ``process_<region>`` task is
            created per entry. Defaults to the original four regions.

    Returns:
        The constructed ``DAG``.
    """
    with DAG(
        'fan_out_fan_in',
        start_date=datetime(2024, 1, 1),
        schedule_interval='@daily',
    ) as dag:
        # Fan-out: one independent task per region, each handed its region name.
        process_tasks = [
            PythonOperator(
                task_id=f'process_{region}',
                python_callable=process_region,
                op_kwargs={'region': region},
            )
            for region in regions
        ]

        # Fan-in: a single merge task downstream of every region task.
        merge = PythonOperator(
            task_id='merge_results',
            python_callable=merge_regions,
        )
        process_tasks >> merge
    return dag
def process_region(region: str):
    """Handle the data for one region, independently of every other region.

    Placeholder implementation: does no work and returns None.
    """
    return None
def merge_regions():
    """Combine the per-region outputs once all region tasks have finished.

    Placeholder implementation: does no work and returns None.
    """
    return None
Conditional Branching
def conditional_dag():
    """Build a DAG that branches on the outcome of a data-quality check.

    ``check_quality`` runs first. ``branch_on_quality`` then selects either
    ``process_data`` (quality passed) or ``quarantine_data`` (anything else).

    NOTE(review): ``run_quality_checks``, ``process_data`` and
    ``quarantine_data`` are not defined in this file and must be provided
    elsewhere. ``run_quality_checks`` is assumed to return a dict with a
    ``'passed'`` key (e.g. the result of ``QualityGate.validate``) — confirm.

    Returns:
        The constructed ``DAG``.
    """
    # BranchPythonOperator is not among this file's top-level imports.
    from airflow.operators.python import BranchPythonOperator

    with DAG(
        'conditional_pipeline',
        start_date=datetime(2024, 1, 1),
        schedule_interval='@daily',
    ) as dag:
        quality_check = PythonOperator(
            task_id='check_quality',
            python_callable=run_quality_checks,
        )

        def branch_func(**context):
            """Return the task_id of the branch to follow."""
            quality_result = context['ti'].xcom_pull(task_ids='check_quality')
            # Fail closed: a missing/empty result goes to quarantine instead of
            # crashing the branch task on ``None['passed']``.
            if quality_result and quality_result.get('passed'):
                return 'process_data'
            return 'quarantine_data'

        branch = BranchPythonOperator(
            task_id='branch_on_quality',
            python_callable=branch_func,
        )
        process = PythonOperator(
            task_id='process_data',
            python_callable=process_data,
        )
        quarantine = PythonOperator(
            task_id='quarantine_data',
            python_callable=quarantine_data,
        )
        quality_check >> branch >> [process, quarantine]
    return dag
2. Idempotency Patterns
Idempotent Writes
class IdempotentWriter:
    """Write operations that can be safely retried.

    Data is first staged at a unique temporary path under ``target_path`` and
    then handed to ``_atomic_replace`` to be swapped into place.
    """

    def __init__(self, target_path: str):
        # Root path of the table being written.
        self.target_path = target_path

    def write_with_dedup(self, df, partition_key: str):
        """Write ``df`` deduplicated on ``partition_key`` (same input, same output).

        Args:
            df: DataFrame exposing ``dropDuplicates`` and ``write`` (presumably a
                Spark DataFrame — TODO confirm).
            partition_key: The single column used as the deduplication key.
                NOTE(review): only one row per distinct value of this column is
                kept. If this is really a partition column (many rows per value),
                this discards data — confirm intent. If rows sharing a key differ
                in other columns, which one survives may not be deterministic.
        """
        import uuid  # uuid is not imported at the top of this file

        deduped_df = df.dropDuplicates([partition_key])

        # Stage under a fresh, unique path so a retried attempt never collides
        # with leftovers from an earlier, partially completed attempt.
        temp_path = f"{self.target_path}/_temp/{uuid.uuid4()}"
        deduped_df.write.format("delta").mode("overwrite").save(temp_path)

        # Swap the staged data into the target (rename or Delta MERGE).
        self._atomic_replace(temp_path, self.target_path)

    def _atomic_replace(self, temp_path: str, target_path: str):
        """Atomically replace ``target_path`` with the data staged at ``temp_path``.

        NOTE(review): currently a no-op stub. Until implemented, staged data is
        never promoted to ``target_path`` and the temp directory is not cleaned up.
        """
        # For Delta Lake: use MERGE or INSERT OVERWRITE.
        pass
Idempotent Backfills
class BackfillManager:
    """Checkpointed backfills of individual dates, safe to re-run.

    NOTE(review): ``_get_checkpoint``, ``_update_checkpoint`` and
    ``_process_date`` are not defined here; presumably a subclass supplies
    them — confirm.
    """

    def __init__(self, spark, checkpoint_path: str):
        self.spark = spark
        self.checkpoint_path = checkpoint_path

    def backfill_date(self, date: str, force: bool = False):
        """Backfill a single date; safe to retry.

        Dates whose checkpoint is ``"completed"`` are skipped unless ``force``
        is true. On failure the checkpoint records ``"failed: <error>"`` and the
        exception is re-raised.
        """
        checkpoint = self._get_checkpoint(date)
        if checkpoint == "completed" and not force:
            print(f"Date {date} already backfilled, skipping")
            return

        try:
            self._update_checkpoint(date, "in_progress")
            self._process_date(date)
            self._update_checkpoint(date, "completed")
        except Exception as e:
            self._update_checkpoint(date, f"failed: {e}")
            raise

    def backfill_range(self, start_date: str, end_date: str, parallelism: int = 4,
                       force: bool = False):
        """Backfill every date in ``[start_date, end_date]`` using a thread pool.

        Every date is attempted even if others fail. Afterwards, if any date
        failed, the failed dates are printed and the exception of the earliest
        failed date is re-raised.

        Args:
            start_date: First date (inclusive), in any form ``pandas.date_range`` accepts.
            end_date: Last date (inclusive).
            parallelism: Maximum number of dates processed concurrently.
            force: Passed to ``backfill_date``; reprocess already-completed dates.
        """
        # These names are not imported at the top of this file.
        from concurrent.futures import ThreadPoolExecutor, as_completed
        import pandas as pd

        dates = [str(d.date()) for d in pd.date_range(start_date, end_date)]
        failures = []
        with ThreadPoolExecutor(max_workers=parallelism) as executor:
            future_to_date = {
                executor.submit(self.backfill_date, date, force): date
                for date in dates
            }
            for future in as_completed(future_to_date):
                try:
                    future.result()
                except Exception as exc:
                    failures.append((future_to_date[future], exc))

        if failures:
            # Sort so the reported/raised failure does not depend on thread timing.
            failures.sort(key=lambda item: item[0])
            failed_dates = ", ".join(date for date, _ in failures)
            print(f"Backfill failed for {len(failures)} date(s): {failed_dates}")
            raise failures[0][1]
3. Error Handling & Retry Strategies
from airflow.models import Variable
from functools import wraps
import time
class RetryStrategy:
    """Configurable retry with exponential backoff.

    Use ``retry`` as a decorator. The wrapped function is called up to
    ``max_retries + 1`` times. After failed attempt ``k`` (0-based) it sleeps
    ``min(base_delay * backoff_factor ** k, max_delay)`` seconds before trying again.

    Args:
        max_retries: Number of retries after the first attempt; must be >= 0.
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound in seconds on any single delay.
        backoff_factor: Multiplier applied to the delay for each further attempt.
        retry_on: Exception class, or iterable of classes, that trigger a retry.
            Any other exception propagates immediately. Defaults to
            ``(Exception,)``, i.e. retry on any ordinary exception.

    Raises:
        ValueError: If ``max_retries`` is negative.
    """

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, backoff_factor: float = 2.0,
                 retry_on=(Exception,)):
        if max_retries < 0:
            # A negative count would skip every attempt and end in `raise None`.
            raise ValueError(f"max_retries must be >= 0, got {max_retries}")
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
        self.retry_on = (retry_on,) if isinstance(retry_on, type) else tuple(retry_on)

    def retry(self, func):
        """Decorate ``func`` so exceptions matching ``retry_on`` are retried with backoff."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(self.max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except self.retry_on as e:
                    if attempt >= self.max_retries:
                        # Out of retries: re-raise the final failure unchanged.
                        raise
                    delay = min(
                        self.base_delay * (self.backoff_factor ** attempt),
                        self.max_delay,
                    )
                    print(f"Attempt {attempt + 1} failed: {e}, retrying in {delay}s")
                    time.sleep(delay)
        return wrapper
# Usage in Airflow: task retry policy supplied through the DAG's default_args.
# timedelta is not imported at the top of this file (only datetime is).
from datetime import timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 3,
    # Initial wait between retries; with exponential backoff enabled, later
    # waits grow but are capped by max_retry_delay.
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    # NOTE(review): notify_retry / notify_failure are not defined in this file;
    # they must be defined before this dict is built.
    'on_retry_callback': notify_retry,
    'on_failure_callback': notify_failure,
}
Circuit Breaker Pattern
class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` while the circuit is open."""


class CircuitBreaker:
    """Prevent cascading failures in dependent services.

    States:
        CLOSED: calls pass through; consecutive failures are counted.
        OPEN: calls are rejected with ``CircuitBreakerOpenError`` until more
            than ``recovery_timeout`` seconds have passed since the last failure.
        HALF_OPEN: calls are let through as trials. A success closes the
            circuit; a failure re-opens it.
    """
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = self.CLOSED
        self.failure_count = 0        # consecutive failures since the last success
        self.last_failure_time = None  # time.time() of the most recent failure

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the recovery
                timeout has not yet elapsed.
            Exception: Whatever ``func`` raises, after recording the failure.
        """
        if self.state == self.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = self.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed trial re-opens immediately; otherwise trip on the threshold.
            if self.state == self.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = self.OPEN
            raise

        # Any success ends the failure streak. Without this reset, sporadic
        # failures spread over time would eventually trip the breaker.
        self.failure_count = 0
        self.state = self.CLOSED
        return result
4. Data Quality Gates
class QualityGate:
    """Run a set of checks against a DataFrame and summarise the outcome.

    Downstream tasks can branch on the ``"passed"`` flag of the summary to
    block bad data.
    """

    def __init__(self, checks: list):
        # Each check exposes ``name`` and ``run(df)``; ``run`` returns an object
        # with a boolean-ish ``passed`` attribute.
        self.checks = checks

    def validate(self, df) -> dict:
        """Run every check, in order, against ``df``.

        Returns:
            A dict with ``"passed"`` (True only if every check passed),
            ``"results"`` (check name -> result) and ``"failed_checks"`` (names
            whose stored result did not pass). If two checks share a name, only
            the last one's result is stored, but ``"passed"`` still reflects both.
        """
        by_name = {}
        saw_failure = False
        for chk in self.checks:
            outcome = chk.run(df)
            by_name[chk.name] = outcome
            check_failed = not outcome.passed
            if check_failed:
                saw_failure = True

        failed_names = [label for label, res in by_name.items() if not res.passed]
        summary = {}
        summary["passed"] = not saw_failure
        summary["results"] = by_name
        summary["failed_checks"] = failed_names
        return summary
class NotNullCheck:
    """Fail if any of ``columns`` contains a null value.

    NOTE(review): ``F`` (presumably ``pyspark.sql.functions``) and ``CheckResult``
    are not imported or defined in this file; they must be provided elsewhere.
    """

    def __init__(self, columns: list):
        self.columns = columns
        self.name = f"not_null_{','.join(columns)}"

    def run(self, df):
        """Return a failing ``CheckResult`` for the first column (in order) with nulls.

        All null counts are computed in a single aggregation over ``df``,
        instead of launching one ``count()`` job per column.
        """
        if not self.columns:
            return CheckResult(passed=True)

        # One pass: per column, sum a 1/0 "is null" indicator. Aliases are
        # positional so unusual column names (e.g. containing dots) are safe.
        counts_row = df.select([
            F.sum(F.col(col).isNull().cast("int")).alias(f"_nulls_{i}")
            for i, col in enumerate(self.columns)
        ]).first()

        for i, col in enumerate(self.columns):
            # `or 0` guards against a null sum (e.g. an empty DataFrame).
            null_count = counts_row[i] or 0
            if null_count > 0:
                return CheckResult(passed=False, message=f"{col} has {null_count} nulls")
        return CheckResult(passed=True)
class VolumeCheck:
    """Row-count bounds check: passes when ``min_rows <= count <= max_rows``."""

    def __init__(self, min_rows: int, max_rows: int):
        self.name = "volume_check"
        self.min_rows = min_rows
        self.max_rows = max_rows

    def run(self, df):
        """Count the rows of ``df`` once and compare against both bounds."""
        row_total = df.count()
        if row_total < self.min_rows:
            problem = f"Too few rows: {row_total}"
        elif row_total > self.max_rows:
            problem = f"Too many rows: {row_total}"
        else:
            return CheckResult(passed=True)
        return CheckResult(passed=False, message=problem)
# Usage: gate order data on non-null keys and a plausible row count.
quality_gate = QualityGate(checks=[
    NotNullCheck(columns=["order_id", "user_id"]),
    VolumeCheck(min_rows=1_000, max_rows=10_000_000),
])
5. Backfill & Recovery Patterns
class BackfillStrategy:
    """Handle historical data backfills for one DAG over an inclusive date range.

    NOTE(review): ``_backfill_date`` is not defined here; presumably a subclass
    supplies it — confirm.
    """

    def __init__(self, dag_id: str, start_date: str, end_date: str):
        self.dag_id = dag_id
        self.start_date = start_date
        self.end_date = end_date

    def generate_backfill_config(self) -> dict:
        """Generate Airflow backfill configuration for this DAG and date range."""
        return {
            "dag_id": self.dag_id,
            "start_date": self.start_date,
            "end_date": self.end_date,
            "conf": {"is_backfill": True},
            "reset_dag_run": True,
            "donot_pickle": False,
        }

    def backfill_with_dependencies(self, spark) -> list:
        """Backfill each date in order, skipping dates whose upstream data is not ready.

        Args:
            spark: Passed through to ``_backfill_date``.

        Returns:
            The skipped dates (as produced by ``pandas.date_range``), so the
            caller can retry them once upstream data lands.
        """
        import pandas as pd  # pandas is not imported at the top of this file

        skipped = []
        for date in pd.date_range(self.start_date, self.end_date):
            if self._upstream_ready(date):
                self._backfill_date(spark, date)
            else:
                print(f"Skipping {date} - upstream not ready")
                skipped.append(date)
        return skipped

    def _upstream_ready(self, date) -> bool:
        """Check whether upstream data exists for ``date``.

        Placeholder: always reports ready. A real implementation would query
        the upstream tables.
        """
        return True
ℹ️
Best Practice: Every pipeline operation should be idempotent: re-running it with the same inputs must leave the same result. If you can't guarantee idempotency, tag each write with a unique job ID and deduplicate on read.
Follow-Up Questions
- Design a DAG for a multi-region ETL pipeline with cross-region dependencies.
- How do you handle backfills that need to update downstream aggregates?
- Design a self-healing pipeline that automatically recovers from failures.
- How would you implement a blue-green deployment for a data pipeline?
- Design a pipeline that can handle out-of-order data delivery.