CI/CD Pipelines for Apache Airflow

Architecture Diagram

Formal Definitions

Definition: CI/CD Pipeline

A CI/CD pipeline is an automated workflow that builds, tests, and deploys code changes. For Airflow, it ensures that DAGs are validated before deployment and can be rolled back if issues occur. Formally, the pipeline is a tuple $P = (B, T, D, M)$, where $B$ is the build stage, $T$ is the test stage, $D$ is the deploy stage, and $M$ is the monitor stage.
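To make the tuple definition concrete, the pipeline can be sketched as an ordered list of stages in which a failing stage stops everything after it. The stage callables below are hypothetical stand-ins for real build, test, deploy, and monitor steps, not part of any Airflow API.

```python
from typing import Callable, List, Tuple

# A stage is a (name, action) pair; the action returns True on success.
Stage = Tuple[str, Callable[[], bool]]


def run_pipeline(stages: List[Stage]) -> List[str]:
    """Run stages in order and stop at the first failure.

    Returns the names of the stages that completed successfully.
    """
    passed = []
    for name, action in stages:
        if not action():
            break  # later stages (e.g. deploy) never run after a failure
        passed.append(name)
    return passed


# Hypothetical stages: the test stage fails, so deploy and monitor are skipped.
stages = [
    ("build", lambda: True),
    ("test", lambda: False),
    ("deploy", lambda: True),
    ("monitor", lambda: True),
]
result = run_pipeline(stages)
print(result)  # ['build']
```

The point of the ordering is that a DAG which fails validation never reaches the deploy stage.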

Definition: GitOps

GitOps is a deployment paradigm where Git is the single source of truth for declarative infrastructure and application configuration. Changes are applied automatically when the Git state diverges from the live state. Formally, $\text{Deploy}(G) \rightarrow S_{\text{live}}$, where $G$ is the Git state and $S_{\text{live}}$ is the live state.
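The reconciliation idea behind GitOps can be illustrated with a small sketch that compares the desired state (from Git) with the live state and derives the actions needed to converge. The dictionaries map file names to content digests; both the names and the digests are made up for illustration, and real tools such as ArgoCD work on Kubernetes manifests rather than plain dictionaries.

```python
from typing import Dict, List, Tuple


def reconcile(desired: Dict[str, str], live: Dict[str, str]) -> List[Tuple[str, str]]:
    """Return the (action, name) steps that bring `live` in line with `desired`."""
    actions = []
    for name, digest in sorted(desired.items()):
        if name not in live:
            actions.append(("create", name))
        elif live[name] != digest:
            actions.append(("update", name))
    # Anything live that Git no longer declares is pruned.
    for name in sorted(live.keys() - desired.keys()):
        actions.append(("delete", name))
    return actions


# Hypothetical states: one changed file, one new file, one stale file.
git_state = {"etl_dag.py": "a1", "reporting_dag.py": "b2"}
live_state = {"etl_dag.py": "a0", "old_dag.py": "c3"}
plan = reconcile(git_state, live_state)
print(plan)
```

When the two states already match, the plan is empty and nothing is deployed.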

Definition: DAG Versioning

DAG versioning tracks changes to DAG files over time, enabling rollback and audit trails. Each deployment creates a version $V_i = (G_i, T_i, D_i, M_i)$, where $G_i$ is the Git commit, $T_i$ is the test results, $D_i$ is the deploy timestamp, and $M_i$ is the monitoring status.
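One possible way to represent a version record is a small immutable data class with one field per component of the tuple. The class name, field names, and the "last good version" rule below are illustrative assumptions, not a structure Airflow itself provides.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class DagVersion:
    git_commit: str        # G_i
    tests_passed: bool     # T_i (reduced to pass/fail for this sketch)
    deployed_at: datetime  # D_i
    healthy: bool          # M_i (reduced to healthy/unhealthy)


def last_good_version(history: List[DagVersion]) -> Optional[DagVersion]:
    """Return the most recently deployed version that passed tests and is healthy."""
    good = [v for v in history if v.tests_passed and v.healthy]
    return max(good, key=lambda v: v.deployed_at, default=None)


# Hypothetical history: the second deployment turned out unhealthy.
history = [
    DagVersion("abc1234", True, datetime(2024, 1, 1, tzinfo=timezone.utc), True),
    DagVersion("def5678", True, datetime(2024, 1, 2, tzinfo=timezone.utc), False),
]
target = last_good_version(history)
print(target.git_commit)  # abc1234
```

Keeping the record immutable makes the history usable as an audit trail: a version is appended, never edited.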

Detailed Explanation

CI Pipeline Configuration

# .github/workflows/airflow-ci.yml
name: Airflow CI

on:
  push:
    branches: [main, develop]
    paths:
      - 'dags/**'
      - 'plugins/**'
      - 'tests/**'
  pull_request:
    branches: [main]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install flake8 black isort mypy pylint
      - name: Lint with flake8
        run: flake8 dags/ plugins/ tests/ --max-line-length=120
      - name: Check formatting with black
        run: black --check dags/ plugins/ tests/
      - name: Sort imports
        run: isort --check-only dags/ plugins/ tests/
      - name: Type check with mypy
        run: mypy dags/ --ignore-missing-imports

  test:
    runs-on: ubuntu-latest
    needs: lint
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
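      - name: Install Airflow
        run: |
          # Pin transitive dependencies with the official constraints file for this Airflow/Python pair
          pip install "apache-airflow==2.8.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
          pip install pytest pytest-cov pytest-xdist
          pip install -r requirements.txt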
      - name: Run DAG validation
        run: |
          pytest tests/test_dag_validation.py -v --tb=short
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=dags --cov-report=xml -n auto
      - name: Run integration tests
        run: |
          pytest tests/integration/ -v --tb=short
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  build-image:
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: |
          docker build -t airflow-dags:${{ github.sha }} .
          docker tag airflow-dags:${{ github.sha }} airflow-dags:latest
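      - name: Push to registry
        # NOTE: to push to a private registry, the image name must include the
        # registry host and namespace; the bare name is kept here as in the build step.
        run: |
          echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login -u "${{ secrets.REGISTRY_USER }}" --password-stdin
          docker push airflow-dags:${{ github.sha }}
          docker push airflow-dags:latest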

DAG Testing Framework

# tests/test_dags.py
import os

import pytest
from airflow.models import DagBag

class TestDAGs:
    """Comprehensive DAG testing framework."""
    
    @pytest.fixture(autouse=True)
    def setup(self):
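        # Let CI point at the checked-out repository; the default is the
        # path used on the Airflow hosts.
        self.dagbag = DagBag(
            dag_folder=os.environ.get('AIRFLOW__CORE__DAGS_FOLDER', '/opt/airflow/dags'),
            include_examples=False,
        )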
    
    def test_dags_import_successfully(self):
        """All DAGs should import without errors."""
        assert len(self.dagbag.import_errors) == 0, \
            f"DAG import errors: {self.dagbag.import_errors}"
    
    def test_dag_count(self):
        """Verify expected DAGs are loaded."""
        expected_dags = {'etl_dag', 'reporting_dag', 'maintenance_dag'}
        loaded_dags = set(self.dagbag.dags.keys())
        
        missing = expected_dags - loaded_dags
        assert not missing, f"Missing DAGs: {missing}"
    
    def test_dag_has_tags(self):
        """All DAGs should have tags."""
        for dag_id, dag in self.dagbag.dags.items():
            assert len(dag.tags) > 0, f"DAG {dag_id} has no tags"
    
    def test_dag_default_args(self):
        """Verify default_args configuration."""
        for dag_id, dag in self.dagbag.dags.items():
            if dag.default_args:
                assert 'retries' in dag.default_args, \
                    f"DAG {dag_id} missing retries"
                assert 'retry_delay' in dag.default_args, \
                    f"DAG {dag_id} missing retry_delay"
    
    def test_task_count_reasonable(self):
        """DAGs should not have excessive task counts."""
        for dag_id, dag in self.dagbag.dags.items():
            assert len(dag.tasks) <= 100, \
                f"DAG {dag_id} has {len(dag.tasks)} tasks (max 100)"
    
    def test_no_circular_dependencies(self):
        """All DAGs must be acyclic."""
        import networkx as nx
        
        for dag_id, dag in self.dagbag.dags.items():
            G = nx.DiGraph()
            for task in dag.tasks:
                G.add_node(task.task_id)
                for upstream in task.upstream_list:
                    G.add_edge(upstream.task_id, task.task_id)
            
            assert nx.is_directed_acyclic_graph(G), \
                f"DAG {dag_id} has circular dependencies"

Deployment Automation

# deploy/dag_deployer.py
import hashlib
import shutil
from datetime import datetime
from pathlib import Path

class DAGDeployer:
    """Automate DAG deployment with versioning."""
    
    def __init__(self, source_dir, target_dir, backup_dir):
        self.source_dir = Path(source_dir)
        self.target_dir = Path(target_dir)
        self.backup_dir = Path(backup_dir)
    
    def calculate_checksum(self, file_path):
        """Calculate MD5 checksum of file."""
        hasher = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hasher.update(chunk)
        return hasher.hexdigest()
    
    def backup_existing_dags(self):
        """Backup existing DAGs before deployment."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = self.backup_dir / f'backup_{timestamp}'
        
        if self.target_dir.exists():
            shutil.copytree(self.target_dir, backup_path)
            print(f"Backup created: {backup_path}")
        
        return backup_path
    
    def deploy_dags(self, dry_run=False):
        """Deploy DAGs from source to target."""
        # Create backup
        if not dry_run:
            self.backup_existing_dags()
        
        # Calculate checksums
        source_checksums = {}
        for file_path in self.source_dir.rglob('*.py'):
            relative_path = file_path.relative_to(self.source_dir)
            source_checksums[relative_path] = self.calculate_checksum(file_path)
        
        target_checksums = {}
        if self.target_dir.exists():
            for file_path in self.target_dir.rglob('*.py'):
                relative_path = file_path.relative_to(self.target_dir)
                target_checksums[relative_path] = self.calculate_checksum(file_path)
        
        # Find changes
        new_files = set(source_checksums.keys()) - set(target_checksums.keys())
        modified_files = {
            f for f in source_checksums.keys() 
            if f in target_checksums and source_checksums[f] != target_checksums[f]
        }
        deleted_files = set(target_checksums.keys()) - set(source_checksums.keys())
        
        print(f"New files: {len(new_files)}")
        print(f"Modified files: {len(modified_files)}")
        print(f"Deleted files: {len(deleted_files)}")
        
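        if dry_run:
            print("Dry run - no changes applied")
            # Return the same summary shape as a real deployment
            return {
                'new': len(new_files),
                'modified': len(modified_files),
                'deleted': len(deleted_files),
            }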
        
        # Deploy changes
        for file_path in new_files | modified_files:
            source = self.source_dir / file_path
            target = self.target_dir / file_path
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, target)
            print(f"Deployed: {file_path}")
        
        for file_path in deleted_files:
            target = self.target_dir / file_path
            if target.exists():
                target.unlink()
                print(f"Deleted: {file_path}")
        
        return {
            'new': len(new_files),
            'modified': len(modified_files),
            'deleted': len(deleted_files),
        }

if __name__ == "__main__":
    deployer = DAGDeployer(
        source_dir='/repo/dags',
        target_dir='/opt/airflow/dags',
        backup_dir='/opt/airflow/backups',
    )
    
    # Dry run first
    deployer.deploy_dags(dry_run=True)
    
    # Actual deployment
    deployer.deploy_dags(dry_run=False)

Deployment Success Rate

$$R_{\text{deploy}} = \frac{N_{\text{successful}}}{N_{\text{total}}} \times 100\%$$

Here,

  • $R_{\text{deploy}}$ = Deployment success rate
  • $N_{\text{successful}}$ = Number of successful deployments
  • $N_{\text{total}}$ = Total number of deployments
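As a quick worked example of the formula, the rate can be computed with a small helper. The function name and the sample counts are illustrative only.

```python
def deployment_success_rate(successful: int, total: int) -> float:
    """Return the deployment success rate as a percentage."""
    if total <= 0:
        raise ValueError("total must be a positive number of deployments")
    if not 0 <= successful <= total:
        raise ValueError("successful must be between 0 and total")
    return 100.0 * successful / total


# Hypothetical counts: 198 successful deployments out of 200.
rate = deployment_success_rate(198, 200)
print(f"{rate:.1f}%")  # 99.0%
```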

CI Pipeline Duration

$$T_{\text{CI}} = T_{\text{lint}} + T_{\text{test}} + T_{\text{build}} + T_{\text{push}}$$

Here,

  • $T_{\text{CI}}$ = Total CI pipeline duration
  • $T_{\text{lint}}$ = Linting and formatting time
  • $T_{\text{test}}$ = Test execution time
  • $T_{\text{build}}$ = Docker image build time
  • $T_{\text{push}}$ = Image push time
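The sum applies when the stages run one after another, as they do when each job waits on the previous one. A minimal sketch, with made-up stage timings in minutes, adds the four terms and also reports the slowest stage, which is usually the first place to look when the total is too high.

```python
from typing import Dict

REQUIRED_STAGES = ("lint", "test", "build", "push")


def ci_duration(stage_minutes: Dict[str, float]) -> float:
    """Sum the four stage durations from the formula above."""
    missing = [s for s in REQUIRED_STAGES if s not in stage_minutes]
    if missing:
        raise KeyError(f"missing stage timings: {missing}")
    return sum(stage_minutes[s] for s in REQUIRED_STAGES)


def slowest_stage(stage_minutes: Dict[str, float]) -> str:
    """Return the name of the stage with the largest duration."""
    return max(REQUIRED_STAGES, key=lambda s: stage_minutes[s])


# Hypothetical timings in minutes.
timings = {"lint": 1.5, "test": 6.0, "build": 4.0, "push": 1.0}
total = ci_duration(timings)
bottleneck = slowest_stage(timings)
print(total, bottleneck)  # 12.5 test
```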

Use DAG versioning with Git SHA tags to track which DAG version is deployed. This enables quick rollback and audit trails.
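One simple way to record which version is live, sketched here as an assumption rather than an Airflow feature, is to write a small manifest next to the deployed DAGs containing the Git SHA. The file name `VERSION.json` and its fields are hypothetical; because it is not a Python file, it is not picked up as a DAG definition.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path

MANIFEST_NAME = "VERSION.json"  # hypothetical file name


def write_version_manifest(dags_dir: Path, git_sha: str) -> Path:
    """Record the deployed Git SHA and deployment time alongside the DAGs."""
    manifest = {
        "git_sha": git_sha,
        "deployed_at": datetime.now(timezone.utc).isoformat(),
    }
    path = dags_dir / MANIFEST_NAME
    path.write_text(json.dumps(manifest, indent=2))
    return path


def read_deployed_sha(dags_dir: Path) -> str:
    """Return the Git SHA recorded by the last deployment."""
    return json.loads((dags_dir / MANIFEST_NAME).read_text())["git_sha"]


# Demonstration in a temporary directory standing in for the DAG folder.
with tempfile.TemporaryDirectory() as tmp:
    dags_dir = Path(tmp)
    write_version_manifest(dags_dir, "abc1234")
    deployed = read_deployed_sha(dags_dir)

print(deployed)  # abc1234
```

In a CI job the SHA would typically come from the pipeline's commit variable instead of a literal.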

Implement automated rollback in your CD pipeline. If health checks fail after deployment, automatically revert to the previous version.
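The deploy, verify, and revert sequence can be sketched as a small wrapper. The `deploy`, `health_check`, and `rollback` callables are placeholders for whatever your pipeline uses; the retry count and delay are arbitrary defaults.

```python
import time
from typing import Callable


def deploy_with_rollback(
    deploy: Callable[[], None],
    health_check: Callable[[], bool],
    rollback: Callable[[], None],
    attempts: int = 3,
    delay_seconds: float = 0.0,
) -> bool:
    """Deploy, poll the health check, and roll back if it never passes.

    Returns True if the deployment stayed in place, False if it was reverted.
    """
    deploy()
    for _ in range(attempts):
        if health_check():
            return True
        time.sleep(delay_seconds)
    rollback()
    return False


# Hypothetical run in which the health check never succeeds.
events = []
ok = deploy_with_rollback(
    deploy=lambda: events.append("deploy"),
    health_check=lambda: False,
    rollback=lambda: events.append("rollback"),
)
print(ok, events)  # False ['deploy', 'rollback']
```

Polling a few times before reverting avoids rolling back a deployment that simply needs a moment to become healthy.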

Key Concepts Table

| Stage | Tool | Purpose | Duration |
|---|---|---|---|
| Lint | flake8, black | Code quality | 1-2 min |
| Unit Test | pytest | Component testing | 2-5 min |
| Integration Test | pytest + Docker | System testing | 5-15 min |
| Build | Docker | Image creation | 5-10 min |
| Deploy | Helm/Kubectl | Production deploy | 2-5 min |
| Verify | Health checks | Post-deploy validation | 1-3 min |

Code Examples

GitOps with ArgoCD

# argocd-application.yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: airflow-dags
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://github.com/org/airflow-dags.git
    targetRevision: HEAD
    path: dags
    helm:
      valueFiles:
        - values.yaml
  destination:
    server: https://kubernetes.default.svc
    namespace: airflow
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true

Automated Testing Pipeline

# tests/integration/test_dag_execution.py
import pytest
from airflow.models import DagBag
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.types import DagRunType

class TestDAGExecution:
    """Test DAG execution in integration environment."""
    
    @pytest.fixture
    def dagbag(self):
        return DagBag(dag_folder='/opt/airflow/dags')
    
    def test_dag_can_be_triggered(self, dagbag):
        """Verify DAGs can be manually triggered."""
        for dag_id, dag in dagbag.dags.items():
            # run_type must be a DagRunType member, not a plain string
            dag_run = dag.create_dagrun(
                run_type=DagRunType.MANUAL,
                execution_date=timezone.utcnow(),
                state=State.RUNNING,
            )
            assert dag_run is not None
    
    def test_task_dependencies_satisfied(self, dagbag):
        """Verify task dependencies are correct."""
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                # All upstream tasks should exist
                for upstream in task.upstream_list:
                    assert upstream in dag.tasks
    
    def test_task_timeouts_configured(self, dagbag):
        """Verify all tasks have timeouts configured."""
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                if hasattr(task, 'execution_timeout'):
                    assert task.execution_timeout is not None, \
                        f"Task {task.task_id} has no timeout"

Rollback Automation

# deploy/rollback.py
import shutil
from pathlib import Path

class DAGRollback:
    """Automated rollback for failed deployments."""
    
    def __init__(self, target_dir, backup_dir):
        self.target_dir = Path(target_dir)
        self.backup_dir = Path(backup_dir)
    
    def list_backups(self):
        """List available backups sorted by timestamp."""
        backups = []
        for item in self.backup_dir.iterdir():
            if item.is_dir() and item.name.startswith('backup_'):
                timestamp = item.name.replace('backup_', '')
                backups.append((timestamp, item))
        
        return sorted(backups, key=lambda x: x[0], reverse=True)
    
    def rollback_to_backup(self, backup_name):
        """Rollback to a specific backup."""
        backup_path = self.backup_dir / backup_name
        
        if not backup_path.exists():
            raise FileNotFoundError(f"Backup not found: {backup_name}")
        
        # Clear current DAGs
        if self.target_dir.exists():
            shutil.rmtree(self.target_dir)
        
        # Restore from backup
        shutil.copytree(backup_path, self.target_dir)
        
        print(f"Rolled back to: {backup_name}")
        return True
    
    def rollback_to_previous(self):
        """Rollback to the most recent backup."""
        backups = self.list_backups()
        
        if not backups:
            print("No backups available")
            return False
        
        return self.rollback_to_backup(backups[0][1].name)

if __name__ == "__main__":
    rollback = DAGRollback(
        target_dir='/opt/airflow/dags',
        backup_dir='/opt/airflow/backups',
    )
    
    # List available backups
    backups = rollback.list_backups()
    print("Available backups:")
    for timestamp, path in backups:
        print(f"  {timestamp}: {path}")
    
    # Rollback to previous
    rollback.rollback_to_previous()

Performance Metrics

CI/CD Pipeline Performance

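| Metric | Target | Warning | Critical |
|---|---|---|---|
| CI Duration | < 10 min | 10-20 min | > 20 min |
| CD Duration | < 5 min | 5-10 min | > 10 min |
| Deployment Success | > 99% | 95-99% | < 95% |
| Rollback Time | < 2 min | 2-5 min | > 5 min |
| Test Coverage | > 80% | 70-80% | < 70% |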
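The thresholds in this table can be turned into a simple status check. The sketch below assumes two kinds of metric: those where lower is better (durations) and those where higher is better (success rate, coverage). The function name and signature are illustrative.

```python
def classify(value: float, target: float, critical: float,
             higher_is_better: bool = False) -> str:
    """Classify a metric as 'target', 'warning', or 'critical'.

    `target` and `critical` are the boundary values from the table;
    anything between them is a warning.
    """
    if higher_is_better:
        if value > target:
            return "target"
        if value < critical:
            return "critical"
        return "warning"
    if value < target:
        return "target"
    if value > critical:
        return "critical"
    return "warning"


# Hypothetical readings checked against the table's thresholds.
ci_status = classify(12, target=10, critical=20)  # CI duration in minutes
success_status = classify(99.5, target=99, critical=95, higher_is_better=True)
coverage_status = classify(65, target=80, critical=70, higher_is_better=True)
print(ci_status, success_status, coverage_status)  # warning target critical
```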

Deployment Frequency

| Environment | Frequency | Approval | Automation |
|---|---|---|---|
| Development | On push | Auto | 100% |
| Staging | On PR merge | Auto | 90% |
| Production | On release | Manual | 70% |

Key Takeaways:

  • Implement comprehensive DAG validation in CI before deployment
  • Use GitOps patterns (ArgoCD, Flux) for automated DAG deployment
  • Maintain DAG versioning with Git SHA tags for rollback capability
  • Automate rollback on health check failures
  • Test all DAGs in staging before production deployment
  • Monitor deployment success rates and pipeline duration
