Deferrable and Async Operators in Airflow

Formal Definitions

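Deferrable Operator

A deferrable operator $O_{\text{defer}}$ is an operator whose execute() method calls self.defer(trigger=..., method_name=...) instead of performing blocking work. defer() raises a TaskDeferred exception that ends the current run: the worker slot is released, the trigger is stored in the metadata database, and the triggerer process monitors the async event. When the trigger fires, the task is rescheduled and resumes on a worker in the method named by method_name, which receives the event payload.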

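Trigger

A trigger $T = (\text{classpath}, \text{kwargs})$ is a serializable object that defines an async event source. Its serialize() method returns the importable class path and the constructor keyword arguments, from which the triggerer rebuilds the trigger; the method to resume on is stored with the task instance, not with the trigger. Triggers run in the triggerer process on Python's asyncio event loop. They must inherit from BaseTrigger and implement serialize() and run(), where run() is an async generator that yields TriggerEvent objects.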
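
To make the (classpath, kwargs) contract concrete, here is a minimal sketch of how such a pair can be stored as plain data and turned back into an object: import the class from its dotted path, then call it with the kwargs. The helpers import_string and rehydrate are illustrative names written here, and datetime.timedelta stands in for a trigger class so the example runs without Airflow; Airflow itself provides an import_string helper in airflow.utils.module_loading for the import step.

```python
import importlib
import json
from datetime import timedelta
from typing import Any, Dict, Tuple


def import_string(dotted_path: str) -> Any:
    """Resolve 'package.module.Name' to the object it names (illustrative helper)."""
    module_path, _, attr = dotted_path.rpartition('.')
    return getattr(importlib.import_module(module_path), attr)


def rehydrate(serialized: Tuple[str, Dict[str, Any]]) -> Any:
    """Rebuild an object from a (classpath, kwargs) pair, roughly as a triggerer would."""
    classpath, kwargs = serialized
    return import_string(classpath)(**kwargs)


# datetime.timedelta is a stand-in for a trigger class here.
serialized = ('datetime.timedelta', {'seconds': 30})

# The pair survives a JSON round trip because it is plain data.
stored = json.dumps(serialized)
restored = json.loads(stored)  # the tuple comes back as a list
obj = rehydrate((restored[0], restored[1]))
print(obj)  # 0:00:30
```

The same constraint explains why every value in kwargs must be plain, serializable data: whatever serialize() returns on the worker is all the triggerer has to rebuild the trigger.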

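Triggerer

The triggerer is a separate Airflow process (started with airflow triggerer) that executes triggers asynchronously. It runs each trigger as a coroutine on a single asyncio event loop, up to a configurable capacity $N_{\text{triggerer}}$ (the capacity setting, default 1000), and periodically polls the metadata database for new triggers to pick up. Capacity therefore bounds how many deferrable operations one triggerer can monitor at once; running several triggerer instances raises the total.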

Detailed Explanation

Why Deferrable Operators?

Traditional blocking operators hold a worker slot while waiting for external conditions. For long waits, such as waiting on an API callback, a long-running database query, or a cloud job to finish, this spends executor capacity on idle waiting. Deferrable operators release the worker slot as soon as execute() defers and hand the wait to the lightweight triggerer process.

Worker Slot Utilization
$$U_{\text{worker}} = \frac{T_{\text{active\_work}}}{T_{\text{total\_held}}}$$

Here,

  • $T_{\text{active\_work}}$ = time spent doing actual work
  • $T_{\text{total\_held}}$ = total time the worker slot was held

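For a blocking sensor that pokes every 30 seconds for 1 hour, the slot is held for 3600 s but does work only during the 120 pokes. Assuming roughly 30 ms per poke, that is about 3.6 s of work, so $U_{\text{worker}} \approx 3.6 / 3600 \approx 0.001$ (nearly all held time is idle waiting). A deferrable equivalent holds the slot only while execute() and the resume method run, and that time is all work, so $U_{\text{worker}} \approx 1.0$.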
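
The same arithmetic as a short Python sketch. The 30 ms cost per poke and the 50 ms for each of execute() and the resume method are assumptions chosen for illustration, not measurements.

```python
def worker_utilization(active_work_s: float, total_held_s: float) -> float:
    """U_worker = T_active_work / T_total_held."""
    return active_work_s / total_held_s


# Blocking sensor: holds the slot for the whole hour, poking every 30 s.
wait_s = 3600.0
poke_interval_s = 30.0
poke_cost_s = 0.03                  # assumed cost of one poke (30 ms)
pokes = wait_s / poke_interval_s    # 120 pokes
u_blocking = worker_utilization(pokes * poke_cost_s, wait_s)

# Deferrable operator: the slot is held only while execute() and the resume
# method run, and all of that time is real work (assumed 50 ms each).
held_s = 0.05 + 0.05
u_deferrable = worker_utilization(held_s, held_s)

print(f'blocking:   {u_blocking:.4f}')    # blocking:   0.0010
print(f'deferrable: {u_deferrable:.1f}')  # deferrable: 1.0
```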

Creating a Deferrable Operator

from datetime import timedelta
from typing import Any, AsyncIterator, Dict
import asyncio

import aiohttp
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class WaitForResultTrigger(BaseTrigger):
    """Async trigger that polls an external API until the job completes or fails."""

    def __init__(self, poll_url: str, poll_interval: int = 30, token: str = ''):
        super().__init__()
        self.poll_url = poll_url
        self.poll_interval = poll_interval
        self.token = token

    def serialize(self) -> tuple:
        # The classpath must be importable by the triggerer; replace the placeholder.
        return (
            'path.to.WaitForResultTrigger',
            {
                'poll_url': self.poll_url,
                'poll_interval': self.poll_interval,
                'token': self.token,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        headers = {'Authorization': f'Bearer {self.token}'} if self.token else {}
        # One session for all polls instead of a new one per iteration.
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                async with session.get(self.poll_url) as response:
                    # An HTTP error status raises here, which fails the deferred task.
                    response.raise_for_status()
                    data = await response.json()

                status = data.get('status')
                if status == 'completed':
                    yield TriggerEvent({'status': 'completed', 'result': data})
                    return
                if status == 'failed':
                    yield TriggerEvent({'status': 'failed', 'error': data.get('error')})
                    return

                # Non-blocking sleep: other triggers keep running meanwhile.
                await asyncio.sleep(self.poll_interval)


class AsyncJobMonitorOperator(BaseOperator):
    """
    Deferrable operator that monitors an external async job.

    :param job_url: URL to check job status
    :param poll_interval: Seconds between polls
    :param execution_timeout: Max time for the entire operation
    """

    template_fields = ('job_url',)

    def __init__(
        self,
        *,
        job_url: str,
        poll_interval: int = 30,
        execution_timeout: timedelta = timedelta(hours=1),
        **kwargs,
    ):
        super().__init__(execution_timeout=execution_timeout, **kwargs)
        self.job_url = job_url
        self.poll_interval = poll_interval

    def execute(self, context: Dict[str, Any]) -> None:
        self.log.info('Monitoring job at %s', self.job_url)
        # defer() raises TaskDeferred: the worker slot is freed here, and the
        # task later resumes in execute_complete() with the trigger's event.
        self.defer(
            timeout=self.execution_timeout,
            trigger=WaitForResultTrigger(
                poll_url=self.job_url,
                poll_interval=self.poll_interval,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> Any:
        # Receives the payload of the TriggerEvent that the trigger yielded.
        if event['status'] == 'completed':
            self.log.info('Job completed: %s', event['result'])
            return event['result']
        raise AirflowException(f'Job failed: {event.get("error", "Unknown error")}')
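
The defer-and-resume handshake that AsyncJobMonitorOperator relies on can be simulated in plain Python to see the control flow. In this sketch, TaskDeferredStub, CountdownTrigger, MonitorOperatorStub, first_event, and run_task are hypothetical stand-ins, not Airflow classes: execute() raises an exception carrying the trigger and method name (as self.defer() does with TaskDeferred), an asyncio loop runs the trigger until its first event, and the named method is then called with that event.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict


class TaskDeferredStub(Exception):
    """Hypothetical stand-in for Airflow's TaskDeferred exception."""

    def __init__(self, trigger: 'CountdownTrigger', method_name: str):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


class CountdownTrigger:
    """Hypothetical trigger: fires after `polls` non-blocking sleeps."""

    def __init__(self, polls: int, poll_interval: float):
        self.polls = polls
        self.poll_interval = poll_interval

    async def run(self) -> AsyncIterator[Dict[str, Any]]:
        for _ in range(self.polls):
            await asyncio.sleep(self.poll_interval)
        yield {'status': 'completed', 'result': 42}


class MonitorOperatorStub:
    """Hypothetical operator mirroring execute() / execute_complete()."""

    def execute(self, context: Dict[str, Any]) -> None:
        # Mirrors self.defer(): raising ends execute() and frees the slot.
        raise TaskDeferredStub(CountdownTrigger(3, 0.01), 'execute_complete')

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> Any:
        if event['status'] != 'completed':
            raise RuntimeError(event)
        return event['result']


async def first_event(trigger: CountdownTrigger) -> Dict[str, Any]:
    """Triggerer side: run the async generator until it yields once."""
    agen = trigger.run()
    try:
        return await agen.__anext__()
    finally:
        await agen.aclose()


def run_task(op: MonitorOperatorStub) -> Any:
    context: Dict[str, Any] = {}
    try:
        return op.execute(context)                            # worker, first pass
    except TaskDeferredStub as deferred:
        event = asyncio.run(first_event(deferred.trigger))    # triggerer
        resume: Callable[..., Any] = getattr(op, deferred.method_name)
        return resume(context, event)                         # worker, resumed pass


print(run_task(MonitorOperatorStub()))  # 42
```

One difference matters in practice: in Airflow the resumed call runs in a new task execution with a freshly constructed operator, so attributes set inside execute() are gone. Only the event payload, plus any kwargs passed to defer(), carries over.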

HTTP Polling Trigger

from typing import AsyncIterator, Dict, Optional
import asyncio

import aiohttp
from airflow.triggers.base import BaseTrigger, TriggerEvent


class HttpPollingTrigger(BaseTrigger):
    """
    General-purpose HTTP polling trigger.

    Polls a URL until it returns the expected status code or the timeout is reached.
    """

    def __init__(
        self,
        url: str,
        method: str = 'GET',
        headers: Optional[Dict[str, str]] = None,
        expected_status: int = 200,
        poll_interval: int = 10,
        timeout: int = 300,
    ):
        super().__init__()
        self.url = url
        self.method = method
        self.headers = headers or {}
        self.expected_status = expected_status
        self.poll_interval = poll_interval
        self.timeout = timeout

    def serialize(self) -> tuple:
        return (
            'path.to.HttpPollingTrigger',
            {
                'url': self.url,
                'method': self.method,
                'headers': self.headers,
                'expected_status': self.expected_status,
                'poll_interval': self.poll_interval,
                'timeout': self.timeout,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        loop = asyncio.get_running_loop()
        # Measure the timeout on the event-loop clock so time spent in requests
        # counts too; summing poll_interval alone would undercount.
        deadline = loop.time() + self.timeout

        async with aiohttp.ClientSession() as session:
            while loop.time() < deadline:
                try:
                    async with session.request(
                        self.method, self.url, headers=self.headers
                    ) as response:
                        if response.status == self.expected_status:
                            data = await response.json()
                            yield TriggerEvent({
                                'status': 'success',
                                'data': data,
                            })
                            return
                except aiohttp.ClientError as exc:
                    # Treat network errors as "not ready yet" and retry.
                    self.log.warning('Request to %s failed: %s', self.url, exc)

                await asyncio.sleep(self.poll_interval)

        yield TriggerEvent({
            'status': 'timeout',
            'error': f'Timeout after {self.timeout}s',
        })
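
The polling loop behind HttpPollingTrigger can be exercised without a network. A minimal sketch of the same poll-until-deadline logic, with the HTTP request replaced by an injectable async probe; poll_until, make_probe, and first are hypothetical names, and plain dicts stand in for TriggerEvent payloads. It measures the deadline with the event loop's clock, so time spent inside the probe counts against the timeout.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Optional

Probe = Callable[[], Awaitable[Optional[Dict[str, Any]]]]


async def poll_until(probe: Probe, poll_interval: float, timeout: float) -> AsyncIterator[Dict[str, Any]]:
    """Yield one success or timeout event, like an HTTP polling trigger's run()."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout  # time spent inside probe() counts too
    while loop.time() < deadline:
        data = await probe()  # None means "not ready yet"
        if data is not None:
            yield {'status': 'success', 'data': data}
            return
        await asyncio.sleep(poll_interval)
    yield {'status': 'timeout', 'error': f'Timeout after {timeout}s'}


def make_probe(ready_after: int) -> Probe:
    """Hypothetical fake endpoint: ready on the `ready_after`-th call."""
    calls = 0

    async def probe() -> Optional[Dict[str, Any]]:
        nonlocal calls
        calls += 1
        return {'calls': calls} if calls >= ready_after else None

    return probe


async def first(agen: AsyncIterator[Dict[str, Any]]) -> Dict[str, Any]:
    async for event in agen:
        return event
    raise RuntimeError('trigger finished without an event')


ok = asyncio.run(first(poll_until(make_probe(3), poll_interval=0.01, timeout=1.0)))
late = asyncio.run(first(poll_until(make_probe(10**6), poll_interval=0.01, timeout=0.05)))
print(ok, late['status'])  # {'status': 'success', 'data': {'calls': 3}} timeout
```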

Triggerer Configuration

# airflow.cfg
[triggerer]
# Number of triggers a single triggerer process runs concurrently (default 1000)
capacity = 1000

# How often, in seconds, the triggerer heartbeats its job record
job_heartbeat_sec = 5

Key Concepts Table

| Component | Purpose | Resource impact | Scaling |
|---|---|---|---|
| Deferrable Operator | Releases worker slot on defer | Minimal worker usage | Independent of worker count |
| Trigger Object | Async event definition | Near-zero memory | Serialized to DB |
| Triggerer Process | Executes async triggers | Single process, many coroutines | Scale via capacity or more triggerer instances |
| Trigger event and resume | Reschedules the task, which resumes in method_name on a worker | Metadata DB write | Scales with DB connections |
| BaseTrigger | Base class for triggers | N/A | N/A |

Comparison: Blocking vs Deferrable

| Metric | Blocking Operator | Deferrable Operator |
|---|---|---|
| Worker slot held | Entire duration | Only during execute() and the resume method |
| Memory per wait | ~50-200 MB (worker process) | ~1 KB (trigger object) |
| Concurrency | Limited by executor slots | Limited by triggerer capacity |
| Latency on completion | Poke interval | Trigger poll interval plus a short rescheduling delay |
| Implementation complexity | Simple | Moderate |
| Best for | Short waits (<5 min) | Long waits (>5 min) |

Code Examples

Custom Trigger with Multiple Conditions

from typing import Any, AsyncIterator, Dict, List
import asyncio

import aiohttp
from airflow.triggers.base import BaseTrigger, TriggerEvent


class MultiConditionTrigger(BaseTrigger):
    """
    Trigger that waits until every condition is satisfied.

    Each condition is a plain dict, for example
    {'name': 'job_a', 'url': 'https://example.com/a', 'field': 'status', 'expected': 'done'}.
    Conditions must be declarative: lambdas and other callables do not survive
    serialization to the metadata database, so they cannot be trigger kwargs.
    """

    def __init__(
        self,
        conditions: List[Dict[str, Any]],
        poll_interval: int = 30,
    ):
        super().__init__()
        self.conditions = conditions
        self.poll_interval = poll_interval

    def serialize(self) -> tuple:
        return (
            'path.to.MultiConditionTrigger',
            {
                'conditions': self.conditions,
                'poll_interval': self.poll_interval,
            },
        )

    async def _check(
        self, session: aiohttp.ClientSession, condition: Dict[str, Any]
    ) -> Dict[str, Any]:
        async with session.get(condition['url']) as response:
            data = await response.json()
        met = data.get(condition['field']) == condition['expected']
        return {'met': met, 'data': data}

    async def run(self) -> AsyncIterator[TriggerEvent]:
        async with aiohttp.ClientSession() as session:
            while True:
                # Check all conditions concurrently instead of one after another.
                checks = await asyncio.gather(
                    *(self._check(session, c) for c in self.conditions)
                )
                results = {c['name']: r for c, r in zip(self.conditions, checks)}

                if all(r['met'] for r in results.values()):
                    yield TriggerEvent({'status': 'all_met', 'results': results})
                    return

                pending = [name for name, r in results.items() if not r['met']]
                self.log.info('Conditions not met: %s', pending)
                await asyncio.sleep(self.poll_interval)
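
Trigger kwargs must be plain data, so conditions are best expressed declaratively rather than as callables such as lambdas, which do not survive being stored in the metadata database and rebuilt in the triggerer. A minimal sketch, assuming each condition is a dict with name, url, field, and expected keys (a hypothetical schema, not an Airflow convention): is_met compares one JSON field to an expected value, check_all fetches all URLs at once with asyncio.gather, and fake_fetch stands in for an HTTP GET. json is used here only as a stand-in for trigger serialization.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List

Condition = Dict[str, Any]
Fetch = Callable[[str], Awaitable[Dict[str, Any]]]


def is_met(condition: Condition, data: Dict[str, Any]) -> bool:
    """Declarative check: data[field] must equal the expected value."""
    return data.get(condition['field']) == condition['expected']


async def check_all(conditions: List[Condition], fetch: Fetch) -> Dict[str, bool]:
    # Fetch every URL concurrently, then evaluate each condition on its payload.
    payloads = await asyncio.gather(*(fetch(c['url']) for c in conditions))
    return {c['name']: is_met(c, p) for c, p in zip(conditions, payloads)}


async def fake_fetch(url: str) -> Dict[str, Any]:
    """Stand-in for an HTTP GET that returns JSON."""
    await asyncio.sleep(0)
    return {'status': 'done' if url.endswith('/a') else 'running'}


conditions = [
    {'name': 'a', 'url': 'https://example.com/a', 'field': 'status', 'expected': 'done'},
    {'name': 'b', 'url': 'https://example.com/b', 'field': 'status', 'expected': 'done'},
]

# Plain data round-trips through serialization; a lambda does not.
assert json.loads(json.dumps(conditions)) == conditions
try:
    json.dumps({'check': lambda d: True})
except TypeError:
    print('callables are not JSON serializable')

results = asyncio.run(check_all(conditions, fake_fetch))
print(results)  # {'a': True, 'b': False}
```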

Deferrable Sensor Pattern

from typing import Any, AsyncIterator, Dict
import asyncio
import os

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class DeferrableFileSensor(BaseSensorOperator):
    """
    Sensor that defers instead of blocking.

    When deferring, the sensor hands off a trigger that asynchronously
    monitors for file availability without holding a worker slot.
    """

    def __init__(self, *, filepath: str, poll_interval: int = 30, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.poll_interval = poll_interval

    def execute(self, context: Dict[str, Any]) -> None:
        # Cheap check first: if the file is already there, finish without deferring.
        if self._check_file():
            self.log.info('File exists: %s', self.filepath)
            return
        # defer() raises TaskDeferred, so nothing after this call runs now.
        self.defer(
            trigger=FileExistTrigger(self.filepath, self.poll_interval),
            method_name='execute_complete',
        )

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> None:
        self.log.info('File appeared: %s', event.get('filepath'))

    def _check_file(self) -> bool:
        return os.path.exists(self.filepath)


class FileExistTrigger(BaseTrigger):
    """Fires once the file exists; the path must be visible from the triggerer host."""

    def __init__(self, filepath: str, poll_interval: int):
        super().__init__()
        self.filepath = filepath
        self.poll_interval = poll_interval

    def serialize(self) -> tuple:
        return (
            'path.to.FileExistTrigger',
            {'filepath': self.filepath, 'poll_interval': self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # os.path.exists is synchronous; asyncio.to_thread (Python 3.9+) keeps a
        # slow filesystem from stalling the triggerer's event loop.
        while not await asyncio.to_thread(os.path.exists, self.filepath):
            await asyncio.sleep(self.poll_interval)
        yield TriggerEvent({'filepath': self.filepath})
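
A trigger's run() loop can be exercised directly with asyncio, without a scheduler or triggerer. A minimal sketch, with the file-wait loop copied into a plain async generator (wait_for_file, first_event, and scenario are hypothetical names) so it runs without Airflow installed; the existence check runs in a worker thread via asyncio.to_thread (Python 3.9+), and a background task creates the file after a short delay.

```python
import asyncio
import os
import tempfile
from typing import Any, AsyncIterator, Dict


async def wait_for_file(filepath: str, poll_interval: float) -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for a file-existence trigger's run()."""
    # The filesystem check runs in a thread so it never blocks the event loop.
    while not await asyncio.to_thread(os.path.exists, filepath):
        await asyncio.sleep(poll_interval)
    yield {'filepath': filepath}


async def first_event(agen: AsyncIterator[Dict[str, Any]]) -> Dict[str, Any]:
    return await agen.__anext__()


async def scenario(path: str) -> Dict[str, Any]:
    async def create_later() -> None:
        await asyncio.sleep(0.05)
        with open(path, 'w') as fh:  # test-side write; brief blocking is fine here
            fh.write('ready')

    writer = asyncio.create_task(create_later())
    # Fail the test instead of hanging forever if the trigger never fires.
    event = await asyncio.wait_for(first_event(wait_for_file(path, 0.01)), timeout=2.0)
    await writer
    return event


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, 'data.csv')
    event = asyncio.run(scenario(target))

print(event == {'filepath': target})  # True
```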

Performance Metrics

| Metric | Blocking | Deferrable | Improvement |
|---|---|---|---|
| Worker slot time (1 hr wait) | 3600 s | ~0.1 s | 36,000x |
| Memory per waiting task | ~100 MB | ~1 KB | ~100,000x |
| Max concurrent waits (16 slots) | 16 | 1000+ | 62x+ |
| Task startup overhead | Worker process | Trigger coroutine | Negligible |

Best Practices

  1. Use for long waits: Deferrable operators shine when tasks wait >5 minutes for external events.
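  2. Implement serialize(): It is required, not optional. The trigger is stored in the metadata database and rebuilt in the triggerer from its classpath and kwargs, so every constructor argument must round-trip as plain, serializable data.
  3. Handle timeouts: Pass a timeout to self.defer() (for example, the operator's execution_timeout) to prevent infinite waits.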
  4. Use async libraries: Leverage aiohttp, httpx, or aiobotocore for non-blocking I/O in triggers.
  5. Monitor triggerer capacity: Track metrics such as triggers.running and triggerer.capacity_left to see how close each triggerer is to its limit.
  6. Fail gracefully: Handle trigger errors by yielding TriggerEvent({'status': 'error'}).
  7. Avoid blocking in triggers: Never use time.sleep() or synchronous I/O in trigger run() methods.
  8. Test triggers independently: Drive run() directly with asyncio.run() against mocked I/O, and check that serialize() returns kwargs that rebuild an equivalent trigger.
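
Practice 6 can be sketched in plain asyncio: catch unexpected errors inside the trigger and report them as an event, then let the resume method decide to fail the task. guarded_run, handle, first, and the probe functions are hypothetical names, and the dicts stand in for TriggerEvent payloads.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict

Probe = Callable[[], Awaitable[Dict[str, Any]]]


async def guarded_run(probe: Probe) -> AsyncIterator[Dict[str, Any]]:
    """Yield a success event, or an error event instead of crashing the trigger."""
    try:
        data = await probe()
    except Exception as exc:  # CancelledError is a BaseException, so it still propagates
        yield {'status': 'error', 'message': f'{type(exc).__name__}: {exc}'}
        return
    yield {'status': 'success', 'data': data}


def handle(event: Dict[str, Any]) -> Any:
    """Resume-method side: turn an error event into an explicit task failure."""
    if event['status'] == 'error':
        raise RuntimeError(event['message'])
    return event['data']


async def first(agen: AsyncIterator[Dict[str, Any]]) -> Dict[str, Any]:
    async for event in agen:
        return event
    raise RuntimeError('trigger finished without an event')


async def broken_probe() -> Dict[str, Any]:
    raise ConnectionError('upstream unavailable')


async def working_probe() -> Dict[str, Any]:
    return {'rows': 3}


error_event = asyncio.run(first(guarded_run(broken_probe)))
print(error_event)  # {'status': 'error', 'message': 'ConnectionError: upstream unavailable'}
print(handle(asyncio.run(first(guarded_run(working_probe)))))  # {'rows': 3}
```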

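The capacity setting in [triggerer] limits how many triggers a single triggerer runs at once (default 1000). Triggers beyond the combined capacity are not lost, but their tasks stay deferred until a triggerer has room to pick them up. If you expect 10,000 tasks to be deferred at once, raise capacity or run additional triggerer instances accordingly.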
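
A back-of-the-envelope sizing helper, assuming (as in Airflow's high-availability setup) that triggers are spread across all running triggerer instances and each runs at most capacity of them; triggerers_needed is a hypothetical name.

```python
import math


def triggerers_needed(concurrent_deferrals: int, capacity: int = 1000) -> int:
    """Minimum triggerer instances if each runs at most `capacity` triggers."""
    return math.ceil(concurrent_deferrals / capacity)


print(triggerers_needed(10_000))                 # 10 at the default capacity of 1000
print(triggerers_needed(10_000, capacity=5000))  # 2 with capacity raised to 5000
print(triggerers_needed(10_001))                 # 11: one extra trigger needs another instance
```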

Deferrable operators require Airflow 2.2+ and a running triggerer. The triggerer is lightweight: because triggers are coroutines sharing one asyncio event loop, a single instance can handle thousands of concurrent triggers, provided none of them blocks the loop.
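
A small, Airflow-free experiment shows both halves of that claim: thousands of coroutines parked on asyncio.sleep finish in about the time of one wait, while blocking time.sleep() calls run one at a time and stall everything else on the loop. The coroutines stand in for triggers; well_behaved, blocking, and run_all are hypothetical names, and the counts and durations are arbitrary.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def well_behaved(wait_s: float) -> None:
    await asyncio.sleep(wait_s)  # yields control to the event loop


async def blocking(wait_s: float) -> None:
    time.sleep(wait_s)  # anti-pattern: nothing else on the loop can run


async def run_all(factory: Callable[[float], Awaitable[None]], n: int, wait_s: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(factory(wait_s) for _ in range(n)))
    return time.perf_counter() - start


concurrent_s = asyncio.run(run_all(well_behaved, 5000, 0.2))  # about 0.2 s, not 1000 s
serialized_s = asyncio.run(run_all(blocking, 5, 0.1))         # at least 0.5 s: one at a time
print(f'5000 async waits: {concurrent_s:.2f}s, 5 blocking waits: {serialized_s:.2f}s')
```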

Key Takeaways:

  • Deferrable operators release worker slots by calling self.defer() with a Trigger, which hands the wait to the triggerer process
  • Triggers run asynchronously using Python's asyncio — no worker resources consumed during waits
  • Worker slot utilization improves from ~0.001 (blocking) to ~1.0 (deferrable) for long waits
  • The triggerer capacity setting limits maximum concurrent deferrable tasks
  • Always implement serialize() on triggers and pass a timeout to self.defer() (for example, the operator's execution_timeout)
  • Use async HTTP clients (aiohttp, httpx) in triggers — never synchronous I/O
