CI/CD Architecture
Cloud Build Pipeline
# cloudbuild.yaml
steps:
  # Step 1: Lint and test Python code
  - name: 'python:3.11'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        pip install flake8 pytest pytest-cov
        flake8 src/ --max-line-length=100
        pytest tests/ --cov=src/ --cov-report=html

  # Step 2: Run SQL linting
  - name: 'python:3.11'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        pip install sqlfluff
        sqlfluff lint models/ --dialect bigquery

  # Step 3: Test Dataform models
  - name: 'gcr.io/my-project/dataform-tester'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        dataform compile
        dataform test

  # Step 4: Build Dataflow Flex Template
  - name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        gcloud dataflow flex-template build \
          gs://my-templates/pipeline \
          --image=gcr.io/my-project/pipeline:latest \
          --sdk-language=PYTHON

  # Step 5: Deploy DAG files to Cloud Composer
  - name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        gsutil cp dags/*.py gs://my-composer-bucket/dags/

  # Step 6: Deploy dbt models
  - name: 'gcr.io/my-project/dbt-runner'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        dbt deps
        dbt run --target prod
        dbt test --target prod

timeout: '1800s'
options:
  logging: CLOUD_LOGGING_ONLY
Testing Strategies
# Unit tests for data transformations
import pytest

from src.transformations import parse_event, validate_record


def test_parse_event():
    """Test event parsing."""
    raw_event = '{"event_id": "123", "type": "purchase", "amount": 99.99}'
    result = parse_event(raw_event)
    assert result['event_id'] == '123'
    assert result['amount'] == 99.99


def test_validate_record_valid():
    """Test valid record validation."""
    record = {'event_id': '123', 'amount': 100}
    assert validate_record(record) is True


def test_validate_record_invalid():
    """Test invalid record validation."""
    record = {'amount': -1}
    assert validate_record(record) is False


# Data quality tests
def test_bigquery_data_quality():
    """Test BigQuery data quality."""
    from google.cloud import bigquery

    client = bigquery.Client()
    query = """
        SELECT
            COUNT(*) as total,
            COUNTIF(order_id IS NULL) as null_ids,
            COUNTIF(amount < 0) as negative_amounts
        FROM `project.analytics.sales`
        WHERE DATE(processed_at) = CURRENT_DATE()
    """
    results = client.query(query).result()
    row = list(results)[0]
    assert row.null_ids == 0, f"Found {row.null_ids} null order IDs"
    assert row.negative_amounts == 0, f"Found {row.negative_amounts} negative amounts"
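The unit tests above import parse_event and validate_record from src.transformations, but the module itself is not shown. The following is a minimal sketch of what it might contain, assuming events arrive as JSON strings and that a valid record needs a non-empty event_id and a non-negative numeric amount. Both function bodies are assumptions inferred from the tests, not the original implementation.

```python
# src/transformations.py (hypothetical; only the tests appear in the source)
import json
from typing import Any


def parse_event(raw_event: str) -> dict[str, Any]:
    """Parse a JSON-encoded event string into a dictionary."""
    event = json.loads(raw_event)
    if not isinstance(event, dict):
        raise ValueError("event must be a JSON object")
    return event


def validate_record(record: dict[str, Any]) -> bool:
    """Return True when the record has an event_id and a non-negative amount."""
    if not record.get("event_id"):
        return False
    amount = record.get("amount")
    # bool is a subclass of int, so reject it explicitly
    if isinstance(amount, bool) or not isinstance(amount, (int, float)):
        return False
    return amount >= 0
```

Keeping these functions free of any Beam or BigQuery dependency is what lets Step 1 of the pipeline run them with plain pytest.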
Best Practice: Implement multiple testing levels: 1) Unit tests for transforms, 2) Integration tests for pipeline components, 3) Data quality tests for output validation, 4) Performance tests for SLA compliance. Use Cloud Build for automated testing on every commit.
Common Interview Questions
Q1: What should be included in a data engineering CI/CD pipeline?
Answer: 1) Code linting and formatting, 2) Unit tests for transformations, 3) SQL linting and testing, 4) Data quality validation, 5) Schema validation, 6) Integration tests, 7) Performance tests, 8) Security scanning.
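As an illustration of item 5 (schema validation), here is a minimal sketch that checks a record against an expected schema and reports every violation. The EXPECTED_SCHEMA mapping and the schema_errors helper are hypothetical names; the field names are borrowed from the sample event used in the unit tests.

```python
from typing import Any

# Hypothetical expected schema: field name -> accepted Python type(s)
EXPECTED_SCHEMA: dict[str, tuple[type, ...]] = {
    "event_id": (str,),
    "type": (str,),
    "amount": (int, float),
}


def schema_errors(
    record: dict[str, Any],
    schema: dict[str, tuple[type, ...]] = EXPECTED_SCHEMA,
) -> list[str]:
    """Return a list of schema violations; the list is empty when the record is valid."""
    errors = []
    for field, types in schema.items():
        if field not in record:
            errors.append(f"missing field: {field}")
        elif not isinstance(record[field], types):
            errors.append(f"wrong type for {field}: {type(record[field]).__name__}")
    # Fields that the schema does not know about are reported too
    for field in sorted(record.keys() - schema.keys()):
        errors.append(f"unexpected field: {field}")
    return errors
```

A CI step could run a check like this against sample payloads and fail the build when the returned list is non-empty.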
Q2: How do you test Dataflow pipelines?
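Answer: Run the pipeline on the DirectRunner for local testing with small datasets. Unit-test individual DoFns with pytest, and use Beam's TestPipeline with assert_that for transform-level and integration tests. Validate output schemas and data quality. Before promoting a release, launch the built Dataflow template in a non-production project to test the deployment itself.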
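One way to make DoFns easy to test with pytest is to keep the logic in a plain function and let the DoFn delegate to it. The sketch below assumes a hypothetical extract_purchases step; the Beam import is guarded so the logic and its test run even where apache_beam is not installed.

```python
import json
from typing import Iterator

try:
    import apache_beam as beam  # optional: only needed for the DoFn wrapper
except ImportError:
    beam = None


def extract_purchases(raw: str) -> Iterator[dict]:
    """Yield the parsed event if it is a well-formed purchase, otherwise nothing."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return  # drop malformed input
    if isinstance(event, dict) and event.get("type") == "purchase":
        yield event


if beam is not None:
    class ExtractPurchases(beam.DoFn):
        """Thin Beam wrapper; the logic lives in extract_purchases."""

        def process(self, element):
            yield from extract_purchases(element)


def test_extract_purchases():
    good = '{"event_id": "123", "type": "purchase", "amount": 99.99}'
    assert list(extract_purchases(good)) == [json.loads(good)]
    assert list(extract_purchases('{"type": "refund"}')) == []
    assert list(extract_purchases("not json")) == []
```

With Beam installed, the same wrapper can be exercised end to end on the DirectRunner by applying beam.ParDo(ExtractPurchases()) inside a TestPipeline and checking the output with assert_that and equal_to from apache_beam.testing.util.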
Q3: How do you handle database migrations in CI/CD?
Answer: 1) Version control schema changes, 2) Use migration tools (Flyway, Liquibase), 3) Test migrations in staging, 4) Implement rollback procedures, 5) Use BigQuery dry-runs to validate SQL, 6) Deploy migrations before code.
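The core idea behind tools like Flyway and Liquibase, applying versioned migrations in order and recording which ones have run, can be sketched in a few lines. This example uses an in-memory SQLite database purely as a stand-in for the real warehouse; the MIGRATIONS list, the schema_migrations table, and apply_pending are hypothetical names, and real tools add checksums, locking, and rollback support on top.

```python
import sqlite3

# Hypothetical migrations, normally stored as versioned files in Git
MIGRATIONS = [
    ("001", "CREATE TABLE sales (order_id TEXT NOT NULL, amount REAL)"),
    ("002", "ALTER TABLE sales ADD COLUMN processed_at TEXT"),
]


def apply_pending(conn: sqlite3.Connection, migrations=MIGRATIONS) -> list[str]:
    """Apply migrations not yet recorded, in version order; return the versions that ran."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS schema_migrations (version TEXT PRIMARY KEY)"
    )
    applied = {row[0] for row in conn.execute("SELECT version FROM schema_migrations")}
    ran = []
    for version, statement in sorted(migrations):
        if version in applied:
            continue  # already applied in an earlier run
        conn.execute(statement)
        conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", (version,))
        conn.commit()
        ran.append(version)
    return ran


conn = sqlite3.connect(":memory:")
first_run = apply_pending(conn)   # applies 001 and 002
second_run = apply_pending(conn)  # nothing left to apply
```

Because applied versions are recorded, rerunning the step in CI is safe: a second run is a no-op, which is what makes "deploy migrations before code" repeatable.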
Q4: What is the benefit of Dataform for CI/CD?
Answer: Dataform provides built-in testing (assertions), version control via Git, compilation for validation, and scheduled execution. It integrates with Cloud Build for automated deployment and testing of SQL transformations.
Q5: How do you implement rollback in data pipelines?
Answer: 1) Version control all artifacts, 2) Use Blue/Green deployments, 3) Maintain rollback scripts, 4) Use BigQuery time-travel for data rollback, 5) Keep previous pipeline versions deployable, 6) Test rollback procedures regularly.
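For item 4, BigQuery time travel lets a query read a table as it existed at an earlier point within the table's time-travel window, using FOR SYSTEM_TIME AS OF. The sketch below only builds the SQL string; build_restore_sql and the sales_restored table name are hypothetical, and the table names are interpolated directly, so they must come from trusted configuration rather than user input.

```python
def build_restore_sql(source_table: str, restored_table: str, hours_ago: int) -> str:
    """Build a statement that copies a table's state from `hours_ago` hours back."""
    if hours_ago <= 0:
        raise ValueError("hours_ago must be positive")
    return (
        f"CREATE TABLE `{restored_table}` AS\n"
        f"SELECT * FROM `{source_table}`\n"
        f"FOR SYSTEM_TIME AS OF "
        f"TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours_ago} HOUR)"
    )


sql = build_restore_sql(
    "project.analytics.sales",
    "project.analytics.sales_restored",  # hypothetical target table
    2,
)
print(sql)
```

Restoring into a separate table, rather than overwriting the original, keeps the damaged data available for inspection and allows the restored copy to be validated before it is swapped in.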