CI/CD Pipelines for Airflow
Formal Definitions
CI/CD Pipeline
A CI/CD pipeline is an automated workflow that builds, tests, and deploys code changes. For Airflow, it ensures that DAGs are validated before deployment and can be rolled back if issues occur. Formally, the pipeline is the ordered sequence of stages $P = (B, T, D, M)$, where $B$ is build, $T$ is test, $D$ is deploy, and $M$ is monitor.
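A minimal sketch of the four stages (build, test, deploy, monitor) as an ordered sequence that stops at the first failure, so the deploy stage never runs on code that failed its tests. It assumes each stage can be modeled as a callable that returns success; `run_pipeline` and the placeholder stages are hypothetical, not a GitHub Actions or Airflow API.

```python
from typing import Callable, List, Tuple

# A stage is a (name, callable) pair; the callable returns True on success.
Stage = Tuple[str, Callable[[], bool]]


def run_pipeline(stages: List[Stage]) -> Tuple[bool, List[str]]:
    """Run stages in order and stop at the first failure.

    Returns (succeeded, names_of_stages_that_ran).
    """
    ran: List[str] = []
    for name, stage in stages:
        ran.append(name)
        if not stage():
            return False, ran
    return True, ran


if __name__ == "__main__":
    # Placeholder stages; a real pipeline would invoke linters, pytest, docker, etc.
    stages: List[Stage] = [
        ("build", lambda: True),
        ("test", lambda: False),  # simulate a failing test suite
        ("deploy", lambda: True),
        ("monitor", lambda: True),
    ]
    print(run_pipeline(stages))  # (False, ['build', 'test'])
```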
GitOps
GitOps is a deployment paradigm in which Git is the single source of truth for declarative infrastructure and application configuration. When the live state diverges from the state recorded in Git, changes are applied automatically to reconcile the two. Formally, $S_{\text{live}} \leftarrow S_G$ whenever $S_{\text{live}} \neq S_G$, where $S_G$ is the Git state.
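The reconciliation rule can be sketched as a diff between the state declared in Git and the live state. This toy model assumes both states fit in `{name: spec}` dictionaries (real controllers such as ArgoCD compare full Kubernetes manifests), and the function names are hypothetical. The `prune` flag mirrors the `prune: true` option in the ArgoCD manifest later in this section.

```python
from typing import Dict, List, Tuple

State = Dict[str, str]    # {resource name: declared spec}
Action = Tuple[str, str]  # (verb, resource name)


def plan_sync(desired: State, live: State, prune: bool = True) -> List[Action]:
    """List the actions that bring the live state back in line with Git."""
    actions: List[Action] = []
    for name in sorted(desired):
        if name not in live:
            actions.append(("create", name))
        elif live[name] != desired[name]:
            actions.append(("update", name))
    if prune:
        # Anything running that Git no longer declares is removed.
        for name in sorted(set(live) - set(desired)):
            actions.append(("delete", name))
    return actions


def apply_actions(actions: List[Action], desired: State, live: State) -> State:
    """Return the live state after applying the planned actions."""
    new_live = dict(live)
    for verb, name in actions:
        if verb == "delete":
            del new_live[name]
        else:
            new_live[name] = desired[name]
    return new_live


git_state = {"etl_dag": "v2", "reporting_dag": "v1"}
live_state = {"etl_dag": "v1", "old_dag": "v1"}
plan = plan_sync(git_state, live_state)
print(plan)  # [('update', 'etl_dag'), ('create', 'reporting_dag'), ('delete', 'old_dag')]
print(apply_actions(plan, git_state, live_state) == git_state)  # True
```

Running `plan_sync` again after applying the actions returns an empty plan, which is the "converged" condition a GitOps controller keeps checking for.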
DAG Versioning
DAG versioning tracks changes to DAG files over time, enabling rollback and audit trails. Each deployment $i$ creates a version $V_i = (c_i, r_i, t_i, m_i)$, where $c_i$ is the Git commit, $r_i$ the test results, $t_i$ the deploy timestamp, and $m_i$ the monitoring status.
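A sketch of one version record (commit, test results, deploy timestamp, monitoring status) and of choosing a rollback target from the deployment history. The field names, the reduction of test results and monitoring status to booleans, and the sample commits are illustrative assumptions.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class DagVersion:
    """One deployment: commit, test results, deploy time, monitoring status."""
    commit: str            # Git commit SHA
    tests_passed: bool     # test results, reduced to pass/fail
    deployed_at: datetime  # deploy timestamp
    healthy: bool          # monitoring status, reduced to healthy/unhealthy


def rollback_target(history: List[DagVersion], current_commit: str) -> Optional[DagVersion]:
    """Newest version, other than the current one, that passed tests and stayed healthy."""
    candidates = [
        v for v in history
        if v.commit != current_commit and v.tests_passed and v.healthy
    ]
    return max(candidates, key=lambda v: v.deployed_at, default=None)


# Hypothetical history: the two most recent deployments became unhealthy.
history = [
    DagVersion("a1b2c3d", True, datetime(2024, 5, 1, 9, 0), True),
    DagVersion("d4e5f6a", True, datetime(2024, 5, 2, 9, 0), False),
    DagVersion("0f9e8d7", True, datetime(2024, 5, 3, 9, 0), False),
]
print(rollback_target(history, current_commit="0f9e8d7").commit)  # a1b2c3d
```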
Detailed Explanation
CI Pipeline Configuration
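```yaml
# .github/workflows/airflow-ci.yml
name: Airflow CI

on:
  push:
    branches: [main, develop]
    paths:
      - 'dags/**'
      - 'plugins/**'
      - 'tests/**'
  pull_request:
    branches: [main]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install flake8 black isort mypy pylint
      - name: Lint with flake8
        # E203 conflicts with black's formatting of slices
        run: flake8 dags/ plugins/ tests/ --max-line-length=120 --extend-ignore=E203
      - name: Check formatting with black
        run: black --check dags/ plugins/ tests/
      - name: Sort imports
        # --profile black keeps isort and black from disagreeing on import layout
        run: isort --check-only --profile black dags/ plugins/ tests/
      - name: Type check with mypy
        run: mypy dags/ --ignore-missing-imports

  test:
    runs-on: ubuntu-latest
    needs: lint
    env:
      # Point Airflow at the Postgres service container below
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        ports:
          - 5432:5432
        # Wait until Postgres accepts connections before the steps run
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install Airflow
        run: |
          # Airflow's official constraints file pins its transitive dependencies
          pip install "apache-airflow[postgres]==2.8.0" \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
          pip install pytest pytest-cov pytest-xdist
          pip install -r requirements.txt
      - name: Initialize Airflow metadata DB
        run: airflow db migrate
      - name: Run DAG validation
        run: |
          pytest tests/test_dags.py -v --tb=short
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=dags --cov-report=xml -n auto
      - name: Run integration tests
        run: |
          pytest tests/integration/ -v --tb=short
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  build-image:
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    env:
      # Assumes Docker Hub, where images live under the registry user's namespace.
      # For another registry, prefix the name with its host.
      IMAGE: ${{ secrets.REGISTRY_USER }}/airflow-dags
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: |
          docker build -t "$IMAGE:${{ github.sha }}" .
          docker tag "$IMAGE:${{ github.sha }}" "$IMAGE:latest"
      - name: Push to registry
        env:
          REGISTRY_USER: ${{ secrets.REGISTRY_USER }}
          REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
        run: |
          # Pass secrets through env vars instead of interpolating them into the script
          echo "$REGISTRY_PASSWORD" | docker login -u "$REGISTRY_USER" --password-stdin
          docker push "$IMAGE:${{ github.sha }}"
          docker push "$IMAGE:latest"
```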
DAG Testing Framework
```python
# tests/test_dags.py
import os
from graphlib import CycleError, TopologicalSorter
from pathlib import Path

import pytest
from airflow.models import DagBag

# Resolve the DAG folder relative to the repository so the tests also run in a
# CI checkout; set DAGS_FOLDER to test another location (e.g. /opt/airflow/dags).
DAGS_FOLDER = os.environ.get(
    'DAGS_FOLDER', str(Path(__file__).resolve().parents[1] / 'dags')
)


class TestDAGs:
    """Comprehensive DAG testing framework."""

    @pytest.fixture(autouse=True)
    def load_dagbag(self):
        self.dagbag = DagBag(
            dag_folder=DAGS_FOLDER,
            include_examples=False,
        )

    def test_dags_import_successfully(self):
        """All DAGs should import without errors."""
        assert len(self.dagbag.import_errors) == 0, \
            f"DAG import errors: {self.dagbag.import_errors}"

    def test_dag_count(self):
        """Verify expected DAGs are loaded."""
        expected_dags = {'etl_dag', 'reporting_dag', 'maintenance_dag'}
        loaded_dags = set(self.dagbag.dags.keys())
        missing = expected_dags - loaded_dags
        assert not missing, f"Missing DAGs: {missing}"

    def test_dag_has_tags(self):
        """All DAGs should have tags."""
        for dag_id, dag in self.dagbag.dags.items():
            assert len(dag.tags) > 0, f"DAG {dag_id} has no tags"

    def test_dag_default_args(self):
        """Verify default_args configuration (DAGs without default_args are skipped)."""
        for dag_id, dag in self.dagbag.dags.items():
            if dag.default_args:
                assert 'retries' in dag.default_args, \
                    f"DAG {dag_id} missing retries"
                assert 'retry_delay' in dag.default_args, \
                    f"DAG {dag_id} missing retry_delay"

    def test_task_count_reasonable(self):
        """DAGs should not have excessive task counts."""
        for dag_id, dag in self.dagbag.dags.items():
            assert len(dag.tasks) <= 100, \
                f"DAG {dag_id} has {len(dag.tasks)} tasks (max 100)"

    def test_no_circular_dependencies(self):
        """All DAGs must be acyclic.

        DagBag already reports cyclic DAGs as import errors; this explicit check
        uses the standard-library graphlib (Python 3.9+), so it needs no extra
        dependency such as networkx.
        """
        for dag_id, dag in self.dagbag.dags.items():
            # Map each task_id to the set of task_ids it depends on
            graph = {task.task_id: task.upstream_task_ids for task in dag.tasks}
            try:
                list(TopologicalSorter(graph).static_order())
            except CycleError as exc:
                pytest.fail(f"DAG {dag_id} has circular dependencies: {exc.args[1]}")
```
Deployment Automation
```python
# deploy/dag_deployer.py
import hashlib
import shutil
from datetime import datetime
from pathlib import Path


class DAGDeployer:
    """Automate DAG deployment with versioning."""

    def __init__(self, source_dir, target_dir, backup_dir):
        self.source_dir = Path(source_dir)
        self.target_dir = Path(target_dir)
        self.backup_dir = Path(backup_dir)

    def calculate_checksum(self, file_path):
        """Calculate MD5 checksum of file (change detection only, not security)."""
        hasher = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hasher.update(chunk)
        return hasher.hexdigest()

    def backup_existing_dags(self):
        """Backup existing DAGs before deployment; returns the backup path or None."""
        if not self.target_dir.exists():
            return None
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = self.backup_dir / f'backup_{timestamp}'
        # copytree creates missing parent directories and refuses to overwrite
        shutil.copytree(self.target_dir, backup_path)
        print(f"Backup created: {backup_path}")
        return backup_path

    def deploy_dags(self, dry_run=False):
        """Sync *.py files from source to target; returns counts of changes."""
        # A missing source would make every deployed DAG look "deleted"
        if not self.source_dir.is_dir():
            raise FileNotFoundError(f"Source directory not found: {self.source_dir}")

        # Create backup (skipped on dry runs)
        if not dry_run:
            self.backup_existing_dags()

        # Calculate checksums keyed by path relative to each root
        source_checksums = {}
        for file_path in self.source_dir.rglob('*.py'):
            relative_path = file_path.relative_to(self.source_dir)
            source_checksums[relative_path] = self.calculate_checksum(file_path)

        target_checksums = {}
        if self.target_dir.exists():
            for file_path in self.target_dir.rglob('*.py'):
                relative_path = file_path.relative_to(self.target_dir)
                target_checksums[relative_path] = self.calculate_checksum(file_path)

        # Find changes
        new_files = set(source_checksums) - set(target_checksums)
        modified_files = {
            f for f in source_checksums
            if f in target_checksums and source_checksums[f] != target_checksums[f]
        }
        deleted_files = set(target_checksums) - set(source_checksums)
        summary = {
            'new': len(new_files),
            'modified': len(modified_files),
            'deleted': len(deleted_files),
        }
        print(f"New files: {summary['new']}")
        print(f"Modified files: {summary['modified']}")
        print(f"Deleted files: {summary['deleted']}")

        if dry_run:
            print("Dry run - no changes applied")
            return summary

        # Deploy changes
        for file_path in sorted(new_files | modified_files):
            source = self.source_dir / file_path
            target = self.target_dir / file_path
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, target)
            print(f"Deployed: {file_path}")

        for file_path in sorted(deleted_files):
            target = self.target_dir / file_path
            if target.exists():
                target.unlink()
                print(f"Deleted: {file_path}")

        return summary


if __name__ == "__main__":
    deployer = DAGDeployer(
        source_dir='/repo/dags',
        target_dir='/opt/airflow/dags',
        backup_dir='/opt/airflow/backups',
    )
    # Dry run first
    deployer.deploy_dags(dry_run=True)
    # Actual deployment
    deployer.deploy_dags(dry_run=False)
```
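The change-detection step in `deploy_dags` can be pulled out into two small functions so it can be unit-tested without touching `/opt/airflow`. A minimal sketch; `build_manifest` and `diff_manifests` are hypothetical helpers that apply the same new/modified/deleted rules.

```python
import hashlib
from pathlib import Path
from typing import Dict, Set

Manifest = Dict[str, str]  # relative path -> MD5 hex digest


def build_manifest(root: Path, pattern: str = "*.py") -> Manifest:
    """Checksum every file under root that matches pattern (empty if root is missing)."""
    if not root.exists():
        return {}
    return {
        path.relative_to(root).as_posix(): hashlib.md5(path.read_bytes()).hexdigest()
        for path in sorted(root.rglob(pattern))
        if path.is_file()
    }


def diff_manifests(source: Manifest, target: Manifest) -> Dict[str, Set[str]]:
    """Classify paths as new, modified, or deleted relative to the target."""
    return {
        "new": set(source) - set(target),
        "modified": {p for p in source if p in target and source[p] != target[p]},
        "deleted": set(target) - set(source),
    }


if __name__ == "__main__":
    # Hypothetical manifests with placeholder digests
    src = {"etl_dag.py": "aaa", "reporting_dag.py": "bbb"}
    dst = {"reporting_dag.py": "old", "legacy_dag.py": "ccc"}
    print(diff_manifests(src, dst))
    # {'new': {'etl_dag.py'}, 'modified': {'reporting_dag.py'}, 'deleted': {'legacy_dag.py'}}
```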
Deployment Success Rate
$$R = \frac{N_s}{N} \times 100\%$$
Here,
- $R$ = Deployment success rate
- $N_s$ = Number of successful deployments
- $N$ = Total number of deployments
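A worked example of the success-rate formula (successful deployments divided by total deployments, expressed as a percentage). The outcome list is illustrative, not measured data.

```python
from typing import Sequence


def deployment_success_rate(outcomes: Sequence[bool]) -> float:
    """Percentage of deployments that succeeded (True = success)."""
    if not outcomes:
        raise ValueError("no deployments recorded")
    return 100.0 * sum(outcomes) / len(outcomes)


# Illustrative history: 197 successes out of 200 deployments.
rate = deployment_success_rate([True] * 197 + [False] * 3)
print(f"{rate:.1f}%")  # 98.5%
```

Against the Performance Metrics table, 98.5% falls in the 95-99% warning band.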
CI Pipeline Duration
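$$T_{\text{CI}} = T_{\text{lint}} + T_{\text{test}} + T_{\text{build}} + T_{\text{push}}$$
Here,
- $T_{\text{CI}}$ = Total CI pipeline duration
- $T_{\text{lint}}$ = Linting and formatting time
- $T_{\text{test}}$ = Test execution time
- $T_{\text{build}}$ = Docker image build time
- $T_{\text{push}}$ = Image push time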
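Because the workflow chains `lint`, `test`, and `build-image` with `needs`, the stages run one after another and their durations add up. A small sketch that totals illustrative stage times (not measurements) and reports the slowest stage, which is usually the first place to optimize; runner start-up and queue time are ignored.

```python
from typing import Dict, Tuple


def ci_duration(stage_minutes: Dict[str, float]) -> Tuple[float, str]:
    """Return (total minutes, slowest stage) for sequentially executed stages."""
    total = sum(stage_minutes.values())
    slowest = max(stage_minutes, key=stage_minutes.__getitem__)
    return total, slowest


# Illustrative timings in minutes for lint, test, build, and push.
total, slowest = ci_duration({"lint": 1.5, "test": 6.0, "build": 5.0, "push": 1.0})
print(total, slowest)  # 13.5 test
```

A 13.5-minute total would sit in the 10-20 minute warning band of the CI Duration row in the performance table.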
Use DAG versioning with Git SHA tags to track which DAG version is deployed. This enables quick rollback and audit trails.
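A minimal sketch of deriving an immutable image tag from the commit, in the same spirit as the `${{ github.sha }}` tag in the CI workflow. `git rev-parse HEAD` is standard Git; the helper names are hypothetical, and the validation assumes SHA-1 repositories (40 hex characters).

```python
import re
import subprocess

_SHA_RE = re.compile(r"[0-9a-f]{40}")


def current_commit(repo_dir: str = ".") -> str:
    """Full SHA of HEAD, via `git rev-parse HEAD`."""
    result = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=repo_dir, check=True, capture_output=True, text=True,
    )
    return result.stdout.strip()


def image_tag(repository: str, sha: str) -> str:
    """Immutable image reference for a commit, e.g. 'airflow-dags:<sha>'."""
    if not _SHA_RE.fullmatch(sha):
        raise ValueError(f"expected a 40-character commit SHA, got {sha!r}")
    return f"{repository}:{sha}"
```

Inside a checkout, `image_tag("airflow-dags", current_commit())` yields the tag to deploy; rejecting values like `latest` keeps mutable tags out of the audit trail.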
Implement automated rollback in your CD pipeline. If health checks fail after deployment, automatically revert to the previous version.
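One way to wire a health check to rollback, as a sketch: the callables could be `DAGDeployer.deploy_dags`, `DAGRollback.rollback_to_previous`, and a hypothetical probe (for example, an HTTP request to the Airflow webserver's `/health` endpoint). The retry count and wait time are arbitrary defaults.

```python
import time
from typing import Callable


def deploy_with_auto_rollback(
    deploy: Callable[[], object],
    health_check: Callable[[], bool],
    rollback: Callable[[], object],
    attempts: int = 3,
    wait_seconds: float = 10.0,
) -> bool:
    """Deploy, then poll health_check; roll back if it never reports healthy.

    Returns True if the new version became healthy, False if it was rolled back.
    """
    deploy()
    for attempt in range(attempts):
        if health_check():
            return True
        if attempt < attempts - 1:
            time.sleep(wait_seconds)  # give the scheduler time to parse new DAGs
    rollback()
    return False
```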
Key Concepts Table
| Stage | Tool | Purpose | Duration |
|---|---|---|---|
| Lint | flake8, black | Code quality | 1-2min |
| Unit Test | pytest | Component testing | 2-5min |
| Integration Test | pytest + Docker | System testing | 5-15min |
| Build | Docker | Image creation | 5-10min |
| Deploy | Helm/Kubectl | Production deploy | 2-5min |
| Verify | Health checks | Post-deploy validation | 1-3min |
Code Examples
GitOps with ArgoCD
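```yaml
# argocd-application.yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: airflow-dags
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://github.com/org/airflow-dags.git
    targetRevision: HEAD
    # ArgoCD renders Kubernetes manifests, so `path` must hold a Helm chart
    # (e.g. chart values that pin the airflow-dags image tag), not the raw
    # DAG .py files. The directory name below is a hypothetical example.
    path: deploy/helm
    helm:
      valueFiles:
        - values.yaml
  destination:
    server: https://kubernetes.default.svc
    namespace: airflow
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true
```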
Automated Testing Pipeline
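```python
# tests/integration/test_dag_execution.py
import os
from pathlib import Path

import pytest
from airflow.models import DagBag
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

# Repo-relative DAG folder (override with DAGS_FOLDER). These tests write to the
# metadata database, so it must be initialized first (e.g. `airflow db migrate`).
DAGS_FOLDER = os.environ.get(
    'DAGS_FOLDER', str(Path(__file__).resolve().parents[2] / 'dags')
)


class TestDAGExecution:
    """Test DAG execution in integration environment."""

    @pytest.fixture
    def dagbag(self):
        return DagBag(dag_folder=DAGS_FOLDER, include_examples=False)

    def test_dag_can_be_triggered(self, dagbag):
        """Verify a manual DagRun can be created for every DAG."""
        for dag_id, dag in dagbag.dags.items():
            now = timezone.coerce_datetime(timezone.utcnow())
            dag_run = dag.create_dagrun(
                run_type=DagRunType.MANUAL,  # the enum is required, not the string 'manual'
                execution_date=now,
                data_interval=dag.timetable.infer_manual_data_interval(run_after=now),
                state=DagRunState.RUNNING,
                external_trigger=True,
            )
            assert dag_run is not None, f"No DagRun created for {dag_id}"

    def test_task_dependencies_satisfied(self, dagbag):
        """Verify every upstream task exists in the same DAG."""
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                for upstream_id in task.upstream_task_ids:
                    assert upstream_id in dag.task_dict, \
                        f"{dag_id}.{task.task_id} depends on unknown task {upstream_id}"

    def test_task_timeouts_configured(self, dagbag):
        """Verify all tasks have an execution_timeout configured."""
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                # Every operator defines execution_timeout (default None), so no hasattr check
                assert task.execution_timeout is not None, \
                    f"Task {dag_id}.{task.task_id} has no timeout"
```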
Rollback Automation
```python
# deploy/rollback.py
import shutil
from pathlib import Path


class DAGRollback:
    """Automated rollback for failed deployments."""

    def __init__(self, target_dir, backup_dir):
        self.target_dir = Path(target_dir)
        self.backup_dir = Path(backup_dir)

    def list_backups(self):
        """List available backups, newest first.

        Backup names use backup_YYYYmmdd_HHMMSS, so sorting the timestamp
        strings lexicographically also sorts them chronologically.
        """
        if not self.backup_dir.exists():
            return []
        backups = []
        for item in self.backup_dir.iterdir():
            if item.is_dir() and item.name.startswith('backup_'):
                timestamp = item.name[len('backup_'):]
                backups.append((timestamp, item))
        return sorted(backups, key=lambda x: x[0], reverse=True)

    def rollback_to_backup(self, backup_name):
        """Rollback to a specific backup."""
        backup_path = self.backup_dir / backup_name
        if not backup_path.is_dir():
            raise FileNotFoundError(f"Backup not found: {backup_name}")
        # Clear current DAGs (the folder is briefly empty until the copy finishes)
        if self.target_dir.exists():
            shutil.rmtree(self.target_dir)
        # Restore from backup
        shutil.copytree(backup_path, self.target_dir)
        print(f"Rolled back to: {backup_name}")
        return True

    def rollback_to_previous(self):
        """Rollback to the most recent backup (the state before the last deploy)."""
        backups = self.list_backups()
        if not backups:
            print("No backups available")
            return False
        return self.rollback_to_backup(backups[0][1].name)


if __name__ == "__main__":
    rollback = DAGRollback(
        target_dir='/opt/airflow/dags',
        backup_dir='/opt/airflow/backups',
    )
    # List available backups
    backups = rollback.list_backups()
    print("Available backups:")
    for timestamp, path in backups:
        print(f"  {timestamp}: {path}")
    # Rollback to previous
    rollback.rollback_to_previous()
```
Performance Metrics
CI/CD Pipeline Performance
| Metric | Target | Warning | Critical |
|---|---|---|---|
| CI Duration | < 10min | 10-20min | > 20min |
| CD Duration | < 5min | 5-10min | > 10min |
| Deployment Success | > 99% | 95-99% | < 95% |
| Rollback Time | < 2min | 2-5min | > 5min |
| Test Coverage | > 80% | 70-80% | < 70% |
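The thresholds in the table above can be encoded directly so that a dashboard or CI job can flag regressions. A sketch; the metric keys are hypothetical names, and a value exactly on a boundary (for example 99% success or 20 minutes of CI) is treated as a warning, matching the inclusive ranges in the Warning column.

```python
from typing import Dict, Tuple

# metric -> (target bound, critical bound, higher_is_better), from the table above
THRESHOLDS: Dict[str, Tuple[float, float, bool]] = {
    "ci_duration_min": (10, 20, False),
    "cd_duration_min": (5, 10, False),
    "deployment_success_pct": (99, 95, True),
    "rollback_time_min": (2, 5, False),
    "test_coverage_pct": (80, 70, True),
}


def classify(metric: str, value: float) -> str:
    """Return 'target', 'warning', or 'critical' for a metric value."""
    target, critical, higher_is_better = THRESHOLDS[metric]
    if higher_is_better:
        if value > target:
            return "target"
        return "warning" if value >= critical else "critical"
    if value < target:
        return "target"
    return "warning" if value <= critical else "critical"


print(classify("deployment_success_pct", 98.5))  # warning
print(classify("rollback_time_min", 1.5))        # target
print(classify("test_coverage_pct", 65))         # critical
```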
Deployment Frequency
| Environment | Frequency | Approval | Automation |
|---|---|---|---|
| Development | On push | Auto | 100% |
| Staging | On PR merge | Auto | 90% |
| Production | On release | Manual | 70% |
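The Approval column can be enforced as a simple gate in the deploy step. A sketch under the assumption that each environment has exactly one triggering event; the event labels (`push`, `pr_merge`, `release`) are hypothetical names for the table's triggers.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class EnvPolicy:
    trigger: str          # event that starts a deployment to this environment
    needs_approval: bool  # True where the table's Approval column says "Manual"


# Mirrors the Deployment Frequency table; event labels are hypothetical.
POLICIES: Dict[str, EnvPolicy] = {
    "development": EnvPolicy(trigger="push", needs_approval=False),
    "staging": EnvPolicy(trigger="pr_merge", needs_approval=False),
    "production": EnvPolicy(trigger="release", needs_approval=True),
}


def may_deploy(env: str, event: str, approved: bool = False) -> bool:
    """True if `event` should deploy to `env`, honoring manual approval."""
    policy = POLICIES[env]
    if event != policy.trigger:
        return False
    return approved or not policy.needs_approval


print(may_deploy("staging", "pr_merge"))                   # True
print(may_deploy("production", "release"))                 # False
print(may_deploy("production", "release", approved=True))  # True
```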
Key Takeaways:
- Implement comprehensive DAG validation in CI before deployment
- Use GitOps patterns (ArgoCD, Flux) for automated DAG deployment
- Maintain DAG versioning with Git SHA tags for rollback capability
- Automate rollback on health check failures
- Test all DAGs in staging before production deployment
- Monitor deployment success rates and pipeline duration
See Also
- Testing DAGs — Comprehensive testing strategies
- Performance Tuning — Optimizing CI/CD pipeline speed
- Kubernetes Executor — Containerized deployment
- Monitoring and Alerting — Post-deployment monitoring