Complex Multi-DAG Orchestration in Airflow


Formal Definitions

Definition: Multi-DAG Orchestration

Multi-DAG orchestration coordinates execution across multiple independent DAGs using triggers, sensors, callbacks, or external event systems. Given DAGs $D_1, D_2, \ldots, D_n$, orchestration defines a dependency graph $G_{\text{inter}} = (V_{\text{DAGs}}, E_{\text{deps}})$ whose edges represent trigger, wait, or callback relationships.

Definition: DAG Dependency Graph

An inter-DAG dependency graph $G_{\text{inter}} = (V, E)$ has nodes $V = \{D_1, \ldots, D_n\}$ (each a complete DAG) and directed edges $E$, where $(D_i, D_j) \in E$ means $D_j$ depends on the completion of $D_i$.
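Because each edge says one DAG must finish before another can proceed, $G_{\text{inter}}$ has to be acyclic: a cycle would leave every DAG on it waiting for another. As a minimal sketch, assuming Python 3.9+ and using the standard-library graphlib module (not an Airflow API), the graph below reuses DAG IDs from the patterns in this lesson as a hypothetical example. The function returns one valid run order and raises CycleError for a cyclic graph.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical G_inter. Each key maps a DAG to the DAGs it depends on,
# so an edge (D_i, D_j) in E appears as D_i inside deps[D_j].
deps = {
    "child_etl_pipeline": set(),
    "child_data_quality": {"child_etl_pipeline"},
    "reporting_pipeline": {"child_etl_pipeline", "child_data_quality"},
}


def execution_order(graph):
    """Return one order in which the DAGs can run; raise CycleError on a cycle."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(deps)
print(order)  # ['child_etl_pipeline', 'child_data_quality', 'reporting_pipeline']

try:
    execution_order({"dag_a": {"dag_b"}, "dag_b": {"dag_a"}})
except CycleError as exc:
    # graphlib passes the offending cycle as the second exception argument.
    print("cycle detected:", exc.args[1])
```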

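Definition: Triggered Run

A triggered run is a DAG run created by TriggerDagRunOperator rather than by the scheduler. Airflow records it with run_type='manual' (the same type used for runs started from the UI, CLI, or REST API) and marks it as externally triggered. The child receives only the conf dictionary that the triggering task passes to it; nothing else is inherited from the parent.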

Detailed Explanation

Pattern 1: TriggerDagRunOperator

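TriggerDagRunOperator is the simplest way to start a child DAG from a parent. The parent task creates a run of the child DAG, optionally passing a conf dictionary, and can either return immediately or wait for the child run to finish.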

from datetime import datetime, timedelta
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

logger = logging.getLogger(__name__)


def preprocess(**context):
    """Prepare data before child DAG execution and share metadata via XCom."""
    context['ti'].xcom_push(key='record_count', value=15000)
    context['ti'].xcom_push(key='source_date', value=context['ds'])


def post_process(**context):
    """Runs only after both child DAG runs have succeeded."""
    logger.info('All child DAGs completed successfully')


with DAG(
    dag_id='parent_orchestrator',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # 'schedule' replaces the deprecated 'schedule_interval' (Airflow 2.4+)
    catchup=False,
    tags=['orchestration', 'parent'],
) as parent_dag:

    process = PythonOperator(task_id='preprocess', python_callable=preprocess)

    trigger_etl = TriggerDagRunOperator(
        task_id='trigger_etl_pipeline',
        trigger_dag_id='child_etl_pipeline',
        # conf is a templated field: values render as strings unless the DAG
        # sets render_template_as_native_obj=True.
        conf={
            'source_date': '{{ ds }}',
            'record_count': '{{ ti.xcom_pull(task_ids="preprocess", key="record_count") }}',
        },
        wait_for_completion=True,  # block until the child run reaches a final state
        poke_interval=60,          # seconds between checks of the child run's state
        execution_timeout=timedelta(hours=2),
    )

    trigger_quality = TriggerDagRunOperator(
        task_id='trigger_data_quality',
        trigger_dag_id='child_data_quality',
        conf={'source_date': '{{ ds }}'},
        wait_for_completion=True,
        execution_timeout=timedelta(hours=2),
    )

    post = PythonOperator(task_id='post_process', python_callable=post_process)

    process >> trigger_etl >> trigger_quality >> post
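Because conf is a templated field, the parent above sends record_count as the rendered string '15000', not an integer, unless the parent DAG sets render_template_as_native_obj=True. A child DAG can normalize its input in one place. The helper below is a hypothetical sketch in plain Python; in the child it would be called on dag_run.conf, for example inside a PythonOperator callable.

```python
from datetime import date


def parse_child_conf(conf):
    """Hypothetical helper: validate and cast the conf a child DAG receives.

    Templated conf values arrive as strings, so cast them explicitly and
    fail loudly on bad input (e.g. the string 'None' from an empty XCom).
    """
    source_date = date.fromisoformat(conf["source_date"])
    record_count = int(conf.get("record_count", 0))
    if record_count < 0:
        raise ValueError(f"record_count must be non-negative, got {record_count}")
    return {"source_date": source_date, "record_count": record_count}


parsed = parse_child_conf({"source_date": "2024-01-01", "record_count": "15000"})
print(parsed)  # {'source_date': datetime.date(2024, 1, 1), 'record_count': 15000}
```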

Pattern 2: ExternalTaskSensor

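ExternalTaskSensor waits until a specific task in another DAG reaches one of its allowed_states (by default, success) in the run whose logical date matches this run's logical date, shifted by execution_delta or computed by execution_date_fn. The upstream DAG has no knowledge of the sensor, which makes the dependency implicit. By default the sensor does not fail when the upstream task fails; it keeps poking until timeout unless failed_states is set.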

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor


def generate_reports():
    print('Generating reports...')


with DAG(
    dag_id='reporting_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='0 6 * * *',
    catchup=False,
    tags=['reporting'],
) as report_dag:

    # execution_delta=1h matches the upstream run whose logical date is one hour
    # earlier, i.e. this assumes etl_pipeline is scheduled at '0 5 * * *'.
    wait_for_etl = ExternalTaskSensor(
        task_id='wait_for_etl_complete',
        external_dag_id='etl_pipeline',
        external_task_id='load_to_warehouse',
        execution_delta=timedelta(hours=1),
        allowed_states=['success'],
        failed_states=['failed', 'upstream_failed'],  # fail fast instead of waiting out the timeout
        timeout=timedelta(hours=4).total_seconds(),   # total time allowed for sensing
        mode='reschedule',  # release the worker slot between pokes
        poke_interval=300,
    )

    wait_for_quality = ExternalTaskSensor(
        task_id='wait_for_quality_checks',
        external_dag_id='data_quality_checks',
        external_task_id='validate_tables',
        execution_delta=timedelta(hours=1),
        allowed_states=['success'],
        failed_states=['failed', 'upstream_failed'],
        timeout=timedelta(hours=4).total_seconds(),
        mode='reschedule',
        poke_interval=300,
    )

    reports = PythonOperator(task_id='generate_reports', python_callable=generate_reports)

    [wait_for_etl, wait_for_quality] >> reports
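ExternalTaskSensor matches runs by logical date, not by wall-clock time: with execution_delta set, it looks for the external run whose logical date equals this run's logical date minus execution_delta. The arithmetic is easy to check in isolation. This sketch assumes, hypothetically, that etl_pipeline is scheduled at '0 5 * * *', so its logical dates fall one hour before those of reporting_pipeline.

```python
from datetime import datetime, timedelta


def external_logical_date(logical_date, execution_delta):
    """Logical date an ExternalTaskSensor targets when execution_delta is set."""
    return logical_date - execution_delta


# Logical date of one reporting_pipeline run ('0 6 * * *').
reporting_logical_date = datetime(2024, 1, 2, 6, 0)
target = external_logical_date(reporting_logical_date, timedelta(hours=1))
print(target)  # 2024-01-02 05:00:00

# Hypothetical: etl_pipeline at '0 5 * * *' has a run at exactly this logical date,
# so the sensor finds it. If etl_pipeline ran at 04:00 instead, no run would match
# and the sensor would keep waiting until its timeout.
etl_logical_date = datetime(2024, 1, 2, 5, 0)
print(target == etl_logical_date)  # True
```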

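Pattern 3: Cross-DAG XCom with Dynamic DAGs

A monitoring DAG can watch other DAGs directly: it queries the Airflow metadata database for their recent runs, stores a summary in XCom, and posts it to Slack.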

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import DagRun
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState

DOWNSTREAM_DAGS = ['etl_pipeline', 'quality_checks', 'reporting']


def check_downstream_status(**context):
    """Query the metadata DB for recent runs of downstream DAGs.

    Returns True only if some DAG's latest run failed; a False return makes
    ShortCircuitOperator skip the alert task.
    """
    since = context['logical_date'] - timedelta(hours=1)
    statuses = {}
    with create_session() as session:  # closes the session when done
        for dag_id in DOWNSTREAM_DAGS:
            runs = (
                session.query(DagRun)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date >= since)
                .order_by(DagRun.execution_date)  # so runs[-1] really is the latest
                .all()
            )
            statuses[dag_id] = {
                'run_count': len(runs),
                'latest_state': runs[-1].state if runs else 'no_runs',
            }

    context['ti'].xcom_push(key='downstream_statuses', value=statuses)
    return any(s['latest_state'] == DagRunState.FAILED for s in statuses.values())


with DAG(
    dag_id='multi_dag_monitor',
    start_date=datetime(2024, 1, 1),
    schedule='*/15 * * * *',
    catchup=False,
    tags=['monitoring'],
) as monitor_dag:

    check = ShortCircuitOperator(
        task_id='check_downstream_status',
        python_callable=check_downstream_status,
    )

    alert = SlackWebhookOperator(
        task_id='alert_on_failure',
        slack_webhook_conn_id='slack_webhook',
        message=(
            'Downstream DAG failure detected:\n'
            '{{ ti.xcom_pull(task_ids="check_downstream_status", key="downstream_statuses") }}'
        ),
    )

    check >> alert
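The run-selection logic in check_downstream_status can be exercised without a metadata database. The sketch below reproduces it over plain (dag_id, logical_date, state) tuples, a hypothetical stand-in for DagRun rows, sorting by logical date so that the last run really is the latest. The should_alert rule is one possible way to send Slack messages only when a downstream DAG's latest run failed.

```python
from datetime import datetime, timedelta


def summarize_runs(runs, dag_ids, since):
    """Summarize recent runs per DAG from (dag_id, logical_date, state) tuples."""
    statuses = {}
    for dag_id in dag_ids:
        recent = sorted(
            (r for r in runs if r[0] == dag_id and r[1] >= since),
            key=lambda r: r[1],  # order by logical date, oldest first
        )
        statuses[dag_id] = {
            "run_count": len(recent),
            "latest_state": recent[-1][2] if recent else "no_runs",
        }
    return statuses


def should_alert(statuses):
    """Alert only if some DAG's most recent run failed."""
    return any(s["latest_state"] == "failed" for s in statuses.values())


now = datetime(2024, 1, 2, 12, 0)
runs = [
    ("etl_pipeline", now - timedelta(minutes=20), "success"),
    ("etl_pipeline", now - timedelta(minutes=50), "failed"),  # older, superseded
    ("quality_checks", now - timedelta(minutes=10), "failed"),
    ("reporting", now - timedelta(hours=3), "success"),  # outside the window
]
statuses = summarize_runs(runs, ["etl_pipeline", "quality_checks", "reporting"], now - timedelta(hours=1))
print(statuses)
print(should_alert(statuses))  # True
```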

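Pattern 4: Callback-Driven Coordination

Task callbacks react to state changes without a polling task. Attached to a TriggerDagRunOperator with wait_for_completion=True, they fire once the child run's outcome is known, because the trigger task succeeds or fails along with the child run.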

from datetime import datetime
import logging

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

logger = logging.getLogger(__name__)


def on_child_failure(context):
    """Failure callback on the trigger task.

    With wait_for_completion=True the trigger task fails when the child run
    fails, so this fires on child failure. context['dag_run'] is the PARENT
    run; the child is identified by the operator's trigger_dag_id and the
    run_id it pushes to XCom under the key 'trigger_run_id'.
    """
    from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

    ti = context['task_instance']
    parent_run = context['dag_run']
    child_dag_id = context['task'].trigger_dag_id
    child_run_id = ti.xcom_pull(task_ids=ti.task_id, key='trigger_run_id')

    logger.error('Child DAG failed: %s, run_id: %s', child_dag_id, child_run_id)

    hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
    hook.send(
        text=(
            'Child DAG Failure Alert\n'
            f'DAG: {child_dag_id}\n'
            f'Run: {child_run_id}\n'
            f'Parent: {parent_run.dag_id} / {parent_run.run_id}\n'
            f'Task: {ti.task_id}'
        )
    )


def on_child_success(context):
    """Success callback on the trigger task: fires after the child run succeeded."""
    logger.info('Child DAG completed: %s', context['task'].trigger_dag_id)


with DAG(
    dag_id='callback_orchestrator',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    tags=['callbacks'],
) as callback_dag:

    trigger_pipeline = TriggerDagRunOperator(
        task_id='trigger_pipeline',
        trigger_dag_id='data_pipeline',
        # Required for the callbacks to reflect the child's outcome; with False,
        # the task succeeds as soon as the child run is created.
        wait_for_completion=True,
        on_success_callback=on_child_success,
        on_failure_callback=on_child_failure,
    )
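To see why these callbacks report on the child, it helps to model the chain of events: a failed child run makes the waiting trigger task fail, and the task's failure callback then receives a context describing that task. The toy model below is a simplified illustration in plain Python, not Airflow's implementation; run_task and the context keys it builds are hypothetical.

```python
def run_task(task_id, fn, on_success_callback=None, on_failure_callback=None):
    """Toy model (not Airflow's implementation): run fn, then fire the callback
    matching the resulting state, passing it a context dict."""
    context = {"task_id": task_id}
    try:
        fn()
    except Exception as exc:
        context["exception"] = exc
        if on_failure_callback is not None:
            on_failure_callback(context)
        return "failed"
    if on_success_callback is not None:
        on_success_callback(context)
    return "success"


def wait_for_child():
    # Stands in for a trigger task whose child run ended in 'failed'.
    raise RuntimeError("child run finished in state 'failed'")


events = []
state = run_task(
    "trigger_pipeline",
    wait_for_child,
    on_success_callback=lambda ctx: events.append(("success", ctx["task_id"])),
    on_failure_callback=lambda ctx: events.append(("failure", ctx["task_id"], str(ctx["exception"]))),
)
print(state)   # failed
print(events)  # [('failure', 'trigger_pipeline', "child run finished in state 'failed'")]
```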

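Pattern 5: Fan-Out / Fan-In with Multiple DAGs

Fan-out starts several child runs in parallel; fan-in holds the next step until all of them have finished.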

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

REGIONS = ['us-east-1', 'eu-west-1', 'ap-southeast-1']


def consolidate_results():
    print('All regions processed')


with DAG(
    dag_id='fanout_fanin_orchestrator',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    tags=['fanout', 'fanin'],
) as orchestrator:

    # Fan-out: one independent trigger task per region, so they run in parallel,
    # and each waits for its own child run (wait_for_completion=True).
    # An ExternalTaskSensor with a fixed execution_delta cannot do this wait:
    # a triggered run's logical date defaults to the moment it was triggered,
    # so it is not at a fixed offset from this DAG's logical date.
    # On Airflow 2.6+, deferrable=True frees the worker slot while waiting.
    triggers = [
        TriggerDagRunOperator(
            task_id=f'trigger_{region.replace("-", "_")}',
            trigger_dag_id='region_etl',
            conf={'region': region},
            wait_for_completion=True,
            poke_interval=60,
        )
        for region in REGIONS
    ]

    # Fan-in: with the default trigger rule (all_success), consolidation runs
    # only after every regional child run has succeeded.
    consolidate = PythonOperator(
        task_id='consolidate_results',
        python_callable=consolidate_results,
    )

    # A list can feed a single task; list >> list is not supported.
    triggers >> consolidate
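Stripped of Airflow, fan-out/fan-in is a parallel map followed by a barrier. The sketch below uses concurrent.futures from the standard library; region_etl is a hypothetical stand-in for one run of the child DAG, and the barrier is the step that collects every result before consolidating.

```python
from concurrent.futures import ThreadPoolExecutor

REGIONS = ["us-east-1", "eu-west-1", "ap-southeast-1"]


def region_etl(region):
    """Hypothetical stand-in for one run of the region_etl child DAG."""
    return f"{region}: done"


def fan_out_fan_in(regions):
    with ThreadPoolExecutor(max_workers=len(regions)) as pool:
        # Fan-out: submit one job per region without waiting on any of them.
        futures = {region: pool.submit(region_etl, region) for region in regions}
        # Fan-in barrier: result() blocks until each job finishes and
        # re-raises the job's exception if it failed.
        return {region: future.result() for region, future in futures.items()}


results = fan_out_fan_in(REGIONS)
print(results)
```

As with the all_success trigger rule on the consolidation task, a failed regional job raises at the barrier, so nothing is consolidated from a partial set of regions.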

Key Concepts Table

| Pattern | Mechanism | Wait type | Complexity | Best for |
|---|---|---|---|---|
| TriggerDagRunOperator | Creates a child DAG run | Blocking or non-blocking | Low | Simple parent-child |
| ExternalTaskSensor | Polls a task's state in another DAG | Reschedule mode recommended | Medium | Implicit dependencies |
| Callback-based | Task- or DAG-level success/failure callbacks | Asynchronous notification | Medium | Alerting and logging |
| Dynamic DAGs | Code-generated dependencies | Variable | High | Multi-team coordination |
| Fan-Out/Fan-In | Parallel triggers + consolidated wait | Parallel with barrier | Medium | Multi-region/multi-table |

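Performance Characteristics

| Aspect | TriggerDagRunOperator | ExternalTaskSensor | Callbacks |
|---|---|---|---|
| Trigger latency | Child run created as soon as the task executes | N/A (polls every poke_interval) | Fire when the task or DAG run changes state |
| Worker slot during wait | Held while polling if wait_for_completion=True (freed with deferrable=True); no wait if False | Held in poke mode; released between pokes in reschedule mode | N/A |
| Scalability | High | Medium (sensor overhead) | High |
| Error handling | Task failure | Sensor timeout, or failure via failed_states | Event-driven |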
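The worker-slot row is easiest to see with numbers. This is a rough model, assuming each poke occupies a slot for a fixed poke_duration and ignoring scheduler overhead: in poke mode the sensor holds its slot for the whole wait, while in reschedule mode it holds the slot only during each poke. The figures are illustrative, not measurements.

```python
import math


def worker_slot_seconds(mode, wait_seconds, poke_interval, poke_duration):
    """Approximate worker-slot time a sensor uses while waiting (rough model)."""
    if mode == "poke":
        return wait_seconds  # slot held for the entire wait
    if mode == "reschedule":
        pokes = math.ceil(wait_seconds / poke_interval)
        return pokes * poke_duration  # slot held only while each poke runs
    raise ValueError(f"unknown mode: {mode}")


four_hours = 4 * 3600
print(worker_slot_seconds("poke", four_hours, 300, 2))        # 14400
print(worker_slot_seconds("reschedule", four_hours, 300, 2))  # 96 (48 pokes x 2 s)
```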

Best Practices

  1. Use mode='reschedule' on ExternalTaskSensor to avoid holding worker slots during waits.
  2. Set execution_delta or execution_date_fn so the sensor targets the upstream DAG's logical date; a mismatch leaves the sensor waiting until it times out.
  3. Set wait_for_completion=True on TriggerDagRunOperator when downstream tasks depend on the child's completion.
  4. Use callbacks for alerting and logging; they run when a task or DAG run changes state, so no polling task is needed.
  5. Avoid deep inter-DAG chains; they create fragile, hard-to-debug dependencies.
  6. Use XCom sparingly across DAGs; XCom values live in the metadata database, so keep large data in an external store and pass references.
  7. Tag orchestration DAGs distinctly from child DAGs for monitoring clarity.
  8. Bound every wait: set timeout on sensors (in reschedule mode, execution_timeout applies to each poke, not to the whole wait) and execution_timeout on waiting TriggerDagRunOperator tasks.
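Practice 5 can be checked mechanically by measuring the longest chain in $G_{\text{inter}}$. A minimal sketch, assuming the graph is acyclic and written as a hypothetical mapping from each DAG to the DAGs that depend on it; what counts as "too deep" is a team judgment rather than an Airflow limit.

```python
from functools import lru_cache

# Hypothetical G_inter: each DAG maps to the DAGs that depend on it (edges D_i -> D_j).
downstream = {
    "ingest": ["etl_pipeline"],
    "etl_pipeline": ["data_quality_checks", "reporting_pipeline"],
    "data_quality_checks": ["reporting_pipeline"],
    "reporting_pipeline": [],
}


def longest_chain(graph):
    """Number of edges on the longest dependency chain of an acyclic graph."""

    @lru_cache(maxsize=None)
    def depth(node):
        # A DAG with no dependents contributes a chain of length 0.
        return max((1 + depth(child) for child in graph[node]), default=0)

    return max(depth(node) for node in graph)


print(longest_chain(downstream))  # 3: ingest -> etl -> quality -> reporting
```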

When an ExternalTaskSensor waits on upstream DAGs that run on a different schedule from the sensing DAG, use execution_date_fn to compute the expected logical date dynamically rather than a fixed execution_delta.
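An execution_date_fn is an ordinary function that receives this run's logical date and returns the external logical date to match. The helpers below are hypothetical sketches for upstream DAGs on daily and hourly schedules. They pick the most recent scheduled slot at or before the given date, which is a simplification: the right pairing depends on how each DAG's data intervals line up. Because they take plain datetimes, they can be tested outside Airflow and bound with functools.partial.

```python
from datetime import datetime, timedelta
from functools import partial


def latest_daily_slot(dt, hour, minute=0):
    """Most recent daily hour:minute slot at or before dt."""
    # The first parameter is named dt rather than logical_date so it cannot
    # clash with context keys that Airflow may pass as keyword arguments.
    candidate = dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate > dt:
        candidate -= timedelta(days=1)
    return candidate


def latest_hourly_slot(dt, minute=0):
    """Most recent hourly :minute slot at or before dt."""
    candidate = dt.replace(minute=minute, second=0, microsecond=0)
    if candidate > dt:
        candidate -= timedelta(hours=1)
    return candidate


# Hypothetical upstreams: one daily at 02:00, one hourly at :30.
daily_at_0200 = partial(latest_daily_slot, hour=2)
hourly_at_30 = partial(latest_hourly_slot, minute=30)

run = datetime(2024, 1, 2, 6, 15)
print(daily_at_0200(run))  # 2024-01-02 02:00:00
print(hourly_at_30(run))   # 2024-01-02 05:30:00
```

In a DAG file, one of these partials would be passed as execution_date_fn in place of execution_delta; ExternalTaskSensor accepts only one of the two.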

TriggerDagRunOperator with wait_for_completion=False creates a fire-and-forget trigger: the parent task succeeds as soon as the child run has been created, and the child then runs independently, so a later child failure does not fail the parent. Use this when the parent does not need to wait for the child's results.

Key Takeaways:

  • Multi-DAG orchestration uses triggers, sensors, callbacks, or external events
  • The inter-DAG dependency graph $G_{\text{inter}}$ models cross-DAG relationships
  • TriggerDagRunOperator is simplest for parent-child DAG relationships
  • ExternalTaskSensor with mode='reschedule' avoids blocking worker slots
  • Callbacks provide event-driven alerting without polling overhead
  • Avoid deep inter-DAG dependency chains for maintainability
