Apache Airflow Operators and Hooks
Architecture Diagram
```text
OPERATOR HIERARCHY

BaseOperator
├── task_id
├── owner
├── retries
├── retry_delay
├── execution_timeout
├── resources
└── on_failure_callback
        │
        ▼
Operator categories
├── Action operators
│   └── PythonOperator, BashOperator, EmailOperator, EmptyOperator (formerly DummyOperator)
├── Transfer operators
│   └── S3ToRedshiftOperator, GCSToLocalFilesystemOperator, SFTPToS3Operator, BigQuery transfers
├── Sensors
│   └── FileSensor, HttpSensor, SqlSensor, TimeSensor
└── External service operators
    └── Slack operators, EmailOperator, JiraOperator, GithubOperator
```
```text
HOOK ARCHITECTURE

Hook types
├── Database hooks
│   ├── PostgresHook
│   ├── MySqlHook
│   ├── SqliteHook
│   └── Custom DB hooks
├── Cloud storage hooks
│   ├── S3Hook
│   ├── GCSHook
│   ├── WasbHook (Azure Blob Storage)
│   └── Custom storage hooks
└── Messaging & notification hooks
    ├── SlackWebhookHook
    ├── SendGridHook
    ├── PagerdutyHook
    └── Custom notification hooks

Hook usage pattern

Operator ──▶ Hook ──▶ Connection ──▶ External service
                      (metadata DB, environment variable, or secrets backend)

• Operators use hooks for external communication
• Hooks read connection details and create clients
• Connections are stored in the metadata database or supplied by environment variables / a secrets backend
• Some hooks add their own retries and error handling; task-level retries come from the operator's retries setting
```
```text
CUSTOM OPERATOR DEVELOPMENT

Operator components
1. Operator class
   ├── __init__(parameters)
   ├── execute(context)
   └── on_kill()
2. Template fields
   └── template_fields = ('param1', 'param2')   (enables Jinja templating for these attributes)
3. UI colors
   ├── ui_color = '#ff0000'
   └── ui_fgcolor = '#ffffff'
4. Required resources
   └── resources = {'cpus': 2, 'ram': 4096}    (ram in MB; honored only by some executors)

Development workflow

Design ──────▶ Develop ─────▶ Test ────────▶ Deploy
  │              │              │              │
  ▼              ▼              ▼              ▼
Define         Implement      Unit &         Package &
interface      logic          integration    distribute
                              tests
```
Formal Definitions
Definition: Operator
An operator is an atomic unit of work in Airflow. It consists of a unique task identifier (task_id), an execution function, and configuration parameters such as retries and timeouts. The execute(context) method performs the actual computation; an operator instantiated inside a DAG becomes a task.
Definition: Hook
A hook manages connection details and client creation for an external system. A hook encapsulates a connection (credentials and endpoint, looked up by conn_id), a client instance, and methods for interacting with the external service. Depending on the implementation, a hook may also handle retries, authentication, and connection reuse.
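Definition: Sensor
A sensor is a specialized operator that checks an external condition every poke_interval seconds until the condition is met or a timeout is reached. Its poke(context) method returns True when the condition is satisfied and False otherwise; if the timeout elapses first, the sensor fails (or is skipped when soft_fail=True).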
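In poke mode, a sensor's lifecycle is essentially a polling loop. The sketch below reproduces that loop in plain Python with no Airflow dependency; `wait_for` and `SensorTimeout` are hypothetical names. Real sensors also offer `mode='reschedule'` and deferrable variants, which free the worker slot between checks instead of sleeping in it.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is still false at the timeout (hypothetical name)."""


def wait_for(poke: Callable[[], bool], poke_interval: float, timeout: float) -> int:
    """Poke-mode loop: call poke() every poke_interval seconds until it returns True.

    Returns how many pokes were needed; raises SensorTimeout once another
    wait would push past the timeout.
    """
    start = time.monotonic()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if time.monotonic() - start + poke_interval > timeout:
            raise SensorTimeout(f"condition not met within {timeout}s ({pokes} pokes)")
        time.sleep(poke_interval)


# Usage: a condition that becomes true on the third check
checks = {"count": 0}


def file_has_landed() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


print(wait_for(file_has_landed, poke_interval=0.01, timeout=1.0))  # prints 3
```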
Detailed Explanation
Built-in Operators
Airflow provides a rich set of built-in operators for common tasks. These operators handle the complexity of interacting with external systems while providing a consistent interface for DAG authors.
Action Operators: These operators perform specific actions such as executing Python code (PythonOperator), running shell commands (BashOperator), sending emails (EmailOperator), or acting as placeholder tasks that do nothing (EmptyOperator, formerly DummyOperator). They are the building blocks of most workflows.
Transfer Operators: These operators move data between systems. Examples include S3ToRedshiftOperator, GCSToLocalFilesystemOperator, and PostgresToGCSOperator. They handle data serialization, compression, and error handling for data movement operations.
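Many transfer operators follow the same shape: read from the source in fixed-size chunks, serialize, compress, and hand the result to the destination hook. Below is a stdlib-only sketch of that shape, using an in-memory SQLite table as a stand-in source and returning gzip-compressed CSV bytes instead of uploading them. `export_query_to_gzip_csv` is a hypothetical helper, not an Airflow API.

```python
import csv
import gzip
import io
import sqlite3


def export_query_to_gzip_csv(conn: sqlite3.Connection, query: str, chunk_size: int = 1000) -> bytes:
    """Run `query` and return the result set as gzip-compressed CSV bytes.

    Rows are fetched in chunks with the DB-API fetchmany(), so the
    uncompressed result set is never held in memory all at once; only the
    compressed output is buffered.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        buffer = io.BytesIO()
        # gzip.open accepts an existing file object; "wt" layers text mode on top
        with gzip.open(buffer, mode="wt", encoding="utf-8", newline="") as text:
            writer = csv.writer(text)
            writer.writerow([col[0] for col in cursor.description])  # header row
            while True:
                rows = cursor.fetchmany(chunk_size)
                if not rows:
                    break
                writer.writerows(rows)
        return buffer.getvalue()
    finally:
        cursor.close()


# Usage with a throwaway in-memory table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 9.5), (2, 20.0), (3, 7.25)])
payload = export_query_to_gzip_csv(conn, "SELECT id, amount FROM orders ORDER BY id", chunk_size=2)
print(gzip.decompress(payload).decode("utf-8").splitlines())
# prints ['id,amount', '1,9.5', '2,20.0', '3,7.25']
```

In a real operator the returned bytes (or a temporary file) would be passed to the destination hook, for example an object-storage upload.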
Sensors: Sensors are specialized operators that wait for certain conditions to be met before proceeding. They poll external systems at regular intervals until a condition is satisfied or a timeout is reached.
External Service Operators: These operators interact with external APIs and services like Slack, Jira, GitHub, and many others. They provide a standardized interface for common API operations.
Hooks
Hooks are the low-level building blocks that operators use to interact with external systems. They manage connection details, authentication, and client creation. Hooks abstract the complexity of connecting to various services and provide a consistent interface.
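Connection Management: Hooks look up credentials through Airflow's connection system by conn_id. Connections are usually stored in the metadata database and managed through the UI or CLI, but they can also be supplied as AIRFLOW_CONN_<CONN_ID> environment variables or by a configured secrets backend. Retry logic, error handling, and connection reuse are implemented per hook, so check what a given hook actually provides.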
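Airflow can read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased conn_id, whose value is a connection URI. The sketch below decomposes such a URI with the standard library to show which fields a hook ends up with. It is a simplified assumption-based parser, not Airflow's own (which handles more cases, such as JSON-encoded extras); `ConnectionInfo` and `connection_from_env` are hypothetical names.

```python
import os
from dataclasses import dataclass, field
from typing import Dict, Optional
from urllib.parse import parse_qsl, unquote, urlsplit


@dataclass
class ConnectionInfo:
    """Hypothetical stand-in for the main fields of an Airflow Connection."""
    conn_type: str
    host: Optional[str]
    login: Optional[str]
    password: Optional[str]
    port: Optional[int]
    schema: str
    extra: Dict[str, str] = field(default_factory=dict)


def connection_from_env(conn_id: str) -> ConnectionInfo:
    """Parse AIRFLOW_CONN_<CONN_ID> (URI format) into its components (simplified)."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return ConnectionInfo(
        conn_type=parts.scheme,
        host=parts.hostname,
        # Credentials are URL-encoded in the URI, so decode them
        login=unquote(parts.username) if parts.username else None,
        password=unquote(parts.password) if parts.password else None,
        port=parts.port,
        schema=parts.path.lstrip("/"),
        extra=dict(parse_qsl(parts.query)),  # query string -> extras
    )


os.environ["AIRFLOW_CONN_WAREHOUSE"] = (
    "postgres://etl_user:s%40cret@db.example.com:5432/analytics?sslmode=require"
)
conn = connection_from_env("warehouse")
print(conn.host, conn.port, conn.schema, conn.extra)
# prints db.example.com 5432 analytics {'sslmode': 'require'}
```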
Common Hook Patterns: Most hooks follow the same pattern: initialization with a connection ID, client creation (usually through get_conn()), and methods for common operations. For example, S3Hook provides load_file, download_file, and list_keys for uploading, downloading, and listing objects in S3 buckets.
Custom Hooks: You can create custom hooks for internal services or specialized use cases. Custom hooks inherit from BaseHook and implement connection logic for your specific needs.
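The core of the hook pattern is small: resolve a connection by ID, build the client once, and reuse it. A real custom hook would subclass airflow.hooks.base.BaseHook and call self.get_connection(conn_id); in this runnable sketch that lookup is replaced by a hypothetical in-memory registry, and `InventoryHook` and `InventoryClient` are illustrative names for an internal service.

```python
from typing import Dict, Optional


class InMemoryConnections:
    """Hypothetical stand-in for Airflow's connection lookup."""

    def __init__(self, connections: Dict[str, Dict[str, str]]):
        self._connections = connections

    def get(self, conn_id: str) -> Dict[str, str]:
        return self._connections[conn_id]


class InventoryClient:
    """Hypothetical client for an internal inventory service."""

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url
        self.token = token


class InventoryHook:
    """Resolve the connection lazily, create the client once, then reuse it."""

    default_conn_name = "inventory_default"

    def __init__(self, connections: InMemoryConnections, inventory_conn_id: str = default_conn_name):
        # Only store configuration here; no lookups or network calls
        self.connections = connections
        self.inventory_conn_id = inventory_conn_id
        self._client: Optional[InventoryClient] = None

    def get_conn(self) -> InventoryClient:
        if self._client is None:
            conn = self.connections.get(self.inventory_conn_id)
            self._client = InventoryClient(base_url=f"https://{conn['host']}", token=conn["password"])
        return self._client


registry = InMemoryConnections({"inventory_default": {"host": "inventory.internal", "password": "token-123"}})
hook = InventoryHook(registry)
print(hook.get_conn().base_url)  # prints https://inventory.internal
```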
Custom Operator Development
Creating custom operators allows you to encapsulate complex business logic into reusable components. A well-designed operator should be idempotent, handle errors gracefully, and provide clear documentation.
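Total Retry Window
With retry_exponential_backoff=True, Airflow roughly doubles the wait before each successive retry. Ignoring the random jitter Airflow adds to each wait, any max_retry_delay cap, and the run time of the attempts themselves, the minimum total waiting time across all retries is:
$$T_{\text{retry}} = \sum_{k=1}^{r} \delta \cdot 2^{k-1} = \delta\,(2^{r} - 1)$$
Here,
- $T_{\text{retry}}$ = minimum total time spent waiting between retries
- $r$ = number of retries (the retries parameter)
- $\delta$ = base delay (the retry_delay parameter)
Without exponential backoff every wait equals $\delta$, so the window is simply $r\delta$.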
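To sanity-check a retry configuration, it helps to tabulate the waits. This sketch computes the nominal wait before each retry under exponential backoff (no jitter), with an optional cap; `retry_waits` is a hypothetical helper, and Airflow's real scheduler adds a random component on top of these lower bounds.

```python
from datetime import timedelta
from typing import List, Optional


def retry_waits(
    retry_delay: timedelta,
    retries: int,
    exponential: bool = True,
    max_retry_delay: Optional[timedelta] = None,
) -> List[timedelta]:
    """Nominal wait before retry k = 1..retries: delay * 2**(k-1), optionally capped."""
    waits = []
    for k in range(1, retries + 1):
        wait = retry_delay * (2 ** (k - 1)) if exponential else retry_delay
        if max_retry_delay is not None:
            wait = min(wait, max_retry_delay)
        waits.append(wait)
    return waits


waits = retry_waits(timedelta(minutes=5), retries=4)
print([int(w.total_seconds() // 60) for w in waits])  # prints [5, 10, 20, 40]
print(sum(waits, timedelta()))  # prints 1:15:00, i.e. 5 min * (2**4 - 1)
```

A max_retry_delay cap turns the exponential tail into a linear one, which is usually what you want for long retry chains.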
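Idempotency for Safe Retries
A retried task re-runs execute() against whatever state the previous attempt left behind, so an operator is safe to retry when it is idempotent: running it twice leaves the system in the same state as running it once, $f(f(S)) = f(S)$ for any starting state $S$. Without idempotency, retries may cause data corruption, duplicate side effects, or inconsistent state. To achieve it, use unique identifiers (for example, keys derived from the run's logical date), upsert operations, and atomic transactions.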
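Upserts keyed on a natural key are one of the simplest ways to make a load idempotent. The sketch uses SQLite (its `ON CONFLICT ... DO UPDATE` syntax needs SQLite 3.24 or newer) as a stand-in for the target database; the table and `load_daily_totals` function are hypothetical. Running the load twice, as a retry would, leaves exactly the same rows.

```python
import sqlite3
from typing import Iterable, Tuple


def load_daily_totals(conn: sqlite3.Connection, rows: Iterable[Tuple[str, str, float]]) -> None:
    """Idempotent load: (ds, region) is the natural key, so re-runs overwrite instead of duplicating."""
    with conn:  # one transaction: all rows commit together, or none do
        conn.executemany(
            """
            INSERT INTO daily_totals (ds, region, total) VALUES (?, ?, ?)
            ON CONFLICT (ds, region) DO UPDATE SET total = excluded.total
            """,
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_totals (ds TEXT, region TEXT, total REAL, PRIMARY KEY (ds, region))"
)
rows = [("2024-01-01", "eu", 10.0), ("2024-01-01", "us", 12.5)]
load_daily_totals(conn, rows)
load_daily_totals(conn, rows)  # simulated retry of the same logical date
print(conn.execute("SELECT COUNT(*) FROM daily_totals").fetchone()[0])  # prints 2
```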
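Most Airflow hooks do not pool connections on their own; a DB-API hook's get_conn() typically opens a new connection each time it is called. For SQLAlchemy-based database access, pass pool settings such as pool_size and max_overflow through DbApiHook.get_sqlalchemy_engine(engine_kwargs=...), and use Airflow pools to cap how many tasks hit the same system at once. Together these prevent connection exhaustion under concurrent task execution.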
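A pool-size setting bounds how many connections can exist at once and makes callers wait for a free one. This stdlib-only sketch shows that idea with `queue.Queue`; `BoundedConnectionPool` is a hypothetical class, not SQLAlchemy's implementation (which also supports overflow connections, recycling, and health checks), and SQLite connections stand in for real database connections.

```python
import queue
import sqlite3
from contextlib import contextmanager
from typing import Callable, Iterator


class BoundedConnectionPool:
    """Fixed-size pool: exactly `size` connections exist; callers block while all are in use."""

    def __init__(self, factory: Callable[[], sqlite3.Connection], size: int, timeout: float = 5.0):
        self._idle: "queue.Queue[sqlite3.Connection]" = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(factory())
        self._timeout = timeout

    @contextmanager
    def connection(self) -> Iterator[sqlite3.Connection]:
        # Raises queue.Empty if no connection frees up within the timeout
        conn = self._idle.get(timeout=self._timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # return it for reuse instead of closing it


pool = BoundedConnectionPool(lambda: sqlite3.connect(":memory:"), size=2)
with pool.connection() as conn:
    print(conn.execute("SELECT 1").fetchone())  # prints (1,)
```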
When implementing custom operators, list every constructor parameter that should accept Jinja templates in template_fields. Airflow renders those attributes just before execute() runs, which enables runtime parameter injection (for example {{ ds }}) and makes operators reusable across different DAG configurations.
Operator Structure: Custom operators inherit from BaseOperator and implement the execute method. The constructor accepts configuration parameters and should only store them: it runs every time the DAG file is parsed, so create hooks and make network calls in execute, which performs the actual work.
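The render-then-execute lifecycle can be imitated without Airflow. In this sketch `SketchOperator`, `ExportOperator`, and `render` are hypothetical stand-ins; `render` only substitutes plain `{{ name }}` placeholders with a regular expression, a deliberate simplification in place of Jinja. Note that the attribute missing from template_fields is left untouched.

```python
import re
from typing import Any, Dict, Sequence

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: Dict[str, Any]) -> Any:
    """Replace {{ name }} placeholders in strings (a tiny stand-in for Jinja)."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    return value


class SketchOperator:
    """Hypothetical stand-in for BaseOperator: render template fields, then execute."""

    template_fields: Sequence[str] = ()

    def __init__(self, *, task_id: str):
        self.task_id = task_id

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))

    def execute(self, context: Dict[str, Any]) -> Any:
        raise NotImplementedError


class ExportOperator(SketchOperator):
    template_fields = ("output_path",)

    def __init__(self, *, output_path: str, table: str, **kwargs: Any):
        super().__init__(**kwargs)
        # Constructor only stores parameters; the work happens in execute()
        self.output_path = output_path
        self.table = table  # not listed in template_fields, so never rendered

    def execute(self, context: Dict[str, Any]) -> str:
        return f"export {self.table} -> {self.output_path}"


op = ExportOperator(task_id="export", table="orders_{{ ds }}", output_path="/data/{{ ds }}/orders.csv")
context = {"ds": "2024-01-01"}
op.render_template_fields(context)  # in Airflow, the framework does this before execute()
print(op.execute(context))
# prints export orders_{{ ds }} -> /data/2024-01-01/orders.csv
```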
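Serialization: Airflow serializes DAGs automatically (the scheduler and webserver work from the serialized form), so custom operators normally do not implement serialization methods themselves. Keep constructor arguments simple, JSON-friendly values, and register any custom operator extra links through a plugin so they can be reconstructed from the serialized DAG.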
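Testing: Write unit tests that instantiate the operator and call execute() with a minimal context while mocking the hooks it uses; for end-to-end checks, dag.test() runs a whole DAG in a single process (Airflow 2.5+). Test both success and failure scenarios, and verify that the operator behaves correctly with different input parameters.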
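The unit-testing approach needs only unittest and unittest.mock. `RowCountCheckOperator` is a hypothetical, Airflow-free class whose `_get_hook()` would return something like a PostgresHook in real code; the test patches that method so no database is needed. The `get_first` name mirrors a DbApiHook method, but here it is simply an attribute of a Mock.

```python
import unittest
from typing import Any, Dict
from unittest import mock


class RowCountCheckOperator:
    """Hypothetical operator-like class: fails if a table has fewer rows than expected."""

    def __init__(self, *, table: str, min_rows: int):
        self.table = table
        self.min_rows = min_rows

    def _get_hook(self) -> Any:
        # Real code would build a database hook here
        raise RuntimeError("no database available in unit tests")

    def execute(self, context: Dict[str, Any]) -> int:
        count = self._get_hook().get_first(f"SELECT COUNT(*) FROM {self.table}")[0]
        if count < self.min_rows:
            raise ValueError(f"{self.table} has {count} rows, expected >= {self.min_rows}")
        return count


class RowCountCheckOperatorTest(unittest.TestCase):
    def _run_with_count(self, count: int) -> int:
        op = RowCountCheckOperator(table="orders", min_rows=10)
        fake_hook = mock.Mock()
        fake_hook.get_first.return_value = (count,)
        with mock.patch.object(RowCountCheckOperator, "_get_hook", return_value=fake_hook):
            result = op.execute(context={})
        fake_hook.get_first.assert_called_once_with("SELECT COUNT(*) FROM orders")
        return result

    def test_success(self) -> None:
        self.assertEqual(self._run_with_count(42), 42)

    def test_failure(self) -> None:
        with self.assertRaises(ValueError):
            self._run_with_count(3)
```

Run it with `python -m unittest` or pytest; the same patching approach applies to real operators that construct hooks internally.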
Connection and Hook Security
Credential Management: Never hardcode credentials in DAG files. Use Airflow's connection system or environment variables to manage sensitive information, and configure a secrets backend (for example HashiCorp Vault or AWS Secrets Manager) for production deployments.
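Connection Encryption: Airflow encrypts connection passwords and extras at rest in the metadata database when a Fernet key is configured. Encryption in transit depends on the provider: use HTTPS endpoints, and enable SSL/TLS in the connection settings where the database supports it (for example, sslmode in a Postgres connection's extras).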
Access Control: Use Airflow's role-based access control to restrict who can view and modify connections. Implement least-privilege principles for hook access.
Key Concepts Table
| Component | Purpose | Example | Use Case |
|---|---|---|---|
| BaseOperator | Base class for all operators | Custom operators | Extending functionality |
| PythonOperator | Execute Python functions | Data processing | Custom logic execution |
| BashOperator | Execute shell commands | System operations | Script execution |
| Sensors | Wait for conditions | File availability | External dependency |
| Transfer Operators | Move data between systems | S3 to Redshift | Data movement |
| Hooks | Connect to external services | S3Hook | Service integration |
| Connections | Store credentials | Database connections | Secure access |
| Templates | Dynamic parameters | {{ ds }} | Runtime configuration |
Code Examples
Custom Operator with Advanced Features
```python
# custom_operator.py
import json
import re
from datetime import datetime, timezone
from typing import Any, Dict
from urllib.parse import urlparse

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class DataValidationOperator(BaseOperator):
    """
    Custom operator for data validation with multiple validation rules.

    This operator validates data against defined rules and generates
    validation reports.

    :param data_source: Source of data to validate (s3://, gs:// or a local JSON file)
    :param validation_rules: Dictionary of validation rules
    :param report_destination: Where to store validation report
    :param alert_on_failure: Whether to send alerts on validation failure
    """

    # Fields that support Jinja templating
    template_fields = ('data_source', 'report_destination')
    ui_color = '#4CAF50'
    ui_fgcolor = '#FFFFFF'

    # No @apply_defaults: it has been a no-op since Airflow 2.0, where
    # BaseOperator applies default_args itself. Airflow also requires
    # operator arguments to be passed by keyword, hence the bare `*`.
    def __init__(
        self,
        *,
        data_source: str,
        validation_rules: Dict[str, Any],
        report_destination: str = '/tmp/validation_report.json',
        alert_on_failure: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.data_source = data_source
        self.validation_rules = validation_rules
        self.report_destination = report_destination
        self.alert_on_failure = alert_on_failure

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the validation logic."""
        self.log.info("Starting validation for data source: %s", self.data_source)
        try:
            data = self._load_data()
            validation_results = self._validate_data(data)
            report = self._generate_report(validation_results)
            self._store_report(report)

            if not validation_results['passed']:
                if self.alert_on_failure:
                    self._send_alert(validation_results)
                raise AirflowException(
                    f"Data validation failed: {validation_results['summary']}"
                )

            self.log.info("Data validation completed successfully")
            # The return value is pushed to XCom, so keep it small and JSON-serializable
            return report
        except Exception as e:
            self.log.error("Validation failed with error: %s", e)
            raise

    def _load_data(self) -> Any:
        """Load JSON data from S3, GCS, or the local filesystem."""
        parsed = urlparse(self.data_source)
        bucket, key = parsed.netloc, parsed.path.lstrip('/')
        if parsed.scheme == 's3':
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook

            hook = S3Hook(aws_conn_id='aws_default')
            return json.loads(hook.read_key(key=key, bucket_name=bucket))
        if parsed.scheme == 'gs':
            from airflow.providers.google.cloud.hooks.gcs import GCSHook

            hook = GCSHook(gcp_conn_id='google_cloud_default')
            # download() returns the object's bytes when no local filename is given
            return json.loads(hook.download(bucket_name=bucket, object_name=key))
        # Load from local file
        with open(self.data_source, 'r') as f:
            return json.load(f)

    def _validate_data(self, data: Any) -> Dict[str, Any]:
        """Run validation rules against the data."""
        results = {
            'passed': True,
            'rules_checked': 0,
            'rules_passed': 0,
            'rules_failed': 0,
            'failed_rules': [],
            'summary': '',
        }
        for rule_name, rule_config in self.validation_rules.items():
            results['rules_checked'] += 1
            try:
                if self._apply_rule(data, rule_config):
                    results['rules_passed'] += 1
                else:
                    results['rules_failed'] += 1
                    results['passed'] = False
                    results['failed_rules'].append({
                        'rule': rule_name,
                        'message': rule_config.get('failure_message', 'Rule failed'),
                    })
            except Exception as e:
                # A rule that crashes counts as a failed rule, not a task crash
                results['rules_failed'] += 1
                results['passed'] = False
                results['failed_rules'].append({
                    'rule': rule_name,
                    'error': str(e),
                })
        results['summary'] = (
            f"{results['rules_passed']}/{results['rules_checked']} rules passed"
        )
        return results

    def _apply_rule(self, data: Any, rule_config: Dict[str, Any]) -> bool:
        """Apply a single validation rule."""
        rule_type = rule_config.get('type')
        if rule_type == 'not_null':
            return data is not None
        elif rule_type == 'range':
            return rule_config['min'] <= data <= rule_config['max']
        elif rule_type == 'regex':
            return bool(re.match(rule_config['pattern'], str(data)))
        elif rule_type == 'custom':
            # Custom validation logic: a callable that returns True/False
            custom_func = rule_config['function']
            return custom_func(data)
        else:
            raise ValueError(f"Unknown rule type: {rule_type}")

    def _generate_report(self, validation_results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a detailed validation report."""
        return {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'data_source': self.data_source,
            'results': validation_results,
            'metadata': {
                'operator': self.__class__.__name__,
                'task_id': self.task_id,
            },
        }

    def _store_report(self, report: Dict[str, Any]) -> None:
        """Store the validation report on the worker's local filesystem."""
        with open(self.report_destination, 'w') as f:
            json.dump(report, f, indent=2)

    def _send_alert(self, validation_results: Dict[str, Any]) -> None:
        """Send alert about validation failure."""
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

        # Recent Slack provider releases take slack_webhook_conn_id and a
        # keyword-only send(text=...)
        hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
        message = (
            f"Data Validation Failed\n"
            f"Source: {self.data_source}\n"
            f"Summary: {validation_results['summary']}\n"
            f"Failed Rules: {len(validation_results['failed_rules'])}"
        )
        hook.send(text=message)

    def on_kill(self) -> None:
        """Handle operator termination."""
        self.log.info("Operator was killed")
```
Custom Hook Implementation
```python
# custom_hook.py
from typing import Any, Dict, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from airflow.hooks.base import BaseHook


class CustomAPIHook(BaseHook):
    """
    Custom hook for interacting with external APIs.

    This hook provides a standardized interface for API calls with
    retry logic, authentication, and error handling.

    :param api_conn_id: Airflow connection ID for API credentials
    :param api_base_url: Base URL for API endpoints (defaults to one built from the connection)
    :param timeout: Request timeout in seconds
    :param max_retries: Maximum number of retry attempts
    """

    conn_name_attr = 'api_conn_id'
    default_conn_name = 'custom_api_default'
    conn_type = 'custom_api'
    hook_name = 'Custom API'

    def __init__(
        self,
        api_conn_id: str = default_conn_name,
        api_base_url: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        super().__init__()
        self.api_conn_id = api_conn_id
        self.timeout = timeout
        self.max_retries = max_retries
        self._api_base_url = api_base_url
        self._session: Optional[requests.Session] = None

    @property
    def api_base_url(self) -> str:
        """Resolve the base URL lazily so creating the hook needs no connection lookup."""
        if self._api_base_url is None:
            self._api_base_url = self._get_base_url()
        return self._api_base_url

    def get_conn(self) -> requests.Session:
        """Get a requests session with retry logic (created once, then reused)."""
        if self._session is None:
            session = requests.Session()
            # Retry on throttling and transient server errors; by default
            # urllib3 only retries idempotent methods (not POST)
            retry_strategy = Retry(
                total=self.max_retries,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )
            adapter = HTTPAdapter(max_retries=retry_strategy)
            session.mount("http://", adapter)
            session.mount("https://", adapter)
            # Set authentication headers
            session.headers.update(self._get_auth_headers())
            self._session = session
        return self._session

    def _get_base_url(self) -> str:
        """Build the base URL from the connection (schema holds the URL scheme)."""
        conn = self.get_connection(self.api_conn_id)
        scheme = conn.schema or 'https'
        port = f":{conn.port}" if conn.port else ""
        return f"{scheme}://{conn.host}{port}"

    def _get_auth_headers(self) -> Dict[str, str]:
        """Get authentication headers from connection."""
        conn = self.get_connection(self.api_conn_id)
        return {
            'Authorization': f'Bearer {conn.password}',
            'Content-Type': 'application/json',
        }

    def make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Make an API request.

        :param method: HTTP method (GET, POST, PUT, DELETE)
        :param endpoint: API endpoint
        :param data: Request body data
        :param params: Query parameters
        :return: Decoded JSON response ({} for an empty body)
        """
        session = self.get_conn()
        url = f"{self.api_base_url}{endpoint}"
        try:
            response = session.request(
                method=method,
                url=url,
                json=data,
                params=params,
                timeout=self.timeout,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            self.log.error("API request failed: %s", e)
            raise
        # Some endpoints (e.g. a DELETE returning 204) have no body
        return response.json() if response.content else {}

    def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Make a GET request."""
        return self.make_request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a POST request."""
        return self.make_request('POST', endpoint, data=data)

    def put(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make a PUT request."""
        return self.make_request('PUT', endpoint, data=data)

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """Make a DELETE request."""
        return self.make_request('DELETE', endpoint)

    def test_connection(self) -> Tuple[bool, str]:
        """Test the connection; Airflow's UI expects a (status, message) tuple."""
        try:
            response = self.get('/health')
        except Exception as e:
            return False, str(e)
        if response.get('status') == 'ok':
            return True, 'Connection successfully tested'
        return False, f"Unexpected health check response: {response}"
```
Operator with Hook Integration
```python
# operator_with_hook.py
import json
from typing import Any, Dict, List

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class DataSynchronizationOperator(BaseOperator):
    """
    Operator for synchronizing data between systems using hooks.

    This operator demonstrates how to create operators that integrate
    with multiple hooks for complex data operations.

    :param source_system: Source system identifier
    :param target_system: Target system identifier
    :param sync_config: Synchronization configuration
    :param batch_size: Number of records to process in each batch
    """

    template_fields = ('source_system', 'target_system', 'sync_config')

    def __init__(
        self,
        *,
        source_system: str,
        target_system: str,
        sync_config: Dict[str, Any],
        batch_size: int = 1000,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.source_system = source_system
        self.target_system = target_system
        self.sync_config = sync_config
        self.batch_size = batch_size

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the data synchronization."""
        self.log.info("Starting sync from %s to %s", self.source_system, self.target_system)
        # Hooks are created here, not in __init__, so DAG parsing never opens connections
        source_hook = self._get_source_hook()
        target_hook = self._get_target_hook()
        try:
            source_data = self._extract_data(source_hook)
            transformed_data = self._transform_data(source_data)
            sync_results = self._load_data(target_hook, transformed_data)
            self._validate_sync(source_hook, target_hook, sync_results)
            self.log.info("Data synchronization completed successfully")
            return sync_results
        except Exception as e:
            self.log.error("Synchronization failed: %s", e)
            raise

    def _get_source_hook(self) -> Any:
        """Get appropriate hook for source system."""
        if self.source_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='source_postgres')
        elif self.source_system == 'mysql':
            from airflow.providers.mysql.hooks.mysql import MySqlHook
            return MySqlHook(mysql_conn_id='source_mysql')
        elif self.source_system == 's3':
            from airflow.providers.amazon.aws.hooks.s3 import S3Hook
            return S3Hook(aws_conn_id='source_aws')
        else:
            raise ValueError(f"Unsupported source system: {self.source_system}")

    def _get_target_hook(self) -> Any:
        """Get appropriate hook for target system."""
        if self.target_system == 'postgresql':
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            return PostgresHook(postgres_conn_id='target_postgres')
        elif self.target_system == 'redshift':
            from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
            return RedshiftSQLHook(redshift_conn_id='target_redshift')
        elif self.target_system == 'gcs':
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            return GCSHook(gcp_conn_id='target_gcs')
        else:
            raise ValueError(f"Unsupported target system: {self.target_system}")

    def _extract_data(self, source_hook: Any) -> List[Dict[str, Any]]:
        """Extract data from the source system as a list of dicts."""
        if hasattr(source_hook, 'get_records'):
            # DB-API hooks: take column names from cursor.description so each
            # row becomes a dict that the transformations can address by name
            conn = source_hook.get_conn()
            cursor = conn.cursor()
            try:
                cursor.execute(self.sync_config['extract_query'])
                columns = [col[0] for col in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
            finally:
                cursor.close()
                conn.close()
        if hasattr(source_hook, 'read_key'):
            # S3: the object is expected to hold a JSON array of records;
            # source_key may be a full s3://bucket/key URL
            return json.loads(source_hook.read_key(self.sync_config['source_key']))
        raise ValueError("Source hook does not support data extraction")

    def _transform_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Apply the configured transformations in order."""
        for transformation in self.sync_config.get('transformations', []):
            if transformation['type'] == 'filter':
                data = [row for row in data if self._apply_filter(row, transformation)]
            elif transformation['type'] == 'map':
                data = [self._apply_mapping(row, transformation) for row in data]
            elif transformation['type'] == 'aggregate':
                data = self._apply_aggregation(data, transformation)
        return data

    def _load_data(self, target_hook: Any, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Load data to the target system in batches."""
        results = {'records_loaded': 0, 'errors': []}
        for i in range(0, len(data), self.batch_size):
            batch = data[i:i + self.batch_size]
            try:
                if hasattr(target_hook, 'insert_rows'):
                    # insert_rows expects value tuples plus the column names
                    target_hook.insert_rows(
                        table=self.sync_config['target_table'],
                        rows=[tuple(row.values()) for row in batch],
                        target_fields=list(batch[0].keys()),
                    )
                elif hasattr(target_hook, 'upload'):
                    target_hook.upload(
                        bucket_name=self.sync_config['target_bucket'],
                        object_name=f"batch_{i}.json",
                        data=json.dumps(batch, default=str),
                    )
                else:
                    raise ValueError("Target hook does not support data loading")
                results['records_loaded'] += len(batch)
            except Exception as e:
                results['errors'].append({
                    'batch_start': i,
                    'error': str(e),
                })
        return results

    def _validate_sync(
        self,
        source_hook: Any,
        target_hook: Any,
        sync_results: Dict[str, Any],
    ) -> None:
        """Validate that synchronization was successful."""
        if sync_results['errors']:
            # Batches that succeeded are not rolled back, so a retry reloads them;
            # use upserts or a staging table if this task may be retried
            raise AirflowException(
                f"Sync completed with errors: {sync_results['errors']}"
            )
        # Additional validation logic (e.g. row-count comparison) can be added here
        self.log.info(
            "Sync validation passed: %s records loaded", sync_results['records_loaded']
        )

    def _apply_filter(self, row: Dict[str, Any], transformation: Dict[str, Any]) -> bool:
        """Apply filter transformation."""
        column = transformation.get('column')
        operator = transformation.get('operator')
        value = transformation.get('value')
        if operator == 'eq':
            return row[column] == value
        elif operator == 'gt':
            return row[column] > value
        elif operator == 'lt':
            return row[column] < value
        elif operator == 'contains':
            return value in str(row[column])
        else:
            return True

    def _apply_mapping(self, row: Dict[str, Any], transformation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply mapping transformation (rename columns)."""
        mapping = transformation.get('mapping', {})
        return {mapping.get(k, k): v for k, v in row.items()}

    def _apply_aggregation(
        self, data: List[Dict[str, Any]], transformation: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Apply aggregation transformation."""
        group_by = transformation.get('group_by')
        aggregate_column = transformation.get('aggregate_column')
        aggregate_func = transformation.get('aggregate_function', 'sum')
        groups: Dict[Any, List[Any]] = {}
        for row in data:
            groups.setdefault(row[group_by], []).append(row[aggregate_column])
        result = []
        for key, values in groups.items():
            if aggregate_func == 'sum':
                aggregated = sum(values)
            elif aggregate_func == 'avg':
                aggregated = sum(values) / len(values)
            elif aggregate_func == 'count':
                aggregated = len(values)
            else:
                aggregated = values
            result.append({group_by: key, aggregate_column: aggregated})
        return result
```
Performance Metrics
| Metric | Description | Optimization Strategy |
|---|---|---|
| Operator Execution Time | Time to complete operator execution | Optimize external calls, use batching |
| Hook Connection Time | Time to establish connection | Connection pooling, caching |
| API Response Time | External API response time | Async operations (e.g. deferrable operators), request timeouts, caching |
| Data Transfer Rate | Volume of data moved per unit time | Compression, streaming, batching |
| Error Rate | Percentage of failed operations | Retry logic, circuit breakers |
| Memory Usage | Operator memory footprint | Streaming processing, pagination |
| CPU Utilization | Processing power usage | Parallel execution, optimization |
| Connection Pool Size | Active connections | Pool tuning, monitoring |
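The table lists circuit breakers as a way to keep error rates down: after repeated failures, calls are short-circuited for a while instead of hammering a struggling service. Below is a minimal sketch with hypothetical names (`CircuitBreaker`, `CircuitOpenError`). Its state lives in one process, so inside Airflow it only helps within a single task run (for example, a loop of many API calls); limiting load across tasks is the job of Airflow pools or shared state.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised instead of calling the service while the breaker is open."""


class CircuitBreaker:
    """Open after `failure_threshold` consecutive failures; allow a trial call after `reset_timeout` s."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, func: Callable[[], T]) -> T:
        if self._opened_at is not None and self._clock() - self._opened_at < self.reset_timeout:
            raise CircuitOpenError("service marked unavailable; skipping call")
        # Closed, or open long enough that one trial ("half-open") call is allowed
        try:
            result = func()
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open (or re-open) the breaker
            raise
        self._failures = 0
        self._opened_at = None  # any success closes the breaker
        return result
```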
Best Practices
- Operator Design: Keep each operator focused on a single responsibility. Implement idempotency so retries are safe. Use template fields for dynamic parameters.
- Hook Implementation: Follow the hook pattern for connection management. Implement proper error handling and retry logic. Use connection pooling for high-throughput scenarios.
- Error Handling: Raise exceptions with meaningful messages. Use Airflow's exception classes to control what happens next, for example AirflowFailException to fail a task without retrying and AirflowSkipException to mark it skipped.
- Testing: Write unit tests for operators and hooks. Mock external dependencies to keep tests isolated. Test both success and failure scenarios.
- Documentation: Provide clear documentation for custom operators and hooks. Include usage examples and parameter descriptions. Document any external dependencies.
- Security: Never hardcode credentials. Use Airflow's connection system for credential management. Implement proper access controls for sensitive operations.
- Performance: Minimize the number and latency of external calls. Use batching for bulk operations. Implement connection pooling for high-throughput scenarios.
- Monitoring: Log important operations. Emit custom metrics where useful. Use Airflow's callbacks (such as on_failure_callback) for alerting.
- Maintainability: Write clean, readable code with a clear separation of concerns. Follow Python coding standards. Use type hints for clarity.
- Reusability: Design operators and hooks for reuse across multiple DAGs. Use configuration to customize behavior. Implement proper abstraction layers.
Key Takeaways:
- Operators encapsulate unit-of-work logic; hooks manage external connections
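- Idempotency ($f(f(S)) = f(S)$) is required for safe retries
- With exponential backoff, the total retry window grows exponentially with the number of retries: $T_{\text{retry}} \approx \delta(2^{r} - 1)$, ignoring jitter and max_retry_delay
- Hooks abstract connection details, authentication, and client creation
- Custom operators must implement execute() and should declare template_fields for templated parameters
- Never hardcode credentials; use Airflow's connection system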
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)