Error Handling and Retry Strategies
Formal Definitions
Definition: Retry Policy
A retry policy defines how a failed task is retried. It consists of the maximum retry count $R$, the initial delay between retries $d_0$, and an optional exponential backoff factor $b$. The delay before retry $i$ is $d_i = d_0 \cdot b^{i}$.
Definition: Execution Timeout
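An execution timeout is the maximum time a single task attempt may run. If an attempt exceeds this limit, Airflow raises `AirflowTaskTimeout` and the attempt fails like any other error, so it is retried if retries remain. The timeout applies to each task instance attempt separately, not to the total time across retries.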
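To make the per-attempt semantics concrete, here is a minimal pure-Python sketch (no Airflow required) in which every attempt gets its own time budget and an overrun counts as an ordinary failure that can be retried. The helpers `run_with_timeout` and `run_with_retries` are hypothetical names for illustration. Airflow interrupts the running attempt itself, whereas a Python thread cannot be stopped from outside, so this sketch simply stops waiting for the overrunning attempt.

```python
import concurrent.futures
import time
from typing import Any, Callable


def run_with_timeout(fn: Callable[[], Any], timeout_s: float) -> Any:
    """Run fn() in a worker thread, waiting at most timeout_s seconds.

    Raises TimeoutError when the attempt overruns. The worker thread is
    abandoned, not killed: Python threads cannot be stopped from outside.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        raise TimeoutError(f"attempt exceeded {timeout_s}s") from None
    finally:
        executor.shutdown(wait=False)  # don't block on an overrunning attempt


def run_with_retries(fn: Callable[[], Any], retries: int, timeout_s: float) -> Any:
    """Give every attempt its own timeout; a timeout is just another failure."""
    last_error: Exception = RuntimeError("no attempts made")
    for attempt in range(retries + 1):
        try:
            return run_with_timeout(fn, timeout_s)
        except Exception as exc:  # timeouts and ordinary errors alike
            last_error = exc
            print(f"attempt {attempt} failed: {exc!r}")
    raise last_error


if __name__ == "__main__":
    # Demo: each attempt sleeps 0.2 s against a 0.05 s budget, so all attempts time out
    try:
        run_with_retries(lambda: time.sleep(0.2), retries=1, timeout_s=0.05)
    except TimeoutError as exc:
        print(f"gave up: {exc}")
```

Because the budget resets on every attempt, a task with `retries=3` and a one-hour timeout can in principle run for up to four hours plus retry delays.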
Definition: Callback Function
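A callback function is a callable invoked on specific task state transitions. Callbacks include `on_failure_callback`, `on_success_callback`, `on_retry_callback`, and `on_execute_callback`. Formally, a callback is invoked as $f(s, c)$, where $s$ is the task state and $c$ is the task context. In Airflow, the callable receives only the context dictionary; the state is implied by which callback fires.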
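The following toy model (plain Python, not Airflow's internal code) shows how callbacks attach to state transitions. A hypothetical `run_task` helper executes a callable up to `retries + 1` times. It invokes `on_execute_callback` before each attempt and `on_retry_callback` when an attempt fails with retries remaining. On the final outcome it invokes either `on_failure_callback` or `on_success_callback`. As in Airflow, each callback receives only a context dictionary. The context keys used here are a simplified assumption.

```python
from typing import Any, Callable, Dict, Optional

Callback = Callable[[Dict[str, Any]], None]


def run_task(
    task_id: str,
    fn: Callable[[], Any],
    retries: int = 0,
    on_execute_callback: Optional[Callback] = None,
    on_success_callback: Optional[Callback] = None,
    on_retry_callback: Optional[Callback] = None,
    on_failure_callback: Optional[Callback] = None,
) -> Any:
    """Toy task runner that fires a callback on each state transition."""
    for try_number in range(1, retries + 2):
        context: Dict[str, Any] = {"task_id": task_id, "try_number": try_number}
        if on_execute_callback:
            on_execute_callback(context)  # about to run this attempt
        try:
            result = fn()
        except Exception as exc:
            context["exception"] = exc
            if try_number <= retries:
                # running -> up_for_retry
                if on_retry_callback:
                    on_retry_callback(context)
                continue
            # running -> failed (no retries left)
            if on_failure_callback:
                on_failure_callback(context)
            raise
        # running -> success
        if on_success_callback:
            on_success_callback(context)
        return result
```

The retry callback fires only when a retry is actually available. A task with `retries=0` therefore goes straight from a failed attempt to `on_failure_callback`.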
Definition: SLA (Service Level Agreement)
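An SLA defines the expected completion time for a task, measured from the scheduled start of its DAG run. It can be set per task or for every task in a DAG through `default_args`. If a task has not completed within its SLA, Airflow 2.x records an `SlaMiss` entry (a database record, not a raised exception), can send an email alert, and invokes the DAG's `sla_miss_callback`. The SLA violation condition is $t_{\text{done}} > t_{\text{sched}} + \Delta_{\text{SLA}}$. SLAs were removed in Airflow 3.0.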
Detailed Explanation
Retry Configuration
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(
    schedule="@daily",  # Airflow 2.4+; earlier versions use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'retries'],
)
def retry_example_dag():
    @task(
        retries=3,                              # up to 3 retries (4 attempts total)
        retry_delay=timedelta(minutes=5),       # initial delay d_0
        retry_exponential_backoff=True,         # grow the delay after each failure
        max_retry_delay=timedelta(minutes=30),  # cap on any single delay
        execution_timeout=timedelta(hours=1),   # time limit per attempt
    )
    def flaky_api_call():
        """Task with automatic retry and timeout."""
        import random

        # Simulate a transient failure half of the time
        if random.random() < 0.5:
            raise ConnectionError("API unavailable")
        return {"status": "success"}

    @task(
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=10),
    )
    def data_processing():
        """Exponential backoff retry for transient errors."""
        import time

        # Simulate about 10 seconds of processing work
        time.sleep(10)
        return {"processed": 1000}

    flaky_api_call() >> data_processing()


retry_example_dag()
Callback Functions
from datetime import datetime

from airflow.decorators import dag, task
from airflow.utils.email import send_email


def failure_callback(context):
    """Handle task failure: log details, send an alert, record info in XCom."""
    task_instance = context['task_instance']
    exception = context.get('exception')  # set for task-level failure callbacks

    # Log failure details
    print(f"Task {task_instance.task_id} failed")
    print(f"Exception: {exception}")
    print(f"Try number: {task_instance.try_number}")
    print(f"Logical date: {context['logical_date']}")

    # Send alert (requires SMTP to be configured for Airflow)
    send_email(
        to=['team@example.com'],
        subject=f"Airflow Task Failed: {task_instance.task_id}",
        html_content=f"""
        <h2>Task Failure Alert</h2>
        <p><strong>Task:</strong> {task_instance.task_id}</p>
        <p><strong>DAG:</strong> {task_instance.dag_id}</p>
        <p><strong>Exception:</strong> {exception}</p>
        <p><strong>Try:</strong> {task_instance.try_number}</p>
        """,
    )

    # Push failure info to XCom for later inspection
    task_instance.xcom_push(
        key='failure_info',
        value={
            'task_id': task_instance.task_id,
            'error': str(exception),
            'timestamp': datetime.now().isoformat(),
        },
    )


def dag_failure_callback(context):
    """Handle DAG run failure (the context includes a 'reason' string)."""
    print(f"DAG {context['dag_run'].dag_id} failed: {context.get('reason')}")


def success_callback(context):
    """Handle task success."""
    task_instance = context['task_instance']
    print(f"Task {task_instance.task_id} succeeded")
    # Duration in seconds; may be None if it has not been recorded yet
    print(f"Duration: {task_instance.duration}")


def retry_callback(context):
    """Handle task retry."""
    task_instance = context['task_instance']
    print(f"Task {task_instance.task_id} retrying")
    print(f"Attempt: {task_instance.try_number}")


@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'callbacks'],
    on_failure_callback=dag_failure_callback,  # runs once per failed DAG run
)
def callback_example_dag():
    @task(
        retries=1,  # without retries, on_retry_callback never fires
        on_failure_callback=failure_callback,
        on_success_callback=success_callback,
        on_retry_callback=retry_callback,
    )
    def process_with_callbacks():
        """Task with callback functions."""
        import random

        if random.random() < 0.3:
            raise ValueError("Processing error")
        return {"status": "success"}

    process_with_callbacks()


callback_example_dag()
SLA Configuration
from datetime import datetime, timedelta

from airflow.decorators import dag, task


def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Handle SLA misses.

    Airflow 2.x calls this with these five arguments, not with a task context.
    """
    print(f"SLA missed in DAG: {dag.dag_id}")
    print(f"Tasks that missed their SLA:\n{task_list}")
    print(f"Blocking tasks:\n{blocking_task_list}")
    for sla in slas:
        print(f"Missed SLA: task={sla.task_id}, logical date={sla.execution_date}")


@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'sla'],
    sla_miss_callback=sla_miss_callback,
    default_args={
        'sla': timedelta(hours=2),  # Default SLA for all tasks
    },
)
def sla_example_dag():
    @task(sla=timedelta(hours=1))
    def critical_task():
        """Task with 1-hour SLA."""
        import time
        time.sleep(5)
        return {"status": "completed"}

    @task(sla=timedelta(hours=4))
    def non_critical_task():
        """Task with 4-hour SLA (overrides the 2-hour default)."""
        import time
        time.sleep(2)
        return {"status": "completed"}

    @task
    def dependent_task():
        """Task that depends on SLA-monitored tasks (inherits the 2-hour default)."""
        print("Processing after SLA-monitored tasks")

    # Call dependent_task() once; calling it twice would create two separate tasks
    [critical_task(), non_critical_task()] >> dependent_task()


sla_example_dag()
Exponential Backoff Delay

$$d_i = d_0 \cdot b^{i}$$

Here,
- $d_i$ = Delay before retry $i$
- $d_0$ = Initial retry delay
- $b$ = Backoff factor (typically 2)
- $i$ = Retry attempt number (0-indexed)
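As a worked example, take the `data_processing` task from the retry configuration above. It has a 30-second initial delay ($d_0 = 30$ s), five retries, and a 10-minute `max_retry_delay`. Assume a backoff factor of $b = 2$:

$$
\begin{aligned}
d_0 &= 30 \cdot 2^{0} = 30\ \text{s} \\
d_1 &= 30 \cdot 2^{1} = 60\ \text{s} \\
d_2 &= 30 \cdot 2^{2} = 120\ \text{s} \\
d_3 &= 30 \cdot 2^{3} = 240\ \text{s} \\
d_4 &= 30 \cdot 2^{4} = 480\ \text{s} \\
\sum_{i=0}^{4} d_i &= 930\ \text{s} = 15.5\ \text{min}
\end{aligned}
$$

Every delay stays below the 600-second cap, so `max_retry_delay` never applies to this task in the idealized schedule.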
Total Task Time with Retries
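$$T_{\text{total}} = \sum_{i=0}^{R} t_i + \sum_{i=0}^{R-1} f_i \, d_i$$

Here,
- $R$ = Maximum retry count
- $t_i$ = Execution time of attempt $i$ (0 for attempts that never run)
- $d_i$ = Delay after attempt $i$
- $f_i$ = Indicator: 1 if attempt $i$ failed, 0 otherwise

The second sum stops at $R - 1$ because no delay follows the final allowed attempt.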
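The total-time formula can be evaluated directly for a recorded sequence of attempts. The sketch below uses a hypothetical helper, `total_task_time`, which takes per-attempt durations, failure flags, and the delay schedule, all in seconds. It adds a delay only after a failed attempt that still has a retry behind it, so the final allowed attempt contributes no delay.

```python
from typing import Sequence


def total_task_time(
    durations: Sequence[float],
    failed: Sequence[bool],
    delays: Sequence[float],
    max_retries: int,
) -> float:
    """Evaluate T_total = sum(t_i) + sum over i < R of f_i * d_i.

    durations[i] and failed[i] describe attempt i (0-indexed) for the attempts
    that actually ran; delays[i] is d_i, the wait after attempt i.
    """
    if len(durations) != len(failed):
        raise ValueError("durations and failed must have the same length")
    if len(durations) > max_retries + 1:
        raise ValueError("more attempts than the retry count allows")
    run_time = sum(durations)
    # The i < max_retries check is evaluated first, so delays[R] is never read
    wait_time = sum(
        delays[i] for i, did_fail in enumerate(failed) if did_fail and i < max_retries
    )
    return run_time + wait_time
```

For example, with `retries=3`, a fixed 300-second delay, and 60-second attempts, three failures followed by a success take 4 × 60 + 3 × 300 = 1140 seconds. Four straight failures take the same time, because no delay follows the last attempt.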
SLA Violation Condition
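$$t_{\text{done}} > t_{\text{sched}} + \Delta_{\text{SLA}}$$

Here,
- $t_{\text{done}}$ = Task completion timestamp (the current time, if the task has not finished yet)
- $t_{\text{sched}}$ = Scheduled start time of the DAG run
- $\Delta_{\text{SLA}}$ = SLA duration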
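A minimal sketch of the violation check using only the standard library follows. `sla_missed` is a hypothetical helper. It judges an unfinished task against the current time, so an SLA can be missed while the task is still running. The comparison is strict, so finishing exactly at the deadline is not a miss.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def sla_missed(
    scheduled: datetime,
    sla: timedelta,
    completed: Optional[datetime] = None,
    now: Optional[datetime] = None,
) -> bool:
    """Return True when t_done > t_sched + Delta_SLA.

    If the task has not completed, t_done is taken to be `now`
    (defaulting to the current UTC time).
    """
    deadline = scheduled + sla
    if completed is not None:
        return completed > deadline
    current = now if now is not None else datetime.now(timezone.utc)
    return current > deadline
```

Timestamps should all be timezone-aware (or all naive); Python refuses to compare the two kinds.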
Retry delays are added between attempts, never before the first attempt. With retries=3, retry_delay=5min, and no exponential backoff, a task that fails 3 times accumulates 3 × 5 = 15 minutes of delay on top of its execution time, and scheduler latency can only add to that.
Use retry_exponential_backoff=True for transient errors (network, API). This prevents overwhelming external systems during outages. Set max_retry_delay to cap the maximum wait time.
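The capped schedule can be computed with a short helper. This is a sketch of the idealized formula $d_i = \min(d_0 \cdot b^{i}, d_{\max})$, where $d_{\max}$ stands for `max_retry_delay`. `retry_schedule` is a hypothetical name. Airflow's own exponential backoff also adds a deterministic per-task jitter, so observed waits can be longer than these values, though never longer than `max_retry_delay`.

```python
from datetime import timedelta
from typing import List, Optional


def retry_schedule(
    retries: int,
    retry_delay: timedelta,
    exponential: bool = True,
    backoff_factor: float = 2.0,
    max_retry_delay: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the idealized wait before each retry i = 0 .. retries - 1."""
    delays = []
    for i in range(retries):
        # d_i = d_0 * b**i with backoff, otherwise a constant d_0
        delay = retry_delay * (backoff_factor ** i) if exponential else retry_delay
        if max_retry_delay is not None:
            delay = min(delay, max_retry_delay)  # apply the cap
        delays.append(delay)
    return delays
```

With the `flaky_api_call` settings (5-minute base, 30-minute cap, 3 retries) the schedule is 5, 10 and 20 minutes, so the cap is never reached. With a 1-minute base, a 5-minute cap and 5 retries, the last two delays are clipped to 5 minutes. Passing `exponential=False` reproduces the fixed-delay case of three 5-minute waits.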
Key Concepts Table
| Error Type | Recommended Strategy | Retry Count | Delay |
|---|---|---|---|
| Network Timeout | Exponential backoff | 3-5 | 30s-5min |
| API Rate Limit | Linear backoff | 5-10 | 1-5min |
| Database Deadlock | Exponential backoff | 3 | 1-5min |
| Resource Exhaustion | Delayed retry | 2-3 | 10-30min |
| Data Validation | No retry | 0 | N/A |
| Authentication Error | No retry | 0 | N/A |
| Disk Space | Alert only | 0 | N/A |
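One way to act on this table in code is a small lookup from exception type to retry decision. The sketch is illustrative only. The `RetryPolicy` tuple and the `classify` helper are hypothetical. The mapping of built-in exceptions to the table's rows is our assumption; for example, `PermissionError` stands in for authentication failures and `MemoryError` for resource exhaustion. Real client libraries raise their own exception types. Unknown errors default to no retry, which is also a design choice rather than something the table specifies. In an Airflow task, the non-retryable cases are where you would raise `AirflowFailException` to skip the remaining retries.

```python
from typing import NamedTuple, Tuple, Type


class RetryPolicy(NamedTuple):
    strategy: str
    retries: int  # upper end of the table's suggested range


# Hypothetical mapping of exception types to rows of the table above.
# Order matters: the first matching entry wins.
POLICIES: Tuple[Tuple[Tuple[Type[BaseException], ...], RetryPolicy], ...] = (
    ((TimeoutError, ConnectionError), RetryPolicy("exponential backoff", 5)),
    ((MemoryError,), RetryPolicy("delayed retry", 3)),
    ((PermissionError,), RetryPolicy("no retry", 0)),     # stand-in for auth errors
    ((ValueError, KeyError), RetryPolicy("no retry", 0)),  # data validation
)

DEFAULT_POLICY = RetryPolicy("no retry", 0)  # unknown errors: fail fast and alert


def classify(exc: BaseException) -> RetryPolicy:
    """Return the retry policy for an exception, defaulting to no retry."""
    for types, policy in POLICIES:
        if isinstance(exc, types):
            return policy
    return DEFAULT_POLICY
```

Subclasses match their parents, so `ConnectionRefusedError` is treated as a network error and gets exponential backoff.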
Code Examples
Advanced Error Handling Patterns
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException, AirflowSkipException

# Illustrative circuit breaker settings
FAILURE_THRESHOLD = 5           # consecutive failures before the circuit opens
CIRCUIT_COOLDOWN_SECONDS = 600  # how long the circuit stays open before a trial call


def perform_external_call():
    """Simulate an external call that fails half of the time."""
    import random

    if random.random() < 0.5:
        raise ConnectionError("External service unavailable")
    return {"status": "success"}


@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'advanced'],
)
def advanced_error_handling_dag():
    @task
    def validate_input(data: dict):
        """Validate input data with specific error types."""
        if not data:
            raise AirflowSkipException("No data to process")
        if 'required_field' not in data:
            raise ValueError("Missing required_field")
        if data.get('value', 0) < 0:
            raise ValueError("Value must be non-negative")
        return data

    @task(
        retries=3,
        retry_delay=timedelta(minutes=1),
    )
    def process_with_selective_retry():
        """Retry only on specific exception types.

        Ordinary exceptions are retried. Non-retryable errors are raised as
        AirflowFailException, which fails the task immediately without using
        the remaining retries.
        """
        import random

        error_type = random.choice(['ConnectionError', 'ValueError', 'TimeoutError'])
        if error_type == 'ConnectionError':
            raise ConnectionError("Network issue")  # retried
        elif error_type == 'TimeoutError':
            raise TimeoutError("Request timed out")  # retried
        else:
            raise AirflowFailException("Invalid data")  # not retried

    @task
    def handle_partial_failure():
        """Handle partial failures in batch processing."""
        results = []
        errors = []
        for i in range(10):
            try:
                # Simulate processing
                if i % 3 == 0:
                    raise ValueError(f"Error processing item {i}")
                results.append({"id": i, "status": "success"})
            except Exception as e:
                errors.append({"id": i, "error": str(e)})
        return {
            "successful": len(results),
            "failed": len(errors),
            "errors": errors,
        }

    @task
    def implement_circuit_breaker():
        """Implement the circuit breaker pattern with Airflow Variables.

        closed: calls go through; open: calls are skipped until the cooldown
        elapses; half-open: one trial call decides whether to close or reopen.
        """
        import time

        from airflow.models import Variable

        state = Variable.get("circuit_breaker_state", default_var="closed")
        failure_count = int(Variable.get("circuit_failure_count", default_var="0"))
        opened_at = float(Variable.get("circuit_opened_at", default_var="0"))

        if state == "open":
            if time.time() - opened_at < CIRCUIT_COOLDOWN_SECONDS:
                # Circuit is open: skip instead of calling a failing service
                raise AirflowSkipException("Circuit breaker is open")
            # Cooldown elapsed: allow a single trial call
            state = "half-open"
            Variable.set("circuit_breaker_state", state)

        try:
            result = perform_external_call()
        except Exception:
            failure_count += 1
            Variable.set("circuit_failure_count", str(failure_count))
            # A failed trial call, or too many consecutive failures, opens the circuit
            if state == "half-open" or failure_count >= FAILURE_THRESHOLD:
                Variable.set("circuit_breaker_state", "open")
                Variable.set("circuit_opened_at", str(time.time()))
            raise

        # Any success closes the circuit and resets the failure count
        Variable.set("circuit_breaker_state", "closed")
        Variable.set("circuit_failure_count", "0")
        return result

    validate_input({"required_field": "value", "value": 100})
    process_with_selective_retry()
    handle_partial_failure()
    implement_circuit_breaker()


advanced_error_handling_dag()
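The Variable-backed task persists breaker state between DAG runs. The same state machine is easier to see and to unit-test in memory. This is a minimal sketch with hypothetical names (`CircuitBreaker`, `CircuitOpenError`, `failure_threshold`, `cooldown_s`) and an injectable clock; it is not an Airflow API.

```python
import time
from typing import Any, Callable


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """closed -> open after N consecutive failures; open -> half-open after a cooldown."""

    def __init__(self, failure_threshold: int = 5, cooldown_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, fn: Callable[[], Any]) -> Any:
        if self.state == "open":
            if self.clock() - self.opened_at < self.cooldown_s:
                raise CircuitOpenError("circuit is open")
            self.state = "half-open"  # cooldown over: allow one trial call
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed trial, or too many consecutive failures, (re)opens the circuit
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and resets the consecutive-failure count
        self.state = "closed"
        self.failures = 0
        return result
```

Counting consecutive failures and resetting on any success is one common design; a sliding-window error rate is another. The Variable-backed version also reads and writes several Variables without a lock, so concurrent DAG runs can race on the breaker state.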
Error Aggregation and Reporting
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['error-handling', 'reporting'],
)
def error_reporting_dag():
    @task
    def execute_batch(batch_id: int) -> dict:
        """Execute a batch with potential failures."""
        import random

        total_items = 100
        successful = 0
        failed = 0
        errors = []
        for i in range(total_items):
            try:
                if random.random() < 0.1:
                    raise ValueError(f"Item {i} validation failed")
                successful += 1
            except Exception as e:
                failed += 1
                errors.append({
                    "item": i,
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                })
        return {
            "batch_id": batch_id,
            "total": total_items,
            "successful": successful,
            "failed": failed,
            "error_rate": failed / total_items,
            "errors": errors[:10],  # Keep only the first 10 errors per batch
        }

    @task
    def aggregate_results(batch_results: list) -> dict:
        """Aggregate results from all mapped batch tasks."""
        batch_results = list(batch_results)  # materialize the lazy XCom sequence once
        total = sum(r["total"] for r in batch_results)
        successful = sum(r["successful"] for r in batch_results)
        failed = sum(r["failed"] for r in batch_results)
        all_errors = []
        for r in batch_results:
            all_errors.extend(r.get("errors", []))
        return {
            "total_batches": len(batch_results),
            "total_items": total,
            "total_successful": successful,
            "total_failed": failed,
            "overall_error_rate": failed / total if total > 0 else 0,
            "error_summary": all_errors[:50],  # Keep the first 50 errors
        }

    @task
    def send_error_report(aggregated: dict):
        """Send error report via email."""
        from airflow.utils.email import send_email

        if aggregated["total_failed"] == 0:
            print("No errors to report")
            return

        html = f"""
        <h2>Error Report</h2>
        <p>Total batches: {aggregated['total_batches']}</p>
        <p>Total items: {aggregated['total_items']}</p>
        <p>Successful: {aggregated['total_successful']}</p>
        <p>Failed: {aggregated['total_failed']}</p>
        <p>Error rate: {aggregated['overall_error_rate']:.2%}</p>
        """
        if aggregated["error_summary"]:
            html += "<h3>Sample Errors</h3><ul>"
            for error in aggregated["error_summary"][:10]:
                html += f"<li>Item {error['item']}: {error['error']}</li>"
            html += "</ul>"

        send_email(
            to=['team@example.com'],
            subject=f"Batch Processing Error Report - {aggregated['total_failed']} failures",
            html_content=html,
        )

    # Execute batches in parallel with dynamic task mapping (Airflow 2.3+)
    batches = execute_batch.expand(batch_id=[1, 2, 3, 4, 5])
    aggregated = aggregate_results(batches)
    send_error_report(aggregated)


error_reporting_dag()
Performance Metrics
Retry Impact Analysis
| Scenario | Retries | Delay per Retry | Worst-Case Added Delay | Success Rate |
|---|---|---|---|---|
| No retries | 0 | 0 | 0 | 85% |
| Conservative | 2 | 5min | 10min | 95% |
| Moderate | 3 | 2min | 6min | 98% |
| Aggressive | 5 | 1min | 5min | 99% |
| Exponential | 3 | 1-5min | 7min | 99% |
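The overhead figures in the table are worst cases in which every retry is used, so each is simply the sum of the per-retry delays. The quick arithmetic check below reads the exponential row's "1-5min" as delays of 1, 2 and 4 minutes; that reading is our assumption. `worst_case_overhead` is a hypothetical helper name.

```python
def worst_case_overhead(delays_min):
    """Total waiting time, in minutes, if every retry in the list is used."""
    return sum(delays_min)


scenarios = {
    "Conservative": [5, 5],
    "Moderate": [2, 2, 2],
    "Aggressive": [1, 1, 1, 1, 1],
    "Exponential": [1 * 2 ** i for i in range(3)],  # 1, 2, 4 minutes
}

for name, delays in scenarios.items():
    print(f"{name}: {worst_case_overhead(delays)} min")
```

The results (10, 6, 5 and 7 minutes) match the table. Only failing runs pay this cost, since a run that succeeds on its first attempt adds no delay.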
Error Type Distribution
| Error Type | Frequency | Retry Success | Recommendation |
|---|---|---|---|
| Network Timeout | 40% | High | Retry with backoff |
| API Rate Limit | 25% | High | Linear backoff |
| Data Validation | 15% | None | Fix upstream |
| Resource Exhaustion | 10% | Medium | Delay retry |
| Authentication | 5% | None | Fix credentials |
| Bug/Logic Error | 5% | None | Fix code |
Key Takeaways:
- Use exponential backoff for transient errors; linear backoff for rate limits
- Set `execution_timeout` to stop hung or runaway attempts, such as infinite loops
- Implement `on_failure_callback` for alerting and logging
- Use `sla_miss_callback` to monitor pipeline health
- Circuit breaker pattern prevents cascading failures
- Aggregate errors for batch reporting and trend analysis
See Also
- Branching Logic: Conditional error recovery paths
- Monitoring and Alerting: System-wide error monitoring
- Testing DAGs: Testing error scenarios
- Security Best Practices: Secure error handling