Python Message Queues — Async Task Processing

Message queues decouple task submission from execution. Instead of processing tasks immediately, you add them to a queue and workers process them in the background. This tutorial covers Celery, Redis queues, task monitoring, retry mechanisms, and real-world patterns.

Learning Objectives

  • Understand task queue concepts and trade-offs
  • Use Celery with Redis as broker
  • Implement background tasks with retries
  • Monitor task status and failures
  • Handle scheduled and periodic tasks
  • Build a complete background job processing system

Why Message Queues?

Architecture Diagram
Without Queue (Synchronous):
User clicks "Export" -> Server processes for 30 seconds -> Response
(user waits; the connection might time out)

With Queue (Asynchronous):
User clicks "Export" -> Task added to queue -> Response: "Processing..."
Worker picks up task -> Processes in background -> Sends email when done
(user gets an instant response; the task runs reliably)
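
To make the diagram concrete, here is a minimal in-process sketch using only the standard library: `queue.Queue` stands in for the broker and a thread stands in for the worker process. The names `handle_export_click`, `slow_export` and `run_demo` are hypothetical; a real deployment would use Redis or RabbitMQ and separate worker processes, as the rest of this lesson does.

```python
import queue
import threading
import time

# Sentinel object that tells the worker to stop.
STOP = object()

def slow_export(report_name: str) -> str:
    time.sleep(0.05)  # stand-in for ~30 seconds of real work
    return f"{report_name}.csv"

def worker(tasks: queue.Queue, results: list) -> None:
    """Pull tasks in FIFO order until the STOP sentinel arrives."""
    while True:
        item = tasks.get()
        try:
            if item is STOP:
                return
            results.append(slow_export(item))
        finally:
            tasks.task_done()

def handle_export_click(tasks: queue.Queue, report_name: str) -> str:
    """The 'web request': enqueue the work and respond at once."""
    tasks.put(report_name)
    return "Processing..."

def run_demo(report_names):
    tasks: queue.Queue = queue.Queue()
    results: list = []
    t = threading.Thread(target=worker, args=(tasks, results), daemon=True)
    t.start()
    start = time.perf_counter()
    responses = [handle_export_click(tasks, name) for name in report_names]
    waited = time.perf_counter() - start  # how long the "user" waited
    tasks.put(STOP)
    t.join()  # demo only: a real web app would not wait for the worker
    return responses, waited, results
```

`run_demo(["q3-sales"])` returns the immediate `"Processing..."` responses, how long the caller actually waited, and the finished exports once the worker has drained the queue.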

When to Use Message Queues

Use case | Why a queue helps
Email sending | Don't block the response on slow SMTP
Data processing | Handle large datasets without timeouts
Image/video processing | CPU-intensive, run in the background
Report generation | Complex queries, non-interactive
Scheduled jobs | Cron-like tasks (cleanup, reports)
Webhooks | Reliable delivery with retries

Celery Basics

Setup

# tasks.py
from celery import Celery

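# A result backend is required for result.get() and result.status used below
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')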

@app.task
def add(x, y):
    return x + y

@app.task
def process_file(file_path):
    # Heavy processing...
    return {"status": "done", "file": file_path}
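
Celery messages carry the task's registered name (here `tasks.add`) plus serialized arguments, and the worker looks the function up by that name; that is why the worker is started with `-A tasks`, so it imports the same module. The toy class below sketches the idea with the standard library. `ToyApp`, `work_one` and the in-memory `deque` broker are hypothetical and are not Celery's API or internals.

```python
import json
from collections import deque
from typing import Any, Callable

class ToyApp:
    """Hypothetical, heavily simplified stand-in for a Celery app."""

    def __init__(self, name: str):
        self.name = name
        self.registry: dict = {}      # task name -> function
        self.broker: deque = deque()  # in-memory stand-in for Redis

    def task(self, func: Callable[..., Any]) -> Callable[..., Any]:
        # Register the function under a dotted name such as 'tasks.add'
        task_name = f"{self.name}.{func.__name__}"
        self.registry[task_name] = func

        def delay(*args: Any) -> None:
            # Only the task name and JSON-encoded args travel over the broker
            self.broker.append(json.dumps({"task": task_name, "args": list(args)}))

        func.delay = delay  # attach an enqueue helper to the plain function
        func.name = task_name
        return func

    def work_one(self) -> Any:
        """What a worker does per message: decode, look up by name, call."""
        message = json.loads(self.broker.popleft())
        return self.registry[message["task"]](*message["args"])

app = ToyApp("tasks")

@app.task
def add(x, y):
    return x + y
```

Because only the name and JSON arguments are sent, arguments must be serializable, and producer and worker must agree on the task name.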

Running Workers

# Start a worker
celery -A tasks worker --loglevel=info

# Start with concurrency
celery -A tasks worker --loglevel=info --concurrency=4

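# Use the solo pool: tasks run one at a time in the worker's main process
# (handy for debugging, and a common choice on Windows)
celery -A tasks worker --loglevel=info --pool=solo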

Triggering Tasks

# From your application
from tasks import add, process_file

# Async (returns immediately)
result = add.delay(4, 4)
print(result.id)  # Task ID
print(result.get(timeout=10))  # Wait for result: 8

# With options
process_file.apply_async(
    args=['/path/to/file.csv'],
    countdown=10,  # Delay 10 seconds
    expires=3600   # Expire after 1 hour
)

# Send to specific queue
process_file.apply_async(
    args=['/path/to/file.csv'],
    queue='heavy_tasks'
)
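
`countdown` delays when a task becomes eligible to run, while `expires` sets a deadline after which a worker should discard the task instead of running it (Celery marks such tasks as revoked). The sketch below models both with a min-heap keyed on the ETA. `DelayedQueue`, `pop_due` and the injectable `clock` are hypothetical names, and this is not how Celery schedules tasks internally.

```python
import heapq
import itertools
import time
from dataclasses import dataclass, field
from typing import Callable, Optional

@dataclass(order=True)
class ScheduledTask:
    eta: float  # earliest time the task may run
    seq: int    # tie-breaker: FIFO among equal ETAs
    name: str = field(compare=False)
    args: tuple = field(compare=False)
    expires_at: Optional[float] = field(default=None, compare=False)

class DelayedQueue:
    """Toy queue modelling countdown/expires (hypothetical, not Celery)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._heap: list = []
        self._seq = itertools.count()
        self._clock = clock

    def apply_async(self, name, args=(), countdown=0.0, expires=None):
        now = self._clock()
        expires_at = None if expires is None else now + expires
        task = ScheduledTask(now + countdown, next(self._seq), name, tuple(args), expires_at)
        heapq.heappush(self._heap, task)

    def pop_due(self):
        """Return (runnable, expired) task names whose ETA has passed."""
        now = self._clock()
        runnable, expired = [], []
        while self._heap and self._heap[0].eta <= now:
            task = heapq.heappop(self._heap)
            if task.expires_at is not None and now > task.expires_at:
                expired.append(task.name)  # past its deadline: discard, don't run
            else:
                runnable.append(task.name)
        return runnable, expired
```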

Task Configuration

# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

timezone = 'UTC'
enable_utc = True

task_routes = {
    'tasks.process_file': {'queue': 'heavy_tasks'},
    'tasks.send_email': {'queue': 'emails'},
}

task_default_queue = 'default'

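# Reliability: acknowledge messages only after the task finishes,
# and requeue them if the worker process dies mid-task
task_acks_late = True
task_reject_on_worker_lost = True

# Rate limiting (applied per worker instance)
task_default_rate_limit = '100/h'

# Time limits: the soft limit raises SoftTimeLimitExceeded inside the task
# so it can clean up; the hard limit kills the worker child process
task_soft_time_limit = 300  # 5 minutes
task_time_limit = 600       # 10 minutes

# tasks.py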
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')
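
A rate limit like `'100/h'` caps how often a task type may start. Celery applies these limits per worker instance, not across the whole cluster, so four worker instances with `'100/h'` can together run up to 400 such tasks an hour. The sketch below shows one common way to enforce a limit of this kind, a token bucket. `parse_rate` and `TokenBucket` are hypothetical helpers, not Celery's code; as in Celery, a bare number is read as tasks per second.

```python
import time

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}

def parse_rate(rate: str) -> float:
    """Convert a rate string such as '100/h' into tokens per second."""
    count, _, unit = rate.partition("/")
    return float(count) / _UNIT_SECONDS[unit or "s"]

class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float = 1, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller should wait and try again later
```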

Task Retries and Error Handling

import smtplib

import requests
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_url(self, url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        # self.retry() raises a Retry exception; `raise` makes that explicit.
        # Once max_retries is exhausted, Celery re-raises exc and the task fails.
        raise self.retry(exc=exc, countdown=60)

# heavy_computation, save_result, rollback_changes and smtp_send are
# placeholders for your own application code.

@app.task(bind=True)
def process_with_rollback(self, data):
    try:
        result = heavy_computation(data)
        save_result(result)
        return result
    except Exception as exc:
        # Undo any partial changes before trying again
        rollback_changes(data)
        raise self.retry(exc=exc, countdown=300)

@app.task(bind=True, max_retries=5)
def send_email(self, to, subject, body):
    try:
        smtp_send(to, subject, body)
    except smtplib.SMTPException as exc:
        # Exponential backoff: 60, 120, 240, 480, 960 seconds
        wait_time = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=wait_time)
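
`send_email` doubles its delay on every attempt. In practice it also helps to cap the delay and add random jitter, so that many tasks failing at the same moment do not all retry in the same instant. Celery can do this for you through the task options `autoretry_for`, `retry_backoff`, `retry_backoff_max` and `retry_jitter`; the hypothetical `backoff_delay` function below only illustrates the arithmetic, assuming a 60-second base and a one-hour cap.

```python
import random

def backoff_delay(retries: int, base: float = 60, cap: float = 3600,
                  jitter: bool = True, rng=random.random) -> float:
    """Exponential backoff: base * 2**retries, capped, optionally jittered."""
    delay = min(cap, base * (2 ** retries))
    if jitter:
        # "Full jitter": pick uniformly in [0, delay) to spread retries out
        delay = rng() * delay
    return delay
```

Inside a bound task this would be used as `raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))`.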

Simple Redis Queue (No Celery)

import json
from typing import Optional

import redis

r = redis.Redis()

def enqueue(queue_name: str, task_data: dict):
    """Add a task to the queue."""
    r.rpush(queue_name, json.dumps(task_data))

def dequeue(queue_name: str, timeout: int = 5) -> Optional[dict]:
    """Get next task from queue (blocking)."""
    result = r.blpop(queue_name, timeout=timeout)
    if result:
        _, data = result
        return json.loads(data)
    return None

def queue_length(queue_name: str) -> int:
    """Get number of tasks in queue."""
    return r.llen(queue_name)

# Producer (your web app)
enqueue("email_tasks", {
    "type": "send_welcome",
    "to": "user@example.com",
    "name": "Alice"
})

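# Consumer (worker process, normally a separate script)
# send_welcome_email() is a placeholder for your own email function.
while True:
    task = dequeue("email_tasks")  # blocks up to 5 seconds, then returns None
    if task is None:
        continue
    if task["type"] == "send_welcome":
        send_welcome_email(task["to"], task["name"])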
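
One weakness of this simple version: `BLPOP` removes the task before it is processed, so if the worker crashes mid-task, the task is gone. A common Redis pattern is to atomically move each task into a separate processing list (`LMOVE`/`BLMOVE` in Redis 6.2+, `RPOPLPUSH` in older versions), remove it with `LREM` once it succeeds, and push stale entries back after a crash. The class below models that flow in memory so it runs without a Redis server; `ReliableQueue`, `claim`, `ack` and `recover` are hypothetical names.

```python
from collections import deque

class ReliableQueue:
    """In-memory model of the Redis 'reliable queue' pattern (not a Redis client)."""

    def __init__(self):
        self.pending = deque()     # the main list
        self.processing = deque()  # tasks claimed but not yet acknowledged

    def enqueue(self, task):
        self.pending.append(task)  # like RPUSH tasks <task>

    def claim(self):
        """Move the oldest task to 'processing' (like LMOVE tasks processing LEFT RIGHT)."""
        if not self.pending:
            return None
        task = self.pending.popleft()
        self.processing.append(task)
        return task

    def ack(self, task):
        self.processing.remove(task)  # like LREM processing 1 <task>

    def recover(self):
        """After a crash, put unacknowledged tasks back at the front, in order."""
        while self.processing:
            self.pending.appendleft(self.processing.pop())
```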

Scheduled and Periodic Tasks

Using Celery Beat

# celeryconfig.py
from celery.schedules import crontab

beat_schedule = {
    'cleanup-every-hour': {
        'task': 'tasks.cleanup_temp_files',
        'schedule': crontab(minute=0),  # Every hour
    },
    'daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),  # 06:00 daily, in the configured timezone (UTC)
    },
    'weekly-cleanup': {
        'task': 'tasks.cleanup_old_logs',
        'schedule': crontab(hour=2, minute=0, day_of_week=0),  # Sunday 2 AM
    },
}
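
# tasks.py
import os
import time

# Hypothetical directory your app writes temp files to. Scanning all of /tmp
# would delete other programs' files and fail on subdirectories.
TEMP_DIR = "/tmp/myapp"

@app.task
def cleanup_temp_files(max_age_hours=24):
    if not os.path.isdir(TEMP_DIR):
        return "Nothing to clean"
    cutoff = time.time() - max_age_hours * 3600
    removed = 0
    for entry in os.scandir(TEMP_DIR):
        # Only delete regular files older than the cutoff
        if entry.is_file() and entry.stat().st_mtime < cutoff:
            os.remove(entry.path)
            removed += 1
    return f"Removed {removed} temp files"

@app.task
def generate_daily_report():
    # create_report() and send_report_email() are placeholders for your own code
    report = create_report()
    send_report_email(report)
    return "Report generated and sent"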
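
`crontab()` numbers days of the week with 0 = Sunday, while Python's `datetime.weekday()` uses 0 = Monday, an easy source of schedules that fire on the wrong day. The hypothetical helpers below check whether a given time matches a schedule built from single values or `'*'`; real crontab fields also accept ranges and lists, which this sketch omits.

```python
from datetime import datetime

def cron_day_of_week(dt: datetime) -> int:
    """Crontab numbering: 0 = Sunday ... 6 = Saturday.

    datetime.weekday() uses 0 = Monday ... 6 = Sunday, so shift by one.
    """
    return (dt.weekday() + 1) % 7

def crontab_matches(dt: datetime, minute="*", hour="*", day_of_week="*") -> bool:
    """Does `dt` fall on a minute this schedule fires? Fields are single values or '*'."""
    def ok(field, value):
        return field == "*" or int(field) == value
    return (ok(minute, dt.minute) and ok(hour, dt.hour)
            and ok(day_of_week, cron_day_of_week(dt)))
```

For example, `crontab_matches(datetime(2024, 1, 7, 2, 0), minute=0, hour=2, day_of_week=0)` is true because 7 January 2024 was a Sunday.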

Task Monitoring

# Check task status
result = add.delay(4, 4)
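print(result.status)  # e.g. PENDING, STARTED, RETRY, SUCCESS, FAILURE, REVOKED
# Note: unknown task IDs also report PENDING, and STARTED is only recorded
# when task_track_started = True.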

# Check if ready
print(result.ready())  # True/False

# Get result (blocks until done)
print(result.get(timeout=10))

# Handle failure
try:
    result.get(timeout=10)
except Exception as e:
    print(f"Task failed: {e}")

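# Revoke a task
result.revoke()  # Workers skip the task if it hasn't started yet
result.revoke(terminate=True)  # Also kill it if already running (sends SIGTERM by default)

# Check if revoked (the state is set once a worker processes the revocation)
print(result.state == 'REVOKED')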
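
`result.get()` blocks, and Celery advises against calling it from inside another task. When you would rather poll, you can check the state yourself: Celery treats `SUCCESS`, `FAILURE` and `REVOKED` as the ready states. The helper below is a sketch; `wait_until_ready` is a hypothetical name, and `get_state` is any zero-argument callable such as `lambda: result.state`.

```python
import time

# Mirrors celery.states.READY_STATES: a result is "ready" in any of these.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})

def wait_until_ready(get_state, timeout=10.0, interval=0.5,
                     clock=time.monotonic, sleep=time.sleep):
    """Poll get_state() until a ready state is reached; raise TimeoutError otherwise."""
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state in READY_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"task still {state} after {timeout}s")
        sleep(interval)
```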

Monitoring with Flower

# Install flower
pip install flower

# Start flower
celery -A tasks flower

# Access at http://localhost:5555

Complete Background Job System

import json
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional

import redis

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRY = "retry"

class TaskManager:
    def __init__(self, redis_client):
        # Create the client with decode_responses=True so hashes and IDs come back as str
        self.redis = redis_client
        self.queue_name = "tasks"

    def enqueue(self, task_type: str, data: dict, max_retries: int = 3) -> str:
        """Store the task's metadata in a hash and push its ID onto the queue."""
        task_id = str(uuid.uuid4())
        task = {
            "id": task_id,
            "type": task_type,
            "data": json.dumps(data),  # hash values must be str/bytes/numbers, not dicts
            "status": TaskStatus.PENDING.value,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "retries": 0,
            "max_retries": max_retries,
        }
        self.redis.hset(f"task:{task_id}", mapping=task)
        self.redis.rpush(self.queue_name, task_id)  # RPUSH + BLPOP gives FIFO order
        return task_id

    def get_task(self, task_id: str) -> Optional[dict]:
        """Return the task's fields, or None if the ID is unknown."""
        return self.redis.hgetall(f"task:{task_id}") or None

    def update_status(self, task_id: str, status: TaskStatus, result: Any = None):
        """Update task status and, if given, its JSON-encoded result."""
        updates = {"status": status.value}
        if result is not None:
            updates["result"] = json.dumps(result)
        self.redis.hset(f"task:{task_id}", mapping=updates)

    def requeue(self, task_id: str):
        """Count a failed attempt and put the task back at the end of the queue."""
        self.redis.hincrby(f"task:{task_id}", "retries", 1)
        self.update_status(task_id, TaskStatus.RETRY)
        self.redis.rpush(self.queue_name, task_id)

    def dequeue(self, timeout: int = 5) -> Optional[str]:
        """Block up to `timeout` seconds for the next task ID."""
        result = self.redis.blpop(self.queue_name, timeout=timeout)
        if result:
            _, task_id = result
            return task_id.decode() if isinstance(task_id, bytes) else task_id
        return None

    def get_queue_length(self) -> int:
        return self.redis.llen(self.queue_name)

manager = TaskManager(redis.Redis(decode_responses=True))

# Enqueue tasks
task_id = manager.enqueue("send_email", {
    "to": "user@example.com",
    "subject": "Welcome!",
    "body": "Hello!"
})

# Worker processing (run in a separate process).
# process_task(task_type, data) is a placeholder for your own dispatch logic.
def worker():
    while True:
        task_id = manager.dequeue()
        if not task_id:
            continue
        task = manager.get_task(task_id)
        if task is None:
            continue  # metadata missing or expired
        manager.update_status(task_id, TaskStatus.RUNNING)
        try:
            result = process_task(task["type"], json.loads(task["data"]))
            manager.update_status(task_id, TaskStatus.SUCCESS, result)
        except Exception as e:
            if int(task["retries"]) < int(task["max_retries"]):
                manager.requeue(task_id)  # retries immediately; add backoff if needed
            else:
                manager.update_status(task_id, TaskStatus.FAILED, {"error": str(e)})

Common Mistakes

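Mistake | Problem | Solution
Not handling retries | Transient errors (network blips, rate limits) become permanent failures | Add retry logic with exponential backoff
No task timeout | Stuck tasks occupy worker slots indefinitely | Set soft/hard time limits
Storing results forever | The result backend's memory/disk fills up | Set result expiration (result_expires), or ignore_result=True for fire-and-forget tasks
No monitoring | Failures go unnoticed | Use Flower or custom monitoring
Blocking calls without timeouts | A hung network or I/O call blocks the worker | Set a timeout on every external call, as in requests.get(url, timeout=10)
No dead letter queue | Tasks that exhaust their retries are lost | Move them to a dead letter queue (DLQ) for inspection and replay
Large payloads | Slow serialization and a bloated broker | Pass references (IDs, file paths) instead of the data itself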
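
A dead letter queue keeps tasks that have exhausted their retries, together with their errors, instead of silently dropping them. The in-memory sketch below retries each failing job up to `max_attempts` times in total and then moves it aside for inspection. `Job` and `run_with_dlq` are hypothetical names; a real system would persist the DLQ in Redis or the broker.

```python
from collections import deque
from dataclasses import dataclass, field

@dataclass
class Job:
    name: str
    attempts: int = 0  # number of failed attempts so far
    errors: list = field(default_factory=list)

def run_with_dlq(jobs, handler, max_attempts=3):
    """Process jobs; re-queue failures until max_attempts, then dead-letter them."""
    queue = deque(jobs)
    done, dead_letter = [], []
    while queue:
        job = queue.popleft()
        try:
            handler(job.name)
            done.append(job.name)
        except Exception as exc:
            job.attempts += 1
            job.errors.append(repr(exc))
            if job.attempts < max_attempts:
                queue.append(job)        # retry later
            else:
                dead_letter.append(job)  # keep it, with its error history
    return done, dead_letter
```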

Best Practices

  1. Use Celery for production task queues
  2. Redis or RabbitMQ as message broker
  3. Always handle task failures with retries
  4. Set task expiration for stale tasks
  5. Monitor queue length and processing time
  6. Use apply_async for delayed execution
  7. Implement dead letter queues for failed tasks
  8. Consider rate limiting for external API calls
  9. Use task routing to separate workloads
  10. Set time limits to prevent hanging workers
