

Airflow Scheduling Deep Dive

Intervals, Cron, and Timetables

Amazon · Meta · Difficulty: Advanced

Interview Question

ℹ️ Interview Context

Company: Amazon / Meta
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes

Question: "Explain the different scheduling mechanisms in Airflow. How do cron expressions, timedelta, and custom timetables work? When would you use each approach?"


Detailed Theory

Scheduling Fundamentals

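# scheduling_fundamentals.py
"""
Airflow Scheduling Mechanisms:

1. Cron Expressions:
   - Standard five-field cron syntax
   - Good for fixed wall-clock times
   - Cannot express rules such as holidays or "last business day"

2. timedelta:
   - Python timedelta objects
   - Good for fixed intervals, counted from start_date
   - Easy to understand

3. Custom Timetables:
   - Airflow 2.2+ feature (subclass airflow.timetables.base.Timetable)
   - Maximum flexibility
   - Complex scheduling logic

4. Preset Intervals:
   - @hourly, @daily, @weekly, @monthly, @yearly, @once
   - Convenient shortcuts for common schedules

Since Airflow 2.2, cron strings, presets, and timedelta values are all
converted into Timetable objects internally.
"""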

1. Cron Expressions

# cron_expressions.py
from airflow.decorators import dag
from datetime import datetime

# Standard cron format: minute hour day_of_month month day_of_week
# * * * * *
# | | | | |
# | | | | +-- Day of week (0-6) (Sunday=0)
# | | | +---- Month (1-12)
# | | +------ Day of month (1-31)
# | +-------- Hour (0-23)
# +---------- Minute (0-59)

# Common cron examples
CRON_EXAMPLES = {
    # Every day at 2 AM
    'daily_2am': '0 2 * * *',
    
    # Every hour
    'hourly': '0 * * * *',
    
    # Every 15 minutes
    'every_15_min': '*/15 * * * *',
    
    # Monday at 9 AM
    'weekly_monday_9am': '0 9 * * 1',
    
    # First day of month at midnight
    'monthly_first': '0 0 1 * *',
    
    # Every weekday at 6 AM
    'weekdays_6am': '0 6 * * 1-5',
    
    # Every 6 hours
    'every_6_hours': '0 */6 * * *',
    
    # 1st and 15th of month at 8 AM
    'twice_monthly': '0 8 1,15 * *',
}

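# DAG with cron schedule
@dag(
    dag_id='cron_schedule_example',
    # Daily at 02:00 in the DAG's timezone; a naive start_date means
    # [core] default_timezone (UTC by default). Airflow 2.4+: schedule=...
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def cron_dag():
    pass

# DAG with preset schedule
@dag(
    dag_id='preset_schedule_example',
    schedule_interval='@daily',  # Same as '0 0 * * *'
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def preset_dag():
    pass

# Instantiate the DAGs; Airflow discovers DAG objects at module level
cron_schedule_example = cron_dag()
preset_schedule_example = preset_dag()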
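To make the five-field syntax concrete, here is a minimal matcher written from scratch. It is only an illustration, not how Airflow evaluates cron (Airflow 2.x delegates that to the croniter package). The helper names `expand_field` and `cron_matches` are hypothetical; the sketch supports `*`, lists, ranges, and steps, uses 0-6 for day of week, and does not handle names like `MON`, `7` for Sunday, or the `@` presets.

```python
from datetime import datetime

# (lowest, highest) allowed value for minute, hour, day of month, month, day of week
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, lo: int, hi: int) -> set:
    """Expand one cron field such as '*', '*/15', '1-5', or '1,15' into a set of ints."""
    values = set()
    for part in field.split(','):
        base, _, step_text = part.partition('/')
        step = int(step_text) if step_text else 1
        if base == '*':
            start, end = lo, hi
        elif '-' in base:
            first, last = base.split('-')
            start, end = int(first), int(last)
        else:
            # A bare number is just that value; 'n/step' means n..hi by step
            start = int(base)
            end = hi if step_text else start
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expr: str, moment: datetime) -> bool:
    """True if the 5-field cron expression fires at `moment` (minute resolution)."""
    fields = expr.split()
    minutes, hours, days, months, weekdays = (
        expand_field(field, lo, hi) for field, (lo, hi) in zip(fields, FIELD_RANGES)
    )
    cron_weekday = (moment.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    if fields[2] != '*' and fields[4] != '*':
        # Classic cron rule: when both day fields are restricted, either may match
        day_ok = moment.day in days or cron_weekday in weekdays
    else:
        day_ok = moment.day in days and cron_weekday in weekdays
    return (moment.minute in minutes and moment.hour in hours
            and moment.month in months and day_ok)


print(cron_matches('0 6 * * 1-5', datetime(2024, 1, 1, 6, 0)))  # Monday 06:00 -> True
print(cron_matches('0 6 * * 1-5', datetime(2024, 1, 6, 6, 0)))  # Saturday 06:00 -> False
```

Evaluating each field as a set keeps the matcher short; a real scheduler instead computes the next fire time directly rather than testing minute by minute.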

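⚠️ Cron Limitations

Cron expressions don't handle:

  • Timezones on their own (set the DAG's timezone with a timezone-aware start_date)
  • Complex business logic (holidays, "last business day")
  • Variable intervals
  • DST edge cases (local times that are skipped or repeated during a transition)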

2. timedelta Scheduling

# timedelta_scheduling.py
from airflow.decorators import dag
from datetime import datetime, timedelta

# timedelta schedule
@dag(
    dag_id='timedelta_schedule_example',
    schedule_interval=timedelta(hours=6),  # Every 6 hours
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def timedelta_dag():
    pass

# Common timedelta examples
TIMEDELTA_EXAMPLES = {
    'every_15_min': timedelta(minutes=15),
    'every_hour': timedelta(hours=1),
    'every_6_hours': timedelta(hours=6),
    'daily': timedelta(days=1),
    'weekly': timedelta(weeks=1),
    'bi_weekly': timedelta(weeks=2),
    'monthly': timedelta(days=30),  # Drifts off calendar months; prefer '@monthly'
}

# Instantiate the DAG; Airflow discovers DAG objects at module level
timedelta_schedule_example = timedelta_dag()
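A timedelta schedule is counted forward from start_date rather than aligned to the clock. The sketch below assumes catchup=True, so the first interval begins exactly at start_date; `delta_intervals` is a hypothetical helper for illustration, not an Airflow API. It shows two consequences: a start_date of 00:30 puts every 6-hour run at half past, and `timedelta(days=30)` drifts away from the 1st of the month.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def delta_intervals(start_date: datetime, delta: timedelta,
                    count: int) -> List[Tuple[datetime, datetime]]:
    """First `count` (start, end) data intervals of a fixed-delta schedule anchored at start_date."""
    intervals = []
    start = start_date
    for _ in range(count):
        intervals.append((start, start + delta))
        start += delta
    return intervals


# Starts at 00:30, 06:30, 12:30 (cron '0 */6 * * *' would use 00:00, 06:00, 12:00)
six_hourly = delta_intervals(datetime(2024, 1, 1, 0, 30), timedelta(hours=6), 3)

# Starts on Jan 1, Jan 31, Mar 1: a 30-day "month" skips February's 1st entirely
thirty_day = delta_intervals(datetime(2024, 1, 1), timedelta(days=30), 3)

for start, end in thirty_day:
    print(start.date(), '->', end.date())
```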

3. Custom Timetables

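# custom_timetables.py
# Custom timetables must be registered through an Airflow plugin. In practice,
# keep the timetable classes and the plugin in $AIRFLOW_HOME/plugins and
# import the classes from DAG files; they share one file here for brevity.
from datetime import timedelta
from typing import Optional

import pendulum
from pendulum import DateTime
from airflow.decorators import dag
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class BusinessHourTimetable(Timetable):
    """
    Custom timetable for business hours only.

    Creates one hourly data interval per business hour
    (9 AM - 5 PM local time, Mon-Fri); nights and weekends are skipped.
    """

    def __init__(self, timezone: str = 'UTC', open_hour: int = 9, close_hour: int = 17):
        self._timezone = timezone
        self._tz = pendulum.timezone(timezone)
        self._open = open_hour
        self._close = close_hour

    # Constructor arguments must survive DAG serialization
    def serialize(self) -> dict:
        return {'timezone': self._timezone, 'open_hour': self._open, 'close_hour': self._close}

    @classmethod
    def deserialize(cls, data: dict) -> 'BusinessHourTimetable':
        return cls(**data)

    def _next_slot(self, moment: DateTime) -> DateTime:
        """First whole local hour at or after `moment` inside business hours, in UTC."""
        local = moment.in_timezone(self._tz)  # compare hours in local time, not UTC
        if local.minute or local.second or local.microsecond:
            local = local.start_of('hour').add(hours=1)
        while local.weekday() >= 5 or not (self._open <= local.hour < self._close):
            if local.weekday() < 5 and local.hour < self._open:
                local = local.set(hour=self._open)  # later the same weekday
            else:
                local = local.add(days=1).set(hour=self._open)  # next day; loop re-checks weekends
        return local.in_timezone('UTC')

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manual trigger covers the hour that just ended
        end = run_after.start_of('hour')
        return DataInterval(start=end - timedelta(hours=1), end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Calculate the next data interval; the run starts when the interval ends."""
        if last_automated_data_interval is not None:
            # Continue from the end of the previous interval
            start = self._next_slot(last_automated_data_interval.end)
        elif restriction.earliest is None:
            return None  # No start_date: never schedule
        else:
            earliest = restriction.earliest
            if not restriction.catchup:
                # catchup=False: begin near "now" instead of at start_date
                earliest = max(earliest, pendulum.now('UTC').subtract(hours=1))
            start = self._next_slot(earliest)
        if restriction.latest is not None and start > restriction.latest:
            return None  # Past end_date
        return DagRunInfo.interval(start=start, end=start + timedelta(hours=1))


class MonthlyFirstDayTimetable(Timetable):
    """
    Custom timetable for the first day of each month.

    Each data interval covers the 1st (00:00-24:00 UTC); the run starts on the 2nd.
    """

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        start = run_after.in_timezone('UTC').start_of('month')
        return DataInterval(start=start, end=start.add(days=1))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Calculate the next data interval"""
        if last_automated_data_interval is not None:
            # First day of the month after the previous run
            start = last_automated_data_interval.start.add(months=1)
        elif restriction.earliest is None:
            return None  # No start_date: never schedule
        else:
            earliest = restriction.earliest
            if not restriction.catchup:
                # catchup=False: skip months whose run is already in the past
                earliest = max(earliest, pendulum.now('UTC').subtract(days=1))
            start = earliest.start_of('month')
            if start < earliest:
                start = start.add(months=1)  # first 1st at or after `earliest`
        if restriction.latest is not None and start > restriction.latest:
            return None  # Past end_date
        return DagRunInfo.interval(start=start, end=start.add(days=1))


# Register both timetables so the scheduler can deserialize them
class SchedulingTimetablesPlugin(AirflowPlugin):
    name = 'scheduling_timetables'
    timetables = [BusinessHourTimetable, MonthlyFirstDayTimetable]


# Usage (Airflow 2.4+: pass the instance as schedule=... instead of timetable=...)
@dag(
    dag_id='business_hours_dag',
    timetable=BusinessHourTimetable(timezone='America/New_York'),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
)
def business_hours_dag():
    pass

business_hours = business_hours_dag()

@dag(
    dag_id='monthly_first_dag',
    timetable=MonthlyFirstDayTimetable(),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
)
def monthly_first_dag():
    pass

monthly_first = monthly_first_dag()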

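ℹ️ Pro Tip

Custom Timetables (Airflow 2.2+) are the most flexible scheduling option. Use them for business logic that can't be expressed with cron or timedelta, such as business-hour or calendar-based runs. A timetable must be registered through an Airflow plugin, must return a DagRunInfo from next_dagrun_info, and must implement infer_manual_data_interval for manually triggered runs. From Airflow 2.4, pass it through the unified schedule argument; timetable= and schedule_interval= are deprecated there.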
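Because timetable methods run inside Airflow, the date arithmetic is easiest to test when it lives in a plain function. The sketch below re-expresses the business-hours rule from BusinessHourTimetable with only the standard library (zoneinfo, Python 3.9+); `next_business_hour` is a hypothetical helper, and the 9-17, Mon-Fri window is the same assumption the timetable makes.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def next_business_hour(moment: datetime, tz_name: str = 'America/New_York',
                       open_hour: int = 9, close_hour: int = 17) -> datetime:
    """First whole local hour at or after `moment` (an aware datetime) that falls
    inside Mon-Fri business hours, returned in UTC."""
    local = moment.astimezone(ZoneInfo(tz_name))
    if local.minute or local.second or local.microsecond:
        # Round up to the next whole hour
        local = local.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    while local.weekday() >= 5 or not (open_hour <= local.hour < close_hour):
        if local.weekday() < 5 and local.hour < open_hour:
            local = local.replace(hour=open_hour)  # later the same weekday
        else:
            local = (local + timedelta(days=1)).replace(hour=open_hour)  # next day
    return local.astimezone(timezone.utc)


# Friday 17:00 in New York (22:00 UTC in January) rolls over to Monday 09:00
print(next_business_hour(datetime(2024, 1, 5, 22, 0, tzinfo=timezone.utc)))
# 2024-01-08 14:00:00+00:00
```

Keeping the rule pure means the timetable class only converts between pendulum objects and this logic, and DST shifts (09:00 is 14:00 UTC in winter but 13:00 UTC in summer for New York) can be covered by ordinary unit tests.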

4. Backfilling and Catchup

# backfilling.py
from airflow.decorators import dag
from datetime import datetime, timedelta

# DAG with catchup
@dag(
    dag_id='catchup_example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Create a run for every missed interval since start_date
    max_active_runs=1,  # Only one run at a time
)
def catchup_dag():
    pass

# DAG without catchup
@dag(
    dag_id='no_catchup_example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Only the most recent interval is scheduled
)
def no_catchup_dag():
    pass

catchup_example = catchup_dag()
no_catchup_example = no_catchup_dag()

# Manual backfill command (Airflow 2.x CLI)
# airflow dags backfill -s 2024-01-01 -e 2024-01-31 my_dag

# Backfill configuration
BACKFILL_CONFIG = """
# airflow.cfg settings that affect catchup and backfills
[core]
# Default max_active_runs for DAGs that don't set it (default: 16)
max_active_runs_per_dag = 1

# Max task instances running at once across the whole installation
parallelism = 32

[scheduler]
# Default for DAGs that don't set catchup explicitly (default: True)
catchup_by_default = False
"""

5. Timezone Handling

# timezone_handling.py
from airflow.decorators import dag
from datetime import datetime
import pendulum
import pytz

# DAG with timezone. Build start_date with pendulum: passing a pytz zone as
# tzinfo= silently picks the zone's historical LMT offset (-04:56 for New York).
@dag(
    dag_id='timezone_example',
    schedule_interval='0 2 * * *',  # 02:00 New York wall-clock time (EST or EDT)
    start_date=pendulum.datetime(2024, 1, 1, tz='America/New_York'),
    catchup=False,
    tags=['timezone'],
)
def timezone_dag():
    pass

timezone_example = timezone_dag()

# Timezone utilities
TIMEZONE_EXAMPLES = {
    'UTC': 'UTC',
    'US_Eastern': 'America/New_York',
    'US_Pacific': 'America/Los_Angeles',
    'Europe_London': 'Europe/London',
    'Asia_Tokyo': 'Asia/Tokyo',
}

# Get current time in timezone
def get_current_time(timezone: str = 'UTC') -> datetime:
    """Get current time in specified timezone"""
    tz = pytz.timezone(timezone)
    return datetime.now(tz)

# Convert between timezones
def convert_timezone(
    dt: datetime,
    from_tz: str,
    to_tz: str
) -> datetime:
    """Convert a naive datetime from from_tz to to_tz.

    pytz zones must be attached with localize() rather than tzinfo=,
    and localize() raises ValueError if dt already has a tzinfo.
    """
    from_timezone = pytz.timezone(from_tz)
    to_timezone = pytz.timezone(to_tz)
    
    # Localize to source timezone
    dt = from_timezone.localize(dt)
    
    # Convert to target timezone
    return dt.astimezone(to_timezone)
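As we read the Airflow 2.x timezone documentation, a cron schedule in a timezone-aware DAG follows local wall-clock time, so '0 2 * * *' in New York lands at different UTC hours in winter and summer. This stdlib-only sketch (zoneinfo, Python 3.9+) shows that shift, plus a round-trip check for local times that don't exist during a DST gap; both helper names are hypothetical.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo('America/New_York')


def run_time_in_utc(local_run: datetime) -> datetime:
    """UTC instant of an aware, local wall-clock run time."""
    return local_run.astimezone(timezone.utc)


def wall_time_exists(local: datetime) -> bool:
    """False when `local` falls in a DST gap (e.g. 02:30 on a spring-forward day)."""
    round_trip = local.astimezone(timezone.utc).astimezone(local.tzinfo)
    return round_trip.replace(tzinfo=None) == local.replace(tzinfo=None)


winter = run_time_in_utc(datetime(2024, 1, 15, 2, 0, tzinfo=NEW_YORK))  # EST, UTC-5
summer = run_time_in_utc(datetime(2024, 7, 15, 2, 0, tzinfo=NEW_YORK))  # EDT, UTC-4
print(winter.hour, summer.hour)  # 7 6

# US clocks jumped from 02:00 to 03:00 on 2024-03-10, so 02:30 never happened
print(wall_time_exists(datetime(2024, 3, 10, 2, 30, tzinfo=NEW_YORK)))  # False
```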

Real-World Scenarios

Scenario 1: Amazon's Multi-Region Scheduling

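# amazon_multi_region.py
"""
Amazon-style multi-region scheduling:
- Different schedules per region
- Timezone handling
- Business hours only
"""

from datetime import timedelta
from typing import Optional

import pendulum
from pendulum import DateTime
from airflow.decorators import dag
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class MultiRegionTimetable(Timetable):
    """Hourly runs during the primary region's local business hours (Mon-Fri).

    Like any custom timetable, it must be registered through an AirflowPlugin.
    """

    def __init__(
        self,
        regions: dict,
        primary_region: str = 'us-east-1',
    ):
        self.regions = regions
        self.primary_region = primary_region
        primary = regions[primary_region]
        self._tz = pendulum.timezone(primary['timezone'])
        self._open, self._close = primary['business_hours']

    def serialize(self) -> dict:
        # Must be JSON-friendly; tuples come back as lists, which unpack the same way
        return {'regions': self.regions, 'primary_region': self.primary_region}

    @classmethod
    def deserialize(cls, data: dict) -> 'MultiRegionTimetable':
        return cls(regions=data['regions'], primary_region=data['primary_region'])

    def _next_business_hour(self, moment: DateTime) -> DateTime:
        """First whole hour at or after `moment` inside the primary region's business hours."""
        local = moment.in_timezone(self._tz)  # check hours in local time, not UTC
        if local.minute or local.second or local.microsecond:
            local = local.start_of('hour').add(hours=1)
        while local.weekday() >= 5 or not (self._open <= local.hour < self._close):
            if local.weekday() < 5 and local.hour < self._open:
                local = local.set(hour=self._open)  # later the same weekday
            else:
                local = local.add(days=1).set(hour=self._open)  # next day; loop re-checks weekends
        return local.in_timezone('UTC')

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manual trigger covers the hour that just ended
        end = run_after.start_of('hour')
        return DataInterval(start=end - timedelta(hours=1), end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """Calculate next DAG run based on primary region"""
        if last_automated_data_interval is not None:
            start = self._next_business_hour(last_automated_data_interval.end)
        elif restriction.earliest is None:
            return None  # No start_date: never schedule
        else:
            earliest = restriction.earliest
            if not restriction.catchup:
                # catchup=False: begin near "now" instead of at start_date
                earliest = max(earliest, pendulum.now('UTC').subtract(hours=1))
            start = self._next_business_hour(earliest)
        if restriction.latest is not None and start > restriction.latest:
            return None  # Past end_date
        return DagRunInfo.interval(start=start, end=start + timedelta(hours=1))


# Usage
REGIONS = {
    'us-east-1': {'timezone': 'America/New_York', 'business_hours': (9, 17)},
    'eu-west-1': {'timezone': 'Europe/London', 'business_hours': (9, 17)},
    'ap-southeast-1': {'timezone': 'Asia/Singapore', 'business_hours': (9, 17)},
}

@dag(
    dag_id='amazon_multi_region',
    timetable=MultiRegionTimetable(
        regions=REGIONS,
        primary_region='us-east-1'
    ),
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['amazon', 'multi-region', 'production'],
)
def multi_region_dag():
    pass

amazon_multi_region = multi_region_dag()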
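MultiRegionTimetable keys every run off the primary region, so the other regions' business hours only matter inside the tasks. A task could decide which regions to process with a helper like the one below, a hypothetical stdlib-only sketch (zoneinfo, Python 3.9+) that reuses the same REGIONS layout.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

REGIONS = {
    'us-east-1': {'timezone': 'America/New_York', 'business_hours': (9, 17)},
    'eu-west-1': {'timezone': 'Europe/London', 'business_hours': (9, 17)},
    'ap-southeast-1': {'timezone': 'Asia/Singapore', 'business_hours': (9, 17)},
}


def regions_in_business_hours(moment: datetime, regions: dict) -> list:
    """Regions whose local time at `moment` (aware) is a weekday inside business_hours."""
    open_now = []
    for name, cfg in regions.items():
        local = moment.astimezone(ZoneInfo(cfg['timezone']))
        start_hour, end_hour = cfg['business_hours']
        if local.weekday() < 5 and start_hour <= local.hour < end_hour:
            open_now.append(name)
    return open_now


# Monday 2024-01-15 14:00 UTC: 09:00 in New York, 14:00 in London, 22:00 in Singapore
print(regions_in_business_hours(datetime(2024, 1, 15, 14, 0, tzinfo=timezone.utc), REGIONS))
# ['us-east-1', 'eu-west-1']
```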

Scenario 2: Meta's Event-Driven Scheduling

# meta_event_driven.py
"""
Meta-style event-driven scheduling:
- React to external events (a Redis queue)
- Skip runs when no events are queued
- Drain the queue in a single run

Timetables are evaluated by the scheduler and serialized with the DAG, so
they should be pure date arithmetic, not network calls. Instead of polling
Redis inside a Timetable, this DAG runs on a short fixed schedule and a
task checks the queue, skipping the run when it is empty.
"""

import json
from datetime import timedelta

import pendulum
import redis
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException

REDIS_HOST = 'redis-cluster'
REDIS_PORT = 6379
QUEUE_NAME = 'data_events'


@dag(
    dag_id='meta_event_driven',
    schedule_interval=timedelta(minutes=5),  # Poll the queue every 5 minutes
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    max_active_runs=1,  # Never drain the queue from two runs at once
    tags=['meta', 'event-driven', 'production'],
)
def event_driven_dag():
    @task
    def process_events():
        """Drain events from the queue; skip the run if there are none."""
        r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
        if r.llen(QUEUE_NAME) == 0:
            raise AirflowSkipException('No events queued')

        events = []
        while True:
            event = r.lpop(QUEUE_NAME)
            if event is None:
                break
            events.append(json.loads(event))

        return {'events_processed': len(events)}

    process_events()


meta_event_driven = event_driven_dag()
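The draining loop in process_events can be unit-tested without Redis if it takes the pop operation as a callable. In this sketch a `collections.deque` stands in for the Redis list (an assumption for testing only), and `drain_events` plus its `max_events` cap are hypothetical additions that bound how much one run processes.

```python
import json
from collections import deque
from typing import Callable, List, Optional


def drain_events(pop: Callable[[], Optional[bytes]],
                 max_events: Optional[int] = None) -> List[dict]:
    """Pop and JSON-decode events until the queue is empty or max_events is reached."""
    events: List[dict] = []
    while max_events is None or len(events) < max_events:
        raw = pop()
        if raw is None:  # Redis LPOP also returns None on an empty list
            break
        events.append(json.loads(raw))
    return events


fake_queue = deque([b'{"id": 1}', b'{"id": 2}', b'{"id": 3}'])


def fake_lpop() -> Optional[bytes]:
    return fake_queue.popleft() if fake_queue else None


first_batch = drain_events(fake_lpop, max_events=2)
second_batch = drain_events(fake_lpop)
print(first_batch, second_batch)  # [{'id': 1}, {'id': 2}] [{'id': 3}]
```

In the DAG, `lambda: r.lpop(QUEUE_NAME)` would play the role of `fake_lpop`.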

Edge Cases

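⚠️ Common Pitfalls

  1. Timezone Issues: Always specify the timezone explicitly. A naive start_date is interpreted in [core] default_timezone (UTC by default), so don't rely on implicit settings.

  2. DST Transitions: Local times near a DST change (such as 02:30 in New York on the spring-forward day) may not exist or may occur twice; prefer UTC, or schedule outside the transition window.

  3. Catchup Runs: Be careful with catchup=True on large date ranges; every missed interval since start_date becomes a run.

  4. Overlapping Runs: Use max_active_runs to prevent overlapping runs.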

# edge_cases.py
from airflow.decorators import dag
from datetime import datetime
import pendulum

# Timezone issue
@dag(
    dag_id='timezone_issue',
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),  # RISKY: naive, read in [core] default_timezone
    catchup=False,
)
def timezone_issue_dag():
    pass

# Timezone correct
@dag(
    dag_id='timezone_correct',
    schedule_interval='0 2 * * *',
    start_date=pendulum.datetime(2024, 1, 1, tz='America/New_York'),  # GOOD: explicit zone
    catchup=False,
)
def timezone_correct_dag():
    pass

# Overlapping runs issue
@dag(
    dag_id='overlapping_issue',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # RISKY: no max_active_runs, so [core] max_active_runs_per_dag applies (default 16)
)
def overlapping_issue_dag():
    pass

# Overlapping correct
@dag(
    dag_id='overlapping_correct',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,  # GOOD: a slow run delays the next one instead of overlapping it
)
def overlapping_correct_dag():
    pass

timezone_issue = timezone_issue_dag()
timezone_correct = timezone_correct_dag()
overlapping_issue = overlapping_issue_dag()
overlapping_correct = overlapping_correct_dag()
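max_active_runs trades overlap for lag. The toy simulation below (not Airflow's scheduler; `simulate_runs` is a hypothetical helper) models an hourly DAG whose runs each take 150 minutes: capped at one active run, every run starts 90 minutes later than the one before; uncapped, up to three runs overlap.

```python
import heapq
from datetime import timedelta
from typing import List, Tuple


def simulate_runs(interval: timedelta, runtime: timedelta, n_runs: int,
                  max_active_runs: int) -> Tuple[List[timedelta], int]:
    """Toy model: run i is due at i * interval and takes `runtime` to finish.

    Returns each run's start delay (actual start minus due time) and the peak
    number of runs active at the same time.
    """
    finishing: List[timedelta] = []  # min-heap of finish times of active runs
    delays: List[timedelta] = []
    peak = 0
    for i in range(n_runs):
        due = i * interval
        # Runs that finished by the due time free their slots
        while finishing and finishing[0] <= due:
            heapq.heappop(finishing)
        start = due
        if len(finishing) >= max_active_runs:
            # Queue behind the earliest-finishing active run
            start = heapq.heappop(finishing)
        heapq.heappush(finishing, start + runtime)
        delays.append(start - due)
        peak = max(peak, len(finishing))
    return delays, peak


hourly, slow = timedelta(hours=1), timedelta(minutes=150)
capped_delays, capped_peak = simulate_runs(hourly, slow, 4, max_active_runs=1)
uncapped_delays, uncapped_peak = simulate_runs(hourly, slow, 4, max_active_runs=16)
print([d.total_seconds() / 60 for d in capped_delays], capped_peak)  # [0.0, 90.0, 180.0, 270.0] 1
print(uncapped_peak)  # 3
```

If the lag keeps growing like this, the fix is a longer interval or faster runs; max_active_runs only decides whether the backlog shows up as overlap or as delay.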



Best Practices

# best_practices.py
"""
Scheduling Best Practices:

1. Timezone Handling:
   - Always specify timezone explicitly
   - Use UTC for internal processing
   - Convert to local time for display

2. Catchup Configuration:
   - Use catchup=False for new DAGs
   - Use catchup=True for historical data
   - Set max_active_runs for backfills

3. Schedule Selection:
   - Use cron for specific times
   - Use timedelta for fixed intervals
   - Use custom Timetables for complex logic

4. Performance:
   - Avoid very frequent schedules (< 5 min)
   - Use appropriate max_active_runs
   - Monitor scheduler performance

5. Error Handling:
   - Handle missed runs gracefully
   - Implement retry logic
   - Monitor scheduling delays
"""

ℹ️ Amazon Interview Tip

For Amazon interviews, be ready to walk through a custom Timetable for complex scheduling logic. When discussing scheduling, emphasize timezone handling and how you prevent overlapping runs (max_active_runs), and explain your approach to backfilling historical data.


Summary

Scheduling is fundamental to Airflow. Key takeaways:

  1. Cron expressions for specific times
  2. Timedelta for fixed intervals
  3. Custom Timetables for complex logic
  4. Timezone handling is critical
  5. Catchup and backfilling for historical data

For Amazon and Meta interviews, focus on:

  • Timezone handling
  • Custom Timetable implementation
  • Preventing overlapping runs
  • Backfilling strategies
  • Performance optimization

This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.
