Apache Airflow Operators and Hooks

Architecture Diagram

OPERATOR HIERARCHY

BASE OPERATOR
  BaseOperator
  ├── task_id
  ├── owner
  ├── retries
  ├── retry_delay
  ├── execution_timeout
  ├── resources
  └── on_failure_callback
          │
          ▼
OPERATOR CATEGORIES
  Action Operators   Transfer Operators   Sensors      External Services
  ────────────────   ──────────────────   ──────────   ─────────────────
  Python             S3ToRedshift         FileSensor   SlackOperator
  Bash               GCSToLocal           HttpSensor   EmailOperator
  Email              SFTPToS3             SqlSensor    JiraOperator
  Dummy              BigQuery             TimeSensor   GitHubOperator
Architecture Diagram
HOOK ARCHITECTURE

HOOK TYPES
  Database Hooks
  ├── PostgresHook
  ├── MySqlHook
  ├── SqliteHook
  └── Custom DB Hooks

  Cloud Storage Hooks
  ├── S3Hook
  ├── GCSHook
  ├── AzureBlobStorageHook
  └── Custom Storage Hooks

  Messaging & Notification Hooks
  ├── SlackWebhookHook
  ├── SendGridHook
  ├── PagerDutyHook
  └── Custom Notification Hooks

HOOK USAGE PATTERN
  Operator ──▶ Hook ──▶ Connection (DB) ──▶ External Service

  • Operators use Hooks for external communication
  • Hooks manage connection details and client creation
  • Connections stored in metadata database
  • Hooks handle retries and error handling
Architecture Diagram
CUSTOM OPERATOR DEVELOPMENT

OPERATOR COMPONENTS
  1. Operator Class
     ├── __init__(parameters)
     ├── execute(context)
     ├── on_kill()
     └── serialize()

  2. Template Fields
     └── template_fields = ('param1', 'param2')
         (Enables Jinja templating for parameters)

  3. UI Color & Icon
     ├── ui_color = '#ff0000'
     └── ui_fgcolor = '#ffffff'

  4. Required Resources
     └── resources = {'cpu': 2, 'memory': '4GB'}

DEVELOPMENT WORKFLOW
  Design      ──▶  Develop      ──▶  Test         ──▶  Deploy
    │                │                 │                 │
    ▼                ▼                 ▼                 ▼
  Define           Implement         Unit &            Package &
  Interface        Logic             Integration       Distribute

Formal Definitions

Definition: Operator

An operator is an atomic unit of work in Airflow. Formally, an operator $O = (t_{\text{id}}, \text{execute}, \text{params})$ consists of a unique task identifier, an execution function, and configuration parameters. The execute(context) method performs the actual computation.

Definition: Hook

A hook manages connection details and client creation for external systems. A hook $H = (C, \text{client}, \text{methods})$ encapsulates a connection object $C$, a client instance, and methods for interacting with the external service. Hooks handle authentication; retries and connection pooling, where available, come from the specific hook or its underlying client library.

Definition: Sensor

A sensor is a specialized operator that polls an external condition at intervals $\Delta t_{\text{poke}}$ until the condition is met or a timeout $\tau_{\text{timeout}}$ is reached. Each poll returns True when the condition is satisfied and False otherwise; if the timeout elapses first, the sensor task fails.
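
The polling behavior in the sensor definition can be sketched in plain Python. This is a simplified illustration, not Airflow's sensor implementation: `wait_for`, `SensorTimeout`, and the injectable `sleep`/`clock` parameters are hypothetical names chosen for this example.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is not met in time (hypothetical name)."""


def wait_for(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call poke() every poke_interval seconds until it returns True.

    Returns the number of pokes performed. Raises SensorTimeout if a poke
    fails and at least `timeout` seconds have elapsed since the start.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        sleep(poke_interval)


# Usage: a condition that becomes true on the third check.
attempts = iter([False, False, True])
print(wait_for(lambda: next(attempts), poke_interval=0.01, timeout=1.0))  # prints 3
```

A shorter poke interval detects the condition sooner at the cost of more calls to the external system; the timeout bounds how long a worker slot can be held.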

Detailed Explanation

Built-in Operators

Airflow provides a rich set of built-in operators for common tasks. These operators handle the complexity of interacting with external systems while providing a consistent interface for DAG authors.

Action Operators: These operators perform specific actions such as executing Python code (PythonOperator), running shell commands (BashOperator), sending emails (EmailOperator), or acting as no-op placeholders (DummyOperator, named EmptyOperator in newer Airflow 2.x releases). They are the building blocks of most workflows.

Transfer Operators: These operators move data between systems. Examples include S3ToRedshiftOperator, GCSToLocalFilesystemOperator, and PostgresToGCSOperator. They handle data serialization, compression, and error handling for data movement operations.

Sensors: Sensors are specialized operators that wait for certain conditions to be met before proceeding. They poll external systems at regular intervals until a condition is satisfied or a timeout is reached.

External Service Operators: These operators interact with external APIs and services like Slack, Jira, GitHub, and many others. They provide a standardized interface for common API operations.

Hooks

Hooks are the low-level building blocks that operators use to interact with external systems. They manage connection details, authentication, and client creation. Hooks abstract the complexity of connecting to various services and provide a consistent interface.

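Connection Management: Hooks use Airflow's connection system to store and retrieve credentials. Connections are stored in the metadata database by default (environment variables and secrets backends are also supported) and can be managed through the UI or CLI. Connection reuse, retry logic, and error handling are implemented per hook and are often delegated to the underlying client library, so check the specific hook before relying on them.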

Common Hook Patterns: Most hooks follow similar patterns: initialization with connection parameters, client creation, and methods for common operations. For example, S3Hook provides methods for uploading, downloading, and listing objects in S3 buckets.
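
The pattern described above (initialize with a connection ID, create the client lazily, expose convenience methods) can be sketched without Airflow installed. Everything here is an assumption for illustration: `SketchSqliteHook` and the `CONNECTIONS` dictionary are hypothetical stand-ins for a real hook and for Airflow's connection store, and the method names `run` and `get_records` are chosen to resemble those on Airflow's database hooks.

```python
import sqlite3
from typing import Dict, List, Optional, Tuple

# Stand-in for Airflow's connection store: conn_id -> connection settings.
CONNECTIONS: Dict[str, Dict[str, str]] = {
    "reports_db": {"database": ":memory:"},
}


class SketchSqliteHook:
    """Illustrative hook: resolves a conn_id, creates the client lazily,
    and exposes a few convenience methods."""

    def __init__(self, conn_id: str = "reports_db") -> None:
        self.conn_id = conn_id
        self._client: Optional[sqlite3.Connection] = None

    def get_conn(self) -> sqlite3.Connection:
        # Create the client on first use and reuse it afterwards.
        if self._client is None:
            settings = CONNECTIONS[self.conn_id]
            self._client = sqlite3.connect(settings["database"])
        return self._client

    def run(self, sql: str, parameters: Tuple = ()) -> None:
        conn = self.get_conn()
        conn.execute(sql, parameters)
        conn.commit()

    def get_records(self, sql: str, parameters: Tuple = ()) -> List[Tuple]:
        return self.get_conn().execute(sql, parameters).fetchall()


hook = SketchSqliteHook()
hook.run("CREATE TABLE events (name TEXT)")
hook.run("INSERT INTO events VALUES (?)", ("daily_load",))
print(hook.get_records("SELECT name FROM events"))  # [('daily_load',)]
```

Creating the client inside `get_conn` rather than in `__init__` keeps construction cheap, which matters when hooks are instantiated while a DAG file is being parsed.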

Custom Hooks: You can create custom hooks for internal services or specialized use cases. Custom hooks inherit from BaseHook and implement connection logic for your specific needs.

Custom Operator Development

Creating custom operators allows you to encapsulate complex business logic into reusable components. A well-designed operator should be idempotent, handle errors gracefully, and provide clear documentation.

Exponential Backoff

$$w_n = b \cdot 2^n \quad \text{for attempt } n = 0, 1, 2, \ldots, N_{\max}$$

Total Retry Window

$$W_{\text{total}} = \sum_{n=0}^{N_{\max}} b \cdot 2^n = b\,(2^{N_{\max}+1} - 1)$$

Here,

  • $w_n$ = Wait before retry attempt $n$
  • $b$ = Base delay
  • $N_{\max}$ = Index of the last retry attempt
  • $W_{\text{total}}$ = Maximum total time spent retrying
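
As a quick numerical check of the two formulas, the following sketch computes the per-attempt waits and compares their sum with the closed form. The function names and the 30-second base delay are illustrative assumptions, not Airflow settings.

```python
from typing import List


def backoff_delays(base: float, max_attempt: int) -> List[float]:
    """Return w_n = base * 2**n for n = 0..max_attempt (inclusive)."""
    return [base * 2 ** n for n in range(max_attempt + 1)]


def total_retry_window(base: float, max_attempt: int) -> float:
    """Closed form of the sum: base * (2**(max_attempt + 1) - 1)."""
    return base * (2 ** (max_attempt + 1) - 1)


delays = backoff_delays(base=30, max_attempt=3)
print(delays)                     # [30, 60, 120, 240]
print(sum(delays))                # 450
print(total_retry_window(30, 3))  # 450
```

Airflow's own retry_exponential_backoff option on BaseOperator follows a related but not identical schedule, so treat these numbers as an upper-level estimate rather than exact task timings.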

Theorem: Idempotency for Safe Retries

An operator $O$ is retry-safe if and only if it is idempotent: $O(O(x)) = O(x)$. Without idempotency, retries may cause data corruption, duplicate side effects, or inconsistent state. Implementation: Use unique identifiers, upsert operations, and atomic transactions to achieve idempotency.
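
One way to combine a unique identifier, an upsert, and an atomic transaction is sketched below with SQLite from the standard library. The table `daily_totals` and the function `load_daily_total` are hypothetical; the point is only that running the same load twice leaves the same state.

```python
import sqlite3


def load_daily_total(conn: sqlite3.Connection, run_date: str, total: float) -> None:
    """Write one row per run_date; re-running replaces the row instead of adding one."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(
            "INSERT OR REPLACE INTO daily_totals (run_date, total) VALUES (?, ?)",
            (run_date, total),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_totals (run_date TEXT PRIMARY KEY, total REAL)")

load_daily_total(conn, "2024-01-01", 125.0)
load_daily_total(conn, "2024-01-01", 125.0)  # simulated retry of the same task run

print(conn.execute("SELECT run_date, total FROM daily_totals").fetchall())
# [('2024-01-01', 125.0)]
```

Keying the row on the run date (rather than on an auto-incrementing ID) is what makes the retry harmless: a plain INSERT here would have produced two rows.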

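Connection pooling is not automatic for every hook; it depends on the hook and the client library behind it. For high-throughput scenarios that go through SQLAlchemy, pool_size and max_overflow are engine-level settings that bound the number of open connections, which helps prevent connection exhaustion under concurrent task execution.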

When implementing custom operators, always set template_fields for parameters that support Jinja templating. This enables runtime parameter injection and makes operators more flexible across different DAG configurations.
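
The effect of template_fields can be illustrated with a toy renderer. This is a deliberately simplified stand-in, not Jinja and not Airflow's implementation: `render` only substitutes `{{ name }}` placeholders, and `ExportTask` is a hypothetical class. It shows the key behavior, which is that only attributes named in template_fields are rendered.

```python
import re
from typing import Any, Dict


def render(value: str, context: Dict[str, Any]) -> str:
    """Replace {{ name }} placeholders with context values (toy stand-in for Jinja)."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        value,
    )


class ExportTask:
    """Hypothetical task class; only attributes in template_fields are rendered."""

    template_fields = ("path",)

    def __init__(self, path: str, label: str) -> None:
        self.path = path
        self.label = label

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


task = ExportTask(path="/data/{{ ds }}/export.csv", label="{{ ds }}")
task.render_template_fields({"ds": "2024-01-01"})
print(task.path)   # /data/2024-01-01/export.csv
print(task.label)  # {{ ds }}  (not a template field, left untouched)
```

A parameter left out of template_fields is passed through literally, which is a common source of confusion when a templated value appears unrendered in task logs.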

Operator Structure: Custom operators inherit from BaseOperator and implement the execute method. The constructor accepts configuration parameters, and the execute method performs the actual work. Use template_fields to enable Jinja templating for parameters.

Serialization: Airflow serializes DAGs and their operators automatically so that the scheduler and webserver can work from the stored representation. For dynamic DAGs, keep operator arguments (especially templated fields) to simple, JSON-serializable values so that the operator can be properly reconstructed in different contexts.

Testing: Write unit tests for custom operators using Airflow's test utilities. Test both success and failure scenarios, and verify that the operator behaves correctly with different input parameters.
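
One approach that keeps tests fast is to factor operator logic into plain functions and test those directly, covering success, failure, and invalid input. The sketch below uses only the standard library; `apply_range_rule` is a hypothetical helper modeled on a range check, not part of Airflow.

```python
import unittest
from typing import Any, Dict


def apply_range_rule(value: Any, rule: Dict[str, Any]) -> bool:
    """Return True when rule['min'] <= value <= rule['max']."""
    if "min" not in rule or "max" not in rule:
        raise ValueError("range rule needs 'min' and 'max'")
    return rule["min"] <= value <= rule["max"]


class ApplyRangeRuleTest(unittest.TestCase):
    def test_value_inside_range_passes(self) -> None:
        self.assertTrue(apply_range_rule(5, {"min": 0, "max": 10}))

    def test_value_outside_range_fails(self) -> None:
        self.assertFalse(apply_range_rule(11, {"min": 0, "max": 10}))

    def test_incomplete_rule_raises(self) -> None:
        with self.assertRaises(ValueError):
            apply_range_rule(5, {"min": 0})


def run_tests() -> unittest.TestResult:
    """Run the test case programmatically and return the result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ApplyRangeRuleTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)


if __name__ == "__main__":
    run_tests()
```

Tests of the operator class itself would additionally construct the operator with a task_id and call execute with a suitable context, which requires an Airflow installation.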

Connection and Hook Security

Credential Management: Never hardcode credentials in DAG files. Use Airflow's connection system or environment variables to manage sensitive information. Implement secret backends for production deployments.
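
Airflow can read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID, with the value given as a URI. The sketch below shows roughly how such a URI splits into connection fields; `connection_from_env` is a hypothetical helper that simplifies what Airflow does, and the connection ID and URI are made-up demo values.

```python
import os
from typing import Dict, Optional
from urllib.parse import unquote, urlsplit


def connection_from_env(conn_id: str) -> Dict[str, Optional[str]]:
    """Read AIRFLOW_CONN_<CONN_ID> and split the URI into its parts."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": str(parts.port) if parts.port else None,
        "login": unquote(parts.username) if parts.username else None,
        # Percent-encoded characters in the password are decoded here.
        "password": unquote(parts.password) if parts.password else None,
    }


# Normally set by the deployment environment, not in code; set here only for the demo.
os.environ["AIRFLOW_CONN_REPORTS_API"] = "https://svc_user:s3cr%40t@api.example.com:8443"

conn = connection_from_env("reports_api")
print(conn["host"], conn["port"], conn["login"])  # api.example.com 8443 svc_user
```

Because the DAG file only ever names the connection ID, the credential itself stays outside version control.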

Connection Encryption: Ensure that connections use encrypted protocols (HTTPS, SSL/TLS) when possible. Airflow supports encrypted connections for most database hooks.

Access Control: Use Airflow's role-based access control to restrict who can view and modify connections. Implement least-privilege principles for hook access.

Key Concepts Table

| Component          | Purpose                      | Example              | Use Case                |
|--------------------|------------------------------|----------------------|-------------------------|
| BaseOperator       | Base class for all operators | Custom operators     | Extending functionality |
| PythonOperator     | Execute Python functions     | Data processing      | Custom logic execution  |
| BashOperator       | Execute shell commands       | System operations    | Script execution        |
| Sensors            | Wait for conditions          | File availability    | External dependency     |
| Transfer Operators | Move data between systems    | S3 to Redshift       | Data movement           |
| Hooks              | Connect to external services | S3Hook               | Service integration     |
| Connections        | Store credentials            | Database connections | Secure access           |
| Templates          | Dynamic parameters           | {{ ds }}             | Runtime configuration   |

Code Examples

Custom Operator with Advanced Features

# custom_operator.py
from datetime import datetime
import json
from typing import Any, Dict

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class DataValidationOperator(BaseOperator):
    """
    Custom operator for data validation with multiple validation rules.

    This operator validates data against defined rules and generates
    validation reports.

    :param data_source: Source of data to validate
    :param validation_rules: Dictionary of validation rules
    :param report_destination: Where to store validation report
    :param alert_on_failure: Whether to send alerts on validation failure
    """

    # Fields that support Jinja templating
    template_fields = ('data_source', 'report_destination')
    ui_color = '#4CAF50'
    ui_fgcolor = '#FFFFFF'

    def __init__(
        self,
        data_source: str,
        validation_rules: Dict[str, Any],
        report_destination: str = '/tmp/validation_report.json',
        alert_on_failure: bool = True,
        **kwargs,
    ):
        # Task-level arguments (task_id, retries, ...) are passed through kwargs
        super().__init__(**kwargs)
        self.data_source = data_source
        self.validation_rules = validation_rules
        self.report_destination = report_destination
        self.alert_on_failure = alert_on_failure

    def execute(self, context: Dict[str, Any]) -> Any:
        """Execute the validation logic."""
        self.log.info(f"Starting validation for data source: {self.data_source}")

        try:
            # Load data
            data = self._load_data()

            # Run validation rules
            validation_results = self._validate_data(data)

            # Generate report
            report = self._generate_report(validation_results)

            # Store report
            self._store_report(report)

            # Check if validation passed
            if not validation_results['passed']:
                if self.alert_on_failure:
                    self._send_alert(validation_results)
                raise AirflowException(
                    f"Data validation failed: {validation_results['summary']}"
                )

            self.log.info("Data validation completed successfully")
            return report

        except Exception as e:
            self.log.error(f"Validation failed with error: {str(e)}")
            raise

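    def _load_data(self) -> Any:
        """Load data from the source, chosen by URL scheme."""
        # Assumption: remote objects contain JSON, like the local-file branch
        if self.data_source.startswith('s3://'):
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook
            hook = S3Hook(aws_conn_id='aws_default')
            bucket, key = S3Hook.parse_s3_url(self.data_source)
            # read_key returns the object body as a string
            return json.loads(hook.read_key(key=key, bucket_name=bucket))
        elif self.data_source.startswith('gs://'):
            from urllib.parse import urlsplit
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            hook = GCSHook(gcp_conn_id='google_cloud_default')
            parts = urlsplit(self.data_source)
            # download returns bytes when no local filename is given
            payload = hook.download(
                bucket_name=parts.netloc,
                object_name=parts.path.lstrip('/'),
            )
            return json.loads(payload)
        else:
            # Load from local file
            with open(self.data_source, 'r') as f:
                return json.load(f)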

    def _validate_data(self, data: Any) -> Dict[str, Any]:
        """Run validation rules against the data."""
        results = {
            'passed': True,
            'rules_checked': 0,
            'rules_passed': 0,
            'rules_failed': 0,
            'failed_rules': [],
            'summary': '',
        }

        for rule_name, rule_config in self.validation_rules.items():
            results['rules_checked'] += 1

            try:
                rule_passed = self._apply_rule(data, rule_config)
                if rule_passed:
                    results['rules_passed'] += 1
                else:
                    results['rules_failed'] += 1
                    results['passed'] = False
                    results['failed_rules'].append({
                        'rule': rule_name,
                        'message': rule_config.get('failure_message', 'Rule failed'),
                    })
            except Exception as e:
                results['rules_failed'] += 1
                results['passed'] = False
                results['failed_rules'].append({
                    'rule': rule_name,
                    'error': str(e),
                })

        results['summary'] = (
            f"{results['rules_passed']}/{results['rules_checked']} rules passed"
        )

        return results

    def _apply_rule(self, data: Any, rule_config: Dict[str, Any]) -> bool:
        """Apply a single validation rule."""
        rule_type = rule_config.get('type')

        if rule_type == 'not_null':
            return data is not None
        elif rule_type == 'range':
            min_val = rule_config.get('min')
            max_val = rule_config.get('max')
            return min_val <= data <= max_val
        elif rule_type == 'regex':
            import re
            pattern = rule_config.get('pattern')
            return bool(re.match(pattern, str(data)))
        elif rule_type == 'custom':
            # Custom validation logic
            custom_func = rule_config.get('function')
            return custom_func(data)
        else:
            raise ValueError(f"Unknown rule type: {rule_type}")

    def _generate_report(self, validation_results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a detailed validation report."""
        return {
            'timestamp': datetime.now().isoformat(),
            'data_source': self.data_source,
            'results': validation_results,
            'metadata': {
                'operator': self.__class__.__name__,
                'task_id': self.task_id,
            },
        }

    def _store_report(self, report: Dict[str, Any]) -> None:
        """Store the validation report."""
        with open(self.report_destination, 'w') as f:
            json.dump(report, f, indent=2)

    def _send_alert(self, validation_results: Dict[str, Any]) -> None:
        """Send alert about validation failure."""
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

        hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
        message = (
            f"Data Validation Failed\n"
            f"Source: {self.data_source}\n"
            f"Summary: {validation_results['summary']}\n"
            f"Failed Rules: {len(validation_results['failed_rules'])}"
        )
        # Pass the message by keyword; recent Slack provider releases require it
        hook.send(text=message)

    def on_kill(self) -> None:
        """Handle operator termination."""
        self.log.info("Operator was killed")

Custom Hook Implementation

# custom_hook.py
from typing import Any, Dict, Optional
from airflow.hooks.base import BaseHook
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class CustomAPIHook(BaseHook):
    """
    Custom hook for interacting with external APIs.

    This hook provides a standardized interface for API calls with
    retry logic, authentication, and error handling.

    :param api_conn_id: Airflow connection ID for API credentials
    :param api_base_url: Base URL for API endpoints
    :param timeout: Request timeout in seconds
    :param max_retries: Maximum number of retry attempts
    """

    conn_name_attr = 'api_conn_id'
    default_conn_name = 'custom_api_default'
    conn_type = 'custom_api'
    hook_name = 'Custom API'

    def __init__(
        self,
        api_conn_id: str = default_conn_name,
        api_base_url: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        super().__init__()
        self.api_conn_id = api_conn_id
        self.api_base_url = api_base_url or self._get_base_url()
        self.timeout = timeout
        self.max_retries = max_retries
        self._session = None

    def get_conn(self) -> requests.Session:
        """Get a requests session with retry logic."""
        if self._session is None:
            self._session = requests.Session()

            # Configure retry strategy
            retry_strategy = Retry(
                total=self.max_retries,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )

            adapter = HTTPAdapter(max_retries=retry_strategy)
            self._session.mount("http://", adapter)
            self._session.mount("https://", adapter)

            # Set authentication headers
            headers = self._get_auth_headers()
            self._session.headers.update(headers)

        return self._session

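    def _get_base_url(self) -> str:
        """Build the base URL from the connection (its schema field is used as the URL scheme)."""
        conn = self.get_connection(self.api_conn_id)
        scheme = conn.schema or 'https'
        # Omit the port when the connection does not define one, to avoid "host:None"
        netloc = f"{conn.host}:{conn.port}" if conn.port else conn.host
        return f"{scheme}://{netloc}"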

    def _get_auth_headers(self) -> Dict[str, str]:
        """Get authentication headers from connection."""
        conn = self.get_connection(self.api_conn_id)
        password = conn.get_password()

        return {
            'Authorization': f'Bearer {password}',
            'Content-Type': 'application/json',
        }

    def make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Make an API request.

        :param method: HTTP method (GET, POST, PUT, DELETE)
        :param endpoint: API endpoint
        :param data: Request body data
        :param params: Query parameters
        :return: API response
        """
        session = self.get_conn()
        url = f"{self.api_base_url}{endpoint}"

        try:
            response = session.request(
                method=method,
                url=url,
                json=data,
                params=params,
                timeout=self.timeout,
            )
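            response.raise_for_status()

            # Some endpoints (for example a DELETE answering 204) return no body
            if not response.content:
                return {}
            return response.json()

        except requests.exceptions.RequestException as e:
            self.log.error("API request failed: %s", e)
            raise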

    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Make a GET request."""
        return self.make_request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a POST request."""
        return self.make_request('POST', endpoint, data=data)

    def put(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a PUT request."""
        return self.make_request('PUT', endpoint, data=data)

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """Make a DELETE request."""
        return self.make_request('DELETE', endpoint)

    def test_connection(self) -> bool:
        """Test the connection to the API."""
        try:
            response = self.get('/health')
            return response.get('status') == 'ok'
        except Exception:
            return False
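
The hook's request path can be unit tested without a network or an Airflow installation. The following is a minimal sketch, assuming a hypothetical stand-in class `MiniAPIHook` that mirrors `make_request()` and receives its session by injection; the URL `https://api.example.com` is illustrative only.

```python
from typing import Any, Dict, Optional
from unittest.mock import MagicMock


class MiniAPIHook:
    """Hypothetical stand-in that mirrors CustomAPIHook.make_request()."""

    def __init__(self, api_base_url: str, session: Any, timeout: int = 30):
        self.api_base_url = api_base_url
        self.timeout = timeout
        self._session = session  # injected here instead of being built in get_conn()

    def get_conn(self) -> Any:
        return self._session

    def make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        response = self.get_conn().request(
            method=method,
            url=f"{self.api_base_url}{endpoint}",
            json=data,
            params=params,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()


# Fake session: request() returns a response whose json() yields a fixed payload.
fake_session = MagicMock()
fake_session.request.return_value.json.return_value = {"status": "ok"}

hook = MiniAPIHook("https://api.example.com", session=fake_session)
result = hook.make_request("GET", "/health", params={"verbose": "1"})

# The hook should join base URL and endpoint and pass the timeout through.
fake_session.request.assert_called_once_with(
    method="GET",
    url="https://api.example.com/health",
    json=None,
    params={"verbose": "1"},
    timeout=30,
)
```

A failure scenario can be covered the same way by setting `raise_for_status.side_effect` on the fake response and asserting that the exception propagates.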

Operator with Hook Integration

# operator_with_hook.py
from typing import Any, Dict, Optional
from airflow.models import BaseOperator

class DataSynchronizationOperator(BaseOperator):
    """
    Operator for synchronizing data between systems using hooks.

    This operator demonstrates how to create operators that integrate
    with multiple hooks for complex data operations.

    :param source_system: Source system identifier
    :param target_system: Target system identifier
    :param sync_config: Synchronization configuration
    :param batch_size: Number of records to process in each batch
    """

    template_fields = ('source_system', 'target_system', 'sync_config')

    # @apply_defaults is deprecated since Airflow 2.0; BaseOperator applies
    # default_args on its own, and operators accept keyword arguments only.
    def __init__(
        self,
        source_system: str,
        target_system: str,
        sync_config: Dict[str, Any],
        batch_size: int = 1000,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.source_system = source_system
        self.target_system = target_system
        self.sync_config = sync_config
        self.batch_size = batch_size

    def execute(self, context: Dict[str, Any]) -> Any:
        """Execute the data synchronization."""
        self.log.info(
            f"Starting sync from {self.source_system} to {self.target_system}"
        )

        # Initialize hooks
        source_hook = self._get_source_hook()
        target_hook = self._get_target_hook()

        try:
            # Extract data from source
            source_data = self._extract_data(source_hook)

            # Transform data if needed
            transformed_data = self._transform_data(source_data)

            # Load data to target
            sync_results = self._load_data(target_hook, transformed_data)

            # Validate synchronization
            self._validate_sync(source_hook, target_hook, sync_results)

            self.log.info("Data synchronization completed successfully")
            return sync_results

        except Exception as e:
            self.log.error(f"Synchronization failed: {str(e)}")
            raise

    def _get_source_hook(self) -> Any:
        """Get appropriate hook for source system."""
        if self.source_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='source_postgres')
        elif self.source_system == 'mysql':
            from airflow.providers.mysql.hooks.mysql import MySqlHook
            return MySqlHook(mysql_conn_id='source_mysql')
        elif self.source_system == 's3':
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook
            return S3Hook(aws_conn_id='source_aws')
        else:
            raise ValueError(f"Unsupported source system: {self.source_system}")

    def _get_target_hook(self) -> Any:
        """Get appropriate hook for target system."""
        if self.target_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='target_postgres')
        elif self.target_system == 'redshift':
            from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
            return RedshiftSQLHook(redshift_conn_id='target_redshift')
        elif self.target_system == 'gcs':
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            return GCSHook(gcp_conn_id='target_gcs')
        else:
            raise ValueError(f"Unsupported target system: {self.target_system}")

    def _extract_data(self, source_hook: Any) -> Any:
        """Extract data from source system."""
        query = self.sync_config.get('extract_query')

        if hasattr(source_hook, 'get_records'):
            return source_hook.get_records(query)
        elif hasattr(source_hook, 'read_key'):
            return source_hook.read_key(self.sync_config.get('source_key'))
        else:
            raise ValueError("Source hook does not support data extraction")

    def _transform_data(self, data: Any) -> Any:
        """Transform data if needed."""
        transformations = self.sync_config.get('transformations', [])

        for transformation in transformations:
            if transformation['type'] == 'filter':
                data = [row for row in data if self._apply_filter(row, transformation)]
            elif transformation['type'] == 'map':
                data = [self._apply_mapping(row, transformation) for row in data]
            elif transformation['type'] == 'aggregate':
                data = self._apply_aggregation(data, transformation)

        return data

    def _load_data(self, target_hook: Any, data: Any) -> Dict[str, Any]:
        """Load data to target system."""
        import json  # local import keeps this listing self-contained

        results = {'records_loaded': 0, 'errors': []}

        # Process in batches
        for i in range(0, len(data), self.batch_size):
            batch = data[i:i + self.batch_size]

            try:
                if hasattr(target_hook, 'insert_rows'):
                    target_hook.insert_rows(
                        table=self.sync_config.get('target_table'),
                        rows=batch,
                    )
                elif hasattr(target_hook, 'upload'):
                    target_hook.upload(
                        bucket_name=self.sync_config.get('target_bucket'),
                        object_name=f"batch_{i}.json",
                        # Serialize as real JSON; default=str covers dates and decimals
                        data=json.dumps(batch, default=str),
                    )
                else:
                    # Do not count a batch as loaded when nothing was written
                    raise ValueError("Target hook does not support data loading")

                results['records_loaded'] += len(batch)

            except Exception as e:
                results['errors'].append({
                    'batch_start': i,
                    'error': str(e),
                })

        return results

    def _validate_sync(
        self,
        source_hook: Any,
        target_hook: Any,
        sync_results: Dict[str, Any]
    ) -> None:
        """Validate that synchronization was successful."""
        if sync_results['errors']:
            raise ValueError(
                f"Sync completed with errors: {sync_results['errors']}"
            )

        # Additional validation logic can be added here
        self.log.info(
            f"Sync validation passed: {sync_results['records_loaded']} records loaded"
        )

    def _apply_filter(self, row: Any, transformation: Dict[str, Any]) -> bool:
        """Apply filter transformation."""
        column = transformation.get('column')
        operator = transformation.get('operator')
        value = transformation.get('value')

        if operator == 'eq':
            return row[column] == value
        elif operator == 'gt':
            return row[column] > value
        elif operator == 'lt':
            return row[column] < value
        elif operator == 'contains':
            return value in str(row[column])
        else:
            return True

    def _apply_mapping(self, row: Any, transformation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply mapping transformation."""
        mapping = transformation.get('mapping', {})
        return {mapping.get(k, k): v for k, v in row.items()}

    def _apply_aggregation(self, data: Any, transformation: Dict[str, Any]) -> Any:
        """Apply aggregation transformation."""
        group_by = transformation.get('group_by')
        aggregate_column = transformation.get('aggregate_column')
        aggregate_func = transformation.get('aggregate_function', 'sum')

        groups = {}
        for row in data:
            key = row[group_by]
            if key not in groups:
                groups[key] = []
            groups[key].append(row[aggregate_column])

        result = []
        for key, values in groups.items():
            if aggregate_func == 'sum':
                aggregated = sum(values)
            elif aggregate_func == 'avg':
                aggregated = sum(values) / len(values)
            elif aggregate_func == 'count':
                aggregated = len(values)
            else:
                aggregated = values

            result.append({group_by: key, aggregate_column: aggregated})

        return result
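
To make the shape of `sync_config['transformations']` concrete, here is a minimal standalone sketch that mirrors the filter, map, and aggregate steps above outside of Airflow. The function names and sample data are hypothetical. Note that the transformations address columns by name, so rows are assumed to be dicts; `get_records()` on SQL hooks returns tuples, which the sketch converts first.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_filter(row: Row, t: Dict[str, Any]) -> bool:
    """Keep a row when it satisfies the configured comparison."""
    cell, value, op = row[t["column"]], t["value"], t["operator"]
    if op == "eq":
        return cell == value
    if op == "gt":
        return cell > value
    if op == "lt":
        return cell < value
    if op == "contains":
        return value in str(cell)
    return True  # unknown operators keep the row, as in the operator above


def apply_mapping(row: Row, t: Dict[str, Any]) -> Row:
    """Rename columns according to the mapping; unmapped names are kept."""
    mapping = t.get("mapping", {})
    return {mapping.get(k, k): v for k, v in row.items()}


def apply_aggregation(data: List[Row], t: Dict[str, Any]) -> List[Row]:
    """Group rows and reduce one column with sum, avg, or count."""
    group_by, column = t["group_by"], t["aggregate_column"]
    func = t.get("aggregate_function", "sum")
    groups: Dict[Any, List[Any]] = {}
    for row in data:
        groups.setdefault(row[group_by], []).append(row[column])
    reducers = {"sum": sum, "count": len, "avg": lambda v: sum(v) / len(v)}
    reduce = reducers.get(func, lambda v: v)
    return [{group_by: k, column: reduce(v)} for k, v in groups.items()]


def transform(data: List[Row], transformations: List[Dict[str, Any]]) -> List[Row]:
    """Apply the configured transformations in order."""
    for t in transformations:
        if t["type"] == "filter":
            data = [r for r in data if apply_filter(r, t)]
        elif t["type"] == "map":
            data = [apply_mapping(r, t) for r in data]
        elif t["type"] == "aggregate":
            data = apply_aggregation(data, t)
    return data


# Tuples as a SQL hook would return them, converted to dicts by column name.
columns = ("region", "amount")
records = [("eu", 10), ("eu", 30), ("us", 5), ("us", 50)]
rows = [dict(zip(columns, record)) for record in records]

transformations = [
    {"type": "filter", "column": "amount", "operator": "gt", "value": 8},
    {"type": "map", "mapping": {"amount": "total"}},
    {"type": "aggregate", "group_by": "region",
     "aggregate_column": "total", "aggregate_function": "sum"},
]

summary = transform(rows, transformations)
# summary == [{"region": "eu", "total": 40}, {"region": "us", "total": 50}]
```

Order matters here: the aggregate step refers to `total`, the name produced by the preceding map step.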

Performance Metrics

| Metric | Description | Optimization Strategy |
|---|---|---|
| Operator Execution Time | Time to complete operator execution | Optimize external calls, use batching |
| Hook Connection Time | Time to establish connection | Connection pooling, caching |
| API Response Time | External API response time | Async operations, retry logic |
| Data Transfer Rate | Amount of data moved | Compression, streaming, batching |
| Error Rate | Percentage of failed operations | Retry logic, circuit breakers |
| Memory Usage | Operator memory footprint | Streaming processing, pagination |
| CPU Utilization | Processing power usage | Parallel execution, optimization |
| Connection Pool Size | Active connections | Pool tuning, monitoring |
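
Several of the strategies in the table (batching, streaming, pagination) come down to never holding the full dataset in memory at once. A minimal sketch, assuming a hypothetical helper `batched()` that is not part of Airflow: it yields fixed-size batches from any iterable, including a database cursor or a paginated API generator.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of at most batch_size items without materializing the input."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Seven items in batches of three: the last batch is shorter.
batches = list(batched(range(7), 3))
# batches == [[0, 1, 2], [3, 4, 5], [6]]
```

Unlike slicing with `len(data)`, this works on inputs whose length is unknown in advance.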

Best Practices

  1. Operator Design: Keep operators focused on single responsibilities. Implement idempotency to handle retries safely. Use template fields for dynamic parameters.

  2. Hook Implementation: Follow the hook pattern for connection management. Implement proper error handling and retry logic. Use connection pooling for high-throughput scenarios.

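  3. Error Handling: Implement comprehensive error handling with meaningful error messages. Use Airflow's exception classes (for example AirflowException, AirflowSkipException to skip a task, or AirflowFailException to fail without retrying) for proper error propagation.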

  4. Testing: Write unit tests for operators and hooks. Mock external dependencies to ensure test isolation. Test both success and failure scenarios.

  5. Documentation: Provide clear documentation for custom operators and hooks. Include usage examples and parameter descriptions. Document any external dependencies.

  6. Security: Never hardcode credentials. Use Airflow's connection system for credential management. Implement proper access controls for sensitive operations.

  7. Performance: Optimize external calls to minimize latency. Use batching for bulk operations. Implement connection pooling for high-throughput scenarios.

  8. Monitoring: Add logging for important operations. Implement custom metrics for monitoring. Use Airflow's callback system for alerting.

  9. Maintainability: Write clean, readable code with proper separation of concerns. Follow Python coding standards. Use type hints for better code clarity.

  10. Reusability: Design operators and hooks for reuse across multiple DAGs. Use configuration to customize behavior. Implement proper abstraction layers.
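
The idempotency advice in item 1 can be illustrated with a small sketch. It assumes an in-memory stand-in for a target table and two hypothetical load functions; the partition key `ds` follows the name of Airflow's date-stamp template variable. An append-style load duplicates rows when a task is retried, while a load that overwrites the partition for its run date converges to the same state.

```python
from typing import Dict, List

Row = Dict[str, int]


def append_load(table: List[Row], rows: List[Row]) -> None:
    """Not idempotent: every retry appends the same rows again."""
    table.extend(rows)


def partition_overwrite_load(table: Dict[str, List[Row]], ds: str, rows: List[Row]) -> None:
    """Idempotent: the partition for ds is replaced, so reruns give the same result."""
    table[ds] = list(rows)


rows = [{"id": 1}, {"id": 2}]

# Simulate a task that runs, is retried, and runs again with the same input.
appended: List[Row] = []
append_load(appended, rows)
append_load(appended, rows)

partitioned: Dict[str, List[Row]] = {}
partition_overwrite_load(partitioned, "2024-01-01", rows)
partition_overwrite_load(partitioned, "2024-01-01", rows)

print(len(appended))                   # 4 rows: the retry duplicated the data
print(len(partitioned["2024-01-01"]))  # 2 rows: the retry changed nothing
```

In a SQL target the same effect is commonly achieved with a delete-then-insert on the partition, or an upsert keyed on a natural identifier.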

Key Takeaways:

  • Operators encapsulate unit-of-work logic; hooks manage external connections
  • Idempotency ($O(O(x)) = O(x)$) is required for safe retries
  • Total retry window grows exponentially: $W = b(2^{N_{\max}+1} - 1)$
  • Hooks abstract connection details, authentication, and client creation
  • Custom operators must implement execute() and define template_fields
  • Never hardcode credentials; use Airflow's connection system
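
The retry-window takeaway can be checked numerically. This is a sketch under stated assumptions: `b` is taken to be the base retry delay, the delay doubles on each attempt, and the exponent runs from 0 to N_max, so the window is the geometric sum of `b * 2^k`. Airflow's own exponential backoff may also apply jitter and a `max_retry_delay` cap, which this sketch ignores. The function names are hypothetical.

```python
from datetime import timedelta
from typing import List


def retry_delays(base: timedelta, n_max: int) -> List[timedelta]:
    """Delays b * 2^k for k = 0..n_max (pure doubling, no jitter or cap)."""
    return [base * (2 ** k) for k in range(n_max + 1)]


def total_retry_window(base: timedelta, n_max: int) -> timedelta:
    """Closed form of the geometric sum: W = b * (2^(n_max + 1) - 1)."""
    return base * (2 ** (n_max + 1) - 1)


base = timedelta(minutes=5)
delays = retry_delays(base, n_max=3)        # 5, 10, 20 and 40 minutes
window = total_retry_window(base, n_max=3)  # 75 minutes

# The closed form agrees with summing the individual delays.
assert sum(delays, timedelta()) == window
```

With a 5-minute base delay, each additional retry roughly doubles the window, which is worth keeping in mind when setting `execution_timeout` and downstream SLAs.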

See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
