Deferrable and Async Operators in Airflow
Formal Definitions
Definition: Deferrable Operator
A deferrable operator is an operator whose execute() method calls self.defer() with a trigger instead of performing blocking work while it waits. The call raises a TaskDeferred exception, which releases the worker slot and hands the trigger to the triggerer process. When the trigger fires an event, the task is rescheduled and resumes in the method named by method_name.
Definition: Trigger
A trigger is a serializable object that defines an async wait. It runs in the triggerer process on Python's asyncio event loop. Triggers must inherit from BaseTrigger, implement run() as an async generator that yields a TriggerEvent, and implement serialize() so the triggerer can rebuild them.
Definition: Triggerer
The triggerer is a separate Airflow process that executes triggers asynchronously. Each triggerer process runs up to a configurable number of triggers at once (the capacity setting in [triggerer]) and picks up pending triggers from the metadata database. Triggerer capacity determines the maximum number of concurrent deferrable operations.
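The three definitions above fit together as a defer, wait, resume cycle. The following is a toy model of that cycle using only the standard library; Deferred, ToyTrigger, ToyOperator, and run_task are hypothetical names for illustration and are not Airflow's implementation.

```python
import asyncio
from typing import Any, AsyncIterator, Dict


class Deferred(Exception):
    """Toy stand-in for the signal raised when a task defers (hypothetical)."""

    def __init__(self, trigger: "ToyTrigger", method_name: str) -> None:
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


class ToyTrigger:
    """Waits asynchronously, then emits a single event payload."""

    def __init__(self, delay: float) -> None:
        self.delay = delay

    async def run(self) -> AsyncIterator[Dict[str, Any]]:
        await asyncio.sleep(self.delay)  # non-blocking wait
        yield {"status": "completed"}


class ToyOperator:
    def execute(self) -> None:
        # Deferring means raising: nothing after this line runs in this attempt.
        raise Deferred(ToyTrigger(delay=0.01), method_name="execute_complete")

    def execute_complete(self, event: Dict[str, Any]) -> str:
        return f"resumed with {event['status']}"


async def first_event(trigger: ToyTrigger) -> Dict[str, Any]:
    """Play the triggerer's role: run the trigger until its first event."""
    async for event in trigger.run():
        return event
    raise RuntimeError("trigger finished without an event")


def run_task(op: ToyOperator) -> str:
    try:
        op.execute()  # phase 1: runs on a "worker"
    except Deferred as signal:
        event = asyncio.run(first_event(signal.trigger))  # phase 2: "triggerer"
        return getattr(op, signal.method_name)(event)  # phase 3: back on a "worker"
    return "finished without deferring"


result = run_task(ToyOperator())
print(result)
```

In this sketch the worker does nothing between phase 1 and phase 3, which is the point of deferring: the slot is free while the trigger waits.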
Detailed Explanation
Why Deferrable Operators?
Traditional blocking operators hold a worker slot while waiting for external conditions. For long-running operations like API callbacks, database queries, or cloud job completions, this wastes valuable executor resources. Deferrable operators release the worker slot immediately and delegate the wait to the lightweight triggerer process.
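Worker slot utilization can be written as

$$U = \frac{T_{\text{work}}}{T_{\text{held}}}$$

Here,
- $T_{\text{work}}$ = Time spent doing actual work
- $T_{\text{held}}$ = Total time the worker slot was held

For a blocking sensor polling every 30 seconds for 1 hour: $U \approx 0.001$ (nearly all of the time is spent waiting between polls). A deferrable equivalent achieves $U \approx 1.0$ since the worker slot is released immediately.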
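The utilization figures can be checked with a few lines of arithmetic, taking utilization as work time divided by the time the slot is held. This is a rough sketch: the per-poll work time (30 ms) and the time a deferrable task holds its slot (0.1 s) are assumptions chosen for illustration, not measurements.

```python
def slot_utilization(work_seconds: float, held_seconds: float) -> float:
    """Fraction of the time a worker slot is held that is spent on real work."""
    if held_seconds <= 0:
        raise ValueError("held_seconds must be positive")
    return work_seconds / held_seconds


WAIT_SECONDS = 3600    # one-hour wait, as in the text
POKE_INTERVAL = 30     # seconds between polls, as in the text
WORK_PER_POKE = 0.03   # ASSUMPTION: each poll costs about 30 ms of real work
DEFER_OVERHEAD = 0.1   # ASSUMPTION: seconds a deferrable task holds its slot

pokes = WAIT_SECONDS // POKE_INTERVAL  # number of polls in the hour

# Blocking: the slot is held for the whole hour, but only the polls are work.
blocking_u = slot_utilization(pokes * WORK_PER_POKE, WAIT_SECONDS)

# Deferrable: the slot is held only while the task is actually working.
deferrable_u = slot_utilization(DEFER_OVERHEAD, DEFER_OVERHEAD)

print(f"blocking:   U = {blocking_u:.4f}")
print(f"deferrable: U = {deferrable_u:.4f}")
```

With a different per-poll cost the blocking figure changes, but it stays far below 1 for any wait that is long relative to the work done.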
Creating a Deferrable Operator
from datetime import timedelta
from typing import Any, AsyncIterator, Dict, Tuple

import asyncio
import aiohttp

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class WaitForResultTrigger(BaseTrigger):
    """Async trigger that polls an external API for result completion."""

    def __init__(self, poll_url: str, poll_interval: int = 30, token: str = ''):
        super().__init__()
        self.poll_url = poll_url
        self.poll_interval = poll_interval
        self.token = token

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # The first element must be the real import path of this class
        # ('path.to' is a placeholder). The dict is passed back to __init__.
        # Note that the token is stored with the trigger in the metadata DB.
        return (
            'path.to.WaitForResultTrigger',
            {
                'poll_url': self.poll_url,
                'poll_interval': self.poll_interval,
                'token': self.token,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Send the Authorization header only when a token was provided.
        headers = {'Authorization': f'Bearer {self.token}'} if self.token else {}
        # Reuse one session for every poll instead of reconnecting each time.
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                async with session.get(self.poll_url) as response:
                    data = await response.json()
                if data.get('status') == 'completed':
                    yield TriggerEvent({'status': 'completed', 'result': data})
                    return
                if data.get('status') == 'failed':
                    yield TriggerEvent({'status': 'failed', 'error': data.get('error')})
                    return
                await asyncio.sleep(self.poll_interval)


class AsyncJobMonitorOperator(BaseOperator):
    """
    Deferrable operator that monitors an external async job.

    :param job_url: URL to check job status
    :param poll_interval: Seconds between polls
    :param execution_timeout: Max time for the entire operation
    """

    template_fields = ('job_url',)

    def __init__(
        self,
        job_url: str,
        poll_interval: int = 30,
        execution_timeout: timedelta = timedelta(hours=1),
        **kwargs,
    ):
        # The apply_defaults decorator is deprecated and no longer needed.
        super().__init__(execution_timeout=execution_timeout, **kwargs)
        self.job_url = job_url
        self.poll_interval = poll_interval

    def execute(self, context: Dict[str, Any]) -> None:
        self.log.info('Monitoring job at %s', self.job_url)
        # defer() raises TaskDeferred, so nothing after this call runs.
        self.defer(
            trigger=WaitForResultTrigger(
                poll_url=self.job_url,
                poll_interval=self.poll_interval,
            ),
            method_name='execute_complete',
            timeout=self.execution_timeout,
        )

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> Any:
        # event is the payload the trigger passed to TriggerEvent.
        if event['status'] == 'completed':
            self.log.info('Job completed: %s', event['result'])
            return event['result']
        raise AirflowException(f'Job failed: {event.get("error", "Unknown error")}')
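Because a trigger is rebuilt in the triggerer from what serialize() returns, the keyword arguments it returns have to be enough to reconstruct an equivalent object. The sketch below checks that property with the standard library only. ToyTrigger and round_trip are hypothetical names, and passing the kwargs through JSON is an assumption used here to approximate database storage, not a description of Airflow's exact storage format.

```python
import json
from typing import Any, Dict, Tuple


class ToyTrigger:
    """Hypothetical stand-in for a BaseTrigger subclass (no Airflow import)."""

    def __init__(self, poll_url: str, poll_interval: int = 30) -> None:
        self.poll_url = poll_url
        self.poll_interval = poll_interval

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        return (
            "path.to.ToyTrigger",  # placeholder import path
            {"poll_url": self.poll_url, "poll_interval": self.poll_interval},
        )


def round_trip(trigger: ToyTrigger) -> ToyTrigger:
    """Serialize, pass the kwargs through JSON, and rebuild the trigger."""
    _classpath, kwargs = trigger.serialize()
    stored = json.dumps(kwargs)  # raises TypeError for lambdas, sessions, etc.
    return type(trigger)(**json.loads(stored))


original = ToyTrigger("https://example.invalid/jobs/42", poll_interval=5)
restored = round_trip(original)
print(restored.poll_url, restored.poll_interval)
```

A check like this in a unit test catches two common mistakes early: forgetting to include a constructor argument in serialize(), and storing something that cannot be serialized, such as a callable or an open client session.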
HTTP Polling Trigger
from typing import Any, AsyncIterator, Dict, Optional, Tuple

import asyncio
import time

import aiohttp

from airflow.triggers.base import BaseTrigger, TriggerEvent


class HttpPollingTrigger(BaseTrigger):
    """
    General-purpose HTTP polling trigger.

    Polls a URL until a condition is met or timeout is reached.
    """

    def __init__(
        self,
        url: str,
        method: str = 'GET',
        headers: Optional[Dict[str, str]] = None,
        expected_status: int = 200,
        poll_interval: int = 10,
        timeout: int = 300,
    ):
        super().__init__()
        self.url = url
        self.method = method
        self.headers = headers or {}
        self.expected_status = expected_status
        self.poll_interval = poll_interval
        self.timeout = timeout

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # 'path.to' is a placeholder for the real import path of this class.
        return (
            'path.to.HttpPollingTrigger',
            {
                'url': self.url,
                'method': self.method,
                'headers': self.headers,
                'expected_status': self.expected_status,
                'poll_interval': self.poll_interval,
                'timeout': self.timeout,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Use a monotonic deadline so that slow requests count toward the
        # timeout, not only the sleeps between them.
        deadline = time.monotonic() + self.timeout
        # Reuse one session for every poll.
        async with aiohttp.ClientSession() as session:
            while time.monotonic() < deadline:
                try:
                    async with session.request(
                        self.method, self.url, headers=self.headers
                    ) as response:
                        if response.status == self.expected_status:
                            data = await response.json()
                            yield TriggerEvent({
                                'status': 'success',
                                'data': data,
                            })
                            return
                except aiohttp.ClientError as exc:
                    # Treat connection and decoding errors as transient and retry.
                    self.log.warning('Poll of %s failed: %s', self.url, exc)
                await asyncio.sleep(self.poll_interval)
        yield TriggerEvent({
            'status': 'timeout',
            'error': f'Timeout after {self.timeout}s',
        })
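The poll-until-success-or-timeout shape of a trigger like this can be exercised without a network by replacing the HTTP call with a stub. The following is a minimal sketch using only asyncio; poll_until, make_check, and collect are hypothetical helpers, and the stub check stands in for the HTTP request.

```python
import asyncio
import time
from typing import AsyncIterator, Awaitable, Callable, Dict, List


async def poll_until(
    check: Callable[[], Awaitable[bool]],
    poll_interval: float,
    timeout: float,
) -> AsyncIterator[Dict[str, str]]:
    """Yield one event: 'success' once check() is true, else 'timeout'."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if await check():
            yield {"status": "success"}
            return
        await asyncio.sleep(poll_interval)  # never time.sleep() in a trigger
    yield {"status": "timeout"}


def make_check(succeed_on_call: int) -> Callable[[], Awaitable[bool]]:
    """Build a stub check that starts returning True on the Nth call."""
    calls = 0

    async def check() -> bool:
        nonlocal calls
        calls += 1
        return calls >= succeed_on_call

    return check


async def collect(events: AsyncIterator[Dict[str, str]]) -> List[Dict[str, str]]:
    return [event async for event in events]


success_events = asyncio.run(collect(poll_until(make_check(3), 0.01, 1.0)))
timeout_events = asyncio.run(collect(poll_until(make_check(10**9), 0.01, 0.05)))
print(success_events, timeout_events)
```

Both paths emit exactly one event, which mirrors the contract the operator relies on: the resume method receives a single payload and branches on its status.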
Triggerer Configuration
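# airflow.cfg
[triggerer]
# Maximum number of triggers a single triggerer process runs concurrently.
# Airflow 3.x names this option capacity; Airflow 2.x names it default_capacity.
capacity = 1000
# How often, in seconds, the triggerer job sends its heartbeat
job_heartbeat_sec = 5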
Key Concepts Table
| Component | Purpose | Resource Impact | Scaling |
|---|---|---|---|
| Deferrable Operator | Releases worker slot on defer | Minimal worker usage | Independent of worker count |
| Trigger Object | Async event definition | Near-zero in memory | Serialized to DB |
| Triggerer Process | Executes async triggers | Single process, many coroutines | Scale by capacity config |
| Trigger Callback | Requeues completed task | Metadata DB write | Scales with DB connections |
| BaseTrigger | Base class for triggers | N/A | N/A |
Comparison: Blocking vs Deferrable
| Metric | Blocking Operator | Deferrable Operator |
|---|---|---|
| Worker slot held | Entire duration | Only during execute() |
| Memory per wait | ~50-200MB (worker process) | ~1KB (trigger object) |
| Concurrency | Limited by executor slots | Limited by triggerer capacity |
| Latency on completion | Poll interval | Seconds (triggerer heartbeat) |
| Implementation complexity | Simple | Moderate |
| Best for | Short waits (<5 min) | Long waits (>5 min) |
Code Examples
Custom Trigger with Multiple Conditions
from typing import Any, AsyncIterator, Dict, List, Tuple

import asyncio

import aiohttp

from airflow.triggers.base import BaseTrigger, TriggerEvent


class MultiConditionTrigger(BaseTrigger):
    """
    Trigger that waits for multiple conditions to be satisfied.

    Each condition is a dict with 'name' and 'url', plus an optional
    'field' and 'expected' value to compare against the JSON response.
    """

    def __init__(
        self,
        conditions: List[Dict[str, Any]],
        poll_interval: int = 30,
    ):
        super().__init__()
        self.conditions = conditions
        self.poll_interval = poll_interval

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # 'path.to' is a placeholder for the real import path of this class.
        return (
            'path.to.MultiConditionTrigger',
            {
                'conditions': self.conditions,
                'poll_interval': self.poll_interval,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        async with aiohttp.ClientSession() as session:
            while True:
                results = {}
                for condition in self.conditions:
                    async with session.get(condition['url']) as response:
                        data = await response.json()
                    # Conditions are serialized with the trigger, so the check
                    # is declarative (field == expected) rather than a callable.
                    # Without a 'field', any decoded response counts as met.
                    met = (
                        'field' not in condition
                        or data.get(condition['field']) == condition.get('expected')
                    )
                    results[condition['name']] = {'met': met, 'data': data}
                if all(result['met'] for result in results.values()):
                    yield TriggerEvent({'status': 'all_met', 'results': results})
                    return
                unmet = [name for name, result in results.items() if not result['met']]
                self.log.info('Conditions not met: %s', unmet)
                await asyncio.sleep(self.poll_interval)
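The trigger above evaluates its conditions one after another. When the checks are independent, they can also be awaited concurrently so that one slow endpoint does not delay the rest. A minimal sketch of that variation with asyncio.gather follows; check_all and fake_check are hypothetical helpers, and the fake checks stand in for HTTP requests.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def fake_check(result: bool, delay: float) -> bool:
    """Stub for an HTTP-backed condition check."""
    await asyncio.sleep(delay)
    return result


async def check_all(
    checks: Dict[str, Callable[[], Awaitable[bool]]],
) -> Dict[str, bool]:
    """Run every check concurrently and map each name to its outcome."""
    names = list(checks)
    outcomes = await asyncio.gather(*(checks[name]() for name in names))
    return dict(zip(names, outcomes))


results = asyncio.run(
    check_all(
        {
            "db_ready": lambda: fake_check(True, 0.01),
            "file_ready": lambda: fake_check(False, 0.01),
        }
    )
)
unmet = [name for name, ok in results.items() if not ok]
print(results, unmet)
```

The lambdas here live only inside the running process; they are not part of what a trigger would serialize, so this pattern does not conflict with keeping trigger arguments serializable.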
Deferrable Sensor Pattern
from datetime import timedelta
from typing import Any, AsyncIterator, Dict, Tuple

import asyncio
import os

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class DeferrableFileSensor(BaseSensorOperator):
    """
    Sensor that defers instead of blocking.

    When deferring, the sensor hands off to a trigger that asynchronously
    monitors for file availability without holding a worker slot.
    """

    def __init__(self, filepath: str, poll_interval: int = 30, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.poll_interval = poll_interval

    def execute(self, context: Dict[str, Any]) -> None:
        # Skip the round trip through the triggerer if the file is already there.
        if self._check_file():
            self.log.info('File exists: %s', self.filepath)
            return
        # defer() raises TaskDeferred, so nothing after this call runs.
        self.defer(
            trigger=FileExistTrigger(self.filepath, self.poll_interval),
            method_name='execute_complete',
            timeout=timedelta(seconds=self.timeout),  # the sensor's own timeout
        )

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> None:
        self.log.info('File appeared: %s', event.get('filepath'))

    def _check_file(self) -> bool:
        return os.path.exists(self.filepath)


class FileExistTrigger(BaseTrigger):
    def __init__(self, filepath: str, poll_interval: int):
        super().__init__()
        self.filepath = filepath
        self.poll_interval = poll_interval

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # 'path.to' is a placeholder for the real import path of this class.
        return (
            'path.to.FileExistTrigger',
            {'filepath': self.filepath, 'poll_interval': self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # os.path.exists() is a blocking call, so run it in a worker thread
        # (asyncio.to_thread needs Python 3.9+) to keep the event loop free.
        while not await asyncio.to_thread(os.path.exists, self.filepath):
            await asyncio.sleep(self.poll_interval)
        yield TriggerEvent({'filepath': self.filepath})
Performance Metrics
| Metric | Blocking | Deferrable | Improvement |
|---|---|---|---|
| Worker slot time (1hr wait) | 3600s | ~0.1s | 36000x |
| Memory per waiting task | ~100MB | ~1KB | 100000x |
| Max concurrent waits (16 slots) | 16 | 1000+ | 62x+ |
| Task startup overhead | Worker process | Trigger coroutine | Negligible |
Best Practices
- Use for long waits: Deferrable operators shine when tasks wait >5 minutes for external events.
- Implement serialize(): Always implement serialize() on triggers for persistence across triggerer restarts.
- Handle timeouts: Pass timeout to defer() calls (for example, the operator's execution_timeout) to prevent infinite waits.
- Use async libraries: Leverage aiohttp, httpx, or aiobotocore for non-blocking I/O in triggers.
- Monitor triggerer capacity: Track the triggers.running metric against the configured capacity, and watch triggers.blocked_main_thread for triggers that stall the event loop.
- Fail gracefully: Handle trigger errors by yielding TriggerEvent({'status': 'error'}).
- Avoid blocking in triggers: Never use time.sleep() or synchronous I/O in trigger run() methods.
- Test triggers independently: Drive run() with asyncio in unit tests and verify that serialize() round-trips.
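The cost of blocking inside a trigger is easy to see in isolation: every trigger on a triggerer shares one event loop, so a single blocking call stalls all of them. The sketch below compares the two kinds of sleep using only the standard library; polite_wait, rude_wait, and run_many are hypothetical names, and the counts and delays are arbitrary.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def polite_wait(delay: float) -> None:
    await asyncio.sleep(delay)  # yields control to other coroutines


async def rude_wait(delay: float) -> None:
    time.sleep(delay)  # blocks the whole event loop while it sleeps


async def run_many(
    wait: Callable[[float], Awaitable[None]], count: int, delay: float
) -> float:
    """Run `count` waits concurrently and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(*(wait(delay) for _ in range(count)))
    return time.perf_counter() - start


polite_elapsed = asyncio.run(run_many(polite_wait, 20, 0.02))
rude_elapsed = asyncio.run(run_many(rude_wait, 20, 0.02))
print(f"asyncio.sleep: {polite_elapsed:.2f}s, time.sleep: {rude_elapsed:.2f}s")
```

The non-blocking waits overlap, so their total time stays close to a single delay, while the blocking waits run back to back and take roughly count times the delay.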
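The triggerer capacity setting limits how many triggers a single triggerer process runs at once, and therefore how many deferred tasks can wait simultaneously. If you have 10,000 tasks deferring at once, raise capacity in [triggerer] (default is 1000) or run additional triggerer processes so that the total capacity covers them.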
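As a rough planning aid, the number of triggerer processes needed for a given load follows from a ceiling division. This is a back-of-the-envelope sketch: triggerers_needed is a hypothetical helper, and it assumes every process uses the same capacity and that load is spread evenly across them.

```python
import math


def triggerers_needed(concurrent_deferrals: int, capacity_per_triggerer: int = 1000) -> int:
    """Smallest number of triggerer processes whose combined capacity covers the load."""
    if capacity_per_triggerer <= 0:
        raise ValueError("capacity_per_triggerer must be positive")
    if concurrent_deferrals < 0:
        raise ValueError("concurrent_deferrals must not be negative")
    # Deferrable operators need at least one triggerer running.
    return max(1, math.ceil(concurrent_deferrals / capacity_per_triggerer))


for load in (0, 1_500, 10_000):
    print(load, "deferred tasks ->", triggerers_needed(load), "triggerer(s)")
```

In practice it is worth leaving headroom above this minimum, since a process that fails hands its triggers to the remaining ones.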
Deferrable operators require Airflow 2.2+ and the triggerer component to be running. The triggerer is a lightweight process — a single triggerer instance can handle thousands of concurrent triggers thanks to Python's asyncio.
Key Takeaways:
- Deferrable operators release worker slots by calling defer() and handing a trigger to the triggerer process
- Triggers run asynchronously using Python's asyncio, so no worker resources are consumed during waits
- Worker slot utilization improves from ~0.001 (blocking) to 1.0 (deferrable) for long waits
- The triggerer capacity setting limits maximum concurrent deferrable tasks
- Always implement serialize() on triggers and handle timeouts by passing timeout to defer()
- Use async HTTP clients (aiohttp, httpx) in triggers, never synchronous I/O
See Also
- Sensors and Operators — Blocking sensors and poke modes
- Scheduling and Triggers — Timetables and scheduling patterns
- Operators and Hooks — Operator lifecycle and hook architecture
- Executors Comparison — Executor selection and resource management