Sensors and Operators in Apache Airflow

Architecture Diagram

SENSOR ARCHITECTURE

Sensor Components

  BaseSensorOperator
  ├── poke_interval (default: 60s)
  ├── timeout (default: 7 * 24 * 3600s)
  ├── soft_fail (default: False)
  ├── mode (default: 'poke')
  └── exponential_backoff (default: False)

  Sensor Modes
  ├── Poke Mode
  │   └── Active polling (uses worker slot)
  ├── Reschedule Mode
  │   └── Release worker between pokes
  └── Deferred Mode
      └── Async waiting (uses triggerer)

Sensor Types

  File System Sensors
  ├── FileSensor
  ├── S3KeySensor
  ├── GCSObjectExistenceSensor
  └── LocalFilesystemSensor

  HTTP Sensors
  ├── HttpSensor
  ├── SimpleHttpSensor
  └── WebHdfsSensor

  Database Sensors
  ├── SqlSensor
  ├── ExternalTaskSensor
  └── S3PrefixSensor

Architecture Diagram

SENSOR MODES COMPARISON

POKE MODE

  Sensor Starts ──▶ Poke Check ──▶ Wait Interval ──▶ Repeat Until
  Uses worker slot; blocks execution; keeps slot while waiting

  Characteristics:
  • Simple implementation
  • Uses worker slot continuously
  • Good for short waits (< 5 minutes)
  • Not resource efficient for long waits

RESCHEDULE MODE

  Sensor Starts ──▶ Poke Check ──▶ Release Worker ──▶ Re-queue
  Frees worker slot; no blocking during wait; takes a slot again for the next poke

  Characteristics:
  • Releases worker slot between pokes
  • More resource efficient for long waits
  • Slightly higher latency due to re-queueing
  • Good for waits > 5 minutes

DEFERRED MODE

  Operator Defers ──▶ Trigger Created ──▶ Triggerer Waits ──▶ Resume Operator
  Worker slot released on deferral; async wait outside the worker; task resumes on a worker once the trigger fires

  Characteristics:
  • Most resource efficient
  • Uses async triggers
  • No worker resources used during wait
  • Best for long waits (hours/days)

Architecture Diagram

SENSOR OPTIMIZATION PATTERNS

EXPONENTIAL BACKOFF

  Poke 1    Poke 2    Poke 3    Poke 4    Poke 5
    1s        2s        4s        8s        16s
  Exponentially increasing intervals

  Benefits:
  • Reduces load on external systems
  • Conserves worker resources
  • Good for unpredictable wait times

SENSOR CHAINING

  Sensor A (File) ──▶ Sensor B (HTTP) ──▶ Sensor C (DB) ──▶ Task
  Waits for file, then waits for API, finally for DB

  Benefits:
  • Complex dependency chains
  • Fine-grained control
  • Error isolation

SENSOR AGGREGATION

  Sensor A ──┐
  Sensor B ──┼──▶ Join Operator ──▶ Task
  Sensor C ──┘

  Benefits:
  • Wait for multiple conditions
  • Parallel monitoring
  • Reduced overall wait time

Formal Definitions

Definition: Sensor

A sensor is a specialized operator that polls an external condition at intervals $\Delta t_{\text{poke}}$ until the condition returns True or a timeout $\tau$ is exceeded. A sensor is defined as $S = (\text{poke}, \Delta t_{\text{poke}}, \tau, \text{mode})$, where poke() is the condition check function.

Definition: Poke Interval

The poke interval $\Delta t_{\text{poke}}$ is the time (in seconds) between consecutive condition checks by a sensor. In poke mode the sensor keeps its worker slot for the whole interval and sleeps between checks; in reschedule mode the slot is released after each check and the task is rescheduled for the next one.

Definition: Deferred Mode

Deferred mode is a sensor execution strategy where the operator creates an async trigger and releases the worker slot entirely. The triggerer service monitors the condition without consuming worker resources. This is the most efficient mode for long waits.

Detailed Explanation

Sensor Fundamentals

Sensors are specialized operators that wait for certain conditions to be met before proceeding. They poll external systems at regular intervals until a condition is satisfied or a timeout is reached. Understanding sensor behavior is crucial for building efficient workflows.

Poke Method: The poke method is the core of every sensor. It returns True when the condition is met and False otherwise. The sensor repeatedly calls this method until it returns True or the timeout is exceeded.

Timeout Handling: Sensors have a configurable timeout (default: 7 days). If the condition isn't met within the timeout, the sensor fails. Set appropriate timeouts based on expected wait times.

Soft Fail: The soft_fail parameter determines whether sensor failure marks the task as failed or skipped. Set soft_fail=True for optional conditions that shouldn't fail the entire workflow.
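The interplay of poke, timeout, and soft_fail described above can be modelled in a few lines of plain Python. This is a simplified sketch rather than Airflow's implementation: `run_sensor`, `FakeClock`, the injected `clock` and `sleep` callables, and the string outcomes are hypothetical names chosen for illustration.

```python
import time
from typing import Callable, Optional


def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    soft_fail: bool = False,
    clock: Optional[Callable[[], float]] = None,
    sleep: Optional[Callable[[float], None]] = None,
) -> str:
    """Simplified model of a sensor loop (hypothetical helper, not Airflow code)."""
    clock = clock or time.monotonic
    sleep = sleep or time.sleep
    started = clock()
    while True:
        if poke():
            return "success"
        # Condition not met: give up once the timeout has elapsed.
        if clock() - started >= timeout:
            return "skipped" if soft_fail else "failed"
        sleep(poke_interval)


class FakeClock:
    """Test clock so the example runs instantly instead of really sleeping."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
answers = iter([False, False, True])  # condition becomes true on the third poke
result = run_sensor(lambda: next(answers), poke_interval=60, timeout=300,
                    clock=clock.time, sleep=clock.sleep)
```

Here the sensor succeeds on the third poke after two simulated 60-second sleeps. With a condition that never becomes true and soft_fail=True, the same loop returns "skipped" instead of "failed" once the timeout elapses.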

Sensor Modes

Poke Mode: The default mode where the sensor actively polls the condition. It holds a worker slot during the entire wait period. This is simple but resource-intensive for long waits.

Reschedule Mode: The sensor releases the worker slot between pokes and re-queues itself. This is more resource-efficient but adds latency due to re-queueing. Use for waits longer than 5 minutes.

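Deferred Mode: The sensor hands the wait to Airflow's triggerer and waits asynchronously, so no worker slot is held during the wait. Unlike poke and reschedule, this is not a value of the mode parameter: it relies on deferrable sensors (for example, sensors that accept deferrable=True) and requires a running triggerer. This is the most efficient option for long waits.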
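To make the resource difference between the three modes concrete, the following sketch estimates how many worker-slot seconds a single sensor consumes while it waits. It is a back-of-the-envelope model under stated assumptions: `slot_seconds` is a hypothetical helper, every poke is assumed to take the same time, and the brief worker time a deferred task needs to defer and resume is ignored.

```python
def slot_seconds(mode: str, pokes: int, poke_interval: float, poke_duration: float) -> float:
    """Worker-slot time consumed while waiting, under a simplified model."""
    checking = pokes * poke_duration
    if mode == "poke":
        # The slot is held while checking and while sleeping between checks.
        return pokes * poke_interval + checking
    if mode == "reschedule":
        # The slot is held only while each check runs.
        return checking
    if mode == "deferred":
        # The wait happens in the triggerer, not on a worker.
        return 0.0
    raise ValueError(f"unknown mode: {mode!r}")


# One hour of waiting: 60 pokes, 60 s apart, 2 s per poke (illustrative values).
usage = {m: slot_seconds(m, pokes=60, poke_interval=60, poke_duration=2)
         for m in ("poke", "reschedule", "deferred")}
```

With these illustrative numbers, poke mode holds a slot for 3720 seconds, reschedule mode for 120 seconds, and deferred mode for none of the wait.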

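Total Sensor Wait Time

$$T_{\text{wait}} = n \cdot \Delta t_{\text{poke}} + \sum_{i=1}^{n} t_{\text{poke},i}$$

Here,

  • $n$ = Number of pokes
  • $\Delta t_{\text{poke}}$ = Poke interval
  • $t_{\text{poke},i}$ = Execution time of the $i$-th poke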
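As a worked example of the total wait time formula, assume 10 pokes, a 60-second poke interval, and 2 seconds per poke. These values are illustrative only:

```latex
% Assumed values: n = 10, \Delta t_{poke} = 60 s, t_{poke,i} = 2 s for every i
\begin{aligned}
T_{\text{wait}} &= n \cdot \Delta t_{\text{poke}} + \sum_{i=1}^{n} t_{\text{poke},i} \\
                &= 10 \cdot 60\,\text{s} + 10 \cdot 2\,\text{s} \\
                &= 620\,\text{s}
\end{aligned}
```

The interval term dominates, which is why long waits in poke mode tie up a worker slot almost entirely for sleeping.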

Exponential Backoff Interval

$$\Delta t_i = \min\left(\Delta t_0 \cdot m^i,\; \Delta t_{\max}\right)$$

Here,

  • $\Delta t_0$ = Initial poke interval
  • $m$ = Multiplier (typically 2.0)
  • $i$ = Poke attempt number (0-indexed)
  • $\Delta t_{\max}$ = Maximum poke interval cap
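The interval formula above can be evaluated directly. The sketch below implements the idealized formula from this lesson; it is not Airflow's internal computation, which also applies jitter to the interval. The function name `backoff_intervals` is a hypothetical helper.

```python
from typing import List


def backoff_intervals(initial: float, multiplier: float, max_interval: float, pokes: int) -> List[float]:
    """Interval before each poke attempt i = 0..pokes-1: min(initial * multiplier**i, max_interval)."""
    return [min(initial * multiplier ** i, max_interval) for i in range(pokes)]


first_five = backoff_intervals(initial=1, multiplier=2.0, max_interval=300, pokes=5)
capped = backoff_intervals(initial=1, multiplier=2.0, max_interval=300, pokes=12)
```

With an initial interval of 1 second and a multiplier of 2, the first five intervals are 1, 2, 4, 8, and 16 seconds, matching the diagram. From the tenth attempt onward the 300-second cap applies, because 2^9 = 512 exceeds it.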

Worker Slot Utilization (Poke Mode)

$$U_{\text{slot}} = \frac{T_{\text{wait}}}{T_{\text{wait}} + T_{\text{task}}} \times 100\%$$

Here,

  • $T_{\text{wait}}$ = Total sensor wait time
  • $T_{\text{task}}$ = Downstream task execution time
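A small calculator for the utilization formula shows how quickly waiting dominates slot usage in poke mode. `slot_utilization` is a hypothetical helper name, and the input values are illustrative.

```python
def slot_utilization(wait_seconds: float, task_seconds: float) -> float:
    """Share of slot time spent waiting, in percent: T_wait / (T_wait + T_task) * 100."""
    total = wait_seconds + task_seconds
    if total <= 0:
        raise ValueError("wait_seconds + task_seconds must be positive")
    return 100.0 * wait_seconds / total


long_wait = slot_utilization(wait_seconds=3600, task_seconds=400)
short_wait = slot_utilization(wait_seconds=60, task_seconds=540)
```

A one-hour wait in front of a 400-second task means 90% of the slot time goes to waiting, whereas a one-minute wait in front of a nine-minute task costs only 10%.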

Theorem: Sensor Timeout Guarantee

If a sensor has timeout $\tau$ and poke interval $\Delta t_{\text{poke}}$, the sensor is guaranteed to either succeed or fail within $T_{\text{max}} = \tau + t_{\text{poke}}$, where $t_{\text{poke}}$ is the maximum poke execution time. Proof: The sensor checks at most $\lceil \tau / \Delta t_{\text{poke}} \rceil + 1$ times before exceeding the timeout.
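The two bounds stated in the theorem are easy to evaluate for a given configuration. The helpers below simply compute the expressions as written above; `max_pokes` and `worst_case_duration` are hypothetical names, and the numbers are illustrative.

```python
import math


def max_pokes(timeout: float, poke_interval: float) -> int:
    """Upper bound on the number of checks: ceil(timeout / poke_interval) + 1."""
    return math.ceil(timeout / poke_interval) + 1


def worst_case_duration(timeout: float, max_poke_time: float) -> float:
    """Upper bound on total sensor run time: timeout + maximum poke execution time."""
    return timeout + max_poke_time


checks = max_pokes(timeout=300, poke_interval=60)
bound = worst_case_duration(timeout=300, max_poke_time=30)
```

For a 300-second timeout and a 60-second interval, the sensor checks at most 6 times, and with pokes that take up to 30 seconds it finishes within 330 seconds.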

In poke mode, the sensor occupies a worker slot for the entire wait duration. For $k$ sensors waiting simultaneously, the effective parallelism is reduced by $k$. Use reschedule or deferred mode to free slots during waits.

For exponential backoff sensors, set max_wait to prevent excessively long intervals. A common pattern is $\Delta t_{\max} = 300$ seconds (5 minutes) to balance responsiveness with external system load.

Advanced Sensor Patterns

Exponential Backoff: Sensors can use exponential backoff for poke intervals by setting exponential_backoff=True. The interval starts at poke_interval and roughly doubles with each poke, up to max_wait. This reduces load on external systems while maintaining responsiveness.

Sensor Chaining: Multiple sensors can be chained to wait for complex conditions. Each sensor waits for its specific condition before allowing the next sensor to start.

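Sensor Aggregation: Run several sensors in parallel and make a downstream task depend on all of them, so that the default all_success trigger rule acts as the join, or combine the checks in custom logic. Because all conditions are monitored at the same time, this pattern avoids the serial polling delays of sensor chaining.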
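The structural difference between chaining and aggregation can be seen by asking which tasks are allowed to run at a given moment. The sketch below models both layouts as plain dictionaries of upstream dependencies; it does not use Airflow, and `runnable`, `chain`, and `fan_in` are hypothetical names for illustration.

```python
from typing import Dict, List, Set


def runnable(upstream: Dict[str, List[str]], done: Set[str]) -> List[str]:
    """Tasks that are not finished and whose upstream tasks have all succeeded."""
    return sorted(
        task for task, deps in upstream.items()
        if task not in done and all(dep in done for dep in deps)
    )


# Chaining: each sensor starts only after the previous one succeeds.
chain = {
    "sensor_a": [],
    "sensor_b": ["sensor_a"],
    "sensor_c": ["sensor_b"],
    "task": ["sensor_c"],
}

# Aggregation: all sensors start together and the task joins on all of them.
fan_in = {
    "sensor_a": [],
    "sensor_b": [],
    "sensor_c": [],
    "task": ["sensor_a", "sensor_b", "sensor_c"],
}

chain_start = runnable(chain, done=set())
fan_in_start = runnable(fan_in, done=set())
fan_in_end = runnable(fan_in, done={"sensor_a", "sensor_b", "sensor_c"})
```

At the start, the chain lets only sensor_a run, while the fan-in layout lets all three sensors poll in parallel. The downstream task becomes runnable only after every sensor has succeeded.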

Custom Sensors: Create custom sensors for specialized conditions. Inherit from BaseSensorOperator and implement the poke method with your specific logic.

External Task Sensor

The ExternalTaskSensor waits for a task in another DAG or the same DAG to complete. It's useful for cross-DAG dependencies and complex workflow orchestration.

Configuration: Specify the external_dag_id and external_task_id, and optionally execution_delta or execution_date_fn when the two DAGs run on different schedules. The sensor checks whether the external task has reached the expected state.

States: By default, the sensor waits for the task to succeed. Use allowed_states to wait for specific states like success, failed, or skipped.

Execution Date Matching: The sensor can match execution dates to ensure proper synchronization between DAGs. Use execution_delta or execution_date_fn for flexible date matching.
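The date matching described above boils down to computing which run of the external DAG to inspect. The following sketch models that lookup in plain Python under simplifying assumptions: `external_logical_date` is a hypothetical helper, and the date function here takes only the current logical date and returns a single date.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional


def external_logical_date(
    logical_date: datetime,
    execution_delta: Optional[timedelta] = None,
    execution_date_fn: Optional[Callable[[datetime], datetime]] = None,
) -> datetime:
    """Which external run to look at (simplified model of the matching rules)."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("Provide execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        # A positive delta looks back in time from the current run.
        return logical_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(logical_date)
    # Default: the external run with the same logical date.
    return logical_date


current = datetime(2024, 1, 1, 6, 0)
same_run = external_logical_date(current)
two_hours_earlier = external_logical_date(current, execution_delta=timedelta(hours=2))
midnight_run = external_logical_date(
    current, execution_date_fn=lambda d: d.replace(hour=0, minute=0)
)
```

A fixed offset between schedules fits execution_delta, while execution_date_fn suits irregular mappings such as "the run at midnight of the same day".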

Key Concepts Table

Sensor Type         Mode             Best For             Resource Usage  Latency
FileSensor          Poke             Short file waits     High            Low
S3KeySensor         Reschedule       S3 object waits      Medium          Medium
HttpSensor          Deferred         API availability     Low             High
SqlSensor           Poke             Database conditions  High            Low
ExternalTaskSensor  Poke/Reschedule  Cross-DAG deps       Medium          Medium
TimeSensor          Poke             Scheduled waits      High            Low

Code Examples

Advanced Sensor Implementation

# advanced_sensors.py
from datetime import datetime, timedelta
from typing import Any, Optional
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator
# apply_defaults is a deprecated no-op since Airflow 2.0; operators work without it
from airflow.utils.decorators import apply_defaults
import requests
import json

class AdvancedHttpSensor(BaseSensorOperator):
    """
    Advanced HTTP sensor with multiple condition support.

    This sensor supports multiple conditions, custom headers,
    and response validation.
    """

    template_fields = ('endpoint', 'headers', 'expected_status')
    ui_color = '#4285F4'

    @apply_defaults
    def __init__(
        self,
        endpoint: str,
        method: str = 'GET',
        headers: Optional[dict] = None,
        expected_status: int = 200,
        json_path: Optional[str] = None,
        expected_value: Any = None,
        auth_type: str = 'bearer',
        http_conn_id: str = 'http_default',
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.endpoint = endpoint
        self.method = method
        self.headers = headers or {}
        self.expected_status = expected_status
        self.json_path = json_path
        self.expected_value = expected_value
        self.auth_type = auth_type
        # Connection ID looked up in poke(); defaults to Airflow's standard HTTP connection
        self.http_conn_id = http_conn_id
    def poke(self, context):
        """Check if the HTTP condition is met."""
        try:
            # Get connection details
            from airflow.hooks.base import BaseHook
            conn = BaseHook.get_connection(self.http_conn_id)

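            # Build request; fall back to http and omit the port when the connection leaves them unset
            scheme = conn.schema or "http"
            port = f":{conn.port}" if conn.port else ""
            url = f"{scheme}://{conn.host}{port}{self.endpoint}"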
            headers = {**self.headers}

            # Add authentication
            if self.auth_type == 'bearer':
                headers['Authorization'] = f"Bearer {conn.get_password()}"
            elif self.auth_type == 'basic':
                import base64
                credentials = base64.b64encode(
                    f"{conn.login}:{conn.get_password()}".encode()
                ).decode()
                headers['Authorization'] = f"Basic {credentials}"

            # Make request
            response = requests.request(
                method=self.method,
                url=url,
                headers=headers,
                timeout=30,
            )

            # Check status code
            if response.status_code != self.expected_status:
                self.log.warning(
                    f"Expected status {self.expected_status}, "
                    f"got {response.status_code}"
                )
                return False

            # Check JSON path if specified
            if self.json_path and self.expected_value is not None:
                data = response.json()
                actual_value = self._get_json_value(data, self.json_path)

                if actual_value != self.expected_value:
                    self.log.warning(
                        f"Expected value {self.expected_value} at path "
                        f"{self.json_path}, got {actual_value}"
                    )
                    return False

            self.log.info("HTTP condition met successfully")
            return True

        except Exception as e:
            self.log.error(f"HTTP sensor failed: {str(e)}")
            return False

    def _get_json_value(self, data: dict, path: str):
        """Get value from JSON using dot notation path."""
        keys = path.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            else:
                return None
        return value

class MultiConditionSensor(BaseSensorOperator):
    """
    Sensor that waits for multiple conditions to be met.
    """

    @apply_defaults
    def __init__(
        self,
        conditions: list,
        operator: str = 'AND',
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.conditions = conditions
        self.operator = operator

    def poke(self, context):
        """Check if conditions are met."""
        results = []

        for condition in self.conditions:
            condition_type = condition.get('type')
            result = False

            if condition_type == 'file':
                result = self._check_file_condition(condition)
            elif condition_type == 'http':
                result = self._check_http_condition(condition)
            elif condition_type == 'database':
                result = self._check_database_condition(condition)

            results.append(result)

        if self.operator == 'AND':
            return all(results)
        else:  # OR
            return any(results)

    def _check_file_condition(self, condition: dict) -> bool:
        """Check file-based condition."""
        import os
        path = condition.get('path')
        exists = condition.get('exists', True)

        file_exists = os.path.exists(path)
        return file_exists == exists

    def _check_http_condition(self, condition: dict) -> bool:
        """Check HTTP-based condition."""
        url = condition.get('url')
        expected_status = condition.get('expected_status', 200)

        try:
            response = requests.get(url, timeout=10)
            return response.status_code == expected_status
        except Exception:
            return False

    def _check_database_condition(self, condition: dict) -> bool:
        """Check database-based condition."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=condition.get('conn_id'))
        query = condition.get('query')

        result = hook.get_first(query)
        return bool(result)

# Usage in DAG
with DAG(
    'advanced_sensor_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced sensor examples',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'advanced'],
) as dag:

    # Advanced HTTP sensor
    api_sensor = AdvancedHttpSensor(
        task_id='api_availability_sensor',
        endpoint='/api/v1/status',
        expected_status=200,
        json_path='status',
        expected_value='healthy',
        mode='reschedule',
        poke_interval=30,
        timeout=300,
    )

    # Multi-condition sensor
    multi_condition = MultiConditionSensor(
        task_id='multi_condition_sensor',
        conditions=[
            {'type': 'file', 'path': '/data/input.csv', 'exists': True},
            {'type': 'http', 'url': 'http://api:8080/health', 'expected_status': 200},
            {'type': 'database', 'conn_id': 'postgres', 'query': 'SELECT 1'},
        ],
        operator='AND',
        mode='poke',
        poke_interval=60,
    )

    # Process task
    process = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data..."),
    )

    api_sensor >> multi_condition >> process
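
The two pure-Python pieces of the sensors above, the dot-notation JSON lookup and the AND/OR reduction, can be exercised without an Airflow installation. The following is a minimal sketch under that assumption; `get_json_value` and `combine` are illustrative standalone names, not part of Airflow. Unlike `MultiConditionSensor`, which treats any operator other than 'AND' as OR, this sketch rejects unknown operators.

```python
from typing import Any, List


def get_json_value(data: Any, path: str) -> Any:
    """Follow a dot-separated path through nested dicts; None if a step is missing."""
    value = data
    for key in path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


def combine(results: List[bool], operator: str = "AND") -> bool:
    """Reduce per-condition results with AND or OR semantics."""
    if operator == "AND":
        return all(results)  # note: all([]) is True, so an empty list passes
    if operator == "OR":
        return any(results)
    raise ValueError(f"Unsupported operator: {operator!r}")


# Hypothetical payload, shaped like a health-check response
payload = {"status": "healthy", "db": {"replica": {"lag_seconds": 3}}}

status = get_json_value(payload, "status")
lag = get_json_value(payload, "db.replica.lag_seconds")
missing = get_json_value(payload, "db.primary.lag_seconds")

all_met = combine([True, False, True], "AND")
any_met = combine([True, False, True], "OR")
```

Because `all([])` is `True`, a sensor configured with an empty `conditions` list and the AND operator would succeed on its first poke, which may be worth guarding against.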

External Task Sensor Patterns

# external_task_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.state import State

def process_data(**context):
    """Process data after external task completes."""
    print("Processing data after external dependency met")

def generate_report(**context):
    """Generate report after processing."""
    print("Generating report...")

# DAG 1: Upstream DAG (external dependency)
with DAG(
    'upstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Upstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'upstream'],
) as upstream_dag:

    def extract_data(**context):
        """Extract data from source."""
        print("Extracting data...")
        return {"status": "success"}

    def transform_data(**context):
        """Transform extracted data."""
        print("Transforming data...")
        return {"status": "success"}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    extract >> transform

# DAG 2: Downstream DAG (waits for upstream)
with DAG(
    'downstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Downstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'downstream'],
) as downstream_dag:

    # Wait for upstream DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
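        # No execution_delta: both DAGs run @daily from the same start_date,
        # so the sensor matches the upstream run with the same logical date.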
        mode='reschedule',
        poke_interval=300,
        timeout=3600,
    )

    # Wait for specific execution date
    wait_for_yesterday = ExternalTaskSensor(
        task_id='wait_for_yesterday',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        execution_date_fn=lambda exec_date: exec_date - timedelta(days=1),
        mode='reschedule',
        poke_interval=300,
    )

    # Process after upstream completes
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    # Generate report
    report = PythonOperator(
        task_id='report',
        python_callable=generate_report,
    )

    [wait_for_upstream, wait_for_yesterday] >> process >> report

# DAG 3: Complex external dependencies
with DAG(
    'complex_external_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Complex external dependency example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'complex'],
) as complex_dag:

    # Wait for multiple external tasks
    wait_task_1 = ExternalTaskSensor(
        task_id='wait_task_1',
        external_dag_id='upstream_data_pipeline',
        external_task_id='extract',
        mode='reschedule',
        poke_interval=300,
    )

    wait_task_2 = ExternalTaskSensor(
        task_id='wait_task_2',
        external_dag_id='another_pipeline',
        external_task_id='load',
        mode='reschedule',
        poke_interval=300,
    )

    # Time sensor
    time_check = TimeSensor(
        task_id='time_check',
        target_time=datetime.strptime('09:00', '%H:%M').time(),
        mode='reschedule',
    )

    # Process after all conditions met
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_task_1, wait_task_2, time_check] >> process
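
Which upstream run an ExternalTaskSensor looks for comes down to date arithmetic on the downstream run's logical date. The sketch below mirrors that rule in plain Python, as an assumption-labelled illustration rather than Airflow's actual code: `resolve_target_date` is a hypothetical helper, and the mutual-exclusion check reflects the understanding that only one of `execution_delta` or `execution_date_fn` may be supplied.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional


def resolve_target_date(
    logical_date: datetime,
    execution_delta: Optional[timedelta] = None,
    execution_date_fn: Optional[Callable[[datetime], datetime]] = None,
) -> datetime:
    """Return the upstream logical date the sensor would wait on."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("Provide execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        # The delta is subtracted: a positive delta looks back in time
        return logical_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(logical_date)
    # Default: same logical date as the downstream run
    return logical_date


run = datetime(2024, 1, 2)  # a @daily downstream run

same_day = resolve_target_date(run)
one_hour_back = resolve_target_date(run, execution_delta=timedelta(hours=1))
yesterday = resolve_target_date(
    run, execution_date_fn=lambda d: d - timedelta(days=1)
)
```

Here `one_hour_back` resolves to 2024-01-01 23:00. A `@daily` upstream DAG has no run with that logical date, so a one-hour delta between two daily DAGs would wait until timeout; the delta has to equal the actual offset between the two schedules.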

Sensor Optimization Patterns

# sensor_optimization.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import time
import random

class OptimizedFileSensor(BaseSensorOperator):
    """
    File sensor with optimization features.
    """

    template_fields = ('filepath',)
    ui_color = '#FF9800'

    @apply_defaults
    def __init__(
        self,
        filepath: str,
        recursive: bool = False,
        min_file_size: int = 0,
        max_file_age_days: int = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.filepath = filepath
        self.recursive = recursive
        self.min_file_size = min_file_size
        self.max_file_age_days = max_file_age_days

    def poke(self, context):
        """Check if file meets all criteria."""
        import os
        from datetime import datetime, timedelta

        if not os.path.exists(self.filepath):
            return False

        # Check file size
        file_size = os.path.getsize(self.filepath)
        if file_size < self.min_file_size:
            self.log.info(
                f"File size {file_size} < minimum {self.min_file_size}"
            )
            return False

        # Check file age
        if self.max_file_age_days is not None:
            file_mtime = os.path.getmtime(self.filepath)
            file_age = datetime.now() - datetime.fromtimestamp(file_mtime)
            if file_age > timedelta(days=self.max_file_age_days):
                self.log.info(f"File is too old: {file_age}")
                return False

        self.log.info(f"File {self.filepath} meets all criteria")
        return True

class ExponentialBackoffSensor(BaseSensorOperator):
    """
    Sensor with exponential backoff.
    """

    @apply_defaults
    def __init__(
        self,
        initial_interval: int = 10,
        max_interval: int = 300,
        multiplier: float = 2.0,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.initial_interval = initial_interval
        self.max_interval = max_interval
        self.multiplier = multiplier
        self._current_interval = initial_interval

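    def poke(self, context):
        """Check the condition and widen the interval after each miss."""
        if self._check_condition():
            return True

        # Hand the wait to the sensor's own poke loop instead of calling
        # time.sleep() here, which would stack on top of poke_interval.
        # Assumes poke mode with the built-in exponential_backoff left at
        # False; in reschedule mode this in-memory state is lost between pokes.
        self.poke_interval = self._current_interval
        self.log.info(
            f"Condition not met. Next poke in {self._current_interval} seconds"
        )

        # Grow the interval for the following miss, capped at max_interval
        self._current_interval = min(
            self._current_interval * self.multiplier,
            self.max_interval
        )

        return False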

    def _check_condition(self):
        """Override this method to implement your condition check."""
        # Example: Random condition for demonstration
        return random.random() < 0.1  # 10% chance of success

with DAG(
    'sensor_optimization_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Sensor optimization patterns',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'optimization'],
) as dag:

    # Optimized file sensor
    file_sensor = OptimizedFileSensor(
        task_id='optimized_file_sensor',
        filepath='/data/input.csv',
        min_file_size=1024,  # At least 1KB
        max_file_age_days=1,  # Not older than 1 day
        mode='reschedule',
        poke_interval=60,
        timeout=3600,
    )

    # Exponential backoff sensor
    backoff_sensor = ExponentialBackoffSensor(
        task_id='exponential_backoff_sensor',
        initial_interval=10,
        max_interval=300,
        multiplier=2.0,
        mode='poke',
        timeout=3600,
    )

    # Process task
    process = PythonOperator(
        task_id='process',
        python_callable=lambda: print("Processing data..."),
    )

    file_sensor >> backoff_sensor >> process
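
To see what a capped exponential schedule looks like with the parameters used above (initial interval 10 s, multiplier 2.0, cap 300 s), the intervals can be computed directly. This is a standalone sketch; `backoff_schedule` is an illustrative helper, not an Airflow function. The built-in `exponential_backoff=True` option on sensors follows its own schedule and is not reproduced here.

```python
from itertools import accumulate
from typing import List


def backoff_schedule(
    initial: float, multiplier: float, max_interval: float, pokes: int
) -> List[float]:
    """Wait before poke i+1: min(initial * multiplier**i, max_interval)."""
    return [float(min(initial * multiplier ** i, max_interval)) for i in range(pokes)]


# Waits after each of the first seven unsuccessful pokes
waits = backoff_schedule(initial=10, multiplier=2.0, max_interval=300, pokes=7)

# Total time spent waiting after each poke
elapsed = list(accumulate(waits))
```

With these values the cap is reached on the sixth wait (10 · 2^5 = 320 s is clipped to 300 s), after which the sensor polls at a constant 300 s.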

Performance Metrics

| Metric | Description | Optimization Strategy |
| --- | --- | --- |
| Poke Interval | Time between condition checks | Adjust based on expected wait time |
| Worker Slot Usage | Resources consumed during wait | Use reschedule or deferred mode |
| Latency | Time from condition met to task start | Minimize poke interval for time-sensitive tasks |
| Timeout Rate | Percentage of sensors that time out | Set appropriate timeouts, monitor external systems |
| Resource Efficiency | Worker resources used per wait | Use deferred mode for long waits |
| Error Rate | Percentage of failed sensor checks | Implement proper error handling |
| False Positive Rate | Incorrect condition detection | Validate sensor logic thoroughly |
| Throughput | Number of sensors handled concurrently | Optimize poke intervals, use deferred mode |

Best Practices

  1. Mode Selection: Choose the appropriate mode based on wait time. Use poke for short waits (< 5 min), reschedule for medium waits (5-60 min), and deferred for long waits (> 1 hour).

  2. Timeout Configuration: Set reasonable timeouts based on expected wait times. Consider external system reliability and implement graceful failure handling.

  3. Poke Interval Optimization: Balance between responsiveness and resource usage. Use exponential backoff for unpredictable wait times.

  4. Error Handling: Implement proper error handling in sensor logic. Use soft_fail for optional conditions. Log meaningful error messages.

  5. Resource Management: Monitor sensor resource usage. Use pools to limit concurrent sensors. Implement sensor prioritization.

  6. Testing: Test sensors in isolation with mock external dependencies. Verify timeout behavior and error handling. Test different modes.

  7. Monitoring: Track sensor performance metrics. Monitor poke intervals, success rates, and resource usage. Set up alerts for anomalies.

  8. Documentation: Document sensor dependencies and expected behavior. Include timeout and retry configurations. Provide troubleshooting guidance.

  9. Maintenance: Regularly review and update sensor configurations. Remove obsolete sensors. Update timeout values based on changing external systems.

  10. Security: Implement proper authentication for external system access. Use secrets management for credentials. Validate input parameters.
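
The thresholds in the mode-selection guideline can be written down as a small helper, which makes the boundaries explicit. This is a sketch of the rule of thumb only: `choose_sensor_mode` is a hypothetical function, and treating exactly 5 minutes and exactly 1 hour as "reschedule" is an assumption, since the guideline leaves the boundaries open.

```python
from datetime import timedelta


def choose_sensor_mode(expected_wait: timedelta) -> str:
    """Map an expected wait to the strategy suggested by the guideline."""
    if expected_wait < timedelta(minutes=5):
        return "poke"        # short wait: holding the worker slot is acceptable
    if expected_wait <= timedelta(hours=1):
        return "reschedule"  # medium wait: free the slot between pokes
    return "deferrable"      # long wait: hand the wait to the triggerer


short = choose_sensor_mode(timedelta(minutes=2))
medium = choose_sensor_mode(timedelta(minutes=30))
long_wait = choose_sensor_mode(timedelta(hours=6))
```

Note that only "poke" and "reschedule" are values for a sensor's `mode` argument. Deferral is a separate mechanism that a sensor has to support, so the third return value is a label for the strategy rather than something to pass as `mode`.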

Key Takeaways:

  • Sensors poll conditions at interval $\Delta t_{\text{poke}}$ until success or timeout $\tau$
  • Exponential backoff: $\Delta t_i = \min(\Delta t_0 \cdot m^i, \Delta t_{\max})$
  • Timeout guarantee: sensor resolves within $T_{\max} = \tau + \Delta t_{\text{poke}}$
  • Poke mode uses worker slots; reschedule mode releases between pokes; deferred mode uses triggerer
  • Worker slot utilization in poke mode: $U = T_{\text{wait}} / (T_{\text{wait}} + T_{\text{task}})$
  • Use ExternalTaskSensor for cross-DAG dependencies with execution date matching
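
As a worked example of the timeout bound and the poke-mode utilization ratio from the takeaways, the sketch below plugs in concrete numbers. The function names are illustrative. The first call uses the `timeout=300` and `poke_interval=30` values from the HTTP sensor example earlier; the one-hour wait and ten-minute task are assumed figures chosen for the arithmetic.

```python
def max_resolution_time(timeout_s: float, poke_interval_s: float) -> float:
    """Upper bound on time to resolve: timeout plus one poke interval."""
    return timeout_s + poke_interval_s


def poke_slot_utilization(wait_s: float, task_s: float) -> float:
    """Share of slot time spent waiting: T_wait / (T_wait + T_task)."""
    total = wait_s + task_s
    if total <= 0:
        raise ValueError("wait_s + task_s must be positive")
    return wait_s / total


# timeout=300 s, poke_interval=30 s
t_max = max_resolution_time(300, 30)

# Assumed figures: a 1-hour wait in poke mode ahead of a 10-minute task
u = poke_slot_utilization(wait_s=3600, task_s=600)
```

Here `t_max` is 330 seconds and `u` is 3600 / 4200, roughly 0.86: about 86% of the slot time goes to waiting, which is the cost that reschedule and deferred execution are meant to avoid.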

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
