
Error Handling and Retry Strategies in Apache Airflow

Formal Definitions

Retry Policy

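A retry policy defines how a failed task is retried. It consists of the maximum retry count $R_{\max}$, the delay between retries $\Delta t_{\text{retry}}$, and an optional exponential backoff factor $\beta$. The delay before retry $i$ (0-indexed) is $\Delta t_i = \Delta t_0 \cdot \beta^i$, where $\Delta t_0 = \Delta t_{\text{retry}}$. In Airflow 2.x, retry_exponential_backoff=True uses $\beta = 2$ plus a deterministic jitter, and max_retry_delay caps each delay.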
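The definition maps onto a small value object. This is an illustrative sketch only: RetryPolicy and its fields are hypothetical names rather than Airflow classes, max_delay plays the role of Airflow's max_retry_delay, and any jitter Airflow adds is ignored.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List, Optional


@dataclass(frozen=True)
class RetryPolicy:
    """Hypothetical retry policy: R_max, Delta t_0, beta, and an optional cap."""
    max_retries: int                       # R_max
    base_delay: timedelta                  # Delta t_0
    backoff: float = 1.0                   # beta (1.0 means a fixed delay)
    max_delay: Optional[timedelta] = None  # upper bound on any single delay

    def delay(self, i: int) -> timedelta:
        """Delay before retry i (0-indexed): Delta t_0 * beta**i, capped."""
        if not 0 <= i < self.max_retries:
            raise ValueError(f"retry index {i} outside 0..{self.max_retries - 1}")
        d = self.base_delay * (self.backoff ** i)
        if self.max_delay is not None:
            d = min(d, self.max_delay)
        return d

    def schedule(self) -> List[timedelta]:
        """Delays before each of the R_max retries."""
        return [self.delay(i) for i in range(self.max_retries)]


policy = RetryPolicy(3, timedelta(minutes=5), backoff=2.0, max_delay=timedelta(minutes=30))
print([d.total_seconds() / 60 for d in policy.schedule()])  # [5.0, 10.0, 20.0]
```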

Execution Timeout

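An execution timeout is the maximum time $T_{\max}$ a single task attempt may run. If an attempt exceeds this limit, Airflow interrupts it with AirflowTaskTimeout and handles it like any other failure: the task is retried if retries remain and marked as failed otherwise. The timeout is enforced per task instance attempt.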
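On POSIX systems, Airflow's timeout helper relies on a SIGALRM timer inside the worker process. The sketch below applies the same idea with only the standard library; task_timeout and TaskTimeout are hypothetical names, and because Python handles signals in the main thread, it works only there and only on POSIX.

```python
import signal
import time
from contextlib import contextmanager


class TaskTimeout(Exception):
    """Raised when the guarded block exceeds its time budget (T_max)."""


@contextmanager
def task_timeout(seconds: float):
    """Interrupt the enclosed block after `seconds` (POSIX, main thread only)."""
    def _handler(signum, frame):
        raise TaskTimeout(f"Timed out after {seconds} s")

    previous = signal.signal(signal.SIGALRM, _handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Cancel the pending timer and restore the previous handler
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


try:
    with task_timeout(0.5):
        time.sleep(5)  # stands in for a hung task
    status = "success"
except TaskTimeout:
    status = "failed"

print(status)  # failed
```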

Callback Function

A callback function is a callable invoked on specific task state transitions. Callbacks include on_failure_callback, on_success_callback, on_retry_callback, and on_execute_callback. Formally, $f_{\text{cb}}: S \times C \rightarrow \text{Action}$, where $S$ is the task state and $C$ is the task context.
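To make the formal signature concrete, the toy dispatcher below selects callbacks by state S and calls each one with a context C. It is not how Airflow invokes callbacks internally: the state names mirror Airflow's task states, while CALLBACKS, dispatch, and the context keys are hypothetical.

```python
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]
Callback = Callable[[Context], str]


def alert_on_failure(context: Context) -> str:
    return f"alert: {context['task_id']} failed with {context.get('exception')!r}"


def log_retry(context: Context) -> str:
    return f"log: {context['task_id']} retrying (try {context['try_number']})"


def log_success(context: Context) -> str:
    return f"log: {context['task_id']} succeeded"


# Hypothetical registry: task state S -> callbacks f_cb(S, C)
CALLBACKS: Dict[str, List[Callback]] = {
    "failed": [alert_on_failure],
    "up_for_retry": [log_retry],
    "success": [log_success],
}


def dispatch(state: str, context: Context) -> List[str]:
    """Invoke every callback registered for `state`; other states do nothing."""
    return [cb(context) for cb in CALLBACKS.get(state, [])]


print(dispatch("up_for_retry", {"task_id": "flaky_api_call", "try_number": 2}))
# ['log: flaky_api_call retrying (try 2)']
```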

SLA (Service Level Agreement)

An SLA defines the expected completion time for a task. If a task has not finished within its SLA, the scheduler records an SlaMiss entry (a database record, not an exception) and invokes the DAG's sla_miss_callback. The violation condition is $T_{\text{completion}} > T_{\text{execution\_date}} + T_{\text{sla}}$. SLAs are an Airflow 2.x feature and were removed in Airflow 3.0.

Detailed Explanation

Retry Configuration

from airflow.decorators import task, dag
from datetime import datetime, timedelta

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'retries'],
)
def retry_example_dag():
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        execution_timeout=timedelta(hours=1),
    )
    def flaky_api_call():
        """Task with automatic retry and timeout."""
        import random
        
        # Simulate random failure
        if random.random() < 0.5:
            raise ConnectionError("API unavailable")
        
        return {"status": "success"}
    
    @task(
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=10),
    )
    def data_processing():
        """Exponential backoff retry for transient errors."""
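        import random
        import time
        
        # Simulate ~10 seconds of processing without busy-waiting the CPU
        time.sleep(10)
        
        # Simulate an occasional transient failure, retried with exponential backoff
        if random.random() < 0.2:
            raise ConnectionError("Transient failure while processing")
        
        return {"processed": 1000}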
    
    flaky_api_call() >> data_processing()

retry_example_dag()

Callback Functions

from airflow.decorators import task, dag
from airflow.utils.email import send_email
from datetime import datetime

def failure_callback(context):
    """Handle task failure."""
    task_instance = context['task_instance']
    # 'exception' is set for task-level failures but absent in DAG-level failure callbacks
    exception = context.get('exception')
    
    # Log failure details
    print(f"Task {task_instance.task_id} failed")
    print(f"Exception: {exception}")
    print(f"Try number: {task_instance.try_number}")
    print(f"Logical date: {context['logical_date']}")
    
    # Send alert
    send_email(
        to=['team@example.com'],
        subject=f"Airflow Task Failed: {task_instance.task_id}",
        html_content=f"""
        <h2>Task Failure Alert</h2>
        <p><strong>Task:</strong> {task_instance.task_id}</p>
        <p><strong>DAG:</strong> {task_instance.dag_id}</p>
        <p><strong>Exception:</strong> {exception}</p>
        <p><strong>Try:</strong> {task_instance.try_number}</p>
        """,
    )
    
    # Push failure info to XCom
    task_instance.xcom_push(
        key='failure_info',
        value={
            'task_id': task_instance.task_id,
            'error': str(exception),
            'timestamp': datetime.now().isoformat(),
        },
    )

def success_callback(context):
    """Handle task success."""
    task_instance = context['task_instance']
    print(f"Task {task_instance.task_id} succeeded")
    print(f"Duration (s): {task_instance.duration}")

def retry_callback(context):
    """Handle task retry."""
    task_instance = context['task_instance']
    print(f"Task {task_instance.task_id} retrying")
    print(f"Attempt: {task_instance.try_number}")

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'callbacks'],
    on_failure_callback=failure_callback,
)
def callback_example_dag():
    
    @task(
        on_failure_callback=failure_callback,
        on_success_callback=success_callback,
        on_retry_callback=retry_callback,
    )
    def process_with_callbacks():
        """Task with callback functions."""
        import random
        
        if random.random() < 0.3:
            raise ValueError("Processing error")
        
        return {"status": "success"}
    
    process_with_callbacks()

callback_example_dag()

SLA Configuration

from airflow.decorators import task, dag
from datetime import datetime, timedelta

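def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Handle SLA misses (Airflow 2.x passes these five arguments, not a context dict)."""
    print(f"SLA missed in DAG: {dag.dag_id}")
    print(f"Tasks that missed their SLA:\n{task_list}")
    print(f"Blocking tasks:\n{blocking_task_list}")
    
    for sla_miss in slas:
        print(f"SLA miss: {sla_miss.task_id} (execution date {sla_miss.execution_date})")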

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'sla'],
    sla_miss_callback=sla_miss_callback,
    default_args={
        'sla': timedelta(hours=2),  # Default SLA for all tasks
    },
)
def sla_example_dag():
    
    @task(sla=timedelta(hours=1))
    def critical_task():
        """Task with 1-hour SLA."""
        import time
        time.sleep(5)
        return {"status": "completed"}
    
    @task(sla=timedelta(hours=4))
    def non_critical_task():
        """Task with 4-hour SLA."""
        import time
        time.sleep(2)
        return {"status": "completed"}
    
    @task
    def dependent_task():
        """Task that depends on SLA-monitored tasks."""
        print("Processing after SLA-monitored tasks")
    
    # Call dependent_task() once; calling it twice would create two separate tasks
    done = dependent_task()
    [critical_task(), non_critical_task()] >> done

sla_example_dag()

Exponential Backoff Delay

$$\Delta t_i = \Delta t_0 \cdot \beta^i \quad \text{for retry } i$$

Here,

  • $\Delta t_i$ = delay before retry $i$
  • $\Delta t_0$ = initial retry delay
  • $\beta$ = backoff factor (typically 2)
  • $i$ = retry attempt number (0-indexed)
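As a worked example, take the data_processing task from the retry configuration above ($\Delta t_0 = 30$ s, $\beta = 2$, retries=5, max_retry_delay of 10 minutes), ignoring any jitter Airflow adds:

```latex
\begin{aligned}
\Delta t_i &= 30 \cdot 2^i\ \text{s} \quad \Rightarrow \quad 30,\ 60,\ 120,\ 240,\ 480\ \text{s} \quad (i = 0, \dots, 4) \\
\sum_{i=0}^{4} \Delta t_i &= 930\ \text{s} = 15.5\ \text{min}
\end{aligned}
```

Every delay stays below the 600 s cap, so max_retry_delay never binds here; a sixth retry ($\Delta t_5 = 960$ s) would be capped at 600 s.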

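Total Task Time with Retries

$$T_{\text{total}} = \sum_{i=0}^{k} T_{\text{exec},i} + \sum_{i=0}^{k-1} \Delta t_i, \qquad k \le R$$

Here,

  • $R$ = maximum retry count
  • $k$ = index of the final attempt: the first successful attempt, or $R$ if every attempt fails
  • $T_{\text{exec},i}$ = execution time of attempt $i$ (attempt 0 is the initial run)
  • $\Delta t_i$ = delay after attempt $i$, i.e. before retry $i$ (retry $i$ is attempt $i+1$)

Every attempt that runs contributes its execution time, but only attempts followed by a retry contribute a delay; the final attempt, whether it succeeds or exhausts the retries, adds none.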
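Computing the wall-clock total directly makes the bookkeeping explicit: every attempt that actually ran contributes its execution time, and every attempt that was followed by a retry contributes the delay after it. total_task_time is a hypothetical helper, with all times in seconds.

```python
from typing import Sequence


def total_task_time(exec_times: Sequence[float], delays: Sequence[float]) -> float:
    """Wall-clock time from first start to last finish, in seconds.

    exec_times[i] is T_exec,i for each attempt actually made (attempt 0 is the
    initial run); delays[i] is Delta t_i, the wait after attempt i. Only the
    first len(exec_times) - 1 delays apply: the final attempt is not followed
    by a retry.
    """
    if not exec_times:
        return 0.0
    k = len(exec_times) - 1  # index of the final attempt
    if len(delays) < k:
        raise ValueError("need a delay for every attempt that was retried")
    return float(sum(exec_times) + sum(delays[:k]))


# Two failures, then success: each attempt ran 60 s; delays of 300 s and 600 s
print(total_task_time([60, 60, 60], [300, 600, 1200]))  # 1080.0
```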

SLA Violation Condition

$$\text{SlaMiss} \iff T_{\text{completion}} > T_{\text{execution\_date}} + T_{\text{sla}}$$

Here,

  • $T_{\text{completion}}$ = task completion timestamp
  • $T_{\text{execution\_date}}$ = scheduled execution (logical) date
  • $T_{\text{sla}}$ = SLA duration
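The violation condition is a single timestamp comparison. The hypothetical helper below evaluates it for one task instance; it illustrates the inequality only and does not reproduce how Airflow's scheduler searches for missed SLAs.

```python
from datetime import datetime, timedelta, timezone


def sla_missed(logical_date: datetime, sla: timedelta, completed_at: datetime) -> bool:
    """True if T_completion > T_execution_date + T_sla (strict inequality)."""
    return completed_at > logical_date + sla


run = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(sla_missed(run, timedelta(hours=1), run + timedelta(minutes=59)))  # False
print(sla_missed(run, timedelta(hours=1), run + timedelta(minutes=61)))  # True
```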

Retry delays are inserted between attempts, never before the first one. With retries=3 and a fixed retry_delay=5min, a task whose first three attempts fail waits at least 3 × 5 = 15 minutes in total before its fourth and final attempt starts.

Use retry_exponential_backoff=True for transient errors (network, API). Spacing retries progressively further apart reduces load on external systems during outages. Set max_retry_delay to cap the maximum wait time.
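For the retries=3, retry_delay=5min case, the cumulative wait with and without backoff (ignoring jitter, and assuming no max_retry_delay below 20 minutes) is:

```latex
\begin{aligned}
\text{fixed } (\beta = 1):&\quad 5 + 5 + 5 = 15\ \text{min} \\
\text{exponential } (\beta = 2):&\quad 5 + 10 + 20 = 35\ \text{min}
\end{aligned}
```

Backoff more than doubles the worst-case wait here, which is the price paid for giving a struggling service progressively longer to recover.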

Key Concepts Table

Error Type | Recommended Strategy | Retry Count | Delay
Network Timeout | Exponential backoff | 3-5 | 30s-5min
API Rate Limit | Linear backoff | 5-10 | 1-5min
Database Deadlock | Exponential backoff | 3 | 1-5min
Resource Exhaustion | Delayed retry | 2-3 | 10-30min
Data Validation | No retry | 0 | N/A
Authentication Error | No retry | 0 | N/A
Disk Space | Alert only | 0 | N/A
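One way to apply the table in code is an ordered lookup from exception type to strategy. The mapping below is hypothetical and uses built-in exception types as stand-ins; real HTTP clients and database drivers raise their own exception classes, and rows such as rate limiting or deadlocks would need those types. Unknown errors default to no retry, on the reasoning that retrying an unexpected error rarely fixes it.

```python
from dataclasses import dataclass
from typing import List, Tuple, Type


@dataclass(frozen=True)
class Strategy:
    name: str
    retries: int
    delay: str  # human-readable range from the table


NO_RETRY = Strategy("no retry", 0, "N/A")

# Hypothetical mapping; checked in order, so list more specific types first
RULES: List[Tuple[Type[BaseException], Strategy]] = [
    (TimeoutError, Strategy("exponential backoff", 3, "30s-5min")),
    (ConnectionError, Strategy("exponential backoff", 3, "30s-5min")),
    (MemoryError, Strategy("delayed retry", 2, "10-30min")),
    (PermissionError, NO_RETRY),  # stands in for authentication errors
    (ValueError, NO_RETRY),       # stands in for data validation errors
]


def classify(exc: BaseException) -> Strategy:
    """Return the first matching strategy; unknown errors are not retried."""
    for exc_type, strategy in RULES:
        if isinstance(exc, exc_type):
            return strategy
    return NO_RETRY


print(classify(ConnectionResetError()).name)  # exponential backoff
```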

Code Examples

Advanced Error Handling Patterns

from airflow.decorators import task, dag
from airflow.exceptions import AirflowFailException, AirflowSkipException
from datetime import datetime, timedelta

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'advanced'],
)
def advanced_error_handling_dag():
    
    @task
    def validate_input(data: dict):
        """Validate input data with specific error types."""
        if not data:
            raise AirflowSkipException("No data to process")
        
        if 'required_field' not in data:
            raise ValueError("Missing required_field")
        
        if data.get('value', 0) < 0:
            raise ValueError("Value must be non-negative")
        
        return data
    
    @task(
        retries=3,
        retry_delay=timedelta(minutes=1),
    )
    def process_with_selective_retry():
        """Retry only on specific exception types.

        Airflow does not accept a retry_on argument, so non-retryable errors
        are raised as AirflowFailException, which fails the task immediately
        without using the remaining retries.
        """
        import random
        
        error_type = random.choice(['ConnectionError', 'ValueError', 'TimeoutError'])
        
        if error_type == 'ConnectionError':
            raise ConnectionError("Network issue")  # Retried
        elif error_type == 'TimeoutError':
            raise TimeoutError("Request timed out")  # Retried
        else:
            raise AirflowFailException("Invalid data")  # Not retried
    
    @task
    def handle_partial_failure():
        """Handle partial failures in batch processing."""
        results = []
        errors = []
        
        for i in range(10):
            try:
                # Simulate processing
                if i % 3 == 0:
                    raise ValueError(f"Error processing item {i}")
                results.append({"id": i, "status": "success"})
            except Exception as e:
                errors.append({"id": i, "error": str(e)})
        
        return {
            "successful": len(results),
            "failed": len(errors),
            "errors": errors,
        }
    
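    @task
    def implement_circuit_breaker():
        """Implement circuit breaker pattern with state kept in Airflow Variables."""
        from airflow.models import Variable
        import time
        
        cooldown_seconds = 600  # How long the circuit stays open before a trial call
        
        # Get circuit breaker state
        circuit_state = Variable.get("circuit_breaker_state", default_var="closed")
        failure_count = int(Variable.get("circuit_failure_count", default_var="0"))
        opened_at = float(Variable.get("circuit_opened_at", default_var="0"))
        
        if circuit_state == "open":
            if time.time() - opened_at < cooldown_seconds:
                # Circuit is open, skip task
                raise AirflowSkipException("Circuit breaker is open")
            # Cool-down elapsed: allow a single trial call
            circuit_state = "half-open"
            Variable.set("circuit_breaker_state", circuit_state)
        
        try:
            # Attempt operation
            result = perform_external_call()
        except Exception:
            failure_count += 1
            Variable.set("circuit_failure_count", str(failure_count))
            
            # Open the circuit after 5 consecutive failures or a failed trial call
            if failure_count >= 5 or circuit_state == "half-open":
                Variable.set("circuit_breaker_state", "open")
                Variable.set("circuit_opened_at", str(time.time()))
            
            raise
        
        # Close the circuit and reset the count on any success
        Variable.set("circuit_breaker_state", "closed")
        Variable.set("circuit_failure_count", "0")
        return result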
    
    def perform_external_call():
        """Simulate external call."""
        import random
        if random.random() < 0.5:
            raise ConnectionError("External service unavailable")
        return {"status": "success"}
    
    validate_input({"required_field": "value", "value": 100})
    process_with_selective_retry()
    handle_partial_failure()
    implement_circuit_breaker()

advanced_error_handling_dag()
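The Variable-backed breaker above needs a metadata database, which makes it awkward to test. The same closed, open, and half-open state machine in plain Python is sketched below; CircuitBreaker and CircuitOpenError are hypothetical names, the clock is injectable so the cool-down can be exercised without sleeping, and state lives in memory, so unlike Variables it is not shared across workers.

```python
import time
from typing import Any, Callable


class CircuitOpenError(RuntimeError):
    """Raised instead of calling the protected function while the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, cooldown: float = 600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, fn: Callable[[], Any]) -> Any:
        if self.state == "open":
            if self.clock() - self.opened_at < self.cooldown:
                raise CircuitOpenError("circuit is open")
            self.state = "half-open"  # cool-down elapsed: allow one trial call
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed trial call, or too many consecutive failures, opens the circuit
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.state = "closed"
        self.failures = 0
        return result
```

Typical use wraps each outbound call, for example breaker.call(lambda: client.fetch()), where client.fetch stands in for any hypothetical external call.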

Error Aggregation and Reporting

from airflow.decorators import task, dag
from datetime import datetime

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'reporting'],
)
def error_reporting_dag():
    
    @task
    def execute_batch(batch_id: int) -> dict:
        """Execute a batch with potential failures."""
        import random
        
        total_items = 100
        successful = 0
        failed = 0
        errors = []
        
        for i in range(total_items):
            try:
                if random.random() < 0.1:
                    raise ValueError(f"Item {i} validation failed")
                successful += 1
            except Exception as e:
                failed += 1
                errors.append({
                    "item": i,
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                })
        
        return {
            "batch_id": batch_id,
            "total": total_items,
            "successful": successful,
            "failed": failed,
            "error_rate": failed / total_items,
            "errors": errors[:10],  # Keep only first 10 errors
        }
    
    @task
    def aggregate_results(batch_results: list) -> dict:
        """Aggregate results from all batches."""
        total = sum(r["total"] for r in batch_results)
        successful = sum(r["successful"] for r in batch_results)
        failed = sum(r["failed"] for r in batch_results)
        
        all_errors = []
        for r in batch_results:
            all_errors.extend(r.get("errors", []))
        
        return {
            "total_batches": len(batch_results),
            "total_items": total,
            "total_successful": successful,
            "total_failed": failed,
            "overall_error_rate": failed / total if total > 0 else 0,
            "error_summary": all_errors[:50],  # Keep the first 50 errors
        }
    
    @task
    def send_error_report(aggregated: dict):
        """Send error report via email."""
        from airflow.utils.email import send_email
        
        if aggregated["total_failed"] == 0:
            print("No errors to report")
            return
        
        html = f"""
        <h2>Error Report</h2>
        <p>Total batches: {aggregated['total_batches']}</p>
        <p>Total items: {aggregated['total_items']}</p>
        <p>Successful: {aggregated['total_successful']}</p>
        <p>Failed: {aggregated['total_failed']}</p>
        <p>Error rate: {aggregated['overall_error_rate']:.2%}</p>
        """
        
        if aggregated["error_summary"]:
            html += "<h3>Sample Errors</h3><ul>"
            for error in aggregated["error_summary"][:10]:
                html += f"<li>Item {error['item']}: {error['error']}</li>"
            html += "</ul>"
        
        send_email(
            to=['team@example.com'],
            subject=f"Batch Processing Error Report - {aggregated['total_failed']} failures",
            html_content=html,
        )
    
    # Execute batches
    batches = execute_batch.expand(batch_id=[1, 2, 3, 4, 5])
    aggregated = aggregate_results(batches)
    send_error_report(aggregated)

error_reporting_dag()

Performance Metrics

Retry Impact Analysis

Scenario | Retries | Delay per Retry | Max Total Delay | Success Rate
No retries | 0 | 0 | 0 | 85%
Conservative | 2 | 5min | 10min | 95%
Moderate | 3 | 2min | 6min | 98%
Aggressive | 5 | 1min | 5min | 99%
Exponential | 3 | 1-5min | 7min | 99%
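The overhead column is the worst case, in which every retry is used: retries × delay for a fixed delay, or the sum of doubling delays with backoff. A quick check of that arithmetic, assuming the exponential row starts at 1 minute with a factor of 2 and ignoring jitter (max_total_delay is a hypothetical helper):

```python
def max_total_delay(retries: int, delay_min: float, factor: float = 1.0) -> float:
    """Worst-case waiting time in minutes if every retry is used."""
    return float(sum(delay_min * factor ** i for i in range(retries)))


scenarios = {
    "No retries": max_total_delay(0, 0),
    "Conservative": max_total_delay(2, 5),
    "Moderate": max_total_delay(3, 2),
    "Aggressive": max_total_delay(5, 1),
    "Exponential": max_total_delay(3, 1, factor=2),
}
print(scenarios)
```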

Error Type Distribution

Error Type | Frequency | Retry Success | Recommendation
Network Timeout | 40% | High | Retry with backoff
API Rate Limit | 25% | High | Linear backoff
Data Validation | 15% | None | Fix upstream
Resource Exhaustion | 10% | Medium | Delay retry
Authentication | 5% | None | Fix credentials
Bug/Logic Error | 5% | None | Fix code

Key Takeaways:

  • Use exponential backoff for transient errors; linear backoff for rate limits
  • Set execution_timeout to bound hung or runaway tasks, such as infinite loops
  • Implement on_failure_callback for alerting and logging
  • Use sla_miss_callback (Airflow 2.x) to monitor pipeline latency
  • Use a circuit breaker to stop calling a failing service, limiting cascading failures
  • Aggregate errors for batch reporting and trend analysis
