Sensors and Operators in Apache Airflow
Architecture Diagram
SENSOR ARCHITECTURE
Sensor components
- BaseSensorOperator parameters:
  - poke_interval (default: 60 s)
  - timeout (default: 7 * 24 * 3600 s, i.e. 7 days)
  - soft_fail (default: False)
  - mode (default: 'poke')
  - exponential_backoff (default: False)
- Sensor modes:
  - Poke mode: active polling (holds a worker slot)
  - Reschedule mode: releases the worker slot between pokes
  - Deferred mode: async waiting in the triggerer (enabled per operator, e.g. deferrable=True, not through the mode parameter)
Sensor types
- File and object storage sensors: FileSensor (local filesystem), S3KeySensor (wildcard_match=True covers prefix-style waits), GCSObjectExistenceSensor, WebHdfsSensor (HDFS over WebHDFS)
- HTTP sensors: HttpSensor
- Database sensors: SqlSensor
- Cross-DAG sensors: ExternalTaskSensor
SENSOR MODES COMPARISON
POKE MODE
Flow: Sensor starts -> Poke check -> Wait interval -> Repeat until the condition is met or the timeout is reached (the worker slot is held throughout and blocks execution)
Characteristics:
- Simple implementation
- Uses a worker slot continuously
- Good for short waits (< 5 minutes)
- Not resource efficient for long waits
RESCHEDULE MODE
Flow: Sensor starts -> Poke check -> Release worker -> Re-queue for the next poke (the slot is freed during the wait and a slot is acquired again for each poke)
Characteristics:
- Releases the worker slot between pokes
- More resource efficient for long waits
- Slightly higher latency due to re-queueing
- Good for waits > 5 minutes
DEFERRED MODE
Flow: Operator defers -> Trigger created -> Triggerer waits -> Operator resumes (no worker slot is used while the trigger waits asynchronously outside the workers)
Characteristics:
- Most resource efficient
- Uses async triggers
- No worker resources used during the wait
- Best for long waits (hours/days)
SENSOR OPTIMIZATION PATTERNS
EXPONENTIAL BACKOFF
Intervals: Poke 1: 1 s -> Poke 2: 2 s -> Poke 3: 4 s -> Poke 4: 8 s -> Poke 5: 16 s (exponentially increasing intervals)
Benefits:
- Reduces load on external systems
- Conserves worker resources
- Good for unpredictable wait times
SENSOR CHAINING
Flow: Sensor A (File) -> Sensor B (HTTP) -> Sensor C (DB) -> Task (waits for the file, then for the API, and finally for the DB)
Benefits:
- Complex dependency chains
- Fine-grained control
- Error isolation
SENSOR AGGREGATION
Flow: Sensor A, Sensor B and Sensor C (in parallel) -> Join operator -> Task
Benefits:
- Wait for multiple conditions
- Parallel monitoring
- Reduced overall wait time
Formal Definitions
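Definition (Sensor)
A sensor is a specialized operator that polls an external condition at intervals until the condition returns True or a timeout is exceeded. Formally, with poke times $t_0 = 0 < t_1 < t_2 < \dots$ spaced by the poke interval $\Delta t$ and a timeout $T$, the sensor succeeds at $t^{*} = \min\{t_k : \text{poke}(t_k) = \text{True}\}$ provided the timeout has not fired first, and fails otherwise, where poke() is the condition check function.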
Definition (Poke Interval)
The poke interval $\Delta t$ is the time (in seconds) between consecutive condition checks by a sensor. In poke mode the sensor keeps its worker slot and sleeps between checks; in reschedule mode it releases the slot after each unsuccessful check and is re-queued for the next one.
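Definition (Deferred Mode)
Deferred mode is a sensor execution strategy in which the operator hands its wait to an asynchronous trigger and releases its worker slot entirely. The triggerer service monitors the condition without consuming worker resources, and the task resumes on a worker once the trigger fires. This is the most efficient mode for long waits. It is enabled per operator (for example deferrable=True on sensors that support it) rather than through the mode parameter, which accepts only 'poke' and 'reschedule'.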
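To see why a single triggerer can stand in for many waiting sensors, here is a minimal pure-asyncio sketch. It is a model, not Airflow's trigger API: `wait_for_condition`, `first_event` and `triggerer` are hypothetical names, the "condition" is simply elapsed time, and the event is a plain dict. One event loop runs 100 waits concurrently, and none of them holds a thread while it sleeps.

```python
import asyncio
import time


async def wait_for_condition(name, ready_after, poll_interval):
    """Hypothetical trigger: poll without blocking, then emit a single event."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    while loop.time() - start < ready_after:
        # Awaiting hands control back to the event loop so other triggers can run
        await asyncio.sleep(poll_interval)
    yield {"trigger": name, "status": "success"}


async def first_event(trigger):
    """Take the first event from an async-generator trigger, then close it."""
    try:
        return await trigger.__anext__()
    finally:
        await trigger.aclose()


async def triggerer(triggers):
    """Hypothetical triggerer: run every trigger concurrently in one event loop."""
    return await asyncio.gather(*(first_event(t) for t in triggers))


started = time.perf_counter()
events = asyncio.run(triggerer(
    [wait_for_condition(f"sensor_{i}", ready_after=0.05, poll_interval=0.01)
     for i in range(100)]
))
elapsed = time.perf_counter() - started
print(len(events), f"events in {elapsed:.2f}s")
```

Run one after another, the 100 waits would take at least 5 seconds. Run concurrently, they finish in roughly the time of a single wait. That is the property deferrable operators rely on.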
Detailed Explanation
Sensor Fundamentals
Sensors are specialized operators that wait for certain conditions to be met before proceeding. They poll external systems at regular intervals until a condition is satisfied or a timeout is reached. Understanding sensor behavior is crucial for building efficient workflows.
Poke Method: The poke method is the core of every sensor. It returns True when the condition is met and False otherwise. The sensor repeatedly calls this method until it returns True or the timeout is exceeded.
Timeout Handling: Sensors have a configurable timeout (default: 7 days). If the condition isn't met within the timeout, the sensor raises AirflowSensorTimeout and fails without being retried. Set timeouts based on expected wait times.
Soft Fail: The soft_fail parameter determines whether sensor failure marks the task as failed or skipped. Set soft_fail=True for optional conditions that shouldn't fail the entire workflow.
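The interplay of poke, timeout and soft_fail can be modeled in a few lines of plain Python. This is a simplified model on a simulated clock, not Airflow's implementation. It assumes pokes are instantaneous and that the timeout is checked after each unsuccessful poke. `run_sensor` is a hypothetical helper.

```python
from typing import Callable


def run_sensor(
    poke: Callable[[int], bool],
    poke_interval: int,
    timeout: int,
    soft_fail: bool = False,
) -> str:
    """Simulate a poke-mode sensor; returns 'success', 'failed' or 'skipped'."""
    elapsed = 0  # simulated seconds since the first poke
    while True:
        if poke(elapsed):
            return "success"
        if elapsed > timeout:
            # Timed out: soft_fail turns the failure into a skip
            return "skipped" if soft_fail else "failed"
        elapsed += poke_interval  # wait before the next poke


# Condition becomes true after 2 minutes
print(run_sensor(lambda t: t >= 120, poke_interval=60, timeout=300))  # success
# Condition never becomes true
print(run_sensor(lambda t: False, poke_interval=60, timeout=300))  # failed
print(run_sensor(lambda t: False, poke_interval=60, timeout=300, soft_fail=True))  # skipped
```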
Sensor Modes
Poke Mode: The default mode where the sensor actively polls the condition. It holds a worker slot during the entire wait period. This is simple but resource-intensive for long waits.
Reschedule Mode: The sensor releases the worker slot between pokes and re-queues itself. This is more resource-efficient but adds latency due to re-queueing. Use for waits longer than 5 minutes.
Deferred Mode: The sensor hands the wait to Airflow's triggerer and waits asynchronously. No worker resources are used during the wait. This is the most efficient mode for long waits, but it requires the triggerer service to be running and a sensor that supports deferral.
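One rough way to compare the three modes is to count the worker-slot seconds each consumes during a wait. The sketch below is a back-of-the-envelope model with stated assumptions. The condition becomes true exactly on a poke after `wait` seconds. Each poke takes `poke_duration` seconds of worker time. Task start-up overhead is ignored, although in practice it makes each reschedule poke more expensive. A deferred sensor is assumed to use a worker only for one check before deferring and one after resuming. `worker_slot_seconds` is a hypothetical helper.

```python
def worker_slot_seconds(mode: str, wait: int, poke_interval: int, poke_duration: int) -> int:
    """Approximate worker-slot seconds a sensor consumes while waiting `wait` seconds."""
    pokes = wait // poke_interval + 1  # pokes at 0, dt, 2*dt, ..., wait
    if mode == "poke":
        # The slot is held for the whole wait, plus the final poke
        return wait + poke_duration
    if mode == "reschedule":
        # The slot is held only while each poke runs
        return pokes * poke_duration
    if mode == "deferrable":
        # Assumption: one check before deferring, one after the trigger fires
        return 2 * poke_duration
    raise ValueError(f"unknown mode: {mode!r}")


for mode in ("poke", "reschedule", "deferrable"):
    print(mode, worker_slot_seconds(mode, wait=3600, poke_interval=60, poke_duration=2))
# poke 3602
# reschedule 122
# deferrable 4
```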
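Exponential Backoff Interval
$$\Delta t_n = \min\left(\Delta t_0 \cdot r^{n},\ \Delta t_{\max}\right)$$
Here,
- $\Delta t_0$ = initial poke interval
- $r$ = multiplier (typically 2.0)
- $n$ = poke attempt number (0-indexed)
- $\Delta t_{\max}$ = maximum poke interval cap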
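The backoff formula $\Delta t_n = \min(\Delta t_0 \cdot r^{n}, \Delta t_{\max})$ translates directly into code. This sketch computes the model's interval sequence only. Airflow's built-in exponential_backoff option follows the same idea but adds jitter, so its exact intervals differ. `backoff_intervals` is a hypothetical helper.

```python
def backoff_intervals(initial: float, multiplier: float, max_interval: float, n_pokes: int) -> list:
    """Wait after poke attempt n (0-indexed): min(initial * multiplier**n, max_interval)."""
    return [min(initial * multiplier ** n, max_interval) for n in range(n_pokes)]


print(backoff_intervals(1, 2, 300, 5))   # [1, 2, 4, 8, 16]
print(backoff_intervals(10, 2, 300, 7))  # [10, 20, 40, 80, 160, 300, 300]
```

The first call reproduces the 1 s, 2 s, 4 s, 8 s, 16 s sequence from the optimization diagram. The second shows the cap taking over once $\Delta t_0 \cdot r^{n}$ exceeds $\Delta t_{\max}$.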
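Worker Slot Utilization (Poke Mode)
$$U_{\text{poke}} = \frac{T_{\text{wait}}}{T_{\text{wait}} + T_{\text{task}}}$$
Here,
- $U_{\text{poke}}$ = share of worker-slot time spent waiting rather than doing work
- $T_{\text{wait}}$ = total sensor wait time
- $T_{\text{task}}$ = downstream task execution time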
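Plugging numbers into the utilization formula shows how lopsided poke mode can get. This sketch takes $U_{\text{poke}} = T_{\text{wait}} / (T_{\text{wait}} + T_{\text{task}})$ to mean the share of slot time spent waiting. `wait_fraction` is a hypothetical helper.

```python
def wait_fraction(t_wait: float, t_task: float) -> float:
    """U = T_wait / (T_wait + T_task): share of slot time spent waiting, not working."""
    return t_wait / (t_wait + t_task)


# A sensor that waits 1 hour in front of a 400-second task
print(wait_fraction(3600, 400))           # 0.9
# A sensor that waits 1 minute in front of a 10-minute task
print(round(wait_fraction(60, 600), 3))   # 0.091
```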
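Theorem (Sensor Timeout Guarantee)
If a poke-mode sensor has timeout $T$ and poke interval $\Delta t$, it is guaranteed to either succeed or fail within $T + \Delta t + \tau_{\max}$ of its first poke, where $\tau_{\max}$ is the maximum poke execution time (scheduler delays ignored). Proof: the timeout is checked after every unsuccessful poke. The last check that passes happens at or before $T$. The sensor then sleeps $\Delta t$ and runs one more poke lasting at most $\tau_{\max}$, after which it either succeeds or finds the timeout exceeded. Since consecutive pokes start at least $\Delta t$ apart, the sensor checks at most $\lfloor T/\Delta t \rfloor + 2$ times before exceeding the timeout.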
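The timeout bound can be checked with a small simulation of the worst case, in which the condition never becomes true. The model matches the proof's assumptions: each poke takes `poke_duration` seconds, the timeout is checked after each failed poke, and the sensor then sleeps `poke_interval`. `worst_case_run` is a hypothetical helper, not Airflow code.

```python
def worst_case_run(timeout: int, poke_interval: int, poke_duration: int) -> tuple:
    """Simulate a poke-mode sensor whose condition never becomes true.

    Returns (number_of_pokes, elapsed_seconds_at_failure).
    """
    elapsed = 0
    pokes = 0
    while True:
        elapsed += poke_duration  # run one (unsuccessful) poke
        pokes += 1
        if elapsed > timeout:     # timeout is checked after each failed poke
            return pokes, elapsed
        elapsed += poke_interval  # sleep until the next poke


for duration in (0, 5):
    pokes, failed_at = worst_case_run(timeout=300, poke_interval=60, poke_duration=duration)
    bound = 300 + 60 + duration   # T + dt + tau_max
    max_pokes = 300 // 60 + 2     # floor(T / dt) + 2
    print(pokes, failed_at, failed_at <= bound, pokes <= max_pokes)
# 7 360 True True
# 6 330 True True
```

With instantaneous pokes, both bounds are tight: the sensor fails at exactly $T + \Delta t$ after $\lfloor T/\Delta t \rfloor + 2$ pokes.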
In poke mode, the sensor occupies a worker slot for the entire wait duration. With $k$ sensors waiting simultaneously in poke mode, the slots available to other tasks drop by $k$ (from $P$ to $P - k$ for a deployment with $P$ slots). Use reschedule or deferred mode to free slots during waits.
For exponential backoff sensors, set max_wait to prevent excessively long intervals. A common pattern is max_wait = 300 seconds (5 minutes), which balances responsiveness against load on the external system.
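The reason to cap the interval is latency. Without a cap, the gap between late pokes grows so large that a condition which becomes true just after one poke can go unnoticed for hours. This sketch lists poke start times under the backoff model $\min(\Delta t_0 \cdot r^{n}, \Delta t_{\max})$ within a 24-hour timeout and reports the largest gap, with and without a 300-second cap. `poke_times` and `largest_gap` are hypothetical helpers, and Airflow's jittered backoff would give somewhat different numbers.

```python
import math


def poke_times(initial, multiplier, max_interval, timeout):
    """Poke start times (seconds) under capped exponential backoff, up to `timeout`."""
    times, t, n = [], 0, 0
    while t <= timeout:
        times.append(t)
        t += min(initial * multiplier ** n, max_interval)
        n += 1
    return times


def largest_gap(times):
    """Longest time between consecutive pokes, i.e. the worst-case detection delay."""
    return max(b - a for a, b in zip(times, times[1:]))


day = 24 * 3600
print(largest_gap(poke_times(60, 2, math.inf, day)))  # 30720 (about 8.5 hours)
print(largest_gap(poke_times(60, 2, 300, day)))       # 300
```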
Advanced Sensor Patterns
Exponential Backoff: With exponential_backoff=True, the interval between pokes starts at poke_interval and grows roughly exponentially with each poke (Airflow adds some jitter), capped by max_wait. This reduces load on external systems while keeping the sensor reasonably responsive.
Sensor Chaining: Multiple sensors can be chained to wait for complex conditions. Each sensor waits for its specific condition before allowing the next sensor to start.
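Sensor Aggregation: To wait for several conditions at once, run the sensors in parallel as upstream tasks of a single join task (for example an EmptyOperator or the processing task itself). With the default all_success trigger rule, the join runs only after every sensor has succeeded. Because the sensors wait concurrently, this usually finishes sooner than chaining the same sensors sequentially.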
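Where the time saving of aggregation comes from can be seen in a small timing model. It makes several assumptions. Condition $i$ becomes true at an absolute time $t_i$. Every sensor task pays a fixed `startup` delay before its first poke and then pokes every `poke_interval` seconds. Poke durations are ignored. In a chain, each sensor pays its own start-up cost and restarts its poke grid. In parallel, these costs overlap. The helpers are hypothetical.

```python
import math


def first_success(start: int, ready_at: int, poke_interval: int) -> int:
    """Time of the first poke (from `start`) that sees a condition true since `ready_at`."""
    if ready_at <= start:
        return start
    return start + math.ceil((ready_at - start) / poke_interval) * poke_interval


def parallel_finish(ready_times, poke_interval, startup):
    # All sensors start together; the join runs when the slowest one succeeds
    return max(first_success(startup, t, poke_interval) for t in ready_times)


def chained_finish(ready_times, poke_interval, startup):
    # Each sensor starts only after the previous one has succeeded
    finished = 0
    for ready_at in ready_times:
        finished = first_success(finished + startup, ready_at, poke_interval)
    return finished


ready = [300, 300, 300]  # all three conditions become true at t = 300 s
print(parallel_finish(ready, poke_interval=60, startup=10))  # 310
print(chained_finish(ready, poke_interval=60, startup=10))   # 330
```

Under this model the chain can never finish earlier than the parallel layout. The gap grows with the number of chained sensors and with their start-up latency.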
Custom Sensors: Create custom sensors for specialized conditions. Inherit from BaseSensorOperator and implement the poke method with your specific logic.
External Task Sensor
The ExternalTaskSensor waits for a task in another DAG or the same DAG to complete. It's useful for cross-DAG dependencies and complex workflow orchestration.
Configuration: Specify external_dag_id and external_task_id (leave external_task_id as None to wait for the whole external DAG run). The sensor checks whether the external task instance with the matching logical (execution) date has reached an allowed state.
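States: By default, the sensor waits for the external task to succeed. Use allowed_states to accept other states such as skipped. Use failed_states (for example failed) to make the sensor fail immediately instead of waiting until its timeout.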
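Execution Date Matching: By default, the sensor looks for the external run with the same logical date, which works when both DAGs share a schedule. When the schedules differ, use execution_delta (a timedelta subtracted from the current logical date) or execution_date_fn (a callable that returns the date to look for). Only one of the two may be given.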
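Date matching is where cross-DAG sensors most often go wrong, so here is a simplified model of how the target logical date is chosen. It mirrors the rule described above but is not Airflow's code. `external_logical_date` is a hypothetical helper. The set of upstream runs stands in for a @daily DAG whose runs have midnight logical dates.

```python
from datetime import datetime, timedelta


def external_logical_date(logical_date, execution_delta=None, execution_date_fn=None):
    """Logical date of the external run the sensor would look for (simplified model)."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("Pass only one of execution_delta or execution_date_fn")
    if execution_delta is not None:
        return logical_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(logical_date)
    return logical_date  # default: same logical date


upstream_runs = {datetime(2024, 1, day) for day in range(1, 8)}  # @daily, midnight
run = datetime(2024, 1, 5)

print(external_logical_date(run) in upstream_runs)  # True
print(external_logical_date(run, execution_delta=timedelta(hours=1)) in upstream_runs)  # False
print(external_logical_date(run, execution_date_fn=lambda d: d - timedelta(days=1)))
# 2024-01-04 00:00:00
```

A one-hour execution_delta between two @daily DAGs points at a 23:00 run that never exists, so such a sensor can only time out.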
Key Concepts Table
| Sensor Type | Mode | Best For | Resource Usage | Latency |
|---|---|---|---|---|
| FileSensor | Poke | Short file waits | High | Low |
| S3KeySensor | Reschedule | S3 object waits | Medium | Medium |
| HttpSensor | Deferred | API availability | Low | High |
| SqlSensor | Poke | Database conditions | High | Low |
| ExternalTaskSensor | Poke/Reschedule | Cross-DAG deps | Medium | Medium |
| TimeSensor | Poke | Scheduled waits | High | Low |
Code Examples
Advanced Sensor Implementation
```python
# advanced_sensors.py
import base64
import os
from datetime import datetime, timedelta
from typing import Any, Optional

import requests
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator

# airflow.utils.decorators.apply_defaults is deprecated in Airflow 2
# (default_args are applied automatically), so it is not used here.


class AdvancedHttpSensor(BaseSensorOperator):
    """
    Advanced HTTP sensor with multiple condition support.

    This sensor supports custom headers, bearer or basic authentication,
    and response validation (status code plus an optional JSON value).
    """

    template_fields = ('endpoint', 'headers', 'expected_status')
    ui_color = '#4285F4'

    def __init__(
        self,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        headers: Optional[dict] = None,
        expected_status: int = 200,
        json_path: Optional[str] = None,
        expected_value: Any = None,
        auth_type: str = 'bearer',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.http_conn_id = http_conn_id  # used by poke(); previously never set
        self.method = method
        self.headers = headers or {}
        self.expected_status = expected_status
        self.json_path = json_path
        self.expected_value = expected_value
        self.auth_type = auth_type

    def poke(self, context):
        """Check if the HTTP condition is met."""
        # Get connection details; a missing connection is a configuration
        # error, so it raises instead of being retried until the timeout
        conn = BaseHook.get_connection(self.http_conn_id)

        # Build request URL (omit the port when the connection has none)
        scheme = conn.schema or 'https'
        port = f":{conn.port}" if conn.port else ''
        url = f"{scheme}://{conn.host}{port}{self.endpoint}"
        headers = {**self.headers}

        # Add authentication
        if self.auth_type == 'bearer':
            headers['Authorization'] = f"Bearer {conn.get_password()}"
        elif self.auth_type == 'basic':
            credentials = base64.b64encode(
                f"{conn.login}:{conn.get_password()}".encode()
            ).decode()
            headers['Authorization'] = f"Basic {credentials}"

        # Make request; network errors just mean "not ready yet"
        try:
            response = requests.request(
                method=self.method,
                url=url,
                headers=headers,
                timeout=30,
            )
        except requests.RequestException as e:
            self.log.warning("HTTP request failed: %s", e)
            return False

        # Check status code
        if response.status_code != self.expected_status:
            self.log.warning(
                "Expected status %s, got %s",
                self.expected_status, response.status_code,
            )
            return False

        # Check JSON path if specified
        if self.json_path and self.expected_value is not None:
            try:
                data = response.json()
            except ValueError:
                self.log.warning("Response body is not valid JSON")
                return False
            actual_value = self._get_json_value(data, self.json_path)
            if actual_value != self.expected_value:
                self.log.warning(
                    "Expected value %s at path %s, got %s",
                    self.expected_value, self.json_path, actual_value,
                )
                return False

        self.log.info("HTTP condition met successfully")
        return True

    def _get_json_value(self, data: dict, path: str):
        """Get value from JSON using dot notation path."""
        value = data
        for key in path.split('.'):
            if isinstance(value, dict):
                value = value.get(key)
            else:
                return None
        return value


class MultiConditionSensor(BaseSensorOperator):
    """
    Sensor that waits for multiple conditions to be met.

    Each condition is a dict whose 'type' is 'file', 'http' or 'database';
    `operator` combines the individual results with 'AND' or 'OR'.
    """

    SUPPORTED_TYPES = ('file', 'http', 'database')

    def __init__(
        self,
        conditions: list,
        operator: str = 'AND',
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Validate up front instead of silently treating bad input as "not met"
        if operator not in ('AND', 'OR'):
            raise ValueError(f"operator must be 'AND' or 'OR', got {operator!r}")
        for condition in conditions:
            if condition.get('type') not in self.SUPPORTED_TYPES:
                raise ValueError(f"Unsupported condition type: {condition.get('type')!r}")
        self.conditions = conditions
        self.operator = operator

    def poke(self, context):
        """Check if conditions are met."""
        results = []
        for condition in self.conditions:
            check = getattr(self, f"_check_{condition['type']}_condition")
            results.append(check(condition))
        if self.operator == 'AND':
            return all(results)
        return any(results)  # OR

    def _check_file_condition(self, condition: dict) -> bool:
        """Check file-based condition (on the worker's local filesystem)."""
        path = condition.get('path')
        exists = condition.get('exists', True)
        return os.path.exists(path) == exists

    def _check_http_condition(self, condition: dict) -> bool:
        """Check HTTP-based condition."""
        url = condition.get('url')
        expected_status = condition.get('expected_status', 200)
        try:
            response = requests.get(url, timeout=10)
        except requests.RequestException:
            return False
        return response.status_code == expected_status

    def _check_database_condition(self, condition: dict) -> bool:
        """Check database-based condition (true if the query returns a row)."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=condition.get('conn_id'))
        result = hook.get_first(condition.get('query'))
        return bool(result)


# Usage in DAG
with DAG(
    'advanced_sensor_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced sensor examples',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'advanced'],
) as dag:

    # Advanced HTTP sensor (uses the 'http_default' connection)
    api_sensor = AdvancedHttpSensor(
        task_id='api_availability_sensor',
        endpoint='/api/v1/status',
        expected_status=200,
        json_path='status',
        expected_value='healthy',
        mode='reschedule',
        poke_interval=30,
        timeout=300,
    )

    # Multi-condition sensor (explicit timeout so poke mode cannot hold a slot for 7 days)
    multi_condition = MultiConditionSensor(
        task_id='multi_condition_sensor',
        conditions=[
            {'type': 'file', 'path': '/data/input.csv', 'exists': True},
            {'type': 'http', 'url': 'http://api:8080/health', 'expected_status': 200},
            {'type': 'database', 'conn_id': 'postgres', 'query': 'SELECT 1'},
        ],
        operator='AND',
        mode='poke',
        poke_interval=60,
        timeout=600,
    )

    # Process task
    process = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data..."),
    )

    api_sensor >> multi_condition >> process
```
External Task Sensor Patterns
```python
# external_task_patterns.py
from datetime import datetime, time, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.state import State


def process_data(**context):
    """Process data after external task completes."""
    print("Processing data after external dependency met")


def generate_report(**context):
    """Generate report after processing."""
    print("Generating report...")


# DAG 1: Upstream DAG (external dependency)
with DAG(
    'upstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Upstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'upstream'],
) as upstream_dag:

    def extract_data(**context):
        """Extract data from source."""
        print("Extracting data...")
        return {"status": "success"}

    def transform_data(**context):
        """Transform extracted data."""
        print("Transforming data...")
        return {"status": "success"}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    extract >> transform


# DAG 2: Downstream DAG (waits for upstream)
with DAG(
    'downstream_data_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Downstream data pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'downstream'],
) as downstream_dag:

    # Wait for the upstream run with the same logical date. Both DAGs run
    # @daily, so no execution_delta is needed (execution_delta=timedelta(hours=1)
    # would look for a 23:00 upstream run that never exists).
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
        mode='reschedule',
        poke_interval=300,
        timeout=3600,
    )

    # Wait for the previous day's upstream run
    wait_for_yesterday = ExternalTaskSensor(
        task_id='wait_for_yesterday',
        external_dag_id='upstream_data_pipeline',
        external_task_id='transform',
        execution_date_fn=lambda logical_date: logical_date - timedelta(days=1),
        mode='reschedule',
        poke_interval=300,
        timeout=3600,
    )

    # Process after upstream completes
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    # Generate report
    report = PythonOperator(
        task_id='report',
        python_callable=generate_report,
    )

    [wait_for_upstream, wait_for_yesterday] >> process >> report


# DAG 3: Complex external dependencies
with DAG(
    'complex_external_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Complex external dependency example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['external', 'complex'],
) as complex_dag:

    # Wait for multiple external tasks
    wait_task_1 = ExternalTaskSensor(
        task_id='wait_task_1',
        external_dag_id='upstream_data_pipeline',
        external_task_id='extract',
        mode='reschedule',
        poke_interval=300,
    )

    # 'another_pipeline' is assumed to be defined elsewhere on the same @daily schedule
    wait_task_2 = ExternalTaskSensor(
        task_id='wait_task_2',
        external_dag_id='another_pipeline',
        external_task_id='load',
        mode='reschedule',
        poke_interval=300,
    )

    # Time sensor: succeeds once the current time passes 09:00
    time_check = TimeSensor(
        task_id='time_check',
        target_time=time(9, 0),
        mode='reschedule',
    )

    # Process after all conditions met
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_task_1, wait_task_2, time_check] >> process
```
Sensor Optimization Patterns
```python
# sensor_optimization.py
import os
import random
from datetime import datetime, timedelta
from typing import Optional

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator


class OptimizedFileSensor(BaseSensorOperator):
    """
    File sensor with optimization features: minimum size and maximum age checks.
    """

    template_fields = ('filepath',)
    ui_color = '#FF9800'

    def __init__(
        self,
        filepath: str,
        min_file_size: int = 0,
        max_file_age_days: Optional[int] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.min_file_size = min_file_size
        self.max_file_age_days = max_file_age_days

    def poke(self, context):
        """Check if file meets all criteria."""
        if not os.path.exists(self.filepath):
            self.log.info("File %s does not exist yet", self.filepath)
            return False

        # Check file size
        file_size = os.path.getsize(self.filepath)
        if file_size < self.min_file_size:
            self.log.info("File size %s < minimum %s", file_size, self.min_file_size)
            return False

        # Check file age (skipped when no maximum age is configured)
        if self.max_file_age_days is not None:
            file_mtime = os.path.getmtime(self.filepath)
            file_age = datetime.now() - datetime.fromtimestamp(file_mtime)
            if file_age > timedelta(days=self.max_file_age_days):
                self.log.info("File is too old: %s", file_age)
                return False

        self.log.info("File %s meets all criteria", self.filepath)
        return True


class ExponentialBackoffSensor(BaseSensorOperator):
    """
    Sensor that relies on Airflow's built-in exponential backoff.

    Do not call time.sleep() inside poke(): BaseSensorOperator already waits
    between pokes (and a sleep in reschedule mode would hold the worker).
    exponential_backoff=True grows the interval from poke_interval, and
    max_wait caps it.
    """

    def __init__(
        self,
        initial_interval: int = 10,
        max_interval: int = 300,
        **kwargs,
    ):
        kwargs.setdefault('poke_interval', initial_interval)
        kwargs.setdefault('exponential_backoff', True)
        kwargs.setdefault('max_wait', timedelta(seconds=max_interval))
        super().__init__(**kwargs)

    def poke(self, context):
        """Return the condition; Airflow schedules the next (longer) wait."""
        condition_met = self._check_condition()
        if not condition_met:
            self.log.info("Condition not met; Airflow will back off before the next poke")
        return condition_met

    def _check_condition(self):
        """Override this method to implement your condition check."""
        # Example: random condition for demonstration
        return random.random() < 0.1  # 10% chance of success


with DAG(
    'sensor_optimization_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Sensor optimization patterns',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sensors', 'optimization'],
) as dag:

    # Optimized file sensor
    file_sensor = OptimizedFileSensor(
        task_id='optimized_file_sensor',
        filepath='/data/input.csv',
        min_file_size=1024,   # At least 1KB
        max_file_age_days=1,  # Not older than 1 day
        mode='reschedule',
        poke_interval=60,
        timeout=3600,
    )

    # Exponential backoff sensor
    backoff_sensor = ExponentialBackoffSensor(
        task_id='exponential_backoff_sensor',
        initial_interval=10,  # becomes poke_interval
        max_interval=300,     # becomes max_wait
        mode='poke',
        timeout=3600,
    )

    # Process task
    process = PythonOperator(
        task_id='process',
        python_callable=lambda: print("Processing data..."),
    )

    file_sensor >> backoff_sensor >> process
```
Performance Metrics
| Metric | Description | Optimization Strategy |
|---|---|---|
| Poke Interval | Time between condition checks | Adjust based on expected wait time |
| Worker Slot Usage | Resources consumed during wait | Use reschedule or deferred mode |
| Latency | Time from condition met to task start | Minimize poke interval for time-sensitive tasks |
| Timeout Rate | Percentage of sensors that timeout | Set appropriate timeouts, monitor external systems |
| Resource Efficiency | Worker resources used per wait | Use deferred mode for long waits |
| Error Rate | Percentage of failed sensor checks | Implement proper error handling |
| False Positive Rate | Incorrect condition detection | Validate sensor logic thoroughly |
| Throughput | Number of sensors handled concurrently | Optimize poke intervals, use deferred mode |
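The poke-interval trade-off in the table (latency against load on the external system) can be quantified with a small calculation. This sketch assumes the condition becomes true at a uniformly random moment between two pokes and ignores poke duration and scheduler delay. Under those assumptions, the worst-case detection latency equals the poke interval, and the expected latency is half of it. `poke_tradeoff` is a hypothetical helper.

```python
def poke_tradeoff(poke_interval: int) -> dict:
    """Detection latency vs. external load for a fixed poke interval (seconds)."""
    return {
        "worst_latency_s": poke_interval,
        "expected_latency_s": poke_interval / 2,
        "pokes_per_hour": 3600 / poke_interval,
    }


for interval in (10, 60, 300):
    print(interval, poke_tradeoff(interval))
```

Shrinking the interval from 60 s to 10 s cuts the expected latency from 30 s to 5 s but sends six times as many requests per hour to the external system.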
Best Practices
- Mode Selection: Choose the mode based on the expected wait time. Use poke for short waits (< 5 min), reschedule for medium waits (5-60 min), and deferred mode for long waits (> 1 hour).
- Timeout Configuration: Set reasonable timeouts based on expected wait times. Consider external system reliability and implement graceful failure handling.
- Poke Interval Optimization: Balance responsiveness against resource usage. Use exponential backoff for unpredictable wait times.
- Error Handling: Implement proper error handling in sensor logic. Use soft_fail for optional conditions. Log meaningful error messages.
- Resource Management: Monitor sensor resource usage. Use pools to limit concurrent sensors, and priority_weight to prioritize critical ones.
- Testing: Test sensors in isolation with mocked external dependencies. Verify timeout behavior and error handling. Test each mode you use.
- Monitoring: Track sensor performance metrics. Monitor poke intervals, success rates, and resource usage. Set up alerts for anomalies.
- Documentation: Document sensor dependencies and expected behavior. Include timeout and retry configurations. Provide troubleshooting guidance.
- Maintenance: Regularly review and update sensor configurations. Remove obsolete sensors. Update timeout values as external systems change.
- Security: Implement proper authentication for external system access. Use secrets management for credentials. Validate input parameters.
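The mode-selection rule of thumb can be encoded as a small helper that returns sensor keyword arguments. This is a sketch of the thresholds stated in the list, not an Airflow feature. `sensor_kwargs` is hypothetical, and `deferrable=True` only applies to sensors that accept a deferrable argument.

```python
def sensor_kwargs(expected_wait_s: float) -> dict:
    """Suggest sensor kwargs: poke < 5 min, reschedule 5-60 min, deferred > 1 h."""
    if expected_wait_s < 5 * 60:
        return {"mode": "poke"}
    if expected_wait_s <= 60 * 60:
        return {"mode": "reschedule"}
    # Only for sensors that support deferral (they accept a `deferrable` argument)
    return {"deferrable": True}


print(sensor_kwargs(120))       # {'mode': 'poke'}
print(sensor_kwargs(30 * 60))   # {'mode': 'reschedule'}
print(sensor_kwargs(4 * 3600))  # {'deferrable': True}
```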
Key Takeaways:
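- Sensors poll a condition every $\Delta t$ seconds until it succeeds or the timeout $T$ is exceeded
- Exponential backoff: $\Delta t_n = \min\left(\Delta t_0 \cdot r^{n},\ \Delta t_{\max}\right)$
- Timeout guarantee: a poke-mode sensor resolves within $T + \Delta t + \tau_{\max}$
- Poke mode uses worker slots; reschedule mode releases them between pokes; deferred mode waits in the triggerer
- Worker slot utilization in poke mode: $U_{\text{poke}} = T_{\text{wait}} / (T_{\text{wait}} + T_{\text{task}})$, the share of slot time spent waiting
- Use ExternalTaskSensor for cross-DAG dependencies, with execution_delta or execution_date_fn when the DAG schedules differ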
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)