Python Message Queues — Async Task Processing
Message queues decouple task submission from execution. Instead of processing tasks immediately, you add them to a queue and workers process them in the background.
Learning Objectives
- Understand task queue concepts
- Use Celery with Redis as broker
- Implement background tasks
- Handle task failures and retries
Why Message Queues?
Without Queue (Synchronous):
User clicks "Export" → Server processes for 30 seconds → Response
( user waits, connection might time out )
With Queue (Asynchronous):
User clicks "Export" → Task added to queue → Response: "Processing..."
Worker picks up task → Processes in background → Sends email when done
( user gets instant response, task runs reliably )
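The asynchronous flow above can be sketched with only the standard library. This is a minimal in-process illustration, assuming a thread stands in for the worker and `queue.Queue` stands in for the broker. The names `export_report`, `handle_export_click`, and `worker` are hypothetical; a real deployment would run workers as separate processes against an external broker.

```python
import queue
import threading

task_queue = queue.Queue()   # stands in for the broker
completed = []               # stands in for "send email when done"

def export_report(user_id):
    """Hypothetical slow job; a real export might take 30 seconds."""
    return f"report-for-{user_id}"

def worker():
    """Pull tasks off the queue until a None sentinel arrives."""
    while True:
        user_id = task_queue.get()
        if user_id is None:
            task_queue.task_done()
            break
        completed.append(export_report(user_id))
        task_queue.task_done()

def handle_export_click(user_id):
    """Request handler: enqueue the work and respond right away."""
    task_queue.put(user_id)
    return "Processing..."

worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

response = handle_export_click(42)   # returns without waiting for the export
task_queue.join()                    # demo only: wait until the worker is done
task_queue.put(None)                 # tell the worker to stop
worker_thread.join()
```

The handler never calls `export_report` itself, which is the whole point: the response time no longer depends on how long the job takes.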
Celery Basics
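# tasks.py
import requests
from celery import Celery

# A result backend is needed for result.get() and result.status to work.
# Using Redis database 1 for results is an arbitrary choice.
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

@app.task
def add(x, y):
    return x + y

@app.task(bind=True, max_retries=3)
def fetch_url(self, url):
    try:
        response = requests.get(url, timeout=10)
        return response.status_code
    except requests.RequestException as exc:
        # retry() raises a Retry exception; re-raise it so this attempt ends here
        raise self.retry(exc=exc, countdown=60)  # Retry after 60 seconds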
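To make the retry semantics concrete, here is a broker-free sketch of the same idea: make one attempt, and on failure try again up to `max_retries` more times with a fixed wait in between. The helper `run_with_retries` and its `sleep` parameter are hypothetical and not part of Celery. Celery reschedules the task through the broker instead of sleeping inside the worker.

```python
import time

def run_with_retries(func, max_retries=3, countdown=60, sleep=time.sleep):
    """Call func(); on failure wait `countdown` seconds and try again.

    Makes at most max_retries + 1 attempts, then re-raises the last error.
    """
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= max_retries:
                raise
            attempt += 1
            sleep(countdown)

# Demo: a hypothetical call that fails twice before succeeding.
calls = {"count": 0}

def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return 200

waits = []
# Record the requested waits instead of actually sleeping.
status = run_with_retries(flaky, sleep=waits.append)
```

Passing `sleep` as a parameter keeps the demo fast and makes the waiting behaviour easy to inspect.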
Triggering Tasks
# From your application
from tasks import add, fetch_url
# Async (returns immediately)
result = add.delay(4, 4)
print(result.id) # Task ID
print(result.get(timeout=10)) # Wait for result: 8
# With options
fetch_url.apply_async(
    args=['https://example.com'],
    countdown=10,  # Delay 10 seconds
    expires=3600,  # Expire after 1 hour
)
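One way to read `countdown` and `expires` together is as a time window in which the task may start. The sketch below models that rule with a hypothetical `task_action` helper. It is an illustration under that assumption, not Celery's implementation, and all times are plain seconds.

```python
def task_action(now, submitted_at, countdown=0, expires=None):
    """Decide what a worker would do with a task at time `now`.

    Returns "discard" once the expiry has passed, "wait" while the
    countdown has not yet elapsed, and "run" in between.
    """
    if expires is not None and now > submitted_at + expires:
        return "discard"
    if now < submitted_at + countdown:
        return "wait"
    return "run"

# Same options as the apply_async call above, submitted at t = 0.
early = task_action(now=5, submitted_at=0, countdown=10, expires=3600)
on_time = task_action(now=10, submitted_at=0, countdown=10, expires=3600)
stale = task_action(now=4000, submitted_at=0, countdown=10, expires=3600)
```

Expiry matters most when workers fall behind: a task that sat in the queue past its deadline is dropped rather than run late.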
Simple Redis Queue (No Celery)
import json

import redis

r = redis.Redis()

def enqueue(queue_name: str, task_data: dict) -> None:
    """Add a task to the tail of the queue."""
    r.rpush(queue_name, json.dumps(task_data))

def dequeue(queue_name: str, timeout: int = 5):
    """Get the next task from the queue, blocking for up to `timeout` seconds."""
    item = r.blpop(queue_name, timeout=timeout)
    if item is None:  # blpop returns None when the timeout expires
        return None
    _, data = item  # blpop returns a (queue_name, value) pair
    return json.loads(data)
# Producer (your web app)
enqueue("email_tasks", {
    "type": "send_welcome",
    "to": "user@example.com",
    "name": "Alice"
})

# Consumer (worker process)
# send_welcome_email is your own function, defined elsewhere.
while True:
    task = dequeue("email_tasks")
    if task:
        if task["type"] == "send_welcome":
            send_welcome_email(task["to"], task["name"])
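As the number of task types grows, the `if task["type"] == ...` chain in the consumer can be replaced with a lookup table. The sketch below assumes a hypothetical `HANDLERS` dictionary and `handle_task` function, and stubs out `send_welcome_email` so it records the message instead of sending it. It takes the raw JSON string, so it runs without a Redis server.

```python
import json

sent = []  # records what would have been emailed, for the demo

def send_welcome_email(to, name):
    """Stub: a real implementation would send an email."""
    sent.append(f"Welcome, {name}! -> {to}")

# Maps each task type to the function that handles it.
HANDLERS = {
    "send_welcome": lambda task: send_welcome_email(task["to"], task["name"]),
}

def handle_task(raw):
    """Decode one queued message and route it; return False if unknown."""
    task = json.loads(raw)
    handler = HANDLERS.get(task.get("type"))
    if handler is None:
        return False
    handler(task)
    return True

message = json.dumps(
    {"type": "send_welcome", "to": "user@example.com", "name": "Alice"}
)
ok = handle_task(message)
unknown = handle_task(json.dumps({"type": "resize_image"}))
```

Returning `False` for an unknown type lets the worker decide what to do with it (log it, drop it, or park it) instead of silently ignoring the message.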
Task Monitoring
# Check task status
result = add.delay(4, 4)
# STARTED is only reported when the task_track_started setting is enabled
print(result.status)  # PENDING, STARTED, RETRY, SUCCESS, or FAILURE
# Check if ready
print(result.ready()) # True/False
# Get result (blocks until done)
print(result.get(timeout=10))
# Handle failure
try:
    result.get(timeout=10)
except Exception as e:
    print(f"Task failed: {e}")
Key Takeaways
- Use Celery for production task queues
- Use Redis or RabbitMQ as the message broker
- Always handle task failures with retries
- Set task expiration for stale tasks
- Monitor queue length and processing time
- Use apply_async for delayed execution
- Implement dead letter queues for failed tasks
- Consider rate limiting for external API calls
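The dead letter queue idea from the list above can be sketched in memory: a task that still fails after a fixed number of attempts is moved to a separate list for later inspection instead of being retried forever. The function `process_with_dlq`, the `max_attempts` parameter, and the demo handler are hypothetical. With Redis, the dead letters would typically go to a second list.

```python
from collections import deque

def process_with_dlq(tasks, handler, max_attempts=3):
    """Run handler on each task; park tasks that keep failing.

    Returns (results, dead_letters).
    """
    pending = deque((task, 1) for task in tasks)
    results, dead_letters = [], []
    while pending:
        task, attempt = pending.popleft()
        try:
            results.append(handler(task))
        except Exception as exc:
            if attempt >= max_attempts:
                # Give up: keep the task and the reason for later inspection.
                dead_letters.append(
                    {"task": task, "error": str(exc), "attempts": attempt}
                )
            else:
                pending.append((task, attempt + 1))  # requeue at the back
    return results, dead_letters

def send_email(task):
    """Demo handler that rejects tasks without a recipient."""
    if task["to"] is None:
        raise ValueError("missing recipient")
    return f"sent to {task['to']}"

results, dead = process_with_dlq(
    [{"to": "user@example.com"}, {"to": None}], send_email
)
```

Keeping the error message and attempt count alongside the task makes it possible to fix the cause and replay the dead letters later.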