CQRS & Event Sourcing

Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber

CQRS vs Event Sourcing

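CQRS (Command Query Responsibility Segregation) separates the write model, which handles commands, from the read model, which serves queries. Event Sourcing persists every state change as an immutable event and derives current state by replaying those events. The two patterns are complementary and are often used together.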

ℹ️ CQRS is about separating concerns; Event Sourcing is about how you store state. You can use CQRS without Event Sourcing and vice versa.
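To make the "CQRS without Event Sourcing" case concrete, here is a minimal sketch in which the write side stores only current state and the read side keeps a separate view shaped for one query. Every name in it (`OrderRecord`, `OrderSummaryRow`, `handleCreateOrder`, `handleCancelOrder`, `queryOpenOrders`) is hypothetical, and the in-memory maps stand in for real databases.

```typescript
// Write side: current state only, no event history (CQRS without Event Sourcing).
interface OrderRecord {
  id: string;
  customerId: string;
  status: 'created' | 'cancelled';
}

// Read side: a denormalized view shaped for one query.
interface OrderSummaryRow {
  customerId: string;
  openOrders: number;
}

const writeDb = new Map<string, OrderRecord>();
const summaryView = new Map<string, OrderSummaryRow>();

// Keeps the read view in step with the write model. A real system would usually
// do this asynchronously; it is synchronous here to keep the sketch short.
function adjustOpenOrders(customerId: string, delta: number): void {
  const row = summaryView.get(customerId) ?? { customerId, openOrders: 0 };
  summaryView.set(customerId, { ...row, openOrders: row.openOrders + delta });
}

// Command handlers: validate, change the write model, return nothing query-shaped.
function handleCreateOrder(id: string, customerId: string): void {
  if (writeDb.has(id)) throw new Error(`Order ${id} already exists`);
  writeDb.set(id, { id, customerId, status: 'created' });
  adjustOpenOrders(customerId, +1);
}

function handleCancelOrder(id: string): void {
  const order = writeDb.get(id);
  if (!order) throw new Error(`Order ${id} not found`);
  if (order.status === 'cancelled') throw new Error('Order already cancelled');
  writeDb.set(id, { ...order, status: 'cancelled' });
  adjustOpenOrders(order.customerId, -1);
}

// Query handler: reads only from the view, never from the write model.
function queryOpenOrders(customerId: string): number {
  return summaryView.get(customerId)?.openOrders ?? 0;
}

handleCreateOrder('o-1', 'c-1');
handleCreateOrder('o-2', 'c-1');
handleCancelOrder('o-1');
console.log(queryOpenOrders('c-1')); // 1
```

Nothing here records how the order reached its current state, which is the part Event Sourcing would add.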

Architecture Diagram

                    ┌─────────────────────────────────────┐
                    │            Commands                 │
                    │  CreateOrder, CancelOrder, etc.     │
                    └─────────────┬───────────────────────┘
                                  │
                    ┌─────────────▼───────────────────────┐
                    │        Command Handler              │
                    │   Validates, Creates Events         │
                    └─────────────┬───────────────────────┘
                                  │
                    ┌─────────────▼───────────────────────┐
                    │        Event Store                  │
                    │  (Append-only log of events)        │
                    └─────────────┬───────────────────────┘
                                  │
                    ┌─────────────┴───────────────────────┐
                    │          Event Bus                  │
                    └──────┬──────────────────┬───────────┘
                           │                  │
              ┌────────────▼──────┐ ┌─────────▼───────────┐
              │  Write Model      │ │  Read Model         │
              │  (Aggregate)      │ │  (Projections)      │
              │  PostgreSQL       │ │  Elasticsearch      │
              └───────────────────┘ └─────────────────────┘
                                              │
                    ┌─────────────────────────▼───────────┐
                    │            Queries                  │
                    │  GetOrder, ListOrders, SearchOrders │
                    └─────────────────────────────────────┘

Pattern 1: Event Store with DynamoDB

Store events with optimistic concurrency control.

import { DynamoDB } from 'aws-sdk';
import { v4 as uuidv4 } from 'uuid';

const dynamodb = new DynamoDB.DocumentClient();

interface StoredEvent {
  aggregateId: string;
  version: number;
  eventType: string;
  data: any;
  metadata: {
    causationId: string;
    correlationId: string;
    userId: string;
    timestamp: string;
  };
}

export class DynamoEventStore {
  private tableName = 'events';

  async append(event: StoredEvent): Promise<void> {
    await dynamodb
      .put({
        TableName: this.tableName,
        Item: {
          ...event,
          createdAt: Date.now(),
        },
        // Optimistic concurrency: assuming (aggregateId, version) is the table key,
        // the put is rejected if an event with this version already exists
        ConditionExpression: 'attribute_not_exists(version)',
      })
      .promise();
  }

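  async getEvents(aggregateId: string, fromVersion: number = 0): Promise<StoredEvent[]> {
    const events: StoredEvent[] = [];
    let lastKey: DynamoDB.DocumentClient.Key | undefined;

    // A single Query returns at most 1 MB of data, so keep following
    // LastEvaluatedKey until the whole stream has been read
    do {
      const result = await dynamodb
        .query({
          TableName: this.tableName,
          KeyConditionExpression: 'aggregateId = :id AND version > :from',
          ExpressionAttributeValues: {
            ':id': aggregateId,
            ':from': fromVersion,
          },
          ScanIndexForward: true,
          ExclusiveStartKey: lastKey,
        })
        .promise();

      events.push(...((result.Items || []) as StoredEvent[]));
      lastKey = result.LastEvaluatedKey;
    } while (lastKey);

    return events;
  }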

  async getEventsByType(eventType: string, limit: number = 100): Promise<StoredEvent[]> {
    const result = await dynamodb
      .query({
        TableName: this.tableName,
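        // Assumes a GSI with eventType as its partition key. Index key attributes must be
        // top-level, so the sort key has to be createdAt or a copied timestamp, not metadata.timestamp
        IndexName: 'eventType-timestamp-index',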
        KeyConditionExpression: 'eventType = :type',
        ExpressionAttributeValues: {
          ':type': eventType,
        },
        Limit: limit,
        ScanIndexForward: false,
      })
      .promise();

    return (result.Items || []) as StoredEvent[];
  }
}
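The conditional put above only detects conflicts if the table is keyed by (aggregateId, version). The same rule can be sketched in memory, which makes the conflict case easy to see without a DynamoDB table. `InMemoryEventStore`, `VersionedEvent`, and `ConcurrencyError` are hypothetical names used only for this illustration.

```typescript
interface VersionedEvent {
  aggregateId: string;
  version: number;
  eventType: string;
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private streams = new Map<string, VersionedEvent[]>();

  // Accept the event only if it is the next version in its stream. This is slightly
  // stricter than the DynamoDB condition, which only rejects a duplicate key.
  append(ev: VersionedEvent): void {
    const stream = this.streams.get(ev.aggregateId) ?? [];
    const expected = stream.length + 1;
    if (ev.version !== expected) {
      throw new ConcurrencyError(
        `Expected version ${expected} for ${ev.aggregateId}, got ${ev.version}`
      );
    }
    this.streams.set(ev.aggregateId, [...stream, ev]);
  }

  getEvents(aggregateId: string, fromVersion = 0): VersionedEvent[] {
    return (this.streams.get(aggregateId) ?? []).filter((e) => e.version > fromVersion);
  }
}

// Two writers both loaded the stream at version 1 and both try to write version 2.
const demoStore = new InMemoryEventStore();
demoStore.append({ aggregateId: 'order-1', version: 1, eventType: 'OrderCreated' });
demoStore.append({ aggregateId: 'order-1', version: 2, eventType: 'OrderConfirmed' });

let conflictDetected = false;
try {
  demoStore.append({ aggregateId: 'order-1', version: 2, eventType: 'OrderCancelled' });
} catch (err) {
  conflictDetected = (err as Error).name === 'ConcurrencyError';
}
console.log(conflictDetected); // true
```

The losing writer would typically reload the aggregate, re-validate its command against the new state, and retry.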

Pattern 2: Aggregate with Event Sourcing

Rebuild aggregate state from event history.

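// Base aggregate class.
// getCorrelationId() and getCurrentUserId() are assumed to come from the
// surrounding request context; they are not defined in this snippet.
export abstract class Aggregate {
  // Declared here because apply() stamps it on every event
  abstract id: string;
  // Public so that a repository can read it (for example, to decide when to snapshot)
  version: number = 0;
  protected changes: StoredEvent[] = [];

  abstract when(event: StoredEvent): void;
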
  load(events: StoredEvent[]): void {
    for (const event of events) {
      this.when(event);
      this.version = event.version;
    }
  }

  protected apply(eventType: string, data: any): void {
    const event: StoredEvent = {
      aggregateId: this.id,
      version: this.version + 1,
      eventType,
      data,
      metadata: {
        causationId: uuidv4(),
        correlationId: getCorrelationId(),
        userId: getCurrentUserId(),
        timestamp: new Date().toISOString(),
      },
    };

    this.when(event);
    this.changes.push(event);
    this.version++;
  }

  getUncommittedChanges(): StoredEvent[] {
    return this.changes;
  }

  clearChanges(): void {
    this.changes = [];
  }
}

// Order aggregate (OrderItem is an application type with at least price and quantity)
export class OrderAggregate extends Aggregate {
  id: string = '';
  customerId: string = '';
  items: OrderItem[] = [];
  status: string = 'pending';
  total: number = 0;

  when(event: StoredEvent): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.id = event.aggregateId;
        this.customerId = event.data.customerId;
        // Copy the array so later OrderItemAdded events do not mutate the stored event's data
        this.items = [...event.data.items];
        this.status = 'created';
        this.total = this.calculateTotal();
        break;
      case 'OrderItemAdded':
        this.items.push(event.data.item);
        this.total = this.calculateTotal();
        break;
      case 'OrderConfirmed':
        this.status = 'confirmed';
        break;
      case 'OrderCancelled':
        this.status = 'cancelled';
        break;
    }
  }

  // Command methods
  static create(customerId: string, items: OrderItem[]): OrderAggregate {
    const order = new OrderAggregate();
    order.id = uuidv4();
    order.apply('OrderCreated', { customerId, items });
    return order;
  }

  addItem(item: OrderItem): void {
    if (this.status !== 'created') {
      throw new Error('Can only add items to created orders');
    }
    this.apply('OrderItemAdded', { item });
  }

  confirm(): void {
    if (this.status !== 'created') {
      throw new Error('Can only confirm created orders');
    }
    this.apply('OrderConfirmed', {});
  }

  cancel(): void {
    if (this.status === 'cancelled') {
      throw new Error('Order already cancelled');
    }
    this.apply('OrderCancelled', { reason: 'Customer requested' });
  }

  private calculateTotal(): number {
    return this.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  }
}

ℹ️ Aggregates enforce business invariants. Commands go through aggregates, which validate state before applying events.
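The same idea can be written without classes, as two pure functions: one that folds events into state and one that checks a command against that state before emitting new events. This is a self-contained sketch; `OrderEvt`, `OrderState`, `evolve`, `replay`, and `decideConfirm` are hypothetical names, and the state is cut down to a status and a version.

```typescript
type OrderEvt =
  | { type: 'OrderCreated'; orderId: string }
  | { type: 'OrderConfirmed' }
  | { type: 'OrderCancelled' };

interface OrderState {
  status: 'none' | 'created' | 'confirmed' | 'cancelled';
  version: number;
}

const emptyOrder: OrderState = { status: 'none', version: 0 };

// evolve: a pure state transition, the functional counterpart of when()
function evolve(state: OrderState, evt: OrderEvt): OrderState {
  const version = state.version + 1;
  switch (evt.type) {
    case 'OrderCreated':
      return { status: 'created', version };
    case 'OrderConfirmed':
      return { status: 'confirmed', version };
    case 'OrderCancelled':
      return { status: 'cancelled', version };
  }
}

// Rebuild current state by folding over the full event list.
function replay(pastEvents: OrderEvt[]): OrderState {
  return pastEvents.reduce(evolve, emptyOrder);
}

// decide: validate the command against current state, then emit new events.
function decideConfirm(state: OrderState): OrderEvt[] {
  if (state.status !== 'created') {
    throw new Error('Can only confirm created orders');
  }
  return [{ type: 'OrderConfirmed' }];
}

const past: OrderEvt[] = [{ type: 'OrderCreated', orderId: 'o-1' }, { type: 'OrderCancelled' }];
const current = replay(past);
console.log(current.status, current.version); // cancelled 2

let rejected = false;
try {
  decideConfirm(current); // the order was cancelled, so confirming must fail
} catch {
  rejected = true;
}
console.log(rejected); // true
```

Because the invariant check runs against replayed state, a command can never produce an event that contradicts what was already recorded.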

Pattern 3: Read Model Projections

Build optimized read models from events.

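// Projection for order list view.
// ReadDatabase is an application-defined interface (upsert/update) that is not shown here.
// OrderItemAdded is not handled, so itemCount and total reflect the order as created.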
export class OrderListProjection {
  constructor(private readDb: ReadDatabase) {}

  async project(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.readDb.upsert('order_list', {
          id: event.aggregateId,
          customerId: event.data.customerId,
          status: 'created',
          itemCount: event.data.items.length,
          total: event.data.items.reduce(
            (sum: number, i: any) => sum + i.price * i.quantity,
            0
          ),
          createdAt: event.metadata.timestamp,
        });
        break;

      case 'OrderConfirmed':
        await this.readDb.update('order_list', event.aggregateId, {
          status: 'confirmed',
          confirmedAt: event.metadata.timestamp,
        });
        break;

      case 'OrderCancelled':
        await this.readDb.update('order_list', event.aggregateId, {
          status: 'cancelled',
          cancelledAt: event.metadata.timestamp,
        });
        break;
    }
  }
}

// Projection for customer order summary
export class CustomerOrderSummaryProjection {
  constructor(private readDb: ReadDatabase) {}

  async project(event: StoredEvent): Promise<void> {
    if (event.eventType === 'OrderCreated') {
      await this.readDb.upsert('customer_order_summary', {
        customerId: event.data.customerId,
        totalOrders: { $inc: 1 },
        totalSpent: { $inc: event.data.items.reduce(
          (sum: number, i: any) => sum + i.price * i.quantity, 0
        )},
        lastOrderAt: event.metadata.timestamp,
      });
    }
  }
}
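Because a projection is derived entirely from the event log, it can be discarded and rebuilt by replaying the log from the start. The sketch below shows this with an in-memory map standing in for the read database; `ProjectedEvent`, `OrderListRow`, `projectOrderList`, and `rebuildOrderList` are hypothetical names, and the event shape is simplified.

```typescript
interface ProjectedEvent {
  aggregateId: string;
  eventType: 'OrderCreated' | 'OrderConfirmed' | 'OrderCancelled';
  customerId?: string;
}

interface OrderListRow {
  id: string;
  customerId: string;
  status: string;
}

type OrderListView = Map<string, OrderListRow>;

// Apply one event to the view. Writes are keyed by aggregateId and set absolute
// values (no counters), so replaying the log from the start always gives the same view.
function projectOrderList(view: OrderListView, ev: ProjectedEvent): void {
  if (ev.eventType === 'OrderCreated') {
    view.set(ev.aggregateId, {
      id: ev.aggregateId,
      customerId: ev.customerId ?? 'unknown',
      status: 'created',
    });
    return;
  }
  const row = view.get(ev.aggregateId);
  if (!row) return; // no OrderCreated seen yet for this aggregate; skip
  const status = ev.eventType === 'OrderConfirmed' ? 'confirmed' : 'cancelled';
  view.set(ev.aggregateId, { ...row, status });
}

// Rebuild: start from an empty view and replay the whole log in order.
function rebuildOrderList(log: ProjectedEvent[]): OrderListView {
  const view: OrderListView = new Map();
  for (const ev of log) projectOrderList(view, ev);
  return view;
}

const eventLog: ProjectedEvent[] = [
  { aggregateId: 'o-1', eventType: 'OrderCreated', customerId: 'c-1' },
  { aggregateId: 'o-2', eventType: 'OrderCreated', customerId: 'c-2' },
  { aggregateId: 'o-1', eventType: 'OrderConfirmed' },
];

const rebuilt = rebuildOrderList(eventLog);
console.log(rebuilt.get('o-1')?.status); // confirmed
console.log(rebuilt.get('o-2')?.status); // created
```

A projection that increments counters, like the customer summary above, does not have this property on its own: replaying an event a second time counts it twice unless processed events are tracked.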

Pattern 4: Eventual Consistency Handler

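Handle projection lag and duplicate or out-of-order delivery by catching up from the event store in version order and processing each event idempotently.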

# Projection with version tracking for eventual consistency
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass
class ProjectionState:
    aggregate_id: str
    last_version: int
    last_projected_at: datetime
    error: Optional[str] = None

class ConsistentProjection:
    def __init__(self, event_store, read_db, state_store):
        self.event_store = event_store
        self.read_db = read_db
        self.state_store = state_store

    async def catch_up(self, aggregate_id: str):
        """Project every event after the last projected version, in version order."""
        state = await self.state_store.get(aggregate_id)
        from_version = state.last_version if state else 0

        events = await self.event_store.get_events(aggregate_id, from_version)

        for event in events:
            try:
                await self.process_event(event)
                await self.state_store.update(ProjectionState(
                    aggregate_id=aggregate_id,
                    last_version=event['version'],
                    last_projected_at=datetime.now(timezone.utc),
                ))
            except Exception as e:
                # Record the failure at the last good version so the next catch_up retries this event
                await self.state_store.update(ProjectionState(
                    aggregate_id=aggregate_id,
                    last_version=event['version'] - 1,
                    last_projected_at=datetime.now(timezone.utc),
                    error=str(e),
                ))
                raise

    async def process_event(self, event: dict):
        """Process single event idempotently."""
        # Stored events carry no separate event ID, so (aggregateId, version) identifies one
        event_key = f"{event['aggregateId']}:{event['version']}"

        # Check if already processed
        existing = await self.read_db.get('processed_events', event_key)
        if existing:
            return

        # Process based on type
        if event['eventType'] == 'OrderCreated':
            await self.read_db.upsert('orders', {
                'id': event['aggregateId'],
                'status': 'created',
                'data': event['data'],
            })

        # Mark as processed
        await self.read_db.insert('processed_events', {
            'eventId': event_key,
            'processedAt': datetime.now(timezone.utc),
        })
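The handler above pulls events from the store in version order. When events are pushed from a bus instead, they can arrive duplicated or out of order, and a projection can protect itself with a per-aggregate version check. The following TypeScript sketch shows one way to do that; `VersionGate`, `BusEvent`, and `GateResult` are hypothetical names, and the buffer is held in memory only.

```typescript
interface BusEvent {
  aggregateId: string;
  version: number;
  eventType: string;
}

type GateResult = 'applied' | 'duplicate' | 'buffered';

class VersionGate {
  private lastVersion = new Map<string, number>();
  private pending = new Map<string, Map<number, BusEvent>>();
  private applyToReadModel: (ev: BusEvent) => void;

  constructor(applyToReadModel: (ev: BusEvent) => void) {
    this.applyToReadModel = applyToReadModel;
  }

  receive(ev: BusEvent): GateResult {
    const last = this.lastVersion.get(ev.aggregateId) ?? 0;

    if (ev.version <= last) return 'duplicate'; // already projected; ignore

    if (ev.version > last + 1) {
      // Gap: hold the event until the missing versions arrive
      const waiting = this.pending.get(ev.aggregateId) ?? new Map<number, BusEvent>();
      waiting.set(ev.version, ev);
      this.pending.set(ev.aggregateId, waiting);
      return 'buffered';
    }

    // Exactly the next version: apply it, then drain any buffered successors
    this.applyInOrder(ev);
    return 'applied';
  }

  private applyInOrder(first: BusEvent): void {
    const waiting = this.pending.get(first.aggregateId);
    let next: BusEvent | undefined = first;
    while (next) {
      this.applyToReadModel(next);
      this.lastVersion.set(next.aggregateId, next.version);
      const following = next.version + 1;
      next = waiting?.get(following);
      waiting?.delete(following);
    }
  }
}

const appliedVersions: number[] = [];
const gate = new VersionGate((ev) => {
  appliedVersions.push(ev.version);
});

const gateResults = [
  gate.receive({ aggregateId: 'o-1', version: 1, eventType: 'OrderCreated' }),
  gate.receive({ aggregateId: 'o-1', version: 3, eventType: 'OrderConfirmed' }), // arrives early
  gate.receive({ aggregateId: 'o-1', version: 1, eventType: 'OrderCreated' }), // redelivery
  gate.receive({ aggregateId: 'o-1', version: 2, eventType: 'OrderItemAdded' }), // fills the gap
];
console.log(gateResults.join(',')); // applied,buffered,duplicate,applied
console.log(appliedVersions.join(',')); // 1,2,3
```

In a real deployment the last projected version would be stored alongside the read model, and a long-lived gap would usually trigger a catch-up read from the event store rather than waiting indefinitely.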

Pattern 5: Snapshotting for Long Event Streams

Periodically snapshot aggregates to avoid replaying all events.

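// Reuses the `dynamodb` client and DynamoEventStore from Pattern 1.
// OrderAggregate is assumed to expose toSnapshot() and loadFromSnapshot(),
// which are not shown in Pattern 2.
export class SnapshottingEventStore {
  private snapshotInterval = 100;

  constructor(private eventStore: DynamoEventStore) {}

  async loadAggregate(id: string): Promise<OrderAggregate> {
    // Try loading from snapshot first
    const snapshot = await this.loadSnapshot(id);
    const aggregate = new OrderAggregate();

    if (snapshot) {
      aggregate.loadFromSnapshot(snapshot);
      // Load only events after snapshot
      const events = await this.eventStore.getEvents(id, snapshot.version);
      aggregate.load(events);
    } else {
      const events = await this.eventStore.getEvents(id);
      aggregate.load(events);
    }

    return aggregate;
  }

  async saveAggregate(aggregate: OrderAggregate): Promise<void> {
    const changes = aggregate.getUncommittedChanges();

    // Append events one at a time; each append is conditional on its version being unused
    for (const event of changes) {
      await this.eventStore.append(event);
    }

    // Snapshot when this save crossed a multiple of the interval. Checking
    // `version % interval === 0` alone misses the boundary when several
    // events are saved together (for example, version 99 -> 101).
    const previousVersion = aggregate.version - changes.length;
    const crossedBoundary =
      Math.floor(aggregate.version / this.snapshotInterval) >
      Math.floor(previousVersion / this.snapshotInterval);
    if (crossedBoundary) {
      await this.saveSnapshot(aggregate);
    }

    aggregate.clearChanges();
  }

  private async loadSnapshot(id: string) {
    // Assumes the snapshots table is keyed by aggregateId alone (latest snapshot only)
    const result = await dynamodb
      .get({ TableName: 'snapshots', Key: { aggregateId: id } })
      .promise();
    return result.Item;
  }

  private async saveSnapshot(aggregate: OrderAggregate): Promise<void> {
    await dynamodb.put({
      TableName: 'snapshots',
      Item: {
        aggregateId: aggregate.id,
        aggregateType: 'Order',
        version: aggregate.version,
        state: aggregate.toSnapshot(),
        createdAt: Date.now(),
      },
    }).promise();
  }
}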

⚠️ Snapshotting adds complexity. Start without it and add only when event replay becomes a performance bottleneck (typically 1000+ events per aggregate).
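What makes snapshotting safe is that a snapshot plus the events recorded after it must rebuild exactly the same state as a full replay. The sketch below checks that property on a simplified, hypothetical aggregate (`OrderTotals` built from `ItemAdded` events). It also includes a boundary check, `crossedSnapshotBoundary`, that still fires when several events are saved in one call.

```typescript
interface ItemAdded {
  version: number;
  price: number;
  quantity: number;
}

interface OrderTotals {
  version: number;
  itemCount: number;
  total: number;
}

const noItems: OrderTotals = { version: 0, itemCount: 0, total: 0 };

function applyItem(state: OrderTotals, ev: ItemAdded): OrderTotals {
  return {
    version: ev.version,
    itemCount: state.itemCount + ev.quantity,
    total: state.total + ev.price * ev.quantity,
  };
}

// Full replay: fold every event from the beginning.
function replayAll(stream: ItemAdded[]): OrderTotals {
  return stream.reduce(applyItem, noItems);
}

// Snapshot replay: start from the snapshot and fold only the later events.
function replayFrom(snapshot: OrderTotals, stream: ItemAdded[]): OrderTotals {
  return stream.filter((ev) => ev.version > snapshot.version).reduce(applyItem, snapshot);
}

// True when saving `savedCount` events moved the aggregate across a multiple of `interval`.
function crossedSnapshotBoundary(version: number, savedCount: number, interval: number): boolean {
  return Math.floor(version / interval) > Math.floor((version - savedCount) / interval);
}

// 250 events, each adding one item priced at 2
const itemStream: ItemAdded[] = Array.from({ length: 250 }, (_, i) => ({
  version: i + 1,
  price: 2,
  quantity: 1,
}));

const snapshotAt200 = replayAll(itemStream.slice(0, 200));
const viaSnapshot = replayFrom(snapshotAt200, itemStream);
const viaFullReplay = replayAll(itemStream);

console.log(viaSnapshot.total === viaFullReplay.total); // true
console.log(crossedSnapshotBoundary(101, 2, 100)); // true: versions 100 and 101 were saved together
console.log(101 % 100 === 0); // false: a plain modulo check would skip this snapshot
```

With the snapshot, only 50 of the 250 events are replayed on load, which is the saving the pattern is meant to buy.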

CQRS/ES Decision Framework

Scenario                 CQRS      Event Sourcing  Both
Simple CRUD app          No        No              No
Complex business logic   Yes       Optional        Optional
Audit requirements       Optional  Yes             Yes
High read/write ratio    Yes       Optional        Yes
Financial systems        Yes       Yes             Yes
Real-time analytics      Yes       Optional        Yes

Follow-Up Questions

  1. How do you handle schema evolution for events when the aggregate logic changes over time?
  2. What strategies would you use to query across multiple aggregates without building a monolithic read model?
  3. How do you implement eventual consistency in a UI when using CQRS with asynchronous projections?
