Testing DAGs and Operators in Apache Airflow

Formal Definitions

Definition: DAG Validation

DAG validation is the process of verifying that a DAG file can be imported and parsed without errors, and that the resulting DAG object meets structural requirements (acyclicity, valid dependencies, correct task types). Formally, the set of valid DAGs is the intersection of the sets that pass each check: $V_{\text{dag}} = V_{\text{import}} \cap V_{\text{structure}} \cap V_{\text{semantic}}$.
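
Because the definition is an intersection, a DAG is valid only when it passes all three checks. The following is a minimal, Airflow-free sketch of that idea. The dictionary-based DAG description, the `SUPPORTED_TYPES` set, and the three predicate names are assumptions made for illustration; they are not part of Airflow's API.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical operator labels accepted by the semantic check.
SUPPORTED_TYPES = {"python", "bash", "empty"}


def v_import(import_errors):
    """V_import: the DAG file produced no import errors."""
    return len(import_errors) == 0


def v_structure(upstream):
    """V_structure: every dependency exists and the graph has no cycle.

    `upstream` maps each task id to the set of task ids it depends on.
    """
    known = set(upstream)
    if any(dep not in known for deps in upstream.values() for dep in deps):
        return False
    try:
        TopologicalSorter(upstream).prepare()  # raises CycleError on a cycle
    except CycleError:
        return False
    return True


def v_semantic(task_types):
    """V_semantic: every task uses a supported operator type."""
    return all(t in SUPPORTED_TYPES for t in task_types.values())


def is_valid_dag(import_errors, upstream, task_types):
    """Membership in V_dag is the conjunction of the three checks."""
    return (
        v_import(import_errors)
        and v_structure(upstream)
        and v_semantic(task_types)
    )
```

In a real project the three predicates would be backed by `DagBag.import_errors`, the task dependency graph, and the operator classes, as in the pytest suite later in this lesson.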

Definition: Unit Test

A unit test verifies a single component (function, operator, hook) in isolation with mocked dependencies. The test boundary $B_{\text{unit}}$ isolates the component from external systems (databases, APIs, networks).
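
As a small illustration of the unit boundary, the sketch below tests one function while a `MagicMock` stands in for the database. The function `count_active_users` and the `fetch_all` method are hypothetical names invented for this example.

```python
from unittest.mock import MagicMock


def count_active_users(db):
    """Hypothetical task logic: count the rows returned by a query."""
    rows = db.fetch_all("SELECT id FROM users WHERE active")
    return len(rows)


def test_count_active_users():
    # The mock replaces the database, so nothing crosses the unit boundary.
    fake_db = MagicMock()
    fake_db.fetch_all.return_value = [(1,), (2,), (3,)]

    assert count_active_users(fake_db) == 3
    fake_db.fetch_all.assert_called_once_with(
        "SELECT id FROM users WHERE active"
    )


test_count_active_users()
```

Passing the dependency in as an argument keeps the function testable without patching; the later examples use `patch` for code that looks its dependencies up at module level.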

Definition: Integration Test

An integration test verifies interactions between multiple components, often including real external systems. Integration tests validate the interaction set $I = \{(c_1, c_2) : c_1 \in C,\ c_2 \in C,\ (c_1, c_2) \in E_{\text{interaction}}\}$, where $C$ is the set of components and $E_{\text{interaction}}$ is the set of interaction edges.
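
To contrast with a unit test, here is a sketch of an integration test that exercises one pair $(c_1, c_2)$: a load step and a read step that meet in a real database. An in-memory SQLite database stands in for the production service, and the functions `load_rows` and `top_scorer` are hypothetical.

```python
import sqlite3


def load_rows(conn, rows):
    """Hypothetical load step: insert (name, score) pairs."""
    conn.executemany("INSERT INTO scores (name, score) VALUES (?, ?)", rows)
    conn.commit()
    return len(rows)


def top_scorer(conn):
    """Hypothetical read step that depends on what load_rows wrote."""
    row = conn.execute(
        "SELECT name FROM scores ORDER BY score DESC LIMIT 1"
    ).fetchone()
    return row[0] if row else None


def test_load_then_query():
    # A real (in-memory) database: no mocks between the two components.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE scores (name TEXT, score INTEGER)")
    try:
        assert load_rows(conn, [("alice", 85), ("bob", 92)]) == 2
        assert top_scorer(conn) == "bob"
    finally:
        conn.close()


test_load_then_query()
```

The test fails if either component changes the table layout or column order, which is exactly the kind of defect a mocked unit test cannot see.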

Detailed Explanation

DAG Validation Tests

# tests/test_dag_validation.py
import pytest
from airflow.models import DagBag

class TestDAGValidation:
    """Validate DAG structure and imports."""
    
    @pytest.fixture(autouse=True)
    def setup_dagbag(self):
        self.dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
    
    def test_dags_import_successfully(self):
        """All DAGs should import without errors."""
        assert len(self.dagbag.import_errors) == 0, \
            f"DAG import errors: {self.dagbag.import_errors}"
    
    def test_dag_count(self):
        """Verify expected number of DAGs are loaded."""
        assert len(self.dagbag.dags) > 0, "No DAGs found"
    
    def test_dag_has_required_attributes(self):
        """Each DAG should have required attributes."""
        for dag_id, dag in self.dagbag.dags.items():
            assert dag.dag_id is not None, f"DAG {dag_id} has no dag_id"
            assert dag.default_args is not None, f"DAG {dag_id} has no default_args"
            assert dag.start_date is not None, f"DAG {dag_id} has no start_date"
    
    def test_dag_is_acyclic(self):
        """All DAGs must be acyclic."""
        # check_cycle raises AirflowDagCycleException if it finds a cycle
        from airflow.utils.dag_cycle_tester import check_cycle

        for dag_id, dag in self.dagbag.dags.items():
            check_cycle(dag)
    
    def test_dag_has_no_circular_dependencies(self):
        """Detect circular dependencies in DAGs."""
        for dag_id, dag in self.dagbag.dags.items():
            # Use networkx to detect cycles
            import networkx as nx
            
            G = nx.DiGraph()
            for task in dag.tasks:
                G.add_node(task.task_id)
                for upstream in task.upstream_list:
                    G.add_edge(upstream.task_id, task.task_id)
            
            assert nx.is_directed_acyclic_graph(G), \
                f"DAG {dag_id} has circular dependencies"
    
    def test_tasks_have_valid_operator_types(self):
        """All tasks should use supported operator types."""
        from airflow.operators.python import PythonOperator
        from airflow.operators.bash import BashOperator
        from airflow.operators.empty import EmptyOperator
        
        supported_operators = {
            PythonOperator, BashOperator, EmptyOperator,
        }
        
        for dag_id, dag in self.dagbag.dags.items():
            for task in dag.tasks:
                assert type(task) in supported_operators, \
                    f"Unsupported operator type: {type(task)} in DAG {dag_id}"

Unit Testing Tasks

# tests/test_tasks.py
import pytest
from unittest.mock import MagicMock, patch, Mock
from datetime import datetime, timedelta

class TestExtractTask:
    """Unit tests for extract task function."""
    
    def test_extract_data_success(self):
        """Test successful data extraction."""
        from dags.my_dag import extract_data
        
        # Use a real dict for the context: ** unpacking a bare MagicMock
        # yields no keys, so the task would never receive 'ti'.
        mock_context = {'ti': MagicMock()}
        
        result = extract_data(**mock_context)
        
        assert result is not None
        assert 'records' in result
        assert len(result['records']) > 0
    
    def test_extract_data_empty(self):
        """Test extraction with no data."""
        from dags.my_dag import extract_data
        
        # Real dict so that ** unpacking actually passes 'ti' to the task
        mock_context = {'ti': MagicMock()}
        
        with patch('dags.my_dag.get_source_data', return_value=[]):
            result = extract_data(**mock_context)
        
        assert result == {'records': [], 'count': 0}
    
    def test_extract_data_connection_error(self):
        """Test extraction handles connection errors."""
        from dags.my_dag import extract_data
        
        mock_context = {'ti': MagicMock()}
        
        with patch('dags.my_dag.get_source_data', side_effect=ConnectionError):
            with pytest.raises(ConnectionError):
                extract_data(**mock_context)

class TestTransformTask:
    """Unit tests for transform task function."""
    
    def test_transform_data(self):
        """Test data transformation logic."""
        from dags.my_dag import transform_data
        
        input_data = [
            {"name": "alice", "score": 85},
            {"name": "bob", "score": 92},
        ]
        
        result = transform_data(input_data)
        
        assert len(result) == 2
        assert result[0]["name"] == "ALICE"  # Uppercase
        assert result[0]["grade"] == "B"     # Score-based grade
    
    def test_transform_empty_data(self):
        """Test transformation with empty input."""
        from dags.my_dag import transform_data
        
        result = transform_data([])
        assert result == []
    
    def test_transform_invalid_data(self):
        """Test transformation handles invalid data gracefully."""
        from dags.my_dag import transform_data
        
        input_data = [{"name": None, "score": "invalid"}]
        
        with pytest.raises(ValueError):
            transform_data(input_data)

class TestLoadTask:
    """Unit tests for load task function."""
    
    @patch('dags.my_dag.get_db_connection')
    def test_load_success(self, mock_db):
        """Test successful data loading."""
        from dags.my_dag import load_data
        
        mock_conn = MagicMock()
        mock_db.return_value = mock_conn
        
        data = [{"id": 1, "name": "test"}]
        result = load_data(data)
        
        assert result['loaded'] == 1
        mock_conn.execute.assert_called_once()
    
    @patch('dags.my_dag.get_db_connection')
    def test_load_partial_failure(self, mock_db):
        """Test loading with partial failures."""
        from dags.my_dag import load_data
        
        mock_conn = MagicMock()
        mock_conn.execute.side_effect = [
            None,  # First insert succeeds
            Exception("Duplicate key"),  # Second fails
            None,  # Third succeeds
        ]
        mock_db.return_value = mock_conn
        
        data = [{"id": 1}, {"id": 2}, {"id": 3}]
        result = load_data(data)
        
        assert result['loaded'] == 2
        assert result['failed'] == 1

Mocking External Systems

# tests/test_with_mocks.py
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

class TestWithMockedExternalSystems:
    """Test tasks with mocked external dependencies."""
    
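    # Patch the name where it is looked up; this assumes dags/my_dag.py
    # contains `from airflow.providers.amazon.aws.hooks.s3 import S3Hook`.
    @patch('dags.my_dag.S3Hook')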
    def test_s3_upload(self, MockS3Hook):
        """Test S3 upload with mocked hook."""
        from dags.my_dag import upload_to_s3
        
        mock_hook = MagicMock()
        MockS3Hook.return_value = mock_hook
        mock_hook.load_file.return_value = True
        
        result = upload_to_s3(
            local_path='/tmp/file.csv',
            bucket='my-bucket',
            key='data/file.csv',
        )
        
        assert result is True
        mock_hook.load_file.assert_called_once_with(
            filename='/tmp/file.csv',
            key='data/file.csv',
            bucket_name='my-bucket',
            replace=True,
        )
    
    # Same rule as above: patch the reference inside dags.my_dag
    # (assumes PostgresHook is imported there by name).
    @patch('dags.my_dag.PostgresHook')
    def test_database_operations(self, MockPostgresHook):
        """Test database operations with mocked hook."""
        from dags.my_dag import process_database
        
        mock_hook = MagicMock()
        MockPostgresHook.return_value = mock_hook
        mock_hook.get_pandas_df.return_value = MagicMock()
        
        result = process_database(query="SELECT * FROM users")
        
        assert result is not None
        mock_hook.get_pandas_df.assert_called_once()
    
    @patch('requests.get')
    def test_api_call(self, mock_get):
        """Test API call with mocked requests."""
        from dags.my_dag import fetch_from_api
        
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"data": [1, 2, 3]}
        mock_get.return_value = mock_response
        
        result = fetch_from_api(url="http://api.example.com/data")
        
        assert result == {"data": [1, 2, 3]}
        mock_get.assert_called_once()

Test Coverage Metric

$$C = \frac{|T_{\text{covered}}|}{|T_{\text{total}}|} \times 100\%$$

Here,

  • $C$ = Code coverage percentage
  • $T_{\text{covered}}$ = Set of tested code paths
  • $T_{\text{total}}$ = Set of all code paths
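
A short worked example may make the formula concrete. The sketch below treats code paths as string identifiers; the path names are invented for illustration, and real projects would read these numbers from a coverage tool rather than compute them by hand.

```python
def coverage_percent(covered_paths, all_paths):
    """Return C = |T_covered| / |T_total| * 100."""
    total = set(all_paths)
    if not total:
        raise ValueError("all_paths must not be empty")
    covered = set(covered_paths) & total  # ignore paths outside T_total
    return 100 * len(covered) / len(total)


# Hypothetical code paths of a small ETL module.
paths = {"extract_ok", "extract_empty", "extract_error", "load_ok", "load_partial"}
tested = {"extract_ok", "extract_empty", "extract_error", "load_ok"}

print(coverage_percent(tested, paths))  # 80.0
```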

Test Reliability Index

$$R = 1 - \frac{N_{\text{flaky}}}{N_{\text{total}}}$$

Here,

  • $R$ = Reliability index (0-1)
  • $N_{\text{flaky}}$ = Number of flaky tests
  • $N_{\text{total}}$ = Total number of tests
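
The index is one minus the flaky-test rate, so it can be computed directly from two counts. The sketch below is a minimal helper under that reading; the function name and the sample counts are assumptions for illustration.

```python
def reliability_index(n_flaky, n_total):
    """Return R = 1 - N_flaky / N_total, a value between 0 and 1."""
    if n_total <= 0:
        raise ValueError("n_total must be positive")
    if not 0 <= n_flaky <= n_total:
        raise ValueError("n_flaky must be between 0 and n_total")
    return 1 - n_flaky / n_total


# 3 flaky tests out of 200 is a 1.5% flaky rate.
print(round(reliability_index(3, 200), 3))  # 0.985
```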

Use DagBag to parse DAGs in tests without starting Airflow services. This enables fast unit testing of DAG structure without database or scheduler dependencies.

Mock external systems (databases, APIs, cloud services) in unit tests. Use integration tests with real services for critical paths. Aim for 80% unit test coverage.

Key Concepts Table

| Test Type | Scope | Speed | Dependencies | Coverage |
|---|---|---|---|---|
| Unit Test | Single function | Fast (~ms) | Mocked | 80% |
| Integration Test | Multiple components | Medium (~s) | Real services | 15% |
| E2E Test | Full pipeline | Slow (~min) | Full stack | 5% |
| DAG Validation | Structure | Fast (~ms) | None | All DAGs |
| Performance Test | Execution time | Slow (~min) | Load env | Critical paths |

Code Examples

Comprehensive Test Suite

# tests/test_comprehensive.py
import pytest
from airflow.models import DagBag, DagRun, TaskInstance
from airflow.utils.state import State
from airflow.utils import timezone
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch

class TestDAGComprehensive:
    """Comprehensive DAG testing suite."""
    
    @pytest.fixture
    def dagbag(self):
        return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
    
    def test_all_dags_have_tags(self, dagbag):
        """All DAGs should have tags for organization."""
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tags) > 0, f"DAG {dag_id} has no tags"
    
    def test_all_dags_have_description(self, dagbag):
        """All DAGs should have descriptions."""
        for dag_id, dag in dagbag.dags.items():
            assert dag.description is not None, f"DAG {dag_id} has no description"
    
    def test_dag_default_args(self, dagbag):
        """Verify default_args configuration."""
        for dag_id, dag in dagbag.dags.items():
            if dag.default_args:
                # Check retries are configured
                assert 'retries' in dag.default_args, \
                    f"DAG {dag_id} has no retries configured"
                
                # Check retry delay
                assert 'retry_delay' in dag.default_args, \
                    f"DAG {dag_id} has no retry_delay configured"
    
    def test_dag_schedule_interval(self, dagbag):
        """Verify schedule intervals are reasonable."""
        for dag_id, dag in dagbag.dags.items():
            # Ensure timedelta schedules are not too frequent
            # (the one-minute floor is an illustrative threshold)
            if isinstance(dag.schedule_interval, timedelta):
                assert dag.schedule_interval >= timedelta(minutes=1), \
                    f"DAG {dag_id} runs more often than once a minute"

class TestTaskDependencies:
    """Test task dependency configurations."""
    
    @pytest.fixture
    def dagbag(self):
        return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
    
    def test_no_isolated_tasks(self, dagbag):
        """All tasks should have upstream or downstream dependencies."""
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                # EmptyOperator and BranchPythonOperator can be isolated
                from airflow.operators.empty import EmptyOperator
                from airflow.operators.python import BranchPythonOperator
                
                if type(task) not in [EmptyOperator, BranchPythonOperator]:
                    assert task.upstream_list or task.downstream_list, \
                        f"Task {task.task_id} in DAG {dag_id} is isolated"
    
    def test_task_count_reasonable(self, dagbag):
        """DAGs should not have excessive task counts."""
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tasks) <= 100, \
                f"DAG {dag_id} has {len(dag.tasks)} tasks (max 100)"

class TestTaskLogic:
    """Test task business logic with mocks."""
    
    def test_etl_extract(self):
        """Test ETL extract function."""
        from dags.etl_dag import extract
        
        # Real dict so that ** unpacking passes 'ti' and 'params' to the task
        mock_context = {
            'ti': MagicMock(),
            'params': {'source': 'test'},
        }
        
        with patch('dags.etl_dag.get_source_connection') as mock_conn:
            mock_conn.return_value.query.return_value = [
                {'id': 1, 'value': 'test'}
            ]
            
            result = extract(**mock_context)
            
            assert result['record_count'] == 1
            assert result['status'] == 'success'
    
    def test_etl_transform(self):
        """Test ETL transform function."""
        from dags.etl_dag import transform
        
        input_data = [
            {'id': 1, 'name': 'test', 'value': 100},
            {'id': 2, 'name': 'test2', 'value': 200},
        ]
        
        result = transform(input_data)
        
        assert len(result) == 2
        assert all('transformed' in r for r in result)
    
    def test_etl_load(self):
        """Test ETL load function."""
        from dags.etl_dag import load
        
        data = [{'id': 1, 'transformed': True}]
        
        with patch('dags.etl_dag.get_db_connection') as mock_db:
            mock_db.return_value.execute.return_value = 1
            
            result = load(data)
            
            assert result['loaded'] == 1

class TestPerformance:
    """Test DAG execution performance."""
    
    def test_dag_parsing_time(self):
        """DAGs should parse quickly."""
        import time
        
        # perf_counter is monotonic, so clock adjustments cannot skew the result
        start = time.perf_counter()
        DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
        parse_time = time.perf_counter() - start
        
        assert parse_time < 30, f"DAG parsing took {parse_time:.2f}s (max 30s)"
    
    def test_task_execution_time(self):
        """Tasks should execute within time limits."""
        dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
        
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                if hasattr(task, 'execution_timeout'):
                    if task.execution_timeout:
                        assert task.execution_timeout.total_seconds() <= 3600, \
                            f"Task {task.task_id} has excessive timeout"

CI/CD Test Configuration

# .github/workflows/test.yml
name: Airflow Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install flake8 black isort mypy
      - name: Lint with flake8
        run: flake8 dags/ tests/
      - name: Check formatting with black
        run: black --check dags/ tests/
      - name: Sort imports
        run: isort --check-only dags/ tests/

  test:
    runs-on: ubuntu-latest
    needs: lint
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
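      - name: Install Airflow
        run: |
          # Pin transitive dependencies with the official constraints file
          pip install "apache-airflow==2.8.0" \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
          # networkx is imported by tests/test_dag_validation.py
          pip install pytest pytest-cov networkx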
      - name: Run DAG validation
        run: pytest tests/test_dag_validation.py -v
      - name: Run unit tests
        run: pytest tests/test_tasks.py -v --cov=dags --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3

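  integration:
    runs-on: ubuntu-latest
    needs: test
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install test dependencies
        run: pip install "apache-airflow==2.8.0" pytest
      - name: Start test services
        run: docker compose -f docker-compose-test.yml up -d
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Stop test services
        if: always()  # tear down even when the tests fail
        run: docker compose -f docker-compose-test.yml down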

Performance Metrics

Test Suite Performance

| Metric | Target | Warning | Critical |
|---|---|---|---|
| Unit Test Duration | < 5 min | 5-10 min | > 10 min |
| Integration Test Duration | < 15 min | 15-30 min | > 30 min |
| Total CI Duration | < 30 min | 30-60 min | > 60 min |
| Test Coverage | > 80% | 70-80% | < 70% |
| Flaky Test Rate | < 1% | 1-5% | > 5% |
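
These thresholds can be checked automatically, for example at the end of a CI run. The sketch below encodes the table as data and classifies a measured value; the metric keys, the `Threshold` class, and the choice to count boundary values as warnings are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Threshold:
    target: float    # boundary between "target" and "warning"
    critical: float  # boundary between "warning" and "critical"
    higher_is_better: bool = False


# Values copied from the table; durations in minutes, rates in percent.
THRESHOLDS = {
    "unit_test_minutes": Threshold(5, 10),
    "integration_test_minutes": Threshold(15, 30),
    "total_ci_minutes": Threshold(30, 60),
    "coverage_percent": Threshold(80, 70, higher_is_better=True),
    "flaky_rate_percent": Threshold(1, 5),
}


def classify(metric, value):
    """Return 'target', 'warning', or 'critical' for a measured value."""
    t = THRESHOLDS[metric]
    if t.higher_is_better:
        if value > t.target:
            return "target"
        return "warning" if value >= t.critical else "critical"
    if value < t.target:
        return "target"
    return "warning" if value <= t.critical else "critical"


print(classify("total_ci_minutes", 42))  # warning
```

A CI step could fail the build on any "critical" result and only post a notice for "warning".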

Test Distribution

| Test Category | Count | Duration | Priority |
|---|---|---|---|
| DAG Validation | 10-20 | 10 s | P0 |
| Unit Tests | 50-100 | 2-5 min | P0 |
| Integration Tests | 10-20 | 10-15 min | P1 |
| E2E Tests | 5-10 | 20-30 min | P2 |
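
As a quick sanity check, the durations in this table can be summed to estimate total pipeline time. The sketch below assumes the four categories run one after another, which is an assumption about the pipeline layout rather than something the table states.

```python
# (low, high) duration in seconds per category, copied from the table above.
DURATIONS = {
    "dag_validation": (10, 10),
    "unit": (2 * 60, 5 * 60),
    "integration": (10 * 60, 15 * 60),
    "e2e": (20 * 60, 30 * 60),
}


def sequential_total(durations):
    """Sum the low and high bounds, assuming categories run sequentially."""
    low = sum(lo for lo, _ in durations.values())
    high = sum(hi for _, hi in durations.values())
    return low, high


low, high = sequential_total(DURATIONS)
print(f"{low / 60:.1f} to {high / 60:.1f} minutes")  # 32.2 to 50.2 minutes
```

Under that assumption the full suite lands in the 30-60 min warning band for Total CI Duration, which suggests running E2E tests on a separate schedule or in parallel to stay under the 30 min target.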

Key Takeaways:

  • Use DagBag for fast DAG validation without running Airflow services
  • Mock external systems (databases, APIs) in unit tests
  • Aim for a test mix of roughly 80% unit, 15% integration, and 5% E2E tests
  • Validate DAG structure: acyclicity, required attributes, valid operators
  • Test task business logic independently of Airflow execution
  • Use CI/CD pipelines for automated testing on every commit
