๐ŸŽ‰ 75% of content is free forever โ€” Unlock Premium from $10/mo โ†’
CW
Search coursesโ€ฆ
๐Ÿ’ผ Servicesโ„น๏ธ Aboutโœ‰๏ธ ContactView Pricing Plansfrom $10

CI/CD for Data Pipelines

Cloud ArchitectureData Engineeringโญ Premium

Advertisement

CI/CD for Data Pipelines

Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber

Data Pipeline Challenges

Data pipelines have unique CI/CD challenges: schema evolution, data quality validation, backfills, and state management.

โ„น๏ธ

Data pipelines are code. They should be version controlled, tested, and deployed with the same rigor as application code.

Data Pipeline CI/CD Architecture

Architecture Diagram
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚  Push to โ”‚โ”€โ”€โ”€โ–ถโ”‚  Lint &  โ”‚โ”€โ”€โ”€โ–ถโ”‚  Unit    โ”‚โ”€โ”€โ”€โ–ถโ”‚  Integra-โ”‚
โ”‚  Git     โ”‚    โ”‚  Format  โ”‚    โ”‚  Tests   โ”‚    โ”‚  tion    โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚  Tests   โ”‚
                                                โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜
                                                     โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”
โ”‚  Deploy  โ”‚โ—€โ”€โ”€โ”€โ”‚  Staging โ”‚โ—€โ”€โ”€โ”€โ”‚  Data    โ”‚โ—€โ”€โ”€โ”€โ”‚  Schema  โ”‚
โ”‚  Prod    โ”‚    โ”‚  Deploy  โ”‚    โ”‚  Quality โ”‚    โ”‚  Valid.  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Pattern 1: dbt Project CI/CD

Test and deploy dbt models with automated checks.

# .github/workflows/dbt-ci.yml
name: dbt CI/CD

on:
  push:
    branches: [main, develop]
    paths: ['dbt/**']
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install dbt-postgres pytest pandas
      
      - name: dbt deps
        run: dbt deps
        working-directory: dbt
      
      - name: dbt compile
        run: dbt compile
        working-directory: dbt
        env:
          DBT_PROFILES_DIR: .github/dbt-profiles
      
      - name: Run unit tests
        run: pytest tests/unit/
        working-directory: dbt
      
      - name: dbt source freshness
        run: dbt source freshness
        working-directory: dbt
        env:
          DBT_PROFILES_DIR: .github/dbt-profiles
      
      - name: dbt build (staging)
        run: dbt build --select tag:staging
        working-directory: dbt
        env:
          DBT_PROFILES_DIR: .github/dbt-profiles-staging

  deploy:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: dbt build (production)
        run: dbt build --full-refresh
        working-directory: dbt
        env:
          DBT_PROFILES_DIR: .github/dbt-profiles-prod

Pattern 2: Data Quality Testing Framework

Validate data quality at each pipeline stage.

# data_quality/validators.py
import great_expectations as ge
from datetime import datetime, timedelta

class DataQualityValidator:
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def validate_orders(self, df):
        """Validate order data quality."""
        ge_df = ge.from_pandas(df.toPandas())
        
        results = []
        
        # Check 1: No null order IDs
        results.append(
            ge_df.expect_column_values_to_not_be_null('order_id')
        )
        
        # Check 2: Order IDs are unique
        results.append(
            ge_df.expect_column_values_to_be_unique('order_id')
        )
        
        # Check 3: Total is positive
        results.append(
            ge_df.expect_column_values_to_be_between(
                'total', min_value=0.01, max_value=1000000
            )
        )
        
        # Check 4: Status is valid enum
        results.append(
            ge_df.expect_column_values_to_be_in_set(
                'status', ['pending', 'confirmed', 'shipped', 'delivered']
            )
        )
        
        # Check 5: Created date is not in future
        results.append(
            ge_df.expect_column_values_to_be_between(
                'created_at',
                min_value=datetime(2020, 1, 1),
                max_value=datetime.now()
            )
        )
        
        # Check 6: Row count sanity check
        row_count = df.count()
        results.append(
            ge_df.expect_table_row_count_to_be_between(
                min_value=100,  # Minimum expected rows
                max_value=10000000
            )
        )
        
        # Fail if any critical check fails
        failures = [r for r in results if not r.success]
        if failures:
            raise DataQualityError(
                f"Data quality checks failed: {[r.expectation_config.expectation_type for r in failures]}"
            )
        
        return results

โ„น๏ธ

Use Great Expectations or Soda Core for data quality. Define expectations as code and run them in your CI/CD pipeline.

Pattern 3: Schema Evolution with Backward Compatibility

Handle schema changes without breaking downstream consumers.

# schema_evolution/manager.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json

class SchemaManager:
    def __init__(self, schema_registry_url: str):
        self.registry_url = schema_registry_url
    
    def check_compatibility(self, new_schema: StructType, topic: str) -> bool:
        """Check if new schema is backward compatible."""
        current_schema = self.get_current_schema(topic)
        
        # Rule 1: New fields must have defaults
        new_fields = set(f.name for f in new_schema.fields) - set(f.name for f in current_schema.fields)
        for field_name in new_fields:
            field = new_schema[field_name]
            if not field.nullable:
                raise SchemaCompatibilityError(
                    f"New field {field_name} must be nullable for backward compatibility"
                )
        
        # Rule 2: Cannot remove fields
        removed_fields = set(f.name for f in current_schema.fields) - set(f.name for f in new_schema.fields)
        if removed_fields:
            raise SchemaCompatibilityError(
                f"Cannot remove fields: {removed_fields}"
            )
        
        # Rule 3: Cannot change field types (unless widening)
        for field in current_schema.fields:
            if field.name in [f.name for f in new_schema.fields]:
                new_field = new_schema[field.name]
                if not self.is_type_widening(field.dataType, new_field.dataType):
                    raise SchemaCompatibilityError(
                        f"Cannot change type of {field.name}: {field.dataType} -> {new_field.dataType}"
                    )
        
        return True
    
    def is_type_widening(self, current_type, new_type) -> bool:
        """Check if type change is a valid widening."""
        widening_rules = {
            IntegerType: [StringType],
            StringType: [],
        }
        return new_type in widening_rules.get(type(current_type), [])

Pattern 4: Airflow DAG CI/CD

Test and deploy Airflow DAGs with version control.

# dags/order_processing.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@example.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'order_processing',
    default_args=default_args,
    description='Process orders from S3 to Redshift',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['orders', 'production'],
) as dag:
    
    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
        op_kwargs={'execution_date': '{{ ds }}'},
    )
    
    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders,
    )
    
    load = S3ToRedshiftOperator(
        task_id='load_to_redshift',
        schema='raw',
        table='orders',
        s3_bucket='data-lake',
        s3_key='orders/{{ ds }}/orders.parquet',
        copy_options=['FORMAT AS PARQUET'],
    )
    
    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=run_quality_checks,
    )
    
    extract >> transform >> load >> quality_check
# .github/workflows/airflow-ci.yml
name: Airflow DAG CI

on:
  push:
    paths: ['dags/**']

jobs:
  test-dags:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install Airflow
        run: |
          pip install apache-airflow==2.8.0
          pip install apache-airflow-providers-amazon
      
      - name: Test DAG loading
        run: |
          python -c "
          from airflow.models import DagBag
          dag_bag = DagBag(dag_folder='dags')
          if dag_bag.import_errors:
              raise Exception(f'DAG import errors: {dag_bag.import_errors}')
          print(f'Loaded {len(dag_bag.dags)} DAGs')
          "
      
      - name: Lint DAGs
        run: |
          pip install ruff
          ruff check dags/

โš ๏ธ

Always test DAG loading in CI. Import errors and syntax issues should be caught before deployment.

Pattern 5: Data Pipeline Testing Pyramid

Architecture Diagram
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚         End-to-End Tests (5%)           โ”‚
โ”‚   Full pipeline execution in staging    โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚      Integration Tests (15%)            โ”‚
โ”‚  Cross-component testing with test data โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚        Unit Tests (80%)                 โ”‚
โ”‚    Transformations, validators, utils   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

CI/CD Pipeline Checklist

  1. Version Control - All pipeline code in Git
  2. Automated Testing - Unit, integration, data quality
  3. Schema Validation - Backward compatibility checks
  4. Staging Deployment - Test in production-like environment
  5. Rollback Strategy - Ability to revert failed deployments
  6. Monitoring - Alert on pipeline failures

Follow-Up Questions

  1. How do you handle backfills when deploying schema changes to production?
  2. What strategies would you use to test data transformations with realistic data volumes?
  3. How do you implement blue-green deployments for Airflow DAGs?

Advertisement