Message Queues - RabbitMQ, Kafka, Producer-Consumer

Message QueuesMessage Queue SystemsFree Lesson

Advertisement

Introduction

Message queues enable asynchronous communication between services. This tutorial covers RabbitMQ and Kafka, the most popular message queue systems, with Python examples.

RabbitMQ Implementation

# producer.py
import pika
import json
import time

def connect():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters(
        'localhost', 
        5672,
        '/',
        credentials
    )
    return pika.BlockingConnection(parameters)

def publish_message(queue: str, message: dict):
    connection = connect()
    channel = connection.channel()
    
    channel.queue_declare(queue=queue, durable=True)
    
    channel.basic_publish(
        exchange='',
        routing_key=queue,
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # persistent
            content_type='application/json'
        )
    )
    connection.close()

def publish_task(task_type: str, payload: dict):
    message = {
        'type': task_type,
        'payload': payload,
        'timestamp': time.time()
    }
    publish_message('tasks', message)

# consumer.py
import pika
import json

def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received: {message}")
    
    # Process message
    process_message(message)
    
    # Acknowledge
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_messages(queue: str):
    connection = connect()
    channel = connection.channel()
    
    channel.queue_declare(queue=queue, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=queue, on_message_callback=callback)
    
    print('Waiting for messages...')
    channel.start_consuming()

def process_message(message: dict):
    task_type = message.get('type')
    payload = message.get('payload')
    
    if task_type == 'email':
        send_email(payload)
    elif task_type == 'notification':
        send_notification(payload)

Kafka Implementation

# kafka_producer.py
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

def send_message(topic: str, key: str, message: dict):
    producer.send(topic, key=key, value=message)
    producer.flush()

def send_order(order_data: dict):
    send_message('orders', str(order_data['id']), order_data)

# kafka_consumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True
)

def process_order(order: dict):
    print(f"Processing order: {order['id']}")

for message in consumer:
    process_order(message.value)

Advanced Patterns

# pub_sub.py - RabbitMQ Pub/Sub
def setup_fanout_exchange(exchange_name: str):
    connection = connect()
    channel = connection.channel()
    
    channel.exchange_declare(
        exchange=exchange_name,
        exchange_type='fanout'
    )
    
    return channel

def publish_to_exchange(exchange: str, message: dict):
    channel = setup_fanout_exchange(exchange)
    channel.basic_publish(
        exchange=exchange,
        routing_key='',
        body=json.dumps(message)
    )

# consumer_groups.py - Kafka consumer groups
consumer = KafkaConsumer(
    'orders',
    group_id='order-processors',
    bootstrap_servers=['localhost:9092']
)

Practice Problems

  1. Implement a dead letter queue for failed messages
  2. Create a message schema with versioning support
  3. Implement message ordering within partitions
  4. Add retry logic with exponential backoff
  5. Build a message batching system for high throughput

Advertisement

Need Expert Python Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement