Real-World Data Pipeline Patterns in Apache Airflow

Formal Definitions

Definition: Data Pipeline

A Data Pipeline is a series of data processing steps that extract data from sources, transform it, and load it into target systems. Formally, $P = (E, T, L)$, where $E$ is extraction, $T$ is transformation, and $L$ is loading. The pipeline maintains data integrity: $D_{\text{target}} = f(D_{\text{source}})$.
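
As a minimal sketch of the definition above, the triple (E, T, L) can be modeled as three plain Python functions applied in order. The function names and the in-memory `source` and `target` lists are illustrative assumptions and are not part of Airflow.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


def run_pipeline(
    extract: Callable[[], List[Record]],
    transform: Callable[[List[Record]], List[Record]],
    load: Callable[[List[Record]], int],
) -> int:
    """Apply P = (E, T, L): extract, then transform, then load."""
    return load(transform(extract()))


# Hypothetical in-memory source and target, for illustration only.
source: List[Record] = [{"id": 1, "amount": "10.5"}, {"id": 2, "amount": "3"}]
target: List[Record] = []


def extract() -> List[Record]:
    """E: read every record from the source."""
    return list(source)


def transform(rows: List[Record]) -> List[Record]:
    """T: cast the amount field from string to float."""
    return [{**row, "amount": float(row["amount"])} for row in rows]


def load(rows: List[Record]) -> int:
    """L: append the rows to the target and report how many were written."""
    target.extend(rows)
    return len(rows)


loaded_count = run_pipeline(extract, transform, load)
```

In this sketch the target depends only on the source and on the three functions, which is the sense in which the target is a function of the source.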

Definition: Incremental Loading

Incremental Loading processes only the data that is new or has changed since the last run, rather than the entire dataset. The incremental condition is $D_{\text{new}} = \{d \in D_{\text{source}} : T(d) > T_{\text{last\_run}}\}$, where $T(d)$ is the timestamp of record $d$ (not to be confused with the transformation step $T$ in the pipeline definition).
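
The incremental condition can be illustrated with a small filter over in-memory records. This is only a sketch: the `updated_at` field name and the sample data are assumptions chosen for the example.

```python
from datetime import datetime
from typing import Dict, List


def select_incremental(records: List[Dict], last_run: datetime) -> List[Dict]:
    """Return the records whose timestamp is strictly later than last_run."""
    return [r for r in records if r["updated_at"] > last_run]


# Hypothetical source records, for illustration only.
records = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, 8, 0)},
    {"id": 2, "updated_at": datetime(2024, 1, 1, 9, 0)},
    {"id": 3, "updated_at": datetime(2024, 1, 1, 10, 0)},
]
last_run = datetime(2024, 1, 1, 9, 0)

new_records = select_incremental(records, last_run)
new_ids = [r["id"] for r in new_records]
```

Because the comparison is strict, a record stamped exactly at the checkpoint is treated as already processed; whether that is the right boundary depends on how the source assigns timestamps.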

Definition: Data Quality Gate

A Data Quality Gate is a validation checkpoint that blocks data from progressing if quality criteria are not met. The gate function is $G: D \rightarrow \{\text{pass}, \text{fail}\}$, where $D$ is the data being validated.
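
A gate function of this shape might look like the following sketch. The two criteria checked here (an `id` is present, `amount` is not negative) are assumptions chosen for illustration, not rules taken from a specific pipeline.

```python
from typing import Dict, List


def quality_gate(data: List[Dict]) -> str:
    """Return 'pass' only if every record has an id and a non-negative amount."""
    for record in data:
        if record.get("id") is None:
            return "fail"
        if record.get("amount", 0) < 0:
            return "fail"
    return "pass"


# Hypothetical batches, for illustration only.
good_batch = [{"id": 1, "amount": 5.0}]
bad_batch = [{"id": 2, "amount": -1.0}]

good_result = quality_gate(good_batch)
bad_result = quality_gate(bad_batch)
```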

Detailed Explanation

ETL Pipeline Pattern

from airflow.decorators import task, dag
from datetime import datetime, timedelta
from typing import Dict, Any

@dag(
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
)
def etl_pipeline():
    
    @task
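    def extract_orders(ds=None, next_ds=None) -> list:
        """Extract orders from source database."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        hook = PostgresHook(postgres_conn_id='source_db')
        
        # Jinja templates such as {{ ds }} are not rendered inside the body of
        # a @task function. Airflow passes context values (here ds and next_ds)
        # as keyword arguments instead, and they are bound as query parameters.
        query = """
            SELECT order_id, customer_id, order_date, total_amount
            FROM orders
            WHERE order_date >= %(start)s
            AND order_date < %(end)s
        """
        
        df = hook.get_pandas_df(query, parameters={'start': ds, 'end': next_ds})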
        return df.to_dict('records')
    
    @task
    def transform_orders(orders: list) -> list:
        """Transform orders with business logic."""
        transformed = []
        
        for order in orders:
            # Apply business rules
            transformed_order = {
                'order_id': order['order_id'],
                'customer_id': order['customer_id'],
                'order_date': order['order_date'],
                'total_amount': float(order['total_amount']),
                'order_category': categorize_order(float(order['total_amount'])),
                'processed_at': datetime.now().isoformat(),
            }
            transformed.append(transformed_order)
        
        return transformed
    
    def categorize_order(amount: float) -> str:
        """Categorize order by amount."""
        if amount >= 1000:
            return 'enterprise'
        elif amount >= 100:
            return 'business'
        else:
            return 'consumer'
    
    @task
    def validate_data(data: list) -> dict:
        """Validate data quality."""
        errors = []
        
        for record in data:
            if not record.get('order_id'):
                errors.append(f"Missing order_id: {record}")
            if record.get('total_amount', 0) < 0:
                errors.append(f"Negative amount: {record}")
        
        return {
            'valid': len(errors) == 0,
            'total_records': len(data),
            'error_count': len(errors),
            'errors': errors[:10],
        }
    
    @task
    def load_to_warehouse(data: list, validation: dict) -> int:
        """Load validated data to warehouse."""
        if not validation['valid']:
            raise ValueError(f"Data validation failed: {validation['errors']}")
        
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        hook = PostgresHook(postgres_conn_id='warehouse_db')
        
        # Insert data
        rows = [
            (d['order_id'], d['customer_id'], d['order_date'],
             d['total_amount'], d['order_category'], d['processed_at'])
            for d in data
        ]
        
        hook.insert_rows(
            table='fact_orders',
            rows=rows,
            target_fields=['order_id', 'customer_id', 'order_date',
                          'total_amount', 'order_category', 'processed_at'],
        )
        
        return len(rows)
    
    # Define pipeline
    orders = extract_orders()
    transformed = transform_orders(orders)
    validation = validate_data(transformed)
    loaded = load_to_warehouse(transformed, validation)
    
    loaded

etl_pipeline()

ELT Pipeline Pattern

from airflow.decorators import task, dag
from datetime import datetime

@dag(
    schedule_interval="0 2 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['elt', 'dbt', 'production'],
)
def elt_pipeline():
    
    @task
    def extract_to_staging():
        """Extract raw data to staging area."""
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        # Extract from source
        source_hook = PostgresHook(postgres_conn_id='source_db')
        df = source_hook.get_pandas_df("SELECT * FROM raw_events")
        
        # Load to S3 (staging)
        s3_hook = S3Hook(aws_conn_id='aws_default')
        
        local_path = '/tmp/staging_events.parquet'
        df.to_parquet(local_path, index=False)
        
        s3_hook.load_file(
            filename=local_path,
            key=f'staging/events/{datetime.now().strftime("%Y/%m/%d")}/events.parquet',
            bucket_name='data-lake',
            replace=True,
        )
        
        return {'rows': len(df), 'status': 'staged'}
    
    @task
    def run_dbt_models():
        """Run dbt transformations in warehouse."""
        import subprocess
        
        # Run dbt models
        result = subprocess.run(
            ['dbt', 'run', '--models', 'staging.*', 'marts.*'],
            capture_output=True,
            text=True,
            cwd='/opt/dbt/project',
        )
        
        if result.returncode != 0:
            raise Exception(f"dbt failed: {result.stderr}")
        
        return {'status': 'transformed', 'output': result.stdout}
    
    @task
    def run_dbt_tests():
        """Run dbt data quality tests."""
        import subprocess
        
        result = subprocess.run(
            ['dbt', 'test'],
            capture_output=True,
            text=True,
            cwd='/opt/dbt/project',
        )
        
        if result.returncode != 0:
            raise Exception(f"dbt tests failed: {result.stderr}")
        
        return {'status': 'tests_passed'}
    
    @task
    def refresh_dashboards():
        """Refresh BI dashboards after transformation."""
        # Trigger dashboard refresh
        print("Refreshing dashboards...")
        return {'status': 'refreshed'}
    
    # Define pipeline
    staged = extract_to_staging()
    transformed = run_dbt_models()
    tested = run_dbt_tests()
    refreshed = refresh_dashboards()
    
    staged >> transformed >> tested >> refreshed

elt_pipeline()

Incremental Loading Pattern

from airflow.decorators import task, dag
from datetime import datetime, timedelta
from typing import Optional

@dag(
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['incremental', 'production'],
)
def incremental_pipeline():
    
    @task
    def get_last_checkpoint() -> Optional[str]:
        """Get the last processing checkpoint."""
        from airflow.models import Variable
        
        checkpoint = Variable.get(
            'incremental_checkpoint',
            default_var=None,
        )
        return checkpoint
    
    @task
    def extract_incremental(last_checkpoint: Optional[str]) -> list:
        """Extract only new/changed records."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        hook = PostgresHook(postgres_conn_id='source_db')
        
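        if last_checkpoint:
            # Bind the checkpoint as a query parameter instead of
            # interpolating it into the SQL string
            query = """
                SELECT * FROM events
                WHERE updated_at > %(checkpoint)s
                ORDER BY updated_at
                LIMIT 10000
            """
            parameters = {'checkpoint': last_checkpoint}
        else:
            query = "SELECT * FROM events ORDER BY updated_at LIMIT 10000"
            parameters = None
        
        df = hook.get_pandas_df(query, parameters=parameters)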
        return df.to_dict('records')
    
    @task
    def transform_incremental(records: list) -> list:
        """Transform incremental records."""
        transformed = []
        
        for record in records:
            # Apply transformations
            transformed_record = {
                'id': record['id'],
                'event_type': record['event_type'].lower(),
                'payload': record['payload'],
                'event_timestamp': record['event_timestamp'],
                'processed_at': datetime.now().isoformat(),
            }
            transformed.append(transformed_record)
        
        return transformed
    
    @task
    def upsert_to_warehouse(records: list) -> int:
        """Upsert records to warehouse (insert or update)."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        hook = PostgresHook(postgres_conn_id='warehouse_db')
        
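        # Upsert: with PostgresHook, replace=True together with replace_index
        # makes insert_rows emit INSERT ... ON CONFLICT (id) DO UPDATE.
        # This assumes a unique constraint on events.id.
        rows = [
            (r['id'], r['event_type'], r['payload'],
             r['event_timestamp'], r['processed_at'])
            for r in records
        ]
        
        hook.insert_rows(
            table='events',
            rows=rows,
            target_fields=['id', 'event_type', 'payload',
                          'event_timestamp', 'processed_at'],
            replace=True,
            replace_index='id',
        )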
        
        return len(rows)
    
    @task
    def update_checkpoint(records: list):
        """Update checkpoint with latest timestamp."""
        from airflow.models import Variable
        
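        if records:
            # The extract filters on updated_at, so the checkpoint must track
            # the same column. Variables store strings, hence str().
            latest_timestamp = max(r['updated_at'] for r in records)
            Variable.set('incremental_checkpoint', str(latest_timestamp))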
    
    # Define pipeline
    last_checkpoint = get_last_checkpoint()
    records = extract_incremental(last_checkpoint)
    transformed = transform_incremental(records)
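    loaded = upsert_to_warehouse(transformed)
    checkpoint_updated = update_checkpoint(records)
    
    # Advance the checkpoint only after the load has succeeded
    loaded >> checkpoint_updated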

incremental_pipeline()

Incremental Data Volume

$$V_{\text{incremental}} = |\{d \in D : T(d) > T_{\text{checkpoint}}\}|$$

Here,

  • $V_{\text{incremental}}$ = Volume of incremental data
  • $D$ = Full dataset
  • $T(d)$ = Timestamp of record $d$
  • $T_{\text{checkpoint}}$ = Last processing checkpoint

Pipeline Throughput

$$\text{Throughput} = \frac{V_{\text{processed}}}{T_{\text{execution}}}$$

Here,

  • $V_{\text{processed}}$ = Volume of data processed
  • $T_{\text{execution}}$ = Pipeline execution time
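
As a worked example of the throughput formula, the sketch below divides a record count by a run time in seconds. The figures are made up for illustration and the function name is an assumption.

```python
def throughput(records_processed: int, execution_seconds: float) -> float:
    """Records processed per second of pipeline execution time."""
    if execution_seconds <= 0:
        raise ValueError("execution_seconds must be positive")
    return records_processed / execution_seconds


# Hypothetical run: 1.2 million records processed in two minutes.
rate = throughput(1_200_000, 120.0)
```

With these numbers the rate works out to 10,000 records per second.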

Data Quality Score

$$Q = \frac{N_{\text{valid}}}{N_{\text{total}}} \times 100\%$$

Here,

  • $Q$ = Data quality score
  • $N_{\text{valid}}$ = Number of valid records
  • $N_{\text{total}}$ = Total number of records
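
The quality score can be computed directly from the two counts. The following sketch uses made-up counts, and the function name is an assumption.

```python
def quality_score(n_valid: int, n_total: int) -> float:
    """Percentage of records that passed validation."""
    if n_total <= 0:
        raise ValueError("n_total must be greater than zero")
    return 100 * n_valid / n_total


# Hypothetical batch: 9,850 valid records out of 10,000.
score = quality_score(9_850, 10_000)
```

For this batch the score is 98.5%.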

Use incremental loading for large datasets to reduce processing time and resource usage. Track checkpoints using Airflow Variables or a dedicated metadata table.
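
For the metadata-table option, one possible shape is a single table keyed by pipeline name. The sketch below uses SQLite from the standard library so that it runs anywhere; the table name `pipeline_checkpoints`, its columns, and the helper functions are all assumptions for illustration, and a production setup would more likely live in the warehouse or the Airflow metadata database.

```python
import sqlite3
from typing import Optional


def init_checkpoint_table(conn: sqlite3.Connection) -> None:
    """Create the hypothetical checkpoint table if it does not exist."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pipeline_checkpoints ("
        "pipeline_name TEXT PRIMARY KEY, last_value TEXT NOT NULL)"
    )
    conn.commit()


def get_checkpoint(conn: sqlite3.Connection, pipeline_name: str) -> Optional[str]:
    """Return the stored checkpoint, or None on the first run."""
    row = conn.execute(
        "SELECT last_value FROM pipeline_checkpoints WHERE pipeline_name = ?",
        (pipeline_name,),
    ).fetchone()
    return row[0] if row else None


def set_checkpoint(conn: sqlite3.Connection, pipeline_name: str, value: str) -> None:
    """Insert the checkpoint, or overwrite the previous one for this pipeline."""
    conn.execute(
        "INSERT OR REPLACE INTO pipeline_checkpoints (pipeline_name, last_value) "
        "VALUES (?, ?)",
        (pipeline_name, value),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
init_checkpoint_table(conn)

before = get_checkpoint(conn, "incremental_pipeline")
set_checkpoint(conn, "incremental_pipeline", "2024-01-01T09:00:00")
set_checkpoint(conn, "incremental_pipeline", "2024-01-01T10:00:00")
after = get_checkpoint(conn, "incremental_pipeline")
```

Keeping one row per pipeline means a rerun overwrites the checkpoint rather than accumulating history; a separate audit table would be needed if past checkpoints matter.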

Implement data quality gates between pipeline stages. Fail fast when quality issues are detected to prevent bad data from propagating downstream.
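
A fail-fast gate can be sketched as a function that raises as soon as the batch falls below a minimum quality score. The exception class, the `has_id` rule, and the 99% default threshold are assumptions for illustration. Inside an Airflow task, an uncaught exception fails the task, and with the default trigger rule the downstream tasks do not run.

```python
from typing import Callable, Dict, List


class QualityGateError(Exception):
    """Raised when a batch does not meet the minimum quality score."""


def enforce_quality_gate(
    records: List[Dict],
    is_valid: Callable[[Dict], bool],
    min_score: float = 99.0,
) -> float:
    """Return the quality score, or raise if it is below min_score."""
    if not records:
        raise QualityGateError("empty batch")
    valid = sum(1 for record in records if is_valid(record))
    score = 100 * valid / len(records)
    if score < min_score:
        raise QualityGateError(
            f"quality score {score:.1f}% is below the {min_score:.1f}% threshold"
        )
    return score


def has_id(record: Dict) -> bool:
    """Hypothetical validity rule: the record carries an id."""
    return record.get("id") is not None


clean_batch = [{"id": 1}, {"id": 2}]
dirty_batch = [{"id": 1}, {"id": None}]

clean_score = enforce_quality_gate(clean_batch, has_id)

try:
    enforce_quality_gate(dirty_batch, has_id)
    gate_blocked = False
except QualityGateError:
    gate_blocked = True
```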

Key Concepts Table

| Pattern | Use Case | Complexity | Performance |
|---|---|---|---|
| Full ETL | Small datasets | Low | Medium |
| ELT | Large datasets, modern warehouses | Medium | High |
| Incremental | Large, append-heavy datasets | High | Very High |
| CDC | Real-time sync, audit trails | Very High | Very High |
| Streaming | Real-time analytics | Very High | Very High |

Code Examples

Data Quality Framework

# data_quality.py
from airflow.decorators import task, dag
from datetime import datetime
from typing import Dict, Any, List
import json

class DataQualityChecker:
    """Comprehensive data quality checking framework."""
    
    def __init__(self):
        self.rules = []
    
    def add_rule(self, name, check_fn, severity='error'):
        """Add a quality rule."""
        self.rules.append({
            'name': name,
            'check_fn': check_fn,
            'severity': severity,
        })
    
    def check(self, data: List[Dict]) -> Dict[str, Any]:
        """Run all quality checks."""
        results = {
            'passed': True,
            'total_records': len(data),
            'checks': [],
        }
        
        for rule in self.rules:
            try:
                passed = rule['check_fn'](data)
                results['checks'].append({
                    'name': rule['name'],
                    'passed': passed,
                    'severity': rule['severity'],
                })
                
                if not passed and rule['severity'] == 'error':
                    results['passed'] = False
                    
            except Exception as e:
                results['checks'].append({
                    'name': rule['name'],
                    'passed': False,
                    'error': str(e),
                    'severity': rule['severity'],
                })
                
                if rule['severity'] == 'error':
                    results['passed'] = False
        
        return results

# Usage in DAG
@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['data-quality', 'production'],
)
def quality_checked_pipeline():
    
    @task
    def extract_data() -> list:
        """Extract data from source."""
        return [
            {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
            {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'},
            {'id': 3, 'name': '', 'email': 'invalid-email'},
        ]
    
    @task
    def check_quality(data: list) -> dict:
        """Run data quality checks."""
        checker = DataQualityChecker()
        
        # Add rules
        checker.add_rule(
            'no_null_names',
            lambda d: all(r.get('name') for r in d),
            severity='error',
        )
        
        checker.add_rule(
            'valid_emails',
            lambda d: all('@' in r.get('email', '') for r in d),
            severity='warning',
        )
        
        checker.add_rule(
            'unique_ids',
            lambda d: len(set(r['id'] for r in d)) == len(d),
            severity='error',
        )
        
        return checker.check(data)
    
    @task
    def process_if_valid(data: list, quality: dict) -> list:
        """Process data only if quality checks pass."""
        if not quality['passed']:
            raise ValueError(f"Quality checks failed: {quality['checks']}")
        
        # Process valid data
        return [r for r in data if r.get('name')]
    
    data = extract_data()
    quality = check_quality(data)
    processed = process_if_valid(data, quality)
    
    processed

quality_checked_pipeline()

Idempotent Pipeline Pattern

from airflow.decorators import task, dag
from datetime import datetime
import hashlib

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['idempotent', 'production'],
)
def idempotent_pipeline():
    
    @task
    def generate_run_id(**context) -> str:
        """Generate unique run ID for idempotency."""
        execution_date = context['execution_date']
        run_id = hashlib.md5(
            f"{execution_date.isoformat()}".encode()
        ).hexdigest()
        return run_id
    
    @task
    def process_data(run_id: str) -> dict:
        """Process data with idempotency check."""
        from airflow.models import Variable
        
        # Check if already processed
        processed_key = f"processed_{run_id}"
        if Variable.get(processed_key, default_var=None):
            return {'status': 'already_processed', 'run_id': run_id}
        
        # Process data
        result = perform_processing()
        
        # Mark as processed
        Variable.set(processed_key, 'true')
        
        return {'status': 'processed', 'run_id': run_id, 'result': result}
    
    def perform_processing():
        """Actual processing logic."""
        return {'records': 100}
    
    @task
    def cleanup_duplicates(run_id: str):
        """Clean up any duplicate records."""
        print(f"Cleaning up for run: {run_id}")
    
    run_id = generate_run_id()
    result = process_data(run_id)
    cleanup = cleanup_duplicates(run_id)
    
    # Clean up only after processing has finished
    result >> cleanup

idempotent_pipeline()

Error Recovery Pattern

from airflow.decorators import task, dag
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-recovery', 'production'],
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
)
def resilient_pipeline():
    
    @task
    def extract_with_checkpoint():
        """Extract with checkpoint for recovery."""
        from airflow.models import Variable
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        # Get checkpoint
        checkpoint = Variable.get('extract_checkpoint', default_var='0')
        
        hook = PostgresHook(postgres_conn_id='source_db')
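        # Bind the checkpoint as a query parameter rather than formatting it
        # into the SQL string
        query = """
            SELECT * FROM events
            WHERE id > %(checkpoint)s
            ORDER BY id
            LIMIT 1000
        """
        
        df = hook.get_pandas_df(query, parameters={'checkpoint': int(checkpoint)})
        
        # Save checkpoint. Caution: advancing it here, before the load has
        # succeeded, means a downstream failure skips these records on the
        # next run; consider moving this step after the load.
        if len(df) > 0:
            Variable.set('extract_checkpoint', str(df['id'].max()))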
        
        return df.to_dict('records')
    
    @task
    def transform_with_validation(records: list) -> list:
        """Transform with validation to catch errors early."""
        transformed = []
        errors = []
        
        for record in records:
            try:
                # Transform
                transformed_record = transform_record(record)
                transformed.append(transformed_record)
            except Exception as e:
                errors.append({'record': record, 'error': str(e)})
        
        if errors:
            # Log errors but don't fail
            print(f"Transform errors: {len(errors)}")
        
        return transformed
    
    def transform_record(record):
        """Transform a single record."""
        if not record.get('id'):
            raise ValueError("Missing id")
        return {**record, 'transformed': True}
    
    @task
    def load_with_retry(records: list) -> int:
        """Load records one at a time, logging and skipping rows that fail."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        hook = PostgresHook(postgres_conn_id='warehouse_db')
        
        loaded = 0
        for record in records:
            try:
                hook.insert_rows(
                    table='events',
                    rows=[(record['id'], record.get('transformed'))],
                    target_fields=['id', 'transformed'],
                )
                loaded += 1
            except Exception as e:
                print(f"Failed to load record {record['id']}: {e}")
                # Don't fail entire batch for single record failure
        
        return loaded
    
    records = extract_with_checkpoint()
    transformed = transform_with_validation(records)
    loaded = load_with_retry(transformed)
    
    loaded

resilient_pipeline()

Performance Metrics

Pipeline Performance

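| Metric | Target | Warning | Critical |
|---|---|---|---|
| Throughput | > 10K records/sec | 5-10K records/sec | < 5K records/sec |
| Latency | < 5 min | 5-15 min | > 15 min |
| Data Quality | > 99% | 95-99% | < 95% |
| Success Rate | > 99% | 95-99% | < 95% |
| Resource Usage | < 70% | 70-85% | > 85% |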
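
The bands in the table above could be applied to a measured value with a small helper like the one sketched here. The function name and the way boundary values are assigned (a value exactly on a limit counts as a warning) are assumptions, since the table does not say which band the boundaries belong to.

```python
def classify_metric(
    value: float,
    target: float,
    critical: float,
    higher_is_better: bool = True,
) -> str:
    """Map a measurement onto the 'target', 'warning', or 'critical' band."""
    if higher_is_better:
        if value > target:
            return "target"
        if value < critical:
            return "critical"
        return "warning"
    if value < target:
        return "target"
    if value > critical:
        return "critical"
    return "warning"


# Throughput in records/sec: target above 10K, critical below 5K.
throughput_status = classify_metric(7_000, target=10_000, critical=5_000)

# Latency in minutes: target below 5, critical above 15 (lower is better).
latency_status = classify_metric(20, target=5, critical=15, higher_is_better=False)
```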

Pattern Comparison

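| Pattern | Data Volume | Latency | Complexity | Cost |
|---|---|---|---|---|
| Full ETL | < 1GB | Hours | Low | Medium |
| ELT | 1GB-1TB | Minutes | Medium | Medium |
| Incremental | > 1TB | Minutes | High | Low |
| CDC | Real-time | Seconds | Very High | High |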

Key Takeaways:

  • Choose ETL for small datasets; ELT for large datasets with modern warehouses
  • Use incremental loading for large, append-heavy datasets
  • Implement data quality gates between pipeline stages
  • Design pipelines to be idempotent for safe retries
  • Use checkpoints for recovery from failures
  • Monitor throughput, latency, and data quality metrics
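
One common way to make a load idempotent, offered here as a sketch rather than as the method used in the listings above, is to delete the partition for the run date and reinsert it inside a single transaction. The example uses SQLite from the standard library so it runs as-is; the simplified `fact_orders` schema and the `load_partition` helper are assumptions for illustration.

```python
import sqlite3
from typing import List, Tuple


def load_partition(
    conn: sqlite3.Connection,
    run_date: str,
    rows: List[Tuple[int, float]],
) -> int:
    """Replace all rows for run_date, then return the row count for that date."""
    # The connection context manager commits on success and rolls back on error,
    # so the delete and the insert succeed or fail together.
    with conn:
        conn.execute("DELETE FROM fact_orders WHERE order_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO fact_orders (order_id, order_date, total_amount) "
            "VALUES (?, ?, ?)",
            [(order_id, run_date, amount) for order_id, amount in rows],
        )
    cursor = conn.execute(
        "SELECT COUNT(*) FROM fact_orders WHERE order_date = ?", (run_date,)
    )
    return cursor.fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fact_orders (order_id INTEGER, order_date TEXT, total_amount REAL)"
)

rows = [(1, 120.0), (2, 15.5)]
first_count = load_partition(conn, "2024-01-01", rows)
second_count = load_partition(conn, "2024-01-01", rows)  # simulated retry
```

Running the load twice for the same date leaves the same two rows in place, which is the property that makes a task retry safe.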
