Scheduling and Triggers in Apache Airflow
Architecture Diagram
Scheduling architecture:
- Scheduler components
  - Scheduler heartbeat
    - Check for scheduled DAG runs
    - Create DagRun objects
    - Queue task instances
    - Update task states
  - Timetable engine
    - CronExpression
    - DeltaTimetable
    - Custom timetables
    - Event-driven timetables
- Trigger types
  - Time-based triggers
    - Cron schedules
    - Interval-based schedules
    - Date-based schedules
  - Event-based triggers
    - File sensors
    - HTTP sensors
    - Database sensors
  - Data-driven triggers
    - Dataset triggers
    - External triggers
    - API triggers
Cron expression patterns:
- Cron format: minute hour day_of_month month day_of_week
- Common patterns
  - @hourly → 0 * * * *
  - @daily → 0 0 * * *
  - @weekly → 0 0 * * 0
  - @monthly → 0 0 1 * *
  - @yearly → 0 0 1 1 *
  - @quarterly → 0 0 1 */3 *
- Advanced patterns
  - Every 15 minutes: */15 * * * *
  - Every 2 hours: 0 */2 * * *
  - Weekdays only: 0 0 * * 1-5
  - First Monday: 0 0 1-7 * 1 (caution: cron ORs day-of-month with day-of-week when both are restricted, so this also fires on every Monday and on every day from 1 to 7)
  - Last day of month: 0 0 28-31 * * (caution: fires on every existing day from 28 to 31, so the task needs a run-time check)
  - Specific time: 30 8 * * 1-5 (08:30 on weekdays)
- Field ranges (* matches every value in the range)
  - Minute: 0-59
  - Hour: 0-23
  - Day of month: 1-31
  - Month: 1-12
  - Day of week: 0-6 (0 = Sunday; 1-5 = Monday to Friday)
Trigger architecture:
- Trigger components
  - Triggerer service
    - Async execution of triggers
    - Event loop management
    - Trigger state tracking
  - Trigger types
    - BaseTrigger
    - FileTrigger
    - HttpTrigger
    - SqlTrigger
    - CustomTriggers
  - Trigger events
    - TRIGGER_EVENT
    - EVENT_PAYLOAD
    - EVENT_TIMESTAMP
- Trigger flow
  1. Operator defers (the operator pauses)
  2. Trigger created (stored in the database)
  3. Triggerer executes (runs the trigger in its event loop)
  4. Scheduler resumes (the task resumes)
Formal Definitions
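Definition: Schedule Interval
The schedule interval is the time between consecutive DAG runs. It can be defined as a cron expression, a timedelta object, or a predefined shorthand (@daily, @hourly). The scheduler creates a new DagRun when $t_{\text{now}} \geq t_{\text{last}} + \Delta t$, where $t_{\text{now}}$ is the current time, $t_{\text{last}}$ is the end of the previous run's interval, and $\Delta t$ is the schedule interval. In other words, a run is created only after the interval it covers has ended.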
Definition: Trigger
A trigger is an asynchronous event handler that allows operators to defer execution without consuming worker resources. A trigger implements an async run() method yielding TriggerEvent when a condition is met.
Definition: Timetable
A timetable is Airflow's scheduling abstraction that determines when DAG runs should be created. A timetable maps the last run to the next scheduled execution time, supporting cron, interval, and event-driven patterns.
Detailed Explanation
Cron Expressions
Cron expressions are the foundation of time-based scheduling in Airflow. They provide a flexible way to define recurring schedules using a simple text format. Understanding cron syntax is essential for creating accurate schedules.
Cron Format: A cron expression consists of five fields: minute, hour, day of month, month, and day of week. Each field can contain specific values, ranges, lists, or wildcards. Airflow supports extended cron syntax with special characters like / for intervals and , for lists.
Special Characters:
- `*` matches any value
- `,` separates multiple values (e.g., `1,3,5`)
- `-` specifies ranges (e.g., `1-5`)
- `/` specifies steps (e.g., `*/15` for every 15 minutes)
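To make the special characters concrete, the sketch below expands a single cron field into the values it matches. It is an illustration only, not how Airflow or croniter parse expressions; the function name `expand_cron_field` is hypothetical, and names such as `MON` or `JAN` are not handled.

```python
def expand_cron_field(field: str, lo: int, hi: int) -> list[int]:
    """Expand one cron field into the sorted list of values it matches.

    Handles `*`, lists (`,`), ranges (`-`) and steps (`/`) for a field
    whose legal values run from `lo` to `hi` inclusive.
    """
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # "5/15" means "from 5 up to the maximum, in steps of 15".
            end = hi if step_text else start
        if step < 1 or not (lo <= start <= end <= hi):
            raise ValueError(f"Invalid cron field {field!r} for range {lo}-{hi}")
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_cron_field("*/15", 0, 59))  # minute field: [0, 15, 30, 45]
print(expand_cron_field("1-5", 0, 6))    # day-of-week field: [1, 2, 3, 4, 5]
print(expand_cron_field("1,3,5", 0, 6))  # day-of-week field: [1, 3, 5]
```

Expanding each of the five fields this way and checking a timestamp against the resulting sets is enough to reason about most schedules by hand.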
Common Pitfalls: Cron expressions can be tricky. For example, 0 */2 * * * runs every 2 hours at minute 0, not every 2 minutes. Day of week ranges are 0-6 (Sunday=0). Month values are 1-12. Always test cron expressions before deploying to production.
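One further pitfall: when both day-of-month and day-of-week are restricted, standard cron combines them with OR, so an expression such as 0 0 1-7 * 1 fires on days 1 to 7 and on every Monday. Likewise, 0 0 28-31 * * fires on each of those days that exists. A common workaround is to schedule broadly and guard inside the task. The helpers below are a minimal sketch of such guards using only the standard library; the function names are hypothetical, and in a DAG they could back a short-circuit style check.

```python
import calendar
from datetime import date


def is_last_day_of_month(day: date) -> bool:
    """True only on the final calendar day of the month."""
    last_day = calendar.monthrange(day.year, day.month)[1]
    return day.day == last_day


def is_first_monday(day: date) -> bool:
    """True only on the first Monday of the month."""
    # weekday() == 0 is Monday; the first one always falls on days 1-7.
    return day.weekday() == 0 and day.day <= 7


print(is_last_day_of_month(date(2024, 2, 29)))  # leap year: True
print(is_last_day_of_month(date(2024, 2, 28)))  # False
print(is_first_monday(date(2024, 1, 1)))        # True
print(is_first_monday(date(2024, 1, 8)))        # a Monday, but the second: False
```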
Airflow Predefined Schedules: Airflow provides shorthand schedules like @hourly, @daily, @weekly, @monthly, and @yearly. These are convenient for common scheduling patterns and are easier to read than raw cron expressions.
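Catchup Run Count
$$N_{\text{catchup}} = \left\lfloor \frac{t_{\text{now}} - t_{\text{start}}}{\Delta t} \right\rfloor$$
Here,
- $t_{\text{now}}$ = Current time
- $t_{\text{start}}$ = DAG start_date
- $\Delta t$ = Schedule interval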
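As a worked example, the catchup run count is the number of whole schedule intervals that fit between the DAG start_date and the current time. The sketch below computes it for fixed-length intervals; the function name `catchup_run_count` is hypothetical, and cron schedules with irregular spacing would need to be enumerated instead.

```python
from datetime import datetime, timedelta, timezone


def catchup_run_count(now: datetime, start_date: datetime, interval: timedelta) -> int:
    """Number of complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    # Floor division of two timedeltas yields an int.
    return (now - start_date) // interval


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 1, 11, 12, 0, tzinfo=timezone.utc)

print(catchup_run_count(now, start, timedelta(days=1)))   # 10
print(catchup_run_count(now, start, timedelta(hours=1)))  # 252
```

The half-finished eleventh day does not count, because a run is created only once its interval has ended.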
Theorem: Schedule Drift Bound
For a scheduler heartbeat interval $h$, the maximum schedule drift is bounded by $\delta_{\max} \leq h + \ell_{\text{parse}}$, where $\ell_{\text{parse}}$ is the DAG parsing latency. Reducing $h$ improves scheduling accuracy at the cost of increased CPU utilization.
The catchup=False parameter prevents Airflow from creating backfill runs for missed intervals. This is essential for new DAGs to avoid unintended historical execution. Use catchup=True only when explicitly needed for data recovery.
Use timezone-aware datetime objects throughout your DAGs. Airflow stores timestamps in UTC internally but displays them in the configured timezone. Be explicit about timezone handling to avoid scheduling inconsistencies.
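A minimal sketch of the difference between naive and aware datetimes, using only the standard library. The helper name `to_utc` and the fixed UTC-05:00 offset are assumptions made for illustration; in DAG files a timezone library with real DST rules is the usual choice.

```python
from datetime import datetime, timedelta, timezone


def to_utc(moment: datetime) -> datetime:
    """Convert an aware datetime to UTC and reject naive values outright."""
    if moment.tzinfo is None or moment.utcoffset() is None:
        raise ValueError("naive datetime: attach a timezone before scheduling")
    return moment.astimezone(timezone.utc)


# Fixed UTC-05:00 offset, used only for illustration (it has no DST rules).
utc_minus_5 = timezone(timedelta(hours=-5))
local_start = datetime(2024, 1, 1, 9, 0, tzinfo=utc_minus_5)

print(to_utc(local_start).isoformat())  # 2024-01-01T14:00:00+00:00

try:
    to_utc(datetime(2024, 1, 1, 9, 0))  # naive: ambiguous, so refused
except ValueError as error:
    print(f"Rejected: {error}")
```

Failing loudly on naive values is a design choice: it surfaces the ambiguity at definition time instead of as a run that fires hours early or late.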
Timetables
Timetables are Airflow's modern scheduling abstraction, introduced in Airflow 2.2. They provide more flexibility than cron expressions and support complex scheduling patterns.
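CronExpression Timetable: The default timetable for cron-based schedules (implemented in Airflow by CronDataIntervalTimetable). It is backward compatible with existing DAGs and provides familiar scheduling syntax.
DeltaTimetable: Schedules DAGs at fixed intervals using timedelta objects (implemented in Airflow by DeltaDataIntervalTimetable). It is simpler than cron for basic interval scheduling, and because it adds a fixed duration instead of matching wall-clock fields, its behavior across daylight saving time transitions is easier to predict.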
Custom Timetables: You can create custom timetables for specialized scheduling requirements. Custom timetables inherit from Timetable and implement methods for determining when runs should be created.
Event-driven Timetables: These timetables create runs based on external events rather than time. They're useful for event-driven architectures where workflows are triggered by data availability or external signals.
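The core idea shared by these timetables, mapping the last run to the next one, can be sketched without Airflow. The `Interval` class and `next_interval` function below are hypothetical stand-ins for illustration, not Airflow's own types; they model a fixed-interval timetable in which each run covers the period that starts where the previous one ended.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class Interval:
    """Period of data covered by one run; the run starts after `end`."""
    start: datetime
    end: datetime


def next_interval(last: Optional[Interval], start_date: datetime, delta: timedelta) -> Interval:
    """Return the interval that follows `last`, or the first one if there is none."""
    start = start_date if last is None else last.end
    return Interval(start=start, end=start + delta)


start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
first = next_interval(None, start_date, timedelta(hours=6))
second = next_interval(first, start_date, timedelta(hours=6))

print(first.start.isoformat(), "->", first.end.isoformat())
print(second.start.isoformat(), "->", second.end.isoformat())
```

Because each interval begins exactly where the previous one ended, no period is skipped or processed twice.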
Trigger Mechanism
Triggers enable deferrable execution, allowing operators to pause and resume based on external conditions. This is more efficient than polling because it doesn't consume worker resources while waiting.
Trigger Architecture: The triggerer service runs triggers asynchronously. When an operator defers, it creates a trigger and pauses. The triggerer monitors the trigger and resumes the operator when the condition is met.
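The defer-and-resume flow can be simulated with plain asyncio. This is a sketch of the idea only, not Airflow's triggerer: `sleep_trigger`, `wait_for_event`, and `triggerer` are hypothetical names, and the short sleeps stand in for real external conditions. The point is that one event loop waits on many triggers at once while no worker is occupied.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List

Event = Dict[str, Any]


async def sleep_trigger(task_id: str, delay: float) -> AsyncGenerator[Event, None]:
    """Stand-in trigger: waits without blocking, then yields one event."""
    await asyncio.sleep(delay)
    yield {"task_id": task_id, "status": "success"}


async def wait_for_event(trigger: AsyncGenerator[Event, None]) -> Event:
    """Take the first event from a trigger, then close it."""
    try:
        return await trigger.__anext__()
    finally:
        await trigger.aclose()


async def triggerer() -> List[Event]:
    """Run all triggers concurrently in a single event loop."""
    triggers = [
        sleep_trigger("a", 0.03),
        sleep_trigger("b", 0.01),
        sleep_trigger("c", 0.02),
    ]
    # gather() returns results in submission order, not completion order.
    return await asyncio.gather(*(wait_for_event(t) for t in triggers))


events = asyncio.run(triggerer())
print([event["task_id"] for event in events])  # ['a', 'b', 'c']
```

The three waits overlap, so the total wait is close to the longest delay and not the sum of all three.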
Trigger Types: Airflow provides built-in triggers for common patterns like file availability, HTTP endpoints, and database queries. You can also create custom triggers for specialized use cases.
Trigger Events: Triggers emit events when conditions are met. These events can include payload data that the resumed operator can use for further processing.
Scheduling Best Practices
Idempotency: Ensure that DAG runs are idempotent. If a run is triggered multiple times, it should produce the same result. This is crucial for handling backfill and retry scenarios.
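One common way to get idempotent runs is to key every write by the run's logical date and overwrite instead of append. The sketch below uses an in-memory dictionary as a hypothetical stand-in for a partitioned table; `warehouse` and `load_partition` are illustrative names only.

```python
from datetime import date
from typing import Dict, List

# Hypothetical in-memory "warehouse": one partition per logical date.
warehouse: Dict[date, List[int]] = {}


def load_partition(logical_date: date, rows: List[int]) -> None:
    """Idempotent load: replace the whole partition, never append to it."""
    warehouse[logical_date] = list(rows)


load_partition(date(2024, 1, 1), [1, 2, 3])
load_partition(date(2024, 1, 1), [1, 2, 3])  # retry or backfill of the same run

print(warehouse[date(2024, 1, 1)])  # [1, 2, 3]
```

An append-based load would have left six rows after the retry; the overwrite leaves the same three however many times the run repeats.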
Timezone Awareness: Always use timezone-aware datetime objects. Airflow stores timestamps in UTC internally but displays them in the configured timezone. Be explicit about timezone handling in your DAGs.
Catchup Configuration: The catchup parameter determines whether Airflow should create runs for missed intervals. Set it to False for new DAGs to avoid unintended backfill.
Max Active Runs: Use max_active_runs to control concurrent execution. This prevents resource contention and ensures orderly processing.
Dependencies: Use depends_on_past carefully. It can create bottlenecks if not managed properly. Consider using dataset dependencies for more flexible orchestration.
Key Concepts Table
| Component | Purpose | Example | Use Case |
|---|---|---|---|
| Cron Expression | Time-based scheduling | 0 0 * * * | Daily execution |
| DeltaTimetable | Interval scheduling | timedelta(hours=1) | Hourly processing |
| Trigger | Async waiting | FileTrigger | File availability |
| Sensor | Polling waiting | FileSensor | Simple conditions |
| Dataset | Data-driven scheduling | Dataset('s3://data') | Event-driven |
| Backfill | Historical runs | catchup=True | Data recovery |
| Max Active Runs | Concurrency control | max_active_runs=1 | Sequential execution |
| Timezone | Time handling | timezone('UTC') | Global deployments |
Code Examples
Advanced Cron Patterns
# advanced_cron_patterns.py
from datetime import datetime, timedelta, timezone

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from croniter import croniter


def business_day_schedule() -> str:
    """Cron expression for 9 AM on weekdays (Monday to Friday)."""
    return '0 9 * * 1-5'


def month_end_schedule() -> str:
    """Cron expression for 11 PM on days 28-31 of every month.

    Standard cron has no "last day of month" value, so this fires on each
    of those days that exists. The task must check at run time whether the
    day really is the last one.
    """
    return '0 23 28-31 * *'


def quarterly_schedule() -> str:
    """Cron expression for midnight on the first day of each quarter."""
    return '0 0 1 1,4,7,10 *'


def custom_cron_parser(cron_expression: str):
    """Validate a cron expression and return its next five run times."""
    if not croniter.is_valid(cron_expression):
        raise ValueError(f"Invalid cron expression: {cron_expression}")
    # Iterate forward from the current time (timezone-aware, UTC).
    cron = croniter(cron_expression, datetime.now(timezone.utc))
    return [cron.get_next(datetime) for _ in range(5)]


def advanced_cron_example():
    """Print the next run times for several cron patterns."""
    patterns = {
        'every_15_minutes': '*/15 * * * *',
        'every_2_hours_weekdays': '0 */2 * * 1-5',
        # "1#1" (first Monday) and "L5" (last Friday) are croniter extensions
        # available in recent croniter versions. The classic forms
        # '0 0 1-7 * 1' and '0 0 25-31 * 5' fire too often, because cron ORs
        # day-of-month with day-of-week when both are restricted.
        'first_monday_of_month': '0 0 * * 1#1',
        'last_friday_of_month': '0 0 * * L5',
        'every_6_hours_utc': '0 */6 * * *',
    }
    for name, pattern in patterns.items():
        print(f"\n{name}: {pattern}")
        for run_time in custom_cron_parser(pattern):
            print(f"  Next run: {run_time}")


def check_business_hours(**context) -> bool:
    """Return True if the current UTC time is between 9 AM and 5 PM."""
    current_time = datetime.now(timezone.utc)
    return 9 <= current_time.hour < 17


with DAG(
    'advanced_cron_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced cron scheduling patterns',
    # `schedule` needs Airflow 2.4+; older releases use `schedule_interval`.
    schedule=business_day_schedule(),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['cron', 'advanced'],
) as dag:
    # Task to demonstrate cron parsing
    parse_cron = PythonOperator(
        task_id='parse_cron',
        python_callable=advanced_cron_example,
    )

    # Business-hours check (a plain PythonOperator, not a sensor); the
    # boolean result is pushed to XCom and does not skip downstream tasks.
    time_check = PythonOperator(
        task_id='business_hours_check',
        python_callable=check_business_hours,
    )

    parse_cron >> time_check
Custom Timetable Implementation
# custom_timetable.py
from datetime import date, datetime, timedelta
from typing import Any, Dict, Optional, Set

import pendulum
from airflow import DAG
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class BusinessDaysTimetable(Timetable):
    """
    Custom timetable that only schedules on business days.
    Excludes weekends and a small example set of holidays.
    The run time (hour, minute) is interpreted in UTC.
    """

    def __init__(self, hour: int = 9, minute: int = 0, exclude_holidays: bool = True):
        self.hour = hour
        self.minute = minute
        self.exclude_holidays = exclude_holidays

    def serialize(self) -> Dict[str, Any]:
        """Store the constructor arguments so the timetable can be rebuilt."""
        return {
            'hour': self.hour,
            'minute': self.minute,
            'exclude_holidays': self.exclude_holidays,
        }

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'BusinessDaysTimetable':
        return cls(**data)

    def _is_business_day(self, day: datetime) -> bool:
        """Check if the given moment falls on a business day."""
        if day.weekday() >= 5:  # Saturday (5) or Sunday (6)
            return False
        if self.exclude_holidays and day.date() in self._get_holidays(day.year):
            return False
        return True

    def _get_holidays(self, year: int) -> Set[date]:
        """Example fixed-date US holidays; add more as needed."""
        return {
            date(year, 1, 1),    # New Year's Day
            date(year, 7, 4),    # Independence Day
            date(year, 12, 25),  # Christmas
        }

    def _next_run_time(self, after: datetime) -> datetime:
        """Return the first business-day run time at or after `after`."""
        run_time = after.replace(
            hour=self.hour, minute=self.minute, second=0, microsecond=0
        )
        if run_time < after:
            run_time += timedelta(days=1)
        while not self._is_business_day(run_time):
            run_time += timedelta(days=1)
        return run_time

    def get_next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Get next DAG run information."""
        if last_automated_data_interval is not None:
            # Continue from the day after the previous run.
            after = last_automated_data_interval.end + timedelta(days=1)
        elif restriction.earliest is None:
            return None  # No start_date, so there is nothing to schedule.
        else:
            after = restriction.earliest  # First run: begin at start_date.
        if not restriction.catchup:
            # Skip missed runs instead of backfilling them.
            after = max(after, pendulum.now('UTC'))
        next_run = self._next_run_time(after)
        # Stop once the DAG's end_date has been passed.
        if restriction.latest is not None and next_run > restriction.latest:
            return None
        # Zero-length data interval: the run happens exactly at `next_run`.
        return DagRunInfo.exact(next_run)

    def infer_manual_data_interval(self, *, run_after: datetime) -> DataInterval:
        """Use the previous business day as the interval for manual runs."""
        previous_date = run_after - timedelta(days=1)
        while not self._is_business_day(previous_date):
            previous_date -= timedelta(days=1)
        return DataInterval.exact(previous_date)


class BusinessDaysTimetablePlugin(AirflowPlugin):
    """Registers the timetable. In a real deployment the timetable and this
    plugin live in the plugins folder and the DAG file imports the class."""

    name = 'business_days_timetable_plugin'
    timetables = [BusinessDaysTimetable]


# Usage in DAG
with DAG(
    'business_days_dag',
    # `schedule` accepts a timetable in Airflow 2.4+; 2.2-2.3 use `timetable=`.
    schedule=BusinessDaysTimetable(hour=9, minute=0),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['timetable', 'business_days'],
) as dag:
    # Tasks here will only run on business days
    pass
Trigger Implementation
# trigger_implementation.py
import asyncio
import time
from datetime import datetime, timedelta, timezone
from typing import Any, AsyncIterator, Dict, Optional, Tuple

import aiohttp
from aiohttp import web
from airflow.hooks.base import BaseHook
from airflow.triggers.base import BaseTrigger, TriggerEvent
from asgiref.sync import sync_to_async


def _utc_now_iso() -> str:
    """Timezone-aware UTC timestamp for event payloads."""
    return datetime.now(timezone.utc).isoformat()


class WebhookTrigger(BaseTrigger):
    """
    Custom trigger that waits for a webhook call.

    This trigger listens for incoming HTTP requests and resumes
    the operator when a valid webhook is received.
    """

    def __init__(
        self,
        webhook_path: str,
        method: str = 'POST',
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 300,
    ):
        super().__init__()
        self.webhook_path = webhook_path
        self.method = method
        self.headers = headers or {}
        self.timeout = timeout
        self._webhook_data: Optional[Dict[str, Any]] = None

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage.

        The first element must be the importable path of this class.
        """
        return (
            "trigger_implementation.WebhookTrigger",
            {
                "webhook_path": self.webhook_path,
                "method": self.method,
                "headers": self.headers,
                "timeout": self.timeout,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the webhook listener until a request arrives or time runs out."""
        # Create the event inside the triggerer's running event loop.
        self._received = asyncio.Event()
        app = web.Application()
        app.router.add_route(self.method, self.webhook_path, self.handle_webhook)
        runner = web.AppRunner(app)
        await runner.setup()
        # The address is hard-coded for the example. Only one such trigger can
        # bind it at a time, and 8080 is also the Airflow webserver default.
        site = web.TCPSite(runner, 'localhost', 8080)
        await site.start()
        try:
            try:
                await asyncio.wait_for(self._received.wait(), timeout=self.timeout)
            except asyncio.TimeoutError:
                yield TriggerEvent({
                    'status': 'timeout',
                    'error': 'No webhook received before the timeout',
                    'timestamp': _utc_now_iso(),
                })
                return
            yield TriggerEvent(self._webhook_data)
        finally:
            await runner.cleanup()

    async def handle_webhook(self, request: web.Request) -> web.Response:
        """Handle incoming webhook request."""
        # Treat the configured headers as required request headers.
        for name, value in self.headers.items():
            if request.headers.get(name) != value:
                return web.json_response({'status': 'rejected'}, status=403)
        try:
            data = await request.json()
        except ValueError as e:
            # Invalid JSON body: resume the operator with an error event.
            self._webhook_data = {
                'status': 'error',
                'error': str(e),
                'timestamp': _utc_now_iso(),
            }
            self._received.set()
            return web.json_response({'status': 'error'}, status=400)
        self._webhook_data = {
            'status': 'success',
            'data': data,
            'timestamp': _utc_now_iso(),
        }
        self._received.set()
        return web.json_response({'status': 'received'})


class PollingTrigger(BaseTrigger):
    """
    Custom trigger that polls an endpoint until condition is met.
    """

    def __init__(
        self,
        http_conn_id: str,
        endpoint: str,
        method: str = 'GET',
        headers: Optional[Dict[str, str]] = None,
        poll_interval: int = 30,
        timeout: int = 3600,
        success_criteria: Optional[Dict[str, Any]] = None,
    ):
        super().__init__()
        self.http_conn_id = http_conn_id
        self.endpoint = endpoint
        self.method = method
        self.headers = headers or {}
        self.poll_interval = poll_interval
        self.timeout = timeout
        self.success_criteria = success_criteria or {}

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serialize the trigger for storage."""
        return (
            "trigger_implementation.PollingTrigger",
            {
                "http_conn_id": self.http_conn_id,
                "endpoint": self.endpoint,
                "method": self.method,
                "headers": self.headers,
                "poll_interval": self.poll_interval,
                "timeout": self.timeout,
                "success_criteria": self.success_criteria,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Run the polling loop."""
        # The connection lookup is synchronous, so keep it off the event loop.
        conn = await sync_to_async(BaseHook.get_connection)(self.http_conn_id)
        scheme = conn.schema or 'http'
        netloc = f"{conn.host}:{conn.port}" if conn.port else conn.host
        url = f"{scheme}://{netloc}{self.endpoint}"
        headers = dict(self.headers)
        if conn.password:
            # Assumes the connection password holds a bearer token.
            headers['Authorization'] = f"Bearer {conn.password}"
        deadline = time.monotonic() + self.timeout

        async with aiohttp.ClientSession() as session:
            while True:
                data = None
                try:
                    async with session.request(
                        self.method, url, headers=headers
                    ) as response:
                        data = await response.json()
                except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
                    # Log the error but continue polling
                    self.log.warning("Polling error: %s", e)

                # Check success criteria
                if data is not None and self._check_success_criteria(data):
                    yield TriggerEvent({
                        'status': 'success',
                        'data': data,
                        'timestamp': _utc_now_iso(),
                    })
                    return

                # Check timeout
                if time.monotonic() > deadline:
                    yield TriggerEvent({
                        'status': 'timeout',
                        'error': 'Polling timeout exceeded',
                        'timestamp': _utc_now_iso(),
                    })
                    return

                # Wait before next poll
                await asyncio.sleep(self.poll_interval)

    def _check_success_criteria(self, data: Dict[str, Any]) -> bool:
        """Check if response meets success criteria."""
        for key, value in self.success_criteria.items():
            if key not in data or data[key] != value:
                return False
        return True
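# Usage in DAG
import pendulum
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.operators.python import PythonOperator


class WaitForTriggerOperator(BaseOperator):
    """Minimal deferrable operator that waits for a trigger to fire."""

    def __init__(self, *, wait_trigger: BaseTrigger, **kwargs):
        super().__init__(**kwargs)
        self.wait_trigger = wait_trigger

    def execute(self, context):
        # Free the worker slot; the triggerer runs the trigger from here on.
        self.defer(trigger=self.wait_trigger, method_name='execute_complete')

    def execute_complete(self, context, event=None):
        # Called on a worker once the trigger has fired.
        if not event or event.get('status') != 'success':
            raise AirflowException(f"Trigger did not succeed: {event}")
        return event  # Stored in XCom under return_value


def process_trigger_result(**context):
    """Process the result from trigger."""
    trigger_event = context['ti'].xcom_pull(
        task_ids='wait_for_webhook',
        key='return_value',
    )
    print(f"Trigger event received: {trigger_event}")


with DAG(
    'trigger_example_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Custom trigger example',
    # `schedule` needs Airflow 2.4+; older releases use `schedule_interval`.
    schedule=timedelta(hours=1),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['trigger', 'custom'],
) as dag:
    # Webhook trigger example ('/hooks/example' is a placeholder path)
    wait_for_webhook = WaitForTriggerOperator(
        task_id='wait_for_webhook',
        wait_trigger=WebhookTrigger(webhook_path='/hooks/example'),
    )
    # Polling trigger example (connection id, endpoint and criteria are placeholders)
    wait_for_condition = WaitForTriggerOperator(
        task_id='wait_for_condition',
        wait_trigger=PollingTrigger(
            http_conn_id='my_http_conn',
            endpoint='/status',
            success_criteria={'state': 'ready'},
        ),
    )
    process_result = PythonOperator(
        task_id='process_result',
        python_callable=process_trigger_result,
    )
    wait_for_webhook >> wait_for_condition >> process_result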
Dataset-Driven Scheduling
# dataset_scheduling.py
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

# Define datasets. A dataset URI is a static identifier and is not
# Jinja-templated, so a placeholder such as {ds} would be taken literally.
raw_data_dataset = Dataset("s3://my-bucket/raw-data/")
processed_data_dataset = Dataset("s3://my-bucket/processed-data/")
report_dataset = Dataset("s3://my-bucket/reports/")

# Settings shared by all three DAGs
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
START_DATE = pendulum.datetime(2024, 1, 1, tz='UTC')


def extract_data(**context):
    """Extract data from source."""
    # Simulate data extraction
    print("Extracting data...")
    return {"status": "success", "records": 1000}


def transform_data(**context):
    """Transform extracted data."""
    # Simulate data transformation
    print("Transforming data...")
    return {"status": "success", "transformed_records": 950}


def generate_report(**context):
    """Generate report from transformed data."""
    # Simulate report generation
    print("Generating report...")
    return {"status": "success", "report_url": "s3://reports/report.pdf"}


# DAG 1: Data Extraction (time-based; writes to raw_data_dataset)
with DAG(
    'data_extraction_dag',
    default_args=DEFAULT_ARGS,
    description='Data extraction DAG',
    # The producer runs on a clock. Scheduling it on the dataset it
    # produces would make it wait for its own output.
    schedule='@daily',
    start_date=START_DATE,
    catchup=False,
    tags=['dataset', 'extraction'],
) as extraction_dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        outlets=[raw_data_dataset],  # This task produces this dataset
    )

# DAG 2: Data Processing (reads from raw_data_dataset, writes to processed_data_dataset)
with DAG(
    'data_processing_dag',
    default_args=DEFAULT_ARGS,
    description='Data processing DAG',
    # This DAG runs when raw_data_dataset is updated. Pass either a time
    # schedule or datasets; `schedule_interval` cannot be combined with `schedule`.
    schedule=[raw_data_dataset],
    start_date=START_DATE,
    catchup=False,
    tags=['dataset', 'processing'],
) as processing_dag:
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        inlets=[raw_data_dataset],  # This task reads this dataset
        outlets=[processed_data_dataset],  # This task writes this dataset
    )

# DAG 3: Report Generation (reads from processed_data_dataset)
with DAG(
    'report_generation_dag',
    default_args=DEFAULT_ARGS,
    description='Report generation DAG',
    # This DAG runs when processed_data_dataset is updated
    schedule=[processed_data_dataset],
    start_date=START_DATE,
    catchup=False,
    tags=['dataset', 'reporting'],
) as report_dag:
    report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        inlets=[processed_data_dataset],  # This task reads this dataset
        outlets=[report_dataset],  # This task writes this dataset
    )
Performance Metrics
| Metric | Description | Optimization Strategy |
|---|---|---|
| Cron Parse Time | Time to parse cron expression | Use simple expressions |
| Schedule Accuracy | How closely actual run times match scheduled times | Optimize scheduler heartbeat |
| Trigger Latency | Time for trigger to fire | Use async triggers, optimize polling |
| DAG Run Creation | Time to create DagRun objects | Optimize database queries |
| Backfill Speed | Time to complete backfill | Parallelize, optimize task execution |
| Timezone Handling | Accuracy of timezone conversions | Use timezone-aware datetime objects |
| Schedule Drift | Deviation from intended schedule | Monitor and adjust scheduling parameters |
| Resource Usage | Scheduler resource consumption | Optimize heartbeat interval, limit active runs |
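Schedule accuracy and drift from the table above can be tracked by comparing each run's scheduled time with its actual start. The sketch below shows the arithmetic only; the function name and the sample timestamps are made up for illustration and do not come from a real deployment.

```python
from datetime import datetime, timezone
from statistics import mean
from typing import List, Tuple


def schedule_drift_seconds(runs: List[Tuple[datetime, datetime]]) -> List[float]:
    """Return actual_start - scheduled_time, in seconds, for each run."""
    return [(actual - scheduled).total_seconds() for scheduled, actual in runs]


# Hypothetical (scheduled, actual start) pairs for three hourly runs.
sample_runs = [
    (datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
     datetime(2024, 1, 1, 0, 0, 4, tzinfo=timezone.utc)),
    (datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc),
     datetime(2024, 1, 1, 1, 0, 7, tzinfo=timezone.utc)),
    (datetime(2024, 1, 1, 2, 0, 0, tzinfo=timezone.utc),
     datetime(2024, 1, 1, 2, 0, 1, tzinfo=timezone.utc)),
]

drifts = schedule_drift_seconds(sample_runs)
print(f"max drift: {max(drifts)}s, mean drift: {mean(drifts)}s")
```

Alerting on the maximum catches individual late runs, while the mean shows whether the scheduler as a whole is falling behind.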
Best Practices
- Cron Expression Validation: Always validate cron expressions before deploying. Use tools like `croniter` to test and verify schedules.
- Timezone Handling: Use timezone-aware datetime objects throughout your DAGs. Be explicit about timezone conversions and avoid naive datetime objects.
- Catchup Configuration: Set `catchup=False` for new DAGs to avoid unintended backfill. Use `catchup=True` only when you explicitly need historical runs.
- Max Active Runs: Configure `max_active_runs` to prevent resource contention. Use `max_active_runs=1` for sequential execution.
- Schedule Interval: Choose appropriate schedule intervals based on data requirements and resource constraints. Avoid over-scheduling.
- Trigger Usage: Use triggers for external dependencies instead of polling. This improves efficiency and reduces resource usage.
- Dataset Dependencies: Use dataset-driven scheduling for event-driven workflows. This provides more flexibility than time-based scheduling.
- Monitoring: Monitor schedule accuracy and trigger performance. Set up alerts for missed or delayed runs.
- Testing: Test scheduling logic thoroughly. Use Airflow's testing utilities to validate schedule behavior.
- Documentation: Document scheduling decisions and rationale. Include schedule descriptions in DAG documentation.
Key Takeaways:
- Schedule interval determines DAG run frequency
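- Catchup run count is $N_{\text{catchup}} = \lfloor (t_{\text{now}} - t_{\text{start}}) / \Delta t \rfloor$, the number of whole schedule intervals between start_date and the current time
- Schedule drift is bounded by $\delta_{\max} \leq h + \ell_{\text{parse}}$, the scheduler heartbeat interval plus the DAG parsing latency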
- Triggers enable async deferral without worker resource consumption
- Timetables provide flexible scheduling beyond cron expressions
- Always use timezone-aware datetime objects to avoid scheduling bugs
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)