CI/CD for Data Pipelines
Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber
Data Pipeline Challenges
Data pipelines have unique CI/CD challenges: schema evolution, data quality validation, backfills, and state management.
Data pipelines are code. They should be version controlled, tested, and deployed with the same rigor as application code.
Data Pipeline CI/CD Architecture
+-------------+    +-------------+    +-------------+    +-------------+
| Push to Git |--->| Lint &      |--->| Unit Tests  |--->| Integration |
|             |    | Format      |    |             |    | Tests       |
+-------------+    +-------------+    +-------------+    +------+------+
                                                                |
                                                                v
+-------------+    +-------------+    +-------------+    +-------------+
| Deploy Prod |<---| Staging     |<---| Data        |<---| Schema      |
|             |    | Deploy      |    | Quality     |    | Validation  |
+-------------+    +-------------+    +-------------+    +-------------+
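One way to read the diagram is as an ordered list of gates, where any failing gate stops promotion toward production. The sketch below illustrates that fail-fast ordering. The stage names mirror the diagram, while `run_pipeline` and the lambda checks are hypothetical stand-ins for real CI jobs, not part of any CI system's API.

```python
from typing import Callable, List, Tuple

# Stage names mirror the diagram; the callables are hypothetical stand-ins
# for real CI jobs that return True on success.
Stage = Tuple[str, Callable[[], bool]]


def run_pipeline(stages: List[Stage]) -> List[str]:
    """Run stages in order and stop at the first one that fails.

    Returns the names of the stages that passed.
    """
    passed: List[str] = []
    for name, check in stages:
        if not check():
            # Fail fast: later stages (including deploys) never run.
            break
        passed.append(name)
    return passed


stages: List[Stage] = [
    ("lint_and_format", lambda: True),
    ("unit_tests", lambda: True),
    ("integration_tests", lambda: True),
    ("schema_validation", lambda: False),  # simulate an incompatible schema change
    ("data_quality", lambda: True),
    ("staging_deploy", lambda: True),
    ("prod_deploy", lambda: True),
]

completed = run_pipeline(stages)
print(completed)
```

In this run the simulated schema failure stops the pipeline before any data quality check or deployment is attempted.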
Pattern 1: dbt Project CI/CD
Test and deploy dbt models with automated checks.
# .github/workflows/dbt-ci.yml
name: dbt CI/CD
on:
  push:
    branches: [main, develop]
    paths: ['dbt/**']
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install dbt-postgres pytest pandas
      - name: dbt deps
        run: dbt deps
        working-directory: dbt
      - name: dbt compile
        run: dbt compile
        working-directory: dbt
        env:
          # Relative paths resolve against working-directory (dbt/)
          DBT_PROFILES_DIR: .github/dbt-profiles
      - name: Run unit tests
        run: pytest tests/unit/
        working-directory: dbt
      - name: dbt source freshness
        run: dbt source freshness
        working-directory: dbt
        env:
          DBT_PROFILES_DIR: .github/dbt-profiles
      - name: dbt build (staging)
        run: dbt build --select tag:staging
        working-directory: dbt
        env:
          DBT_PROFILES_DIR: .github/dbt-profiles-staging
  deploy:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Each job starts on a fresh runner, so Python and dbt must be set up again
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dbt
        run: pip install dbt-postgres
      - name: dbt deps
        run: dbt deps
        working-directory: dbt
      - name: dbt build (production)
        # --full-refresh rebuilds incremental models from scratch
        run: dbt build --full-refresh
        working-directory: dbt
        env:
          DBT_PROFILES_DIR: .github/dbt-profiles-prod
Pattern 2: Data Quality Testing Framework
Validate data quality at each pipeline stage.
# data_quality/validators.py
# Uses the legacy Great Expectations Pandas API (ge.from_pandas) from pre-1.0 releases.
import great_expectations as ge
from datetime import datetime


class DataQualityError(Exception):
    """Raised when one or more data quality checks fail."""


class DataQualityValidator:
    def __init__(self, spark_session):
        self.spark = spark_session

    def validate_orders(self, df):
        """Validate order data quality."""
        # toPandas() collects the full DataFrame onto the driver
        ge_df = ge.from_pandas(df.toPandas())
        results = []

        # Check 1: No null order IDs
        results.append(
            ge_df.expect_column_values_to_not_be_null('order_id')
        )

        # Check 2: Order IDs are unique
        results.append(
            ge_df.expect_column_values_to_be_unique('order_id')
        )

        # Check 3: Total is positive
        results.append(
            ge_df.expect_column_values_to_be_between(
                'total', min_value=0.01, max_value=1000000
            )
        )

        # Check 4: Status is valid enum
        results.append(
            ge_df.expect_column_values_to_be_in_set(
                'status', ['pending', 'confirmed', 'shipped', 'delivered']
            )
        )

        # Check 5: Created date is not in future
        results.append(
            ge_df.expect_column_values_to_be_between(
                'created_at',
                min_value=datetime(2020, 1, 1),
                max_value=datetime.now()
            )
        )

        # Check 6: Row count sanity check
        results.append(
            ge_df.expect_table_row_count_to_be_between(
                min_value=100,  # Minimum expected rows
                max_value=10000000
            )
        )

        # Fail if any check fails
        failures = [r for r in results if not r.success]
        if failures:
            failed_types = [r.expectation_config.expectation_type for r in failures]
            raise DataQualityError(
                f"Data quality checks failed: {failed_types}"
            )
        return results
Use Great Expectations or Soda Core for data quality. Define expectations as code and run them in your CI/CD pipeline.
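To make "expectations as code" concrete without depending on either library, here is a minimal plain-Python sketch of the same kinds of checks. The function `check_orders`, the check names, and the sample rows are illustrative assumptions, not Great Expectations or Soda Core APIs.

```python
from datetime import datetime
from typing import Dict, List

VALID_STATUSES = {"pending", "confirmed", "shipped", "delivered"}


def check_orders(rows: List[Dict], now: datetime) -> List[str]:
    """Return the names of failed checks (an empty list means all passed)."""
    failures: List[str] = []
    ids = [r.get("order_id") for r in rows]
    if any(i is None for i in ids):
        failures.append("order_id_not_null")
    non_null_ids = [i for i in ids if i is not None]
    if len(non_null_ids) != len(set(non_null_ids)):
        failures.append("order_id_unique")
    if any(not (0.01 <= r["total"] <= 1_000_000) for r in rows):
        failures.append("total_in_range")
    if any(r["status"] not in VALID_STATUSES for r in rows):
        failures.append("status_in_set")
    if any(r["created_at"] > now for r in rows):
        failures.append("created_at_not_in_future")
    return failures


# Hypothetical sample data; "now" is fixed so the result is reproducible.
now = datetime(2024, 6, 1)
good = [
    {"order_id": 1, "total": 19.99, "status": "pending", "created_at": datetime(2024, 5, 1)},
    {"order_id": 2, "total": 5.00, "status": "shipped", "created_at": datetime(2024, 5, 2)},
]
bad = good + [
    {"order_id": 2, "total": -1.0, "status": "refunded", "created_at": datetime(2025, 1, 1)},
]

good_failures = check_orders(good, now)
bad_failures = check_orders(bad, now)
print(good_failures)
print(bad_failures)
```

In a CI job, a non-empty failure list would typically be turned into a non-zero exit code so the pipeline stops before deployment.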
Pattern 3: Schema Evolution with Backward Compatibility
Handle schema changes without breaking downstream consumers.
# schema_evolution/manager.py
from pyspark.sql.types import StructType, StringType, IntegerType


class SchemaCompatibilityError(Exception):
    """Raised when a schema change would break downstream consumers."""


class SchemaManager:
    def __init__(self, schema_registry_url: str):
        self.registry_url = schema_registry_url

    def get_current_schema(self, topic: str) -> StructType:
        """Fetch the latest registered schema for a topic (registry lookup not shown)."""
        raise NotImplementedError

    def check_compatibility(self, new_schema: StructType, topic: str) -> bool:
        """Check if new schema is backward compatible."""
        current_schema = self.get_current_schema(topic)
        current_names = {f.name for f in current_schema.fields}
        new_names = {f.name for f in new_schema.fields}

        # Rule 1: New fields must be nullable
        for field_name in new_names - current_names:
            field = new_schema[field_name]
            if not field.nullable:
                raise SchemaCompatibilityError(
                    f"New field {field_name} must be nullable for backward compatibility"
                )

        # Rule 2: Cannot remove fields
        removed_fields = current_names - new_names
        if removed_fields:
            raise SchemaCompatibilityError(
                f"Cannot remove fields: {removed_fields}"
            )

        # Rule 3: Cannot change field types (unless widening)
        for field in current_schema.fields:
            if field.name in new_names:
                new_field = new_schema[field.name]
                if field.dataType == new_field.dataType:
                    continue  # An unchanged type is always compatible
                if not self.is_type_widening(field.dataType, new_field.dataType):
                    raise SchemaCompatibilityError(
                        f"Cannot change type of {field.name}: {field.dataType} -> {new_field.dataType}"
                    )
        return True

    def is_type_widening(self, current_type, new_type) -> bool:
        """Check if type change is a valid widening."""
        # Maps each current type class to the type classes it may change to
        widening_rules = {
            IntegerType: [StringType],
            StringType: [],
        }
        # Compare classes, since both arguments are DataType instances
        return type(new_type) in widening_rules.get(type(current_type), [])
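The three rules can also be exercised without Spark or a schema registry. The sketch below is a simplified illustration that models a schema as a plain dictionary and collects every violation instead of raising on the first one. The `Schema` alias, `compatibility_errors`, and the `WIDENING` table are assumptions made for this example and differ from the rule set in the code above.

```python
from typing import Dict, List, Tuple

# A schema here is a plain mapping: field name -> (type name, nullable).
Schema = Dict[str, Tuple[str, bool]]

# Assumed widening rules for this sketch only; adjust to your storage engine.
WIDENING = {"int": {"long"}, "float": {"double"}}


def compatibility_errors(current: Schema, new: Schema) -> List[str]:
    """Return every backward-compatibility violation between two schemas."""
    errors: List[str] = []
    # Rule 1: new fields must be nullable
    for name in sorted(set(new) - set(current)):
        if not new[name][1]:
            errors.append(f"new field {name} must be nullable")
    # Rule 2: fields cannot be removed
    for name in sorted(set(current) - set(new)):
        errors.append(f"field {name} was removed")
    # Rule 3: types may only change through an allowed widening
    for name in sorted(set(current) & set(new)):
        old_type, new_type = current[name][0], new[name][0]
        if old_type != new_type and new_type not in WIDENING.get(old_type, set()):
            errors.append(f"field {name} changed type {old_type} -> {new_type}")
    return errors


current = {"order_id": ("int", False), "total": ("float", True)}
compatible = {"order_id": ("long", False), "total": ("float", True), "coupon": ("string", True)}
breaking = {"order_id": ("string", False), "discount": ("float", False)}

compatible_errors = compatibility_errors(current, compatible)
breaking_errors = compatibility_errors(current, breaking)
print(compatible_errors)
print(breaking_errors)
```

Collecting all violations rather than failing on the first tends to give more useful CI output, because a pull request author sees every problem in one run.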
Pattern 4: Airflow DAG CI/CD
Test and deploy Airflow DAGs with version control.
# dags/order_processing.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta

# Hypothetical module: the original does not show where the task callables live.
from order_tasks import extract_orders, transform_orders, run_quality_checks

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@example.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'order_processing',
    default_args=default_args,
    description='Process orders from S3 to Redshift',
    # `schedule` replaces the deprecated `schedule_interval` (Airflow 2.4+)
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['orders', 'production'],
) as dag:
    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
        op_kwargs={'execution_date': '{{ ds }}'},
    )

    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders,
    )

    load = S3ToRedshiftOperator(
        task_id='load_to_redshift',
        schema='raw',
        table='orders',
        s3_bucket='data-lake',
        s3_key='orders/{{ ds }}/orders.parquet',
        copy_options=['FORMAT AS PARQUET'],
    )

    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=run_quality_checks,
    )

    # Run the quality check only after the load has finished
    extract >> transform >> load >> quality_check
# .github/workflows/airflow-ci.yml
name: Airflow DAG CI
on:
  push:
    paths: ['dags/**']
jobs:
  test-dags:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install Airflow
        run: |
          pip install apache-airflow==2.8.0
          pip install apache-airflow-providers-amazon
      - name: Test DAG loading
        # include_examples=False keeps Airflow's bundled example DAGs out of the count
        run: |
          python -c "
          from airflow.models import DagBag
          dag_bag = DagBag(dag_folder='dags', include_examples=False)
          if dag_bag.import_errors:
              raise Exception(f'DAG import errors: {dag_bag.import_errors}')
          print(f'Loaded {len(dag_bag.dags)} DAGs')
          "
      - name: Lint DAGs
        run: |
          pip install ruff
          ruff check dags/
Always test DAG loading in CI. Import errors and syntax issues should be caught before deployment.
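A cheaper pre-check can run even before Airflow is installed: compiling each DAG file catches syntax errors without executing any code. The sketch below uses only the Python standard library. The helper name `find_syntax_errors` and the throwaway demo files are hypothetical, and this check complements a `DagBag` load rather than replacing it, since it cannot detect failing imports.

```python
import tempfile
from pathlib import Path
from typing import Dict


def find_syntax_errors(dag_folder: str) -> Dict[str, str]:
    """Compile every .py file under dag_folder without executing it.

    Returns a mapping of file name -> short description of the syntax error.
    """
    errors: Dict[str, str] = {}
    for path in sorted(Path(dag_folder).rglob("*.py")):
        try:
            compile(path.read_text(encoding="utf-8"), str(path), "exec")
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Demo on a temporary folder holding one valid and one broken file.
with tempfile.TemporaryDirectory() as folder:
    Path(folder, "good_dag.py").write_text("x = 1\n", encoding="utf-8")
    Path(folder, "bad_dag.py").write_text("def broken(:\n    pass\n", encoding="utf-8")
    result = find_syntax_errors(folder)

print(sorted(result))
```

Only the broken file is reported, so a CI step could fail whenever the returned mapping is non-empty.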
Pattern 5: Data Pipeline Testing Pyramid
+------------------------------------------+
| End-to-End Tests (5%)                    |
| Full pipeline execution in staging       |
+------------------------------------------+
| Integration Tests (15%)                  |
| Cross-component testing with test data   |
+------------------------------------------+
| Unit Tests (80%)                         |
| Transformations, validators, utils       |
+------------------------------------------+
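The base of the pyramid is cheap to build when transformations are written as pure functions. Below is a minimal sketch of a unit test for one such function, using only the standard library. The transformation `add_order_totals` and its field names are hypothetical examples, not taken from the pipelines shown earlier.

```python
import unittest
from typing import Dict, List


def add_order_totals(orders: List[Dict]) -> List[Dict]:
    """Hypothetical transformation: total = quantity * unit_price, rounded to cents."""
    return [
        {**o, "total": round(o["quantity"] * o["unit_price"], 2)}
        for o in orders
    ]


class AddOrderTotalsTest(unittest.TestCase):
    def test_computes_total(self):
        out = add_order_totals([{"order_id": 1, "quantity": 3, "unit_price": 2.5}])
        self.assertEqual(out[0]["total"], 7.5)

    def test_empty_input(self):
        self.assertEqual(add_order_totals([]), [])

    def test_does_not_mutate_input(self):
        original = {"order_id": 1, "quantity": 1, "unit_price": 4.0}
        add_order_totals([original])
        self.assertNotIn("total", original)


# Run the suite programmatically so the result can be inspected.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(AddOrderTotalsTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Tests like these need no database or cluster, which is why they can make up the bulk of the suite and run on every push.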
CI/CD Pipeline Checklist
- Version Control - All pipeline code in Git
- Automated Testing - Unit, integration, data quality
- Schema Validation - Backward compatibility checks
- Staging Deployment - Test in production-like environment
- Rollback Strategy - Ability to revert failed deployments
- Monitoring - Alert on pipeline failures
Follow-Up Questions
- How do you handle backfills when deploying schema changes to production?
- What strategies would you use to test data transformations with realistic data volumes?
- How do you implement blue-green deployments for Airflow DAGs?