Dynamic DAG Generation in Airflow
Formal Definitions
Definition: Dynamic DAG Generation
Dynamic DAG generation is the process of creating DAG objects at parse time using a factory function that reads external configuration. Given a configuration set $C = \{c_1, \dots, c_n\}$, the generator $f$ produces one DAG per entry: $D_i = f(c_i)$ for each $c_i \in C$.
Definition: DAG Factory Function
A DAG factory function takes a configuration object and returns a fully-formed DAG instance. The function encapsulates task creation, dependency wiring, and scheduling logic parametrically.
Definition: Idempotent Generation
Idempotent generation means that running the factory with the same configuration produces equivalent DAGs. Formally, for any two parses $t_1$ and $t_2$, $\forall c \in C$: $f_{t_1}(c) \equiv f_{t_2}(c)$ (same task structure, dependencies, and parameters). This is critical for scheduler stability.
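One way to check idempotence in practice is to reduce each generated DAG to a structural fingerprint and compare it across runs. The sketch below is a minimal illustration, not Airflow's own mechanism: `build_spec` and `fingerprint` are hypothetical helpers that operate on plain dictionaries standing in for DAG objects, so the example runs without Airflow installed.

```python
import hashlib
import json


def build_spec(config: dict) -> dict:
    """Hypothetical stand-in for a DAG factory: returns a plain-dict description."""
    tasks = ["extract", "transform", "load", "quality_check"]
    return {
        "dag_id": f"etl_{config['name']}",
        "schedule": config.get("schedule", "@daily"),
        "tasks": tasks,
        # Linear chain: each task depends on the one before it.
        "edges": [[a, b] for a, b in zip(tasks, tasks[1:])],
    }


def fingerprint(spec: dict) -> str:
    """Hash a spec with sorted keys so equal structure gives equal digests."""
    payload = json.dumps(spec, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


config = {"name": "orders_daily", "schedule": "0 2 * * *"}

# Same config, two independent runs of the factory: the fingerprints must match.
first = fingerprint(build_spec(config))
second = fingerprint(build_spec(config))
print(first == second)  # True
```

A factory that embedded something like `datetime.now()` in the spec would yield a different digest on every parse, which is exactly the failure mode this kind of check is meant to catch.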
Detailed Explanation
Why Dynamic DAGs?
Static DAG definitions work well for small deployments. As the number of pipelines grows (hundreds or thousands), manual DAG definitions become unsustainable. Dynamic generation lets you define pipeline logic once and instantiate it across multiple datasets, teams, or environments from configuration.
Pattern 1: YAML-Driven Generation
# dags/pipeline_factory.py
import yaml
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


def load_pipeline_config(config_path: str = '/opt/airflow/configs/pipelines.yaml') -> list:
    """Read the list of pipeline definitions from the YAML file."""
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)


def create_etl_task(task_id: str, sql: str, conn_id: str):
    """Factory for creating ETL tasks."""
    return PostgresOperator(
        task_id=task_id,
        postgres_conn_id=conn_id,
        sql=sql,
    )


def create_quality_task(task_id: str, table: str, rules: dict, conn_id: str):
    """Factory for creating quality check tasks."""
    def check_quality(**context):
        # Imported inside the callable so the hook is loaded at run time, not parse time.
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id=conn_id)
        for rule_name, rule_sql in rules.items():
            result = hook.get_first(rule_sql)
            # Each rule counts violating rows, so any non-zero count is a failure.
            if result and result[0] > 0:
                raise ValueError(f'Quality rule failed for {table}: {rule_name}')
        return True

    return PythonOperator(
        task_id=task_id,
        python_callable=check_quality,
    )


def generate_dags():
    """Main DAG generator function."""
    configs = load_pipeline_config()
    for config in configs:
        dag_id = f'etl_{config["name"]}'
        schedule = config.get('schedule', '@daily')
        with DAG(
            dag_id=dag_id,
            default_args={
                'owner': config.get('owner', 'data-team'),
                'retries': config.get('retries', 2),
                'retry_delay': timedelta(minutes=5),
            },
            start_date=datetime.fromisoformat(config['start_date']),
            # schedule_interval is deprecated since Airflow 2.4 in favor of schedule.
            schedule_interval=schedule,
            catchup=config.get('catchup', False),
            tags=config.get('tags', []),
            max_active_runs=config.get('max_active_runs', 1),
        ) as dag:
            extract = create_etl_task(
                task_id='extract',
                sql=config['extract_sql'],
                conn_id=config['source_conn'],
            )
            transform = create_etl_task(
                task_id='transform',
                sql=config['transform_sql'],
                conn_id=config['target_conn'],
            )
            load = create_etl_task(
                task_id='load',
                sql=config['load_sql'],
                conn_id=config['target_conn'],
            )
            quality = create_quality_task(
                task_id='quality_check',
                table=config['target_table'],
                rules=config.get('quality_rules', {}),
                conn_id=config['target_conn'],
            )
            extract >> transform >> load >> quality
        # Register the DAG at module level so the scheduler can discover it.
        globals()[dag_id] = dag


generate_dags()
Pipeline Configuration YAML
# /opt/airflow/configs/pipelines.yaml
- name: orders_daily
  owner: orders-team
  start_date: "2024-01-01"
  schedule: "0 2 * * *"
  source_conn: source_postgres
  target_conn: warehouse_postgres
  target_table: fct_orders
  extract_sql: |
    SELECT * FROM raw_orders
    WHERE date = '{{ ds }}'
  transform_sql: |
    INSERT INTO stg_orders
    SELECT order_id, customer_id, amount * 1.1 as adjusted_amount
    FROM raw_orders WHERE date = '{{ ds }}'
  load_sql: |
    INSERT INTO fct_orders
    SELECT * FROM stg_orders WHERE date = '{{ ds }}'
  quality_rules:
    not_null: "SELECT COUNT(*) FROM fct_orders WHERE order_id IS NULL"
    positive_amount: "SELECT COUNT(*) FROM fct_orders WHERE amount <= 0"
  tags: ["orders", "daily"]
- name: customers_weekly
  owner: customer-team
  start_date: "2024-01-01"
  schedule: "0 6 * * 1"
  source_conn: source_mysql
  target_conn: warehouse_postgres
  target_table: dim_customers
  extract_sql: "SELECT * FROM customers WHERE updated_at >= '{{ ds }}'"
  transform_sql: "INSERT INTO dim_customers SELECT * FROM stg_customers"
  load_sql: "SELECT 1"
  quality_rules: {}
  tags: ["customers", "weekly"]
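Because the generator indexes keys such as `config['start_date']` directly, a malformed entry raises at parse time and takes down every DAG defined in the same file. A pre-flight check can catch this earlier. The sketch below is one possible approach under stated assumptions: `validate_configs` and `REQUIRED_KEYS` are hypothetical names, and the required keys are simply those the factory above reads without a default.

```python
from datetime import datetime

# Keys the factory reads without a fallback value (assumed from the generator code).
REQUIRED_KEYS = {
    "name", "start_date", "source_conn", "target_conn",
    "target_table", "extract_sql", "transform_sql", "load_sql",
}


def validate_configs(configs: list) -> list:
    """Return human-readable problems; an empty list means the configs look usable."""
    problems = []
    seen = set()
    for index, config in enumerate(configs):
        if not isinstance(config, dict):
            problems.append(f"entry {index}: expected a mapping, got {type(config).__name__}")
            continue
        label = config.get("name", f"entry {index}")
        missing = sorted(REQUIRED_KEYS - config.keys())
        if missing:
            problems.append(f"{label}: missing keys {missing}")
        # Duplicate names would produce the same dag_id and silently overwrite each other.
        name = config.get("name")
        if name in seen:
            problems.append(f"{label}: duplicate name")
        elif name is not None:
            seen.add(name)
        start = config.get("start_date")
        if start is not None:
            try:
                datetime.fromisoformat(str(start))
            except ValueError:
                problems.append(f"{label}: start_date {start!r} is not ISO formatted")
    return problems


valid_entry = {key: "placeholder" for key in REQUIRED_KEYS}
valid_entry.update({"name": "orders_daily", "start_date": "2024-01-01"})
sample = [valid_entry, {"name": "orders_daily", "start_date": "01/01/2024"}]

for problem in validate_configs(sample):
    print(problem)
```

Inside the generator, one option is to log these problems and skip the offending entries rather than raise, so that a single bad entry does not prevent the remaining DAGs from loading.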
Pattern 2: Database-Driven Generation
# dags/db_driven_generator.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow import settings


def fetch_pipeline_configs() -> list:
    """Fetch pipeline configurations from database."""
    from sqlalchemy import text
    session = settings.Session()
    try:
        result = session.execute(text("""
            SELECT pipeline_id, name, owner, schedule,
                   source_conn, target_conn, query, config_json
            FROM pipeline_configurations
            WHERE is_active = true
        """))
        return [
            {
                'pipeline_id': row[0],
                'name': row[1],
                'owner': row[2],
                'schedule': row[3],
                'source_conn': row[4],
                'target_conn': row[5],
                'query': row[6],
                'config': row[7],
            }
            for row in result
        ]
    finally:
        # Release the connection; this function runs on every parse.
        session.close()


def generate_dags():
    configs = fetch_pipeline_configs()
    for config in configs:
        dag_id = f'pipeline_{config["pipeline_id"]}'
        with DAG(
            dag_id=dag_id,
            default_args={'owner': config['owner']},
            start_date=datetime(2024, 1, 1),
            schedule_interval=config['schedule'],
            catchup=False,
        ) as dag:
            # cfg must precede **context: parameters cannot follow **kwargs in Python.
            # The default value binds this iteration's config to the callable.
            def run_pipeline(cfg=config, **context):
                from airflow.providers.postgres.hooks.postgres import PostgresHook
                hook = PostgresHook(postgres_conn_id=cfg['source_conn'])
                records = hook.get_records(cfg['query'])
                context['ti'].xcom_push(key='record_count', value=len(records))

            process = PythonOperator(
                task_id='process',
                python_callable=run_pipeline,
            )
        globals()[dag_id] = dag


generate_dags()
Pattern 3: Loop-Based Generation
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

regions = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1']
tables = ['orders', 'customers', 'products']
schedule = '0 3 * * *'

for region in regions:
    for table in tables:
        dag_id = f'etl_{region.replace("-", "_")}_{table}'
        with DAG(
            dag_id=dag_id,
            start_date=datetime(2024, 1, 1),
            schedule_interval=schedule,
            catchup=False,
            tags=[region, table],
        ) as dag:
            start = EmptyOperator(task_id='start')

            # Defaults bind the current region/table; they must come before **context.
            def extract(r=region, t=table, **context):
                print(f'Extracting {t} from {r}')

            def transform(r=region, t=table, **context):
                print(f'Transforming {t} for {r}')

            def load(r=region, t=table, **context):
                print(f'Loading {t} to {r}')

            ext = PythonOperator(task_id='extract', python_callable=extract)
            xform = PythonOperator(task_id='transform', python_callable=transform)
            ld = PythonOperator(task_id='load', python_callable=load)
            start >> ext >> xform >> ld
        globals()[dag_id] = dag
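The task callables in this pattern bind region and table through default arguments (r=region, t=table) instead of reading the loop variables directly. The sketch below, which is independent of Airflow and uses the hypothetical helper `build_callables`, shows why: Python closures look up loop variables when they are called, not when they are defined, so without the binding every generated callable would see the last loop value.

```python
def build_callables(tables: list) -> tuple:
    """Build one callable per table in two ways: late-bound and early-bound."""
    late, early = [], []
    for table in tables:
        # Late binding: `table` is looked up when the function is called.
        def describe_late():
            return f"Extracting {table}"

        # Early binding: the default argument captures the value at definition time.
        def describe_early(t=table):
            return f"Extracting {t}"

        late.append(describe_late)
        early.append(describe_early)
    return late, early


late, early = build_callables(["orders", "customers", "products"])

print([f() for f in late])
# ['Extracting products', 'Extracting products', 'Extracting products']
print([f() for f in early])
# ['Extracting orders', 'Extracting customers', 'Extracting products']
```

PythonOperator's op_kwargs argument is another way to pass per-DAG values explicitly, which avoids relying on default-argument binding altogether.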
Key Concepts Table
| Pattern | Config Source | Coupling | Scalability | Complexity |
|---|---|---|---|---|
| YAML-driven | File on disk | Low | Medium (file size) | Low |
| Database-driven | SQL database | Low | High | Medium |
| Loop-based | Python literals | High | Low (code change needed) | Low |
| API-driven | External service | Low | High | High |
| Template-based | Jinja templates | Medium | Medium | Medium |
Performance Metrics
| Metric | Static DAGs | Dynamic DAGs | Consideration |
|---|---|---|---|
| Parse time | O(1) per file | O(n) in the number of config items per file | Optimize config reads |
| Scheduler memory | Fixed per DAG | Proportional to DAG count | Monitor at >1000 DAGs |
| DAG file count | 1 file = 1 DAG | 1 file = N DAGs | Fewer files, more objects |
| Reconfiguration | Code change + deploy | Config change + parse | Faster iteration |
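To see how generation cost scales with the number of config items, the generator can be timed in isolation. The sketch below uses a hypothetical `generate_specs` function that stands in for DAG construction, so the numbers only illustrate the measurement approach and say nothing about real Airflow parse times.

```python
import time


def generate_specs(configs: list) -> dict:
    """Hypothetical stand-in for a DAG generator: one small dict per config item."""
    return {
        f"etl_{c['name']}": {"tasks": ["extract", "transform", "load"]}
        for c in configs
    }


def timed_generation(configs: list) -> tuple:
    """Run the generator once and return (result, elapsed_seconds)."""
    started = time.perf_counter()
    result = generate_specs(configs)
    return result, time.perf_counter() - started


for count in (10, 100, 1000):
    configs = [{"name": f"pipeline_{i}"} for i in range(count)]
    specs, elapsed = timed_generation(configs)
    print(f"{count} configs -> {len(specs)} specs in {elapsed * 1000:.3f} ms")
```

Wrapping the real generator the same way and logging the elapsed time gives a rough trend line to watch as the config grows.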
Best Practices
- Idempotent generation: Always produce equivalent DAGs from the same config. Avoid time-dependent values in DAG definitions.
- Cache config reads: When reading from databases or APIs, cache results during parse to avoid repeated calls.
- Set max_active_runs: Prevent resource exhaustion when many generated DAGs trigger simultaneously.
- Use globals() registration: Generated DAGs must be added to globals() for the scheduler to discover them.
- Tag generated DAGs: Include source config identifiers in tags for monitoring and filtering.
- Monitor parse time: Dynamic generation increases parse overhead. Keep config reads fast and cacheable.
- Validate configs: Add schema validation for YAML/database configs before DAG generation.
- Version configs: Track configuration changes alongside code changes for reproducibility.
Generated DAGs are re-parsed every min_file_process_interval seconds. Ensure your generation function is fast and idempotent. For database-driven configs, consider caching the configuration in a Variable or file to avoid parse-time DB queries.
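A file-based cache with a time-to-live is one way to keep database queries out of most parses. The sketch below is an illustration under stated assumptions: `load_cached_configs`, the cache file location, and the 300-second default TTL are hypothetical, and `fetch` stands in for whatever function actually queries the database.

```python
import json
import os
import tempfile
import time
from typing import Callable


def load_cached_configs(cache_path: str, fetch: Callable[[], list],
                        ttl_seconds: float = 300.0) -> list:
    """Return configs from cache_path if it is fresh, else call fetch() and rewrite it."""
    try:
        age = time.time() - os.path.getmtime(cache_path)
        if age < ttl_seconds:
            with open(cache_path, "r", encoding="utf-8") as f:
                return json.load(f)
    except (OSError, json.JSONDecodeError):
        pass  # Missing or corrupt cache: fall through and refetch.

    configs = fetch()
    # Write to a temp file and rename so a concurrent parser never reads a partial cache.
    directory = os.path.dirname(cache_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(configs, f)
    os.replace(tmp_path, cache_path)
    return configs


calls = []


def fake_fetch() -> list:
    """Stand-in for the database query; records how often it is invoked."""
    calls.append(1)
    return [{"pipeline_id": 1, "name": "orders"}]


cache_file = os.path.join(tempfile.mkdtemp(), "pipeline_configs.json")
load_cached_configs(cache_file, fake_fetch)
load_cached_configs(cache_file, fake_fetch)
print(len(calls))  # 1: the second call is served from the cache
```

The trade-off is staleness: a config change takes up to the TTL to appear, so the TTL should be chosen with the acceptable propagation delay in mind.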
Dynamic DAGs generated from a single Python file share the file's parse time. If you generate 100 DAGs from one file, all 100 are parsed together — this is more efficient than 100 separate files.
Key Takeaways:
- Dynamic DAG generation produces multiple DAGs from a single factory function at parse time
- Configuration sources: YAML files, databases, APIs, or Python loops
- Always use globals()[dag_id] = dag to register generated DAGs
- Ensure idempotent generation: same config must produce equivalent DAGs
- Monitor parse time as generated DAG count grows
- Generated DAGs are re-parsed periodically; keep generation functions fast and deterministic
See Also
- DAG Design Patterns — DAG composition and dependency patterns
- Complex Multi-DAG Orchestration — Cross-DAG coordination patterns
- Scheduling and Triggers — Timetables and scheduling patterns
- Airflow Architecture — Core architecture and parse lifecycle