Event-Driven Architecture: SNS, SQS, EventBridge, Kafka
Difficulty: Senior Level | Companies: Netflix, Uber, LinkedIn, AWS, Confluent
Interview Question
"Design an event-driven architecture for an e-commerce platform that handles 1 million events per second with guaranteed delivery and exactly-once processing."
ℹ️ Key Concepts
This question tests your understanding of event-driven patterns, message queuing, and distributed systems for high-throughput scenarios.
Complete Event-Driven Architecture
Architecture Overview
┌───────────────────────────────────────────────────────────────────┐
│                     EVENT-DRIVEN ARCHITECTURE                     │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│     ┌─────────────────── EVENT PRODUCERS ───────────────────┐     │
│     │                                                       │     │
│     │   ┌────────────┐   ┌────────────┐   ┌────────────┐    │     │
│     │   │  Web App   │   │   Mobile   │   │    IoT     │    │     │
│     │   │  (React)   │   │    App     │   │  Devices   │    │     │
│     │   └─────┬──────┘   └─────┬──────┘   └─────┬──────┘    │     │
│     │         │                │                │           │     │
│     └─────────┼────────────────┼────────────────┼───────────┘     │
│               │                │                │                 │
│     ┌─────────┼────────────────┼────────────────┼───────────┐     │
│     │                    EVENT BUS LAYER                    │     │
│     │                                                       │     │
│     │  ┌─────────────────────────────────────────────────┐  │     │
│     │  │              Apache Kafka Cluster               │  │     │
│     │  │                                                 │  │     │
│     │  │   ┌───────────┐  ┌───────────┐  ┌───────────┐   │  │     │
│     │  │   │  Topic 1  │  │  Topic 2  │  │  Topic 3  │   │  │     │
│     │  │   │ (Orders)  │  │(Payments) │  │(Inventory)│   │  │     │
│     │  │   └───────────┘  └───────────┘  └───────────┘   │  │     │
│     │  │                                                 │  │     │
│     │  └─────────────────────────────────────────────────┘  │     │
│     │                                                       │     │
│     │  ┌─────────────────────────────────────────────────┐  │     │
│     │  │                   AWS SNS/SQS                   │  │     │
│     │  │                                                 │  │     │
│     │  │   ┌───────────┐  ┌───────────┐  ┌───────────┐   │  │     │
│     │  │   │    SNS    │  │    SQS    │  │EventBridge│   │  │     │
│     │  │   │ (Fan-out) │  │  (Queue)  │  │ (Router)  │   │  │     │
│     │  │   └───────────┘  └───────────┘  └───────────┘   │  │     │
│     │  │                                                 │  │     │
│     │  └─────────────────────────────────────────────────┘  │     │
│     │                                                       │     │
│     └───────────────────────────┬───────────────────────────┘     │
│                                 │                                 │
│     ┌─────────────────── EVENT CONSUMERS ───────────────────┐     │
│     │                                                       │     │
│     │   ┌────────────┐   ┌────────────┐   ┌────────────┐    │     │
│     │   │   Order    │   │  Payment   │   │ Inventory  │    │     │
│     │   │  Service   │   │  Service   │   │  Service   │    │     │
│     │   └────────────┘   └────────────┘   └────────────┘    │     │
│     │                                                       │     │
│     │   ┌────────────┐   ┌────────────┐   ┌────────────┐    │     │
│     │   │  Shipping  │   │ Analytics  │   │Notification│    │     │
│     │   │  Service   │   │  Service   │   │  Service   │    │     │
│     │   └────────────┘   └────────────┘   └────────────┘    │     │
│     │                                                       │     │
│     └───────────────────────────────────────────────────────┘     │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
Mathematical Foundation: Event Processing
Throughput Calculation:
- Total events per second: E = 1,000,000
- Average event size: S = 1 KB
- Total throughput: T = E × S = 1 GB/s (1,000 MB/s)
- Kafka partitions needed: P = T / throughput_per_partition
- With 10 MB/s per partition: P = 1,000 / 10 = 100 partitions (a lower bound; add headroom for traffic peaks)
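The partition estimate above can be reproduced in a few lines of Python. This is a minimal sketch: the function name `partitions_needed` and the `headroom` factor are assumptions added for illustration, and 10 MB/s per partition is the planning figure used above, not a measured limit.

```python
import math

def partitions_needed(events_per_sec: int, event_size_bytes: int,
                      per_partition_bytes_per_sec: int,
                      headroom: float = 1.0) -> int:
    """Minimum partition count for a target throughput.

    headroom > 1.0 reserves spare capacity (a hypothetical tuning knob).
    """
    total_bytes_per_sec = events_per_sec * event_size_bytes * headroom
    return math.ceil(total_bytes_per_sec / per_partition_bytes_per_sec)

# Figures from the calculation above, in decimal units (1 KB = 1,000 bytes).
baseline = partitions_needed(1_000_000, 1_000, 10_000_000)
with_headroom = partitions_needed(1_000_000, 1_000, 10_000_000, headroom=1.5)
print(baseline)       # 100
print(with_headroom)  # 150
```

Because partition counts are awkward to change once keys are in use, sizing with some headroom up front is usually the safer choice.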
Latency Budget:
- End-to-end latency: L_total = 100ms
- Producer to broker: L_pb = 10ms
- Broker processing: L_bp = 20ms
- Consumer processing: L_cp = 30ms
- Network overhead: L_net = 20ms
- Buffer: L_buffer = 20ms
Exactly-Once Semantics:
- Idempotency key: UUID per event
- Transaction ID: Unique per batch
- Deduplication window: W = 5 minutes = 300 s
- Storage needed: D = E × W × idempotency_key_size (E in events/s, W in seconds)
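As a worked example of the storage formula, the sketch below plugs in the event rate and the 5-minute window from this section. The key sizes are assumptions: 16 bytes for a UUID stored in binary form and 36 bytes for its canonical text form. The helper name `dedup_storage_bytes` is hypothetical, and the result ignores any per-entry overhead of the store itself.

```python
def dedup_storage_bytes(events_per_sec: int, window_sec: int, key_bytes: int) -> int:
    """Raw bytes needed to remember every idempotency key seen in the window."""
    return events_per_sec * window_sec * key_bytes

WINDOW_SEC = 5 * 60  # deduplication window W

binary_keys = dedup_storage_bytes(1_000_000, WINDOW_SEC, 16)  # UUID as 16 raw bytes
text_keys = dedup_storage_bytes(1_000_000, WINDOW_SEC, 36)    # UUID as 36-char string

print(f"{binary_keys / 1e9:.1f} GB")  # 4.8 GB
print(f"{text_keys / 1e9:.1f} GB")    # 10.8 GB
```

Even the raw key data reaches several gigabytes at this rate, which is why the deduplication state normally lives in a shared store rather than in each consumer's memory.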
Apache Kafka Implementation
# Kafka producer and consumer (kafka-python)
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
import uuid


@dataclass
class KafkaConfig:
    bootstrap_servers: List[str]
    topic: str
    group_id: str
    acks: str = 'all'
    retries: int = 3
    max_in_flight_requests_per_connection: int = 5  # must stay <= 5 with idempotence


class EventProducer:
    """Kafka event producer with idempotent writes (retries do not create duplicates)"""

    def __init__(self, config: KafkaConfig):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks=config.acks,  # idempotence requires acks='all'
            retries=config.retries,
            max_in_flight_requests_per_connection=config.max_in_flight_requests_per_connection,
            # Requires a kafka-python release with idempotent-producer support.
            # No transactional_id is set here: transactions need an ID that is
            # stable across restarts plus an explicit init/begin/commit cycle,
            # so a random UUID per process would not provide fencing.
            enable_idempotence=True
        )
        self.config = config
        self.producer_id = str(uuid.uuid4())  # identifies this producer instance

    def produce_event(self, event_type: str, data: Dict[str, Any],
                      key: Optional[str] = None):
        """Produce a single event and wait for the broker acknowledgement"""
        event = {
            'event_id': str(uuid.uuid4()),
            'event_type': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'data': data,
            'metadata': {
                'producer_id': self.producer_id,
                'version': '1.0'
            }
        }
        # Ordering is only guaranteed within a partition, so pass a business
        # key (e.g. order_id) to keep related events in order. Falling back to
        # event_id spreads events evenly but gives no ordering between them.
        if key is None:
            key = event['event_id']
        try:
            future = self.producer.send(
                topic=self.config.topic,
                key=key,
                value=event
            )
            record_metadata = future.get(timeout=10)
            return {
                'event_id': event['event_id'],
                'topic': record_metadata.topic,
                'partition': record_metadata.partition,
                'offset': record_metadata.offset
            }
        except KafkaError as e:
            print(f"Error producing event: {e}")
            raise

    def produce_batch(self, events: List[Dict[str, Any]]):
        """Produce batch of events"""
        futures = []
        for event in events:
            future = self.producer.send(
                topic=self.config.topic,
                key=event.get('event_id'),
                value=event
            )
            futures.append(future)
        # Wait for all to complete; raises on the first failed send
        for future in futures:
            future.get(timeout=10)

    def flush(self):
        self.producer.flush()

    def close(self):
        self.producer.close()


class EventConsumer:
    """Kafka event consumer: at-least-once delivery plus deduplication"""

    def __init__(self, config: KafkaConfig):
        self.consumer = KafkaConsumer(
            config.topic,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # Commit manually, only after processing
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            max_poll_records=500,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000
        )
        self.config = config
        # In-memory only: grows without bound and is lost on restart
        self.processed_events: set = set()

    def consume_events(self, handler: Callable[[Dict[str, Any]], None]):
        """Consume events, skipping IDs that were already processed"""
        try:
            for message in self.consumer:
                event = message.value
                event_id = event.get('event_id')
                # Check if already processed (deduplication)
                if event_id in self.processed_events:
                    print(f"Skipping duplicate event: {event_id}")
                    continue
                try:
                    # Process event
                    handler(event)
                    # Mark as processed
                    self.processed_events.add(event_id)
                    # Commit offset
                    self.consumer.commit()
                except Exception as e:
                    print(f"Error processing event {event_id}: {e}")
                    # Don't commit. Rewind so the same record is fetched again;
                    # otherwise the next successful commit would skip past it.
                    # In production, cap retries and dead-letter poison messages.
                    self.consumer.seek(
                        TopicPartition(message.topic, message.partition),
                        message.offset
                    )
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()

    def cleanup_old_events(self, max_age_hours: int = 24):
        """Cleanup old processed event IDs"""
        # In production, use Redis or a database with a TTL for this
        pass


# Example usage
def process_order_event(event: Dict[str, Any]):
    """Process order event"""
    print(f"Processing order: {event['data']['order_id']}")
    # Process order...


if __name__ == '__main__':
    # Initialize
    config = KafkaConfig(
        bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
        topic='orders',
        group_id='order-processor'
    )
    consumer = EventConsumer(config)
    consumer.consume_events(process_order_event)
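The consumer's `processed_events` set grows without bound, and `cleanup_old_events` is left as a stub. One possible way to bound it is a time-windowed store, sketched below. `DedupWindow` is a hypothetical class, not part of kafka-python; it supports `in` and `add()` so it could stand in for the set, and it is still in-memory only, so the state is lost on restart and is not shared between consumer instances.

```python
import time
from collections import OrderedDict
from typing import Callable

class DedupWindow:
    """Remembers event IDs for ttl_seconds and evicts the oldest entries first."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so the window can be tested without sleeping
        self._seen: "OrderedDict[str, float]" = OrderedDict()

    def _evict(self, now: float) -> None:
        # Entries are kept in insertion order, so expired ones are at the front.
        while self._seen:
            oldest_id = next(iter(self._seen))
            if now - self._seen[oldest_id] < self.ttl:
                break
            del self._seen[oldest_id]

    def __contains__(self, event_id: str) -> bool:
        self._evict(self.clock())
        return event_id in self._seen

    def add(self, event_id: str) -> None:
        now = self.clock()
        self._evict(now)
        self._seen[event_id] = now
        self._seen.move_to_end(event_id)  # keep insertion order equal to time order

    def __len__(self) -> int:
        return len(self._seen)

# Usage with a fake clock and the 5-minute window from the sizing section.
fake_now = [0.0]
window = DedupWindow(ttl_seconds=300, clock=lambda: fake_now[0])
window.add("evt-1")
print("evt-1" in window)  # True
fake_now[0] = 301.0
print("evt-1" in window)  # False
```

A duplicate that arrives after the window has expired is processed again, so the window should be at least as long as the longest expected redelivery delay.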
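⚠️ Kafka Configuration
Enable producer idempotence so that retries do not write duplicates to a partition. On its own this is not end-to-end exactly-once processing: that also requires transactions with read_committed consumers, or idempotent handlers on the consumer side. Use multiple partitions for parallel processing, and monitor consumer lag to detect processing bottlenecks.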
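Consumer lag is the gap between the newest offset in each partition and the offset the consumer group has committed. The sketch below shows only the arithmetic; `consumer_lag` is a hypothetical helper, and the offsets are made-up sample values. With kafka-python the inputs could plausibly come from `KafkaConsumer.end_offsets()` and `KafkaConsumer.committed()`. Treating a partition with no commit as starting from offset 0 is an assumption of this sketch.

```python
from typing import Dict

def consumer_lag(end_offsets: Dict[int, int],
                 committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: log end offset minus committed offset.

    Partitions without a committed offset are counted from 0 (assumption).
    """
    return {
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }

# Sample values: partition -> offset
lag = consumer_lag(
    end_offsets={0: 1500, 1: 900, 2: 40},
    committed={0: 1200, 1: 900},
)
print(lag)                # {0: 300, 1: 0, 2: 40}
print(sum(lag.values()))  # 340
```

Lag that keeps growing on a partition means its consumer is slower than the producers writing to it, which is the bottleneck signal mentioned above.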
AWS SNS/SQS Integration
# SNS/SQS event-driven architecture
import boto3
import json
from datetime import datetime
from typing import Dict, Any, List, Optional
from dataclasses import dataclass


@dataclass
class SNSConfig:
    topic_arn: str
    region: str = 'us-east-1'


class SNSEventPublisher:
    """SNS event publisher"""

    def __init__(self, config: SNSConfig):
        self.sns = boto3.client('sns', region_name=config.region)
        self.topic_arn = config.topic_arn

    def publish_event(self, event_type: str, data: Dict[str, Any],
                      message_attributes: Optional[Dict[str, str]] = None):
        """Publish event to SNS"""
        message = {
            'event_type': event_type,
            'data': data,
            'timestamp': datetime.utcnow().isoformat()
        }
        attributes = {}
        if message_attributes:
            for key, value in message_attributes.items():
                attributes[key] = {
                    'DataType': 'String',
                    'StringValue': value
                }
        response = self.sns.publish(
            TopicArn=self.topic_arn,
            Message=json.dumps(message, default=str),
            MessageAttributes=attributes
        )
        return response

    def publish_batch(self, events: List[Dict[str, Any]]):
        """Publish events with PublishBatch (at most 10 entries per call)"""
        failed = []
        for start in range(0, len(events), 10):
            chunk = events[start:start + 10]
            response = self.sns.publish_batch(
                TopicArn=self.topic_arn,
                PublishBatchRequestEntries=[
                    # Id only needs to be unique within one request
                    {'Id': str(i), 'Message': json.dumps(event, default=str)}
                    for i, event in enumerate(chunk)
                ]
            )
            failed.extend(response.get('Failed', []))
        return failed  # entries SNS rejected; the caller decides whether to retry


class SQSConsumer:
    """SQS event consumer with visibility timeout"""

    def __init__(self, queue_url: str, region: str = 'us-east-1'):
        self.sqs = boto3.client('sqs', region_name=region)
        self.queue_url = queue_url

    def receive_messages(self, max_messages: int = 10,
                         wait_time_seconds: int = 20) -> List[Dict[str, Any]]:
        """Receive messages from SQS (long polling)"""
        response = self.sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_time_seconds,
            # Needed so DLQHandler can read the retry count
            AttributeNames=['ApproximateReceiveCount'],
            MessageAttributeNames=['All'],
            VisibilityTimeout=30
        )
        return response.get('Messages', [])

    def process_message(self, message: Dict[str, Any], handler):
        """Process single message"""
        try:
            body = json.loads(message['Body'])
            # Without raw message delivery, SNS wraps the payload in an envelope
            if isinstance(body, dict) and body.get('Type') == 'Notification':
                body = json.loads(body['Message'])
            handler(body)
            # Delete message after successful processing
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            return True
        except Exception as e:
            # Not deleted: the message becomes visible again after the timeout
            print(f"Error processing message: {e}")
            return False

    def change_visibility(self, message: Dict[str, Any], timeout: int):
        """Change message visibility timeout"""
        self.sqs.change_message_visibility(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle'],
            VisibilityTimeout=timeout
        )


class DLQHandler:
    """Dead Letter Queue handler"""

    def __init__(self, dlq_url: str, region: str = 'us-east-1'):
        self.sqs = boto3.client('sqs', region_name=region)
        self.dlq_url = dlq_url

    def send_to_dlq(self, message: Dict[str, Any], error: str):
        """Send failed message to DLQ"""
        dlq_message = {
            'original_message': message,
            'error': error,
            'timestamp': datetime.utcnow().isoformat(),
            'retry_count': message.get('Attributes', {}).get('ApproximateReceiveCount', 0)
        }
        self.sqs.send_message(
            QueueUrl=self.dlq_url,
            MessageBody=json.dumps(dlq_message, default=str)
        )

    def process_dlq(self, handler):
        """Process messages from DLQ until no visible messages remain"""
        while True:
            response = self.sqs.receive_message(
                QueueUrl=self.dlq_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5
            )
            messages = response.get('Messages', [])
            if not messages:
                break
            for message in messages:
                try:
                    handler(message)
                    self.sqs.delete_message(
                        QueueUrl=self.dlq_url,
                        ReceiptHandle=message['ReceiptHandle']
                    )
                except Exception as e:
                    print(f"Error processing DLQ message: {e}")


# Example: Order processing with SNS/SQS
"""
Order Service -> SNS Topic -> SQS Queues -> Multiple Consumers
1. Order Service publishes OrderCreated event to SNS
2. SNS fans out to multiple SQS queues:
   - Payment Queue
   - Inventory Queue
   - Notification Queue
3. Each consumer processes independently
4. Failed messages go to DLQ
"""
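Step 4 of the flow above does not have to be done by hand: SQS can move a message to a dead-letter queue itself once it has been received a configured number of times without being deleted. The sketch below only builds the queue attribute for that. `redrive_policy_attributes` is a hypothetical helper, and the ARN uses a placeholder account ID and an assumed queue name.

```python
import json
from typing import Dict

def redrive_policy_attributes(dlq_arn: str, max_receive_count: int = 5) -> Dict[str, str]:
    """Build the SQS queue attributes that route repeatedly failing messages to a DLQ."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    policy = {
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receive_count),
    }
    # SQS expects the policy as a JSON string inside the Attributes map.
    return {"RedrivePolicy": json.dumps(policy)}

# Placeholder ARN for illustration only.
attrs = redrive_policy_attributes("arn:aws:sqs:us-east-1:123456789012:payment-dlq", 3)
print(attrs["RedrivePolicy"])

# Applying it needs AWS credentials, so the call is shown as a comment:
# boto3.client("sqs").set_queue_attributes(QueueUrl=queue_url, Attributes=attrs)
```

With a redrive policy in place, a consumer only has to leave a failed message undeleted; the explicit `send_to_dlq` call is then needed only when extra error context should be attached.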
EventBridge Integration
# EventBridge event routing
import boto3
import json
from typing import Dict, Any, List, Optional
from datetime import datetime


class EventBridgePublisher:
    """EventBridge event publisher"""

    def __init__(self, event_bus_name: str = 'default'):
        self.eventbridge = boto3.client('events')
        self.event_bus_name = event_bus_name

    def publish_event(self, source: str, detail_type: str,
                      detail: Dict[str, Any], resources: Optional[List[str]] = None):
        """Publish event to EventBridge"""
        entries = [{
            'Source': source,
            'DetailType': detail_type,
            'Detail': json.dumps(detail, default=str),
            'EventBusName': self.event_bus_name,
            'Time': datetime.utcnow()
        }]
        if resources:
            entries[0]['Resources'] = resources
        response = self.eventbridge.put_events(Entries=entries)
        # put_events reports per-entry failures in the response instead of raising
        if response.get('FailedEntryCount', 0) > 0:
            raise RuntimeError(f"EventBridge rejected event: {response['Entries']}")
        return response

    def publish_order_event(self, order_data: Dict[str, Any]):
        """Publish order event"""
        self.publish_event(
            source='com.ecommerce.orders',
            detail_type='OrderCreated',
            detail=order_data,
            # Illustrative resource identifier, not a real AWS ARN format
            resources=[f"arn:aws:ecommerce:orders:{order_data['order_id']}"]
        )

    def publish_payment_event(self, payment_data: Dict[str, Any]):
        """Publish payment event"""
        self.publish_event(
            source='com.ecommerce.payments',
            detail_type='PaymentProcessed',
            detail=payment_data
        )


class EventBridgeRuleManager:
    """EventBridge rule management"""

    def __init__(self, event_bus_name: str = 'default'):
        self.eventbridge = boto3.client('events')
        # Rules only see events on their own bus, so match the publisher's bus
        self.event_bus_name = event_bus_name

    def create_routing_rule(self, name: str, source_pattern: str,
                            detail_type_pattern: str, targets: List[Dict[str, Any]]):
        """Create event routing rule"""
        # Create rule
        self.eventbridge.put_rule(
            Name=name,
            EventBusName=self.event_bus_name,
            EventPattern=json.dumps({
                'source': [source_pattern],
                'detail-type': [detail_type_pattern]
            }),
            State='ENABLED',
            Description=f'Route {detail_type_pattern} events from {source_pattern}'
        )
        # Add targets
        for target in targets:
            entry = {'Id': target['id'], 'Arn': target['arn']}
            # Input, InputPath and InputTransformer are mutually exclusive,
            # so send only the one that was provided (if any)
            if target.get('input'):
                entry['Input'] = target['input']
            elif target.get('input_path'):
                entry['InputPath'] = target['input_path']
            elif target.get('input_transformer'):
                entry['InputTransformer'] = target['input_transformer']
            self.eventbridge.put_targets(
                Rule=name,
                EventBusName=self.event_bus_name,
                Targets=[entry]
            )

    def create_content_based_routing(self, name: str, rules: List[Dict[str, Any]]):
        """Create content-based routing rules"""
        for rule in rules:
            rule_name = f"{name}-{rule['name']}"
            self.eventbridge.put_rule(
                Name=rule_name,
                EventBusName=self.event_bus_name,
                EventPattern=json.dumps(rule['pattern']),
                State='ENABLED'
            )
            self.eventbridge.put_targets(
                Rule=rule_name,
                EventBusName=self.event_bus_name,
                Targets=[{
                    'Id': rule['target_id'],
                    'Arn': rule['target_arn']
                }]
            )


# Example: Content-based routing
"""
{
    "source": ["com.ecommerce.orders"],
    "detail-type": ["OrderCreated"],
    "detail": {
        "status": ["confirmed"]
    }
}
This rule routes OrderCreated events with status=confirmed to a specific target.
"""
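To make the matching semantics of the example pattern concrete, here is a simplified local matcher. It is a sketch for reasoning about rules, not EventBridge's implementation: `matches` is a hypothetical function that handles only exact-value lists and nested objects, and it ignores prefix, numeric, and other advanced operators as well as array-valued event fields.

```python
from typing import Any, Dict

def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Return True if every field named in the pattern matches the event.

    A list in the pattern means "the event value equals one of these";
    a dict in the pattern recurses into the nested object.
    Fields that the pattern does not mention are ignored.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True

pattern = {
    "source": ["com.ecommerce.orders"],
    "detail-type": ["OrderCreated"],
    "detail": {"status": ["confirmed"]},
}
confirmed = {
    "source": "com.ecommerce.orders",
    "detail-type": "OrderCreated",
    "detail": {"status": "confirmed", "order_id": "o-1"},
}
pending = {**confirmed, "detail": {"status": "pending", "order_id": "o-2"}}

print(matches(pattern, confirmed))  # True
print(matches(pattern, pending))    # False
```

The extra `order_id` field does not prevent a match, which is what lets producers add fields without breaking existing rules.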
ℹ️ EventBridge Benefits
EventBridge provides serverless event routing with content-based filtering. Use it for decoupled architectures where producers and consumers don't need to know about each other.
Event Schema Management
# Event schema registry
import json
from typing import Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import hashlib


@dataclass
class EventSchema:
    schema_id: str
    event_type: str
    version: str
    schema: Dict[str, Any]
    created_at: datetime
    description: str


class SchemaRegistry:
    """Event schema registry (in-memory)"""

    def __init__(self):
        self.schemas: Dict[str, EventSchema] = {}

    def register_schema(self, event_type: str, version: str,
                        schema: Dict[str, Any], description: str = "") -> str:
        """Register new event schema"""
        # MD5 is used only as a content fingerprint, not for security
        schema_id = hashlib.md5(
            f"{event_type}:{version}:{json.dumps(schema, sort_keys=True)}".encode()
        ).hexdigest()
        self.schemas[schema_id] = EventSchema(
            schema_id=schema_id,
            event_type=event_type,
            version=version,
            schema=schema,
            created_at=datetime.utcnow(),
            description=description
        )
        return schema_id

    def get_schema(self, schema_id: str) -> Optional[EventSchema]:
        """Get schema by ID (None if unknown)"""
        return self.schemas.get(schema_id)

    def get_schema_by_type(self, event_type: str,
                           version: Optional[str] = None) -> Optional[EventSchema]:
        """Get the first registered schema for an event type (and version, if given)"""
        for schema in self.schemas.values():
            if schema.event_type == event_type:
                if version is None or schema.version == version:
                    return schema
        return None

    def validate_event(self, schema_id: str, event_data: Dict[str, Any]) -> bool:
        """Validate event against schema"""
        schema = self.get_schema(schema_id)
        if not schema:
            return False
        # Simple validation - in production use jsonschema
        return self._validate_against_schema(event_data, schema.schema)

    def _validate_against_schema(self, data: Dict[str, Any],
                                 schema: Dict[str, Any]) -> bool:
        """Check that all required fields are present (types are not checked)"""
        # Simplified validation
        if 'required' in schema:
            for field in schema['required']:
                if field not in data:
                    return False
        return True


# Example schema
ORDER_CREATED_SCHEMA = {
    "type": "object",
    "properties": {
        "order_id": {"type": "string"},
        "user_id": {"type": "string"},
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "product_id": {"type": "string"},
                    "quantity": {"type": "integer"},
                    "price": {"type": "number"}
                }
            }
        },
        "total": {"type": "number"},
        "currency": {"type": "string"}
    },
    "required": ["order_id", "user_id", "items", "total"]
}

# Register schema
registry = SchemaRegistry()
schema_id = registry.register_schema(
    event_type='OrderCreated',
    version='1.0',
    schema=ORDER_CREATED_SCHEMA,
    description='Schema for order created events'
)
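A registry is most useful when it also guards schema evolution. The sketch below checks one narrow notion of compatibility: events that were valid under the old schema should not start failing under the new one. `backward_compat_issues` is a hypothetical helper that looks only at top-level `required` and `properties` entries; real registries apply richer rules.

```python
from typing import Any, Dict, List

def backward_compat_issues(old: Dict[str, Any], new: Dict[str, Any]) -> List[str]:
    """List reasons why an event valid under `old` could be rejected by `new`."""
    issues: List[str] = []
    old_required = set(old.get("required", []))
    for field in new.get("required", []):
        if field not in old_required:
            issues.append(f"new required field: {field}")
    old_props = old.get("properties", {})
    for name, spec in new.get("properties", {}).items():
        old_type = old_props.get(name, {}).get("type")
        if old_type is not None and spec.get("type") != old_type:
            issues.append(f"type changed for {name}: {old_type} -> {spec.get('type')}")
    return issues

v1 = {
    "properties": {"order_id": {"type": "string"}, "total": {"type": "number"}},
    "required": ["order_id", "total"],
}
# Adding an optional field is safe.
v2_ok = {
    "properties": {**v1["properties"], "coupon": {"type": "string"}},
    "required": ["order_id", "total"],
}
# Adding a required field and changing a type are not.
v2_bad = {
    "properties": {
        "order_id": {"type": "string"},
        "total": {"type": "string"},
        "currency": {"type": "string"},
    },
    "required": ["order_id", "total", "currency"],
}

print(backward_compat_issues(v1, v2_ok))   # []
print(backward_compat_issues(v1, v2_bad))
```

A check like this could run inside `register_schema` so that an incompatible version is rejected before any producer starts using it.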
Summary
| Component | Purpose | Use Case |
|---|---|---|
| Kafka | High-throughput streaming | Real-time event processing |
| SNS | Fan-out messaging | Broadcast to multiple consumers |
| SQS | Message queuing | Reliable message delivery |
| EventBridge | Event routing | Content-based routing |
| Schema Registry | Event validation | Schema evolution management |