Kafka Real-World Patterns

🟒 Free Lesson

Figure: Event Mesh Pattern. Producers: Order Service, Payment Service, User Service, Inventory Service. Kafka mesh topics: order-events, payment-events, user-events, inventory-events. Consumers: Analytics Service, Notification Service, Search Indexer, Data Warehouse. Projections: read models, aggregates, search index, materialized views. Query APIs: REST, GraphQL, gRPC.

Overview

Real-world Kafka deployments rely on proven patterns for reliability, scalability, and maintainability. This guide covers the event mesh, circuit breaker, transactional outbox, change data capture (CDC), event replay, and idempotency keys.

Pattern Categories

  • Reliability: Circuit breaker, outbox pattern
  • Scalability: Event mesh, partitioning
  • Data Sync: CDC, event replay
  • Idempotency: Idempotency keys, deduplication

Event Mesh Pattern

Event Mesh Topology

class EventMesh:
    def __init__(self, kafka_clusters):
        self.clusters = kafka_clusters
        self.topics = {}
        self.routes = {}
    
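    def register_service(self, service_name, topics, cluster):
        """Register a service with its topics.

        topics maps each event type the service consumes to the topic it
        is delivered on, e.g. {'OrderCreated': 'order-events'}.
        """
        self.topics[service_name] = {
            'topics': topics,
            'cluster': cluster
        }

    def route_event(self, event, source_service):
        """Route event to appropriate consumers"""
        consumers = self.get_consumers_for_event(event['type'])

        for consumer in consumers:
            cluster = self.topics[consumer]['cluster']
            topic = self.topics[consumer]['topics'][event['type']]

            self.send_to_cluster(cluster, topic, event)

    def send_to_cluster(self, cluster, topic, event):
        """Send the event through the producer of the target cluster.

        Assumes kafka_clusters maps a cluster name to a producer whose
        value_serializer can encode the event dict.
        """
        self.clusters[cluster].send(topic, value=event)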
    
    def get_consumers_for_event(self, event_type):
        """Find all consumers interested in this event"""
        consumers = []
        for service, config in self.topics.items():
            if event_type in config['topics']:
                consumers.append(service)
        return consumers

Event Routing Configuration

# event_mesh_config.yml
services:
  order-service:
    cluster: dc1
    publishes:
      - OrderCreated
      - OrderUpdated
      - OrderCancelled
    subscribes:
      - PaymentProcessed
      - InventoryReserved
  
  payment-service:
    cluster: dc1
    publishes:
      - PaymentProcessed
      - PaymentFailed
    subscribes:
      - OrderCreated
  
  notification-service:
    cluster: dc2
    publishes: []
    subscribes:
      - OrderCreated
      - PaymentProcessed
      - OrderShipped

routing_rules:
  OrderCreated:
    - payment-service
    - notification-service
    - analytics-service
  
  PaymentProcessed:
    - order-service
    - notification-service
    - inventory-service
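
A routing file like this is easy to get out of sync: a rule can name a service that was never declared, or a service can subscribe to an event nobody publishes. The sketch below is one possible consistency check, not part of the lesson's code. It assumes the YAML has already been loaded into a dict with the same structure (CONFIG is a hand-copied stand-in), and the function name validate_mesh_config is hypothetical.

```python
# Hand-copied stand-in for the parsed event_mesh_config.yml shown above.
CONFIG = {
    "services": {
        "order-service": {
            "cluster": "dc1",
            "publishes": ["OrderCreated", "OrderUpdated", "OrderCancelled"],
            "subscribes": ["PaymentProcessed", "InventoryReserved"],
        },
        "payment-service": {
            "cluster": "dc1",
            "publishes": ["PaymentProcessed", "PaymentFailed"],
            "subscribes": ["OrderCreated"],
        },
        "notification-service": {
            "cluster": "dc2",
            "publishes": [],
            "subscribes": ["OrderCreated", "PaymentProcessed", "OrderShipped"],
        },
    },
    "routing_rules": {
        "OrderCreated": ["payment-service", "notification-service", "analytics-service"],
        "PaymentProcessed": ["order-service", "notification-service", "inventory-service"],
    },
}


def validate_mesh_config(config):
    """Return a list of human-readable inconsistencies found in the config."""
    services = config["services"]
    # Every event type that at least one declared service publishes
    published = {e for svc in services.values() for e in svc["publishes"]}
    problems = []

    for event_type, targets in config["routing_rules"].items():
        if event_type not in published:
            problems.append(f"{event_type}: no registered publisher")
        for target in targets:
            if target not in services:
                problems.append(f"{event_type}: unknown service {target}")
            elif event_type not in services[target]["subscribes"]:
                problems.append(f"{event_type}: {target} does not subscribe")

    for name, svc in services.items():
        for event_type in svc["subscribes"]:
            if event_type not in published:
                problems.append(f"{name}: subscribes to unpublished {event_type}")

    return problems


for problem in validate_mesh_config(CONFIG):
    print(problem)
```

Run against the configuration above, the check reports that analytics-service and inventory-service appear in routing_rules without a services entry, and that InventoryReserved and OrderShipped have subscribers but no publisher.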

Circuit Breaker Pattern

Circuit Breaker Implementation

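import json
import time
from enum import Enum

from kafka import KafkaProducer  # kafka-python client


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open"""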

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None
    
    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
    
    def _should_attempt_reset(self):
        return (time.time() - self.last_failure_time) >= self.recovery_timeout

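# Usage with Kafka producer
class KafkaProducerWithCircuitBreaker:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        self.circuit_breaker = CircuitBreaker(failure_threshold=5)

    def _send_and_wait(self, topic, value):
        # send() is asynchronous and returns a future. Waiting on it makes
        # delivery failures raise here, where the breaker can count them.
        return self.producer.send(topic, value=value).get(timeout=10)

    def send(self, topic, message):
        return self.circuit_breaker.call(
            self._send_and_wait,
            topic,
            json.dumps(message).encode()
        )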

Circuit Breaker Configuration

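import requests

# Circuit breaker for external service calls
circuit_breaker = CircuitBreaker(
    failure_threshold=3,      # Open after 3 failures
    recovery_timeout=60       # Try recovery after 60 seconds
)
# Note: the CircuitBreaker class above counts every exception and has no
# decorator helper, so calls go through circuit_breaker.call(). Counting
# only selected exceptions (e.g. ConnectionError, TimeoutError) would need
# an extra expected_exceptions parameter.

def call_external_service(data):
    response = requests.post('http://external-service/api', json=data, timeout=5)
    response.raise_for_status()
    return response.json()

# Example payload, for illustration only
result = circuit_breaker.call(call_external_service, {'order_id': '123'})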
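
Counting only selected exception types and wrapping functions with a decorator both need a little more than the CircuitBreaker class defined earlier provides. The following is a minimal sketch of such a variant under stated assumptions: the class name FilteringCircuitBreaker, the clock parameter (injected so the timeout can be exercised without sleeping), and the opened_at attribute are all hypothetical names, not part of the lesson's code.

```python
import time
from functools import wraps


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class FilteringCircuitBreaker:
    def __init__(self, failure_threshold=3, recovery_timeout=60,
                 expected_exceptions=(ConnectionError, TimeoutError),
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exceptions = tuple(expected_exceptions)
        self.clock = clock
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitBreakerOpenError("Circuit breaker is open")
            # Timeout elapsed: fall through and allow a trial call (half-open)
        try:
            result = func(*args, **kwargs)
        except self.expected_exceptions:
            self.failure_count += 1
            # A failed trial call, or reaching the threshold, (re)opens the circuit
            if self.opened_at is not None or self.failure_count >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Success closes the circuit; other exception types propagate uncounted
        self.failure_count = 0
        self.opened_at = None
        return result

    def decorator(self, func):
        """Wrap func so that every call goes through this breaker."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper


# Usage with a fake clock and a function that fails on demand
now = [0.0]
breaker = FilteringCircuitBreaker(failure_threshold=2, recovery_timeout=10,
                                  clock=lambda: now[0])

@breaker.decorator
def flaky(ok):
    if not ok:
        raise ConnectionError("service down")
    return "ok"
```

One caveat when pairing this with requests: requests.exceptions.ConnectionError does not inherit from the built-in ConnectionError, so the requests exception classes would have to be passed in expected_exceptions explicitly.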

Outbox Pattern

Transactional Outbox

from sqlalchemy import create_engine, Column, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
import uuid

Base = declarative_base()

class OutboxEvent(Base):
    __tablename__ = 'outbox_events'
    
    id = Column(String(36), primary_key=True)
    aggregate_type = Column(String(255), nullable=False)
    aggregate_id = Column(String(36), nullable=False)
    event_type = Column(String(255), nullable=False)
    payload = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    published = Column(String(1), default='N')

class OrderService:
    def __init__(self, db_session, kafka_producer):
        self.session = db_session
        self.producer = kafka_producer
    
    def create_order(self, order_data):
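        # 1. Create order in database (Order is the application's ORM model, not shown)
        order = Order(**order_data)
        self.session.add(order)
        # Flush so that a database-generated order.id is available below
        self.session.flush()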
        
        # 2. Create outbox event in same transaction
        outbox_event = OutboxEvent(
            id=str(uuid.uuid4()),
            aggregate_type='Order',
            aggregate_id=order.id,
            event_type='OrderCreated',
            payload=json.dumps({
                'order_id': order.id,
                'customer_id': order.customer_id,
                'total': order.total
            })
        )
        self.session.add(outbox_event)
        
        # 3. Commit transaction
        self.session.commit()
        
        return order

class OutboxProcessor:
    def __init__(self, db_session, kafka_producer):
        self.session = db_session
        self.producer = kafka_producer
    
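    def process_outbox(self):
        # 1. Read unpublished events, oldest first
        events = self.session.query(OutboxEvent)\
            .filter(OutboxEvent.published == 'N')\
            .order_by(OutboxEvent.created_at)\
            .all()

        for event in events:
            # 2. Publish to Kafka. send() is asynchronous, so wait on the
            #    returned future: get() raises if delivery fails
            self.producer.send(
                f'{event.aggregate_type.lower()}-events',
                key=event.aggregate_id.encode(),
                value=event.payload.encode()
            ).get(timeout=10)

            # 3. Mark as published only after the broker acknowledged it
            event.published = 'Y'

        # A crash before this commit re-sends the events on the next run
        # (at-least-once delivery), so consumers must tolerate duplicates
        self.session.commit()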
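
The guarantee the outbox provides comes from two separate steps: the business row and the event row are written in one database transaction, and a relay marks an event as published only after the send succeeded. The sketch below isolates those two steps using only the standard library. It is an illustration under assumptions, not the lesson's implementation: sqlite3 stands in for the real database, the publish callable stands in for a Kafka producer, and the function names are hypothetical.

```python
import json
import sqlite3
import uuid


def create_schema(conn):
    conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, total REAL NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox_events ("
        " id TEXT PRIMARY KEY, aggregate_id TEXT NOT NULL,"
        " event_type TEXT NOT NULL, payload TEXT NOT NULL,"
        " published TEXT NOT NULL DEFAULT 'N')"
    )


def create_order(conn, total):
    """Insert the order and its outbox row in a single transaction."""
    order_id = str(uuid.uuid4())
    with conn:  # commits on success, rolls back if either insert raises
        conn.execute("INSERT INTO orders (id, total) VALUES (?, ?)", (order_id, total))
        conn.execute(
            "INSERT INTO outbox_events (id, aggregate_id, event_type, payload)"
            " VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), order_id, "OrderCreated",
             json.dumps({"order_id": order_id, "total": total})),
        )
    return order_id


def relay_outbox(conn, publish):
    """Publish pending rows in insertion order; return how many were sent.

    A row is marked 'Y' only after publish() returns without raising.
    """
    rows = conn.execute(
        "SELECT id, aggregate_id, payload FROM outbox_events"
        " WHERE published = 'N' ORDER BY rowid"
    ).fetchall()
    sent = 0
    for event_id, aggregate_id, payload in rows:
        publish(aggregate_id, payload)  # may raise; the row then stays 'N'
        with conn:
            conn.execute(
                "UPDATE outbox_events SET published = 'Y' WHERE id = ?", (event_id,)
            )
        sent += 1
    return sent


# Usage: an in-memory database and a list that records what was "published"
conn = sqlite3.connect(":memory:")
create_schema(conn)
order_id = create_order(conn, 42.0)
sent_messages = []
relay_outbox(conn, lambda key, value: sent_messages.append((key, value)))
```

If the process dies between publish() and the UPDATE, the event is sent again on the next run. The pattern therefore gives at-least-once delivery, which is why it is usually paired with idempotent consumers.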

Outbox Polling

import time

class OutboxPoller:
    def __init__(self, db_session, kafka_producer, poll_interval=1):
        self.session = db_session
        self.producer = kafka_producer
        self.poll_interval = poll_interval
    
    def start_polling(self):
        while True:
            try:
                self.process_pending_events()
            except Exception as e:
                print(f"Error processing outbox: {e}")
            
            time.sleep(self.poll_interval)
    
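    def process_pending_events(self):
        with self.session.begin():
            # Oldest first, in batches of 100
            events = self.session.query(OutboxEvent)\
                .filter(OutboxEvent.published == 'N')\
                .order_by(OutboxEvent.created_at)\
                .limit(100)\
                .all()

            for event in events:
                self.publish_event(event)
                event.published = 'Y'

    def publish_event(self, event):
        # send() is asynchronous; get() blocks until the broker acknowledges
        # the record and raises on failure, so an event is only marked as
        # published after a confirmed send
        self.producer.send(
            f'{event.aggregate_type.lower()}-events',
            key=event.aggregate_id.encode(),
            value=event.payload.encode()
        ).get(timeout=10)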

Change Data Capture (CDC)

Debezium Setup

{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/opt/kafka/secrets/db.properties:password}",
    "database.dbname": "mydb",
    "database.server.name": "dbserver1",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.users,public.payments",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "topic.prefix": "dbserver1",
    "tombstones.on.delete": "true",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
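
The route transform in this configuration rewrites topic names: RegexRouter applies the regex to the whole topic name and, on a match, substitutes the replacement, so only the third dot-separated segment survives. The sketch below mirrors that rule with Python's re module to show which names result. It is an illustration of the regex only, not Kafka Connect code, and the function name route is hypothetical.

```python
import re

# Same pattern as transforms.route.regex (without the JSON escaping)
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route(topic):
    """Return the rewritten topic name, or the original if the regex does not match."""
    match = ROUTE_REGEX.fullmatch(topic)
    # The replacement "$3" keeps only the third capture group (the table name)
    return match.group(3) if match else topic


print(route("dbserver1.public.orders"))
print(route("dbserver1.public.payments"))
```

A consequence of dropping the prefix and schema is that two tables with the same name in different schemas would be routed to the same topic. With schema.include.list restricted to public, as here, that does not arise.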

CDC Event Processing

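import json

class CDCEventProcessor:
    def __init__(self, kafka_consumer, kafka_producer):
        self.consumer = kafka_consumer
        self.producer = kafka_producer  # used by handle_create

    def process_cdc_events(self):
        for message in self.consumer:
            if message.value is None:
                continue  # tombstone record that follows a delete
            # Assumes the JSON converter runs with schemas disabled, so the
            # change event is the top-level object (no "payload" wrapper)
            cdc_event = json.loads(message.value)

            if cdc_event['op'] in ('c', 'r'):  # Create, or snapshot read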
                self.handle_create(cdc_event)
            elif cdc_event['op'] == 'u':  # Update
                self.handle_update(cdc_event)
            elif cdc_event['op'] == 'd':  # Delete
                self.handle_delete(cdc_event)
    
    def handle_create(self, event):
        # Process new record
        print(f"New {event['source']['table']}: {event['after']}")
        
        # Publish domain event
        domain_event = {
            'event_type': f"{event['source']['table']}_created",
            'data': event['after'],
            'timestamp': event['ts_ms']
        }
        
        self.producer.send(
            f"{event['source']['table']}-events",
            value=json.dumps(domain_event).encode()
        )
    
    def handle_update(self, event):
        # Process updated record
        print(f"Updated {event['source']['table']}: {event['after']}")
    
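    def handle_delete(self, event):
        # Process deleted record. The change event has no 'key' field; the
        # old row is in 'before' (which columns it holds depends on the
        # table's REPLICA IDENTITY setting)
        print(f"Deleted {event['source']['table']}: {event['before']}")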
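
The mapping from a change event to a domain event can be kept as a pure function, which makes the edge cases easy to test without a broker: tombstones (null values), the optional schema/payload wrapper added when the JSON converter has schemas enabled, and deletes, where the row data is in before rather than after. The sketch below is one way to write it. The function name to_domain_event and the choice to report snapshot reads (op "r") as created are assumptions, and the sample events in the usage lines are hand-written.

```python
import json

# Suffix used in the domain event type for each Debezium operation code
OP_SUFFIX = {"c": "created", "r": "created", "u": "updated", "d": "deleted"}


def to_domain_event(raw_value):
    """Convert a raw change-event value into a domain event dict.

    Returns None for records that carry no row change (tombstones,
    unknown operation codes).
    """
    if raw_value is None:  # tombstone
        return None
    envelope = json.loads(raw_value)
    # With schemas enabled, the change event is nested under "payload"
    if isinstance(envelope, dict) and "schema" in envelope and "payload" in envelope:
        envelope = envelope["payload"]
    if not envelope or envelope.get("op") not in OP_SUFFIX:
        return None
    op = envelope["op"]
    # Deletes carry the old row in "before"; everything else uses "after"
    row = envelope["before"] if op == "d" else envelope["after"]
    return {
        "event_type": f"{envelope['source']['table']}_{OP_SUFFIX[op]}",
        "data": row,
        "timestamp": envelope["ts_ms"],
    }


# Hand-written sample events
create_value = json.dumps({
    "before": None, "after": {"id": 1, "total": 25},
    "source": {"table": "orders"}, "op": "c", "ts_ms": 1700000000000,
})
delete_value = json.dumps({
    "schema": {},
    "payload": {"before": {"id": 1}, "after": None,
                "source": {"table": "orders"}, "op": "d", "ts_ms": 1700000005000},
})
print(to_domain_event(create_value))
print(to_domain_event(delete_value))
```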

Event Replay

Event Replay Service

import json

from kafka import TopicPartition


class EventReplayService:
    def __init__(self, kafka_consumer, event_handler):
        # The consumer must not be subscribed to any topic:
        # replay assigns partitions manually
        self.consumer = kafka_consumer
        self.handler = event_handler

    def _replay(self, topic, should_process):
        """Read the topic from the beginning up to its current end"""
        partitions = self.consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in partitions]
        self.consumer.assign(tps)
        self.consumer.seek_to_beginning(*tps)
        end_offsets = self.consumer.end_offsets(tps)

        # Consume until every partition reaches the end offset captured
        # above, so the replay terminates instead of waiting for new events
        replayed_count = 0
        while any(self.consumer.position(tp) < end_offsets[tp] for tp in tps):
            for records in self.consumer.poll(timeout_ms=1000).values():
                for message in records:
                    event = json.loads(message.value)
                    if should_process(event):
                        self.handler.process(event)
                        replayed_count += 1

        return replayed_count

    def replay_events(self, topic, from_timestamp, to_timestamp):
        """Replay events from a specific time range (epoch milliseconds)"""
        return self._replay(
            topic,
            lambda event: from_timestamp <= event['timestamp'] <= to_timestamp
        )

    def replay_events_for_aggregate(self, topic, aggregate_id):
        """Replay events for a specific aggregate"""
        return self._replay(
            topic,
            lambda event: event.get('aggregate_id') == aggregate_id
        )

# Usage
replay_service = EventReplayService(consumer, order_handler)

# Replay last 24 hours
from datetime import datetime, timedelta, timezone
# Use an aware UTC datetime: timestamp() treats a naive datetime as local time
to_time = datetime.now(timezone.utc)
from_time = to_time - timedelta(hours=24)

replayed = replay_service.replay_events(
    'order-events',
    from_timestamp=int(from_time.timestamp() * 1000),
    to_timestamp=int(to_time.timestamp() * 1000)
)

Idempotency Keys

Idempotency Implementation

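import hashlib
import json
from functools import wraps

def idempotent(operation_name):
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Generate idempotency key from a canonical JSON form of the
            # arguments, so that dict key order does not change the key
            key_data = json.dumps(
                [operation_name, args, kwargs], sort_keys=True, default=str
            )
            idempotency_key = hashlib.sha256(key_data.encode()).hexdigest()

            # Check if already processed (check-then-act: two concurrent
            # callers can both pass this check before either marks the key)
            if self.is_processed(idempotency_key):
                return self.get_cached_result(idempotency_key)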
            
            # Process operation
            result = func(self, *args, **kwargs)
            
            # Cache result
            self.cache_result(idempotency_key, result)
            self.mark_processed(idempotency_key)
            
            return result
        return wrapper
    return decorator

class OrderService:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    @idempotent("create_order")
    def create_order(self, order_data):
        # Process order
        order = self.process_order(order_data)
        return order
    
    def is_processed(self, key):
        return self.redis.exists(f"processed:{key}")
    
    def mark_processed(self, key):
        self.redis.setex(f"processed:{key}", 86400, '1')
    
    def cache_result(self, key, result):
        self.redis.setex(f"result:{key}", 86400, json.dumps(result))
    
    def get_cached_result(self, key):
        return json.loads(self.redis.get(f"result:{key}"))
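
Checking a key and marking it afterwards are two separate Redis calls, so two workers handling the same request at the same time can both run the operation. Redis can close that gap with a single SET using the NX option, which only succeeds for the first caller. The sketch below shows the idea under assumptions: run_once is a hypothetical helper, and InMemoryRedis is a stand-in that implements only the two client methods used here so the example runs without a server. With redis-py, the same set(name, value, nx=True, ex=...) and delete(name) calls would be made on a real client.

```python
class InMemoryRedis:
    """Stand-in for a Redis client; implements only what run_once uses."""

    def __init__(self):
        self.data = {}

    def set(self, name, value, nx=False, ex=None):
        if nx and name in self.data:
            return None  # like redis-py: None when NX prevents the write
        self.data[name] = value  # expiry (ex) is ignored in this stand-in
        return True

    def delete(self, name):
        self.data.pop(name, None)


def run_once(redis_client, key, operation, ttl_seconds=86400):
    """Run operation() only if this caller wins the claim for key.

    Returns the operation's result, or None if the key was already claimed.
    """
    claimed = redis_client.set(f"processed:{key}", "1", nx=True, ex=ttl_seconds)
    if not claimed:
        return None
    try:
        return operation()
    except Exception:
        # Release the claim so that a retry can run the operation again
        redis_client.delete(f"processed:{key}")
        raise


# Usage: the second call with the same key does not run the operation
client = InMemoryRedis()
calls = []
first = run_once(client, "order-1", lambda: calls.append("ran") or "done")
second = run_once(client, "order-1", lambda: calls.append("ran") or "done")
print(first, second, calls)
```

Unlike the decorator above, this sketch returns None for duplicates instead of a cached result. Returning the earlier result would mean storing it under a second key once the operation completes.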

Idempotency in Kafka

class IdempotentKafkaConsumer:
    def __init__(self, consumer, redis_client):
        self.consumer = consumer
        self.redis = redis_client
    
    def consume_idempotently(self, message):
        # Generate idempotency key from message
        idempotency_key = self.generate_key(message)
        
        # Check if already processed
        if self.is_processed(idempotency_key):
            return {'status': 'skipped'}
        
        # Process message
        result = self.process_message(message)
        
        # Mark as processed
        self.mark_processed(idempotency_key)
        
        return result
    
    def generate_key(self, message):
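        # Use topic, partition, offset as idempotency key. This detects
        # redelivery of the same record; an event written twice by a producer
        # gets two offsets and needs a business-level event ID instead.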
        key_data = f"{message.topic}:{message.partition}:{message.offset}"
        return hashlib.sha256(key_data.encode()).hexdigest()
    
    def is_processed(self, key):
        return self.redis.exists(f"kafka:processed:{key}")
    
    def mark_processed(self, key):
        self.redis.setex(f"kafka:processed:{key}", 86400, '1')

Best Practices

Pattern Selection

Pattern           Use Case                      Complexity
Event Mesh        Multi-service communication   High
Circuit Breaker   External service calls        Medium
Outbox            Reliable event publishing     Medium
CDC               Database synchronization      High
Event Replay      Audit and debugging           Medium

Implementation Checklist

  • Event mesh routing configured
  • Circuit breaker for external calls
  • Outbox pattern for reliability
  • CDC for database sync
  • Event replay capability
  • Idempotency keys implemented
  • Dead letter queues configured
  • Monitoring and alerting

Summary

Event mesh, circuit breaker, transactional outbox, CDC, event replay, and idempotency keys are building blocks for reliable, scalable, and maintainable event-driven architectures on Kafka. Choose among them based on your specific requirements for reliability, scalability, and data consistency.

⭐
