Kafka Real-World Patterns
Overview
Real-world Kafka deployments require proven patterns for reliability, scalability, and maintainability. This guide covers the event mesh, circuit breaker, transactional outbox, change data capture (CDC), event replay, and idempotency-key patterns.
Pattern Categories
- Reliability: Circuit breaker, outbox pattern
- Scalability: Event mesh, partitioning
- Data Sync: CDC, event replay
- Idempotency: Idempotency keys, deduplication
Event Mesh Pattern
Event Mesh Topology
class EventMesh:
    """Route events to every registered service subscribed to their type.

    Services may live on different Kafka clusters. ``kafka_clusters`` is
    stored unchanged on ``self.clusters``. The default ``send_to_cluster``
    below assumes it maps a cluster name to a producer-like object exposing
    ``send(topic, value=...)``. NOTE(review): confirm against callers, or
    override ``send_to_cluster`` in a subclass.
    """

    def __init__(self, kafka_clusters):
        self.clusters = kafka_clusters
        self.topics = {}  # service name -> {'topics': ..., 'cluster': ...}
        self.routes = {}  # reserved; not read by any method of this class

    def register_service(self, service_name, topics, cluster):
        """Register *service_name* as a consumer living on *cluster*.

        *topics* is either a mapping of event type -> topic name, or an
        iterable of event types. In the iterable case each event type is
        delivered to a topic with the same name.
        """
        self.topics[service_name] = {
            'topics': topics,
            'cluster': cluster,
        }

    def route_event(self, event, source_service):
        """Send *event* to every service subscribed to ``event['type']``.

        *source_service* is accepted for API compatibility but is not used
        for routing.
        """
        event_type = event['type']
        for consumer in self.get_consumers_for_event(event_type):
            config = self.topics[consumer]
            topic = self._topic_for(config['topics'], event_type)
            self.send_to_cluster(config['cluster'], topic, event)

    def get_consumers_for_event(self, event_type):
        """Return the names of all services registered for *event_type*."""
        return [
            service
            for service, config in self.topics.items()
            if event_type in config['topics']
        ]

    @staticmethod
    def _topic_for(topics, event_type):
        """Resolve the topic name for *event_type* from a service's topics."""
        from collections.abc import Mapping

        if isinstance(topics, Mapping):
            return topics[event_type]
        # A plain list of event types: the topic is named after the event.
        return event_type

    def send_to_cluster(self, cluster, topic, event):
        """Publish *event* as UTF-8 JSON to *topic* on the named *cluster*.

        The original class called this method without defining it, so every
        routed event raised AttributeError.
        """
        import json

        producer = self.clusters[cluster]
        producer.send(topic, value=json.dumps(event).encode())
Event Routing Configuration
# event_mesh_config.yml
services:
order-service:
cluster: dc1
publishes:
- OrderCreated
- OrderUpdated
- OrderCancelled
subscribes:
- PaymentProcessed
- InventoryReserved
payment-service:
cluster: dc1
publishes:
- PaymentProcessed
- PaymentFailed
subscribes:
- OrderCreated
notification-service:
cluster: dc2
publishes: []
subscribes:
- OrderCreated
- PaymentProcessed
- OrderShipped
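# NOTE: analytics-service and inventory-service are referenced in the rules
# below but are not declared under `services`; register them (with their
# cluster) before enabling these routes.
routing_rules: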
OrderCreated:
- payment-service
- notification-service
- analytics-service
PaymentProcessed:
- order-service
- notification-service
- inventory-service
Circuit Breaker Pattern
Circuit Breaker Implementation
import time
from enum import Enum
class CircuitState(Enum):
    """States of a CircuitBreaker."""

    CLOSED = "closed"        # calls pass through normally
    OPEN = "open"            # calls are rejected until the recovery timeout
    HALF_OPEN = "half_open"  # one trial window after the recovery timeout


class CircuitBreakerOpenError(Exception):
    """Raised by CircuitBreaker.call while the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    After ``failure_threshold`` consecutive failures the circuit opens. Calls
    are then rejected with CircuitBreakerOpenError until ``recovery_timeout``
    seconds have passed since the last failure. The next call is then let
    through (HALF_OPEN): success closes the circuit, and failure re-opens it
    immediately because the failure count is still at or above the threshold.

    Args:
        failure_threshold: consecutive failures needed to open the circuit.
        recovery_timeout: seconds to wait before a trial call is allowed.
        expected_exceptions: exception types that count as failures. Other
            exceptions propagate without affecting the breaker. ``None``
            (the default) counts every ``Exception``.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=30,
                 expected_exceptions=None):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exceptions = (
            (Exception,) if expected_exceptions is None
            else tuple(expected_exceptions)
        )
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Raises:
            CircuitBreakerOpenError: the circuit is open and the recovery
                timeout has not elapsed yet.
            Any exception raised by *func* is re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise CircuitBreakerOpenError("Circuit breaker is open")
            self.state = CircuitState.HALF_OPEN
        try:
            result = func(*args, **kwargs)
        except self.expected_exceptions:
            self._on_failure()
            raise
        self._on_success()
        return result

    def decorator(self, func):
        """Wrap *func* so that every call goes through this breaker."""
        from functools import wraps

        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)

        return wrapper

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_attempt_reset(self):
        return (time.time() - self.last_failure_time) >= self.recovery_timeout
# Usage with Kafka producer
class KafkaProducerWithCircuitBreaker:
    """Kafka producer whose ``send`` is guarded by a CircuitBreaker.

    NOTE: kafka-python's ``KafkaProducer.send`` is asynchronous. It returns a
    future, and broker/delivery errors surface only when that future is
    resolved. With the default ``timeout=None`` the breaker therefore sees
    only errors raised synchronously by ``send`` itself. Pass ``timeout`` to
    wait for the broker acknowledgement inside the breaker, so that delivery
    failures are counted too.
    """

    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        self.circuit_breaker = CircuitBreaker(failure_threshold=5)

    def send(self, topic, message, timeout=None):
        """JSON-encode *message* and send it to *topic* through the breaker.

        Args:
            topic: destination topic.
            message: JSON-serializable payload.
            timeout: ``None`` (default) returns the producer's future, as
                before. A number of seconds blocks until the record is
                acknowledged and returns the resolved record metadata.
        """
        value = json.dumps(message).encode()
        if timeout is None:
            return self.circuit_breaker.call(self.producer.send, topic, value=value)
        return self.circuit_breaker.call(self._send_and_wait, topic, value, timeout)

    def _send_and_wait(self, topic, value, timeout):
        """Send one record and block until the broker acknowledges it."""
        return self.producer.send(topic, value=value).get(timeout=timeout)
Circuit Breaker Configuration
# Circuit breaker for external service calls.
# requests raises its own exception hierarchy: requests.exceptions.ConnectionError
# and requests.exceptions.Timeout do NOT subclass the builtin ConnectionError /
# TimeoutError, so they must be listed explicitly or the breaker never counts
# network failures from requests.
circuit_breaker = CircuitBreaker(
    failure_threshold=3,  # Open after 3 failures
    recovery_timeout=60,  # Try recovery after 60 seconds
    expected_exceptions=[
        ConnectionError,
        TimeoutError,
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    ],
)


# Decorator usage
@circuit_breaker.decorator
def call_external_service(data):
    """POST *data* as JSON to the external service and return its JSON reply.

    Raises requests.HTTPError on a non-2xx status (via raise_for_status).
    """
    # requests has no default timeout: without one a hung server blocks the
    # caller forever, and the breaker never observes a failure.
    response = requests.post('http://external-service/api', json=data, timeout=10)
    response.raise_for_status()
    return response.json()
Outbox Pattern
Transactional Outbox
from sqlalchemy import create_engine, Column, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
# Declarative base class for the ORM models in this example.
Base = declarative_base()
class OutboxEvent(Base):
    """One row of the transactional outbox table ``outbox_events``.

    Rows are added in the same session/transaction as the business change
    (see OrderService.create_order). The outbox processors later select rows
    with ``published == 'N'``, send them to Kafka, and set the flag to 'Y'.
    """
    __tablename__ = 'outbox_events'
    # String primary key; OrderService fills it with str(uuid.uuid4()).
    id = Column(String(36), primary_key=True)
    # Aggregate kind, e.g. 'Order'; the processors lower-case it to build the
    # topic name '<aggregate_type>-events'.
    aggregate_type = Column(String(255), nullable=False)
    # Id of the changed aggregate; the processors use it as the Kafka key.
    aggregate_id = Column(String(36), nullable=False)
    # Domain event name, e.g. 'OrderCreated'.
    event_type = Column(String(255), nullable=False)
    # JSON-encoded event body, sent to Kafka verbatim.
    payload = Column(Text, nullable=False)
    # Naive UTC timestamp set by SQLAlchemy at insert time.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Publication flag: 'N' = pending, 'Y' = sent to Kafka.
    published = Column(String(1), default='N')
class OrderService:
    """Create orders together with their OrderCreated outbox event.

    The order row and the outbox row are committed in a single database
    transaction, so either both are stored or neither is.
    """

    def __init__(self, db_session, kafka_producer):
        self.session = db_session
        # Not used by create_order: with the outbox pattern, publishing is
        # done later by a separate outbox processor reading outbox_events.
        self.producer = kafka_producer

    def create_order(self, order_data):
        """Persist a new Order plus its OrderCreated outbox event.

        Args:
            order_data: keyword arguments for the ``Order`` constructor.

        Returns:
            The committed ``Order`` instance.

        Raises:
            Any database or serialization error. The session is rolled back
            first, so it is left in a clean state.
        """
        import uuid

        try:
            # 1. Create order in database
            order = Order(**order_data)
            self.session.add(order)
            # Flush (without committing) so that a database-generated primary
            # key is populated before it is copied into the outbox row.
            self.session.flush()

            # 2. Create outbox event in same transaction
            outbox_event = OutboxEvent(
                id=str(uuid.uuid4()),
                aggregate_type='Order',
                # The column is String(36) and the processors call .encode()
                # on it, so store the id as text.
                aggregate_id=str(order.id),
                event_type='OrderCreated',
                payload=json.dumps({
                    'order_id': order.id,
                    'customer_id': order.customer_id,
                    'total': order.total,
                }),
            )
            self.session.add(outbox_event)

            # 3. Commit transaction
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        return order
class OutboxProcessor:
    """Publish pending outbox rows to Kafka and flag them as published.

    Delivery is at-least-once. A row is flagged only after the broker has
    acknowledged its record, and flags are committed after the whole batch.
    If anything fails, the exception propagates, nothing is committed, and
    the batch is re-sent on the next run. Consumers must therefore tolerate
    duplicates.
    """

    def __init__(self, db_session, kafka_producer):
        self.session = db_session
        self.producer = kafka_producer

    def process_outbox(self, batch_size=None, send_timeout=30):
        """Send unpublished outbox events, oldest first.

        Args:
            batch_size: maximum rows to process in this call (``None`` = all).
            send_timeout: seconds to wait for each broker acknowledgement.
        """
        # 1. Read unpublished events in creation order so that per-aggregate
        #    ordering is preserved on the topic.
        query = (
            self.session.query(OutboxEvent)
            .filter(OutboxEvent.published == 'N')
            .order_by(OutboxEvent.created_at)
        )
        if batch_size is not None:
            query = query.limit(batch_size)
        events = query.all()

        for event in events:
            # 2. Publish to Kafka and wait for the ack. send() is
            #    asynchronous, so without .get() a failed delivery would still
            #    be flagged as published and the event silently lost.
            future = self.producer.send(
                f'{event.aggregate_type.lower()}-events',
                key=event.aggregate_id.encode(),
                value=event.payload.encode(),
            )
            future.get(timeout=send_timeout)
            # 3. Mark as published
            event.published = 'Y'
        self.session.commit()
Outbox Polling
class OutboxPoller:
    """Continuously publish pending outbox rows to Kafka.

    Each poll runs in its own ``session.begin()`` transaction. A row is
    flagged as published only after its record has been acknowledged by the
    broker. Any failure rolls back the whole batch, which is retried on the
    next poll (at-least-once delivery).
    """

    def __init__(self, db_session, kafka_producer, poll_interval=1,
                 batch_size=100, send_timeout=30):
        self.session = db_session
        self.producer = kafka_producer
        self.poll_interval = poll_interval  # seconds between polls
        self.batch_size = batch_size        # max rows published per poll
        self.send_timeout = send_timeout    # seconds to wait for each ack

    def start_polling(self):
        """Poll forever, sleeping ``poll_interval`` seconds between polls."""
        import logging

        logger = logging.getLogger(__name__)
        while True:
            try:
                self.process_pending_events()
            except Exception:
                # Top-level loop boundary: log with traceback and keep polling;
                # the failed batch stays 'N' and is retried next time.
                logger.exception("Error processing outbox")
            time.sleep(self.poll_interval)

    def process_pending_events(self):
        """Publish up to ``batch_size`` pending events, oldest first."""
        with self.session.begin():
            events = (
                self.session.query(OutboxEvent)
                .filter(OutboxEvent.published == 'N')
                .order_by(OutboxEvent.created_at)
                .limit(self.batch_size)
                .all()
            )
            for event in events:
                self.publish_event(event)
                event.published = 'Y'

    def publish_event(self, event):
        """Send *event* to '<aggregate_type>-events' and wait for the ack.

        Returns the resolved record metadata. Raises if the broker rejects
        the record or does not answer within ``send_timeout`` seconds.
        """
        future = self.producer.send(
            f'{event.aggregate_type.lower()}-events',
            key=event.aggregate_id.encode(),
            value=event.payload.encode(),
        )
        # send() is asynchronous; blocking here makes delivery failures
        # abort the transaction instead of being flagged as published.
        return future.get(timeout=self.send_timeout)
Change Data Capture (CDC)
Debezium Setup
{
"name": "postgres-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/opt/kafka/secrets/db.properties:password}",
"database.dbname": "mydb",
"database.server.name": "dbserver1",
"schema.include.list": "public",
"table.include.list": "public.orders,public.users,public.payments",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"topic.prefix": "dbserver1",
"tombstones.on.delete": "true",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
CDC Event Processing
class CDCEventProcessor:
    """Consume Debezium change events and dispatch them by operation.

    Operations 'c' (create), 'u' (update) and 'd' (delete) are handled. Other
    operations, e.g. snapshot reads 'r', are ignored. Domain events are
    published only when a producer is supplied.
    """

    def __init__(self, kafka_consumer, kafka_producer=None):
        self.consumer = kafka_consumer
        # Used by handle_create. The original never set this attribute, so the
        # first create event raised AttributeError.
        self.producer = kafka_producer

    def process_cdc_events(self):
        """Iterate the consumer and route each change event to its handler."""
        handlers = {
            'c': self.handle_create,  # Create
            'u': self.handle_update,  # Update
            'd': self.handle_delete,  # Delete
        }
        for message in self.consumer:
            cdc_event = self._decode(message.value)
            if cdc_event is None:
                # Tombstone (emitted after deletes with tombstones.on.delete).
                continue
            handler = handlers.get(cdc_event.get('op'))
            if handler is not None:
                handler(cdc_event)

    @staticmethod
    def _decode(raw):
        """Return the change-event dict from a record value, or None.

        Returns None for tombstones (null values). Unwraps the
        ``{"schema": ..., "payload": ...}`` envelope that Kafka Connect's
        JsonConverter produces when schemas are enabled.
        """
        if raw is None:
            return None
        event = json.loads(raw)
        if isinstance(event, dict) and 'schema' in event and 'payload' in event:
            event = event['payload']
        return event

    def handle_create(self, event):
        """Log a new row and, if a producer is set, publish a domain event."""
        table = event['source']['table']
        print(f"New {table}: {event['after']}")
        if self.producer is None:
            return
        domain_event = {
            'event_type': f"{table}_created",
            'data': event['after'],
            'timestamp': event['ts_ms'],
        }
        self.producer.send(
            f"{table}-events",
            value=json.dumps(domain_event).encode(),
        )

    def handle_update(self, event):
        """Log the new state of an updated row."""
        print(f"Updated {event['source']['table']}: {event['after']}")

    def handle_delete(self, event):
        """Log the last state of a deleted row.

        Debezium delete events carry that state in 'before'; the value has
        no 'key' field.
        """
        print(f"Deleted {event['source']['table']}: {event.get('before')}")
Event Replay
Event Replay Service
class EventReplayService:
    """Re-run historical events from a Kafka topic through an event handler.

    Both replay methods read every partition of the topic from its first
    offset up to the end offsets observed when the replay starts, then
    return. Records produced during the replay are not included. Partitions
    are manually assigned, so any existing subscription on the consumer is
    dropped. The consumer is assumed to be kafka-python's KafkaConsumer, with
    record values holding JSON-encoded events.
    """

    def __init__(self, kafka_consumer, event_handler, poll_timeout_ms=1000):
        self.consumer = kafka_consumer
        self.handler = event_handler
        self.poll_timeout_ms = poll_timeout_ms  # per poll() call

    def replay_events(self, topic, from_timestamp, to_timestamp):
        """Replay events whose ``event['timestamp']`` lies in the range.

        Both bounds are inclusive and use the same unit as the events'
        'timestamp' field. Returns the number of events handed to the handler.
        """
        replayed_count = 0
        for event in self._read_topic_from_beginning(topic):
            if from_timestamp <= event['timestamp'] <= to_timestamp:
                self.handler.process(event)
                replayed_count += 1
        return replayed_count

    def replay_events_for_aggregate(self, topic, aggregate_id):
        """Replay every event whose 'aggregate_id' equals *aggregate_id*.

        Returns the number of events handed to the handler.
        """
        replayed_count = 0
        for event in self._read_topic_from_beginning(topic):
            if event.get('aggregate_id') == aggregate_id:
                self.handler.process(event)
                replayed_count += 1
        return replayed_count

    def _read_topic_from_beginning(self, topic):
        """Yield decoded events from offset 0 up to the current end offsets."""
        partitions = self.consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]
        if not tps:
            return
        # seek_to_beginning only works on assigned partitions, and subscribe()
        # would resume from committed offsets instead of replaying.
        self.consumer.unsubscribe()
        self.consumer.assign(tps)
        self.consumer.seek_to_beginning(*tps)

        # Snapshot the end of each partition so the replay terminates instead
        # of blocking forever on a live topic.
        end_offsets = self.consumer.end_offsets(tps)
        pending = {tp for tp in tps if self.consumer.position(tp) < end_offsets[tp]}
        while pending:
            batches = self.consumer.poll(timeout_ms=self.poll_timeout_ms)
            for tp, records in batches.items():
                for record in records:
                    if record.offset < end_offsets[tp]:
                        yield json.loads(record.value)
            pending = {
                tp for tp in pending
                if self.consumer.position(tp) < end_offsets[tp]
            }
# Usage
replay_service = EventReplayService(consumer, order_handler)
# Replay last 24 hours
from datetime import datetime, timedelta, timezone
# Use an aware UTC "now". datetime.utcnow() returns a naive value, and
# .timestamp() interprets naive values as *local* time, which would shift the
# window by the host's UTC offset.
to_time = datetime.now(timezone.utc)
from_time = to_time - timedelta(hours=24)
replayed = replay_service.replay_events(
    'order-events',
    from_timestamp=int(from_time.timestamp() * 1000),  # epoch milliseconds
    to_timestamp=int(to_time.timestamp() * 1000),
)
Idempotency Keys
Idempotency Implementation
import hashlib
from functools import wraps
def _idempotency_key(operation_name, args, kwargs):
    """Return a stable SHA-256 hex key for one call of *operation_name*.

    Arguments are serialized with ``json.dumps(..., sort_keys=True)`` so that
    neither keyword-argument order nor dict insertion order changes the key.
    Values JSON cannot encode fall back to ``repr``. If serialization still
    fails (e.g. unsortable mixed-type dict keys), the key falls back to the
    repr of the arguments with keyword arguments sorted by name.
    """
    try:
        canonical = json.dumps(
            [operation_name, list(args), kwargs], sort_keys=True, default=repr
        )
    except (TypeError, ValueError):
        canonical = f"{operation_name}:{args}:{sorted(kwargs.items())}"
    return hashlib.sha256(canonical.encode()).hexdigest()


def idempotent(operation_name):
    """Decorate a method so repeated identical calls return the cached result.

    The decorated method's instance must provide ``is_processed(key)``,
    ``get_cached_result(key)``, ``cache_result(key, result)`` and
    ``mark_processed(key)``. The check and the mark are separate calls, so
    two concurrent identical calls may both execute the operation.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            idempotency_key = _idempotency_key(operation_name, args, kwargs)
            # Already processed: return the stored result instead of re-running.
            if self.is_processed(idempotency_key):
                return self.get_cached_result(idempotency_key)
            result = func(self, *args, **kwargs)
            # Cache before marking, so a "processed" key always has a result.
            self.cache_result(idempotency_key, result)
            self.mark_processed(idempotency_key)
            return result
        return wrapper
    return decorator
class OrderService:
    """Order creation made idempotent with Redis-backed result caching."""

    # Lifetime of the processed marker and the cached result (24 hours).
    RESULT_TTL_SECONDS = 86400

    def __init__(self, redis_client):
        self.redis = redis_client

    @idempotent("create_order")
    def create_order(self, order_data):
        """Create an order; identical repeat calls return the cached result."""
        # process_order is not defined in this class; presumably provided
        # elsewhere (e.g. a subclass) — verify.
        order = self.process_order(order_data)
        return order

    def is_processed(self, key):
        """True only when both the processed marker and the result exist.

        The result key is written before the marker, so it also expires
        slightly earlier. Checking only the marker could report
        "processed" while the result is already gone, and
        get_cached_result would then crash on json.loads(None).
        """
        return self.redis.exists(f"processed:{key}", f"result:{key}") == 2

    def mark_processed(self, key):
        self.redis.setex(f"processed:{key}", self.RESULT_TTL_SECONDS, '1')

    def cache_result(self, key, result):
        self.redis.setex(f"result:{key}", self.RESULT_TTL_SECONDS, json.dumps(result))

    def get_cached_result(self, key):
        return json.loads(self.redis.get(f"result:{key}"))
Idempotency in Kafka
class IdempotentKafkaConsumer:
    """Skip Kafka messages whose idempotency key was already processed.

    Processed keys are stored in Redis as ``kafka:processed:<key>`` with a
    24-hour TTL. ``process_message`` is not defined in this class; presumably
    it is supplied by a subclass. NOTE(review): is_processed and
    mark_processed are separate calls, so two workers handling the same key
    at the same time can both process it.
    """

    # Optional record header carrying a producer-assigned, business-level id.
    # Keys derived from it also catch duplicates written at different offsets
    # (e.g. producer retries); without it the key falls back to the record's
    # topic/partition/offset, which only catches redelivery of the same record.
    IDEMPOTENCY_HEADER = 'idempotency-key'

    def __init__(self, consumer, redis_client):
        self.consumer = consumer
        self.redis = redis_client

    def consume_idempotently(self, message):
        """Process *message* unless its key is already recorded.

        Returns ``{'status': 'skipped'}`` for duplicates, otherwise the result
        of ``process_message``.
        """
        idempotency_key = self.generate_key(message)
        if self.is_processed(idempotency_key):
            return {'status': 'skipped'}
        result = self.process_message(message)
        # Mark only after success so a failed message stays retryable.
        self.mark_processed(idempotency_key)
        return result

    def generate_key(self, message):
        """Return a SHA-256 hex key identifying *message*.

        Uses the IDEMPOTENCY_HEADER value (scoped to the topic) when present,
        otherwise ``"<topic>:<partition>:<offset>"``.
        """
        header_value = self._find_header(message, self.IDEMPOTENCY_HEADER)
        if header_value is not None:
            if not isinstance(header_value, bytes):
                header_value = str(header_value).encode()
            raw = f"{message.topic}:header:".encode() + header_value
            return hashlib.sha256(raw).hexdigest()
        key_data = f"{message.topic}:{message.partition}:{message.offset}"
        return hashlib.sha256(key_data.encode()).hexdigest()

    @staticmethod
    def _find_header(message, name):
        """Return the value of header *name*, or None if absent."""
        for header_name, header_value in getattr(message, 'headers', None) or ():
            if header_name == name:
                return header_value
        return None

    def is_processed(self, key):
        return self.redis.exists(f"kafka:processed:{key}")

    def mark_processed(self, key):
        self.redis.setex(f"kafka:processed:{key}", 86400, '1')
Best Practices
Pattern Selection
| Pattern | Use Case | Complexity |
|---|---|---|
| Event Mesh | Multi-service communication | High |
| Circuit Breaker | External service calls | Medium |
| Outbox | Reliable event publishing | Medium |
| CDC | Database synchronization | High |
| Event Replay | Audit and debugging | Medium |
| Idempotency Keys | Duplicate-safe processing | Low |
Implementation Checklist
- Event mesh routing configured
- Circuit breaker for external calls
- Outbox pattern for reliability
- CDC for database sync
- Event replay capability
- Idempotency keys implemented
- Dead letter queues configured
- Monitoring and alerting
Summary
Real-world Kafka patterns (event mesh, circuit breaker, transactional outbox, CDC, event replay, and idempotency keys) are the building blocks of reliable, scalable, and maintainable event-driven architectures. Choose patterns based on your specific requirements for reliability, scalability, and data consistency.