



DAG Design Patterns & Best Practices

Building Production-Ready DAGs


Interview Question

ℹ️Interview Context

Company: Netflix / Meta
Role: Senior Data Engineer / Data Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes

Question: "Design a modular, maintainable DAG for a complex data pipeline. How do you handle dynamic task generation, task dependencies, and ensure idempotency? Walk us through your design patterns."


Detailed Theory

Core Design Principles

# design_principles.py
"""
Netflix's DAG Design Principles:

1. MODULARITY:
   - Single responsibility per DAG
   - Reusable task libraries
   - Abstract common patterns into mixins

2. IDEMPOTENCY:
   - Tasks must produce same result on re-run
   - Partition outputs by the run's logical date so a re-run overwrites the same partition
   - Handle partial failures gracefully

3. OBSERVABILITY:
   - Clear task naming
   - Informative logs
   - Proper error handling

4. SCALABILITY:
   - Dynamic task generation
   - Efficient resource usage
   - Parallel execution where possible
"""

Pattern 1: TaskFactory Pattern

# task_factory_pattern.py
from datetime import datetime
from typing import Dict, List

from airflow.decorators import dag, task

# Task Factory for creating reusable task sets
class TaskFactory:
    """Factory pattern for creating task sets (call its methods inside a DAG context)"""

    @staticmethod
    def create_extract_tasks(sources: List[str]) -> list:
        """Create one extraction task per source and return their XComArgs"""
        tasks = []
        for source in sources:
            # Bind the loop value as a default argument so each task keeps its own source
            @task(task_id=f'extract_{source}')
            def extract(source_name: str = source) -> Dict:
                # Extraction logic
                return {'source': source_name, 'rows': 1000}
            tasks.append(extract())
        return tasks

    @staticmethod
    def create_load_tasks(targets: List[str], upstream: list) -> list:
        """Create one load task per target, each consuming every upstream result"""
        tasks = []
        for target in targets:
            @task(task_id=f'load_{target}')
            def load(data: List[Dict], target_name: str = target) -> Dict:
                # Loading logic; `data` is the list of extract results
                return {'target': target_name, 'status': 'success', 'inputs': len(data)}
            # Passing the upstream XComArgs both supplies the data and wires the dependencies
            tasks.append(load(upstream))
        return tasks

# Usage in DAG
@dag(
    dag_id='factory_pattern_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def factory_dag():
    sources = ['mysql', 'postgres', 'api']
    targets = ['s3', 'bigquery', 'snowflake']

    extracts = TaskFactory.create_extract_tasks(sources)
    # Every load task depends on all three extract tasks
    TaskFactory.create_load_tasks(targets, extracts)

factory_dag()
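The source_name=source default argument in create_extract_tasks matters because Python closures capture variables, not values. A framework-free sketch of the difference, with plain lambdas standing in for decorated tasks:

```python
def build_late_bound(sources):
    """Closures look up `source` when called, so every one sees the final loop value."""
    return [lambda: f"extract_{source}" for source in sources]


def build_early_bound(sources):
    """A default argument captures the value at definition time (the TaskFactory trick)."""
    return [lambda source=source: f"extract_{source}" for source in sources]


if __name__ == "__main__":
    sources = ["mysql", "postgres", "api"]
    print([f() for f in build_late_bound(sources)])   # ['extract_api', 'extract_api', 'extract_api']
    print([f() for f in build_early_bound(sources)])  # ['extract_mysql', 'extract_postgres', 'extract_api']
```

The task_id f-string is evaluated immediately in each loop iteration, so IDs come out right either way; it is the function body that would silently process the wrong source without the default argument.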

Pattern 2: Dynamic Task Generation

# dynamic_tasks.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import List

@dag(
    dag_id='dynamic_generation_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dynamic', 'template'],
)
def dynamic_generation():
    """
    Dynamic task mapping pattern:
    - Task instances are created at run time from upstream output
    - Useful when the number of sources is not known at parse time
    - Each mapped instance is independent and runs in parallel
    """
    
    @task
    def get_sources() -> List[str]:
        """Fetch list of sources from config"""
        return ['source_a', 'source_b', 'source_c', 'source_d']
    
    @task
    def process_source(source: str) -> dict:
        """Process individual source"""
        # Processing logic
        return {
            'source': source,
            'records': 1000,
            'status': 'processed'
        }
    
    @task
    def aggregate(results: List[dict]) -> dict:
        """Aggregate results from all sources"""
        total_records = sum(r['records'] for r in results)
        return {
            'total_sources': len(results),
            'total_records': total_records
        }
    
    # Dynamic task creation
    sources = get_sources()
    processed = process_source.expand(source=sources)  # expand() takes keyword arguments
    agg = aggregate(processed)

dynamic_generation()

ℹ️Pro Tip

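Use the .expand() method (dynamic task mapping, available since Airflow 2.3) when the number of tasks depends on data that is only known at run time: Airflow creates one mapped task instance per element when the DAG runs, whereas a parse-time for loop always produces a fixed set of tasks. Arguments must be passed by keyword, as in process_source.expand(source=sources), and constants shared by every instance go in .partial(). The TaskFlow API with the @task decorator makes this pattern very intuitive.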
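To make the fan-out concrete: expanding over several keyword arguments produces their cross product, while .partial() fixes arguments shared by every instance. The sketch below only mimics that behaviour in plain Python to show how many instances you get; simulate_expand is a hypothetical helper, not an Airflow API.

```python
from itertools import product


def simulate_expand(partial_kwargs: dict, **mapped: list) -> list:
    """Return the kwargs each mapped task instance would receive.

    Mirrors task.partial(**partial_kwargs).expand(**mapped): every
    combination of the mapped lists becomes one instance.
    """
    keys = list(mapped)
    return [
        {**partial_kwargs, **dict(zip(keys, combo))}
        for combo in product(*(mapped[k] for k in keys))
    ]


if __name__ == "__main__":
    instances = simulate_expand(
        {"bucket": "raw-data"},
        source=["source_a", "source_b"],
        region=["us", "eu"],
    )
    print(len(instances))  # 4
    print(instances[0])    # {'bucket': 'raw-data', 'source': 'source_a', 'region': 'us'}
```

In Airflow the matching call would be process_source.partial(bucket='raw-data').expand(source=[...], region=[...]), assuming process_source accepted those parameters; when you want pairs zipped rather than crossed, expand_kwargs() (Airflow 2.4+) is the tool.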

Pattern 3: Branching Pattern

# branching_pattern.py
from datetime import datetime
from typing import Optional

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='branching_pattern_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pattern():
    """
    Branching pattern:
    - Choose execution path based on conditions
    - Useful for A/B testing, feature flags
    - Non-selected paths are skipped, not failed
    """

    @task
    def get_config() -> dict:
        """Fetch the flag that decides the path (e.g. from a Variable or flag service)"""
        return {'use_new_path': True}

    @task.branch
    def choose_path(config: dict) -> str:
        """Return the task_id to follow; other direct downstream tasks are skipped"""
        if config.get('use_new_path'):
            return 'process_new'
        return 'process_legacy'

    @task
    def process_new() -> dict:
        """New processing path"""
        return {'path': 'new', 'result': 'new_result'}

    @task
    def process_legacy() -> dict:
        """Legacy processing path"""
        return {'path': 'legacy', 'result': 'legacy_result'}

    # The default ALL_SUCCESS rule would skip the join, because one branch is always skipped
    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def merge_results(new_result: Optional[dict] = None,
                      legacy_result: Optional[dict] = None) -> dict:
        """Merge results from whichever path ran (the skipped one resolves to None)"""
        return new_result or legacy_result

    # Branch
    branch = choose_path(get_config())
    new_path = process_new()
    legacy_path = process_legacy()
    branch >> [new_path, legacy_path]

    # Merge
    merge_results(new_result=new_path, legacy_result=legacy_path)

branching_pattern()
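Why the join needs a non-default trigger rule: the branch task marks every unselected direct downstream task as skipped, and under the default all_success rule a skipped upstream causes the join to be skipped too. A simplified, framework-free model of that decision (it ignores other states such as upstream_failed); both function names are hypothetical.

```python
def branch_states(chosen: str, branches: list) -> dict:
    """States of the branch tasks after the branch callable returns `chosen`."""
    return {b: "success" if b == chosen else "skipped" for b in branches}


def join_should_run(upstream_states: list, trigger_rule: str) -> bool:
    """Simplified evaluation of two Airflow trigger rules for a join task."""
    if trigger_rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "none_failed_min_one_success":
        return "failed" not in upstream_states and "success" in upstream_states
    raise ValueError(f"rule not modelled: {trigger_rule}")


if __name__ == "__main__":
    states = branch_states("process_new", ["process_new", "process_legacy"])
    print(states)  # {'process_new': 'success', 'process_legacy': 'skipped'}
    print(join_should_run(list(states.values()), "all_success"))                  # False
    print(join_should_run(list(states.values()), "none_failed_min_one_success"))  # True
```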

Pattern 4: Error Handling Pattern

# error_handling.py
import logging
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

logger = logging.getLogger(__name__)

# Callbacks receive the task context; swap the log calls for your alerting integration
def dag_failure_callback(context):
    logger.error("DAG run failed: %s", context['dag_run'].run_id)

def task_failure_callback(context):
    logger.error("Task failed: %s", context['task_instance'].task_id)

def task_retry_callback(context):
    logger.warning("Task will retry: %s", context['task_instance'].task_id)

def task_success_callback(context):
    logger.info("Task succeeded: %s", context['task_instance'].task_id)

@dag(
    dag_id='error_handling_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # DAG-level callbacks cover the run's success/failure; retry callbacks are task-level
    on_failure_callback=dag_failure_callback,
)
def error_handling_pattern():
    """
    Error handling pattern:
    - Proper trigger rules
    - Callback functions
    - Retry mechanisms
    - Alerting integration
    """

    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        on_failure_callback=task_failure_callback,
        on_retry_callback=task_retry_callback,
        on_success_callback=task_success_callback,
    )
    def critical_task() -> dict:
        """Task with comprehensive error handling"""
        try:
            result = 'ok'  # placeholder for the critical operation
            return {'status': 'success', 'result': result}
        except Exception:
            # Log with traceback, then re-raise so Airflow can retry
            logger.exception("Critical operation failed")
            raise

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def handle_failure() -> dict:
        """Runs only if an upstream task failed; skipped otherwise"""
        logger.error("Upstream failure detected")  # send the alert here
        return {'status': 'failure_handled'}

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup() -> dict:
        """Cleanup regardless of upstream status"""
        # Remove temp files / release resources here
        return {'status': 'cleaned'}

    # Dependencies
    critical_task() >> handle_failure() >> cleanup()

error_handling_pattern()
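To reason about retry_exponential_backoff, model the minimum wait before each retry as retry_delay doubling per attempt, capped by max_retry_delay. This is a simplified sketch: Airflow also adds a deterministic per-task-instance jitter on top of this minimum, so real waits can be longer (still capped), and backoff_schedule is a hypothetical helper.

```python
from datetime import timedelta


def backoff_schedule(retries: int, retry_delay: timedelta, max_retry_delay: timedelta) -> list:
    """Minimum wait before retry n: retry_delay * 2**(n-1), capped at max_retry_delay."""
    return [min(retry_delay * 2 ** (n - 1), max_retry_delay) for n in range(1, retries + 1)]


if __name__ == "__main__":
    waits = backoff_schedule(3, timedelta(minutes=5), timedelta(minutes=30))
    print([w.total_seconds() / 60 for w in waits])  # [5.0, 10.0, 20.0]
```

With retries=3 the cap of 30 minutes is never reached; it starts to matter once a task is allowed five or more retries.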

⚠️Important

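Always set trigger_rule=TriggerRule.ALL_DONE for cleanup tasks. This ensures they run regardless of upstream task status, which is critical for resource cleanup. Be aware that Airflow derives a DAG run's final state from its leaf tasks: if a successful ALL_DONE cleanup task is the only leaf, the run is marked successful even when an upstream task failed. Keep a task that fails in that case as a leaf, or use setup/teardown tasks (Airflow 2.7+), whose teardowns are ignored when the run state is computed.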
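Airflow decides a DAG run's state from its leaf tasks, which is why an ALL_DONE cleanup at the end can hide an upstream failure. The sketch applies a simplified version of that rule (a run fails only if a leaf ended in failed or upstream_failed) to the critical_task >> handle_failure >> cleanup chain from the error-handling DAG; the watcher task is a hypothetical extra leaf with trigger_rule=ONE_FAILED whose body simply raises.

```python
FAILED_STATES = {"failed", "upstream_failed"}


def dag_run_state(task_states: dict, downstream: dict) -> str:
    """Simplified model: the run fails only if one of its leaf tasks ended in a failed state."""
    leaves = [t for t in task_states if not downstream.get(t)]
    return "failed" if any(task_states[t] in FAILED_STATES for t in leaves) else "success"


if __name__ == "__main__":
    # critical_task >> handle_failure >> cleanup, after critical_task failed
    downstream = {"critical_task": ["handle_failure"], "handle_failure": ["cleanup"], "cleanup": []}
    states = {"critical_task": "failed", "handle_failure": "success", "cleanup": "success"}
    print(dag_run_state(states, downstream))  # success (the failure is masked)

    # Adding the hypothetical watcher leaf re-surfaces the failure
    downstream = {**downstream, "critical_task": ["handle_failure", "watcher"], "watcher": []}
    states = {**states, "watcher": "failed"}
    print(dag_run_state(states, downstream))  # failed
```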

Pattern 5: Cross-DAG Dependencies

# cross_dag_deps.py
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor

@dag(
    dag_id='cross_dag_dependency_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def cross_dag_dependency():
    """
    Cross-DAG dependency pattern:
    - Wait for upstream DAG completion
    - Pass data between DAGs via XCom
    - Handle timing differences
    """

    @task
    def process_data() -> dict:
        """Process data from upstream"""
        return {'processed': True}

    # Waits for upstream_etl_dag.final_task in the run whose logical date equals
    # this run's logical date minus execution_delta. timedelta(hours=1) assumes the
    # upstream DAG is scheduled one hour earlier (e.g. '0 23 * * *'); if both DAGs
    # share the same schedule, omit execution_delta.
    wait_task = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl_dag',
        external_task_id='final_task',
        execution_delta=timedelta(hours=1),
        allowed_states=['success'],
        failed_states=['failed'],  # fail fast instead of waiting for the timeout
        poke_interval=60,          # seconds between checks
        timeout=3600,              # give up after one hour
        mode='reschedule',         # release the worker slot between checks
    )

    wait_task >> process_data()

cross_dag_dependency()
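ExternalTaskSensor looks for the upstream run whose logical date is this run's logical date minus execution_delta, so the delta has to match the offset between the two schedules exactly. A plain datetime sketch of that lookup, assuming the upstream DAG runs one hour earlier (e.g. '0 23 * * *'), which is what execution_delta=timedelta(hours=1) implies; upstream_logical_date is a hypothetical helper.

```python
from datetime import datetime, timedelta


def upstream_logical_date(logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Logical date of the upstream run an ExternalTaskSensor waits for."""
    return logical_date - execution_delta


if __name__ == "__main__":
    this_run = datetime(2024, 1, 2)  # '@daily' run, logical date at midnight
    upstream_runs = {datetime(2024, 1, 1, 23)}  # '0 23 * * *' upstream run
    target = upstream_logical_date(this_run, timedelta(hours=1))
    print(target, target in upstream_runs)  # 2024-01-01 23:00:00 True

    # If the upstream were also '@daily', the same delta matches no upstream run
    daily_upstream_runs = {datetime(2024, 1, 1), datetime(2024, 1, 2)}
    print(target in daily_upstream_runs)  # False
```

When the offset is not constant, ExternalTaskSensor's execution_date_fn parameter lets you compute the target date with a function instead of a fixed delta.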

Pattern 6: TaskGroups (Replacing Airflow 1.x-Style SubDAGs)

# subdag_pattern.py
"""
Note: SubDAGs are deprecated in Airflow 2.x (and removed in Airflow 3).
Use TaskGroups instead for better UI and performance.
"""
from datetime import datetime

from airflow import DAG
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator  # DummyOperator before Airflow 2.3
from airflow.utils.task_group import TaskGroup

# Old SubDAG pattern (deprecated): a factory returns a child DAG whose dag_id
# must be "<parent_dag_id>.<child_dag_name>"; the parent DAG then wraps it in
# airflow.operators.subdag.SubDagOperator(task_id=child_dag_name, subdag=...)
def create_subdag(parent_dag_id, child_dag_name, start_date, schedule_interval):
    with DAG(
        dag_id=f'{parent_dag_id}.{child_dag_name}',
        start_date=start_date,
        schedule_interval=schedule_interval,
    ) as subdag:
        task1 = EmptyOperator(task_id='task1')
        task2 = EmptyOperator(task_id='task2')
        task1 >> task2

    return subdag

# New TaskGroup pattern (recommended)
@dag(
    dag_id='taskgroup_pattern_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def taskgroup_pattern():
    start = EmptyOperator(task_id='start')

    with TaskGroup(group_id='processing') as processing:
        task1 = EmptyOperator(task_id='extract')
        task2 = EmptyOperator(task_id='transform')
        task3 = EmptyOperator(task_id='load')
        task1 >> task2 >> task3

    end = EmptyOperator(task_id='end')

    start >> processing >> end

taskgroup_pattern()
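Task IDs inside a TaskGroup are prefixed with the group ID by default (prefix_group_id=True), so the extract task in the processing group is addressed as processing.extract in the UI, in logs, and in xcom_pull(task_ids=...). A small sketch of that naming rule, including a nested group; qualified_task_id is a hypothetical helper.

```python
def qualified_task_id(task_id: str, *group_ids: str) -> str:
    """Full task_id Airflow assigns when every enclosing group uses prefix_group_id=True."""
    return ".".join([*group_ids, task_id])


if __name__ == "__main__":
    print(qualified_task_id("extract", "processing"))           # processing.extract
    print(qualified_task_id("extract", "processing", "mysql"))  # processing.mysql.extract
```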

Real-World Scenarios

Scenario 1: Netflix ETL Pipeline

# netflix_etl_pattern.py
"""
Netflix-style ETL Pipeline:
- Ingest from multiple sources
- Apply business logic
- Load to data warehouse
- Notify stakeholders
"""

from airflow.decorators import dag, task
from datetime import datetime
from typing import List, Dict

@dag(
    dag_id='netflix_etl_pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['netflix', 'etl', 'production'],
    max_active_runs=1,  # Prevent concurrent runs
    doc_md="""
    ## Netflix ETL Pipeline
    
    This pipeline ingests data from multiple sources,
    transforms it, and loads to the data warehouse.
    
    **Sources:** MySQL, Kafka
    **Target:** BigQuery
    """
)
def netflix_etl():
    @task
    def extract_from_mysql() -> List[Dict]:
        """Extract from MySQL source"""
        # Implementation
        return [{'table': 'users', 'rows': 10000}]
    
    @task
    def extract_from_kafka() -> List[Dict]:
        """Extract from Kafka topic"""
        # Implementation
        return [{'topic': 'events', 'records': 50000}]
    
    @task
    def transform_mysql(data: List[Dict]) -> List[Dict]:
        """Transform MySQL data"""
        # Business logic
        return data
    
    @task
    def transform_kafka(data: List[Dict]) -> List[Dict]:
        """Transform Kafka data"""
        # Business logic
        return data
    
    @task
    def load_to_bigquery(mysql_data: List[Dict], kafka_data: List[Dict]) -> Dict:
        """Load combined data to BigQuery"""
        # Combine and load
        return {'status': 'loaded', 'rows': 60000}
    
    @task
    def notify_stakeholders(load_result: Dict) -> None:
        """Send notification to stakeholders"""
        # Send email/Slack notification
        pass
    
    # Extract
    mysql_data = extract_from_mysql()
    kafka_data = extract_from_kafka()
    
    # Transform
    transformed_mysql = transform_mysql(mysql_data)
    transformed_kafka = transform_kafka(kafka_data)
    
    # Load
    load_result = load_to_bigquery(transformed_mysql, transformed_kafka)
    
    # Notify
    notify_stakeholders(load_result)

netflix_etl()
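The ETL DAG fans out into two independent extract/transform chains that only meet at load_to_bigquery, which is what lets them run in parallel. Python's standard graphlib module (3.9+) shows the same structure: each batch from get_ready() is a set of tasks whose upstreams are all done. The dependency map mirrors the DAG; grouping into waves is only an illustration, since the real scheduler starts each task as soon as its own upstreams finish.

```python
from graphlib import TopologicalSorter

# task -> set of upstream tasks, mirroring netflix_etl_pipeline
DEPENDENCIES = {
    "extract_from_mysql": set(),
    "extract_from_kafka": set(),
    "transform_mysql": {"extract_from_mysql"},
    "transform_kafka": {"extract_from_kafka"},
    "load_to_bigquery": {"transform_mysql", "transform_kafka"},
    "notify_stakeholders": {"load_to_bigquery"},
}


def execution_waves(deps: dict) -> list:
    """Group tasks into waves; tasks in the same wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for i, wave in enumerate(execution_waves(DEPENDENCIES), start=1):
        print(i, wave)
```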

Scenario 2: Meta Data Pipeline with Quality Checks

# meta_quality_pipeline.py
"""
Meta-style pipeline with data quality checks:
- Validate data before processing
- Alert on quality issues
- Quarantine bad data
"""

from datetime import datetime

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='meta_quality_pipeline',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['meta', 'quality', 'production'],
)
def meta_quality_pipeline():
    @task
    def extract() -> dict:
        """Extract raw data"""
        return {'records': 1000, 'quality_score': 0.85}

    @task
    def validate(data: dict) -> dict:
        """Validate data quality; raising fails the task and triggers the failure path"""
        quality_score = data.get('quality_score', 0)

        if quality_score < 0.7:
            raise ValueError(f"Quality score too low: {quality_score}")

        return {
            'valid': True,
            'quality_score': quality_score,
            'records': data['records']
        }

    @task
    def process(validated_data: dict) -> dict:
        """Process valid data"""
        return {
            'processed': True,
            'records': validated_data['records']
        }

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def quarantine(data: dict) -> None:
        """Quarantine the raw batch; runs only when an upstream task failed"""
        # Move to quarantine table
        pass

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def alert(data: dict) -> None:
        """Send quality alert; runs only when an upstream task failed"""
        # Send alert
        pass

    # Happy path
    raw = extract()
    validated = validate(raw)
    process(validated)

    # Failure path: both tasks get the raw batch and are skipped when validation passes
    validated >> [quarantine(raw), alert(raw)]

meta_quality_pipeline()
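Quarantining often works at row level rather than rejecting a whole batch. A minimal sketch, assuming hypothetical per-row rules and the same 0.7 threshold used in validate: valid rows continue, invalid rows are set aside together with the rules they broke, and the batch score is the share of valid rows.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]

# Hypothetical rules: name -> predicate that must hold for a valid row
RULES: Dict[str, Callable[[Row], bool]] = {
    "has_user_id": lambda r: r.get("user_id") is not None,
    "non_negative_amount": lambda r: isinstance(r.get("amount"), (int, float)) and r["amount"] >= 0,
}


def split_batch(rows: List[Row], threshold: float = 0.7) -> Tuple[List[Row], List[dict], float, bool]:
    """Return (valid rows, quarantined rows with reasons, quality score, passes threshold)."""
    valid, quarantined = [], []
    for row in rows:
        failed = [name for name, rule in RULES.items() if not rule(row)]
        if failed:
            quarantined.append({"row": row, "failed_rules": failed})
        else:
            valid.append(row)
    score = len(valid) / len(rows) if rows else 0.0
    return valid, quarantined, score, score >= threshold


if __name__ == "__main__":
    batch = [
        {"user_id": 1, "amount": 10.0},
        {"user_id": None, "amount": 5.0},
        {"user_id": 3, "amount": -2},
        {"user_id": 4, "amount": 7},
    ]
    valid, bad, score, ok = split_batch(batch)
    print(len(valid), len(bad), score, ok)  # 2 2 0.5 False
```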

Edge Cases and Gotchas

⚠️Common Pitfalls

  1. Mutable Default Arguments: Never use mutable defaults in task functions. Use None and initialize inside.

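  2. Global State: Don't rely on global variables in tasks. Each task instance runs in its own process, often on a different worker, so module-level state set by one task is not visible to another; pass data through XCom or external storage instead.

  3. Task Serialization: Values passed between tasks through XCom (task return values and TaskFlow arguments) must be serializable by the XCom backend. Keep them small and JSON-serializable, and pass references such as file paths or table names for large datasets.

  4. Circular Dependencies: Airflow rejects a DAG containing a cycle when the file is parsed (it shows up as an import error), so verify your dependency wiring before deploying.

  5. Dynamic start_date: Never set start_date to datetime.now(); it is re-evaluated every time the file is parsed, so the schedule keeps shifting. Use a fixed date, and inside tasks use the run's logical date (logical_date or ds; execution_date is its deprecated alias) instead of the current time.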

# edge_cases.py
from airflow.decorators import dag, task
from datetime import datetime

# Bad: Mutable default argument
@task
def bad_task(items=[]):  # This will cause issues!
    items.append('new')
    return items

# Good: Use None and initialize
@task
def good_task(items=None):
    if items is None:
        items = []
    items.append('new')
    return items

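# Edge Case: Task naming conflicts. Calling the same @task function twice in one DAG
# makes Airflow auto-suffix the ID (process, process__1, ...); explicit, unique
# task_ids keep the UI and logs readable.
@dag(dag_id='naming_dag', schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False)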
def naming_dag():
    # Use unique task IDs
    @task(task_id='process_data_v1')
    def process_v1():
        return {'version': 'v1'}
    
    @task(task_id='process_data_v2')
    def process_v2():
        return {'version': 'v2'}
    
    process_v1() >> process_v2()

naming_dag()
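For pitfall 3, a cheap guard is to round-trip a task's return value through json before handing it to XCom, so a non-serializable value fails with a clear message inside the task rather than later. A sketch assuming the default JSON-style XCom backend; ensure_xcom_safe is a hypothetical helper, and newer Airflow releases can serialize some extra types (such as datetimes), so treat this as a conservative check.

```python
import json
from typing import Any


def ensure_xcom_safe(value: Any) -> Any:
    """Raise a descriptive TypeError if `value` is not plain JSON; otherwise return it unchanged."""
    try:
        json.dumps(value)
    except (TypeError, ValueError) as exc:  # ValueError covers circular references
        raise TypeError(f"Task return value is not JSON-serializable: {exc}") from exc
    return value


if __name__ == "__main__":
    print(ensure_xcom_safe({"rows": 1000, "tables": ["users"]}))
    try:
        ensure_xcom_safe({"tables": {"users", "events"}})  # sets are not JSON
    except TypeError as err:
        print(err)
```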



Best Practices

Naming Conventions

# naming_conventions.py
"""
Naming Best Practices:

1. DAG IDs:
   - Use snake_case
   - Include team/domain prefix
   - Be descriptive
   - Example: 'marketing_email_campaign_daily'

2. Task IDs:
   - Use snake_case
   - Include action verb
   - Be specific
   - Example: 'extract_users_from_mysql'

3. Tags:
   - Use consistent tags
   - Include team, type, priority
   - Example: ['marketing', 'etl', 'high-priority']
"""

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='marketing_email_campaign_daily',  # Clear naming
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['marketing', 'etl', 'high-priority'],  # Consistent tagging
    doc_md="## Email Campaign Pipeline\nDaily extraction and processing of email campaign data.",
)
def well_named_dag():
    @task(task_id='extract_campaign_data')  # Descriptive task ID
    def extract():
        return {'data': 'value'}
    
    @task(task_id='transform_campaign_metrics')
    def transform(data):
        return data
    
    @task(task_id='load_to_warehouse')
    def load(data):
        return {'status': 'loaded'}
    
    # Passing each output as an argument also wires extract >> transform >> load
    load(transform(extract()))

well_named_dag()
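Conventions like these are easiest to keep when a check enforces them, for example in a CI test that loads every DAG. A minimal sketch of such a check; the regex, the verb list, and naming_problems itself are assumptions to adapt to your team's rules, not anything Airflow enforces.

```python
import re

SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")
ACTION_VERBS = ("extract", "transform", "load", "validate", "aggregate", "notify")  # assumed list


def naming_problems(dag_id: str, task_ids: list, domain_prefixes: tuple) -> list:
    """Return human-readable naming violations (an empty list means the names pass)."""
    problems = []
    if not SNAKE_CASE.match(dag_id):
        problems.append(f"dag_id '{dag_id}' is not snake_case")
    if not dag_id.startswith(tuple(f"{p}_" for p in domain_prefixes)):
        problems.append(f"dag_id '{dag_id}' lacks a team/domain prefix")
    for task_id in task_ids:
        if not SNAKE_CASE.match(task_id):
            problems.append(f"task_id '{task_id}' is not snake_case")
        elif not task_id.startswith(ACTION_VERBS):
            problems.append(f"task_id '{task_id}' does not start with an action verb")
    return problems


if __name__ == "__main__":
    print(naming_problems(
        "marketing_email_campaign_daily",
        ["extract_campaign_data", "transform_campaign_metrics", "load_to_warehouse"],
        ("marketing", "finance"),
    ))  # []
    print(naming_problems("EmailDAG", ["step1"], ("marketing",)))
```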

Documentation Standards

# documentation.py
"""
Documentation Best Practices:

1. Use doc_md for DAG documentation
2. Use doc_md for task documentation
3. Include purpose, inputs, outputs
4. Document any special considerations
"""

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='documented_pipeline',
    schedule_interval='0 2 * * *',  # matches the "daily at 2 AM UTC" note below
    start_date=datetime(2024, 1, 1),
    catchup=False,
    doc_md="""
    ## Documented Pipeline
    
    ### Purpose
    This pipeline processes customer data for analytics.
    
    ### Inputs
    - MySQL: customer_updates table
    - S3: raw_events bucket
    
    ### Outputs
    - BigQuery: analytics.customer_metrics
    
    ### Schedule
    Runs daily at 2 AM UTC
    
    ### Contacts
    - Team: data-engineering@company.com
    - On-call: #data-engineering
    """,
)
def documented_pipeline():
    @task(
        doc_md="""
        ### Extract Customer Updates
        
        Extracts incremental customer updates from MySQL.
        
        **Logic:**
        - Query where updated_at > last_run
        - Handle NULL values
        - Return as JSON
        """,
    )
    def extract_customers() -> dict:
        """Extract customer updates from MySQL."""
        # Implementation
        return {'records': 1000}
    
    extract_customers()

documented_pipeline()
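To keep every DAG's doc_md in the same shape, the purpose, inputs, outputs, schedule, and contacts can be generated from structured metadata instead of hand-written markdown. A sketch under that assumption; build_doc_md and its fields are hypothetical, and its output is simply a string to pass as doc_md=.

```python
from typing import List


def build_doc_md(title: str, purpose: str, inputs: List[str], outputs: List[str],
                 schedule: str, contacts: List[str]) -> str:
    """Render a doc_md string with Purpose, Inputs, Outputs, Schedule and Contacts sections."""
    def bullets(items: List[str]) -> str:
        return "\n".join(f"- {item}" for item in items)

    sections = [
        f"## {title}",
        f"### Purpose\n{purpose}",
        f"### Inputs\n{bullets(inputs)}",
        f"### Outputs\n{bullets(outputs)}",
        f"### Schedule\n{schedule}",
        f"### Contacts\n{bullets(contacts)}",
    ]
    return "\n\n".join(sections)


if __name__ == "__main__":
    print(build_doc_md(
        title="Documented Pipeline",
        purpose="This pipeline processes customer data for analytics.",
        inputs=["MySQL: customer_updates table", "S3: raw_events bucket"],
        outputs=["BigQuery: analytics.customer_metrics"],
        schedule="Runs daily at 2 AM UTC",
        contacts=["Team: data-engineering@company.com", "On-call: #data-engineering"],
    ))
```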

ℹ️Netflix Interview Tip

Netflix emphasizes modularity and reusability. When discussing DAG design, explain how a TaskFactory-style abstraction lets many pipelines reuse the same task sets, and describe an error-handling approach built on task- and DAG-level callbacks, trigger rules, and alerting.


Summary

Mastering DAG design patterns is essential for building production-ready data pipelines. Key patterns include:

  1. TaskFactory Pattern - Create reusable task sets
  2. Dynamic Task Generation - Use .expand() for variable task counts
  3. Branching Pattern - Handle conditional logic
  4. Error Handling Pattern - Comprehensive error handling with callbacks
  5. Cross-DAG Dependencies - Use ExternalTaskSensor
  6. TaskGroup Pattern - Organize related tasks

For Netflix and Meta interviews, focus on:

  • Modular, maintainable design
  • Comprehensive error handling
  • Proper documentation
  • Scalability considerations
  • Production best practices

This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.
