Python Message Queues — Async Task Processing

Message queues decouple task submission from execution. Instead of processing tasks immediately, you add them to a queue and workers process them in the background.

Learning Objectives

  • Understand task queue concepts
  • Use Celery with Redis as broker
  • Implement background tasks
  • Handle task failures and retries

Why Message Queues?

Without Queue (Synchronous):
User clicks "Export" → Server processes for 30 seconds → Response
( user waits, connection might time out )

With Queue (Asynchronous):
User clicks "Export" → Task added to queue → Response: "Processing..."
Worker picks up task → Processes in background → Sends email when done
( user gets instant response, task runs reliably )
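The asynchronous flow above can be sketched with the standard library alone. This is a minimal in-process illustration, not a production setup: `queue.Queue` stands in for a real broker, and `export_report` and `handle_export_request` are hypothetical names made up for the example.

```python
import queue
import threading
import time

task_queue = queue.Queue()   # in-process stand-in for a real broker
completed = []               # results collected by the worker


def export_report(user_id):
    """Hypothetical slow job; the sleep stands in for real work."""
    time.sleep(0.1)
    return f"report for user {user_id}"


def worker():
    """Pull tasks off the queue until a None sentinel arrives."""
    while True:
        user_id = task_queue.get()
        if user_id is None:
            task_queue.task_done()
            break
        completed.append(export_report(user_id))
        task_queue.task_done()


def handle_export_request(user_id):
    """Request handler: enqueue the job and respond immediately."""
    task_queue.put(user_id)
    return "Processing..."


thread = threading.Thread(target=worker, daemon=True)
thread.start()

response = handle_export_request(42)  # returns before the job has run
task_queue.put(None)                  # sentinel: tell the worker to stop
task_queue.join()                     # wait for the worker (demo only)
```

The handler returns as soon as the task is enqueued; the `join()` call exists only so the demo can inspect `completed` afterwards. A real web handler would not wait.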

Celery Basics

# tasks.py
import requests
from celery import Celery

# A result backend is needed so callers can read results with result.get()
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

@app.task
def add(x, y):
    return x + y

@app.task(bind=True, max_retries=3)
def fetch_url(self, url):
    try:
        response = requests.get(url, timeout=10)
        return response.status_code
    except requests.RequestException as exc:
        # Retry after 60 seconds; gives up once max_retries is exceeded
        raise self.retry(exc=exc, countdown=60)
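A fixed 60-second countdown can be replaced with a delay that grows on each attempt. The sketch below is a rough in-process analogue of retrying with exponential backoff, not Celery's own implementation; `backoff_delay`, `run_with_retries`, and `flaky` are hypothetical names, and the sleep function is injectable so the example runs instantly.

```python
import time


def backoff_delay(attempt, base=60, cap=3600):
    """Seconds to wait before retry number `attempt` (0-based), doubling each time."""
    return min(cap, base * 2 ** attempt)


def run_with_retries(func, max_retries=3, sleep=time.sleep):
    """Call func(); on failure wait and retry, re-raising after max_retries retries."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= max_retries:
                raise
            sleep(backoff_delay(attempt))
            attempt += 1


# Demo: a hypothetical call that fails twice, then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return 200


waited = []  # record the delays instead of actually sleeping
status = run_with_retries(flaky, sleep=waited.append)
```

Inside a bound Celery task, the same delay could be passed as `countdown=backoff_delay(self.request.retries)`, since `self.request.retries` holds the number of retries so far.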

Triggering Tasks

# From your application
from tasks import add, fetch_url

# Async (returns immediately)
result = add.delay(4, 4)
print(result.id)  # Task ID
print(result.get(timeout=10))  # Wait for result: 8 (requires a result backend)

# With options
fetch_url.apply_async(
    args=['https://example.com'],
    countdown=10,  # Delay 10 seconds
    expires=3600   # Expire after 1 hour
)
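One way to picture `countdown` and `expires` is as two timestamps attached to the task: the earliest time it may run and the time after which it is dropped. The sketch below models only that decision in plain Python. It is an illustration under that assumption, not how Celery is implemented, and `ScheduledTask`, `schedule`, and `decide` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class ScheduledTask:
    name: str
    eta: float         # earliest time the task may run
    expires_at: float  # at or after this time the task is discarded


def schedule(name, now, countdown=0, expires=None):
    """Turn relative countdown/expires values (seconds) into absolute times."""
    expires_at = now + expires if expires is not None else float("inf")
    return ScheduledTask(name, now + countdown, expires_at)


def decide(task, now):
    """What a worker would do with the task at time `now`."""
    if now >= task.expires_at:
        return "discard"
    if now < task.eta:
        return "wait"
    return "run"


task = schedule("fetch_url", now=0, countdown=10, expires=3600)
```

With these values the task waits during its first 10 seconds, is runnable from then on, and is discarded if no worker has picked it up within an hour.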

Simple Redis Queue (No Celery)

import redis
import json

r = redis.Redis()

def enqueue(queue_name: str, task_data: dict):
    """Add a task to the queue."""
    r.rpush(queue_name, json.dumps(task_data))

def dequeue(queue_name: str, timeout: int = 5):
    """Get next task from queue (blocking); returns None on timeout."""
    item = r.blpop(queue_name, timeout=timeout)  # None if nothing arrived
    if item is None:
        return None
    _, data = item  # blpop returns a (queue_name, value) pair
    return json.loads(data)

# Producer (your web app)
enqueue("email_tasks", {
    "type": "send_welcome",
    "to": "user@example.com",
    "name": "Alice"
})

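# Consumer (worker process)
# send_welcome_email is your own function; it is not defined in this snippet
while True:
    task = dequeue("email_tasks")  # None when the queue stayed empty
    if task and task["type"] == "send_welcome":
        send_welcome_email(task["to"], task["name"])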
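As more task types are added, the `if task["type"] == ...` check in the consumer tends to grow into a long chain. A common alternative is a dispatch table keyed by task type. The sketch below is one possible shape, assuming the same JSON message format as above; `HANDLERS`, `handler`, `send_welcome`, and `process` are hypothetical names, and the email is simulated by appending to a list.

```python
import json

HANDLERS = {}  # maps task type -> function that processes it


def handler(task_type):
    """Decorator that registers a function for one task type."""
    def register(func):
        HANDLERS[task_type] = func
        return func
    return register


sent = []  # stand-in for actually sending email


@handler("send_welcome")
def send_welcome(task):
    sent.append(f"Welcome, {task['name']}! -> {task['to']}")


def process(raw_message):
    """Decode one queue message and run its handler; False if the type is unknown."""
    task = json.loads(raw_message)
    func = HANDLERS.get(task["type"])
    if func is None:
        return False
    func(task)
    return True


ok = process(json.dumps(
    {"type": "send_welcome", "to": "user@example.com", "name": "Alice"}
))
unknown = process(json.dumps({"type": "resize_image"}))
```

Returning `False` for unknown types lets the worker log or set aside messages it does not understand instead of crashing.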

Task Monitoring

# Check task status
result = add.delay(4, 4)
# STARTED is only reported when the task_track_started setting is enabled
print(result.status)  # PENDING, STARTED, RETRY, SUCCESS, or FAILURE

# Check if ready
print(result.ready())  # True/False

# Get result (blocks until done)
print(result.get(timeout=10))

# Handle failure
try:
    result.get(timeout=10)
except Exception as e:
    print(f"Task failed: {e}")

Key Takeaways

  1. Use Celery for production task queues
  2. Use Redis or RabbitMQ as the message broker
  3. Retry tasks that fail for transient reasons, and cap the number of retries
  4. Set an expiration so stale tasks are discarded
  5. Monitor queue length and processing time
  6. Use apply_async for delayed execution
  7. Send tasks that exhaust their retries to a dead letter queue
  8. Consider rate limiting for external API calls
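The dead letter queue idea from the list can be sketched in a few lines. This is a minimal in-memory illustration using `collections.deque` rather than Redis or Celery; `MAX_ATTEMPTS`, `drain`, and `handle` are hypothetical names, and the "bad" task type exists only to force a failure.

```python
from collections import deque

MAX_ATTEMPTS = 3  # assumed limit before a task is given up on


def drain(main_queue, dead_letters, handle):
    """Process tasks until the main queue is empty.

    Failed tasks are re-queued; after MAX_ATTEMPTS failures they are
    moved to the dead letter queue for later inspection.
    """
    while main_queue:
        task = main_queue.popleft()
        try:
            handle(task)
        except Exception as exc:
            task["attempts"] = task.get("attempts", 0) + 1
            task["last_error"] = str(exc)
            if task["attempts"] >= MAX_ATTEMPTS:
                dead_letters.append(task)
            else:
                main_queue.append(task)


processed = []


def handle(task):
    if task["type"] == "bad":
        raise ValueError("cannot process task")
    processed.append(task["type"])


main_queue = deque([{"type": "ok"}, {"type": "bad"}])
dead_letters = deque()
drain(main_queue, dead_letters, handle)
```

Keeping the attempt count and last error on the task itself makes the dead letter queue useful for debugging, since each entry records why it ended up there.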
