Scheduling and Triggers in Apache Airflow

Architecture Diagram

SCHEDULING ARCHITECTURE
│
├── SCHEDULER COMPONENTS
│   ├── Scheduler Heartbeat
│   │   ├── Check for scheduled DAG runs
│   │   ├── Create DagRun objects
│   │   ├── Queue task instances
│   │   └── Update task states
│   └── Timetable Engine
│       ├── CronDataIntervalTimetable (cron expressions)
│       ├── DeltaDataIntervalTimetable (timedelta intervals)
│       ├── Custom Timetables
│       └── Event-driven Timetables
│
└── TRIGGER TYPES
    ├── Time-based Triggers
    │   ├── Cron schedules
    │   ├── Interval-based schedules
    │   └── Date-based schedules
    ├── Event-based Triggers
    │   ├── File sensors
    │   ├── HTTP sensors
    │   └── Database sensors
    └── Data-driven Triggers
        ├── Dataset triggers
        ├── External triggers
        └── API triggers
Architecture Diagram
CRON EXPRESSION PATTERNS
│
├── CRON FORMAT: minute hour day_of_month month day_of_week
│   ├── Common Patterns
│   │   ├── @hourly    → 0 * * * *
│   │   ├── @daily     → 0 0 * * *
│   │   ├── @weekly    → 0 0 * * 0
│   │   ├── @monthly   → 0 0 1 * *
│   │   ├── @yearly    → 0 0 1 1 *
│   │   └── @quarterly → 0 0 1 */3 *
│   └── Advanced Patterns
│       ├── Every 15 minutes        */15 * * * *
│       ├── Every 2 hours           0 */2 * * *
│       ├── Weekdays at midnight    0 0 * * 1-5
│       ├── Days 1-7 OR any Monday  0 0 1-7 * 1    (not "first Monday": cron ORs the two day fields)
│       ├── Days 28-31              0 0 28-31 * *  (not only the last day of the month)
│       └── 08:30 on weekdays       30 8 * * 1-5
│
└── CRON VISUALIZATION: 0 0 * * 1-5 (midnight, Monday to Friday)
        Minute:   0 1 2 3 4 5 ... 59
                  ● ○ ○ ○ ○ ○ ... ○    0 = minute 0 only
        Hour:     0 1 2 3 4 5 ... 23
                  ● ○ ○ ○ ○ ○ ... ○    0 = hour 0 only
        Day:      1 2 3 4 5 6 ... 31
                  ● ● ● ● ● ● ... ●    * = every day
        Month:    1 2 3 4 5 6 ... 12
                  ● ● ● ● ● ● ... ●    * = every month
        Weekday:  0 1 2 3 4 5 6
                  ○ ● ● ● ● ● ○        1-5 = Monday to Friday (Sunday = 0)
Architecture Diagram
TRIGGER ARCHITECTURE
│
├── TRIGGER COMPONENTS
│   ├── Triggerer Service
│   │   ├── Async execution of triggers
│   │   ├── Event loop management
│   │   └── Trigger state tracking
│   ├── Trigger Types
│   │   ├── BaseTrigger (abstract base class)
│   │   ├── FileTrigger
│   │   ├── HttpTrigger (HTTP provider)
│   │   ├── SQL/database triggers (provider-specific)
│   │   └── Custom triggers
│   └── Trigger Events
│       ├── TriggerEvent yielded by run()
│       └── Event payload passed to the resumed task
│
└── TRIGGER FLOW

    Operator defers ───▶ Trigger created ───▶ Triggerer executes ───▶ Scheduler resumes
           │                    │                      │                      │
           ▼                    ▼                      ▼                      ▼
    Operator pauses    Stored in database         Event loop            Task resumes

Formal Definitions

Definition (Schedule Interval)

The schedule interval $\Delta t$ is the time between consecutive DAG runs. It can be defined as a cron expression, a timedelta object, or a predefined shorthand (@daily, @hourly). The scheduler creates a new DagRun when $\text{now} \geq \text{last\_run\_end} + \Delta t$, where $\text{last\_run\_end}$ is the end of the previous run's data interval, not the moment that run finished executing.

Definition (Trigger)

A trigger is an asynchronous event handler that lets an operator defer execution without occupying a worker slot. A trigger $T = (\text{run}, \text{serialize}, \text{event})$ implements an async run() method that yields a TriggerEvent when its condition is met, and a serialize() method that returns its classpath and constructor arguments so it can be stored and rebuilt by the triggerer.

Definition (Timetable)

A timetable is Airflow's scheduling abstraction that determines when DAG runs should be created. A timetable $M = (\text{next\_dagrun\_info}, \text{infer\_manual\_data\_interval})$ maps the last automated data interval to the next run's data interval and infers a data interval for manually triggered runs. Airflow ships cron, interval, and event-driven timetables.

Detailed Explanation

Cron Expressions

Cron expressions are the foundation of time-based scheduling in Airflow. They provide a flexible way to define recurring schedules using a simple text format. Understanding cron syntax is essential for creating accurate schedules.

Cron Format: A cron expression consists of five fields: minute, hour, day of month, month, and day of week. Each field can contain a specific value, a range, a list, a step, or a wildcard. The / (step) and , (list) characters are part of standard cron syntax, not Airflow extensions.

Special Characters:

  • * matches any value
  • , separates multiple values (e.g., 1,3,5)
  • - specifies ranges (e.g., 1-5)
  • / specifies steps (e.g., */15 for every 15 minutes)
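To make the four special characters concrete, here is a minimal stdlib Python sketch that expands a single cron field into the values it matches. expand_cron_field is a hypothetical helper for illustration, not an Airflow or croniter API, and it deliberately skips named values such as MON or JAN.

```python
def expand_cron_field(field: str, low: int, high: int) -> list:
    """Expand one cron field into the sorted values it matches.

    Supports '*', single values, 'a-b' ranges, comma lists, and '/step'
    applied to '*' or to a range. Named values (MON, JAN) are not handled.
    """
    values = set()
    for part in field.split(","):
        # Split off an optional step, e.g. "*/15" or "1-10/3".
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step < 1:
            raise ValueError(f"step must be positive in {part!r}")

        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        elif step_text:
            raise ValueError(f"a step needs '*' or a range in this sketch: {part!r}")
        else:
            start = end = int(base)

        if not low <= start <= end <= high:
            raise ValueError(f"{part!r} is outside {low}-{high}")
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_cron_field("*/15", 0, 59))   # minute field   → [0, 15, 30, 45]
print(expand_cron_field("1-5", 0, 6))     # weekday field  → [1, 2, 3, 4, 5]
print(expand_cron_field("1,3,5", 1, 12))  # month field    → [1, 3, 5]
```

A full parser would also handle names and out-of-order ranges; the point here is only how each character narrows the set of matching values.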

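Common Pitfalls: Cron expressions can be tricky. For example, 0 */2 * * * runs every 2 hours at minute 0, not every 2 minutes. Day-of-week values run 0-6 with Sunday = 0, and month values run 1-12. When both the day-of-month and day-of-week fields are restricted, cron fires on any day that matches either field, so 0 0 1-7 * 1 runs on days 1-7 and on every Monday, not only on the first Monday of the month. Always test cron expressions before deploying them to production.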
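The day-field rule is the easiest one to misread, so here is a small stdlib Python sketch of it. It follows classic (Vixie) cron semantics, which croniter also uses by default: if both day fields are restricted, a day matches when either field matches; if one of them is *, only the other one counts. The helper names are hypothetical.

```python
from datetime import date, timedelta


def cron_day_matches(day: date, dom: set, dow: set,
                     dom_restricted: bool, dow_restricted: bool) -> bool:
    """Classic cron rule for combining the day-of-month and day-of-week fields."""
    cron_weekday = (day.weekday() + 1) % 7  # cron numbering: Sunday=0 ... Saturday=6
    dom_ok = day.day in dom
    dow_ok = cron_weekday in dow
    if dom_restricted and dow_restricted:
        return dom_ok or dow_ok  # both fields restricted: either one may match
    return dom_ok and dow_ok     # a '*' field matches every day, so only the other counts


def matching_days(year: int, month: int, dom: set, dow: set,
                  dom_restricted: bool, dow_restricted: bool) -> list:
    """Day numbers in one month on which the two day fields match."""
    day = date(year, month, 1)
    result = []
    while day.month == month:
        if cron_day_matches(day, dom, dow, dom_restricted, dow_restricted):
            result.append(day.day)
        day += timedelta(days=1)
    return result


# "0 0 1-7 * 1" in January 2024 (1 January 2024 was a Monday)
print(matching_days(2024, 1, set(range(1, 8)), {1}, True, True))
# → [1, 2, 3, 4, 5, 6, 7, 8, 15, 22, 29]

# The real "first Monday" needs both conditions at once (an AND):
print([d for d in range(1, 8) if date(2024, 1, d).weekday() == 0])  # → [1]
```

Because standard cron cannot AND the two day fields, "first Monday" style schedules are usually written as a broader cron expression plus a check inside the task, or as a custom timetable.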

Airflow Predefined Schedules: Airflow provides shorthand schedules like @hourly, @daily, @weekly, @monthly, and @yearly. These are convenient for common scheduling patterns and are easier to read than raw cron expressions.
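The presets are shorthand for ordinary cron strings. The sketch below spells out that mapping using the values from this lesson's Common Patterns list; CRON_PRESETS and normalize_schedule are illustrative names, not Airflow's internal ones. Presets that are not cron-equivalent, such as @once, are intentionally left out and rejected.

```python
# Preset-to-cron mapping, taken from this lesson's "Common Patterns" list.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@quarterly": "0 0 1 */3 *",
    "@yearly": "0 0 1 1 *",
}


def normalize_schedule(schedule: str) -> str:
    """Expand a preset to its cron string; return raw cron strings unchanged."""
    if not schedule.startswith("@"):
        return schedule
    if schedule not in CRON_PRESETS:
        raise ValueError(f"unknown or non-cron preset: {schedule}")
    return CRON_PRESETS[schedule]


print(normalize_schedule("@daily"))        # → 0 0 * * *
print(normalize_schedule("30 8 * * 1-5"))  # → 30 8 * * 1-5
```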

$$\Delta t = t_{\text{next}} - t_{\text{last\_completed}}$$

Catchup Run Count

$$N_{\text{catchup}} = \left\lfloor \frac{t_{\text{now}} - t_{\text{start}}}{\Delta t} \right\rfloor$$

Here,

  • $t_{\text{now}}$ = current time
  • $t_{\text{start}}$ = DAG start_date
  • $\Delta t$ = schedule interval
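As a worked example of the catchup formula, the stdlib sketch below evaluates $N_{\text{catchup}}$ for a fixed timedelta interval. It assumes a constant $\Delta t$; for irregular cron schedules (for example weekdays only) you would count cron ticks instead. The function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def catchup_run_count(t_start: datetime, t_now: datetime, delta: timedelta) -> int:
    """N_catchup = floor((t_now - t_start) / delta) for a fixed interval delta."""
    if t_now <= t_start:
        return 0
    # timedelta // timedelta performs integer floor division
    return (t_now - t_start) // delta


t_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
t_now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
print(catchup_run_count(t_start, t_now, timedelta(days=1)))   # → 9 (9.5 days, floored)
print(catchup_run_count(t_start, t_now, timedelta(hours=1)))  # → 228
```

The floor matters: the half-finished interval from 10 January 00:00 to 11 January 00:00 is not yet complete, so it does not produce a run.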

Theorem (Schedule Drift Bound)

For a scheduler heartbeat interval $\Delta t_{\text{hb}}$, the maximum schedule drift is bounded by $\delta_{\max} = \Delta t_{\text{hb}} + \epsilon_{\text{parse}}$, where $\epsilon_{\text{parse}}$ is the DAG parsing latency. Reducing $\Delta t_{\text{hb}}$ improves scheduling accuracy at the cost of increased CPU utilization.

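The catchup=False parameter stops Airflow from creating a run for every missed interval between start_date and now; instead, the scheduler creates a run only for the most recent complete interval. Set it explicitly on new DAGs to avoid unintended historical execution (in Airflow 2.x the default comes from the [scheduler] catchup_by_default option, which is True). Use catchup=True only when you explicitly need to reprocess past intervals, for example during data recovery.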
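The difference between the two settings can be sketched as a simplified model of which data intervals receive a run. This is an assumption-laden illustration: the real scheduler also considers existing runs, max_active_runs, and end_date. Only complete intervals get a run, and with catchup=False only the latest one is kept.

```python
from datetime import datetime, timedelta, timezone


def runs_to_create(start: datetime, now: datetime, delta: timedelta, catchup: bool) -> list:
    """Data intervals a simplified scheduler would create runs for.

    Only complete intervals (interval end <= now) get a run. With
    catchup=False, only the most recent complete interval is kept.
    """
    intervals = []
    interval_start = start
    while interval_start + delta <= now:
        intervals.append((interval_start, interval_start + delta))
        interval_start += delta
    return intervals if catchup else intervals[-1:]


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 1, 4, 12, 0, tzinfo=timezone.utc)
for catchup in (True, False):
    runs = runs_to_create(start, now, timedelta(days=1), catchup)
    print(catchup, [(s.date().isoformat(), e.date().isoformat()) for s, e in runs])
```

With catchup=True this yields three daily intervals (1-2, 2-3 and 3-4 January); with catchup=False it yields only the 3-4 January interval.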

Use timezone-aware datetime objects throughout your DAGs. Airflow stores timestamps in UTC internally but displays them in the configured timezone. Be explicit about timezone handling to avoid scheduling inconsistencies.
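A minimal stdlib illustration of "aware in, UTC inside": the helper below refuses naive datetimes and normalizes aware ones to UTC. The fixed UTC-05:00 offset is an assumption chosen only to keep the sketch dependency-free; a fixed offset ignores daylight saving time, so real DAGs should use a named zone (for example with pendulum, which Airflow itself uses).

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-05:00 offset, for illustration only (no DST handling).
UTC_MINUS_5 = timezone(timedelta(hours=-5), "UTC-05:00")


def to_utc(dt: datetime) -> datetime:
    """Normalize an aware datetime to UTC, refusing naive values."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError("naive datetime: attach a timezone before using it in a schedule")
    return dt.astimezone(timezone.utc)


local_start = datetime(2024, 1, 1, 9, 0, tzinfo=UTC_MINUS_5)
print(to_utc(local_start))  # → 2024-01-01 14:00:00+00:00
```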

Timetables

Timetables are Airflow's modern scheduling abstraction, introduced in Airflow 2.2. They provide more flexibility than cron expressions and support complex scheduling patterns.

Cron Timetable (CronDataIntervalTimetable): In Airflow 2.x, this is the timetable Airflow builds by default when you pass a cron expression or preset as the schedule. It is backward compatible with existing DAGs and keeps the familiar cron syntax.

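Delta Timetable (DeltaDataIntervalTimetable): Schedules DAGs at fixed intervals when you pass a timedelta object as the schedule. It is simpler than cron for basic interval scheduling. Runs are spaced from start_date rather than aligned to clock boundaries, and because the spacing is a fixed duration, it stays constant in absolute time across daylight saving transitions, whereas a cron schedule stays pinned to local wall-clock time.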
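The anchoring difference is easiest to see side by side. This stdlib sketch compares a 6-hour timedelta schedule with the cron expression 0 */6 * * * starting from the same, deliberately off-the-hour start_date. It uses naive datetimes and hypothetical helper names for brevity, so it illustrates the idea rather than reproducing Airflow's timetable code.

```python
from datetime import datetime, timedelta


def delta_ticks(start: datetime, delta: timedelta, count: int) -> list:
    """timedelta-style schedule: anchored to start_date, fixed spacing."""
    return [start + i * delta for i in range(count)]


def every_n_hours_cron_ticks(after: datetime, n: int, count: int) -> list:
    """Fire times of '0 */n * * *' at or after `after`."""
    tick = after.replace(minute=0, second=0, microsecond=0)
    if tick < after:
        tick += timedelta(hours=1)  # move to the next top of the hour
    ticks = []
    while len(ticks) < count:
        if tick.hour % n == 0:  # '*/n' in the hour field matches 0, n, 2n, ...
            ticks.append(tick)
        tick += timedelta(hours=1)
    return ticks


start_date = datetime(2024, 1, 1, 10, 30)
print([t.strftime("%m-%d %H:%M") for t in delta_ticks(start_date, timedelta(hours=6), 3)])
# → ['01-01 10:30', '01-01 16:30', '01-01 22:30']
print([t.strftime("%m-%d %H:%M") for t in every_n_hours_cron_ticks(start_date, 6, 3)])
# → ['01-01 12:00', '01-01 18:00', '01-02 00:00']
```

The timedelta schedule inherits the 10:30 offset from start_date, while the cron schedule snaps to 00:00, 06:00, 12:00 and 18:00 regardless of when the DAG started.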

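Custom Timetables: For specialized scheduling requirements you can subclass Timetable and implement next_dagrun_info (when the next scheduled run happens and which data interval it covers) and infer_manual_data_interval (the data interval for a manually triggered run). Custom timetables must be registered through an Airflow plugin so the scheduler can deserialize DAGs that use them.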

Event-driven Timetables: These timetables create runs based on external events rather than time. They're useful for event-driven architectures where workflows are triggered by data availability or external signals.

Trigger Mechanism

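Triggers enable deferrable execution, allowing operators to pause and resume based on external conditions. This is more efficient than a polling sensor because a deferred task does not hold a worker slot while it waits; instead, many triggers share a single asyncio event loop inside the triggerer process.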
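Why one event loop can replace many waiting workers can be shown with plain asyncio. In this sketch each fake_trigger (a hypothetical stand-in for a trigger's run() method) awaits asyncio.sleep, which hands control back to the loop, so a thousand concurrent waits finish in roughly the time of one. It only models the concurrency idea, not Airflow's triggerer.

```python
import asyncio
import time


async def fake_trigger(trigger_id: int, wait_seconds: float) -> dict:
    """Stand-in for a trigger's run(): wait without blocking, then emit an event."""
    await asyncio.sleep(wait_seconds)  # yields control to the event loop while waiting
    return {"trigger_id": trigger_id, "status": "success"}


async def run_triggerer(num_triggers: int, wait_seconds: float) -> list:
    """Run many waits concurrently on one event loop, as a triggerer process does."""
    return await asyncio.gather(*(fake_trigger(i, wait_seconds) for i in range(num_triggers)))


if __name__ == "__main__":
    started = time.monotonic()
    events = asyncio.run(run_triggerer(1000, 0.2))
    elapsed = time.monotonic() - started
    print(f"{len(events)} events in {elapsed:.2f}s")
```

Done one at a time, the same thousand waits would take about 200 seconds; the catch is that trigger code must never block the loop, which is why Airflow requires triggers to be fully asynchronous.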

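Trigger Architecture: The triggerer service runs triggers asynchronously. When an operator defers (by calling self.defer(trigger=..., method_name=...)), the task instance moves to the deferred state, its worker slot is released, and the serialized trigger is stored in the metadata database. The triggerer picks up the trigger and runs it; when the trigger fires an event, the scheduler re-queues the task, which resumes in the method named by method_name and receives the event.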
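The defer, trigger, resume round trip can be simulated in a few lines of stdlib Python. Every class and function here (Deferral, SleepTrigger, WaitOperator, run_task) is hypothetical and collapses the worker, triggerer, and scheduler into one process. In real Airflow the operator calls self.defer(...), which raises TaskDeferred, and the resume method receives both the task context and the event.

```python
import asyncio
from typing import AsyncIterator


class Deferral(Exception):
    """Hypothetical stand-in for Airflow's TaskDeferred exception."""

    def __init__(self, trigger, method_name: str):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


class SleepTrigger:
    """Hypothetical trigger: waits, then yields one event with a payload."""

    def __init__(self, seconds: float):
        self.seconds = seconds

    def serialize(self):
        # Mimic the (classpath, kwargs) shape that gets persisted.
        return ("__main__.SleepTrigger", {"seconds": self.seconds})

    async def run(self) -> AsyncIterator[dict]:
        await asyncio.sleep(self.seconds)
        yield {"status": "done", "waited": self.seconds}


class WaitOperator:
    """Hypothetical operator that defers instead of sleeping in a worker slot."""

    def execute(self):
        raise Deferral(SleepTrigger(0.01), method_name="execute_complete")

    def execute_complete(self, event: dict) -> str:
        return f"resumed with {event['status']}"


async def first_event(trigger) -> dict:
    """Triggerer step: run the trigger until it yields its first event."""
    async for event in trigger.run():
        return event
    raise RuntimeError("trigger finished without an event")


def run_task(operator) -> str:
    try:
        return operator.execute()
    except Deferral as deferral:
        # 1. Worker slot released; the trigger is persisted as (classpath, kwargs).
        _classpath, kwargs = deferral.trigger.serialize()
        # 2. The triggerer rebuilds the trigger and runs it on its event loop.
        event = asyncio.run(first_event(SleepTrigger(**kwargs)))
        # 3. The task is rescheduled and resumes in the named method with the event.
        return getattr(operator, deferral.method_name)(event)


print(run_task(WaitOperator()))  # → resumed with done
```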

Trigger Types: Airflow provides built-in triggers for common patterns like file availability, HTTP endpoints, and database queries. You can also create custom triggers for specialized use cases.

Trigger Events: Triggers emit events when conditions are met. These events can include payload data that the resumed operator can use for further processing.

Scheduling Best Practices

Idempotency: Ensure that DAG runs are idempotent. If a run is triggered multiple times, it should produce the same result. This is crucial for handling backfill and retry scenarios.
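A common way to get idempotency is to have each run overwrite the partition that belongs to its own logical date instead of appending to shared storage. The sketch below contrasts the two using an in-memory dict as a stand-in for a table; the date string plays the role of Airflow's ds template value, and both function names are hypothetical.

```python
def load_partition(store: dict, logical_date: str, rows: list) -> None:
    """Idempotent: overwrite the partition that belongs to this run's date."""
    store[logical_date] = list(rows)


def append_rows(store: dict, logical_date: str, rows: list) -> None:
    """Not idempotent: every retry or re-run appends duplicate rows."""
    store.setdefault(logical_date, []).extend(rows)


idempotent, appending = {}, {}
for _ in range(2):  # simulate the same run executing twice (e.g. a retry)
    load_partition(idempotent, "2024-01-01", [1, 2])
    append_rows(appending, "2024-01-01", [1, 2])

print(idempotent)  # → {'2024-01-01': [1, 2]}
print(appending)   # → {'2024-01-01': [1, 2, 1, 2]}
```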

Timezone Awareness: Always use timezone-aware datetime objects. Airflow stores timestamps in UTC internally but displays them in the configured timezone. Be explicit about timezone handling in your DAGs.

Catchup Configuration: The catchup parameter determines whether Airflow should create runs for missed intervals. Set it to False for new DAGs to avoid unintended backfill.

Max Active Runs: max_active_runs caps how many runs of a DAG can be active at the same time. This prevents resource contention, and max_active_runs=1 forces runs to execute one after another.

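Dependencies: Use depends_on_past carefully. With depends_on_past=True, a task instance runs only if the same task succeeded or was skipped in the previous DAG run, so a single failure blocks that task in every later run until it is resolved. Consider using dataset dependencies for more flexible orchestration.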
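The cascading effect of depends_on_past can be modeled with a tiny function. This is a simplified sketch (blocked_runs is a hypothetical helper): it looks at one task across consecutive runs and reports which runs cannot start because the previous run's instance neither succeeded nor was skipped.

```python
def blocked_runs(states: list) -> list:
    """Indices of runs whose task cannot start under depends_on_past=True.

    states[i] is the task's state in run i ("success", "skipped", "failed",
    or None if it never ran). Run 0 has no predecessor, so it is never blocked.
    """
    return [
        i for i in range(1, len(states))
        if states[i - 1] not in ("success", "skipped")
    ]


print(blocked_runs(["success", "failed", None, None]))  # → [2, 3]
```

Run 2 is blocked by the failure in run 1, and run 3 is blocked because run 2 never ran, which is how one failure stalls every later run.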

Key Concepts Table

| Component | Purpose | Example | Use Case |
|---|---|---|---|
| Cron expression | Time-based scheduling | 0 0 * * * | Daily execution |
| DeltaDataIntervalTimetable | Interval scheduling | timedelta(hours=1) | Hourly processing |
| Trigger | Async waiting | FileTrigger | File availability |
| Sensor | Polling waiting | FileSensor | Simple conditions |
| Dataset | Data-driven scheduling | Dataset('s3://data') | Event-driven |
| Backfill | Historical runs | catchup=True | Data recovery |
| Max active runs | Concurrency control | max_active_runs=1 | Sequential execution |
| Timezone | Time handling | pendulum.timezone('UTC') | Global deployments |

Code Examples

Advanced Cron Patterns

# advanced_cron_patterns.py
from datetime import datetime, timedelta

import pendulum
from croniter import croniter
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator

def business_day_schedule() -> str:
    """Run at 09:00 on weekdays (Monday-Friday)."""
    return '0 9 * * 1-5'

def month_end_schedule() -> str:
    """Run at 23:00 on days 28-31.

    Standard cron cannot say "last day of the month", so this fires up to
    four times a month; gate the work with is_last_day_of_month().
    """
    return '0 23 28-31 * *'

def is_last_day_of_month(day: datetime) -> bool:
    """True only when the following day belongs to a different month."""
    return (day + timedelta(days=1)).month != day.month

def quarterly_schedule() -> str:
    """Run at midnight on the first day of each quarter."""
    return '0 0 1 1,4,7,10 *'

def custom_cron_parser(cron_expression: str, count: int = 5) -> list:
    """Validate a cron expression and return its next `count` fire times."""
    if not croniter.is_valid(cron_expression):
        raise ValueError(f"Invalid cron expression: {cron_expression}")

    cron = croniter(cron_expression, datetime.now())
    return [cron.get_next(datetime) for _ in range(count)]

def advanced_cron_example():
    """Print the next fire times for several cron patterns."""
    patterns = {
        'every_15_minutes': '*/15 * * * *',
        'every_2_hours_weekdays': '0 */2 * * 1-5',
        # Pitfall: with both day fields restricted, cron fires when EITHER
        # matches. These are NOT "first Monday" / "last Friday" schedules.
        'days_1_to_7_or_any_monday': '0 0 1-7 * 1',
        'days_25_to_31_or_any_friday': '0 0 25-31 * 5',
        'every_6_hours': '0 */6 * * *',
    }

    for name, pattern in patterns.items():
        print(f"\n{name}: {pattern}")
        for run_time in custom_cron_parser(pattern):
            print(f"  Next run: {run_time}")

with DAG(
    'advanced_cron_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced cron scheduling patterns',
    # `schedule` replaces the deprecated `schedule_interval` argument (Airflow 2.4+)
    schedule=business_day_schedule(),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['cron', 'advanced'],
) as dag:

    # Print upcoming fire times for the example patterns
    parse_cron = PythonOperator(
        task_id='parse_cron',
        python_callable=advanced_cron_example,
    )

    def check_business_hours() -> bool:
        """Return True between 09:00 and 17:00 UTC."""
        from airflow.utils import timezone
        current_time = timezone.utcnow()
        return 9 <= current_time.hour < 17

    # ShortCircuitOperator skips downstream tasks when the callable returns
    # a falsy value; a plain PythonOperator would ignore the returned bool.
    time_check = ShortCircuitOperator(
        task_id='business_hours_check',
        python_callable=check_business_hours,
    )

    parse_cron >> time_check

Custom Timetable Implementation

# custom_timetable.py
from datetime import datetime, timedelta
from typing import Optional
from airflow.timetables.base import Timetable, DagRunInfo, TimeRestriction
from airflow.models import DagModel
from airflow import settings

class BusinessDaysTimetable(Timetable):
    """
    Custom timetable that only schedules on business days.
    Excludes weekends and major holidays.
    """

    def __init__(
        self,
        hour: int = 9,
        minute: int = 0,
        exclude_holidays: bool = True,
    ):
        super().__init__()
        self.hour = hour
        self.minute = minute
        self.exclude_holidays = exclude_holidays

    def _is_business_day(self, date: datetime) -> bool:
        """Check if date is a business day."""
        # Check if weekend
        if date.weekday() >= 5:  # Saturday (5) or Sunday (6)
            return False

        if self.exclude_holidays:
            # Check against holiday list
            holidays = self._get_holidays(date.year)
            if date.date() in holidays:
                return False

        return True

    def _get_holidays(self, year: int) -> set:
        """Get holidays for a given year."""
        # Example US federal holidays
        holidays = set()

        # New Year's Day
        holidays.add(datetime(year, 1, 1).date())

        # Independence Day
        holidays.add(datetime(year, 7, 4).date())

        # Christmas
        holidays.add(datetime(year, 12, 25).date())

        # Add more holidays as needed

        return holidays

    def get_next_dagrun_info(
        self,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Get next DAG run information."""
        if last_automated_data_interval is None:
            # First run - start from restriction start_date
            start_date = restriction.start_date or datetime.now()
        else:
            # Next run - start from last run end
            start_date = last_automated_data_interval.end

        # Find next business day
        next_date = start_date
        while not self._is_business_day(next_date):
            next_date += timedelta(days=1)

            # Set to specified time
            next_date = next_date.replace(
                hour=self.hour,
                minute=self.minute,
                second=0,
                microsecond=0,
            )

            # Check if we've exceeded restriction
            if restriction.end and next_date > restriction.end:
                return None

        # Create data interval
        data_interval = DataInterval(start=next_date, end=next_date)

        return DagRunInfo(run_id=f"business_{next_date.isoformat()}", data_interval=data_interval)

    def infer_manual_data_interval(self, run_after: datetime) -> DataInterval:
        """Infer data interval for manual runs."""
        # Use previous business day as data interval
        previous_date = run_after - timedelta(days=1)
        while not self._is_business_day(previous_date):
            previous_date -= timedelta(days=1)

        return DataInterval(start=previous_date, end=previous_date)

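# Usage in DAG. The scheduler can only load a custom timetable that is also
# registered in the timetables list of an AirflowPlugin.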
with DAG(
    'business_days_dag',
    timetable=BusinessDaysTimetable(hour=9, minute=0),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['timetable', 'business_days'],
) as dag:
    # Tasks here will only run on business days
    pass
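Because the next-run rule is easy to get subtly wrong (forgetting to advance past the last run, or applying the time of day only inside the loop), it helps to test it without Airflow. The following is a minimal stdlib-only sketch of the same steps, assuming that _is_business_day treats Saturday and Sunday as non-business days and uses only the three fixed holidays listed above; the function names are hypothetical.

```python
from datetime import date, datetime, timedelta
from typing import Optional, Set


def fixed_holidays(year: int) -> Set[date]:
    """Fixed-date holidays only, matching the _get_holidays example."""
    return {date(year, 1, 1), date(year, 7, 4), date(year, 12, 25)}


def is_business_day(day: datetime) -> bool:
    """Monday to Friday, excluding the listed holidays (assumed rule)."""
    return day.weekday() < 5 and day.date() not in fixed_holidays(day.year)


def next_business_run(
    last_run: Optional[datetime], start_date: datetime, hour: int, minute: int
) -> datetime:
    """Next run time, following the same steps as next_dagrun_info."""
    # First run begins at start_date; later runs begin the day after the last run
    candidate = start_date if last_run is None else last_run + timedelta(days=1)
    # Skip weekends and holidays
    while not is_business_day(candidate):
        candidate += timedelta(days=1)
    # Apply the time of day once, after the business day has been found
    return candidate.replace(hour=hour, minute=minute, second=0, microsecond=0)


# 2024-01-01 is New Year's Day, so the first run lands on Tuesday 2024-01-02
print(next_business_run(None, datetime(2024, 1, 1), 9, 0))  # 2024-01-02 09:00:00
```

A run on Wednesday 2024-07-03 is followed by Friday 2024-07-05 (Thursday is Independence Day), and a Friday run is followed by the next Monday.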

Trigger Implementation

# trigger_implementation.py
from datetime import datetime, timedelta
from typing import Any, AsyncIterator, Dict, Optional, Tuple
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.models import Connection
import asyncio
import aiohttp
import aiohttp.web  # `import aiohttp` alone does not load the aiohttp.web submodule

class WebhookTrigger(BaseTrigger):
    """
    Custom trigger that waits for a webhook call.

    This trigger listens for incoming HTTP requests and resumes
    the operator when a valid webhook is received.
    """

    def __init__(
        self,
        webhook_path: str,
        method: str = 'POST',
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 300,
    ):
        super().__init__()
        self.webhook_path = webhook_path
        self.method = method
        self.headers = headers or {}
        self.timeout = timeout

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage."""
        return (
            "trigger_implementation.WebhookTrigger",
            {
                "webhook_path": self.webhook_path,
                "method": self.method,
                "headers": self.headers,
                "timeout": self.timeout,
            },
        )

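    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the webhook listener."""
        import aiohttp.web

        # Set by handle_webhook once a request has arrived
        self._received = asyncio.Event()

        # Create a simple web server to listen for webhooks. It runs inside the
        # triggerer process, so port 8080 must not clash with the webserver
        # (which also defaults to 8080) or with other WebhookTrigger instances.
        app = aiohttp.web.Application()
        app.router.add_route(self.method, self.webhook_path, self.handle_webhook)

        runner = aiohttp.web.AppRunner(app)
        await runner.setup()
        site = aiohttp.web.TCPSite(runner, 'localhost', 8080)
        await site.start()

        try:
            # Wait for the webhook with a timeout; asyncio.wait_for works on all
            # Python 3 versions, whereas asyncio.timeout() needs Python 3.11+
            await asyncio.wait_for(self._received.wait(), timeout=self.timeout)
            yield TriggerEvent(self._webhook_data)
        except asyncio.TimeoutError:
            # Emit a timeout event instead of failing the trigger with an exception
            yield TriggerEvent({
                'status': 'timeout',
                'error': 'No webhook received before timeout',
                'timestamp': datetime.now().isoformat(),
            })
        finally:
            await runner.cleanup()

    async def handle_webhook(self, request: "aiohttp.web.Request") -> "aiohttp.web.Response":
        """Handle incoming webhook request."""
        try:
            data = await request.json()
            self._webhook_data = {
                'status': 'success',
                'data': data,
                'timestamp': datetime.now().isoformat(),
            }
            response = aiohttp.web.json_response({'status': 'received'})
        except Exception as e:
            self._webhook_data = {
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat(),
            }
            response = aiohttp.web.json_response({'status': 'error'}, status=400)
        # Wake up run() so it can emit the TriggerEvent
        self._received.set()
        return response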

class PollingTrigger(BaseTrigger):
    """
    Custom trigger that polls an endpoint until condition is met.
    """

    def __init__(
        self,
        http_conn_id: str,
        endpoint: str,
        method: str = 'GET',
        headers: Optional[Dict[str, str]] = None,
        poll_interval: int = 30,
        timeout: int = 3600,
        success_criteria: Optional[Dict[str, Any]] = None,
    ):
        super().__init__()
        self.http_conn_id = http_conn_id
        self.endpoint = endpoint
        self.method = method
        self.headers = headers or {}
        self.poll_interval = poll_interval
        self.timeout = timeout
        self.success_criteria = success_criteria or {}

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage."""
        return (
            "trigger_implementation.PollingTrigger",
            {
                "http_conn_id": self.http_conn_id,
                "endpoint": self.endpoint,
                "method": self.method,
                "headers": self.headers,
                "poll_interval": self.poll_interval,
                "timeout": self.timeout,
                "success_criteria": self.success_criteria,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the polling loop."""
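        # get_connection_from_secrets is a blocking lookup; acceptable here,
        # but it briefly blocks the triggerer's event loop
        conn = Connection.get_connection_from_secrets(self.http_conn_id)
        port = f":{conn.port}" if conn.port else ""  # Omit the port when unset
        url = f"{conn.schema or 'http'}://{conn.host}{port}{self.endpoint}"

        headers = {**self.headers}
        if conn.password:
            # Treat the connection password as a bearer token
            headers['Authorization'] = f"Bearer {conn.password}"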

        start_time = datetime.now()
        timeout_delta = timedelta(seconds=self.timeout)

        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.request(
                        self.method,
                        url,
                        headers=headers,
                    ) as response:
                        response.raise_for_status()  # Non-2xx responses count as polling errors
                        data = await response.json()

                        # Check success criteria
                        if self._check_success_criteria(data):
                            yield TriggerEvent({
                                'status': 'success',
                                'data': data,
                                'timestamp': datetime.now().isoformat(),
                            })
                            return

                except Exception as e:
                    # Log error but continue polling (BaseTrigger provides self.log)
                    self.log.warning("Polling error: %s", e)

                # Check timeout
                if datetime.now() - start_time > timeout_delta:
                    yield TriggerEvent({
                        'status': 'timeout',
                        'error': 'Polling timeout exceeded',
                        'timestamp': datetime.now().isoformat(),
                    })
                    return

                # Wait before next poll
                await asyncio.sleep(self.poll_interval)

    def _check_success_criteria(self, data: Dict[str, Any]) -> bool:
        """Check if response meets success criteria."""
        for key, value in self.success_criteria.items():
            if key not in data or data[key] != value:
                return False
        return True

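# Usage in DAG
from airflow.exceptions import AirflowException
from airflow.models import DAG, BaseOperator
from airflow.operators.python import PythonOperator

class WaitForConditionOperator(BaseOperator):
    """Deferrable operator that hands the wait over to PollingTrigger."""

    def __init__(
        self,
        http_conn_id: str,
        endpoint: str,
        success_criteria: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.http_conn_id = http_conn_id
        self.endpoint = endpoint
        self.success_criteria = success_criteria or {}

    def execute(self, context):
        # Release the worker slot; the triggerer runs PollingTrigger asynchronously
        self.defer(
            trigger=PollingTrigger(
                http_conn_id=self.http_conn_id,
                endpoint=self.endpoint,
                success_criteria=self.success_criteria,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event: Dict[str, Any]):
        # Resumed on a worker with the payload of the TriggerEvent
        if event['status'] != 'success':
            raise AirflowException(event.get('error', 'Trigger did not succeed'))
        return event  # Pushed to XCom as return_value

def process_trigger_result(**context):
    """Process the result from trigger."""
    trigger_event = context['ti'].xcom_pull(
        task_ids='wait_for_condition',
        key='return_value',
    )
    print(f"Trigger event received: {trigger_event}")

with DAG(
    'trigger_example_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Custom trigger example',
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['trigger', 'custom'],
) as dag:

    # Polling trigger example: the task defers until the endpoint reports success.
    # WebhookTrigger can be wired up the same way from its own deferrable operator.
    wait_for_condition = WaitForConditionOperator(
        task_id='wait_for_condition',
        http_conn_id='my_api',  # hypothetical connection ID
        endpoint='/status',  # hypothetical endpoint
        success_criteria={'state': 'done'},  # hypothetical response field
    )

    process_result = PythonOperator(
        task_id='process_result',
        python_callable=process_trigger_result,
    )

    wait_for_condition >> process_result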
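The heart of PollingTrigger.run is an asynchronous poll-with-deadline loop that emits exactly one event. To test that loop without a triggerer, here is a stdlib-only sketch of it in which a hypothetical fetch coroutine stands in for the HTTP request and plain dicts stand in for TriggerEvent; poll_until and demo are illustrative names, not Airflow APIs.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List


async def poll_until(
    fetch: Callable[[], Awaitable[Dict[str, Any]]],
    success_criteria: Dict[str, Any],
    poll_interval: float,
    timeout: float,
) -> AsyncIterator[Dict[str, Any]]:
    """Yield exactly one event: 'success' when the criteria match, else 'timeout'."""
    deadline = time.monotonic() + timeout  # monotonic clock ignores wall-clock jumps
    while True:
        try:
            data = await fetch()
            # Every criterion key must be present with an equal value
            if all(k in data and data[k] == v for k, v in success_criteria.items()):
                yield {"status": "success", "data": data}
                return
        except Exception as exc:  # transient errors: report and keep polling
            print(f"Polling error: {exc}")
        if time.monotonic() >= deadline:
            yield {"status": "timeout", "error": "Polling timeout exceeded"}
            return
        # Sleeping hands control back to the event loop between polls
        await asyncio.sleep(poll_interval)


async def demo(states: List[str], timeout: float) -> List[Dict[str, Any]]:
    """Drive poll_until with a fake endpoint that returns `states` in order."""
    remaining = iter(states)

    async def fake_fetch() -> Dict[str, Any]:
        # Repeat the last state once the list is exhausted
        return {"state": next(remaining, states[-1])}

    return [event async for event in poll_until(fake_fetch, {"state": "done"}, 0.01, timeout)]


print(asyncio.run(demo(["running", "running", "done"], timeout=1.0)))
# [{'status': 'success', 'data': {'state': 'done'}}]
```

Because the wait happens in asyncio.sleep rather than time.sleep, a single event loop can interleave many such loops, which is why the triggerer can hold many deferred tasks at once.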

Dataset-Driven Scheduling

# dataset_scheduling.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

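# Define datasets. URIs are static identifiers: Airflow does not render
# templates such as {ds} in them, and updates are matched on the exact URI.
raw_data_dataset = Dataset("s3://my-bucket/raw-data/")
processed_data_dataset = Dataset("s3://my-bucket/processed-data/")
report_dataset = Dataset("s3://my-bucket/reports/")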

def extract_data(**context):
    """Extract data from source."""
    # Simulate data extraction
    print("Extracting data...")
    return {"status": "success", "records": 1000}

def transform_data(**context):
    """Transform extracted data."""
    # Simulate data transformation
    print("Transforming data...")
    return {"status": "success", "transformed_records": 950}

def generate_report(**context):
    """Generate report from transformed data."""
    # Simulate report generation
    print("Generating report...")
    return {"status": "success", "report_url": "s3://reports/report.pdf"}

# DAG 1: Data Extraction (writes to raw_data_dataset)
with DAG(
    'data_extraction_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Data extraction DAG',
    # Time-based schedule: this DAG produces raw_data_dataset, so scheduling
    # it on that same dataset (as well as on '@daily') would be circular
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'extraction'],
) as extraction_dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        outlets=[raw_data_dataset],  # This DAG produces this dataset
    )

# DAG 2: Data Processing (reads from raw_data_dataset, writes to processed_data_dataset)
with DAG(
    'data_processing_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Data processing DAG',
    # Runs whenever raw_data_dataset is updated; this replaces schedule_interval,
    # which cannot be combined with schedule
    schedule=[raw_data_dataset],
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'processing'],
) as processing_dag:

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        inlets=[raw_data_dataset],  # This DAG reads this dataset
        outlets=[processed_data_dataset],  # This DAG writes this dataset
    )

# DAG 3: Report Generation (reads from processed_data_dataset)
with DAG(
    'report_generation_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Report generation DAG',
    # Runs whenever processed_data_dataset is updated; this replaces
    # schedule_interval, which cannot be combined with schedule
    schedule=[processed_data_dataset],
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dataset', 'reporting'],
) as report_dag:

    report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        inlets=[processed_data_dataset],  # This DAG reads this dataset
        outlets=[report_dataset],  # This DAG writes this dataset
    )
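The chain above relies on the scheduler's rule that a DAG scheduled on a list of datasets runs once every dataset in that list has been updated since its last run. The following toy model of that rule is a sketch for reasoning about cascades, not Airflow's implementation; the cascade function and its dictionaries are hypothetical, and the URIs mirror the example DAGs.

```python
from collections import deque
from typing import Dict, List, Set


def cascade(
    first_dag: str,
    schedules: Dict[str, List[str]],
    outlets: Dict[str, List[str]],
) -> List[str]:
    """Order in which DAG runs fire after one successful run of first_dag.

    schedules: DAG id -> dataset URIs it is scheduled on (all must be updated)
    outlets:   DAG id -> dataset URIs its tasks update on success
    """
    # Datasets each consumer is still waiting for
    waiting: Dict[str, Set[str]] = {dag: set(uris) for dag, uris in schedules.items()}
    order = [first_dag]
    queue = deque([first_dag])
    while queue:
        producer = queue.popleft()
        for uri in outlets.get(producer, []):
            for consumer, remaining in waiting.items():
                if uri in remaining:
                    remaining.discard(uri)
                    if not remaining:  # every input updated: queue a run
                        order.append(consumer)
                        queue.append(consumer)
    return order


schedules = {
    "data_processing_dag": ["s3://my-bucket/raw-data/"],
    "report_generation_dag": ["s3://my-bucket/processed-data/"],
}
outlets = {
    "data_extraction_dag": ["s3://my-bucket/raw-data/"],
    "data_processing_dag": ["s3://my-bucket/processed-data/"],
    "report_generation_dag": ["s3://my-bucket/reports/"],
}
print(cascade("data_extraction_dag", schedules, outlets))
# ['data_extraction_dag', 'data_processing_dag', 'report_generation_dag']
```

A consumer scheduled on two datasets stays idle until both producers have run, which is the main behavioral difference from a time-based schedule.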

Performance Metrics

| Metric | Description | Optimization Strategy |
| --- | --- | --- |
| Cron Parse Time | Time to parse a cron expression | Use simple expressions |
| Schedule Accuracy | How closely actual run times match scheduled times | Optimize the scheduler heartbeat |
| Trigger Latency | Time for a trigger to fire | Use async triggers; optimize polling |
| DAG Run Creation | Time to create DagRun objects | Optimize database queries |
| Backfill Speed | Time to complete a backfill | Parallelize; optimize task execution |
| Timezone Handling | Accuracy of timezone conversions | Use timezone-aware datetime objects |
| Schedule Drift | Deviation from the intended schedule | Monitor and adjust scheduling parameters |
| Resource Usage | Scheduler resource consumption | Optimize the heartbeat interval; limit active runs |
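Schedule drift and schedule accuracy reduce to comparing each run's scheduled time with its actual start. Here is a small sketch that summarizes drift from (scheduled_time, actual_start) pairs. It assumes you have already collected those pairs; for interval-based schedules they typically correspond to a DagRun's data_interval_end and start_date. The drift_report name and the 60-second alert threshold are illustrative choices.

```python
from datetime import datetime, timedelta
from statistics import mean
from typing import Dict, List, Tuple


def drift_report(
    runs: List[Tuple[datetime, datetime]], alert_after: timedelta
) -> Dict[str, float]:
    """Summarize drift for (scheduled_time, actual_start) pairs, in seconds."""
    # Positive drift means the run started later than scheduled
    drifts = [(actual - scheduled).total_seconds() for scheduled, actual in runs]
    return {
        "mean_s": mean(drifts),
        "max_s": max(drifts),
        # Runs whose drift exceeds the alert threshold
        "late_runs": sum(d > alert_after.total_seconds() for d in drifts),
    }


runs = [
    (datetime(2024, 1, 1, 9, 0), datetime(2024, 1, 1, 9, 0, 4)),
    (datetime(2024, 1, 2, 9, 0), datetime(2024, 1, 2, 9, 0, 10)),
    (datetime(2024, 1, 3, 9, 0), datetime(2024, 1, 3, 9, 1, 7)),
]
print(drift_report(runs, alert_after=timedelta(seconds=60)))
# {'mean_s': 27.0, 'max_s': 67.0, 'late_runs': 1}
```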

Best Practices

  1. Cron Expression Validation: Always validate cron expressions before deploying. Use tools like croniter to test and verify schedules.

  2. Timezone Handling: Use timezone-aware datetime objects throughout your DAGs. Be explicit about timezone conversions and avoid naive datetime objects.

  3. Catchup Configuration: Set catchup=False for new DAGs to avoid unintended backfill. Use catchup=True only when you explicitly need historical runs.

  4. Max Active Runs: Configure max_active_runs to prevent resource contention. Use max_active_runs=1 for sequential execution.

  5. Schedule Interval: Choose appropriate schedule intervals based on data requirements and resource constraints. Avoid over-scheduling.

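  6. Trigger Usage: For external dependencies, use deferrable operators backed by triggers instead of sensors that poll from a worker. A trigger may still poll, but it does so asynchronously in the triggerer process, so no worker slot is held while waiting.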

  7. Dataset Dependencies: Use dataset-driven scheduling for event-driven workflows. Downstream DAGs then run when upstream data has actually been updated, rather than at a fixed time chosen to leave slack for the producer.

  8. Monitoring: Monitor schedule accuracy and trigger performance. Set up alerts for missed or delayed runs.

  9. Testing: Test scheduling logic thoroughly. Run DAGs locally with airflow dags test or dag.test() (Airflow 2.5+), and unit-test custom timetables by calling next_dagrun_info() directly with sample inputs.

  10. Documentation: Document scheduling decisions and rationale. Include schedule descriptions in DAG documentation.
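As a rough illustration of what cron validation checks, here is a deliberately simplified, stdlib-only validator for numeric five-field expressions (lists, ranges, and steps). It does not handle month or weekday names or presets such as @daily, so for production use prefer a real parser such as croniter. The is_valid_cron name is hypothetical.

```python
import re
from typing import List, Tuple

# (min, max) for minute, hour, day-of-month, month, day-of-week (0 and 7 = Sunday)
FIELD_RANGES: List[Tuple[int, int]] = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]
# One comma-separated term: "*" or "N" or "N-M", optionally followed by "/STEP"
_TERM = re.compile(r"^(\*|\d+(?:-\d+)?)(?:/(\d+))?$")


def is_valid_cron(expr: str) -> bool:
    """Check a numeric 5-field cron expression (no names, no @presets)."""
    fields = expr.split()
    if len(fields) != 5:
        return False
    for field, (lo, hi) in zip(fields, FIELD_RANGES):
        for term in field.split(","):
            match = _TERM.match(term)
            if not match:
                return False
            base, step = match.group(1), match.group(2)
            if step is not None and int(step) == 0:
                return False  # a step of zero never advances
            if base != "*":
                bounds = [int(x) for x in base.split("-")]
                # Values must be in range and ranges must ascend
                if any(not lo <= b <= hi for b in bounds) or bounds != sorted(bounds):
                    return False
    return True


print(is_valid_cron("0 9 * * 1-5"), is_valid_cron("0 25 * * *"))  # True False
```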

Key Takeaways:

  • Schedule interval $\Delta t = t_{\text{next}} - t_{\text{last\_completed}}$ determines DAG run frequency, where $t_{\text{last\_completed}}$ is the end of the last completed data interval
  • Catchup run count is $\lfloor (t_{\text{now}} - t_{\text{start}}) / \Delta t \rfloor$
  • Schedule drift is bounded by $\delta_{\max} = \Delta t_{\text{hb}} + \epsilon_{\text{parse}}$, where $\Delta t_{\text{hb}}$ is the scheduler heartbeat interval and $\epsilon_{\text{parse}}$ the parsing overhead
  • Triggers enable async deferral without worker resource consumption
  • Timetables provide flexible scheduling beyond cron expressions
  • Always use timezone-aware datetime objects to avoid scheduling bugs
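The catchup formula can be checked directly, assuming a fixed-length interval $\Delta t$ (cron schedules with uneven gaps, such as business days, need the runs to be enumerated instead). This sketch uses Python's timedelta floor division; catchup_run_count is an illustrative name.

```python
from datetime import datetime, timedelta


def catchup_run_count(start: datetime, now: datetime, interval: timedelta) -> int:
    """floor((t_now - t_start) / delta_t): complete intervals awaiting a run."""
    if now <= start:
        return 0
    # timedelta // timedelta performs floor division and returns an int
    return (now - start) // interval


# Daily DAG with start_date 2024-01-01, evaluated at 2024-01-10 12:00
print(catchup_run_count(datetime(2024, 1, 1), datetime(2024, 1, 10, 12), timedelta(days=1)))  # 9
```

The half-finished tenth day does not count, because a run is only created once its data interval has ended.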

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
