Apache Airflow Architecture Deep Dive

Apache Airflow Architecture

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                       APACHE AIRFLOW ARCHITECTURE                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐               │
│  │  Web Server  │    │  Scheduler   │    │  Triggerer   │               │
│  │   (Flask)    │    │    (Core)    │    │   (Async)    │               │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘               │
│         │                   │                   │                       │
│         ▼                   ▼                   ▼                       │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                  METADATA DATABASE (PostgreSQL)                   │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │  │
│  │  │  dag_run │ │   task_  │ │    log   │ │   slot_  │ │serialized│ │  │
│  │  │          │ │ instance │ │          │ │   pool   │ │   _dag   │ │  │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│                                    │                                    │
│                                    ▼                                    │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                          EXECUTOR LAYER                           │  │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐  │  │
│  │  │ Sequential  │ │    Local    │ │   Celery    │ │ Kubernetes  │  │  │
│  │  │  Executor   │ │  Executor   │ │  Executor   │ │  Executor   │  │  │
│  │  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘  │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│                                    │                                    │
│                                    ▼                                    │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                           WORKER NODES                            │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │  │
│  │  │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │ Worker N │ │   Pods   │ │  │
│  │  │ (Celery) │ │ (Celery) │ │ (Celery) │ │ (Celery) │ │  (K8s)   │ │  │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram: Scheduler Internal Workflow
┌─────────────────────────────────────────────────────────────────────────┐
│                       SCHEDULER INTERNAL WORKFLOW                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                        DAG FILE PROCESSOR                         │  │
│  │  ┌─────────────────────────────────────────────────────────────┐  │  │
│  │  │  DAG File Processor Manager                                 │  │  │
│  │  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐            │  │  │
│  │  │  │ File 1  │ │ File 2  │ │ File 3  │ │ File N  │            │  │  │
│  │  │  │  Parse  │ │  Parse  │ │  Parse  │ │  Parse  │            │  │  │
│  │  │  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘            │  │  │
│  │  │       └───────────┴─────┬─────┴───────────┘                 │  │  │
│  │  │                         ▼                                   │  │  │
│  │  │            ┌─────────────────────────┐                      │  │  │
│  │  │            │  Serialized DAGs Cache  │                      │  │  │
│  │  │            └─────────────────────────┘                      │  │  │
│  │  └─────────────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│                                    │                                    │
│                                    ▼                                    │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                        SCHEDULER HEARTBEAT                        │  │
│  │  ┌─────────────────────────────────────────────────────────────┐  │  │
│  │  │  1. Check for new DAG files                                 │  │  │
│  │  │  2. Update DAG parsing statistics                           │  │  │
│  │  │  3. Create DagRuns for scheduled DAGs                       │  │  │
│  │  │  4. Queue task instances for execution                      │  │  │
│  │  │  5. Update task instance states                             │  │  │
│  │  │  6. Process callbacks                                       │  │  │
│  │  │  7. Emit heartbeats                                         │  │  │
│  │  └─────────────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│                                    │                                    │
│                                    ▼                                    │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                           TASK QUEUING                            │  │
│  │  ┌─────────────────────────────────────────────────────────────┐  │  │
│  │  │  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐  │  │  │
│  │  │  │ Priority │   │ Priority │   │ Priority │   │ Priority │  │  │  │
│  │  │  │    1     │──▶│    2     │──▶│    3     │──▶│    N     │  │  │  │
│  │  │  └──────────┘   └──────────┘   └──────────┘   └──────────┘  │  │  │
│  │  └─────────────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram: Metadata Database Schema (simplified; exact tables and columns vary by Airflow version)
┌───────────────────────────────────────────────────────────────────────────────────┐
│                             METADATA DATABASE SCHEMA                              │
├───────────────────────────────────────────────────────────────────────────────────┤
│                                                                                   │
│  ┌───────────────────┐     ┌──────────────────────────┐     ┌──────────────────┐  │
│  │        dag        │     │         dag_run          │     │  task_instance   │  │
│  ├───────────────────┤     ├──────────────────────────┤     ├──────────────────┤  │
│  │ dag_id (PK)       │──┐  │ id (PK)                  │     │ task_id (PK)     │  │
│  │ root_dag_id       │  └─▶│ dag_id (FK)              │     │ dag_id (PK)      │  │
│  │ parent_dag_id     │     │ run_id (PK)              │────▶│ run_id (PK)      │  │
│  │ file_token        │     │ execution_date           │     │ execution_date   │  │
│  │ fileloc           │     │ logical_date             │     │ start_date       │  │
│  │ owners            │     │ start_date               │     │ end_date         │  │
│  │ is_active         │     │ end_date                 │     │ duration         │  │
│  │ is_paused         │     │ state                    │     │ state            │  │
│  │ is_subdag         │     │ run_type                 │     │ try_number       │  │
│  │ schedule_interval │     │ conf                     │     │ max_tries        │  │
│  │ timetable_slug    │     │ external_trigger         │     │ hostname         │  │
│  │ tags              │     │ last_scheduling_decision │     │ unixname         │  │
│  └───────────────────┘     └──────────────────────────┘     └──────────────────┘  │
│                                                                                   │
│  ┌───────────────────┐     ┌──────────────────────────┐     ┌──────────────────┐  │
│  │        log        │     │        slot_pool         │     │  serialized_dag  │  │
│  ├───────────────────┤     ├──────────────────────────┤     ├──────────────────┤  │
│  │ id (PK)           │     │ id (PK)                  │     │ dag_id (PK)      │  │
│  │ dag_id            │     │ name                     │     │ file_token       │  │
│  │ task_id           │     │ slots                    │     │ data             │  │
│  │ execution_date    │     │ occupied_slots           │     │ processor_subdir │  │
│  │ event             │     │ open_slots               │     │ created_at       │  │
│  │ timestamp         │     │                          │     │ updated_at       │  │
│  │ message           │     │                          │     │                  │  │
│  │ level             │     │                          │     │                  │  │
│  └───────────────────┘     └──────────────────────────┘     └──────────────────┘  │
│                                                                                   │
└───────────────────────────────────────────────────────────────────────────────────┘

Formal Definitions

Definition: Directed Acyclic Graph (DAG)

A DAG is a finite directed graph with no directed cycles. In Airflow, a DAG is a collection of tasks with organized dependencies, defining the execution order and schedule of a workflow. Formally, $G = (V, E)$, where $V$ is the set of tasks and $E$ is the set of dependency edges, with no cycle $v_1 \rightarrow v_2 \rightarrow \cdots \rightarrow v_k \rightarrow v_1$.
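
As a minimal sketch of this definition, the edge set can be written as a mapping from each task to the tasks it depends on, and Python's standard-library `graphlib` can derive one valid execution order from it. The task names are hypothetical, and this is not how Airflow represents a DAG internally.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
upstream = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}


def execution_order(deps):
    """Return one valid linear ordering of the tasks in the graph."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(upstream)
print(order)
```

Any ordering returned here places a task after all of its upstream tasks; tasks with no path between them (such as `transform` and `validate`) may appear in either order.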

Definition: Scheduler

The scheduler is the core orchestrator component responsible for triggering scheduled workflows, monitoring task instances, and managing dependencies. It operates on a heartbeat model with interval $\Delta t_{\text{heartbeat}}$, creating DagRuns and queuing task instances for execution.

Definition: Executor

An executor determines how task instances are executed. It abstracts the execution mechanism, mapping tasks to compute resources. The parallelism is bounded by $P_{\max} = \min(E_{\text{executors}}, T_{\text{task\_count}})$, where $E_{\text{executors}}$ is the number of available executor slots and $T_{\text{task\_count}}$ is the count of tasks ready for execution.

Definition: Metadata Database

The metadata database (typically PostgreSQL) stores all state information about DAGs, task instances, connections, variables, and other Airflow objects. It serves as the central coordination point ensuring consistency across all distributed components via ACID transactions.

Detailed Explanation

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. At its core, Airflow follows a modular architecture with several key components that work together to provide reliable workflow orchestration.

Core Components

The Web Server provides the user interface for monitoring and managing DAGs. Built on Flask, it offers a RESTful API and a rich UI that displays DAG runs, task instances, logs, and other metadata. The web server connects to the metadata database to fetch information about workflow execution and presents it in an intuitive dashboard.

The Scheduler is the heart of Airflow. It's responsible for triggering scheduled workflows, monitoring task instances, and managing dependencies. The scheduler uses the metadata database to track state and maintain coordination between different components. It operates on a heartbeat model, periodically checking for new DAG files, creating DagRuns, and queuing task instances for execution.

The Metadata Database stores all state information about DAGs, task instances, users, connections, variables, and other Airflow objects. PostgreSQL is recommended for production deployments due to its ACID compliance and performance characteristics. The database schema is carefully designed to support concurrent operations and maintain data integrity across distributed components.

The Executor determines how task instances are executed. Airflow supports several executors with different trade-offs in terms of complexity, scalability, and resource utilization. The executor layer abstracts the execution mechanism, allowing users to switch between different execution strategies without modifying their DAGs.
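
The idea of swapping execution strategies behind one interface can be sketched in plain Python. The classes below are hypothetical stand-ins written for illustration; Airflow's real executor base class has a different and much richer interface.

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class ToyExecutor(ABC):
    """Hypothetical interface: run callables, return results in order."""

    @abstractmethod
    def run_all(self, tasks):
        ...


class ToySequentialExecutor(ToyExecutor):
    """Runs one task at a time in the calling process."""

    def run_all(self, tasks):
        return [task() for task in tasks]


class ToyThreadedExecutor(ToyExecutor):
    """Runs up to `slots` tasks concurrently in a thread pool."""

    def __init__(self, slots):
        self.slots = slots

    def run_all(self, tasks):
        with ThreadPoolExecutor(max_workers=self.slots) as pool:
            futures = [pool.submit(task) for task in tasks]
            return [future.result() for future in futures]


def make_task(n):
    """Build a trivial task that squares its input."""
    return lambda: n * n


tasks = [make_task(n) for n in range(5)]
sequential_results = ToySequentialExecutor().run_all(tasks)
threaded_results = ToyThreadedExecutor(slots=3).run_all(tasks)
print(sequential_results, threaded_results)
```

The task list is identical in both calls; only the executor object changes, which mirrors how a DAG stays the same when the configured executor is switched.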

$$\Delta t_{\text{schedule}} = \text{end\_date} - \text{start\_date}$$

Total Task Execution Time

$$T_{\text{total}} = \sum_{i=1}^{n} T_{\text{task},i} + \sum_{i=1}^{n} T_{\text{overhead},i}$$

Here,

  • $n$ = Number of tasks in the DAG
  • $T_{\text{task},i}$ = Execution time of task $i$
  • $T_{\text{overhead},i}$ = Scheduling and queuing overhead for task $i$
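
A short worked example of the total-time formula, using made-up durations in seconds. Because the formula sums per-task values, the result describes aggregate work; when tasks run in parallel, wall-clock time can be lower.

```python
def total_execution_time(task_seconds, overhead_seconds):
    """Sum task run times and per-task scheduling overhead."""
    if len(task_seconds) != len(overhead_seconds):
        raise ValueError("each task needs exactly one overhead value")
    return sum(task_seconds) + sum(overhead_seconds)


# Hypothetical measurements for a three-task DAG (n = 3).
task_seconds = [120.0, 45.0, 300.0]
overhead_seconds = [2.0, 1.5, 3.0]

t_total = total_execution_time(task_seconds, overhead_seconds)
print(t_total)  # 465.0 of task time + 6.5 of overhead = 471.5
```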

Maximum Parallelism

$$P_{\max} = \min(E_{\text{executors}},\; T_{\text{task\_count}})$$

Here,

  • $E_{\text{executors}}$ = Number of available executor slots
  • $T_{\text{task\_count}}$ = Total tasks ready for execution
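
The bound can be illustrated with a small sketch. `dispatch_waves` is a hypothetical helper that assumes every task in a wave finishes before the next wave starts, which is a simplification of real scheduling.

```python
def max_parallelism(executor_slots, ready_tasks):
    """P_max = min(E_executors, T_task_count)."""
    return min(executor_slots, ready_tasks)


def dispatch_waves(executor_slots, ready_tasks):
    """Return how many tasks start in each wave until none remain."""
    if executor_slots < 1:
        raise ValueError("at least one executor slot is required")
    waves = []
    remaining = ready_tasks
    while remaining > 0:
        started = max_parallelism(executor_slots, remaining)
        waves.append(started)
        remaining -= started
    return waves


print(max_parallelism(32, 10))  # slots exceed ready tasks, so tasks are the limit
print(dispatch_waves(4, 10))    # slots are the limit: waves of 4, 4, then 2
```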

Theorem: DAG Correctness (Acyclicity Invariant)

A valid Airflow DAG must be a directed acyclic graph. If the dependency graph $G = (V, E)$ contains a cycle, the scheduler will reject the DAG during parsing. Formally, $\nexists$ a path $v_1 \rightarrow v_2 \rightarrow \cdots \rightarrow v_k \rightarrow v_1$ for any subset of tasks $v_1, \ldots, v_k \in V$.
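
One way to check the invariant is a depth-first search that reports the first cycle it finds. This is a generic sketch over a plain dictionary of downstream edges, not Airflow's own validation code, and the task names are hypothetical.

```python
def find_cycle(downstream):
    """Return a list of tasks forming a cycle, or None if the graph is acyclic."""
    unvisited, in_progress, done = 0, 1, 2
    color = {task: unvisited for task in downstream}
    path = []

    def visit(task):
        color[task] = in_progress
        path.append(task)
        for nxt in downstream.get(task, ()):
            state = color.get(nxt, unvisited)
            if state == in_progress:
                # nxt is already on the current path: a back edge closes a cycle
                return path[path.index(nxt):] + [nxt]
            if state == unvisited:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        color[task] = done
        return None

    for task in list(downstream):
        if color[task] == unvisited:
            cycle = visit(task)
            if cycle:
                return cycle
    return None


valid = {"a": ["b"], "b": ["c"], "c": []}
cyclic = {"a": ["b"], "b": ["c"], "c": ["a"]}
print(find_cycle(valid))
print(find_cycle(cyclic))
```

The recursive form keeps the sketch short; a graph with very long dependency chains would call for an iterative traversal instead.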

The scheduler processes DAG files at a configurable interval (min_file_process_interval). Reducing this interval improves responsiveness but increases CPU overhead. For most production deployments, 30 seconds is a reasonable default.
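
Airflow settings can also be supplied as environment variables named `AIRFLOW__{SECTION}__{KEY}`. The helper below is a hypothetical convenience that builds such a name; the value 60 is only an example, and the section a setting lives in has changed between major versions, so check the configuration reference for the installed release.

```python
def airflow_env_var(section, key):
    """Build the environment variable name for an airflow.cfg option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


name = airflow_env_var("scheduler", "min_file_process_interval")
# Example value only: parse each DAG file at most once every 60 seconds.
print(f"export {name}=60")
```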

Component Interactions

When a DAG file is parsed, the scheduler creates serialized DAG objects and stores them in the metadata database. The scheduler then monitors for scheduled intervals and creates DagRuns when appropriate. Task instances are queued based on their dependencies and available resources. The executor picks up queued tasks and dispatches them to worker nodes or processes.
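
The interaction described above can be approximated with a toy loop in which a "scheduler" step queues tasks whose upstream tasks have succeeded and an "executor" step runs them. Everything here (state names, the `slots` limit, instant task completion) is a simplification for illustration, not Airflow's actual scheduling code.

```python
def run_dag(upstream, slots):
    """Simulate scheduling loops; return the tasks executed in each loop.

    `upstream` maps every task to the set of tasks it depends on, and each
    dependency must itself be a key of the mapping.
    """
    if slots < 1:
        raise ValueError("at least one slot is required")
    state = {task: "none" for task in upstream}
    batches = []
    while any(s != "success" for s in state.values()):
        # Scheduler step: find tasks whose dependencies have all succeeded.
        ready = sorted(
            task for task, s in state.items()
            if s == "none" and all(state[u] == "success" for u in upstream[task])
        )
        if not ready:
            raise RuntimeError("no runnable tasks; the graph contains a cycle")
        queued = ready[:slots]
        for task in queued:
            state[task] = "queued"
        # Executor step: run the queued tasks (instantly, in this toy model).
        for task in queued:
            state[task] = "success"
        batches.append(queued)
    return batches


upstream = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}
print(run_dag(upstream, slots=2))
print(run_dag(upstream, slots=1))
```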

The metadata database serves as the central coordination point, ensuring consistency across all components. It tracks task states, manages concurrency limits, and provides the information needed for the web server to display current status. The database also stores connection details, variables, and other configuration that tasks may need during execution.
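
A sketch of why a transactional database works as a coordination point: a conditional update lets exactly one caller move a task from queued to running. The example uses an in-memory SQLite database and a made-up `toy_task_instance` table so that it runs anywhere; it does not reflect Airflow's real schema or queries.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE toy_task_instance ("
    "task_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
)
conn.execute("INSERT INTO toy_task_instance VALUES ('load', 'queued')")
conn.commit()


def claim_task(connection, task_id):
    """Move a task from queued to running; True only for the caller that wins."""
    with connection:  # commits on success, rolls back on an exception
        cursor = connection.execute(
            "UPDATE toy_task_instance SET state = 'running' "
            "WHERE task_id = ? AND state = 'queued'",
            (task_id,),
        )
        return cursor.rowcount == 1


first_claim = claim_task(conn, "load")
second_claim = claim_task(conn, "load")
print(first_claim, second_claim)
```

The second call changes no rows because the state is no longer `queued`, so a competing worker would learn that the task has already been taken.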

Scalability Considerations

Airflow's architecture supports horizontal scaling through the executor layer. The Celery executor allows distributing tasks across multiple worker nodes, while the Kubernetes executor dynamically provisions pods for task execution. The scheduler can be configured to parse DAG files in parallel, and the metadata database can be optimized with proper indexing and connection pooling.

The web server is stateless and can be scaled horizontally behind a load balancer. The scheduler runs as a single instance by default. For high availability, Airflow 2.0 and later support running multiple schedulers at once; they coordinate through row-level locks in the metadata database, which requires a database that supports SELECT ... FOR UPDATE SKIP LOCKED, such as PostgreSQL or MySQL 8.

Reliability and Fault Tolerance

Airflow provides several mechanisms for ensuring reliability. The scheduler uses heartbeats to detect and recover from failures. Task instances can be configured with retry logic to handle transient failures. The metadata database stores enough information to resume execution after component failures. DAG runs are tracked with unique identifiers, ensuring that workflows can be properly restarted or resumed.
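
Retry behaviour can be sketched in plain Python. The function below is a hypothetical illustration of "one initial attempt plus up to `retries` further attempts, with a delay in between"; Airflow implements retries inside the scheduler and task runner rather than with a loop like this.

```python
import time


def run_with_retries(task, retries, retry_delay_seconds, sleep=time.sleep):
    """Call task(); on failure retry up to `retries` more times.

    Returns (result, attempts_used). The last exception is re-raised
    once all attempts are exhausted.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return task(), attempt
        except Exception:
            if attempt > retries:
                raise
            sleep(retry_delay_seconds)


calls = {"count": 0}


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result, attempts = run_with_retries(flaky, retries=2, retry_delay_seconds=0)
print(result, attempts)
```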

The system also provides comprehensive logging, with task logs stored centrally and accessible through the web server. This enables debugging and monitoring of workflow execution across distributed environments. Airflow's callback system allows integration with external monitoring tools for proactive alerting on failures or performance issues.
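
A failure callback is a function that receives the task's context dictionary. The sketch below builds an alert payload from that context; `print` stands in for a real alerting client, and the fake context only imitates the few keys used here.

```python
from types import SimpleNamespace


def build_failure_alert(context):
    """Turn a callback context into a flat dict for an alerting tool."""
    ti = context.get("task_instance")
    return {
        "dag_id": getattr(ti, "dag_id", "unknown"),
        "task_id": getattr(ti, "task_id", "unknown"),
        "try_number": getattr(ti, "try_number", None),
        "error": str(context.get("exception", "")),
    }


def notify_on_failure(context):
    """Callback entry point; printing stands in for sending the alert."""
    alert = build_failure_alert(context)
    print(f"ALERT {alert['dag_id']}.{alert['task_id']}: {alert['error']}")
    return alert


# Stand-in for the context Airflow would pass; all values are made up.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="demo", task_id="load", try_number=2),
    "exception": ValueError("bad row"),
}
alert = notify_on_failure(fake_context)
```

In a DAG file, a function like this would typically be attached with `on_failure_callback=notify_on_failure`, either in `default_args` or on an individual operator.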

Key Concepts Table

Component     | Purpose                | Configuration                          | Scalability
------------- | ---------------------- | -------------------------------------- | ----------------------------------------------------
Web Server    | User interface and API | webserver_config.py                    | Horizontal scaling with load balancer
Scheduler     | Workflow orchestration | airflow.cfg [scheduler]                | Single instance by default; multiple supported for HA
Metadata DB   | State management       | airflow.cfg [database]                 | Read replicas for queries
Executor      | Task execution         | airflow.cfg [core]                     | Depends on executor type
Worker        | Task processing        | Celery/K8s config                      | Horizontal scaling
Triggerer     | Async trigger handling | airflow.cfg [triggerer]                | Horizontal scaling
DAG Processor | File parsing           | airflow.cfg [scheduler] in Airflow 2.x | Parallel parsing

Code Examples

Custom Executor Configuration

# dags/custom_executor_example.py
import os
import platform
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def scheduler_info(**context):
    """Print details of the current DAG run and the state of its tasks."""
    dag_run = context['dag_run']

    print(f"Scheduler triggered DAG run: {dag_run.run_id}")
    # logical_date replaces the deprecated execution_date (Airflow 2.2+)
    print(f"Logical date: {context['logical_date']}")
    print(f"Run type: {dag_run.run_type}")

    # Read task states from the metadata database through the ORM.
    # get_task_instances() opens and closes its own session.
    for ti in dag_run.get_task_instances():
        print(f"Task: {ti.task_id}, State: {ti.state}")


def executor_demo(**context):
    """Report where and under which executor this task is running."""
    executor_info = {
        'hostname': platform.node(),
        'pid': os.getpid(),
        # Executor name as configured in airflow.cfg [core]
        'executor': conf.get('core', 'executor'),
        'worker_id': os.environ.get('HOSTNAME', 'unknown'),
    }

    print(f"Executing on: {executor_info['hostname']}")
    print(f"Process ID: {executor_info['pid']}")
    print(f"Executor: {executor_info['executor']}")

    # Simulate work
    time.sleep(2)

    # The return value is pushed to XCom, so keep it JSON-serializable
    return executor_info


# Create the DAG
with DAG(
    'architecture_demo_dag',
    default_args=default_args,
    description='Demonstration of Airflow architecture concepts',
    # `schedule` requires Airflow 2.4+; older releases use `schedule_interval`
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['architecture', 'demo'],
) as dag:

    task1 = PythonOperator(
        task_id='scheduler_info',
        python_callable=scheduler_info,
    )

    task2 = PythonOperator(
        task_id='executor_demo',
        python_callable=executor_demo,
    )

    # BashOperator takes the shell command in `bash_command`
    task3 = BashOperator(
        task_id='bash_task',
        bash_command='echo "Task executed on $(hostname) at $(date)"',
    )

    # Set task dependencies
    task1 >> task2 >> task3

Metadata Database Optimization

# metadata_db_optimization.py
from airflow import settings
from airflow.configuration import conf
from sqlalchemy import text

# Read-only diagnostic queries. These are PostgreSQL-specific.
ANALYSIS_QUERIES = [
    # Slowest statements touching task_instance. Requires the
    # pg_stat_statements extension; the timing column is named
    # mean_time before PostgreSQL 13 and mean_exec_time from 13 onward.
    """
    SELECT query, mean_exec_time, calls
    FROM pg_stat_statements
    WHERE query LIKE '%task_instance%'
    ORDER BY mean_exec_time DESC
    LIMIT 10;
    """,
    # Index usage
    """
    SELECT indexrelname, idx_scan, idx_tup_read, idx_tup_fetch
    FROM pg_stat_user_indexes
    WHERE schemaname = 'public'
    ORDER BY idx_scan DESC;
    """,
    # Table statistics
    """
    SELECT relname, n_tup_ins, n_tup_upd, n_tup_del, n_live_tup, n_dead_tup
    FROM pg_stat_user_tables
    WHERE schemaname = 'public'
    ORDER BY n_live_tup DESC;
    """,
]

# Candidate indexes. Airflow creates its own indexes on these tables,
# so confirm that an index is actually missing before adding it.
ADDITIONAL_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_task_instance_dag_run ON task_instance(dag_id, run_id);",
    "CREATE INDEX IF NOT EXISTS idx_task_instance_state ON task_instance(state);",
    "CREATE INDEX IF NOT EXISTS idx_dag_run_state ON dag_run(state);",
]

# Pool options read from airflow.cfg [database]
POOL_SETTINGS = (
    'sql_alchemy_pool_size',      # Number of connections to maintain
    'sql_alchemy_max_overflow',   # Maximum overflow connections
    'sql_alchemy_pool_recycle',   # Seconds before a connection is recycled
    'sql_alchemy_pool_pre_ping',  # Verify connections before use
)


def report_connection_pool():
    """Print the SQLAlchemy pool settings Airflow is configured with.

    An existing pool cannot be resized at runtime. Change these values
    in airflow.cfg under [database] and restart the Airflow components.
    """
    for key in POOL_SETTINGS:
        print(f"{key} = {conf.get('database', key, fallback='(unset)')}")


def optimize_metadata_db():
    """Run diagnostics, report pool settings, and create missing indexes."""
    session = settings.Session()
    try:
        for query in ANALYSIS_QUERIES:
            try:
                print("Query Results:")
                for row in session.execute(text(query)):
                    print(row)
            except Exception as e:
                # A failed statement aborts the transaction, so roll back
                print(f"Analysis query failed: {e}")
                session.rollback()

        report_connection_pool()

        for index_sql in ADDITIONAL_INDEXES:
            try:
                session.execute(text(index_sql))
                session.commit()
            except Exception as e:
                print(f"Index creation failed: {e}")
                session.rollback()
    finally:
        session.close()


if __name__ == "__main__":
    optimize_metadata_db()

Web Server Customization

# web_server_customization.py
import airflow
from airflow.models import DagRun, TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.www.app import create_app
from flask import Blueprint, jsonify

# Custom blueprint for additional endpoints
custom_bp = Blueprint(
    'custom_bp',
    __name__,
    template_folder='templates',
    static_folder='static',
)


@custom_bp.route('/api/v1/custom/metrics')
def get_custom_metrics():
    """Custom API endpoint for metrics.

    This route has no authentication of its own; protect it before
    exposing it outside a trusted network.
    """
    # create_session() commits or rolls back and always closes the session
    with create_session() as session:
        total_dag_runs = session.query(DagRun).count()
        running_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == State.RUNNING
        ).count()
        failed_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == State.FAILED
        ).count()
        successful_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == State.SUCCESS
        ).count()

    # Success rate over finished task instances (succeeded or failed)
    finished_tasks = successful_tasks + failed_tasks
    success_rate = successful_tasks / finished_tasks * 100 if finished_tasks else 0

    metrics = {
        'total_dag_runs': total_dag_runs,
        'running_tasks': running_tasks,
        'failed_tasks': failed_tasks,
        'success_rate': success_rate,
    }

    return jsonify(metrics)


def create_custom_app(config=None):
    """Create Airflow web app with customizations.

    Blueprints are more commonly registered through an Airflow plugin
    (the flask_blueprints attribute); wrapping create_app() is an
    alternative for custom entry points.
    """
    app = create_app(config)

    # Register custom blueprint
    app.register_blueprint(custom_bp, url_prefix='/airflow')

    # Add custom template filters
    @app.template_filter('datetime_format')
    def datetime_format(value, format='%Y-%m-%d %H:%M:%S'):
        return value.strftime(format) if value else ''

    # Add custom context processor
    @app.context_processor
    def inject_custom_variables():
        return {
            'custom_title': 'Airflow Dashboard',
            # Report the installed version instead of a hard-coded string
            'version': airflow.__version__,
        }

    return app

Performance Metrics

Metric                       | Value                 | Optimization Strategy
---------------------------- | --------------------- | -----------------------------------
Scheduler Heartbeat Interval | 5 seconds             | Adjust based on workload
DAG Parsing Time             | 1-10 seconds per file | Use dag_bag caching
Database Query Time          | < 100 ms              | Proper indexing, connection pooling
Task Queue Latency           | < 5 seconds           | Optimized executor configuration
Web Server Response Time     | < 500 ms              | Caching, load balancing
Metadata DB Connections      | 50-100                | Connection pooling, read replicas
Worker Memory Usage          | 1-4 GB                | Resource limits, monitoring
Log Storage                  | 1-10 GB/day           | Log rotation, external storage

Best Practices

  1. Database Optimization: Use PostgreSQL with proper indexing. Monitor query performance and add indexes for frequently accessed tables. Consider read replicas for heavy read workloads.

  2. Executor Selection: Choose the executor based on your scale requirements. Start with LocalExecutor for development, CeleryExecutor for medium scale, and KubernetesExecutor for large-scale deployments.

  3. Scheduler Configuration: Tune min_file_process_interval and dag_dir_list_interval based on your DAG file count and parsing complexity. Use parallelism to control concurrent task execution.

  4. Connection Management: Implement connection pooling for database connections. Monitor connection usage and adjust pool sizes based on workload patterns.

  5. Monitoring and Alerting: Set up comprehensive monitoring for all components. Track scheduler heartbeat, task success rates, and database performance metrics.

  6. Security: Implement role-based access control. Use encrypted connections for metadata database and worker communication. Regularly rotate credentials and secrets.

  7. Backup Strategy: Implement regular backups of the metadata database. Test restoration procedures to ensure data integrity. Consider point-in-time recovery for critical deployments.

  8. Resource Planning: Estimate resource requirements based on DAG count, task frequency, and concurrency limits. Monitor resource utilization and scale proactively.

  9. Logging Configuration: Centralize logs for debugging and compliance. Implement log rotation and retention policies. Consider using external logging systems for large deployments.

  10. High Availability: For production deployments, consider redundant components where possible. Implement health checks and automatic failover for critical services.
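
As a small illustration of the health-check practice, the function below inspects a health payload and lists components that are not reporting healthy. The payload shape is modelled on the JSON returned by the Airflow 2.x web server's `/health` endpoint, but the sample values are made up and the exact keys should be verified against the installed version.

```python
def unhealthy_components(health_payload):
    """Return the sorted names of components whose status is not 'healthy'."""
    return sorted(
        name
        for name, info in health_payload.items()
        if isinstance(info, dict) and info.get("status") != "healthy"
    )


# Illustrative payload; a real one would come from an HTTP GET to /health.
sample_payload = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": None},
}

problems = unhealthy_components(sample_payload)
print(problems)
```

A monitoring job could poll the endpoint, pass the decoded JSON to this function, and raise an alert whenever the returned list is not empty.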

Key Takeaways:

  • Airflow follows a modular architecture: Scheduler, Executor, Web Server, Metadata DB, Workers
  • The DAG correctness theorem requires acyclic dependency graphs for valid scheduling
  • Maximum parallelism is bounded by $P_{\max} = \min(E_{\text{executors}}, T_{\text{task\_count}})$
  • Total execution time includes both task computation and scheduling overhead
  • The metadata database is the central coordination point for all distributed components
  • Heartbeat-based scheduling lets Airflow detect component failures and recover from them

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
