Real-World Data Pipeline Patterns
Architecture Diagram
Formal Definitions
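Definition: Data Pipeline
A Data Pipeline is a series of data processing steps that extract data from sources, transform it, and load it into target systems. Formally, $P = L \circ T \circ E$, where $E$ is extraction, $T$ is transformation, and $L$ is loading. The pipeline maintains data integrity: every extracted record is either loaded exactly once or explicitly rejected, i.e. $|E(S)| = |D_{\text{loaded}}| + |D_{\text{rejected}}|$.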
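Definition: Incremental Loading
Incremental Loading processes only the data that is new or changed since the last run, rather than the entire dataset. The incremental condition is $D_{\text{inc}} = \{ d \in D : t_d > t_{\text{last}} \}$, where $t_d$ is the timestamp of record $d$ and $t_{\text{last}}$ is the last processing checkpoint.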
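Definition: Data Quality Gate
A Data Quality Gate is a validation checkpoint that blocks data from progressing if quality criteria are not met. The gate function is $G(D) = \text{pass}$ if $Q(D) \ge \theta$ and $G(D) = \text{block}$ otherwise. Here $D$ is the data being validated, $Q(D)$ is its quality score, and $\theta$ is the required threshold.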
Detailed Explanation
ETL Pipeline Pattern
from airflow.decorators import task, dag
from datetime import datetime, timedelta
from typing import Dict, Any
@dag(
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
)
def etl_pipeline():
    """Daily ETL of one day of orders from ``source_db`` into ``fact_orders``.

    Flow: extract -> transform -> validate -> load. The load task acts as a
    quality gate and refuses to write anything when validation reported
    errors.
    """

    @task
    def extract_orders() -> list:
        """Extract the orders whose ``order_date`` falls in this run's interval.

        Jinja templates such as ``{{ ds }}`` are only rendered in operator
        template fields, not in strings built inside a TaskFlow function, so
        the interval bounds are read from the task context and passed to the
        database as bound query parameters.
        """
        from airflow.operators.python import get_current_context
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        context = get_current_context()
        # Same YYYY-MM-DD day boundaries that '{{ ds }}' / '{{ next_ds }}'
        # were meant to supply.
        interval = {
            'start': context['data_interval_start'].strftime('%Y-%m-%d'),
            'end': context['data_interval_end'].strftime('%Y-%m-%d'),
        }
        hook = PostgresHook(postgres_conn_id='source_db')
        query = """
            SELECT order_id, customer_id, order_date, total_amount
            FROM orders
            WHERE order_date >= %(start)s
              AND order_date < %(end)s
        """
        df = hook.get_pandas_df(query, parameters=interval)
        return df.to_dict('records')

    @task
    def transform_orders(orders: list) -> list:
        """Normalize ``total_amount`` to float and attach an order category."""
        transformed = []
        for order in orders:
            amount = float(order['total_amount'])
            transformed.append({
                'order_id': order['order_id'],
                'customer_id': order['customer_id'],
                'order_date': order['order_date'],
                'total_amount': amount,
                'order_category': categorize_order(amount),
                'processed_at': datetime.now().isoformat(),
            })
        return transformed

    def categorize_order(amount: float) -> str:
        """Bucket an amount: >= 1000 enterprise, >= 100 business, else consumer."""
        if amount >= 1000:
            return 'enterprise'
        elif amount >= 100:
            return 'business'
        else:
            return 'consumer'

    @task
    def validate_data(data: list) -> dict:
        """Check each record for a missing ``order_id`` or a negative amount.

        Returns a summary with ``valid`` (True only when no errors were found),
        record/error counts, and at most the first 10 error messages.
        """
        errors = []
        for record in data:
            if not record.get('order_id'):
                errors.append(f"Missing order_id: {record}")
            if record.get('total_amount', 0) < 0:
                errors.append(f"Negative amount: {record}")
        return {
            'valid': len(errors) == 0,
            'total_records': len(data),
            'error_count': len(errors),
            'errors': errors[:10],
        }

    @task
    def load_to_warehouse(data: list, validation: dict) -> int:
        """Insert validated rows into ``fact_orders``; return the row count.

        Raises:
            ValueError: if ``validation`` reports the batch as invalid.
        """
        if not validation['valid']:
            raise ValueError(f"Data validation failed: {validation['errors']}")
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='warehouse_db')
        rows = [
            (d['order_id'], d['customer_id'], d['order_date'],
             d['total_amount'], d['order_category'], d['processed_at'])
            for d in data
        ]
        # NOTE(review): plain INSERT — with retries=3, a retry after a
        # successful write duplicates rows unless fact_orders has a unique
        # key; consider replace=True with replace_index='order_id'.
        hook.insert_rows(
            table='fact_orders',
            rows=rows,
            target_fields=['order_id', 'customer_id', 'order_date',
                           'total_amount', 'order_category', 'processed_at'],
        )
        return len(rows)

    # Define pipeline
    orders = extract_orders()
    transformed = transform_orders(orders)
    validation = validate_data(transformed)
    load_to_warehouse(transformed, validation)


etl_pipeline()
ELT Pipeline Pattern
from airflow.decorators import task, dag
from datetime import datetime
@dag(
    schedule_interval="0 2 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['elt', 'dbt', 'production'],
)
def elt_pipeline():
    """Nightly ELT: stage raw events in S3, then transform and test with dbt.

    Flow: extract_to_staging >> run_dbt_models >> run_dbt_tests >>
    refresh_dashboards.
    """

    def _run_dbt(args, failure_label):
        """Run ``dbt <args>`` in the project dir; raise with its output on failure.

        dbt writes most error details to stdout, so both streams go into the
        exception message.
        """
        import subprocess

        result = subprocess.run(
            ['dbt', *args],
            capture_output=True,
            text=True,
            cwd='/opt/dbt/project',
        )
        if result.returncode != 0:
            raise Exception(f"{failure_label}: {result.stderr}\n{result.stdout}")
        return result

    @task
    def extract_to_staging():
        """Copy ``raw_events`` from the source database to S3 as Parquet.

        The S3 partition is taken from the run's data interval end rather
        than the wall clock, so retries and backfills of a run overwrite the
        same object. For an on-time run of this 02:00 daily schedule it is
        the calendar date ``datetime.now()`` used to give.
        NOTE(review): data_interval_end is timezone-aware (DAG timezone);
        confirm it matches the worker-local dates previously written.
        """
        import os
        import tempfile

        from airflow.operators.python import get_current_context
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        partition = get_current_context()['data_interval_end'].strftime("%Y/%m/%d")

        # Extract from source
        source_hook = PostgresHook(postgres_conn_id='source_db')
        df = source_hook.get_pandas_df("SELECT * FROM raw_events")

        # Load to S3 (staging). A per-run temporary directory avoids
        # collisions between concurrent runs and is removed after upload.
        s3_hook = S3Hook(aws_conn_id='aws_default')
        with tempfile.TemporaryDirectory() as tmp_dir:
            local_path = os.path.join(tmp_dir, 'staging_events.parquet')
            df.to_parquet(local_path, index=False)
            s3_hook.load_file(
                filename=local_path,
                key=f'staging/events/{partition}/events.parquet',
                bucket_name='data-lake',
                replace=True,
            )
        return {'rows': len(df), 'status': 'staged'}

    @task
    def run_dbt_models():
        """Run the staging and marts dbt models in the warehouse."""
        result = _run_dbt(['run', '--models', 'staging.*', 'marts.*'], "dbt failed")
        return {'status': 'transformed', 'output': result.stdout}

    @task
    def run_dbt_tests():
        """Run all dbt data quality tests; fail the task if any test fails."""
        _run_dbt(['test'], "dbt tests failed")
        return {'status': 'tests_passed'}

    @task
    def refresh_dashboards():
        """Refresh BI dashboards after transformation (placeholder)."""
        print("Refreshing dashboards...")
        return {'status': 'refreshed'}

    # Define pipeline
    staged = extract_to_staging()
    transformed = run_dbt_models()
    tested = run_dbt_tests()
    refreshed = refresh_dashboards()
    staged >> transformed >> tested >> refreshed


elt_pipeline()
Incremental Loading Pattern
from airflow.decorators import task, dag
from datetime import datetime, timedelta
from typing import Optional
@dag(
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['incremental', 'production'],
)
def incremental_pipeline():
    """Hourly incremental load of source ``events`` into the warehouse.

    Records are selected by ``updated_at`` relative to the checkpoint stored
    in the ``incremental_checkpoint`` Variable. The checkpoint is the
    largest ``updated_at`` in a batch. It is advanced only after the upsert
    into the warehouse has succeeded.
    """
    batch_size = 10000

    @task
    def get_last_checkpoint() -> Optional[str]:
        """Return the stored ``updated_at`` checkpoint, or None on first run."""
        from airflow.models import Variable

        return Variable.get('incremental_checkpoint', default_var=None)

    @task
    def extract_incremental(last_checkpoint: Optional[str]) -> list:
        """Extract up to ``batch_size`` new/changed events, oldest first."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id='source_db')
        if last_checkpoint:
            # ">=" rather than ">": rows that share the checkpoint timestamp
            # but were cut off by the previous batch's LIMIT would otherwise
            # be skipped forever. Re-reading boundary rows is harmless
            # because the load is an upsert.
            # NOTE(review): if more than batch_size rows share a single
            # updated_at value, the checkpoint stops advancing.
            query = """
                SELECT * FROM events
                WHERE updated_at >= %(checkpoint)s
                ORDER BY updated_at
                LIMIT %(limit)s
            """
            params = {'checkpoint': last_checkpoint, 'limit': batch_size}
        else:
            query = "SELECT * FROM events ORDER BY updated_at LIMIT %(limit)s"
            params = {'limit': batch_size}
        df = hook.get_pandas_df(query, parameters=params)
        return df.to_dict('records')

    @task
    def transform_incremental(records: list) -> list:
        """Lower-case ``event_type`` and stamp each record with processed_at."""
        transformed = []
        for record in records:
            transformed.append({
                'id': record['id'],
                'event_type': record['event_type'].lower(),
                'payload': record['payload'],
                'event_timestamp': record['event_timestamp'],
                'processed_at': datetime.now().isoformat(),
            })
        return transformed

    @task
    def upsert_to_warehouse(records: list) -> int:
        """Upsert records into warehouse ``events`` keyed on ``id``.

        ``replace=True`` makes PostgresHook emit
        ``INSERT ... ON CONFLICT (id) DO UPDATE``, so a reprocessed record is
        updated instead of duplicated or rejected.
        NOTE(review): this requires a unique constraint/index on ``events.id``
        in the warehouse. Confirm that it exists.
        Returns the number of rows written.
        """
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        rows = [
            (r['id'], r['event_type'], r['payload'],
             r['event_timestamp'], r['processed_at'])
            for r in records
        ]
        if not rows:
            return 0
        hook = PostgresHook(postgres_conn_id='warehouse_db')
        hook.insert_rows(
            table='events',
            rows=rows,
            target_fields=['id', 'event_type', 'payload',
                           'event_timestamp', 'processed_at'],
            replace=True,
            replace_index='id',
        )
        return len(rows)

    @task
    def update_checkpoint(records: list):
        """Store the batch's largest ``updated_at`` as the new checkpoint.

        The checkpoint must use the same column that the extract query filters
        on. Recording a different column, such as ``event_timestamp``, makes
        the next run skip or repeat records.
        """
        from airflow.models import Variable

        if records:
            latest = max(r['updated_at'] for r in records)
            Variable.set('incremental_checkpoint', str(latest))

    # Define pipeline
    last_checkpoint = get_last_checkpoint()
    records = extract_incremental(last_checkpoint)
    transformed = transform_incremental(records)
    loaded = upsert_to_warehouse(transformed)
    # Advance the checkpoint only after the upsert has succeeded. Without
    # this edge the checkpoint task could succeed while the load fails,
    # permanently skipping that batch.
    loaded >> update_checkpoint(records)


incremental_pipeline()
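$V_{\text{inc}} = |\{ d \in D : t_d > t_{\text{last}} \}|$
Here,
- $V_{\text{inc}}$ = Volume of incremental data
- $D$ = Full dataset
- $t_d$ = Timestamp of record $d$
- $t_{\text{last}}$ = Last processing checkpoint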
Pipeline Throughput
$\text{Throughput} = \dfrac{V}{t_{\text{exec}}}$
Here,
- $V$ = Volume of data processed
- $t_{\text{exec}}$ = Pipeline execution time
Data Quality Score
$Q = \dfrac{N_{\text{valid}}}{N_{\text{total}}}$
Here,
- $Q$ = Data quality score
- $N_{\text{valid}}$ = Number of valid records
- $N_{\text{total}}$ = Total number of records
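Use incremental loading for large datasets to reduce processing time and resource usage. Track checkpoints using Airflow Variables or a dedicated metadata table. Advance a checkpoint only after the data it covers has been loaded successfully; otherwise a failed load silently skips those records on the next run.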
Implement data quality gates between pipeline stages. Fail fast when quality issues are detected to prevent bad data from propagating downstream.
Key Concepts Table
| Pattern | Use Case | Complexity | Performance |
|---|---|---|---|
| Full ETL | Small datasets | Low | Medium |
| ELT | Large datasets, modern warehouses | Medium | High |
| Incremental | Large, append-heavy datasets | High | Very High |
| CDC | Real-time sync, audit trails | Very High | Very High |
| Streaming | Real-time analytics | Very High | Very High |
Code Examples
Data Quality Framework
# data_quality.py
from airflow.decorators import task, dag
from datetime import datetime
from typing import Dict, Any, List
import json
class DataQualityChecker:
    """Evaluate a list of named predicate rules against a batch of records.

    Each rule is a callable that receives the whole record list and returns
    a truthy value when the batch passes. A rule with severity ``'error'``
    that fails or raises marks the overall result as failed. Rules with any
    other severity are reported but do not affect ``passed``.
    """

    def __init__(self):
        self.rules = []

    def add_rule(self, name, check_fn, severity='error'):
        """Register rule *name*, evaluated by calling ``check_fn(data)``."""
        self.rules.append(dict(name=name, check_fn=check_fn, severity=severity))

    def check(self, data: List[Dict]) -> Dict[str, Any]:
        """Run every registered rule against *data* and summarize the outcome.

        Returns a dict with ``passed`` (False if any error-severity rule
        failed or raised), ``total_records`` and a ``checks`` list holding
        one entry per rule. If a rule raises, its entry is marked not passed
        and the exception text is stored under ``error``.
        """
        summary = {'passed': True, 'total_records': len(data), 'checks': []}
        for rule in self.rules:
            name, predicate, level = rule['name'], rule['check_fn'], rule['severity']
            try:
                outcome = predicate(data)
                summary['checks'].append({'name': name, 'passed': outcome, 'severity': level})
                failed = not outcome
            except Exception as exc:
                summary['checks'].append(
                    {'name': name, 'passed': False, 'error': str(exc), 'severity': level}
                )
                failed = True
            if failed and level == 'error':
                summary['passed'] = False
        return summary
# Usage in DAG
@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['data-quality', 'production'],
)
def quality_checked_pipeline():
    """Demo DAG: gate sample records with DataQualityChecker before processing.

    The third sample record has an empty name, so the error-severity
    ``no_null_names`` rule fails and ``process_if_valid`` raises.
    """

    @task
    def extract_data() -> list:
        """Return a fixed three-record sample; the last record is malformed."""
        alice = {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
        bob = {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'}
        broken = {'id': 3, 'name': '', 'email': 'invalid-email'}
        return [alice, bob, broken]

    @task
    def check_quality(data: list) -> dict:
        """Apply the rule set to *data* and return the checker's summary dict."""
        # (rule name, predicate over the whole batch, severity)
        rule_specs = (
            ('no_null_names',
             lambda rows: all(row.get('name') for row in rows),
             'error'),
            ('valid_emails',
             lambda rows: all('@' in row.get('email', '') for row in rows),
             'warning'),
            ('unique_ids',
             lambda rows: len({row['id'] for row in rows}) == len(rows),
             'error'),
        )
        checker = DataQualityChecker()
        for rule_name, predicate, level in rule_specs:
            checker.add_rule(rule_name, predicate, severity=level)
        return checker.check(data)

    @task
    def process_if_valid(data: list, quality: dict) -> list:
        """Return records with a non-empty name; raise if the quality gate failed."""
        if quality['passed']:
            return list(filter(lambda row: row.get('name'), data))
        raise ValueError(f"Quality checks failed: {quality['checks']}")

    sample = extract_data()
    report = check_quality(sample)
    process_if_valid(sample, report)


quality_checked_pipeline()
Idempotent Pipeline Pattern
from airflow.decorators import task, dag
from datetime import datetime
import hashlib
@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['idempotent', 'production'],
)
def idempotent_pipeline():
    """Daily pipeline that skips work already done for the same logical date.

    A run id derived only from the run's logical date serves as an
    idempotency key. Once processing finishes, a ``processed_<run_id>``
    Variable is set, and a rerun for the same date that finds it returns
    ``already_processed``.
    """

    @task
    def generate_run_id(**context) -> str:
        """Return a deterministic md5 hex key for this run's logical date."""
        # ``logical_date`` replaces the deprecated ``execution_date`` context
        # key (same value); fall back for older Airflow versions.
        logical_date = context.get('logical_date') or context['execution_date']
        # md5 serves only as a stable, compact key here, not for security.
        return hashlib.md5(logical_date.isoformat().encode()).hexdigest()

    @task
    def process_data(run_id: str) -> dict:
        """Process once per run id and return a status dict.

        NOTE(review): the marker check and write are not atomic, and the
        marker is written only after processing. A crash in between causes
        reprocessing, so perform_processing must tolerate re-runs.
        """
        from airflow.models import Variable

        processed_key = f"processed_{run_id}"
        if Variable.get(processed_key, default_var=None):
            return {'status': 'already_processed', 'run_id': run_id}
        result = perform_processing()
        Variable.set(processed_key, 'true')
        return {'status': 'processed', 'run_id': run_id, 'result': result}

    def perform_processing():
        """Actual processing logic (placeholder)."""
        return {'records': 100}

    @task
    def cleanup_duplicates(run_id: str):
        """Clean up any duplicate records left by this run (placeholder)."""
        print(f"Cleaning up for run: {run_id}")

    run_id = generate_run_id()
    result = process_data(run_id)
    # Cleanup must run after processing. Previously it depended only on
    # run_id, so it could run before or concurrently with process_data.
    result >> cleanup_duplicates(run_id)


idempotent_pipeline()
Error Recovery Pattern
from airflow.decorators import task, dag
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta
@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-recovery', 'production'],
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
)
def resilient_pipeline():
    """Batch-wise copy of ``events`` that can resume after a failure.

    Progress is tracked by the ``extract_checkpoint`` Variable, which holds
    the highest ``id`` processed so far. It is advanced by a final task that
    runs only after loading finishes. If transform or load fails, the next
    attempt re-reads the same batch instead of skipping it.
    """

    @task
    def extract_with_checkpoint() -> list:
        """Read the next batch of up to 1000 events with ``id`` > checkpoint.

        Does NOT move the checkpoint; commit_checkpoint does that after load.
        Raises ValueError if the stored checkpoint is not an integer.
        """
        from airflow.models import Variable
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        checkpoint = int(Variable.get('extract_checkpoint', default_var='0'))
        hook = PostgresHook(postgres_conn_id='source_db')
        query = """
            SELECT * FROM events
            WHERE id > %(checkpoint)s
            ORDER BY id
            LIMIT 1000
        """
        df = hook.get_pandas_df(query, parameters={'checkpoint': checkpoint})
        return df.to_dict('records')

    @task
    def transform_with_validation(records: list) -> list:
        """Transform records, dropping (and reporting) ones that fail."""
        transformed = []
        errors = []
        for record in records:
            try:
                transformed.append(transform_record(record))
            except Exception as e:
                errors.append({'record': record, 'error': str(e)})
        if errors:
            # Best effort: report bad records but keep the rest of the batch.
            print(f"Transform errors: {len(errors)}")
            for err in errors[:10]:
                print(f"  {err['error']}: {err['record']}")
        return transformed

    def transform_record(record):
        """Mark a record as transformed; raise ValueError if it has no id."""
        if not record.get('id'):
            raise ValueError("Missing id")
        return {**record, 'transformed': True}

    @task
    def load_with_retry(records: list) -> int:
        """Load records into warehouse ``events``, isolating bad rows.

        The batch is first inserted in one call. If that fails, the batch is
        retried row by row, so a single bad record does not block the rest.
        Rows that still fail are logged and skipped (best effort). Transient
        failures of the whole task are covered by the DAG's task retries.
        Returns the number of rows written.
        """
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id='warehouse_db')
        target_fields = ['id', 'transformed']
        rows = [(r['id'], r.get('transformed')) for r in records]
        if not rows:
            return 0
        try:
            # Batches are capped at 1000 rows (extract LIMIT), which is within
            # insert_rows' default commit_every. A failed batch therefore
            # commits nothing, and the per-row fallback cannot duplicate rows.
            hook.insert_rows(table='events', rows=rows, target_fields=target_fields)
            return len(rows)
        except Exception as e:
            print(f"Batch load failed ({e}); falling back to per-record load")
        loaded = 0
        for row in rows:
            try:
                hook.insert_rows(table='events', rows=[row], target_fields=target_fields)
                loaded += 1
            except Exception as e:
                print(f"Failed to load record {row[0]}: {e}")
        return loaded

    @task
    def commit_checkpoint(records: list, loaded: int) -> None:
        """Advance ``extract_checkpoint`` past this batch after loading.

        Records dropped by transform or skipped by load are intentionally
        not retried, matching those tasks' best-effort behavior.
        """
        from airflow.models import Variable

        if records:
            print(f"Loaded {loaded} of {len(records)} extracted records")
            Variable.set('extract_checkpoint', str(max(r['id'] for r in records)))

    records = extract_with_checkpoint()
    transformed = transform_with_validation(records)
    loaded = load_with_retry(transformed)
    commit_checkpoint(records, loaded)


resilient_pipeline()
Performance Metrics
Pipeline Performance
| Metric | Target | Warning | Critical |
|---|---|---|---|
| Throughput | > 10K records/sec | 5-10K/sec | < 5K/sec |
| Latency | < 5min | 5-15min | > 15min |
| Data Quality | > 99% | 95-99% | < 95% |
| Success Rate | > 99% | 95-99% | < 95% |
| Resource Usage | < 70% | 70-85% | > 85% |
Pattern Comparison
| Pattern | Data Volume | Latency | Complexity | Cost |
|---|---|---|---|---|
| Full ETL | < 1GB | Hours | Low | Medium |
| ELT | 1GB-1TB | Minutes | Medium | Medium |
| Incremental | > 1TB | Minutes | High | Low |
| CDC | Any (continuous stream) | Seconds | Very High | High |
Key Takeaways:
- Choose ETL for small datasets; ELT for large datasets with modern warehouses
- Use incremental loading for large, append-heavy datasets
- Implement data quality gates between pipeline stages
- Design pipelines to be idempotent for safe retries
- Use checkpoints for recovery from failures
- Monitor throughput, latency, and data quality metrics
See Also
- Connection Management — Database and API connections
- Error Handling — Retry and recovery patterns
- Performance Tuning — Optimizing pipeline performance
- Testing DAGs — Testing data pipelines