XCom Communications in Apache Airflow

Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│                    XCOM COMMUNICATION ARCHITECTURE                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    TASK COMMUNICATION FLOW                            │  │
│  │                                                                       │  │
│  │  ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐      │  │
│  │  │  Task A  │────▶│   XCom   │────▶│  Task B  │────▶│  Task C  │      │  │
│  │  │(Producer)│     │ (Store)  │     │(Consumer)│     │(Consumer)│      │  │
│  │  └──────────┘     └──────────┘     └──────────┘     └──────────┘      │  │
│  │       │                │                │                │            │  │
│  │       │   xcom_push    │   xcom_pull    │   xcom_pull    │            │  │
│  │       └───────────────▶│◀───────────────┴────────────────┘            │  │
│  │                        │                                              │  │
│  │  ┌─────────────────────┴───────────────────────────────────────────┐  │  │
│  │  │  Metadata Database                                              │  │  │
│  │  │  ┌───────────────────────────────────────────────────────────┐  │  │  │
│  │  │  │  xcom table                                               │  │  │  │
│  │  │  │  ├── id (PK)                                              │  │  │  │
│  │  │  │  ├── key                                                  │  │  │  │
│  │  │  │  ├── value (BLOB)                                         │  │  │  │
│  │  │  │  ├── timestamp                                            │  │  │  │
│  │  │  │  ├── dag_id                                               │  │  │  │
│  │  │  │  ├── task_id                                              │  │  │  │
│  │  │  │  └── run_id                                               │  │  │  │
│  │  │  └───────────────────────────────────────────────────────────┘  │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│                    XCOM BACKEND ARCHITECTURE                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    BACKEND TYPES                                      │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  1. Default Backend (Database)                                  │  │  │
│  │  │     ├── Storage: Metadata DB (PostgreSQL/MySQL)                 │  │  │
│  │  │     ├── Practical Limit: ~48KB (guideline)                      │  │  │
│  │  │     ├── Performance: Good for small data                        │  │  │
│  │  │     └── Use Case: Simple task communication                     │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  2. S3 Backend                                                  │  │  │
│  │  │     ├── Storage: AWS S3                                         │  │  │
│  │  │     ├── Size Limit: Effectively unlimited                       │  │  │
│  │  │     ├── Performance: Good for large data                        │  │  │
│  │  │     └── Use Case: Large dataset transfers                       │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  3. GCS Backend                                                 │  │  │
│  │  │     ├── Storage: Google Cloud Storage                           │  │  │
│  │  │     ├── Size Limit: Effectively unlimited                       │  │  │
│  │  │     ├── Performance: Good for large data                        │  │  │
│  │  │     └── Use Case: GCP ecosystem integration                     │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  4. Custom Backend                                              │  │  │
│  │  │     ├── Storage: Configurable                                   │  │  │
│  │  │     ├── Size Limit: Configurable                                │  │  │
│  │  │     ├── Performance: Depends on implementation                  │  │  │
│  │  │     └── Use Case: Specialized requirements                      │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                                                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    BACKEND SELECTION FLOW                             │  │
│  │                                                                       │  │
│  │  ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐ │  │
│  │  │    Data    │───▶│    Size    │───▶│  Security  │───▶│  Backend   │ │  │
│  │  │  Analysis  │    │   Check    │    │Requirements│    │ Selection  │ │  │
│  │  └────────────┘    └────────────┘    └────────────┘    └────────────┘ │  │
│  │        │                 │                 │                 │        │  │
│  │        ▼                 ▼                 ▼                 ▼        │  │
│  │  ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐ │  │
│  │  │ Small      │    │ <48KB      │    │ Standard   │    │ Database   │ │  │
│  │  │ Medium     │    │ >48KB      │    │ High       │    │ S3         │ │  │
│  │  │ Large      │    │ Unlimited  │    │ Very High  │    │ Custom     │ │  │
│  │  └────────────┘    └────────────┘    └────────────┘    └────────────┘ │  │
│  │                                                                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│                    XCOM SECURITY ARCHITECTURE                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    SECURITY LAYERS                                    │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  1. Data Encryption                                             │  │  │
│  │  │     ├── At Rest: Database encryption                            │  │  │
│  │  │     ├── In Transit: TLS/SSL                                     │  │  │
│  │  │     ├── Application Level: Custom encryption                    │  │  │
│  │  │     └── Key Management: Secure key storage                      │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  2. Access Control                                              │  │  │
│  │  │     ├── Task-level: Who can push/pull                           │  │  │
│  │  │     ├── DAG-level: DAG-scoped XCom                              │  │  │
│  │  │     ├── Run-level: Run-specific data                            │  │  │
│  │  │     └── User-level: User permissions                            │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  3. Data Sanitization                                           │  │  │
│  │  │     ├── Sensitive Data Detection                                │  │  │
│  │  │     ├── Automatic Masking                                       │  │  │
│  │  │     ├── Audit Logging                                           │  │  │
│  │  │     └── Compliance Requirements                                 │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                                                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    SECURITY FLOW                                      │  │
│  │                                                                       │  │
│  │  ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐ │  │
│  │  │    Task    │───▶│  Encrypt   │───▶│   Store    │───▶│   Audit    │ │  │
│  │  │    Push    │    │    Data    │    │    Data    │    │    Log     │ │  │
│  │  └────────────┘    └────────────┘    └────────────┘    └────────────┘ │  │
│  │        │                 │                 │                 │        │  │
│  │        ▼                 ▼                 ▼                 ▼        │  │
│  │  ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐ │  │
│  │  │  Validate  │    │   Apply    │    │   Secure   │    │   Track    │ │  │
│  │  │   Access   │    │ Encryption │    │  Storage   │    │   Access   │ │  │
│  │  └────────────┘    └────────────┘    └────────────┘    └────────────┘ │  │
│  │                                                                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Formal Definitions

Definition: XCom (Cross-Communication)

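XCom is Airflow's key-value data exchange mechanism between tasks. An XCom entry is defined as $(k, v, d_{\text{id}}, t_{\text{id}}, r_{\text{id}}, \tau)$, where $k$ is the key, $v$ is the serialized value, $d_{\text{id}}$, $t_{\text{id}}$, and $r_{\text{id}}$ identify the DAG, task, and DAG run that produced the entry, and $\tau$ is its timestamp. Together with $k$ (and the map index for mapped tasks), $(d_{\text{id}}, t_{\text{id}}, r_{\text{id}})$ uniquely identifies an entry. The default backend stores values in the metadata database; Airflow enforces no fixed size cap there, and this lesson treats 48 KB as the practical upper bound for values kept in the database.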
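The tuple definition maps naturally onto a small record type. The sketch below is a hypothetical Python model (XComEntry is not an Airflow class) and assumes JSON serialization for $v$, which is the Airflow 2.x default.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Tuple


@dataclass(frozen=True)
class XComEntry:
    """Hypothetical model of the tuple (k, v, d_id, t_id, r_id, tau)."""

    key: str             # k
    value: str           # v, kept in serialized (JSON) form
    dag_id: str          # d_id
    task_id: str         # t_id
    run_id: str          # r_id
    timestamp: datetime  # tau

    @classmethod
    def create(cls, key: str, obj: Any, dag_id: str, task_id: str, run_id: str) -> "XComEntry":
        """Serialize obj to JSON and stamp the entry with the current UTC time."""
        return cls(key, json.dumps(obj), dag_id, task_id, run_id, datetime.now(timezone.utc))

    def identity(self) -> Tuple[str, str, str, str]:
        """(d_id, t_id, r_id, k): the fields that locate one entry."""
        return (self.dag_id, self.task_id, self.run_id, self.key)

    def load(self) -> Any:
        """Deserialize v back into a Python object."""
        return json.loads(self.value)


entry = XComEntry.create("row_count", {"rows": 42}, "etl", "extract", "manual__2024-01-01")
print(entry.identity())  # ('etl', 'extract', 'manual__2024-01-01', 'row_count')
print(entry.load())      # {'rows': 42}
```

Keeping $v$ serialized inside the record mirrors the database row, where the value column holds bytes rather than live Python objects.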

Definition: XCom Backend

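An XCom backend is a pluggable storage driver for XCom data. Conceptually, a backend $B = (\text{set}, \text{get}, \text{delete})$ provides operations for persisting, retrieving, and removing XCom entries. In Airflow itself, a backend is a subclass of BaseXCom selected through the [core] xcom_backend setting, and storage is usually customized by overriding serialize_value() and deserialize_value(). Backends can use the metadata database, S3, GCS, or other storage systems.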
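As a teaching aid, the abstract triple $B = (\text{set}, \text{get}, \text{delete})$ can be implemented over a plain dictionary. InMemoryXComBackend and its method signatures are hypothetical and do not match Airflow's BaseXCom interface; the point is only the contract each operation fulfils.

```python
import json
from typing import Any, Dict, Optional, Tuple

# (dag_id, task_id, run_id, key) locates one entry
EntryId = Tuple[str, str, str, str]


class InMemoryXComBackend:
    """Hypothetical backend B = (set, get, delete) over a dict; not Airflow's API."""

    def __init__(self) -> None:
        self._store: Dict[EntryId, str] = {}

    def set(self, key: str, value: Any, dag_id: str, task_id: str, run_id: str) -> None:
        """Persist: serialize to JSON, overwriting any earlier value (as a retry would)."""
        self._store[(dag_id, task_id, run_id, key)] = json.dumps(value)

    def get(self, key: str, dag_id: str, task_id: str, run_id: str) -> Optional[Any]:
        """Retrieve: deserialize the value, or return None when no entry exists."""
        raw = self._store.get((dag_id, task_id, run_id, key))
        return None if raw is None else json.loads(raw)

    def delete(self, key: str, dag_id: str, task_id: str, run_id: str) -> bool:
        """Remove: return True if an entry was deleted."""
        return self._store.pop((dag_id, task_id, run_id, key), None) is not None


backend = InMemoryXComBackend()
backend.set("path", "s3://bucket/raw.csv", "etl", "extract", "run_1")
print(backend.get("path", "etl", "extract", "run_1"))  # s3://bucket/raw.csv
print(backend.get("path", "etl", "extract", "run_2"))  # None (different run)
```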

Detailed Explanation

XCom Fundamentals

XCom (cross-communication) is Airflow's mechanism for exchanging data between tasks. Tasks push and pull small values, such as IDs, file paths, or row counts, to coordinate work within a workflow, while the bulk data itself usually stays in external systems. Understanding XCom is essential for building pipelines whose later steps depend on what earlier steps produced.

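XCom Data Model: By default, XCom data is stored in the metadata database as key-value rows. Each entry contains a key, a value, a timestamp, and identifying information (dag_id, task_id, run_id, and map_index for mapped tasks). Airflow 2.x serializes values to JSON by default; pickling is used only if [core] enable_xcom_pickling is turned on, which is discouraged because unpickling untrusted data can execute arbitrary code.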

Push and Pull Operations: Tasks push data with xcom_push and read it with xcom_pull. By default, xcom_pull reads entries from the same DAG run, typically written by upstream tasks; include_prior_dates=True widens the search to earlier runs. Besides explicit push/pull, an operator's return value is pushed automatically under the key return_value (controlled by do_xcom_push).

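Data Limitations: XCom is designed for small metadata and coordination values, not datasets. Airflow does not enforce a fixed size cap on the default backend; the effective limit comes from the metadata database's binary column type, and large values bloat the database and can slow down the scheduler and UI. This lesson uses 48 KB as a conservative rule of thumb: above that, write the data to external storage such as S3, GCS, or a database, and pass only a reference (for example, an object key) through XCom.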
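The offload-and-reference pattern behind this guideline can be sketched with a dictionary standing in for S3 or GCS. THRESHOLD_BYTES, route_value, and resolve are hypothetical names, and 48 KB is this lesson's rule of thumb rather than an Airflow setting.

```python
import json
import uuid
from typing import Any, Dict

# This lesson's rule-of-thumb threshold, not an Airflow configuration value.
THRESHOLD_BYTES = 48 * 1024


def route_value(value: Any, object_store: Dict[str, bytes]) -> Dict[str, Any]:
    """Return what would be written to XCom: the value itself or a reference to it."""
    payload = json.dumps(value).encode("utf-8")
    if len(payload) <= THRESHOLD_BYTES:
        return {"inline": value}
    # Too large for the metadata DB: offload the bytes, keep only a reference.
    ref = f"xcom-offload/{uuid.uuid4()}.json"
    object_store[ref] = payload
    return {"ref": ref, "size": len(payload)}


def resolve(xcom_value: Dict[str, Any], object_store: Dict[str, bytes]) -> Any:
    """Inverse of route_value: fetch offloaded payloads back from the store."""
    if "inline" in xcom_value:
        return xcom_value["inline"]
    return json.loads(object_store[xcom_value["ref"]].decode("utf-8"))


store: Dict[str, bytes] = {}
small = route_value({"rows": 10}, store)
large = route_value(["x" * 100] * 1000, store)  # roughly 100 KB of JSON
print(small)          # {'inline': {'rows': 10}}
print(sorted(large))  # ['ref', 'size']
```

Only the small dictionary returned by route_value would go through XCom; the consumer calls resolve to get the full value back.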

XCom Entry Size

$$S_{\text{xcom}} = |k| + |v| + |d_{\text{id}}| + |t_{\text{id}}| + |r_{\text{id}}| + |\tau|$$
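The size formula can be evaluated directly. This sketch assumes each term is the UTF-8 byte length of the field as stored, with $v$ JSON-encoded and $\tau$ written as an ISO 8601 string; a real database adds row and index overhead on top.

```python
import json
from datetime import datetime, timezone


def xcom_entry_size(key: str, value: object, dag_id: str, task_id: str,
                    run_id: str, timestamp: datetime) -> int:
    """S_xcom = |k| + |v| + |d_id| + |t_id| + |r_id| + |tau|, in UTF-8 bytes."""
    fields = [key, json.dumps(value), dag_id, task_id, run_id, timestamp.isoformat()]
    return sum(len(field.encode("utf-8")) for field in fields)


ts = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(xcom_entry_size("row_count", 42, "etl", "extract", "run_1", ts))         # 51
print(xcom_entry_size("row_count", "x" * 1000, "etl", "extract", "run_1", ts))  # 1051
```

The identifying fields contribute a fixed few dozen bytes, so for any non-trivial payload $|v|$ dominates the total.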

XCom Retrieval Complexity

$$T_{\text{pull}} = O(\log N) + O(|v|) \quad \text{where } N = \text{total XCom entries}$$

Here,

  • $N$ = total number of XCom entries in the database
  • $|v|$ = size of the value being retrieved
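The $O(\log N)$ term assumes the database locates the row through an index, such as a B-tree in PostgreSQL or MySQL. A sorted list searched with bisect has the same asymptotic lookup cost; SortedXComIndex is a hypothetical stand-in, not how Airflow queries XCom.

```python
import bisect
from typing import List, Optional, Tuple

# (dag_id, run_id, task_id, key), kept sorted like an index
IndexKey = Tuple[str, str, str, str]


class SortedXComIndex:
    """Hypothetical index: binary search over sorted keys, O(log N) per lookup."""

    def __init__(self, rows: List[Tuple[IndexKey, bytes]]) -> None:
        rows = sorted(rows, key=lambda row: row[0])
        self._keys = [k for k, _ in rows]
        self._values = [v for _, v in rows]

    def lookup(self, k: IndexKey) -> Optional[bytes]:
        i = bisect.bisect_left(self._keys, k)  # O(log N) comparisons
        if i < len(self._keys) and self._keys[i] == k:
            # In a real database, transferring and decoding this value costs O(|v|).
            return self._values[i]
        return None


rows = [(("etl", f"run_{i:04d}", "extract", "return_value"), str(i).encode()) for i in range(10_000)]
index = SortedXComIndex(rows)
print(index.lookup(("etl", "run_0042", "extract", "return_value")))  # b'42'
```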

Theorem: XCom Data Consistency

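For a producer task $T_p$ and a consumer task $T_c$ in the same DAG run $r$, the XCom guarantee is: if $T_p$ completes and pushes $(k, v)$ before $T_c$ executes xcom_pull(k), then $T_c$ observes $v$. For the default backend this follows from the metadata database's ACID transaction semantics: the push is committed before the read happens, and an upstream dependency from $T_p$ to $T_c$ ensures $T_c$ starts only after $T_p$ succeeds. Backends that store values externally additionally depend on the consistency model of that store.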
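The visibility rule this argument relies on can be observed with SQLite standing in for the metadata database (an assumption: production deployments use PostgreSQL or MySQL, where uncommitted writes are likewise invisible to other connections). One connection plays $T_p$, the other $T_c$; the table layout is simplified for illustration.

```python
import os
import sqlite3
import tempfile

# A file-backed SQLite database stands in for the metadata DB; WAL mode lets
# the reader and the writer work concurrently without lock errors.
db_path = os.path.join(tempfile.mkdtemp(), "metadata.db")
producer = sqlite3.connect(db_path)  # connection used by T_p
consumer = sqlite3.connect(db_path)  # connection used by T_c
producer.execute("PRAGMA journal_mode=WAL")
producer.execute("CREATE TABLE xcom (run_id TEXT, task_id TEXT, key TEXT, value TEXT)")
producer.commit()


def pull(conn: sqlite3.Connection) -> list:
    """Read the 'result' entry for run_1; closing the cursor ends the read."""
    cur = conn.execute("SELECT value FROM xcom WHERE run_id = 'run_1' AND key = 'result'")
    try:
        return cur.fetchall()
    finally:
        cur.close()


producer.execute("INSERT INTO xcom VALUES ('run_1', 'producer', 'result', '42')")
before = pull(consumer)  # T_p has not committed yet: nothing is visible
producer.commit()
after = pull(consumer)   # after COMMIT the value is visible

print(before)  # []
print(after)   # [('42',)]
```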

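The default XCom backend stores serialized values in the metadata database (commonly PostgreSQL or MySQL). For values too large for that, configure an object-storage backend for S3 or GCS. Custom backends subclass BaseXCom and typically override the static methods serialize_value() and deserialize_value(); orm_deserialize_value() can also be overridden to control what the UI displays without downloading the full value.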

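Use the TaskFlow API (@task decorator) for modern XCom patterns. A task's return value is pushed automatically, and passing it as an argument to another task pulls it and creates the dependency. This reduces boilerplate, and Python type hints document the data passed between tasks (they are not enforced at runtime).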

XCom Backends

Default Backend: The default XCom backend stores data in the metadata database. It's simple to configure and works well for small data transfers. However, it has size limitations and can impact database performance with high-volume usage.

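S3 Backend: An S3 backend stores XCom values as objects in Amazon S3 and keeps only a reference in the metadata database, so value size is effectively unlimited and large values no longer load the database. Airflow does not ship a dedicated S3 XCom class: you can use the object-storage backend from the common.io provider (airflow.providers.common.io.xcom.backend.XComObjectStorageBackend, Airflow 2.9+) pointed at an s3:// path, or write a custom class. Serialization, compression, and cleanup behavior depend on which option you choose.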

GCS Backend: A GCS backend works the same way with Google Cloud Storage, for example the same object-storage backend pointed at a gs:// path. It suits GCP-based deployments where pipeline data already lives in GCS.

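Custom Backends: You can implement a custom XCom backend for specialized requirements. Subclass BaseXCom, override serialize_value() and deserialize_value(), and point [core] xcom_backend (or the AIRFLOW__CORE__XCOM_BACKEND environment variable) at the class's import path. Removing externally stored objects when XComs are cleared is not automatic in every Airflow version, so plan a cleanup strategy such as a bucket lifecycle rule.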

Security Considerations

Data Encryption: Encrypt XCom data that contains sensitive information. Airflow's Fernet key encrypts connection credentials and variables, but not XCom values, so rely on encryption at rest in the metadata database, TLS for database connections, and server-side encryption for cloud storage backends. For highly sensitive data, add application-level encryption before pushing.

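Access Control: In the UI and REST API, Airflow's role-based access control governs XCom visibility: users see XCom entries only for DAGs they have permission to read. Airflow has no built-in task-level XCom permissions; task code can call xcom_pull for other tasks (and, through the dag_id argument, other DAGs), so treat XCom values as visible to any DAG code running in the deployment.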

Data Sanitization: Keep sensitive information out of XCom. Store credentials and API keys in Airflow connections, variables, or an external secrets backend, and pass only non-sensitive identifiers (such as a connection ID) between tasks.
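A deny-list filter is one simple way to sanitize payloads before calling xcom_push. sanitize and SENSITIVE_MARKERS are hypothetical; matching on key names misses secrets embedded in free text, so treat this as a safety net, not a replacement for connections or a secrets backend.

```python
from typing import Any

# Hypothetical deny-list: dict keys containing any of these are masked.
SENSITIVE_MARKERS = ("password", "secret", "token", "api_key")


def sanitize(value: Any, mask: str = "***") -> Any:
    """Recursively mask dict entries whose key looks sensitive."""
    if isinstance(value, dict):
        return {
            k: mask if any(m in str(k).lower() for m in SENSITIVE_MARKERS) else sanitize(v, mask)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [sanitize(v, mask) for v in value]
    return value


payload = {"user": "etl", "db_password": "hunter2", "rows": [{"api_key": "abc", "id": 1}]}
print(sanitize(payload))
# {'user': 'etl', 'db_password': '***', 'rows': [{'api_key': '***', 'id': 1}]}
```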

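Audit Logging: Track XCom access for compliance and security monitoring. Airflow's built-in audit log records UI and API actions rather than individual xcom_push/xcom_pull calls, so per-operation tracking usually needs database audit logging, object-store access logs (for example, S3 server access logging or CloudTrail data events), or logging inside a custom backend.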
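When a custom backend is the single point every push and pull passes through, audit records can be written there. The sketch assumes an in-memory store and a logger named xcom.audit (both hypothetical) and records which task touched which entry.

```python
import logging
from typing import Any, Dict, List, Optional, Tuple

audit_logger = logging.getLogger("xcom.audit")


class AuditedXComStore:
    """Hypothetical store that records every push and pull it serves."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Any] = {}
        # (action, acting task, owning task, key) for each operation
        self.events: List[Tuple[str, str, str, str]] = []

    def _record(self, action: str, actor: str, task_id: str, key: str) -> None:
        self.events.append((action, actor, task_id, key))
        audit_logger.info("xcom %s by %s: %s/%s", action, actor, task_id, key)

    def push(self, actor: str, key: str, value: Any) -> None:
        # A task pushes under its own task_id.
        self._data[(actor, key)] = value
        self._record("push", actor, actor, key)

    def pull(self, actor: str, task_id: str, key: str) -> Optional[Any]:
        self._record("pull", actor, task_id, key)
        return self._data.get((task_id, key))


store = AuditedXComStore()
store.push("extract", "path", "s3://bucket/raw.csv")
value = store.pull("load", "extract", "path")
print(store.events)
# [('push', 'extract', 'extract', 'path'), ('pull', 'load', 'extract', 'path')]
```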

Advanced XCom Patterns

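XCom with TaskFlow API: In TaskFlow, a @task function's return value is stored under the key return_value, and calling one task with another's output (for example, transform(extract())) both pulls the value and sets the dependency. With multiple_outputs=True, a returned dict is also split into one XCom per key; Airflow infers this flag when the return annotation is a dict type.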
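The implicit push can be mimicked without Airflow. toy_task is a hypothetical decorator that stores a function's return value under return_value and, with multiple_outputs=True, also one entry per dict key. Unlike real TaskFlow, it passes values directly instead of deferring them through references that Airflow resolves at runtime.

```python
import functools
from typing import Any, Callable, Dict, Optional, Tuple

# (task_id, key) -> value; a dict stands in for the XCom table
XCOM: Dict[Tuple[str, str], Any] = {}


def toy_task(fn: Optional[Callable[..., Any]] = None, *, multiple_outputs: bool = False):
    """Hypothetical stand-in for @task: push the return value automatically."""
    if fn is None:  # used as @toy_task(multiple_outputs=True)
        return functools.partial(toy_task, multiple_outputs=multiple_outputs)

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = fn(*args, **kwargs)
        XCOM[(fn.__name__, "return_value")] = result  # implicit push
        if multiple_outputs:  # one extra entry per dict key
            for key, value in result.items():
                XCOM[(fn.__name__, key)] = value
        return result

    return wrapper


@toy_task
def extract() -> list:
    return [3, 4, 5]


@toy_task(multiple_outputs=True)
def stats(values: list) -> dict:
    return {"count": len(values), "total": sum(values)}


stats(extract())
print(XCOM[("stats", "total")])  # 12
```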

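XCom Mapping: Dynamic task mapping (Airflow 2.3+) builds on XCom. An upstream task returns a list (or dict), and a downstream task called with .expand() gets one mapped instance per element at runtime, each with its own map_index. A further downstream task can collect the mapped results as a list. This lets the shape of a workflow depend on runtime data.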
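The mapping mechanics can be modeled in plain Python: one mapped instance per upstream element, each pushing its result under its own map_index. expand() here is a hypothetical helper; in Airflow the equivalent is calling .expand() on a @task, for example measure.expand(path=list_files()).

```python
from typing import Any, Callable, Dict, List, Tuple

# (task_id, map_index) -> return value; a dict stands in for the XCom table
XComTable = Dict[Tuple[str, int], Any]


def expand(fn: Callable[[Any], Any], upstream_output: List[Any], task_id: str, xcom: XComTable) -> List[Any]:
    """Toy model of .expand(): one mapped instance per element, keyed by map_index."""
    for map_index, item in enumerate(upstream_output):
        xcom[(task_id, map_index)] = fn(item)  # each instance pushes its own result
    # A downstream task sees the mapped results collected in map_index order.
    return [xcom[(task_id, i)] for i in range(len(upstream_output))]


xcom: XComTable = {}
files = ["a.csv", "bb.csv", "ccc.csv"]  # runtime output of an upstream task
sizes = expand(len, files, "measure", xcom)
print(sizes)                 # [5, 6, 7]
print(xcom[("measure", 2)])  # 7
```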

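XCom Cleanup: Old XCom rows accumulate in the metadata database. Purge them periodically with airflow db clean (for example, --tables xcom together with --clean-before-timestamp) or with custom cleanup logic, and for external backends also expire the stored objects, for example with a bucket lifecycle rule.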
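Any retention policy reduces to a cutoff timestamp. The helper below (hypothetical) partitions rows by age and prints the cutoff you could pass to airflow db clean --clean-before-timestamp; the 30-day window is an arbitrary example.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

Row = Tuple[str, datetime]  # (entry id, timestamp); hypothetical row shape


def split_by_retention(rows: List[Row], now: datetime, keep_days: int) -> Tuple[List[Row], List[Row]]:
    """Partition rows into (kept, purged) around the cutoff now - keep_days."""
    cutoff = now - timedelta(days=keep_days)
    kept = [row for row in rows if row[1] >= cutoff]
    purged = [row for row in rows if row[1] < cutoff]
    return kept, purged


now = datetime(2024, 3, 1, tzinfo=timezone.utc)
rows = [("a", now - timedelta(days=40)), ("b", now - timedelta(days=10)), ("c", now)]
kept, purged = split_by_retention(rows, now, keep_days=30)
print([r[0] for r in kept], [r[0] for r in purged])  # ['b', 'c'] ['a']

# The same cutoff, formatted for airflow db clean --clean-before-timestamp
print((now - timedelta(days=30)).isoformat())  # 2024-01-31T00:00:00+00:00
```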

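Cross-DAG XCom: XCom is normally read within a single DAG run. xcom_pull accepts a dag_id argument for reading another DAG's entries, but lookups are matched by run, and runs rarely line up across independently scheduled DAGs. For robust cross-DAG exchange, write to external storage under a well-known key or read through Airflow's REST API.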
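The external-storage approach can be sketched with a local directory standing in for an S3 bucket. publish and read_latest are hypothetical helpers; writing a latest.json pointer alongside each run file lets the consuming DAG find the newest payload without knowing the producer's run_id.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Optional


def publish(root: Path, dag_id: str, run_id: str, data: Any) -> None:
    """Producer DAG: write this run's payload plus a latest.json pointer."""
    base = root / "cross-dag" / dag_id
    base.mkdir(parents=True, exist_ok=True)
    body = json.dumps({"dag_id": dag_id, "run_id": run_id, "data": data})
    (base / f"{run_id}.json").write_text(body)  # per-run history
    (base / "latest.json").write_text(body)     # found without knowing run_id


def read_latest(root: Path, dag_id: str) -> Optional[dict]:
    """Consumer DAG: read the newest payload published by dag_id, if any."""
    latest = root / "cross-dag" / dag_id / "latest.json"
    return json.loads(latest.read_text()) if latest.exists() else None


root = Path(tempfile.mkdtemp())  # stands in for s3://my-xcom-bucket
publish(root, "producer_dag", "run_1", {"rows": 10})
publish(root, "producer_dag", "run_2", {"rows": 25})
print(read_latest(root, "producer_dag")["data"])  # {'rows': 25}
```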

Key Concepts Table

| Feature | Description | Use Case | Limitations |
|---|---|---|---|
| Default Backend | Metadata database storage | Small metadata | Small values only (DB-dependent; ~48 KB guideline) |
| S3 Backend | S3 bucket storage | Large datasets | AWS dependency |
| GCS Backend | GCS bucket storage | GCP ecosystem | GCP dependency |
| TaskFlow API | Decorator-based XCom | Modern workflows | Python-only |
| XCom Mapping | Dynamic task generation | Variable task counts | Complex debugging |
| Cross-DAG XCom | Inter-DAG communication | Complex orchestration | Implementation complexity |
| Encryption | Data protection | Sensitive data | Performance impact |
| Cleanup | Data retention | Database maintenance | Requires configuration |

Code Examples

Custom XCom Backend Implementation

# custom_xcom_backend.py
#
# Enable it on every Airflow component (scheduler, workers, webserver), e.g.:
#   AIRFLOW__CORE__XCOM_BACKEND=custom_xcom_backend.S3XComBackend
# Written against the Airflow 2.x BaseXCom hooks; check your version's docs.
import gzip
import json
from typing import Any, Optional

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    """
    Custom XCom backend that stores values in S3.

    Airflow never instantiates the backend with constructor arguments; it calls
    the static serialize_value()/deserialize_value() hooks, so settings live in
    class attributes. Only a short reference string is written to the metadata
    database, which keeps large values out of it.
    """

    PREFIX = "xcom_s3://"           # marks values that were offloaded to S3
    BUCKET_NAME = "my-xcom-bucket"  # placeholder: set to your bucket
    KEY_PREFIX = "xcom/"
    AWS_CONN_ID = "aws_default"

    @staticmethod
    def _s3_key(dag_id, run_id, task_id, map_index, key) -> str:
        """Deterministic object key, so a task retry overwrites the same object."""
        return f"{S3XComBackend.KEY_PREFIX}{dag_id}/{run_id}/{task_id}/{map_index}/{key}.json.gz"

    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        run_id: Optional[str] = None,
        map_index: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Upload the value to S3 (JSON, gzip-compressed) and return a small reference."""
        s3_key = S3XComBackend._s3_key(dag_id, run_id, task_id, map_index, key)
        # json instead of pickle: values must be JSON-serializable, but loading
        # them back can never execute code.
        payload = gzip.compress(json.dumps(value).encode("utf-8"))
        S3Hook(aws_conn_id=S3XComBackend.AWS_CONN_ID).load_bytes(
            bytes_data=payload,
            key=s3_key,
            bucket_name=S3XComBackend.BUCKET_NAME,
            replace=True,
            encrypt=True,  # ask S3 for server-side encryption
        )
        # Let the base class serialize the reference string for the DB row.
        return BaseXCom.serialize_value(S3XComBackend.PREFIX + s3_key)

    @staticmethod
    def deserialize_value(result) -> Any:
        """Resolve an S3 reference from the DB row back into the original value."""
        reference = BaseXCom.deserialize_value(result)
        if not (isinstance(reference, str) and reference.startswith(S3XComBackend.PREFIX)):
            return reference  # not offloaded (e.g. written before this backend was enabled)
        s3_key = reference[len(S3XComBackend.PREFIX):]
        s3_object = S3Hook(aws_conn_id=S3XComBackend.AWS_CONN_ID).get_key(
            key=s3_key, bucket_name=S3XComBackend.BUCKET_NAME
        )
        # Read raw bytes; S3Hook.read_key() would decode them as UTF-8 text.
        payload = s3_object.get()["Body"].read()
        return json.loads(gzip.decompress(payload).decode("utf-8"))

    # The inherited orm_deserialize_value() decodes only the DB row, so the UI
    # shows the S3 reference instead of downloading every object.
    #
    # Clearing XComs deletes DB rows but not the S3 objects: expire them with an
    # S3 lifecycle rule on KEY_PREFIX or a separate cleanup job.

Advanced XCom Patterns

# advanced_xcom_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from typing import List, Dict, Any
import json

def producer_task(**context):
    """Producer task that pushes multiple data items."""
    # Simulate data generation
    data_items = [
        {'id': 1, 'value': 'item1', 'timestamp': datetime.now().isoformat()},
        {'id': 2, 'value': 'item2', 'timestamp': datetime.now().isoformat()},
        {'id': 3, 'value': 'item3', 'timestamp': datetime.now().isoformat()},
    ]

    # Push each item separately
    for i, item in enumerate(data_items):
        context['ti'].xcom_push(
            key=f'item_{i}',
            value=item,
        )

    # Push a summary
    context['ti'].xcom_push(
        key='summary',
        value={
            'total_items': len(data_items),
            'processed_at': datetime.now().isoformat(),
        },
    )

def consumer_task(item_index: int, **context):
    """Consumer task that pulls specific data items."""
    # Pull specific item
    item = context['ti'].xcom_pull(
        task_ids='producer',
        key=f'item_{item_index}',
    )

    print(f"Processing item {item_index}: {item}")

    # Process item
    processed_item = {
        **item,
        'processed': True,
        'processed_by': context['task'].task_id,
    }

    return processed_item

def dynamic_task_generator(**context):
    """Generate dynamic tasks based on XCom data."""
    # Pull summary to know how many items were produced
    summary = context['ti'].xcom_pull(
        task_ids='producer',
        key='summary',
    )

    # This function would be used with dynamic task mapping
    return list(range(summary['total_items']))

def aggregator_task(**context):
    """Aggregate results from the three consumer tasks."""
    # consumer_0..consumer_2 are separate (unmapped) tasks, so pull their
    # return values by task ID; a list of task_ids returns a list of values.
    results = context['ti'].xcom_pull(
        task_ids=[f'consumer_{i}' for i in range(3)],
        key='return_value',
    )

    # Aggregate results
    aggregated = {
        'total_processed': len(results),
        'items': results,
        'aggregated_at': datetime.now().isoformat(),
    }

    return aggregated

def cross_dag_xcom_push(**context):
    """Write data to S3 so that other DAGs can read it (XCom itself is run-scoped)."""
    import boto3

    dag_id = context['dag'].dag_id
    run_id = context['dag_run'].run_id  # the context key is 'dag_run', not 'run'

    s3_client = boto3.client('s3')
    data = {
        'dag_id': dag_id,
        'run_id': run_id,
        'data': {'key': 'value'},
        'timestamp': datetime.now().isoformat(),
    }

    s3_client.put_object(
        Bucket='my-xcom-bucket',  # placeholder bucket name
        Key=f"cross-dag/{dag_id}/{run_id}.json",
        Body=json.dumps(data),
    )

with DAG(
    'advanced_xcom_patterns',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced XCom communication patterns',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['xcom', 'advanced'],
) as dag:

    # Producer task
    producer = PythonOperator(
        task_id='producer',
        python_callable=producer_task,
    )

    # Consumer tasks for each item
    consumer_tasks = []
    for i in range(3):
        consumer = PythonOperator(
            task_id=f'consumer_{i}',
            python_callable=consumer_task,
            op_kwargs={'item_index': i},
        )
        consumer_tasks.append(consumer)

    # Aggregator task
    aggregator = PythonOperator(
        task_id='aggregator',
        python_callable=aggregator_task,
    )

    # Cross-DAG XCom push
    cross_dag = PythonOperator(
        task_id='cross_dag_push',
        python_callable=cross_dag_xcom_push,
    )

    # Set dependencies
    producer >> consumer_tasks
    consumer_tasks >> aggregator
    aggregator >> cross_dag
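The producer/consumer/aggregator flow above can be modeled without a running Airflow instance. The sketch below is a small in-memory model of the XCom table; `InMemoryXCom` is hypothetical and not part of Airflow. It only mimics two behaviors the DAG relies on: entries are scoped per DAG run, and pulling with a list of task IDs returns a list of values.

```python
from typing import Any, Dict, Iterable, Tuple, Union

# (dag_id, run_id, task_id, key) identifies one entry, so runs never collide
EntryKey = Tuple[str, str, str, str]


class InMemoryXCom:
    """Hypothetical stand-in for the XCom table; not part of Airflow."""

    def __init__(self) -> None:
        self._entries: Dict[EntryKey, Any] = {}

    def push(self, dag_id: str, run_id: str, task_id: str, key: str, value: Any) -> None:
        self._entries[(dag_id, run_id, task_id, key)] = value

    def pull(
        self,
        dag_id: str,
        run_id: str,
        task_ids: Union[str, Iterable[str]],
        key: str = "return_value",
    ) -> Any:
        # One task_id -> one value (None if missing); several -> a list in the given order
        if isinstance(task_ids, str):
            return self._entries.get((dag_id, run_id, task_ids, key))
        return [self._entries.get((dag_id, run_id, t, key)) for t in task_ids]


store = InMemoryXCom()
dag_id, run_id = "advanced_xcom_patterns", "manual__1"

# Producer: one entry per item under an explicit key
for i in range(3):
    store.push(dag_id, run_id, "producer", f"item_{i}", {"id": i})

# Consumers: pull their own item and store the processed copy as their return value
for i in range(3):
    item = store.pull(dag_id, run_id, "producer", key=f"item_{i}")
    processed = {**item, "processed": True, "processed_by": f"consumer_{i}"}
    store.push(dag_id, run_id, f"consumer_{i}", "return_value", processed)

# Aggregator: one pull with a list of task_ids returns all consumer results
results = store.pull(dag_id, run_id, [f"consumer_{i}" for i in range(3)])
print(len(results), results[0])
```

Because the run ID is part of every entry key, a pull from a different run sees nothing from this one, which is why cross-DAG sharing in the example goes through external storage instead.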

XCom with TaskFlow API

# xcom_taskflow.py
from datetime import datetime, timedelta
from typing import Any, Dict, List

from airflow.decorators import dag, task

@dag(
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['xcom', 'taskflow'],
)
def xcom_taskflow_example():
    """Example DAG using TaskFlow API for XCom."""

    @task
    def extract_data() -> List[Dict[str, Any]]:
        """Extract data from source."""
        # Simulate data extraction
        return [
            {'id': 1, 'name': 'Alice', 'age': 30},
            {'id': 2, 'name': 'Bob', 'age': 25},
            {'id': 3, 'name': 'Charlie', 'age': 35},
        ]

    @task
    def transform_data(raw_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Transform extracted data."""
        # Transform data
        transformed = []
        for record in raw_data:
            transformed.append({
                **record,
                'age_group': 'young' if record['age'] < 30 else 'adult',
                'processed_at': datetime.now().isoformat(),
            })
        return transformed

    @task
    def validate_data(data: List[Dict[str, Any]]) -> bool:
        """Validate transformed data."""
        # Validation logic
        return len(data) > 0 and all('name' in item for item in data)

    @task
    def load_data(data: List[Dict[str, Any]], is_valid: bool) -> Dict[str, Any]:
        """Load validated data to target."""
        if not is_valid:
            raise ValueError("Data validation failed")

        # Simulate loading
        return {
            'records_loaded': len(data),
            'loaded_at': datetime.now().isoformat(),
            'status': 'success',
        }

    @task
    def send_notification(loading_result: Dict[str, Any]) -> None:
        """Send notification about loading completion."""
        print(f"Data loading completed: {loading_result}")

    # Define task dependencies using TaskFlow
    raw_data = extract_data()
    transformed_data = transform_data(raw_data)
    is_valid = validate_data(transformed_data)
    loading_result = load_data(transformed_data, is_valid)
    send_notification(loading_result)

# Instantiate the DAG
xcom_taskflow_dag = xcom_taskflow_example()
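Every value returned from a `@task` function above travels through XCom, which is why the example converts timestamps with `.isoformat()` instead of returning `datetime` objects. One quick check for a return value is a round trip through plain JSON. This sketch treats plain JSON as a conservative baseline (an assumption: newer Airflow versions can serialize more types), and `json_roundtrips` is a hypothetical helper, not an Airflow API.

```python
import json
from datetime import datetime
from typing import Any


def json_roundtrips(value: Any) -> bool:
    """Return True if value survives json.dumps/json.loads unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # e.g. datetime objects are not JSON serializable
        return False


record = {"id": 1, "name": "Alice", "age": 30}
as_string = {**record, "processed_at": datetime(2024, 1, 1, 12, 0).isoformat()}
as_datetime = {**record, "processed_at": datetime(2024, 1, 1, 12, 0)}
with_tuple = {"ids": (1, 2, 3)}

print(json_roundtrips(as_string), json_roundtrips(as_datetime), json_roundtrips(with_tuple))
```

The tuple case is the subtle one: it serializes without error but comes back as a list, so a downstream task that expects a tuple would receive a different type.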

XCom Cleanup and Management

# xcom_cleanup.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import XCom
from airflow import settings
from sqlalchemy import and_

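def cleanup_old_xcom(max_age_days: int = 7, **context):
    """Delete XCom entries older than max_age_days from the metadata database."""
    from airflow.utils import timezone
    from airflow.utils.session import create_session

    # XCom timestamps are stored in UTC, so compare against a UTC-aware cutoff
    cutoff_date = timezone.utcnow() - timedelta(days=max_age_days)

    # create_session() commits on success, rolls back on error, and always closes
    with create_session() as session:
        deleted_count = session.query(XCom).filter(
            XCom.timestamp < cutoff_date
        ).delete(synchronize_session=False)

    print(f"Cleaned up {deleted_count} XCom entries older than {max_age_days} days")
    return deleted_count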

def cleanup_xcom_by_dag(dag_id: str, keep_latest: int = 10, **context):
    """Keep only the XCom entries from the latest N runs of a specific DAG."""
    from airflow.models import DagRun
    from airflow.utils.session import create_session

    with create_session() as session:
        # run_ids of the N most recent runs of the DAG
        latest_runs = session.query(DagRun.run_id).filter(
            DagRun.dag_id == dag_id
        ).order_by(DagRun.execution_date.desc()).limit(keep_latest).all()
        run_ids = [row[0] for row in latest_runs]

        # Delete this DAG's XCom entries from every older run;
        # synchronize_session=False skips reconciling in-memory objects
        deleted_count = session.query(XCom).filter(
            and_(
                XCom.dag_id == dag_id,
                ~XCom.run_id.in_(run_ids),
            )
        ).delete(synchronize_session=False)

    print(f"Cleaned up {deleted_count} old XCom entries for DAG {dag_id}")
    return deleted_count

def find_large_xcom(max_size_kb: int = 48, **context):
    """Report XCom entries whose stored value exceeds max_size_kb.

    XCom.value holds the backend's serialized bytes, so gzip-compressing it in
    place would make later xcom_pull calls fail to deserialize it. Compression
    belongs in a custom XCom backend; this task only surfaces oversized entries.
    """
    from sqlalchemy import func
    from airflow.utils.session import create_session

    max_bytes = max_size_kb * 1024  # convert KB to bytes

    with create_session() as session:
        # Let the database compute sizes instead of loading every value into memory
        large_entries = session.query(
            XCom.dag_id,
            XCom.task_id,
            XCom.run_id,
            XCom.key,
            func.length(XCom.value).label('size_bytes'),
        ).filter(func.length(XCom.value) > max_bytes).all()

    for entry in large_entries:
        print(
            f"{entry.dag_id}.{entry.task_id} run={entry.run_id} "
            f"key={entry.key}: {entry.size_bytes} bytes"
        )

    print(f"Found {len(large_entries)} XCom entries larger than {max_size_kb}KB")
    return len(large_entries)

with DAG(
    'xcom_cleanup_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='XCom cleanup and management',
    schedule_interval='0 2 * * *',  # Run daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['xcom', 'maintenance'],
) as dag:

    # Cleanup old XCom entries
    cleanup_old = PythonOperator(
        task_id='cleanup_old_xcom',
        python_callable=cleanup_old_xcom,
        op_kwargs={'max_age_days': 7},
    )

    # Cleanup XCom for specific DAGs
    cleanup_dags = PythonOperator(
        task_id='cleanup_xcom_by_dag',
        python_callable=cleanup_xcom_by_dag,
        op_kwargs={
            'dag_id': 'example_dag',
            'keep_latest': 10,
        },
    )

    # Report oversized XCom entries (compression belongs in a custom backend)
    find_large = PythonOperator(
        task_id='find_large_xcom',
        python_callable=find_large_xcom,
        op_kwargs={'max_size_kb': 48},
    )

    # Set dependencies
    cleanup_old >> cleanup_dags >> find_large
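The two delete queries above encode simple retention rules: drop entries older than a cutoff, and drop a DAG's entries outside its newest N runs. Expressing those rules as pure functions makes them easy to unit-test before pointing them at a production metadata database. `XComRow`, `expired_by_age`, and `outside_latest_runs` are hypothetical names; they mirror the filters in `cleanup_old_xcom` and `cleanup_xcom_by_dag`, not Airflow internals.

```python
from datetime import datetime, timedelta, timezone
from typing import List, NamedTuple


class XComRow(NamedTuple):
    dag_id: str
    run_id: str
    key: str
    timestamp: datetime


def expired_by_age(rows: List[XComRow], max_age_days: int, now: datetime) -> List[XComRow]:
    # Same predicate as the age-based cleanup: timestamp < now - max_age_days
    cutoff = now - timedelta(days=max_age_days)
    return [r for r in rows if r.timestamp < cutoff]


def outside_latest_runs(
    rows: List[XComRow], dag_id: str, runs_newest_first: List[str], keep_latest: int
) -> List[XComRow]:
    # Keep the first keep_latest run_ids; everything else for this DAG is deletable
    keep = set(runs_newest_first[:keep_latest])
    return [r for r in rows if r.dag_id == dag_id and r.run_id not in keep]


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
rows = [
    XComRow("example_dag", f"run_{d}", "return_value", now - timedelta(days=d))
    for d in range(10)
] + [XComRow("other_dag", "run_x", "return_value", now - timedelta(days=30))]
newest_first = [f"run_{d}" for d in range(10)]

by_age = expired_by_age(rows, max_age_days=7, now=now)
by_run = outside_latest_runs(rows, "example_dag", newest_first, keep_latest=3)
print(len(by_age), len(by_run))
```

Note the boundary: an entry exactly `max_age_days` old is kept, because the filter uses a strict `<`.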

Performance Metrics

| Metric | Description | Optimization Strategy |
|---|---|---|
| XCom Size | Data size per entry | Use external storage for large data |
| Push/Pull Latency | Time for XCom operations | Optimize serialization, use caching |
| Database Load | Metadata DB impact | Use custom backends, implement cleanup |
| Memory Usage | XCom memory footprint | Stream large data, use pagination |
| Cleanup Frequency | How often XCom is cleaned | Implement automated cleanup |
| Encryption Overhead | Time for encryption operations | Balance security vs. performance |
| Compression Ratio | Data compression efficiency | Use appropriate compression algorithms |
| Access Patterns | XCom usage frequency | Optimize based on usage patterns |
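Two of the metrics above, XCom size and compression ratio, can be measured on a candidate value before it is pushed. This sketch assumes JSON serialization and gzip compression; `xcom_size_report` is a hypothetical helper, and the 48KB threshold is the rule of thumb used throughout this article.

```python
import gzip
import json
from typing import Any, Dict

THRESHOLD_BYTES = 48 * 1024  # rule-of-thumb size limit used in this article


def xcom_size_report(value: Any) -> Dict[str, Any]:
    """Measure the JSON-serialized size of a value and its gzip compression ratio."""
    raw = json.dumps(value).encode("utf-8")
    packed = gzip.compress(raw)
    return {
        "raw_bytes": len(raw),
        "gzip_bytes": len(packed),
        "ratio": round(len(raw) / len(packed), 2),
        "over_threshold": len(raw) > THRESHOLD_BYTES,
    }


summary = {"total_items": 3, "processed_at": "2024-01-01T00:00:00"}
rows = [{"id": i, "status": "ok"} for i in range(5000)]

summary_report = xcom_size_report(summary)
rows_report = xcom_size_report(rows)
print(summary_report)
print(rows_report)
```

For tiny values the ratio can fall below 1, since gzip adds fixed header and trailer bytes, which is one reason to compress only above a size threshold.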

Best Practices

  1. Size Management: Keep XCom values small (48KB is a conservative rule of thumb). Use external storage for larger datasets, and compress payloads where it measurably improves performance.

  2. Security: Encrypt sensitive XCom data. Use secure backends for production. Implement access controls and audit logging.

  3. Cleanup Strategy: Implement automated XCom cleanup to prevent database bloat. Use appropriate retention policies based on data requirements.

  4. Backend Selection: Choose the appropriate XCom backend based on data size and requirements. Use S3/GCS backends for large datasets.

  5. Performance Optimization: Use efficient serialization formats. Implement caching for frequently accessed XCom data. Avoid unnecessary XCom operations.

  6. Error Handling: Implement proper error handling for XCom operations. Handle serialization/deserialization errors gracefully.

  7. Monitoring: Monitor XCom usage and performance. Track database impact and storage usage. Set up alerts for anomalies.

  8. Documentation: Document XCom usage patterns in your DAGs. Explain data formats and expected sizes. Provide examples for common use cases.

  9. Testing: Test XCom operations in isolation. Verify serialization/deserialization. Test cleanup and maintenance operations.

  10. Best Use Cases: Use XCom for small metadata and coordination. Avoid using XCom for large data transfers or complex data structures.
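Practices 1, 4, and 10 combine into a common pattern: keep small values inline and, for anything larger, store the payload externally and push only a reference. The sketch below uses a plain dict as a hypothetical stand-in for S3/GCS; `to_xcom`, `from_xcom`, and the `__xcom_ref__` marker are illustrative names, not Airflow APIs.

```python
import hashlib
import json
from typing import Any, Dict

MAX_INLINE_BYTES = 48 * 1024  # rule-of-thumb threshold from this article

# Hypothetical stand-in for an S3/GCS bucket: object path -> bytes
object_store: Dict[str, bytes] = {}


def to_xcom(value: Any, prefix: str = "xcom") -> Any:
    """Return small values unchanged; store large ones externally and return a reference."""
    raw = json.dumps(value).encode("utf-8")
    if len(raw) <= MAX_INLINE_BYTES:
        return value
    # Content-addressed path: identical payloads map to the same object
    path = f"{prefix}/{hashlib.sha256(raw).hexdigest()}.json"
    object_store[path] = raw
    return {"__xcom_ref__": path, "size_bytes": len(raw)}


def from_xcom(stored: Any) -> Any:
    """Resolve a reference produced by to_xcom; pass inline values through."""
    if isinstance(stored, dict) and "__xcom_ref__" in stored:
        return json.loads(object_store[stored["__xcom_ref__"]])
    return stored


small = {"total_items": 3}
large = {"rows": [{"id": i, "status": "ok"} for i in range(5000)]}

ref = to_xcom(large)
print(to_xcom(small))
print(sorted(ref))
print(from_xcom(ref) == large)
```

Only the reference (a path and a size) lands in the metadata database. A real setup would also need a retention policy for the external objects, mirroring the XCom cleanup above.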

Key Takeaways:

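  • XCom provides key-value data exchange; with the default database backend, reads and writes go through the metadata database and inherit its transactional guarantees
  • The default backend stores values in the metadata database, so the practical size limit depends on that database's column type; treat 48KB as a conservative rule of thumb and use S3/GCS backends for larger payloads
  • XCom retrieval complexity is $O(\log N + |v|)$ for an indexed lookup, where $N$ is the total number of entries and $|v|$ is the size of the stored value
  • The TaskFlow API simplifies XCom with decorator-based push/pull
  • Custom XCom backends subclass BaseXCom and typically override serialize_value() and deserialize_value()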
  • Implement XCom cleanup to prevent database bloat in production
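The $O(\log N + |v|)$ bound assumes the lookup is an index seek on the key columns (a B-tree descent, then reading the value) rather than a full table scan. SQLite's `EXPLAIN QUERY PLAN` makes that distinction visible: `SEARCH` means an index seek, `SCAN` a full scan. The table below is a simplified, hypothetical stand-in for an XCom table; Airflow's real schema and indexes differ by version and database.

```python
import sqlite3

# Simplified, hypothetical XCom-like table; not Airflow's actual schema
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE xcom (
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        key TEXT NOT NULL,
        value BLOB,
        PRIMARY KEY (dag_id, run_id, task_id, key)
    )
    """
)
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?, ?, ?)",
    [("dag", f"run_{i}", "producer", "return_value", b"{}") for i in range(1000)],
)

lookup = "SELECT value FROM xcom WHERE dag_id = ? AND run_id = ? AND task_id = ? AND key = ?"
params = ("dag", "run_500", "producer", "return_value")

# Each plan row has four columns; the last one is the human-readable detail
plan = conn.execute("EXPLAIN QUERY PLAN " + lookup, params).fetchall()
detail = " ".join(row[3] for row in plan)
value = conn.execute(lookup, params).fetchone()[0]
print(detail)
```

Dropping one of the key columns from the `WHERE` clause, or filtering on an unindexed column, is the kind of change that can turn the seek into a scan and break the logarithmic term.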

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)