Complex Multi-DAG Orchestration
Formal Definitions
Definition: Multi-DAG Orchestration
Multi-DAG orchestration coordinates execution across multiple independent DAGs using triggers, sensors, callbacks, or external event systems. Given DAGs $D_1, \dots, D_n$, orchestration defines a dependency graph $G = (V, E)$ whose edges represent trigger, wait, or callback relationships.
Definition: DAG Dependency Graph
An inter-DAG dependency graph $G = (V, E)$ has nodes $V = \{D_1, \dots, D_n\}$ (each a complete DAG) and directed edges $E$, where $(D_i, D_j) \in E$ means $D_j$ depends on completion of $D_i$.
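The dependency graph can be modeled outside Airflow to check it before wiring up triggers or sensors. The following is a minimal sketch using Python's standard `graphlib` module; the DAG names are hypothetical, and the convention of mapping each DAG to the set of DAGs it depends on is an assumption chosen to match `TopologicalSorter`.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical inter-DAG dependencies: each key depends on the DAGs in its set.
dependencies = {
    "etl_pipeline": set(),
    "data_quality_checks": {"etl_pipeline"},
    "reporting_pipeline": {"etl_pipeline", "data_quality_checks"},
}


def execution_order(deps):
    """Return one valid run order, or raise ValueError if the graph has a cycle."""
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # args[1] holds the list of nodes that form the detected cycle.
        raise ValueError(f"inter-DAG dependency cycle: {exc.args[1]}") from exc


order = execution_order(dependencies)
print(order)
```

A check like this could run in CI so that a circular wait between DAGs is caught before deployment rather than as a sensor timeout.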
Definition: Triggered Run
A triggered run is a DAG run created by TriggerDagRunOperator rather than by the scheduler's automatic scheduling. Airflow records it with the manual run type (`run_type='manual'`, with a run ID prefixed `manual__`), and the run receives the `conf` dictionary supplied by the triggering task.
Detailed Explanation
Pattern 1: TriggerDagRunOperator
TriggerDagRunOperator is the simplest way to invoke a child DAG from a parent: the parent triggers the child and can optionally wait for it to complete.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id='parent_orchestrator',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['orchestration', 'parent'],
) as parent_dag:

    def preprocess(**context):
        """Prepare data before child DAG execution."""
        context['ti'].xcom_push(key='record_count', value=15000)
        context['ti'].xcom_push(key='source_date', value=context['ds'])

    trigger_etl = TriggerDagRunOperator(
        task_id='trigger_etl_pipeline',
        trigger_dag_id='child_etl_pipeline',
        conf={
            'source_date': '{{ ds }}',
            'record_count': '{{ ti.xcom_pull(task_ids="preprocess", key="record_count") }}',
        },
        wait_for_completion=True,
        execution_timeout=timedelta(hours=2),
    )

    trigger_quality = TriggerDagRunOperator(
        task_id='trigger_data_quality',
        trigger_dag_id='child_data_quality',
        conf={'source_date': '{{ ds }}'},
        wait_for_completion=True,
    )

    def post_process(**context):
        """Run after all children complete."""
        import logging

        logger = logging.getLogger(__name__)
        logger.info('All child DAGs completed successfully')

    process = PythonOperator(task_id='preprocess', python_callable=preprocess)
    post = PythonOperator(task_id='post_process', python_callable=post_process)

    process >> trigger_etl >> trigger_quality >> post
```
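On the child side, the values passed through `conf` are available on the run object. The sketch below shows one way a child task might read them. The function name `read_trigger_conf` and the stand-in context are hypothetical, and it assumes default template rendering, where Jinja produces strings (unless the parent DAG sets `render_template_as_native_obj=True`), so the numeric field is converted explicitly.

```python
from types import SimpleNamespace


def read_trigger_conf(**context):
    """Read the values the parent passed through `conf`.

    Templated conf values arrive as strings, so record_count is cast to int.
    The fallbacks cover runs that were started without any conf.
    """
    conf = context["dag_run"].conf or {}
    return {
        "source_date": conf.get("source_date"),
        "record_count": int(conf.get("record_count", 0)),
    }


# Stand-in for the Airflow task context, for illustration only.
fake_context = {
    "dag_run": SimpleNamespace(
        conf={"source_date": "2024-01-01", "record_count": "15000"}
    )
}
print(read_trigger_conf(**fake_context))
```

In a real child DAG, a function like this would be passed as the `python_callable` of a PythonOperator, and Airflow would supply the context.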
Pattern 2: ExternalTaskSensor
Wait for a specific task in another DAG to reach a terminal state. This creates implicit inter-DAG dependencies.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='reporting_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 6 * * *',
    catchup=False,
    tags=['reporting'],
) as report_dag:
    wait_for_etl = ExternalTaskSensor(
        task_id='wait_for_etl_complete',
        external_dag_id='etl_pipeline',
        external_task_id='load_to_warehouse',
        execution_delta=timedelta(hours=1),
        timeout=timedelta(hours=4).total_seconds(),
        mode='reschedule',
        poke_interval=300,
    )

    wait_for_quality = ExternalTaskSensor(
        task_id='wait_for_quality_checks',
        external_dag_id='data_quality_checks',
        external_task_id='validate_tables',
        execution_delta=timedelta(hours=1),
        timeout=timedelta(hours=4).total_seconds(),
        mode='reschedule',
    )

    generate_reports = PythonOperator(
        task_id='generate_reports',
        python_callable=lambda: print('Generating reports...'),
    )

    [wait_for_etl, wait_for_quality] >> generate_reports
```
Pattern 3: Cross-DAG XCom with Dynamic DAGs
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def check_downstream_status(**context):
    """Query the metadata DB for the state of recent downstream DAG runs."""
    from airflow.models import DagRun
    from airflow.utils.session import create_session

    downstream_dags = ['etl_pipeline', 'quality_checks', 'reporting']
    window_start = context['execution_date'] - timedelta(hours=1)
    statuses = {}
    # create_session() closes the session when the block exits.
    with create_session() as session:
        for dag_id in downstream_dags:
            runs = (
                session.query(DagRun)
                .filter(
                    DagRun.dag_id == dag_id,
                    DagRun.execution_date >= window_start,
                )
                # Order explicitly so that runs[-1] is the most recent run.
                .order_by(DagRun.execution_date)
                .all()
            )
            statuses[dag_id] = {
                'run_count': len(runs),
                'latest_state': runs[-1].state if runs else 'no_runs',
            }
    context['ti'].xcom_push(key='downstream_statuses', value=statuses)
    return statuses


with DAG(
    dag_id='multi_dag_monitor',
    start_date=datetime(2024, 1, 1),
    schedule_interval='*/15 * * * *',
    catchup=False,
    tags=['monitoring'],
) as monitor_dag:
    check = PythonOperator(
        task_id='check_downstream_status',
        python_callable=check_downstream_status,
    )

    # Posts the status summary on every run, not only when a DAG has failed.
    alert = SlackWebhookOperator(
        task_id='alert_on_failure',
        slack_webhook_conn_id='slack_webhook',
        message=(
            'Downstream DAGs status:\n'
            '{{ ti.xcom_pull(task_ids="check_downstream_status", key="downstream_statuses") }}'
        ),
    )

    check >> alert
```
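The monitor above sends its Slack message on every run. If alerts should fire only when something has failed, the status dictionary can be filtered first. The helper below is a hypothetical sketch, not part of Airflow; it assumes the payload shape pushed by `check_downstream_status` and the state string `'failed'`.

```python
def failed_dags(statuses):
    """Return the DAG ids whose latest run is in the 'failed' state."""
    return sorted(
        dag_id
        for dag_id, info in statuses.items()
        if info.get("latest_state") == "failed"
    )


# Example payload in the shape pushed by check_downstream_status.
statuses = {
    "etl_pipeline": {"run_count": 2, "latest_state": "success"},
    "quality_checks": {"run_count": 1, "latest_state": "failed"},
    "reporting": {"run_count": 0, "latest_state": "no_runs"},
}
print(failed_dags(statuses))
```

One possible wiring is a ShortCircuitOperator between the check and the alert whose callable returns `bool(failed_dags(...))`, so the alert task is skipped when the list is empty.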
Pattern 4: Callback-Driven Coordination
```python
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

logger = logging.getLogger(__name__)


def on_child_failure(context):
    """Task callback: runs when the trigger task fails, e.g. because the awaited child run failed."""
    # The context describes the parent run; the child DAG id comes from the operator.
    dag_run = context.get('dag_run')
    task_instance = context.get('task_instance')
    child_dag_id = getattr(context.get('task'), 'trigger_dag_id', 'unknown')
    logger.error(
        f'Child DAG failed: {child_dag_id}, '
        f'parent run_id: {dag_run.run_id}'
    )
    # Send alert
    from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

    hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
    hook.send(
        text=(
            f'Child DAG Failure Alert\n'
            f'Child DAG: {child_dag_id}\n'
            f'Parent run: {dag_run.run_id}\n'
            f'Task: {task_instance.task_id}'
        )
    )


def on_child_success(context):
    """Task callback: runs when the trigger task succeeds."""
    child_dag_id = getattr(context.get('task'), 'trigger_dag_id', 'unknown')
    logger.info(f'Child DAG completed: {child_dag_id}')


with DAG(
    dag_id='callback_orchestrator',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['callbacks'],
) as callback_dag:
    trigger_pipeline = TriggerDagRunOperator(
        task_id='trigger_pipeline',
        trigger_dag_id='data_pipeline',
        on_success_callback=on_child_success,
        on_failure_callback=on_child_failure,
        wait_for_completion=True,
    )
```
Pattern 5: Fan-Out / Fan-In with Multiple DAGs
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id='fanout_fanin_orchestrator',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['fanout', 'fanin'],
) as orchestrator:
    regions = ['us-east-1', 'eu-west-1', 'ap-southeast-1']

    # Fan-out: trigger one child run per region; the trigger tasks run in parallel.
    # Fan-in: each trigger blocks until its own child run finishes, so
    # consolidate_results acts as the barrier. ExternalTaskSensor is not used
    # here because it matches child runs by logical date, and triggered runs
    # default to the trigger time rather than the parent's logical date.
    triggers = [
        TriggerDagRunOperator(
            task_id=f'trigger_{region.replace("-", "_")}',
            trigger_dag_id='region_etl',
            conf={'region': region},
            wait_for_completion=True,
            poke_interval=60,
        )
        for region in regions
    ]

    consolidate = PythonOperator(
        task_id='consolidate_results',
        python_callable=lambda: print('All regions processed'),
    )

    # A list of tasks can be set upstream of a single task; list >> list is not supported.
    triggers >> consolidate
```
Key Concepts Table
| Pattern | Mechanism | Wait Type | Complexity | Best For |
|---|---|---|---|---|
| TriggerDagRunOperator | Creates child dag_run | Blocking or non-blocking | Low | Simple parent-child |
| ExternalTaskSensor | Polls child task state | Reschedule mode recommended | Medium | Implicit dependencies |
| Callback-based | DAG failure/success hooks | Asynchronous notification | Medium | Alerting and logging |
| Dynamic DAGs | Code-generated dependencies | Variable | High | Multi-team coordination |
| Fan-Out/Fan-In | Parallel triggers + consolidated wait | Parallel with barrier | Medium | Multi-region/multi-table |
Performance Metrics
| Metric | TriggerDagRun | ExternalTaskSensor | Callbacks |
|---|---|---|---|
| Latency to trigger | <1s | N/A (polling) | Immediate |
| Worker slot during wait | Released (if `wait_for_completion=False`) | Held (unless `mode='reschedule'`) | N/A |
| Scalability | High | Medium (sensor overhead) | High |
| Error handling | Task failure | Sensor timeout | Event-driven |
Best Practices
- Use `mode='reschedule'` on ExternalTaskSensor to avoid holding worker slots during waits.
- Set `execution_delta` or `execution_date_fn` to match the child DAG's execution schedule.
- Set `wait_for_completion=True` on TriggerDagRunOperator when downstream tasks depend on child completion.
- Use callbacks for alerting and logging: they fire on task or DAG run state changes, so no separate polling task is needed.
- Avoid deep inter-DAG chains, which create fragile, hard-to-debug dependencies.
- Use XCom sparingly across DAGs; prefer external state stores for large data.
- Tag orchestration DAGs distinctly from child DAGs for monitoring clarity.
- Set `timeout` on sensors and `execution_timeout` on blocking triggers to prevent indefinite hangs.
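The advice about deep inter-DAG chains can be turned into a simple check. The sketch below computes the longest dependency chain ending at each DAG. The DAG names, the `upstream` mapping, and the `MAX_DEPTH` threshold are all hypothetical (the threshold is an assumed team convention, not an Airflow setting), and the graph is assumed to be acyclic.

```python
# Hypothetical upstream map: each DAG lists the DAGs it waits on.
upstream = {
    "ingest": [],
    "etl_pipeline": ["ingest"],
    "data_quality_checks": ["etl_pipeline"],
    "reporting_pipeline": ["etl_pipeline", "data_quality_checks"],
}


def chain_depth(dag_id, upstream):
    """Number of DAGs on the longest dependency chain ending at dag_id."""
    parents = upstream.get(dag_id, [])
    if not parents:
        return 1
    return 1 + max(chain_depth(parent, upstream) for parent in parents)


MAX_DEPTH = 3  # assumed threshold for illustration
depths = {dag_id: chain_depth(dag_id, upstream) for dag_id in upstream}
too_deep = [dag_id for dag_id, depth in depths.items() if depth > MAX_DEPTH]
print(too_deep)
```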
When using ExternalTaskSensor with multiple child DAGs running on different schedules, use execution_date_fn to dynamically compute the expected execution date rather than a fixed execution_delta.
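As an illustration of `execution_date_fn`, the sketch below maps the waiting DAG's logical date to the logical date of the upstream run it should match. The function name and the upstream schedule (daily at 00:00 UTC) are assumptions for the example; the callable takes the logical date as its single positional argument.

```python
from datetime import datetime, timezone


def upstream_logical_date(logical_date):
    """Map this DAG's logical date to the upstream run's logical date.

    Assumes the upstream DAG runs daily at 00:00 UTC, so the matching run is
    the one stamped at midnight of the same day.
    """
    return logical_date.replace(hour=0, minute=0, second=0, microsecond=0)


# Hypothetical wiring (requires Airflow, so shown as a comment only):
# ExternalTaskSensor(..., execution_date_fn=upstream_logical_date)

print(upstream_logical_date(datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)))
```

Unlike a fixed `execution_delta`, a function like this keeps working if the waiting DAG's own schedule changes, as long as the upstream schedule assumption holds.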
The TriggerDagRunOperator with wait_for_completion=False creates a fire-and-forget trigger. The parent task completes immediately while the child runs independently. Use this when the parent doesn't need to wait for child results.
Key Takeaways:
- Multi-DAG orchestration uses triggers, sensors, callbacks, or external events
- The inter-DAG dependency graph models cross-DAG relationships
- TriggerDagRunOperator is simplest for parent-child DAG relationships
- ExternalTaskSensor with `mode='reschedule'` avoids blocking worker slots
- Callbacks provide event-driven alerting without polling overhead
- Avoid deep inter-DAG dependency chains for maintainability
See Also
- DAG Design Patterns — DAG composition and dependency patterns
- Scheduling and Triggers — Timetables and scheduling patterns
- Branching Logic — BranchPythonOperator and conditional workflows
- XCom Communications — Task communication and data passing