CQRS & Event Sourcing
Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber
CQRS vs Event Sourcing
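CQRS (Command Query Responsibility Segregation) separates the write side (commands that change state) from the read side (queries that return data), so each side can use its own model and storage. Event Sourcing persists every state change as an immutable event and derives current state by replaying those events. The two patterns are complementary and often used together.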
CQRS is about separating concerns; Event Sourcing is about how you store state. You can use CQRS without Event Sourcing and vice versa.
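To make the storage side concrete: in Event Sourcing the event log is the source of truth, and current state is recomputed by folding over it. The minimal sketch below assumes a toy OrderEvent union and an evolve function. These are illustrative names, not from any library. The sketch uses no CQRS machinery at all, which is why the two patterns can be adopted separately.

```typescript
// Event Sourcing in miniature: the event log is the source of truth and
// current state is recomputed from it rather than stored.
type OrderEvent =
  | { type: 'OrderCreated'; orderId: string }
  | { type: 'OrderItemAdded'; price: number; quantity: number }
  | { type: 'OrderCancelled' };

interface OrderState {
  orderId: string | null;
  total: number;
  cancelled: boolean;
}

const initialState: OrderState = { orderId: null, total: 0, cancelled: false };

// evolve: apply one past event to the state. Pure; no validation happens here.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderCreated':
      return { ...state, orderId: event.orderId };
    case 'OrderItemAdded':
      return { ...state, total: state.total + event.price * event.quantity };
    case 'OrderCancelled':
      return { ...state, cancelled: true };
  }
}

// Current state is a left fold over the whole history.
function currentState(events: OrderEvent[]): OrderState {
  return events.reduce<OrderState>(evolve, initialState);
}

const orderHistory: OrderEvent[] = [
  { type: 'OrderCreated', orderId: 'o-1' },
  { type: 'OrderItemAdded', price: 10, quantity: 2 },
  { type: 'OrderItemAdded', price: 5, quantity: 1 },
];

console.log(currentState(orderHistory)); // { orderId: 'o-1', total: 25, cancelled: false }
console.log(currentState(orderHistory.slice(0, 2)).total); // 20: state as of the second event
```

Replaying a prefix of the log gives the state as of that point in time. That is where the audit and debugging value of Event Sourcing comes from.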
Architecture Diagram
┌──────────────────────────────────────┐
│               Commands               │
│    CreateOrder, CancelOrder, etc.    │
└──────────────────┬───────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│           Command Handler            │
│      Validates, Creates Events       │
└──────────────────┬───────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│             Event Store              │
│     (Append-only log of events)      │
└──────────────────┬───────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│              Event Bus               │
└────────┬────────────────────┬────────┘
         │                    │
         ▼                    ▼
┌─────────────────┐  ┌─────────────────┐
│   Write Model   │  │   Read Model    │
│   (Aggregate)   │  │  (Projections)  │
│   PostgreSQL    │  │  Elasticsearch  │
└─────────────────┘  └────────┬────────┘
                              │
                              ▼
┌──────────────────────────────────────┐
│               Queries                │
│  GetOrder, ListOrders, SearchOrders  │
└──────────────────────────────────────┘
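The flow in the diagram can be traced end to end with an in-memory sketch. EventBus, EventStore, OrderCommandHandler and OrderQueries are hypothetical names. Here the bus calls subscribers synchronously. A real deployment delivers events asynchronously, so the read model is only eventually consistent with the write side.

```typescript
// In-memory sketch of: command -> event store -> bus -> projection -> query.
interface DomainEvent {
  aggregateId: string;
  version: number;
  eventType: 'OrderCreated' | 'OrderCancelled';
  data: { customerId?: string };
}

class EventBus {
  private handlers: Array<(e: DomainEvent) => void> = [];
  subscribe(handler: (e: DomainEvent) => void): void {
    this.handlers.push(handler);
  }
  publish(e: DomainEvent): void {
    this.handlers.forEach((h) => h(e));
  }
}

class EventStore {
  private log: DomainEvent[] = [];
  constructor(private bus: EventBus) {}
  append(e: DomainEvent): void {
    this.log.push(e);
    this.bus.publish(e); // synchronous here; real buses deliver asynchronously
  }
  eventsFor(aggregateId: string): DomainEvent[] {
    return this.log.filter((e) => e.aggregateId === aggregateId);
  }
}

// Write side: validate the command against the stream, then append an event.
class OrderCommandHandler {
  constructor(private store: EventStore) {}
  createOrder(orderId: string, customerId: string): void {
    if (this.store.eventsFor(orderId).length > 0) throw new Error(`Order ${orderId} already exists`);
    this.store.append({ aggregateId: orderId, version: 1, eventType: 'OrderCreated', data: { customerId } });
  }
  cancelOrder(orderId: string): void {
    const past = this.store.eventsFor(orderId);
    if (past.length === 0) throw new Error(`Order ${orderId} not found`);
    if (past.some((e) => e.eventType === 'OrderCancelled')) throw new Error('Order already cancelled');
    this.store.append({ aggregateId: orderId, version: past.length + 1, eventType: 'OrderCancelled', data: {} });
  }
}

// Read side: a denormalized view updated from the bus and served to queries.
interface OrderView {
  id: string;
  customerId: string;
  status: 'created' | 'cancelled';
}

class OrderQueries {
  private views = new Map<string, OrderView>();
  constructor(bus: EventBus) {
    bus.subscribe((e) => this.project(e));
  }
  private project(e: DomainEvent): void {
    if (e.eventType === 'OrderCreated') {
      this.views.set(e.aggregateId, { id: e.aggregateId, customerId: e.data.customerId ?? '', status: 'created' });
    } else {
      const existing = this.views.get(e.aggregateId);
      if (existing) existing.status = 'cancelled';
    }
  }
  getOrder(id: string): OrderView | undefined {
    return this.views.get(id);
  }
  listOrders(): OrderView[] {
    return Array.from(this.views.values());
  }
}

const bus = new EventBus();
const queries = new OrderQueries(bus); // subscribe before any command runs
const commands = new OrderCommandHandler(new EventStore(bus));
commands.createOrder('o-1', 'c-42');
commands.cancelOrder('o-1');
console.log(queries.getOrder('o-1')); // { id: 'o-1', customerId: 'c-42', status: 'cancelled' }
```

Commands never read from the view map, and queries never touch the event log. That boundary is what lets each side be stored and scaled independently, as in the diagram.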
Pattern 1: Event Store with DynamoDB
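Store events in an append-only DynamoDB table whose partition key is aggregateId and whose sort key is version. A conditional put refuses to overwrite an existing (aggregateId, version) pair. As a result, two writers that loaded the same version cannot both commit, which gives optimistic concurrency control.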
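import { DynamoDB } from 'aws-sdk';
import { v4 as uuidv4 } from 'uuid';

const dynamodb = new DynamoDB.DocumentClient();

export interface StoredEvent {
  aggregateId: string;
  version: number;
  eventType: string;
  data: any;
  metadata: {
    causationId: string;
    correlationId: string;
    userId: string;
    timestamp: string;
  };
}

// Assumed table layout: partition key `aggregateId`, sort key `version`.
export class DynamoEventStore {
  private tableName = 'events';

  async append(event: StoredEvent): Promise<void> {
    await dynamodb
      .put({
        TableName: this.tableName,
        Item: {
          ...event,
          createdAt: Date.now(),
        },
        // Optimistic concurrency: the put fails with ConditionalCheckFailedException
        // if an event with the same (aggregateId, version) key already exists.
        // The #v placeholder avoids any clash with DynamoDB reserved words.
        ConditionExpression: 'attribute_not_exists(#v)',
        ExpressionAttributeNames: { '#v': 'version' },
      })
      .promise();
  }

  async getEvents(aggregateId: string, fromVersion: number = 0): Promise<StoredEvent[]> {
    const events: StoredEvent[] = [];
    let startKey: DynamoDB.DocumentClient.Key | undefined;
    // One Query call returns at most 1 MB, so page until LastEvaluatedKey is empty.
    do {
      const result = await dynamodb
        .query({
          TableName: this.tableName,
          KeyConditionExpression: 'aggregateId = :id AND #v > :from',
          ExpressionAttributeNames: { '#v': 'version' },
          ExpressionAttributeValues: {
            ':id': aggregateId,
            ':from': fromVersion,
          },
          ScanIndexForward: true, // ascending version order
          ExclusiveStartKey: startKey,
        })
        .promise();
      events.push(...((result.Items || []) as StoredEvent[]));
      startKey = result.LastEvaluatedKey;
    } while (startKey);
    return events;
  }

  async getEventsByType(eventType: string, limit: number = 100): Promise<StoredEvent[]> {
    const result = await dynamodb
      .query({
        TableName: this.tableName,
        // Index keys must be top-level attributes, so this GSI is assumed to use
        // eventType (partition) and the top-level createdAt (sort); the nested
        // metadata.timestamp cannot serve as an index key.
        IndexName: 'eventType-createdAt-index',
        KeyConditionExpression: 'eventType = :type',
        ExpressionAttributeValues: {
          ':type': eventType,
        },
        Limit: limit,
        ScanIndexForward: false, // newest first
      })
      .promise();
    return (result.Items || []) as StoredEvent[];
  }
}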
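The conditional put is what turns a race between two writers into a detectable conflict. The sketch below reproduces the same rule in memory: a version can be written only once per aggregate. It also adds the usual reload-and-retry loop. ConditionalEventStore, ConcurrencyError and appendWithRetry are hypothetical names. Against DynamoDB, the failed condition surfaces as a ConditionalCheckFailedException from the put call instead.

```typescript
// Thrown when another writer already stored the same (aggregateId, version).
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
    Object.setPrototypeOf(this, ConcurrencyError.prototype); // keeps instanceof working on ES5 targets
  }
}

interface VersionedEvent {
  aggregateId: string;
  version: number;
  eventType: string;
}

// In-memory stand-in for the conditional put: each version is writable once per aggregate.
class ConditionalEventStore {
  private streams = new Map<string, VersionedEvent[]>();

  append(event: VersionedEvent): void {
    const stream = this.streams.get(event.aggregateId) ?? [];
    if (stream.some((e) => e.version === event.version)) {
      throw new ConcurrencyError(`${event.aggregateId} already has version ${event.version}`);
    }
    stream.push(event);
    this.streams.set(event.aggregateId, stream);
  }

  currentVersion(aggregateId: string): number {
    const stream = this.streams.get(aggregateId) ?? [];
    return stream.reduce((max, e) => Math.max(max, e.version), 0);
  }
}

// Reload the current version, rebuild the event, and retry on conflict.
function appendWithRetry(
  store: ConditionalEventStore,
  aggregateId: string,
  buildEvent: (nextVersion: number) => VersionedEvent,
  maxAttempts = 3,
): VersionedEvent {
  for (let attempt = 1; ; attempt++) {
    const event = buildEvent(store.currentVersion(aggregateId) + 1);
    try {
      store.append(event);
      return event;
    } catch (err) {
      if (!(err instanceof ConcurrencyError) || attempt >= maxAttempts) throw err;
    }
  }
}

const store = new ConditionalEventStore();
// Writers A and B both read the stream while it is empty (version 0).
const versionSeenByA = store.currentVersion('order-1');
const versionSeenByB = store.currentVersion('order-1');
store.append({ aggregateId: 'order-1', version: versionSeenByA + 1, eventType: 'OrderCreated' });

let conflictDetected = false;
try {
  // B targets the same version and is rejected instead of overwriting A's event.
  store.append({ aggregateId: 'order-1', version: versionSeenByB + 1, eventType: 'OrderCancelled' });
} catch (err) {
  conflictDetected = err instanceof ConcurrencyError;
}

// B retries against fresh state and lands on version 2.
const retried = appendWithRetry(store, 'order-1', (v) => ({
  aggregateId: 'order-1',
  version: v,
  eventType: 'OrderCancelled',
}));
console.log(conflictDetected, retried.version); // true 2
```

In a real handler, buildEvent should reload the aggregate and re-check its invariants. The state that made the first attempt valid may have changed in the meantime.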
Pattern 2: Aggregate with Event Sourcing
Rebuild aggregate state from event history.
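// Assumed shape of an order line item (the original snippet does not define it).
export interface OrderItem {
  productId: string;
  price: number;
  quantity: number;
}

// Shape persisted by the snapshot store in Pattern 5.
export interface OrderSnapshot {
  aggregateId: string;
  version: number;
  state: { customerId: string; items: OrderItem[]; status: string; total: number };
}

// Hypothetical helpers that read the current request context.
declare function getCorrelationId(): string;
declare function getCurrentUserId(): string;

// Base aggregate class
export abstract class Aggregate {
  abstract id: string;
  // Public so repositories (such as the snapshot store) can read it.
  version: number = 0;
  protected changes: StoredEvent[] = [];

  abstract when(event: StoredEvent): void;

  load(events: StoredEvent[]): void {
    for (const event of events) {
      this.when(event);
      this.version = event.version;
    }
  }

  protected apply(eventType: string, data: any): void {
    const event: StoredEvent = {
      aggregateId: this.id,
      version: this.version + 1,
      eventType,
      data,
      metadata: {
        // Placeholder: causationId should normally be the ID of the triggering command.
        causationId: uuidv4(),
        correlationId: getCorrelationId(),
        userId: getCurrentUserId(),
        timestamp: new Date().toISOString(),
      },
    };
    this.when(event);
    this.changes.push(event);
    this.version++;
  }

  getUncommittedChanges(): StoredEvent[] {
    return this.changes;
  }

  clearChanges(): void {
    this.changes = [];
  }
}

// Order aggregate
export class OrderAggregate extends Aggregate {
  id: string = '';
  customerId: string = '';
  items: OrderItem[] = [];
  status: string = 'pending';
  total: number = 0;

  when(event: StoredEvent): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.id = event.aggregateId;
        this.customerId = event.data.customerId;
        // Copy, so later changes to this.items never mutate the recorded event payload.
        this.items = [...event.data.items];
        this.status = 'created';
        this.total = this.calculateTotal();
        break;
      case 'OrderItemAdded':
        this.items = [...this.items, event.data.item];
        this.total = this.calculateTotal();
        break;
      case 'OrderConfirmed':
        this.status = 'confirmed';
        break;
      case 'OrderCancelled':
        this.status = 'cancelled';
        break;
    }
  }

  // Command methods
  static create(customerId: string, items: OrderItem[]): OrderAggregate {
    const order = new OrderAggregate();
    order.id = uuidv4();
    order.apply('OrderCreated', { customerId, items });
    return order;
  }

  addItem(item: OrderItem): void {
    if (this.status !== 'created') {
      throw new Error('Can only add items to created orders');
    }
    this.apply('OrderItemAdded', { item });
  }

  confirm(): void {
    if (this.status !== 'created') {
      throw new Error('Can only confirm created orders');
    }
    this.apply('OrderConfirmed', {});
  }

  cancel(): void {
    if (this.status === 'cancelled') {
      throw new Error('Order already cancelled');
    }
    this.apply('OrderCancelled', { reason: 'Customer requested' });
  }

  // Snapshot support (used by Pattern 5)
  toSnapshot(): OrderSnapshot['state'] {
    return { customerId: this.customerId, items: [...this.items], status: this.status, total: this.total };
  }

  loadFromSnapshot(snapshot: OrderSnapshot): void {
    this.id = snapshot.aggregateId;
    this.version = snapshot.version;
    this.customerId = snapshot.state.customerId;
    this.items = [...snapshot.state.items];
    this.status = snapshot.state.status;
    this.total = snapshot.state.total;
  }

  private calculateTotal(): number {
    return this.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  }
}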
Aggregates enforce business invariants. Commands go through aggregates, which validate state before applying events.
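The same invariant checks can also be expressed as two pure functions, a style often called a decider. decide validates a command against current state and returns new events. evolve applies events to state and never rejects them, because past events are facts. This alternative formulation is shown for comparison; it is not the class-based design above. The command names are assumptions modeled on OrderAggregate.

```typescript
type OrderStatus = 'none' | 'created' | 'confirmed' | 'cancelled';

interface OrderState {
  status: OrderStatus;
  itemCount: number;
}

type OrderCommand =
  | { kind: 'Create'; customerId: string }
  | { kind: 'AddItem'; sku: string }
  | { kind: 'Confirm' }
  | { kind: 'Cancel' };

type OrderEvent =
  | { type: 'OrderCreated'; customerId: string }
  | { type: 'OrderItemAdded'; sku: string }
  | { type: 'OrderConfirmed' }
  | { type: 'OrderCancelled' };

const initialOrder: OrderState = { status: 'none', itemCount: 0 };

// decide: enforce invariants against current state, then return the new events.
function decide(cmd: OrderCommand, state: OrderState): OrderEvent[] {
  switch (cmd.kind) {
    case 'Create':
      if (state.status !== 'none') throw new Error('Order already exists');
      return [{ type: 'OrderCreated', customerId: cmd.customerId }];
    case 'AddItem':
      if (state.status !== 'created') throw new Error('Can only add items to created orders');
      return [{ type: 'OrderItemAdded', sku: cmd.sku }];
    case 'Confirm':
      if (state.status !== 'created') throw new Error('Can only confirm created orders');
      return [{ type: 'OrderConfirmed' }];
    case 'Cancel':
      if (state.status === 'none') throw new Error('Order does not exist');
      if (state.status === 'cancelled') throw new Error('Order already cancelled');
      return [{ type: 'OrderCancelled' }];
  }
}

// evolve: apply a past event; never throws.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderCreated':
      return { status: 'created', itemCount: 0 };
    case 'OrderItemAdded':
      return { ...state, itemCount: state.itemCount + 1 };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed' };
    case 'OrderCancelled':
      return { ...state, status: 'cancelled' };
  }
}

// Handling a command: rebuild state from the past events, decide, append.
function handle(cmd: OrderCommand, past: OrderEvent[]): OrderEvent[] {
  return past.concat(decide(cmd, past.reduce<OrderState>(evolve, initialOrder)));
}

const commandsToRun: OrderCommand[] = [
  { kind: 'Create', customerId: 'c-1' },
  { kind: 'AddItem', sku: 'sku-1' },
  { kind: 'Confirm' },
];
const orderLog = commandsToRun.reduce<OrderEvent[]>((log, cmd) => handle(cmd, log), []);
const finalState = orderLog.reduce<OrderState>(evolve, initialOrder);
console.log(orderLog.length, finalState); // 3 { status: 'confirmed', itemCount: 1 }
```

Because both functions are pure, the invariants can be unit-tested without a database. The class-based aggregate is a thin stateful wrapper around the same two steps.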
Pattern 3: Read Model Projections
Build optimized read models from events.
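// Hypothetical read-store client (not a real library API). A value of the form
// { $inc: n } means "increment this field by n". Such updates are not idempotent,
// so redelivered events must be deduplicated first (see Pattern 4).
interface ReadDatabase {
  upsert(table: string, doc: Record<string, unknown>): Promise<void>;
  update(table: string, id: string, patch: Record<string, unknown>): Promise<void>;
}

// Sum of price * quantity over order lines.
const lineTotal = (items: { price: number; quantity: number }[]): number =>
  items.reduce((sum, i) => sum + i.price * i.quantity, 0);

// Projection for order list view
export class OrderListProjection {
  constructor(private readDb: ReadDatabase) {}

  async project(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.readDb.upsert('order_list', {
          id: event.aggregateId,
          customerId: event.data.customerId,
          status: 'created',
          itemCount: event.data.items.length,
          total: lineTotal(event.data.items),
          createdAt: event.metadata.timestamp,
        });
        break;
      case 'OrderItemAdded':
        // Without this case, itemCount and total go stale once items are added.
        await this.readDb.update('order_list', event.aggregateId, {
          itemCount: { $inc: 1 },
          total: { $inc: lineTotal([event.data.item]) },
        });
        break;
      case 'OrderConfirmed':
        await this.readDb.update('order_list', event.aggregateId, {
          status: 'confirmed',
          confirmedAt: event.metadata.timestamp,
        });
        break;
      case 'OrderCancelled':
        await this.readDb.update('order_list', event.aggregateId, {
          status: 'cancelled',
          cancelledAt: event.metadata.timestamp,
        });
        break;
    }
  }
}

// Projection for customer order summary
export class CustomerOrderSummaryProjection {
  constructor(private readDb: ReadDatabase) {}

  async project(event: StoredEvent): Promise<void> {
    if (event.eventType === 'OrderCreated') {
      await this.readDb.upsert('customer_order_summary', {
        customerId: event.data.customerId,
        totalOrders: { $inc: 1 },
        totalSpent: { $inc: lineTotal(event.data.items) },
        lastOrderAt: event.metadata.timestamp,
      });
    }
  }
}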
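A projection is a deterministic function of the event stream. That means a read model can be dropped and rebuilt by replaying events, which is also the usual way to add a new view or repair a buggy one. The minimal in-memory sketch below assumes event payloads shaped like the ones Pattern 2 emits. ProjectedEvent, ListRow and rebuildOrderList are illustrative names.

```typescript
interface OrderLine {
  price: number;
  quantity: number;
}

interface ProjectedEvent {
  aggregateId: string;
  version: number;
  eventType: 'OrderCreated' | 'OrderItemAdded' | 'OrderConfirmed' | 'OrderCancelled';
  data: { customerId?: string; items?: OrderLine[]; item?: OrderLine };
}

interface ListRow {
  id: string;
  customerId: string;
  status: string;
  itemCount: number;
  total: number;
}

const sumLines = (lines: OrderLine[]): number =>
  lines.reduce((sum, l) => sum + l.price * l.quantity, 0);

// Rebuild the order_list view from nothing. Assumes events are already in
// version order per aggregate, as an event store query returns them.
function rebuildOrderList(events: ProjectedEvent[]): Map<string, ListRow> {
  const rows = new Map<string, ListRow>();
  for (const e of events) {
    const row = rows.get(e.aggregateId);
    switch (e.eventType) {
      case 'OrderCreated': {
        const items = e.data.items ?? [];
        rows.set(e.aggregateId, {
          id: e.aggregateId,
          customerId: e.data.customerId ?? '',
          status: 'created',
          itemCount: items.length,
          total: sumLines(items),
        });
        break;
      }
      case 'OrderItemAdded':
        if (row && e.data.item) {
          row.itemCount += 1;
          row.total += sumLines([e.data.item]);
        }
        break;
      case 'OrderConfirmed':
        if (row) row.status = 'confirmed';
        break;
      case 'OrderCancelled':
        if (row) row.status = 'cancelled';
        break;
    }
  }
  return rows;
}

const stream: ProjectedEvent[] = [
  { aggregateId: 'o-1', version: 1, eventType: 'OrderCreated', data: { customerId: 'c-1', items: [{ price: 10, quantity: 2 }] } },
  { aggregateId: 'o-1', version: 2, eventType: 'OrderItemAdded', data: { item: { price: 5, quantity: 1 } } },
  { aggregateId: 'o-1', version: 3, eventType: 'OrderConfirmed', data: {} },
];

const orderList = rebuildOrderList(stream);
console.log(orderList.get('o-1'));
// { id: 'o-1', customerId: 'c-1', status: 'confirmed', itemCount: 2, total: 25 }
```

Rebuilding twice from the same stream yields identical rows. The same does not hold for incremental $inc updates applied to redelivered events.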
Pattern 4: Eventual Consistency Handler
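Track the last projected version per aggregate, so that a lagging or failed projection can catch up by replaying events in version order. Make event handling idempotent, so that redelivered events are skipped.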
# Projection with version tracking for eventual consistency
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ProjectionState:
    aggregate_id: str
    last_version: int
    last_projected_at: datetime
    error: Optional[str] = None


class ConsistentProjection:
    def __init__(self, event_store, read_db, state_store):
        self.event_store = event_store
        self.read_db = read_db
        self.state_store = state_store

    async def catch_up(self, aggregate_id: str):
        """Replay, in version order, every event newer than the last projected version."""
        state = await self.state_store.get(aggregate_id)
        from_version = state.last_version if state else 0
        events = await self.event_store.get_events(aggregate_id, from_version)
        for event in events:
            try:
                await self.process_event(event)
                await self.state_store.update(ProjectionState(
                    aggregate_id=aggregate_id,
                    last_version=event['version'],
                    last_projected_at=datetime.now(timezone.utc),
                ))
            except Exception as e:
                # Keep the checkpoint at the last good version and record the error.
                await self.state_store.update(ProjectionState(
                    aggregate_id=aggregate_id,
                    last_version=event['version'] - 1,
                    last_projected_at=datetime.now(timezone.utc),
                    error=str(e),
                ))
                raise

    async def process_event(self, event: dict):
        """Process a single event idempotently."""
        # (aggregateId, version) is unique in the event store, so it serves as the
        # deduplication key; stored events carry no separate eventId field.
        event_key = f"{event['aggregateId']}:{event['version']}"
        if await self.read_db.get('processed_events', event_key):
            return

        # Process based on type (other event types are handled the same way)
        if event['eventType'] == 'OrderCreated':
            await self.read_db.upsert('orders', {
                'id': event['aggregateId'],
                'status': 'created',
                'data': event['data'],
            })

        # Mark as processed. If this insert fails, the retry re-runs the upsert
        # above, which is safe because an upsert is idempotent.
        await self.read_db.insert('processed_events', {
            'eventId': event_key,
            'processedAt': datetime.now(timezone.utc),
        })
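The catch-up loop reads events from the store in version order. Events pushed through a bus, however, can arrive duplicated or out of order. The sketch below is single-process; OrderedProjector is a hypothetical name, and its checkpoints live in memory rather than in a durable state store. It treats each aggregate's version as a sequence number: duplicates are dropped, and early arrivals are buffered until the gap fills.

```typescript
interface SequencedEvent {
  aggregateId: string;
  version: number;
  eventType: string;
}

// Applies each aggregate's events strictly in version order: duplicates are
// dropped and events that arrive ahead of a gap wait in a buffer.
class OrderedProjector {
  private checkpoints = new Map<string, number>(); // last applied version per aggregate
  private pending = new Map<string, Map<number, SequencedEvent>>(); // early arrivals

  constructor(private readonly applyEvent: (e: SequencedEvent) => void) {}

  receive(e: SequencedEvent): void {
    const last = this.checkpoints.get(e.aggregateId) ?? 0;
    if (e.version <= last) return; // already applied: redelivery is harmless

    const buffer = this.pending.get(e.aggregateId) ?? new Map<number, SequencedEvent>();
    buffer.set(e.version, e);
    this.pending.set(e.aggregateId, buffer);

    // Drain every contiguous event starting right after the checkpoint.
    let next = last + 1;
    let ready = buffer.get(next);
    while (ready) {
      this.applyEvent(ready);
      buffer.delete(next);
      this.checkpoints.set(e.aggregateId, next);
      next += 1;
      ready = buffer.get(next);
    }
  }

  // Versions held back by a missing predecessor.
  waiting(aggregateId: string): number[] {
    const buffer = this.pending.get(aggregateId);
    return buffer ? Array.from(buffer.keys()).sort((a, b) => a - b) : [];
  }
}

const appliedVersions: number[] = [];
const projector = new OrderedProjector((e) => {
  appliedVersions.push(e.version);
});

projector.receive({ aggregateId: 'o-1', version: 1, eventType: 'OrderCreated' });
projector.receive({ aggregateId: 'o-1', version: 3, eventType: 'OrderCancelled' }); // arrives early
console.log(projector.waiting('o-1')); // [ 3 ]
projector.receive({ aggregateId: 'o-1', version: 2, eventType: 'OrderConfirmed' }); // fills the gap
projector.receive({ aggregateId: 'o-1', version: 2, eventType: 'OrderConfirmed' }); // duplicate, ignored
console.log(appliedVersions); // [ 1, 2, 3 ]
```

A version that stays in waiting() for too long signals a lost event. That is the point to fall back to a catch-up read against the event store.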
Pattern 5: Snapshotting for Long Event Streams
Periodically snapshot aggregates to avoid replaying all events.
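export class SnapshottingEventStore {
  private snapshotInterval = 100;

  constructor(private eventStore: DynamoEventStore) {}

  async loadAggregate(id: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate();
    // Try loading from snapshot first
    const snapshot = await this.loadSnapshot(id);
    if (snapshot) {
      aggregate.loadFromSnapshot(snapshot);
      // Load only events after the snapshot
      aggregate.load(await this.eventStore.getEvents(id, snapshot.version));
    } else {
      aggregate.load(await this.eventStore.getEvents(id));
    }
    return aggregate;
  }

  async saveAggregate(aggregate: OrderAggregate): Promise<void> {
    const changes = aggregate.getUncommittedChanges();
    const previousVersion = aggregate.version - changes.length;
    // Append events (each put is conditional; the batch as a whole is not atomic)
    for (const event of changes) {
      await this.eventStore.append(event);
    }
    // Snapshot when this save crossed an interval boundary. A plain
    // version % interval === 0 check misses it when one save appends several events.
    if (
      Math.floor(aggregate.version / this.snapshotInterval) >
      Math.floor(previousVersion / this.snapshotInterval)
    ) {
      await this.saveSnapshot(aggregate);
    }
    aggregate.clearChanges();
  }

  // Assumes a 'snapshots' table keyed by aggregateId that holds only the latest snapshot.
  private async loadSnapshot(id: string): Promise<OrderSnapshot | null> {
    const result = await dynamodb
      .get({ TableName: 'snapshots', Key: { aggregateId: id } })
      .promise();
    return (result.Item as OrderSnapshot | undefined) ?? null;
  }

  private async saveSnapshot(aggregate: OrderAggregate): Promise<void> {
    await dynamodb
      .put({
        TableName: 'snapshots',
        Item: {
          aggregateId: aggregate.id,
          aggregateType: 'Order',
          version: aggregate.version,
          state: aggregate.toSnapshot(),
          createdAt: Date.now(),
        },
      })
      .promise();
  }
}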
Snapshotting adds complexity. Start without it and add only when event replay becomes a performance bottleneck (typically 1000+ events per aggregate).
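Two properties make snapshotting safe, and both can be checked without DynamoDB:

- Restoring a snapshot and then applying only newer events must yield the same state as a full replay.
- The snapshot trigger must fire whenever a save crosses an interval boundary.

A plain version % interval === 0 test fails the second property when one save appends several events, for example going from version 99 to 101. The sketch below assumes a toy AmountAdded event that only adds to a running total. The function names are illustrative.

```typescript
interface AmountAdded {
  version: number;
  amount: number;
}

interface TotalState {
  version: number;
  total: number;
}

// Apply one event to the running state.
const step = (s: TotalState, e: AmountAdded): TotalState => ({
  version: e.version,
  total: s.total + e.amount,
});

// Full replay from an empty state.
function replayAll(events: AmountAdded[]): TotalState {
  return events.reduce<TotalState>(step, { version: 0, total: 0 });
}

// Snapshot + tail: start from the snapshot and apply only events newer than it.
function replayFromSnapshot(snapshot: TotalState, events: AmountAdded[]): TotalState {
  return events.filter((e) => e.version > snapshot.version).reduce<TotalState>(step, snapshot);
}

// True when a save moved the version across a multiple of interval,
// even if the batch jumped over the exact multiple.
function crossedSnapshotBoundary(previousVersion: number, newVersion: number, interval: number): boolean {
  return Math.floor(newVersion / interval) > Math.floor(previousVersion / interval);
}

const allEvents: AmountAdded[] = Array.from({ length: 250 }, (_, i) => ({ version: i + 1, amount: 1 }));
const snapshotAt200 = replayAll(allEvents.slice(0, 200));

console.log(replayFromSnapshot(snapshotAt200, allEvents)); // { version: 250, total: 250 }
console.log(crossedSnapshotBoundary(99, 101, 100), 101 % 100 === 0); // true false
```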
CQRS/ES Decision Framework
| Scenario | CQRS | Event Sourcing | Both |
|---|---|---|---|
| Simple CRUD app | No | No | No |
| Complex business logic | Yes | Optional | Optional |
| Audit requirements | Optional | Yes | Yes |
| High read/write ratio | Yes | Optional | Yes |
| Financial systems | Yes | Yes | Yes |
| Real-time analytics | Yes | Optional | Yes |
Follow-Up Questions
- How do you handle schema evolution for events when the aggregate logic changes over time?
- What strategies would you use to query across multiple aggregates without building a monolithic read model?
- How do you implement eventual consistency in a UI when using CQRS with asynchronous projections?