Branching Logic in Apache Airflow

Free Lesson

Advertisement

Branching Logic in Apache Airflow

Architecture Diagram

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    BRANCHING ARCHITECTURE                                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    BRANCHING COMPONENTS                              β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  BranchPythonOperator                                       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Returns task_id to execute next                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Skips other branches                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Follows single path                                    β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  └── Requires merge point                                   β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  ShortCircuitOperator                                       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Boolean return value                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Skips downstream tasks if False                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Simple conditional logic                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  └── No merge required                                      β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  BranchTriggerOperator                                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Async branching                                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ Event-driven decisions                                 β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  └── Uses triggers                                          β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    BRANCHING FLOW                                     β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚
β”‚  β”‚  β”‚  Start  │─────▢│ Branch  │─────▢│Execute  │─────▢│  Merge  β”‚ β”‚   β”‚
β”‚  β”‚  β”‚         β”‚      β”‚Decision β”‚      β”‚ Path    β”‚      β”‚  Point  β”‚ β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚
β”‚  β”‚       β”‚                β”‚                β”‚                β”‚        β”‚   β”‚
β”‚  β”‚       β”‚   All paths    β”‚   Single path  β”‚   Skipped      β”‚        β”‚   β”‚
β”‚  β”‚       β”‚   available    β”‚   selected     β”‚   branches     β”‚        β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    BRANCHING PATTERNS                                        β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    SIMPLE BRANCH                                     β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                      β”‚   β”‚
β”‚  β”‚                    β”‚ Decisionβ”‚                                      β”‚   β”‚
β”‚  β”‚                    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                                      β”‚   β”‚
β”‚  β”‚                   β”Œβ”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”                                    β”‚   β”‚
β”‚  β”‚                   β–Ό           β–Ό                                    β”‚   β”‚
β”‚  β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                              β”‚   β”‚
β”‚  β”‚              β”‚ Path A  β”‚ β”‚ Path B  β”‚                              β”‚   β”‚
β”‚  β”‚              β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                              β”‚   β”‚
β”‚  β”‚                   β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜                                    β”‚   β”‚
β”‚  β”‚                         β–Ό                                          β”‚   β”‚
β”‚  β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                     β”‚   β”‚
β”‚  β”‚                    β”‚  End    β”‚                                     β”‚   β”‚
β”‚  β”‚                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                     β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  Characteristics:                                                   β”‚   β”‚
β”‚  β”‚  β€’ Single decision point                                           β”‚   β”‚
β”‚  β”‚  β€’ Two possible paths                                              β”‚   β”‚
β”‚  β”‚  β€’ Simple merge                                                    β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    MULTI-BRANCH                                      β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                      β”‚   β”‚
β”‚  β”‚                    β”‚ Decisionβ”‚                                      β”‚   β”‚
β”‚  β”‚                    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                                      β”‚   β”‚
β”‚  β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                               β”‚   β”‚
β”‚  β”‚              β–Ό          β–Ό          β–Ό                               β”‚   β”‚
β”‚  β”‚         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                         β”‚   β”‚
β”‚  β”‚         β”‚ Path A  β”‚β”‚ Path B  β”‚β”‚ Path C  β”‚                         β”‚   β”‚
β”‚  β”‚         β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                         β”‚   β”‚
β”‚  β”‚              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                               β”‚   β”‚
β”‚  β”‚                         β–Ό                                          β”‚   β”‚
β”‚  β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                     β”‚   β”‚
β”‚  β”‚                    β”‚  Merge  β”‚                                     β”‚   β”‚
β”‚  β”‚                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                     β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  Characteristics:                                                   β”‚   β”‚
β”‚  β”‚  β€’ Multiple decision outcomes                                      β”‚   β”‚
β”‚  β”‚  β€’ Requires proper merge handling                                  β”‚   β”‚
β”‚  β”‚  β€’ More complex control flow                                       β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    NESTED BRANCHING                                  β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                      β”‚   β”‚
β”‚  β”‚                    β”‚Decision1β”‚                                      β”‚   β”‚
β”‚  β”‚                    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                                      β”‚   β”‚
β”‚  β”‚                   β”Œβ”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”                                    β”‚   β”‚
β”‚  β”‚                   β–Ό           β–Ό                                    β”‚   β”‚
β”‚  β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                              β”‚   β”‚
β”‚  β”‚              β”‚ Path A  β”‚ β”‚Decision2β”‚                              β”‚   β”‚
β”‚  β”‚              β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                              β”‚   β”‚
β”‚  β”‚                   β”‚           β”‚                                    β”‚   β”‚
β”‚  β”‚                   β”‚      β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”                              β”‚   β”‚
β”‚  β”‚                   β”‚      β–Ό         β–Ό                              β”‚   β”‚
β”‚  β”‚                   β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                       β”‚   β”‚
β”‚  β”‚                   β”‚ β”‚ Path B1 β”‚β”‚ Path B2 β”‚                       β”‚   β”‚
β”‚  β”‚                   β”‚ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                       β”‚   β”‚
β”‚  β”‚                   β”‚      β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜                             β”‚   β”‚
β”‚  β”‚                   β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                                  β”‚   β”‚
β”‚  β”‚                         β–Ό                                          β”‚   β”‚
β”‚  β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                     β”‚   β”‚
β”‚  β”‚                    β”‚  Merge  β”‚                                     β”‚   β”‚
β”‚  β”‚                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                     β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  Characteristics:                                                   β”‚   β”‚
β”‚  β”‚  β€’ Complex decision trees                                          β”‚   β”‚
β”‚  β”‚  β€’ Multiple merge points                                           β”‚   β”‚
β”‚  β”‚  β€’ Requires careful dependency management                          β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    MERGE PATTERNS                                            β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    SIMPLE MERGE                                      β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                        β”‚   β”‚
β”‚  β”‚  β”‚ Path A  │───▢│         β”‚    β”‚         β”‚                        β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚  Merge  │───▢│  End    β”‚                        β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚         β”‚    β”‚         β”‚                        β”‚   β”‚
β”‚  β”‚  β”‚ Path B  │───▢│         β”‚    β”‚         β”‚                        β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                        β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β€’ Both paths must complete                                        β”‚   β”‚
β”‚  β”‚  β€’ Merge operator waits for all upstream tasks                     β”‚   β”‚
β”‚  β”‚  β€’ Uses TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS                   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    SELECTIVE MERGE                                   β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                        β”‚   β”‚
β”‚  β”‚  β”‚ Path A  │───▢│         β”‚    β”‚         β”‚                        β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚  Merge  │───▢│  End    β”‚                        β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚         β”‚    β”‚         β”‚                        β”‚   β”‚
β”‚  β”‚  β”‚ Path B  │─ ─▢│         β”‚    β”‚         β”‚                        β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                        β”‚   β”‚
β”‚  β”‚      (skipped)                                                     β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β€’ Only executed paths contribute                                  β”‚   β”‚
β”‚  β”‚  β€’ Handles skipped upstream tasks                                  β”‚   β”‚
β”‚  β”‚  β€’ Uses TriggerRule.NONE_FAILED                                   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    CONDITIONAL MERGE                                 β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”                        β”‚   β”‚
β”‚  β”‚  β”‚ Path A  │───▢│ Condition│───▢│         β”‚                        β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚  Check  β”‚    β”‚  Merge  β”‚                        β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚         β”‚    β”‚         β”‚                        β”‚   β”‚
β”‚  β”‚  β”‚ Path B  │───▢│         │───▢│         β”‚                        β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                        β”‚   β”‚
β”‚  β”‚                                                                     β”‚   β”‚
β”‚  β”‚  β€’ Merge based on runtime conditions                               β”‚   β”‚
β”‚  β”‚  β€’ Dynamic merge behavior                                          β”‚   β”‚
β”‚  β”‚  β€’ Custom merge logic                                              β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Formal Definitions

DfBranch Operator

A branch operator selects one or more downstream execution paths based on runtime conditions. Given a set of candidate paths P={p1,p2,…,pk}\mathcal{P} = \{p_1, p_2, \ldots, p_k\}, the branch function b:contextβ†’2Pb: \text{context} \rightarrow 2^{\mathcal{P}} returns a non-empty subset of paths to execute. All non-selected paths receive a skipped state.

DfMerge Point

A merge point is a task where multiple branches converge. The merge task uses a TriggerRule to determine when to execute given the states of upstream tasks. The most common rule is NONE_FAILED_MIN_ONE_SUCCESS, which fires when at least one upstream task succeeds and none have failed.

DfShort-Circuit Operator

A short-circuit operator returns a boolean value. If False, all downstream tasks are skipped until the end of the DAG or the next join point. Formally, fsc:context→{True,False}f_{\text{sc}}: \text{context} \rightarrow \{True, False\} where FalseFalse triggers cascade skipping.

Detailed Explanation

BranchPythonOperator

The BranchPythonOperator is the primary branching mechanism in Airflow. It allows you to choose which downstream task(s) to execute based on runtime conditions.

Return Value: The operator must return the task_id of the next task to execute. You can return a single task ID or a list of task IDs for parallel execution.

Branch Behavior: When a branch is selected, other branches are skipped. Skipped tasks receive a skipped state, which propagates downstream until a merge point.

Merge Points: All branches must converge at a merge point. The merge point should use TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS to handle skipped branches properly.

Multiple Returns: Return a list of task IDs to execute multiple branches in parallel. All returned tasks will execute simultaneously.

ShortCircuitOperator

The ShortCircuitOperator provides simpler conditional logic. It returns a boolean value, and if False, all downstream tasks are skipped.

Use Cases: Simple yes/no decisions, data validation checks, feature flags, environment-specific logic.

Difference from Branching: Unlike BranchPythonOperator, ShortCircuitOperator doesn't select specific paths. It either continues or stops entirely.

Downstream Impact: When the operator returns False, all downstream tasks are skipped until the end of the DAG or a join point.

Branch Patterns

Data-Driven Branching: Make decisions based on data characteristics. Analyze input data and choose processing paths accordingly.

b:mathcalCrightarrow2mathcalPsetminusemptysetb: \\mathcal{C} \\rightarrow 2^{\\mathcal{P}} \\setminus \\{\\emptyset\\}

Expected Task Count After Branching

mathbbE[Ntextexec]=sumi=1kpicdot∣Ti∣\\mathbb{E}[N_{\\text{exec}}] = \\sum_{i=1}^{k} p_i \\cdot |T_i|

Here,

  • kk=Number of possible branch outcomes
  • pip_i=Probability of selecting branch i
  • ∣Ti∣|T_i|=Number of tasks in branch i

Branch Overhead Ratio

Rtextbranch=fracTtextdecision+TtextskippropagationTtextexecutionR_{\\text{branch}} = \\frac{T_{\\text{decision}} + T_{\\text{skip\\_propagation}}}{T_{\\text{execution}}}

Here,

  • TdecisionT_{\text{decision}}=Time to evaluate branch conditions
  • Tskip_propagationT_{\text{skip\_propagation}}=Time for skip state to propagate through downstream tasks
  • TexecutionT_{\text{execution}}=Time to execute the selected path

ThBranch Completeness (Merge Invariant)

In a valid branching DAG, every branch must have a corresponding merge point such that: (1) all paths from the branch decision reach the merge, and (2) the merge task uses TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS or equivalent. Violation: If a branch lacks a merge point, downstream tasks may be indefinitely skipped due to unmet dependencies.

ThShort-Circuit Cascade

When a ShortCircuitOperator returns False, all downstream tasks Tdown={t:(tsc,t)∈Eβˆ—}T_{\text{down}} = \{t : (t_{\text{sc}}, t) \in E^*\} are skipped, where Eβˆ—E^* is the transitive closure of the dependency edge set. This propagation continues until a join point with an upstream task not in TdownT_{\text{down}}.

BranchPythonOperator can return a single task ID (string) or a list of task IDs. Returning a list executes multiple branches in parallel. All returned tasks will run concurrently until they reach the merge point.

Limit branch nesting depth to ≀3\leq 3 for maintainability. Deeply nested branches (>3>3 levels) significantly increase complexity and make debugging difficult. Consider refactoring into separate DAGs or using ShortCircuitOperator for simple conditions.

Environment-Based Branching: Use different logic for development, staging, and production environments.

Time-Based Branching: Execute different paths based on time of day, day of week, or other temporal conditions.

Error-Based Branching: Choose recovery paths based on error types or failure conditions.

Merge Strategies

Implicit Merge: When all branches converge at a single task, Airflow automatically handles the merge. The merge task waits for all upstream tasks to complete.

Explicit Merge: Use TriggerRule to define how the merge task handles skipped branches. Common rules include NONE_FAILED_MIN_ONE_SUCCESS and NONE_FAILED.

Conditional Merge: Implement custom merge logic using Python operators. Check upstream task states and execute appropriate logic.

Error Handling: Implement proper error handling in merge points. Consider partial failures and implement retry logic.

Key Concepts Table

OperatorReturn TypeBranching StyleMerge RequiredUse Case
BranchPythonOperatortask_id(s)Single/Multi-pathYesComplex decisions
ShortCircuitOperatorbooleanContinue/StopNoSimple conditions
BranchTriggerOperatorTriggerEventAsyncYesEvent-driven
ExternalBranchSensorExternal stateExternalYesCross-system

Code Examples

Advanced Branching Patterns

# advanced_branching.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import (
    BranchPythonOperator,
    PythonOperator,
    ShortCircuitOperator,
)
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
import random

def data_quality_branch(**context):
    """Branch based on data quality assessment."""
    # Simulate data quality check
    data_quality_score = random.uniform(0, 1)
    data_volume = random.randint(100, 10000)

    # Store metrics for downstream tasks
    context['ti'].xcom_push(key='quality_score', value=data_quality_score)
    context['ti'].xcom_push(key='data_volume', value=data_volume)

    # Decision logic
    if data_quality_score < 0.3:
        return 'data_cleaning'
    elif data_quality_score < 0.7:
        return 'data_validation'
    elif data_volume > 5000:
        return 'large_dataset_processing'
    else:
        return 'standard_processing'

def time_based_branch(**context):
    """Branch based on time of day."""
    from airflow.utils import timezone

    current_time = timezone.utcnow()
    hour = current_time.hour

    if hour < 6:
        return 'night_processing'
    elif hour < 12:
        return 'morning_processing'
    elif hour < 18:
        return 'afternoon_processing'
    else:
        return 'evening_processing'

def environment_branch(**context):
    """Branch based on environment."""
    from airflow.configuration import conf

    environment = conf.get('core', 'ENVIRONMENT')

    if environment == 'production':
        return 'production_processing'
    elif environment == 'staging':
        return 'staging_processing'
    else:
        return 'development_processing'

def error_recovery_branch(**context):
    """Branch based on error type."""
    error_type = context['ti'].xcom_pull(
        task_ids='error_detection',
        key='error_type'
    )

    error_handling_paths = {
        'data_corruption': 'data_recovery',
        'network_error': 'retry_operation',
        'resource_exhaustion': 'scale_resources',
        'timeout': 'extend_timeout',
    }

    return error_handling_paths.get(error_type, 'default_recovery')

def data_cleaning(**context):
    """Clean corrupted data."""
    print("Performing data cleaning...")
    quality_score = context['ti'].xcom_pull(
        task_ids='quality_check',
        key='quality_score'
    )
    print(f"Quality score before cleaning: {quality_score}")

def data_validation(**context):
    """Validate data quality."""
    print("Performing data validation...")
    quality_score = context['ti'].xcom_pull(
        task_ids='quality_check',
        key='quality_score'
    )
    print(f"Quality score: {quality_score}")

def large_dataset_processing(**context):
    """Process large datasets."""
    print("Processing large dataset...")
    data_volume = context['ti'].xcom_pull(
        task_ids='quality_check',
        key='data_volume'
    )
    print(f"Processing {data_volume} records")

def standard_processing(**context):
    """Standard data processing."""
    print("Performing standard processing...")

def aggregate_results(**context):
    """Aggregate results from all branches."""
    # Get all upstream task states
    upstream_tasks = ['data_cleaning', 'data_validation',
                     'large_dataset_processing', 'standard_processing']

    results = {}
    for task_id in upstream_tasks:
        try:
            result = context['ti'].xcom_pull(task_ids=task_id)
            results[task_id] = result
        except Exception:
            results[task_id] = None

    print(f"Aggregated results: {results}")
    return results

with DAG(
    'advanced_branching_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced branching patterns',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['branching', 'advanced'],
) as dag:

    start = EmptyOperator(task_id='start')

    # Quality check branch
    quality_branch = BranchPythonOperator(
        task_id='quality_check',
        python_callable=data_quality_branch,
    )

    # Branch tasks
    cleaning = PythonOperator(
        task_id='data_cleaning',
        python_callable=data_cleaning,
    )

    validation = PythonOperator(
        task_id='data_validation',
        python_callable=data_validation,
    )

    large_processing = PythonOperator(
        task_id='large_dataset_processing',
        python_callable=large_dataset_processing,
    )

    standard_processing = PythonOperator(
        task_id='standard_processing',
        python_callable=standard_processing,
    )

    # Merge point
    merge = PythonOperator(
        task_id='aggregate_results',
        python_callable=aggregate_results,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # Time-based branch
    time_branch = BranchPythonOperator(
        task_id='time_branch',
        python_callable=time_based_branch,
    )

    # Time-based tasks
    night_task = PythonOperator(
        task_id='night_processing',
        python_callable=lambda: print("Night processing"),
    )

    morning_task = PythonOperator(
        task_id='morning_processing',
        python_callable=lambda: print("Morning processing"),
    )

    afternoon_task = PythonOperator(
        task_id='afternoon_processing',
        python_callable=lambda: print("Afternoon processing"),
    )

    evening_task = PythonOperator(
        task_id='evening_processing',
        python_callable=lambda: print("Evening processing"),
    )

    # Final merge
    final_merge = EmptyOperator(
        task_id='final_merge',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # Set dependencies
    start >> quality_branch
    quality_branch >> [cleaning, validation, large_processing, standard_processing]
    [cleaning, validation, large_processing, standard_processing] >> merge
    merge >> time_branch
    time_branch >> [night_task, morning_task, afternoon_task, evening_task]
    [night_task, morning_task, afternoon_task, evening_task] >> final_merge

ShortCircuit Patterns

# shortcircuit_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import ShortCircuitOperator, PythonOperator
from airflow.operators.empty import EmptyOperator

def check_data_availability(**context):
    """Check if required data is available."""
    import os

    required_files = [
        '/data/input1.csv',
        '/data/input2.csv',
        '/data/config.json',
    ]

    missing_files = [f for f in required_files if not os.path.exists(f)]

    if missing_files:
        print(f"Missing files: {missing_files}")
        return False

    print("All required files are available")
    return True

def check_api_health(**context):
    """Check if external API is healthy."""
    import requests

    try:
        response = requests.get(
            'http://api.example.com/health',
            timeout=10
        )
        if response.status_code == 200:
            print("API is healthy")
            return True
        else:
            print(f"API returned status code: {response.status_code}")
            return False
    except Exception as e:
        print(f"API health check failed: {e}")
        return False

def check_database_connectivity(**context):
    """Check database connectivity."""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    try:
        hook = PostgresHook(postgres_conn_id='postgres_default')
        result = hook.get_first("SELECT 1")
        if result:
            print("Database connection successful")
            return True
        else:
            print("Database connection failed")
            return False
    except Exception as e:
        print(f"Database check failed: {e}")
        return False

def check_resource_availability(**context):
    """Check if sufficient resources are available."""
    import psutil

    cpu_percent = psutil.cpu_percent()
    memory = psutil.virtual_memory()

    if cpu_percent > 90:
        print(f"CPU usage too high: {cpu_percent}%")
        return False

    if memory.percent > 85:
        print(f"Memory usage too high: {memory.percent}%")
        return False

    print(f"Resources available - CPU: {cpu_percent}%, Memory: {memory.percent}%")
    return True

def process_data(**context):
    """Process data after all checks pass."""
    print("All checks passed, processing data...")

with DAG(
    'shortcircuit_patterns_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='ShortCircuit operator patterns',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['shortcircuit', 'patterns'],
) as dag:

    start = EmptyOperator(task_id='start')

    # Sequential checks
    check_files = ShortCircuitOperator(
        task_id='check_files',
        python_callable=check_data_availability,
    )

    check_api = ShortCircuitOperator(
        task_id='check_api',
        python_callable=check_api_health,
    )

    check_db = ShortCircuitOperator(
        task_id='check_database',
        python_callable=check_database_connectivity,
    )

    check_resources = ShortCircuitOperator(
        task_id='check_resources',
        python_callable=check_resource_availability,
    )

    # Processing tasks
    process = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

    # Cleanup
    cleanup = EmptyOperator(task_id='cleanup')

    # Set dependencies
    start >> check_files >> check_api >> check_db >> check_resources
    check_resources >> process >> cleanup

Complex Branching with External Dependencies

# complex_branching_external.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import TaskInstance
from airflow import settings

def external_dependency_branch(**context):
    """Branch based on external DAG task states."""
    session = settings.Session()

    # Check state of external task
    external_task = session.query(TaskInstance).filter(
        TaskInstance.dag_id == 'external_pipeline',
        TaskInstance.task_id == 'extract',
        TaskInstance.execution_date == context['execution_date'],
    ).first()

    if external_task is None:
        return 'wait_for_external'

    if external_task.state == 'success':
        return 'process_data'
    elif external_task.state == 'failed':
        return 'handle_failure'
    else:
        return 'wait_for_external'

def composite_branch(**context):
    """Branch based on multiple conditions."""
    # Get data from previous tasks
    data_quality = context['ti'].xcom_pull(
        task_ids='quality_assessment',
        key='quality_score'
    )

    data_volume = context['ti'].xcom_pull(
        task_ids='volume_assessment',
        key='volume'
    )

    system_load = context['ti'].xcom_pull(
        task_ids='system_check',
        key='load'
    )

    # Composite decision logic
    if data_quality < 0.3:
        return 'data_cleaning'
    elif data_volume > 10000 and system_load > 0.8:
        return 'queue_processing'
    elif data_volume > 10000:
        return 'distributed_processing'
    else:
        return 'standard_processing'

def parallel_branch_decision(**context):
    """Decide which parallel branches to execute."""
    import random

    # Simulate random decision
    branches_to_execute = random.sample(
        ['branch_a', 'branch_b', 'branch_c', 'branch_d'],
        k=random.randint(1, 4)
    )

    return branches_to_execute

with DAG(
    'complex_branching_external_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Complex branching with external dependencies',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['branching', 'complex', 'external'],
) as dag:

    start = EmptyOperator(task_id='start')

    # Wait for external dependency
    wait_external = ExternalTaskSensor(
        task_id='wait_for_external',
        external_dag_id='external_pipeline',
        external_task_id='extract',
        mode='reschedule',
        poke_interval=300,
    )

    # Branch based on external state
    external_branch = BranchPythonOperator(
        task_id='external_dependency_branch',
        python_callable=external_dependency_branch,
    )

    # Branch tasks
    process_data = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data..."),
    )

    handle_failure = PythonOperator(
        task_id='handle_failure',
        python_callable=lambda: print("Handling failure..."),
    )

    # Composite branch
    quality_assessment = PythonOperator(
        task_id='quality_assessment',
        python_callable=lambda: {'quality_score': 0.8},
    )

    volume_assessment = PythonOperator(
        task_id='volume_assessment',
        python_callable=lambda: {'volume': 5000},
    )

    system_check = PythonOperator(
        task_id='system_check',
        python_callable=lambda: {'load': 0.5},
    )

    composite_branch_op = BranchPythonOperator(
        task_id='composite_branch',
        python_callable=composite_branch,
    )

    # Processing branches
    data_cleaning = PythonOperator(
        task_id='data_cleaning',
        python_callable=lambda: print("Cleaning data..."),
    )

    queue_processing = PythonOperator(
        task_id='queue_processing',
        python_callable=lambda: print("Queue processing..."),
    )

    distributed_processing = PythonOperator(
        task_id='distributed_processing',
        python_callable=lambda: print("Distributed processing..."),
    )

    standard_processing = PythonOperator(
        task_id='standard_processing',
        python_callable=lambda: print("Standard processing..."),
    )

    # Parallel branch decision
    parallel_branch = BranchPythonOperator(
        task_id='parallel_branch_decision',
        python_callable=parallel_branch_decision,
    )

    # Parallel branch tasks
    branch_a = PythonOperator(
        task_id='branch_a',
        python_callable=lambda: print("Branch A executing"),
    )

    branch_b = PythonOperator(
        task_id='branch_b',
        python_callable=lambda: print("Branch B executing"),
    )

    branch_c = PythonOperator(
        task_id='branch_c',
        python_callable=lambda: print("Branch C executing"),
    )

    branch_d = PythonOperator(
        task_id='branch_d',
        python_callable=lambda: print("Branch D executing"),
    )

    # Merge points
    merge_point_1 = EmptyOperator(
        task_id='merge_point_1',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    merge_point_2 = EmptyOperator(
        task_id='merge_point_2',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    final_merge = EmptyOperator(
        task_id='final_merge',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # Set dependencies
    start >> wait_external >> external_branch
    external_branch >> [process_data, handle_failure]
    process_data >> [quality_assessment, volume_assessment, system_check]
    [quality_assessment, volume_assessment, system_check] >> composite_branch_op
    composite_branch_op >> [data_cleaning, queue_processing, distributed_processing, standard_processing]
    [data_cleaning, queue_processing, distributed_processing, standard_processing] >> merge_point_1
    merge_point_1 >> parallel_branch
    parallel_branch >> [branch_a, branch_b, branch_c, branch_d]
    [branch_a, branch_b, branch_c, branch_d] >> merge_point_2
    [handle_failure, merge_point_2] >> final_merge

Performance Metrics

MetricDescriptionOptimization Strategy
Branch Decision TimeTime to evaluate branch conditionsOptimize condition logic
Task Skip RatePercentage of tasks skippedBalance branch granularity
Merge ComplexityTime to resolve merge dependenciesUse efficient trigger rules
Branch DepthNesting level of branchesLimit depth for readability
Parallel Branch CountNumber of concurrent branchesBalance parallelism vs complexity
XCom UsageData passed between branchesMinimize XCom data size
Error PropagationHow errors affect branchingImplement proper error handling
Resource UsageResources consumed by branchingOptimize branch logic

Best Practices

  1. Branch Granularity: Keep branch logic simple and focused. Avoid complex nested branches that are hard to understand and maintain.

  2. Merge Point Design: Always include proper merge points after branching. Use appropriate trigger rules to handle skipped branches.

  3. Error Handling: Implement error handling for branch decisions. Consider what happens when branch logic fails.

  4. Testing: Test all branch paths thoroughly. Use Airflow's testing utilities to verify branch behavior.

  5. Documentation: Document branch logic and decision criteria. Explain why certain paths are chosen under different conditions.

  6. Monitoring: Track branch execution metrics. Monitor which branches are taken most frequently.

  7. Performance: Optimize branch decision logic to minimize execution time. Avoid expensive operations in branch conditions.

  8. Maintainability: Use descriptive task IDs for branch tasks. Keep branch logic in separate functions for clarity.

  9. XCom Management: Minimize data passed through XCom in branch decisions. Use appropriate XCom backends for larger data.

  10. Alternative Patterns: Consider using ShortCircuitOperator for simple conditions. Use TriggerOperator for event-driven branching.

Key Takeaways:

  • Branch function b:Cβ†’2Pβˆ–{βˆ…}b: \mathcal{C} \rightarrow 2^{\mathcal{P}} \setminus \{\emptyset\} selects one or more paths
  • Expected task count: E[Nexec]=βˆ‘piβ‹…βˆ£Ti∣\mathbb{E}[N_{\text{exec}}] = \sum p_i \cdot |T_i|
  • Every branch requires a merge point with appropriate TriggerRule
  • Short-circuit cascade skips all downstream tasks until a join point
  • Use TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS at merge points
  • Limit branch nesting depth to ≀3\leq 3 for maintainability

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)

Advertisement

Need Expert Airflow Help?

Get personalized DAG design, scheduling optimization, or production Airflow consulting.

Advertisement