Dynamic DAG Generation in Airflow

Formal Definitions

Definition: Dynamic DAG Generation

Dynamic DAG generation is the process of creating DAG objects at parse time using a factory function that reads external configuration. Given a configuration set $C = \{c_1, c_2, \ldots, c_n\}$, the generator produces $n$ DAGs: $D_i = \text{generate}(c_i)$ for each $c_i \in C$.

Definition: DAG Factory Function

A DAG factory function $f: C \rightarrow \text{DAG}$ takes a configuration object and returns a fully formed DAG instance. The function encapsulates task creation, dependency wiring, and scheduling logic, all parameterized by the configuration.

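Definition: Idempotent Generation

Idempotent generation means that running the factory repeatedly with the same configuration produces equivalent DAGs. Formally, $\forall c \in C$: $f(c) \equiv f(c)$ across invocations, where $\equiv$ means the same task structure, dependencies, and parameters. This is critical for scheduler stability, because the DAG file is parsed again and again and each parse must yield the same DAGs.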
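One way to make the equivalence check concrete is to reduce each generated DAG to a canonical description and compare hashes. The sketch below is a hedged illustration that uses plain dictionaries rather than real Airflow objects; `describe_dag` and `fingerprint` are hypothetical helper names, not Airflow APIs, and the two fixed timestamps only simulate what two parses calling `datetime.now()` would see.

```python
import hashlib
import json
from datetime import datetime


def describe_dag(config: dict) -> dict:
    """Hypothetical stand-in for a factory: returns a plain description
    of the DAG (id, schedule, tasks, edges) instead of an Airflow DAG."""
    tasks = ["extract", "transform", "load", "quality_check"]
    return {
        "dag_id": f"etl_{config['name']}",
        "schedule": config.get("schedule", "@daily"),
        "start_date": config["start_date"],
        "tasks": tasks,
        # Linear chain: each task depends on the previous one.
        "edges": [[a, b] for a, b in zip(tasks, tasks[1:])],
    }


def fingerprint(description: dict) -> str:
    """Hash a canonical JSON form so key order cannot affect the result."""
    canonical = json.dumps(description, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


config = {"name": "orders_daily", "start_date": "2024-01-01", "schedule": "0 2 * * *"}

# Same config in, same structure out: the generation is idempotent.
same = fingerprint(describe_dag(config)) == fingerprint(describe_dag(dict(config)))

# A time-dependent value breaks this. The two timestamps below simulate
# two parses, 30 seconds apart, that each used the current time.
first = dict(config, start_date=datetime(2024, 1, 1, 0, 0, 0).isoformat())
second = dict(config, start_date=datetime(2024, 1, 1, 0, 0, 30).isoformat())
differs = fingerprint(describe_dag(first)) != fingerprint(describe_dag(second))

print(same, differs)  # True True
```

A comparison like this could run in a unit test for the factory, so that a non-deterministic value is caught before it reaches the scheduler.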

Detailed Explanation

Why Dynamic DAGs?

Static DAG definitions work well for small deployments. As the number of pipelines grows (hundreds or thousands), manual DAG definitions become unsustainable. Dynamic generation lets you define pipeline logic once and instantiate it across multiple datasets, teams, or environments from configuration.

Pattern 1: YAML-Driven Generation

# dags/pipeline_factory.py
import yaml
from pathlib import Path
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


def load_pipeline_config(config_path: str = '/opt/airflow/configs/pipelines.yaml') -> list:
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)


def create_etl_task(task_id: str, sql: str, conn_id: str):
    """Factory for creating ETL tasks."""
    return PostgresOperator(
        task_id=task_id,
        postgres_conn_id=conn_id,
        sql=sql,
    )


def create_quality_task(task_id: str, table: str, rules: dict, conn_id: str):
    """Factory for creating quality check tasks."""
    def check_quality(**context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id=conn_id)

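        for rule_name, rule_sql in rules.items():
            # Each rule counts violating rows, so any non-zero count is a failure.
            result = hook.get_first(rule_sql)
            if result is None or result[0] != 0:
                raise ValueError(f'Quality rule failed on {table}: {rule_name}')
        return True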

    return PythonOperator(
        task_id=task_id,
        python_callable=check_quality,
    )


def generate_dags():
    """Main DAG generator function."""
    configs = load_pipeline_config()

    for config in configs:
        dag_id = f'etl_{config["name"]}'
        schedule = config.get('schedule', '@daily')

        with DAG(
            dag_id=dag_id,
            default_args={
                'owner': config.get('owner', 'data-team'),
                'retries': config.get('retries', 2),
                'retry_delay': timedelta(minutes=5),
            },
            start_date=datetime.fromisoformat(config['start_date']),
            # `schedule_interval` is deprecated since Airflow 2.4 in favor of `schedule`.
            schedule_interval=schedule,
            catchup=config.get('catchup', False),
            tags=config.get('tags', []),
            max_active_runs=config.get('max_active_runs', 1),
        ) as dag:

            extract = create_etl_task(
                task_id='extract',
                sql=config['extract_sql'],
                conn_id=config['source_conn'],
            )

            transform = create_etl_task(
                task_id='transform',
                sql=config['transform_sql'],
                conn_id=config['target_conn'],
            )

            load = create_etl_task(
                task_id='load',
                sql=config['load_sql'],
                conn_id=config['target_conn'],
            )

            quality = create_quality_task(
                task_id='quality_check',
                table=config['target_table'],
                rules=config.get('quality_rules', {}),
                conn_id=config['target_conn'],
            )

            extract >> transform >> load >> quality

        globals()[dag_id] = dag


generate_dags()

Pipeline Configuration YAML

# /opt/airflow/configs/pipelines.yaml
- name: orders_daily
  owner: orders-team
  start_date: "2024-01-01"
  schedule: "0 2 * * *"
  source_conn: source_postgres
  target_conn: warehouse_postgres
  target_table: fct_orders
  extract_sql: |
    SELECT * FROM raw_orders
    WHERE date = '{{ ds }}'
  transform_sql: |
    INSERT INTO stg_orders
    SELECT order_id, customer_id, amount * 1.1 as adjusted_amount
    FROM raw_orders WHERE date = '{{ ds }}'
  load_sql: |
    INSERT INTO fct_orders
    SELECT * FROM stg_orders WHERE date = '{{ ds }}'
  quality_rules:
    not_null: "SELECT COUNT(*) FROM fct_orders WHERE order_id IS NULL"
    positive_amount: "SELECT COUNT(*) FROM fct_orders WHERE amount <= 0"
  tags: ["orders", "daily"]

- name: customers_weekly
  owner: customer-team
  start_date: "2024-01-01"
  schedule: "0 6 * * 1"
  source_conn: source_mysql
  target_conn: warehouse_postgres
  target_table: dim_customers
  extract_sql: "SELECT * FROM customers WHERE updated_at >= '{{ ds }}'"
  transform_sql: "INSERT INTO dim_customers SELECT * FROM stg_customers"
  load_sql: "SELECT 1"
  quality_rules: {}
  tags: ["customers", "weekly"]
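The factory above reads several keys with `config[...]`, so a single entry with a missing key raises `KeyError` while the file is being parsed and none of the DAGs in that file load. A minimal validation sketch, assuming the required keys are exactly those the factory indexes directly, might look like the following; `REQUIRED_KEYS`, `DAG_ID_PATTERN`, and `validate_pipeline_config` are hypothetical names introduced for illustration.

```python
import re
from datetime import datetime

# Keys the factory reads with config[...] rather than config.get(...).
REQUIRED_KEYS = {
    "name", "start_date", "source_conn", "target_conn",
    "target_table", "extract_sql", "transform_sql", "load_sql",
}
# DAG ids may contain alphanumerics, dashes, dots and underscores.
# This ASCII-only pattern is a deliberately strict assumption.
DAG_ID_PATTERN = re.compile(r"^[A-Za-z0-9_.-]+$")


def validate_pipeline_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    missing = sorted(REQUIRED_KEYS - set(config))
    if missing:
        problems.append(f"missing keys: {', '.join(missing)}")
    name = config.get("name")
    if name is not None and not DAG_ID_PATTERN.match(f"etl_{name}"):
        problems.append(f"name {name!r} yields an invalid dag_id")
    start = config.get("start_date")
    if start is not None:
        try:
            # str() also covers YAML loaders that parse unquoted dates.
            datetime.fromisoformat(str(start))
        except ValueError:
            problems.append(f"start_date {start!r} is not ISO formatted")
    if not isinstance(config.get("quality_rules", {}), dict):
        problems.append("quality_rules must be a mapping of rule name to SQL")
    return problems


good = {
    "name": "orders_daily", "start_date": "2024-01-01",
    "source_conn": "source_postgres", "target_conn": "warehouse_postgres",
    "target_table": "fct_orders", "extract_sql": "SELECT 1",
    "transform_sql": "SELECT 1", "load_sql": "SELECT 1",
}
bad = {"name": "orders daily", "start_date": "01/01/2024"}

print(validate_pipeline_config(good))  # []
for problem in validate_pipeline_config(bad):
    print(problem)
```

Skipping or logging invalid entries instead of raising keeps one bad pipeline definition from taking down every other DAG generated by the same file.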

Pattern 2: Database-Driven Generation

# dags/db_driven_generator.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow import settings
from airflow.models import Variable


def fetch_pipeline_configs() -> list:
    """Fetch pipeline configurations from database."""
    from sqlalchemy import text

    session = settings.Session()
    try:
        result = session.execute(text("""
            SELECT pipeline_id, name, owner, schedule,
                   source_conn, target_conn, query, config_json
            FROM pipeline_configurations
            WHERE is_active = true
        """))

        # Materialize the rows before the session is closed.
        return [
            {
                'pipeline_id': row[0],
                'name': row[1],
                'owner': row[2],
                'schedule': row[3],
                'source_conn': row[4],
                'target_conn': row[5],
                'query': row[6],
                'config': row[7],
            }
            for row in result
        ]
    finally:
        # Release the connection even if the query fails.
        session.close()


def generate_dags():
    configs = fetch_pipeline_configs()

    for config in configs:
        dag_id = f'pipeline_{config["pipeline_id"]}'

        with DAG(
            dag_id=dag_id,
            default_args={'owner': config['owner']},
            start_date=datetime(2024, 1, 1),
            schedule_interval=config['schedule'],
            catchup=False,
        ) as dag:

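            # Bind this iteration's config through a default argument.
            # Defaults must come before **context, otherwise it is a SyntaxError.
            def run_pipeline(cfg=config, **context):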
                from airflow.providers.postgres.hooks.postgres import PostgresHook
                hook = PostgresHook(postgres_conn_id=cfg['source_conn'])
                records = hook.get_records(cfg['query'])
                context['ti'].xcom_push(key='record_count', value=len(records))

            process = PythonOperator(
                task_id='process',
                python_callable=run_pipeline,
            )

        globals()[dag_id] = dag


generate_dags()

Pattern 3: Loop-Based Generation

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator


regions = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1']
tables = ['orders', 'customers', 'products']
schedule = '0 3 * * *'

for region in regions:
    for table in tables:
        dag_id = f'etl_{region.replace("-", "_")}_{table}'

        with DAG(
            dag_id=dag_id,
            start_date=datetime(2024, 1, 1),
            schedule_interval=schedule,
            catchup=False,
            tags=[region, table],
        ) as dag:

            start = EmptyOperator(task_id='start')

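            # Defaults capture this iteration's region and table; they must
            # come before **context, otherwise Python raises a SyntaxError.
            def extract(r=region, t=table, **context):
                print(f'Extracting {t} from {r}')

            def transform(r=region, t=table, **context):
                print(f'Transforming {t} for {r}')

            def load(r=region, t=table, **context):
                print(f'Loading {t} to {r}')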

            ext = PythonOperator(task_id='extract', python_callable=extract)
            xform = PythonOperator(task_id='transform', python_callable=transform)
            ld = PythonOperator(task_id='load', python_callable=load)

            start >> ext >> xform >> ld

        globals()[dag_id] = dag
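The `r=region, t=table` defaults in the callables above do real work. A Python closure looks up a loop variable when the function is called, not when it is defined, so without the defaults every generated task would see the values from the last loop iteration. The following pure-Python sketch, with no Airflow dependency and hypothetical function names, shows the difference.

```python
regions = ["us-east-1", "eu-west-1"]

# Late binding: every closure reads `region` when it is called,
# by which time the loop has finished and `region` is the last item.
late_bound = []
for region in regions:
    def extract_late():
        return f"extract from {region}"
    late_bound.append(extract_late)

# Early binding: the default argument is evaluated at definition time,
# so each function keeps the region of its own iteration.
early_bound = []
for region in regions:
    def extract_early(r=region):
        return f"extract from {r}"
    early_bound.append(extract_early)

late_results = [fn() for fn in late_bound]
early_results = [fn() for fn in early_bound]
print(late_results)   # ['extract from eu-west-1', 'extract from eu-west-1']
print(early_results)  # ['extract from us-east-1', 'extract from eu-west-1']
```

Passing the values through the operator's `op_kwargs` argument or wrapping the callable with `functools.partial` are alternative ways to get the same early binding.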

Key Concepts Table

| Pattern | Config Source | Coupling | Scalability | Complexity |
|---|---|---|---|---|
| YAML-driven | File on disk | Low | Medium (file size) | Low |
| Database-driven | SQL database | Low | High | Medium |
| Loop-based | Python literals | High | Low (code change needed) | Low |
| API-driven | External service | Low | High | High |
| Template-based | Jinja templates | Medium | Medium | Medium |

Performance Metrics

| Metric | Static DAGs | Dynamic DAGs | Consideration |
|---|---|---|---|
| Parse time | O(1) per file | O(n) per config item | Optimize config reads |
| Scheduler memory | Fixed per DAG | Proportional to DAG count | Monitor at >1000 DAGs |
| DAG file count | 1 file = 1 DAG | 1 file = N DAGs | Fewer files, more objects |
| Reconfiguration | Code change + deploy | Config change + parse | Faster iteration |

Best Practices

  1. Idempotent generation: Always produce equivalent DAGs from the same config. Avoid time-dependent values in DAG definitions.
  2. Cache config reads: When reading from databases or APIs, cache results during parse to avoid repeated calls.
  3. Set max_active_runs: Prevent resource exhaustion when many generated DAGs trigger simultaneously.
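  4. Use globals() registration: Generated DAGs must be reachable from the module's global namespace for the scheduler to discover them. When DAGs are built inside a function or loop, assign each one with globals()[dag_id] = dag.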
  5. Tag generated DAGs: Include source config identifiers in tags for monitoring and filtering.
  6. Monitor parse time: Dynamic generation increases parse overhead. Keep config reads fast and cacheable.
  7. Validate configs: Add schema validation for YAML/database configs before DAG generation.
  8. Version configs: Track configuration changes alongside code changes for reproducibility.

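The file that generates the DAGs is re-parsed no more often than every min_file_process_interval seconds (30 by default), and each parse reruns the generation function. Ensure the function is fast and idempotent. For database-driven configs, consider caching the configuration in a local file to avoid a query on every parse. A Variable can also hold the cached configuration, but reading it at module level is itself a metadata-database query, so a file is usually the cheaper option.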
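The caching advice above can be sketched as a small file cache with a time-to-live. This is a minimal illustration under stated assumptions, not an Airflow feature: `load_cached_configs`, the cache location, and the 300-second default are hypothetical, and the configuration rows are assumed to be JSON-serializable.

```python
import json
import os
import tempfile
import time
from pathlib import Path
from typing import Callable


def load_cached_configs(fetch: Callable[[], list], cache_path: Path,
                        ttl_seconds: float = 300.0) -> list:
    """Return configs from a local JSON cache, calling `fetch` only
    when the cache file is missing, unreadable, or older than the TTL."""
    try:
        age = time.time() - cache_path.stat().st_mtime
        if age < ttl_seconds:
            return json.loads(cache_path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        pass  # No usable cache yet; fall through and refresh it.

    configs = fetch()
    # Write to a temp file and rename it, so a concurrent parse
    # never reads a half-written cache.
    fd, tmp_name = tempfile.mkstemp(dir=str(cache_path.parent), suffix=".tmp")
    with os.fdopen(fd, "w") as tmp:
        json.dump(configs, tmp)
    os.replace(tmp_name, cache_path)
    return configs


calls = []


def fake_fetch() -> list:
    """Stand-in for a database query; records how often it runs."""
    calls.append(1)
    return [{"pipeline_id": 1, "name": "orders"}]


with tempfile.TemporaryDirectory() as tmp_dir:
    cache = Path(tmp_dir) / "pipelines.json"
    first = load_cached_configs(fake_fetch, cache)
    second = load_cached_configs(fake_fetch, cache)

print(len(calls))  # 1
```

In a generator file, `fetch` would be the function that queries the configuration table, so the database is hit at most once per TTL window rather than once per parse.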

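Dynamic DAGs generated from a single Python file share the file's parse time. If you generate 100 DAGs from one file, all 100 are parsed together, which is more efficient than parsing 100 separate files. The trade-off is that a slow config read delays all of them, and an exception during import removes all of them at once.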

Key Takeaways:

  • Dynamic DAG generation produces multiple DAGs from a single factory function at parse time
  • Configuration sources: YAML files, databases, APIs, or Python loops
  • Always use globals()[dag_id] = dag to register generated DAGs
  • Ensure idempotent generation — same config must produce equivalent DAGs
  • Monitor parse time as generated DAG count grows
  • Generated DAGs are re-parsed periodically; keep generation functions fast and deterministic
