Testing DAGs and Operators
Architecture Diagram
Formal Definitions
Definition: DAG Validation
DAG Validation is the process of verifying that a DAG file can be imported and parsed without errors, and that the resulting DAG object meets structural requirements (acyclicity, valid dependencies, correct task types). Formally, validation checks $\text{valid}(D) = \text{importable}(D) \land \text{acyclic}(D) \land \text{valid\_deps}(D) \land \text{valid\_types}(D)$.
Definition: Unit Test
A unit test verifies a single component (function, operator, hook) in isolation with mocked dependencies. The test boundary isolates the component from external systems (databases, APIs, networks).
Definition: Integration Test
An integration test verifies interactions between multiple components, often including real external systems. Integration tests validate a system graph $G = (C, E)$, where $C$ are components and $E$ are interaction edges.
Detailed Explanation
DAG Validation Tests
# tests/test_dag_validation.py
import pytest
from airflow.models import DagBag
class TestDAGValidation:
    """Validate DAG structure and imports."""

    @pytest.fixture(autouse=True)
    def setup_dagbag(self):
        """Parse the DAG folder into ``self.dagbag`` before every test."""
        self.dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

    def test_dags_import_successfully(self):
        """All DAGs should import without errors."""
        assert not self.dagbag.import_errors, \
            f"DAG import errors: {self.dagbag.import_errors}"

    def test_dag_count(self):
        """At least one DAG should be loaded from the DAG folder."""
        assert self.dagbag.dags, "No DAGs found"

    def test_dag_has_required_attributes(self):
        """Each DAG should have a dag_id, default_args and a start_date."""
        for dag_id, dag in self.dagbag.dags.items():
            assert dag.dag_id is not None, f"DAG {dag_id} has no dag_id"
            assert dag.default_args is not None, f"DAG {dag_id} has no default_args"
            assert dag.start_date is not None, f"DAG {dag_id} has no start_date"

    def test_dag_is_acyclic(self):
        """All DAGs must be acyclic.

        Uses Airflow's own cycle checker rather than inspecting an unrelated
        attribute, so the test fails only when a DAG really contains a cycle.
        """
        from airflow.exceptions import AirflowDagCycleException
        from airflow.utils.dag_cycle_tester import check_cycle

        for dag_id, dag in self.dagbag.dags.items():
            try:
                check_cycle(dag)
            except AirflowDagCycleException as exc:
                pytest.fail(f"DAG {dag_id} contains a cycle: {exc}")

    def test_dag_has_no_circular_dependencies(self):
        """Rebuild each DAG's task graph in networkx and confirm it is acyclic."""
        # Imported once per test instead of once per DAG inside the loop.
        import networkx as nx

        for dag_id, dag in self.dagbag.dags.items():
            graph = nx.DiGraph()
            for task in dag.tasks:
                # Add the node explicitly so tasks without edges are still present.
                graph.add_node(task.task_id)
                graph.add_edges_from(
                    (upstream.task_id, task.task_id)
                    for upstream in task.upstream_list
                )
            assert nx.is_directed_acyclic_graph(graph), \
                f"DAG {dag_id} has circular dependencies"

    def test_tasks_have_valid_operator_types(self):
        """All tasks should use supported operator types (subclasses included)."""
        from airflow.operators.python import PythonOperator
        from airflow.operators.bash import BashOperator
        from airflow.operators.empty import EmptyOperator

        supported_operators = (PythonOperator, BashOperator, EmptyOperator)
        for dag_id, dag in self.dagbag.dags.items():
            for task in dag.tasks:
                # isinstance, not an exact type match: operators derived from a
                # supported class are accepted as well.
                assert isinstance(task, supported_operators), \
                    f"Unsupported operator type: {type(task)} in DAG {dag_id}"
Unit Testing Tasks
# tests/test_tasks.py
import pytest
from unittest.mock import MagicMock, patch, Mock
from datetime import datetime, timedelta
class TestExtractTask:
    """Unit tests for extract task function."""

    @staticmethod
    def _make_context():
        """Return a minimal task context as a real dict.

        ``**`` unpacking needs a real mapping. A bare MagicMock unpacks to no
        keyword arguments at all, so the task would never receive ``ti``.
        """
        return {'ti': MagicMock()}

    def test_extract_data_success(self):
        """Test successful data extraction."""
        from dags.my_dag import extract_data

        # The source is mocked so the test does not depend on a live system.
        # NOTE(review): assumes extract_data returns the rows it reads from
        # get_source_data under 'records' -- confirm against dags.my_dag.
        with patch('dags.my_dag.get_source_data', return_value=[{'id': 1}]):
            result = extract_data(**self._make_context())

        assert result is not None
        assert 'records' in result
        assert len(result['records']) > 0

    def test_extract_data_empty(self):
        """Test extraction with no data."""
        from dags.my_dag import extract_data

        with patch('dags.my_dag.get_source_data', return_value=[]):
            result = extract_data(**self._make_context())

        assert result == {'records': [], 'count': 0}

    def test_extract_data_connection_error(self):
        """Test that a ConnectionError from the source propagates to the caller."""
        from dags.my_dag import extract_data

        with patch('dags.my_dag.get_source_data', side_effect=ConnectionError):
            with pytest.raises(ConnectionError):
                extract_data(**self._make_context())
class TestTransformTask:
    """Exercise transform_data with typical, empty and malformed input."""

    def test_transform_data(self):
        """Two rows in, two rows out; the first row is checked field by field."""
        from dags.my_dag import transform_data

        rows = [
            {"name": "alice", "score": 85},
            {"name": "bob", "score": 92},
        ]
        transformed = transform_data(rows)

        assert len(transformed) == 2
        first_row = transformed[0]
        assert first_row["name"] == "ALICE"  # name is upper-cased
        assert first_row["grade"] == "B"  # grade is derived from the score

    def test_transform_empty_data(self):
        """An empty input list yields an empty result list."""
        from dags.my_dag import transform_data

        assert transform_data([]) == []

    def test_transform_invalid_data(self):
        """A row with a missing name and non-numeric score raises ValueError."""
        from dags.my_dag import transform_data

        malformed_rows = [{"name": None, "score": "invalid"}]
        with pytest.raises(ValueError):
            transform_data(malformed_rows)
class TestLoadTask:
    """Exercise load_data against a mocked database connection."""

    @patch('dags.my_dag.get_db_connection')
    def test_load_success(self, mock_db):
        """A single row is loaded with exactly one execute call."""
        from dags.my_dag import load_data

        connection = MagicMock()
        mock_db.return_value = connection

        outcome = load_data([{"id": 1, "name": "test"}])

        assert outcome['loaded'] == 1
        connection.execute.assert_called_once()

    @patch('dags.my_dag.get_db_connection')
    def test_load_partial_failure(self, mock_db):
        """Of three inserts, the second fails and the other two are counted."""
        from dags.my_dag import load_data

        # One entry per execute call, in order: succeed, fail, succeed.
        execute_outcomes = [None, Exception("Duplicate key"), None]
        connection = MagicMock()
        connection.execute.side_effect = execute_outcomes
        mock_db.return_value = connection

        rows = [{"id": row_id} for row_id in (1, 2, 3)]
        outcome = load_data(rows)

        assert outcome['loaded'] == 2
        assert outcome['failed'] == 1
Mocking External Systems
# tests/test_with_mocks.py
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
class TestWithMockedExternalSystems:
    """Test tasks with mocked external dependencies.

    Hooks are patched in the namespace where they are looked up
    (``dags.my_dag``), not where they are defined. Patching the defining
    module leaves a name already imported by ``dags.my_dag`` untouched.
    NOTE(review): this assumes dags.my_dag imports the hooks by name at module
    level (``from ... import S3Hook``) -- confirm against that module.
    """

    @patch('dags.my_dag.S3Hook')
    def test_s3_upload(self, MockS3Hook):
        """Test S3 upload with mocked hook."""
        from dags.my_dag import upload_to_s3

        mock_hook = MockS3Hook.return_value
        mock_hook.load_file.return_value = True

        result = upload_to_s3(
            local_path='/tmp/file.csv',
            bucket='my-bucket',
            key='data/file.csv',
        )

        assert result is True
        mock_hook.load_file.assert_called_once_with(
            filename='/tmp/file.csv',
            key='data/file.csv',
            bucket_name='my-bucket',
            replace=True,
        )

    @patch('dags.my_dag.PostgresHook')
    def test_database_operations(self, MockPostgresHook):
        """Test database operations with mocked hook."""
        from dags.my_dag import process_database

        mock_hook = MockPostgresHook.return_value
        mock_hook.get_pandas_df.return_value = MagicMock()

        result = process_database(query="SELECT * FROM users")

        assert result is not None
        mock_hook.get_pandas_df.assert_called_once()

    # Patching the attribute on the requests module itself works when the task
    # calls ``requests.get(...)``.
    # NOTE(review): if dags.my_dag uses ``from requests import get``, the
    # target must be 'dags.my_dag.get' instead -- confirm.
    @patch('requests.get')
    def test_api_call(self, mock_get):
        """Test API call with mocked requests."""
        from dags.my_dag import fetch_from_api

        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"data": [1, 2, 3]}
        mock_get.return_value = mock_response

        result = fetch_from_api(url="http://api.example.com/data")

        assert result == {"data": [1, 2, 3]}
        mock_get.assert_called_once()
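Code coverage is computed as $C = \frac{N_{\text{tested}}}{N_{\text{total}}} \times 100\%$.
Here,
- $C$ = Code coverage percentage
- $N_{\text{tested}}$ = Number of tested code paths
- $N_{\text{total}}$ = Total number of code paths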
Test Reliability Index
$R = 1 - \frac{F}{T}$
Here,
- $R$ = Reliability index (0-1)
- $F$ = Number of flaky tests
- $T$ = Total number of tests
Use DagBag to parse DAGs in tests without starting Airflow services. This enables fast unit testing of DAG structure without database or scheduler dependencies.
Mock external systems (databases, APIs, cloud services) in unit tests. Use integration tests with real services for critical paths. Aim for 80% unit test coverage.
Key Concepts Table
| Test Type | Scope | Speed | Dependencies | Coverage |
|---|---|---|---|---|
| Unit Test | Single function | Fast (~ms) | Mocked | 80% |
| Integration Test | Multiple components | Medium (~s) | Real services | 15% |
| E2E Test | Full pipeline | Slow (~min) | Full stack | 5% |
| DAG Validation | Structure | Fast (~ms) | None | All DAGs |
| Performance Test | Execution time | Slow (~min) | Load env | Critical paths |
Code Examples
Comprehensive Test Suite
# tests/test_comprehensive.py
import pytest
from airflow.models import DagBag, DagRun, TaskInstance
from airflow.utils.state import State
from airflow.utils import timezone
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
class TestDAGComprehensive:
    """Comprehensive DAG testing suite."""

    # Shortest timedelta schedule accepted by test_dag_schedule_interval.
    # One minute matches the finest granularity a cron expression can express.
    MIN_SCHEDULE_INTERVAL = timedelta(minutes=1)

    @pytest.fixture
    def dagbag(self):
        """Parse the DAG folder without loading Airflow's example DAGs."""
        return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

    def test_all_dags_have_tags(self, dagbag):
        """All DAGs should have tags for organization."""
        for dag_id, dag in dagbag.dags.items():
            assert dag.tags, f"DAG {dag_id} has no tags"

    def test_all_dags_have_description(self, dagbag):
        """All DAGs should have descriptions."""
        for dag_id, dag in dagbag.dags.items():
            assert dag.description is not None, f"DAG {dag_id} has no description"

    def test_dag_default_args(self, dagbag):
        """Every DAG must configure retries and retry_delay in default_args."""
        for dag_id, dag in dagbag.dags.items():
            # Empty or missing default_args is treated as "nothing configured"
            # and fails, instead of silently skipping the DAG.
            default_args = dag.default_args or {}
            assert 'retries' in default_args, \
                f"DAG {dag_id} has no retries configured"
            assert 'retry_delay' in default_args, \
                f"DAG {dag_id} has no retry_delay configured"

    def test_dag_schedule_interval(self, dagbag):
        """Timedelta schedules must not be shorter than MIN_SCHEDULE_INTERVAL.

        Only timedelta schedules are checked here: cron and preset strings
        cannot go below one minute, and unscheduled DAGs have nothing to check.
        """
        for dag_id, dag in dagbag.dags.items():
            interval = dag.schedule_interval
            if isinstance(interval, timedelta):
                assert interval >= self.MIN_SCHEDULE_INTERVAL, \
                    f"DAG {dag_id} is scheduled too frequently ({interval})"
class TestTaskDependencies:
    """Test task dependency configurations."""

    @pytest.fixture
    def dagbag(self):
        """Parse the DAG folder without loading Airflow's example DAGs."""
        return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

    def test_no_isolated_tasks(self, dagbag):
        """All tasks should have upstream or downstream dependencies.

        EmptyOperator and BranchPythonOperator tasks are exempt, and DAGs with
        a single task are skipped because that task cannot have neighbours.
        """
        # Imported once per test rather than once per task.
        from airflow.operators.empty import EmptyOperator
        from airflow.operators.python import BranchPythonOperator

        may_be_isolated = (EmptyOperator, BranchPythonOperator)
        for dag_id, dag in dagbag.dags.items():
            if len(dag.tasks) <= 1:
                continue
            for task in dag.tasks:
                if isinstance(task, may_be_isolated):
                    continue
                assert task.upstream_list or task.downstream_list, \
                    f"Task {task.task_id} in DAG {dag_id} is isolated"

    def test_task_count_reasonable(self, dagbag):
        """DAGs should not have excessive task counts."""
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tasks) <= 100, \
                f"DAG {dag_id} has {len(dag.tasks)} tasks (max 100)"
class TestTaskLogic:
    """Test task business logic with mocks."""

    def test_etl_extract(self):
        """Test ETL extract function."""
        from dags.etl_dag import extract

        # A real dict is required here: ``**`` on a bare MagicMock unpacks to
        # no keyword arguments, so 'ti' and 'params' would never reach extract().
        context = {
            'ti': MagicMock(),
            'params': {'source': 'test'},
        }
        with patch('dags.etl_dag.get_source_connection') as mock_conn:
            mock_conn.return_value.query.return_value = [
                {'id': 1, 'value': 'test'}
            ]
            result = extract(**context)

        assert result['record_count'] == 1
        assert result['status'] == 'success'

    def test_etl_transform(self):
        """Test ETL transform function."""
        from dags.etl_dag import transform

        input_data = [
            {'id': 1, 'name': 'test', 'value': 100},
            {'id': 2, 'name': 'test2', 'value': 200},
        ]
        result = transform(input_data)

        assert len(result) == 2
        assert all('transformed' in r for r in result)

    def test_etl_load(self):
        """Test ETL load function."""
        from dags.etl_dag import load

        data = [{'id': 1, 'transformed': True}]
        with patch('dags.etl_dag.get_db_connection') as mock_db:
            mock_db.return_value.execute.return_value = 1
            result = load(data)

        assert result['loaded'] == 1
class TestPerformance:
    """Test DAG parsing speed and task timeout limits."""

    def test_dag_parsing_time(self):
        """DAGs should parse quickly.

        The test builds its own DagBag so the parse itself is what gets timed.
        It takes no ``dagbag`` fixture: this class defines none, and a
        pre-parsed bag was never used by the test anyway.
        """
        import time

        # perf_counter is monotonic, so clock adjustments cannot skew the result.
        started = time.perf_counter()
        DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
        parse_time = time.perf_counter() - started

        assert parse_time < 30, f"DAG parsing took {parse_time:.2f}s (max 30s)"

    def test_task_execution_time(self):
        """Tasks that declare an execution_timeout must keep it within one hour."""
        # include_examples=False keeps Airflow's bundled example DAGs out of
        # the check, consistent with the other DagBag loads in this file.
        dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                timeout = getattr(task, 'execution_timeout', None)
                if not timeout:
                    continue  # no timeout declared: nothing to check
                assert timeout.total_seconds() <= 3600, \
                    f"Task {task.task_id} has excessive timeout"
CI/CD Test Configuration
# .github/workflows/test.yml
name: Airflow Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install flake8 black isort mypy
      - name: Lint with flake8
        run: flake8 dags/ tests/
      - name: Check formatting with black
        run: black --check dags/ tests/
      - name: Sort imports
        run: isort --check-only dags/ tests/

  test:
    runs-on: ubuntu-latest
    needs: lint
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install Airflow
        # The constraints file pins Airflow's transitive dependencies to the
        # set tested for this release and Python version.
        run: |
          pip install "apache-airflow==2.8.0" \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
          # networkx is imported by tests/test_dag_validation.py
          pip install pytest pytest-cov networkx
      - name: Run DAG validation
        run: pytest tests/test_dag_validation.py -v
      - name: Run unit tests
        run: pytest tests/test_tasks.py -v --cov=dags --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3

  integration:
    runs-on: ubuntu-latest
    needs: test
    steps:
      - uses: actions/checkout@v3
      # Each job starts on a fresh runner, so Python and the test
      # dependencies must be installed again here.
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install Airflow
        run: |
          pip install "apache-airflow==2.8.0" \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
          pip install pytest
      - name: Start test services
        run: docker compose -f docker-compose-test.yml up -d
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Stop test services
        # Always tear down, even when the tests above fail.
        if: always()
        run: docker compose -f docker-compose-test.yml down
Performance Metrics
Test Suite Performance
| Metric | Target | Warning | Critical |
|---|---|---|---|
| Unit Test Duration | < 5min | 5-10min | > 10min |
| Integration Test Duration | < 15min | 15-30min | > 30min |
| Total CI Duration | < 30min | 30-60min | > 60min |
| Test Coverage | > 80% | 70-80% | < 70% |
| Flaky Test Rate | < 1% | 1-5% | > 5% |
Test Distribution
| Test Category | Count | Duration | Priority |
|---|---|---|---|
| DAG Validation | 10-20 | 10s | P0 |
| Unit Tests | 50-100 | 2-5min | P0 |
| Integration Tests | 10-20 | 10-15min | P1 |
| E2E Tests | 5-10 | 20-30min | P2 |
Key Takeaways:
- Use `DagBag` for fast DAG validation without running Airflow services
- Mock external systems (databases, APIs) in unit tests
- Aim for 80% unit test coverage, 15% integration, 5% E2E
- Validate DAG structure: acyclicity, required attributes, valid operators
- Test task business logic independently of Airflow execution
- Use CI/CD pipelines for automated testing on every commit
See Also
- DAG Design Patterns — Patterns that improve testability
- Error Handling — Testing error scenarios
- CI/CD Pipelines — Automating test execution
- Performance Tuning — Testing performance requirements