Kafka with Microservices
Overview
Kafka is a natural backbone for event-driven microservices, supporting patterns such as event sourcing, CQRS (Command Query Responsibility Segregation), and saga orchestration. This guide shows how to implement these patterns to build scalable, resilient distributed systems.
Benefits of Event-Driven Architecture
- Decoupling: Services communicate via events
- Scalability: Independent scaling of producers/consumers
- Resilience: Fault isolation between services
- Auditability: Complete event history
- Flexibility: Easy addition of new consumers
Event Sourcing
Event Store Implementation
import json
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer
class EventStore:
    """Append-only event store that publishes domain events to Kafka.

    Every event is wrapped in an envelope (``event_id``, ``aggregate_id``,
    ``event_type``, ``data``, UTC ``timestamp``, ``metadata``), JSON-encoded,
    and sent with ``aggregate_id`` as the message key.

    Ordering caveat: Kafka only orders messages within one partition of one
    topic. With the default ``topic=None`` each event type is sent to its own
    topic (``'<event_type lowercased>-events'``), so events of *different*
    types for the same aggregate are not ordered relative to each other.
    Pass ``topic`` to send every event to a single topic. There, keying by
    ``aggregate_id`` keeps one aggregate's whole history in order.
    """

    def __init__(self, bootstrap_servers, topic=None):
        """Create the Kafka producer.

        Args:
            bootstrap_servers: Kafka bootstrap server(s), e.g. ``'kafka:9092'``.
            topic: Optional single topic for all events. ``None`` keeps the
                per-event-type topic naming.
        """
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8'),
        )

    def _topic_for(self, event_type):
        """Return the topic an event of ``event_type`` is published to."""
        if self.topic is not None:
            return self.topic
        return f'{event_type.lower()}-events'

    def append_event(self, aggregate_id, event_type, data, metadata=None):
        """Publish one event for ``aggregate_id`` and return its generated id.

        Args:
            aggregate_id: Id of the aggregate the event belongs to; also the
                Kafka message key.
            event_type: Event name, e.g. ``'OrderCreated'``.
            data: JSON-serializable event payload.
            metadata: Optional JSON-serializable dict; defaults to ``{}``.

        Returns:
            The new event's ``event_id`` (a UUID4 string).

        Note: per the kafka-python docs, ``KafkaProducer.send`` is asynchronous
        and returns a future. Delivery failures are therefore not raised here.
        """
        event = {
            'event_id': str(uuid.uuid4()),
            'aggregate_id': aggregate_id,
            'event_type': event_type,
            'data': data,
            # Timezone-aware UTC: datetime.utcnow() is deprecated (3.12) and
            # yields a naive value with no offset in its ISO string.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'metadata': metadata or {},
        }
        # Keying by aggregate_id sends all of an aggregate's events to the same
        # partition of the target topic. That only orders them end to end when
        # a single topic is configured (see the class docstring).
        self.producer.send(self._topic_for(event_type), key=aggregate_id, value=event)
        return event['event_id']

    def get_events(self, aggregate_id, from_version=0):
        """Return the stored events for ``aggregate_id`` from ``from_version`` on.

        Not implemented. Kafka topics cannot be queried by key, so this needs
        either a topic reader that filters by key or a separate queryable
        store. Raising makes that gap explicit, rather than returning None and
        failing later with an opaque TypeError in callers that iterate the result.
        """
        raise NotImplementedError(
            'EventStore.get_events requires a queryable event store or a topic reader'
        )
# Usage: record two steps of one order's lifecycle as events on the same aggregate.
event_store = EventStore('kafka:9092')

# A single aggregate id ties every event for this order together.
order_id = str(uuid.uuid4())

# Create order
order_created_payload = {
    'customer_id': 'cust-123',
    'items': [{'product_id': 'prod-456', 'quantity': 2}],
    'total': 99.99,
}
event_store.append_event(
    aggregate_id=order_id,
    event_type='OrderCreated',
    data=order_created_payload,
)

# Add payment
payment_payload = {
    'payment_id': 'pay-789',
    'amount': 99.99,
    'method': 'credit_card',
}
event_store.append_event(
    aggregate_id=order_id,
    event_type='PaymentProcessed',
    data=payment_payload,
)
Event Replay
class EventReplayer:
    """Rebuild an order aggregate's current state by folding its events in order."""

    def __init__(self, event_store):
        """Args:
            event_store: object exposing ``get_events(aggregate_id)``, which
                returns an iterable of event dicts (or None when nothing is stored).
        """
        self.event_store = event_store

    def rebuild_state(self, aggregate_id):
        """Return the state produced by applying every event for ``aggregate_id``.

        An aggregate with no events yields ``{}``, including a store whose
        ``get_events`` returns None.
        """
        state = {}
        for event in self.event_store.get_events(aggregate_id) or ():
            state = self.apply_event(state, event)
        return state

    def apply_event(self, state, event):
        """Return the state that results from applying ``event`` to ``state``.

        ``state`` is never modified. A new dict is returned, so callers may
        safely pass in a cached snapshot. ``OrderCreated`` starts a fresh state at
        version 1. ``PaymentProcessed`` and ``OrderShipped`` update the status
        and bump ``version``. A missing ``version`` counts as 0, so an
        out-of-order event does not raise KeyError. Unknown event types return
        ``state`` unchanged.
        """
        event_type = event['event_type']
        if event_type == 'OrderCreated':
            return {
                'id': event['aggregate_id'],
                'status': 'created',
                'customer_id': event['data']['customer_id'],
                'items': event['data']['items'],
                'total': event['data']['total'],
                'version': 1,
            }
        if event_type == 'PaymentProcessed':
            return {
                **state,
                'status': 'paid',
                'payment_id': event['data']['payment_id'],
                'version': state.get('version', 0) + 1,
            }
        if event_type == 'OrderShipped':
            return {
                **state,
                'status': 'shipped',
                'tracking_number': event['data']['tracking_number'],
                'version': state.get('version', 0) + 1,
            }
        return state
CQRS (Command Query Responsibility Segregation)
Command Handler
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
@dataclass
class CreateOrderCommand:
    """Request to place a new order on behalf of a customer.

    Attributes:
        customer_id: Id of the ordering customer.
        items: Line items, each a dict (presumably ``product_id`` /
            ``quantity`` as in the usage example — confirm with callers).
        total: Order total.
    """

    customer_id: str
    items: List[dict]
    total: float
@dataclass
class ProcessPaymentCommand:
    """Request to record a payment against an existing order.

    Attributes:
        order_id: Id of the order being paid.
        amount: Amount paid.
        payment_method: How the customer paid (free-form string).
    """

    order_id: str
    amount: float
    payment_method: str
class CommandHandler(ABC):
    """Interface for objects that execute commands against the write model.

    Concrete handlers must override ``handle``. A subclass that does not
    cannot be instantiated, because ``handle`` is an abstract method.
    """

    @abstractmethod
    def handle(self, command):
        """Execute ``command`` and return a handler-specific result."""
class OrderCommandHandler(CommandHandler):
    """Write-side handler that turns order commands into events in the event store."""

    def __init__(self, event_store, validation_service):
        """Args:
            event_store: object exposing ``append_event(aggregate_id=, event_type=, data=)``.
            validation_service: object exposing ``validate_order(command)``,
                which returns a truthy value for valid orders.
        """
        self.event_store = event_store
        self.validator = validation_service

    def handle(self, command):
        """Dispatch ``command`` to the handler method for its type.

        Raises:
            TypeError: if ``command`` is neither a CreateOrderCommand nor a
                ProcessPaymentCommand. Unsupported commands are never dropped silently.
        """
        if isinstance(command, CreateOrderCommand):
            return self.handle_create_order(command)
        if isinstance(command, ProcessPaymentCommand):
            return self.handle_process_payment(command)
        raise TypeError(f'Unsupported command type: {type(command).__name__}')

    def handle_create_order(self, command):
        """Validate the command, then record an ``OrderCreated`` event for a new order id.

        Returns:
            ``{'order_id': <new uuid4 string>, 'status': 'created'}``.

        Raises:
            ValueError: if the validation service rejects the command.
        """
        if not self.validator.validate_order(command):
            raise ValueError("Invalid order")
        order_id = str(uuid.uuid4())
        self.event_store.append_event(
            aggregate_id=order_id,
            event_type='OrderCreated',
            data={
                'customer_id': command.customer_id,
                'items': command.items,
                'total': command.total,
            },
        )
        return {'order_id': order_id, 'status': 'created'}

    def handle_process_payment(self, command):
        """Record a ``PaymentProcessed`` event on the order's aggregate.

        The payload mirrors the PaymentProcessed events used elsewhere in this
        guide (``payment_id``, ``amount``, ``method``). Projections and replays
        read ``payment_id`` from it.

        NOTE(review): this only records the event. Charging a payment provider
        is assumed to happen elsewhere; confirm.

        Returns:
            ``{'order_id': ..., 'payment_id': <new uuid4 string>, 'status': 'paid'}``.
        """
        payment_id = str(uuid.uuid4())
        self.event_store.append_event(
            aggregate_id=command.order_id,
            event_type='PaymentProcessed',
            data={
                'payment_id': payment_id,
                'amount': command.amount,
                'method': command.payment_method,
            },
        )
        return {'order_id': command.order_id, 'payment_id': payment_id, 'status': 'paid'}
Query Handler
class OrderQueryService:
    """Read side of CQRS: queries against the denormalized ``orders`` table.

    Ids are bound as query parameters rather than interpolated into the SQL
    text. An id such as ``"x' OR '1'='1"`` therefore cannot change the query.

    NOTE(review): assumes ``read_database.query(sql, params)`` forwards
    ``params`` to a DB-API ``execute``. Confirm against the real read-model client.
    """

    def __init__(self, read_database, placeholder='%s'):
        """Args:
            read_database: object exposing ``query(sql[, params])``.
            placeholder: the driver's positional parameter marker, per its
                DB-API ``paramstyle``: ``'%s'`` for format/pyformat drivers
                (e.g. psycopg2), ``'?'`` for qmark drivers (e.g. sqlite3).
        """
        self.db = read_database
        self.placeholder = placeholder

    def get_order(self, order_id):
        """Return the rows of the order whose ``id`` equals ``order_id``."""
        # The placeholder is configuration, not user input, so formatting it in is safe.
        sql = f"SELECT * FROM orders WHERE id = {self.placeholder}"
        return self.db.query(sql, (order_id,))

    def get_customer_orders(self, customer_id):
        """Return all order rows belonging to ``customer_id``."""
        sql = f"SELECT * FROM orders WHERE customer_id = {self.placeholder}"
        return self.db.query(sql, (customer_id,))

    def get_order_summary(self):
        """Return (status, count, revenue) aggregates over all orders."""
        return self.db.query("""
            SELECT status, COUNT(*) as count, SUM(total) as revenue
            FROM orders
            GROUP BY status
        """)
Projection Builder
class OrderProjectionBuilder:
    """Keep the ``orders`` read model in sync with order events.

    Each projected event type maps to one writer method. Events of any other
    type are ignored. ``handle_event`` always returns None.
    """

    def __init__(self, read_database):
        self.db = read_database

    def handle_event(self, event):
        """Apply ``event`` to the ``orders`` table if its type is projected."""
        writers = {
            'OrderCreated': self._insert_created_order,
            'PaymentProcessed': self._mark_paid,
            'OrderShipped': self._mark_shipped,
        }
        writer = writers.get(event['event_type'])
        if writer is not None:
            writer(event)

    def _insert_created_order(self, event):
        """Insert a new ``orders`` row in ``created`` status."""
        row = {
            'id': event['aggregate_id'],
            'customer_id': event['data']['customer_id'],
            'status': 'created',
            'total': event['data']['total'],
            'created_at': event['timestamp'],
        }
        self.db.insert('orders', row)

    def _mark_paid(self, event):
        """Move the order row to ``paid`` and store its payment id."""
        where = {'id': event['aggregate_id']}
        changes = {'status': 'paid', 'payment_id': event['data']['payment_id']}
        self.db.update('orders', where, changes)

    def _mark_shipped(self, event):
        """Move the order row to ``shipped`` and store its tracking number."""
        where = {'id': event['aggregate_id']}
        changes = {'status': 'shipped', 'tracking_number': event['data']['tracking_number']}
        self.db.update('orders', where, changes)
Saga Pattern
Saga Orchestrator
class OrderSaga:
    """Orchestrate an order: reserve inventory -> request payment -> complete.

    Per-order progress lives in memory in ``self.state``, keyed by order id, as
    ``{'step': <current step>, 'data': <order_data given to start_saga>}``.
    A reply event only advances a saga when it is the reply the current step
    is waiting for. A redelivered (duplicate) reply, or a reply for an order
    this instance never started, is therefore ignored. It does not re-issue a
    command or raise KeyError.
    """

    # Step in which each reply event is expected; other event types are ignored.
    _EXPECTED_STEP = {
        'InventoryReserved': 'reserve_inventory',
        'PaymentProcessed': 'process_payment',
        'PaymentFailed': 'process_payment',
    }

    def __init__(self, event_store):
        self.event_store = event_store
        self.state = {}

    def start_saga(self, order_id, order_data):
        """Begin the saga for ``order_id`` by requesting an inventory reservation.

        ``order_data`` must contain ``'items'`` (sent with the reservation
        request) and ``'total'`` (used later for the payment request).
        """
        self.state[order_id] = {
            'step': 'reserve_inventory',
            'data': order_data,
        }
        # Step 1: Reserve inventory
        self.event_store.append_event(
            aggregate_id=order_id,
            event_type='InventoryReservationRequested',
            data=order_data['items'],
        )

    def handle_event(self, event):
        """Advance the saga that ``event`` replies to, if it is awaiting that reply.

        Each branch publishes its follow-up command before advancing the step.
        If publishing raises, the saga stays in its current step and a
        redelivered reply can retry it.
        """
        event_type = event['event_type']
        expected_step = self._EXPECTED_STEP.get(event_type)
        if expected_step is None:
            return
        order_id = event['aggregate_id']
        saga = self.state.get(order_id)
        if saga is None or saga['step'] != expected_step:
            # Unknown saga, or a duplicate/out-of-order reply: nothing to do.
            return

        if event_type == 'InventoryReserved':
            # Step 2: Process payment
            self.event_store.append_event(
                aggregate_id=order_id,
                event_type='PaymentRequested',
                data={'amount': saga['data']['total']},
            )
            saga['step'] = 'process_payment'
        elif event_type == 'PaymentProcessed':
            # Step 3: Complete order
            self.event_store.append_event(
                aggregate_id=order_id,
                event_type='OrderCompleted',
                data={'status': 'completed'},
            )
            saga['step'] = 'complete_order'
        else:  # 'PaymentFailed'
            # Compensate: Release inventory
            self.event_store.append_event(
                aggregate_id=order_id,
                event_type='InventoryReleaseRequested',
                data=saga['data']['items'],
            )
            saga['step'] = 'compensate'
Saga Choreography
class OrderSagaChoreography:
    """Choreographed order saga: map each incoming event to the follow-up event it triggers.

    Handlers return the follow-up event as a dict and do not publish it.
    Presumably the caller publishes it; confirm. Every follow-up carries the
    source event's ``aggregate_id``, so it can be keyed and correlated with
    the same order.

    Payload requirements: ``OrderCreated`` and ``PaymentFailed`` events must
    carry ``data['items']``; ``InventoryReserved`` must carry ``data['total']``.
    """

    def __init__(self):
        self.handlers = {
            'OrderCreated': self.handle_order_created,
            'InventoryReserved': self.handle_inventory_reserved,
            'PaymentProcessed': self.handle_payment_processed,
            'PaymentFailed': self.handle_payment_failed,
        }

    def handle_event(self, event):
        """Return the follow-up event for ``event``, or None for unhandled types."""
        handler = self.handlers.get(event['event_type'])
        if handler:
            return handler(event)

    @staticmethod
    def _follow_up(source_event, event_type, data):
        """Build a follow-up event correlated with ``source_event``'s aggregate."""
        return {
            'aggregate_id': source_event.get('aggregate_id'),
            'event_type': event_type,
            'data': data,
        }

    def handle_order_created(self, event):
        """Trigger inventory reservation for the new order's items."""
        return self._follow_up(event, 'InventoryReservationRequested', event['data']['items'])

    def handle_inventory_reserved(self, event):
        """Trigger payment for the order total."""
        return self._follow_up(event, 'PaymentRequested', {'amount': event['data']['total']})

    def handle_payment_processed(self, event):
        """Complete the order."""
        return self._follow_up(event, 'OrderCompleted', {'status': 'completed'})

    def handle_payment_failed(self, event):
        """Compensate by releasing the reserved items."""
        return self._follow_up(event, 'InventoryReleaseRequested', event['data']['items'])
Idempotent Consumers
Idempotency Key
import hashlib
from functools import wraps
def idempotent_consumer(func):
    """Decorate a consumer method so a message whose key was already handled is skipped.

    The decorated method's instance must provide three hooks:
    ``generate_idempotency_key(message)``, ``is_processed(key)`` and
    ``mark_processed(key)``. The key is marked only after ``func`` returns.
    If ``func`` raises, nothing is marked and a redelivery is processed again.
    The "already processed?" check and the mark are separate calls, so two
    consumers racing on the same key can both run ``func``.
    """
    @wraps(func)
    def wrapper(self, message):
        key = self.generate_idempotency_key(message)
        if not self.is_processed(key):
            outcome = func(self, message)
            self.mark_processed(key)
            return outcome
        # Duplicate delivery: report it instead of reprocessing.
        return {'status': 'skipped', 'reason': 'already_processed'}
    return wrapper
class OrderProcessor:
    """Process order messages once per idempotency key and publish the payment outcome.

    ``process_order`` is wrapped by ``idempotent_consumer``, which calls
    ``generate_idempotency_key``, ``is_processed`` and ``mark_processed``
    around it. Processed markers are stored in Redis under ``processed:<key>``.

    NOTE(review): ``process_order`` calls ``self.validate_order`` and
    ``self.process_payment``, which this class does not define. A subclass or
    mixin is presumably expected to supply them; confirm.
    """

    # Lifetime of a Redis "processed" marker, in seconds (24 hours). A duplicate
    # delivered after its marker expires is processed again. Override on a
    # subclass or instance to widen or narrow the dedup window.
    processed_ttl_seconds = 86400

    def __init__(self, kafka_producer, redis_client):
        self.producer = kafka_producer
        self.redis = redis_client

    def generate_idempotency_key(self, message):
        """Return the SHA-256 hex digest of ``"<order_id>:<timestamp>"`` for ``message``."""
        # NOTE(review): the key reads message['order_id'] while process_order reads
        # message['data']['id']; presumably both hold the same order id.
        data = f"{message['order_id']}:{message['timestamp']}"
        return hashlib.sha256(data.encode()).hexdigest()

    def is_processed(self, key):
        """Return True if a processed marker for ``key`` exists in Redis."""
        # Redis EXISTS replies with a count; normalize it to a bool.
        return bool(self.redis.exists(f"processed:{key}"))

    def mark_processed(self, key):
        """Store a processed marker for ``key`` that expires after ``processed_ttl_seconds``."""
        self.redis.setex(f"processed:{key}", self.processed_ttl_seconds, '1')

    @idempotent_consumer
    def process_order(self, message):
        """Validate the order in ``message['data']``, take payment, and publish the result.

        Returns:
            ``{'status': 'processed', 'order_id': <order id>}``.

        Raises:
            ValueError: if ``validate_order`` rejects the order.
        """
        order = message['data']
        if not self.validate_order(order):
            raise ValueError("Invalid order")
        payment_result = self.process_payment(order)
        self.producer.send('payment-events', value={
            'order_id': order['id'],
            'payment_id': payment_result['id'],
            'status': 'completed',
        })
        return {'status': 'processed', 'order_id': order['id']}
Dead Letter Queue
DLQ Handler
class DeadLetterHandler:
    """Forward unprocessable messages to the ``dead-letter-queue`` topic and alert on repeat failures."""

    def __init__(self, kafka_producer, alert_service, alert_threshold=3):
        """Args:
            kafka_producer: object exposing ``send(topic, value=...)``.
            alert_service: object exposing ``send_alert(severity=, message=, details=)``.
            alert_threshold: a ``'high'`` alert is raised when the message's
                retry count is at least this value.
        """
        self.producer = kafka_producer
        self.alert_service = alert_service
        self.alert_threshold = alert_threshold

    @staticmethod
    def _retry_count(headers):
        """Extract the integer ``retry_count`` header, defaulting to 0.

        Accepts kafka-python style headers (a list of ``(str, bytes)`` tuples)
        as well as a plain mapping. Byte values are decoded as UTF-8, and a
        value that is not an integer counts as 0. If the header repeats, the
        last occurrence wins.
        """
        if not headers:
            return 0
        pairs = headers.items() if hasattr(headers, 'items') else headers
        count = 0
        for name, raw in pairs:
            if name != 'retry_count':
                continue
            if isinstance(raw, (bytes, bytearray)):
                raw = raw.decode('utf-8', errors='replace')
            try:
                count = int(raw)
            except (TypeError, ValueError):
                count = 0
        return count

    def handle_dead_letter(self, original_message, error):
        """Publish a dead-letter record describing ``original_message`` and ``error``.

        ``original_key`` / ``original_value`` are forwarded exactly as received.
        NOTE(review): if the consumer had no deserializer they are bytes, and the
        DLQ producer's serializer must be able to handle them; confirm.

        Returns:
            The dead-letter dict that was sent.
        """
        retry_count = self._retry_count(getattr(original_message, 'headers', None))
        dead_letter = {
            'original_topic': original_message.topic,
            'original_partition': original_message.partition,
            'original_offset': original_message.offset,
            'original_key': original_message.key,
            'original_value': original_message.value,
            'error': str(error),
            # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'retry_count': retry_count,
        }
        self.producer.send('dead-letter-queue', value=dead_letter)
        if retry_count >= self.alert_threshold:
            self.alert_service.send_alert(
                severity='high',
                message=f"Message failed after {retry_count} retries",
                details=dead_letter,
            )
        return dead_letter
Best Practices
Service Design
- Single Responsibility: Each service handles one bounded context
- Event Immutability: Events are facts, never modified
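- Idempotent Consumers: Handle duplicate messages gracefully, since Kafka consumers typically get at-least-once delivery and the same message can arrive again after a retry or rebalance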
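- Saga Pattern: Coordinate multi-service workflows as a sequence of local steps with compensating actions, instead of a distributed (two-phase) transaction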
- Dead Letter Queues: Handle failed messages
Common Patterns
- Event Sourcing: Store all state changes as events
- CQRS: Separate read/write models
- Saga: Coordinate distributed transactions
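- Outbox Pattern: Ensure reliable event publishing by writing events to an outbox table in the same database transaction as the state change, then relaying them to Kafka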
- Change Data Capture: Sync databases via events
Summary
Kafka enables powerful event-driven microservice patterns, including event sourcing, CQRS, and saga orchestration. To keep such distributed systems resilient, pair them with idempotent consumers, dead letter queues, and explicit error handling.