Change Data Capture (CDC) with PySpark
Architecture Diagram: CDC Pipeline Overview
CHANGE DATA CAPTURE (CDC) ARCHITECTURE

SOURCE SYSTEMS
    MySQL database        -> Binlog reader
    PostgreSQL database   -> WAL / logical replication
    Oracle database       -> Redo log reader
    MongoDB database      -> Oplog / change stream
    SQL Server            -> Change data feed
        |
        v
CDC CAPTURE LAYER
    Debezium (open source) | AWS DMS (managed) | Oracle GoldenGate | Custom log tailing
    CDC event format: before, after, op (c/u/d), source, timestamp, transaction
        |
        v
EVENT BROKER (Apache Kafka)
    Topic: dbserver.users.cdc
    Topic: dbserver.orders.cdc
    Topic: dbserver.products.cdc
    Topic: __ schema changes
        |
        v
PySpark Structured Streaming CDC Processor
    Deserialize CDC events -> Transform & enrich -> Dedup & window -> Apply MERGE
        |
        v
TARGET DATA LAKEHOUSE (Delta Lake)
    Bronze (raw CDC) -> Silver (cleansed) -> Gold (aggregated) -> Real-time dashboards
Architecture Diagram: Debezium CDC Event Flow
DEBEZIUM CDC EVENT LIFECYCLE

SOURCE DATABASE (MySQL/PostgreSQL)
    Transaction (INSERT/UPDATE/DELETE)
      -> Binary log (binlog/WAL), position: file + offset
      -> Debezium connector, reads the log sequentially
        |
        v
CDC EVENT ENVELOPE (JSON)
    {
      "schema": { ... },
      "payload": {
        "before": {                  <- State BEFORE change (null for INSERT)
          "id": 101,
          "name": "Alice",
          "city": "NYC"
        },
        "after": {                   <- State AFTER change (null for DELETE)
          "id": 101,
          "name": "Alice",
          "city": "LA"
        },
        "source": {                  <- Source metadata
          "version": "2.5.0",
          "connector": "mysql",
          "name": "dbserver1",
          "ts_ms": 1717200000000,
          "snapshot": "false",
          "db": "production",
          "table": "customers"
        },
        "op": "u",                   <- Operation: c=create, u=update, d=delete
        "ts_ms": 1717200000123,
        "transaction": null
      }
    }
        |
        v
PYSPARK CDC PROCESSING
    Step 1: Deserialize JSON -> structured DataFrame
            Extract before/after payloads, operation type, timestamps
    Step 2: Deduplicate within micro-batch
            Keep latest event per (table, key, timestamp)
    Step 3: Filter by operation type
            CREATE -> INSERT, UPDATE -> MERGE, DELETE -> DELETE
    Step 4: Apply to Delta Lake target
            Use Delta Lake MERGE for atomic upserts
    Step 5: Update checkpoint for exactly-once semantics
Architecture Diagram: Event Sourcing Pattern
EVENT SOURCING + CQRS PATTERN

COMMAND SIDE (Write)
    Command API (REST/gRPC)
        |
        v
    Command Handler: validate, apply business rules, emit events
        |
        v
    Event Store (append-only log)
        Event 1: Created {name: Alice}
        Event 2: Updated {city: LA}
        Event 3: Deleted

        | events are projected from the event store into the read models
        v

QUERY SIDE (Read)
    Materialized View / Read Model (projections)
        Customer view: {id: 101, name: Alice, city: LA}
        Audit log view (complete history)
        ^
        |
    Query Handler: read from view, apply filters, return results
        ^
        |
    Query API (REST/GraphQL)

EVENT REPLAY (Rebuild State)
    Events: [Created, Updated, Updated, Deleted]

    Replay:  Start -> Created -> Updated    -> Updated     -> Deleted   -> End
    State:            (Alice)    (Alice,LA)    (Alice,NYC)    (deleted)    (null)

    Snapshot optimization: cache state at event N, replay from N+1
Detailed Explanation
Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database, then delivers those changes to downstream systems in near real time. Unlike batch ETL, which periodically re-queries the source table, CDC captures only the delta (inserts, updates, deletes) as it occurs, so data stays synchronized with minimal impact on the source system.
The fundamental challenge CDC solves is data synchronization between operational systems (OLTP) and analytical systems (OLAP/data lakes). Traditional ETL approaches poll source tables using timestamp columns or checksums, which misses deletes, creates race conditions, and requires full-table scans that degrade source system performance. CDC eliminates these issues by reading the database's transaction log (binlog for MySQL, WAL for PostgreSQL, redo logs for Oracle), which already records every change atomically.
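The difference between timestamp polling and log-based capture can be seen in a small in-memory simulation. This is only a sketch: the `table` dictionary, the `change_log` list and the helper names are hypothetical stand-ins for a source table and its transaction log, not part of any CDC tool.

```python
# Hypothetical in-memory "table" and "transaction log" used only for illustration.
table = {
    1: {"id": 1, "city": "NYC", "updated_at": 10},
    2: {"id": 2, "city": "Boston", "updated_at": 10},
}
change_log = []


def update_row(row_id, city, ts):
    """Change a row and record the change in the log, as a database would."""
    table[row_id] = {"id": row_id, "city": city, "updated_at": ts}
    change_log.append({"op": "u", "id": row_id, "ts": ts})


def delete_row(row_id, ts):
    """Delete a row; only the log remembers that it ever existed."""
    del table[row_id]
    change_log.append({"op": "d", "id": row_id, "ts": ts})


def poll_changes(last_poll_ts):
    """Timestamp polling: scan the table for rows modified since the last poll."""
    return [row for row in table.values() if row["updated_at"] > last_poll_ts]


# Changes made after the previous poll at ts=10.
update_row(1, "LA", ts=15)
delete_row(2, ts=16)

polled_ids = {row["id"] for row in poll_changes(last_poll_ts=10)}
logged_ids = {event["id"] for event in change_log if event["ts"] > 10}

print("seen by polling:", polled_ids)
print("seen in the log:", logged_ids)
```

Polling sees only the updated row, because the deleted row no longer exists to be scanned, while the log records both changes.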
Debezium is one of the most widely used open-source CDC platforms, built on Apache Kafka Connect. It reads database transaction logs and produces change events in a standardized JSON envelope format containing the before state (for updates and deletes), the after state (for creates and updates), the operation type (c for create, u for update, d for delete, plus r for rows read during an initial snapshot), source metadata (connector name, database, table, timestamp), and transaction information. This envelope format lets downstream consumers reconstruct the exact sequence of changes.
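As a minimal sketch of how a consumer might read this envelope, the function below reduces one event to the fields most pipelines need. It assumes the JSON layout shown in the diagram above and that `id` is the primary key; `summarize_change` is a hypothetical helper, not a Debezium API.

```python
import json


def summarize_change(raw_event: str) -> dict:
    """Reduce a Debezium-style JSON envelope to operation, table, key and row."""
    payload = json.loads(raw_event)["payload"]
    op = payload["op"]
    # Deletes carry the row only in "before"; creates and updates carry it in "after".
    row = payload["before"] if op == "d" else payload["after"]
    return {
        "op": op,
        "table": payload["source"]["table"],
        "key": row["id"],  # assumption: "id" is the primary key
        "row": None if op == "d" else row,
        "source_ts_ms": payload["source"]["ts_ms"],
    }


sample = json.dumps({
    "payload": {
        "before": {"id": 101, "name": "Alice", "city": "NYC"},
        "after": {"id": 101, "name": "Alice", "city": "LA"},
        "source": {"connector": "mysql", "db": "production",
                   "table": "customers", "ts_ms": 1717200000000},
        "op": "u",
        "ts_ms": 1717200000123,
    }
})

summary = summarize_change(sample)
print(summary["op"], summary["table"], summary["key"], summary["row"]["city"])
```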
PySpark Structured Streaming integrates with CDC in several ways: reading Kafka topics that contain CDC events and applying them to Delta Lake tables with MERGE operations; consuming Delta Lake's built-in Change Data Feed (enabled with the table property delta.enableChangeDataFeed and read with the readChangeFeed option) for downstream processing; or implementing custom watermark-based deduplication to handle out-of-order events.
Event Sourcing, closely related to CDC, stores every state change as an immutable event rather than overwriting current state. This provides a complete audit trail, enables temporal queries ("what was the state at time T?"), supports event replay for rebuilding materialized views, and decouples write and read models (CQRS pattern). In PySpark, event sourcing implementations typically write events to an append-only Delta table and use streaming aggregations to maintain materialized views.
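Event replay and temporal queries can be sketched in plain Python before moving to Spark. The event shape (`seq`, `type`, `data`) and the function names here are hypothetical and chosen only for illustration.

```python
from typing import Optional


def apply_event(state: Optional[dict], event: dict) -> Optional[dict]:
    """Fold a single event into the current state of one aggregate."""
    kind = event["type"]
    if kind == "Created":
        return dict(event["data"])
    if kind == "Updated":
        return {**(state or {}), **event["data"]}
    if kind == "Deleted":
        return None
    raise ValueError(f"unknown event type: {kind}")


def replay(events: list, as_of: Optional[int] = None) -> Optional[dict]:
    """Rebuild state by replaying events in sequence order, optionally up to `as_of`."""
    state = None
    for event in sorted(events, key=lambda e: e["seq"]):
        if as_of is not None and event["seq"] > as_of:
            break
        state = apply_event(state, event)
    return state


events = [
    {"seq": 1, "type": "Created", "data": {"id": 101, "name": "Alice", "city": "NYC"}},
    {"seq": 2, "type": "Updated", "data": {"city": "LA"}},
    {"seq": 3, "type": "Deleted", "data": {}},
]

state_at_2 = replay(events, as_of=2)   # state after the update
final_state = replay(events)           # state after the delete
print(state_at_2, final_state)
```

Because the event list is never modified, the same function answers both "what is the state now?" and "what was the state at event 2?".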
The key challenge in CDC processing is achieving exactly-once semantics in a distributed system. PySpark Structured Streaming approaches this through checkpoint-based offset tracking, idempotent MERGE operations on Delta Lake, and watermark-based handling of late-arriving events. Kafka delivers at least once, so an event can be replayed after a failure; because Delta Lake commits are ACID and a key-based MERGE gives the same result when repeated, the pipeline as a whole behaves as effectively exactly-once.
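The interaction between at-least-once delivery and an idempotent apply step can be illustrated without Spark. In this sketch the target table is a dictionary, and `apply_once` is a hypothetical helper that skips any event whose (partition, offset) pair has already been applied.

```python
def apply_once(target: dict, applied: set, event: dict) -> bool:
    """Apply a change event unless its (partition, offset) was already applied."""
    event_id = (event["partition"], event["offset"])
    if event_id in applied:
        return False  # redelivered event: ignore it
    if event["op"] == "d":
        target.pop(event["key"], None)
    else:
        target[event["key"]] = event["after"]
    applied.add(event_id)
    return True


delivered = [
    {"partition": 0, "offset": 0, "op": "c", "key": 101, "after": {"city": "NYC"}},
    {"partition": 0, "offset": 1, "op": "u", "key": 101, "after": {"city": "LA"}},
    # The broker redelivers offset 0 after a consumer retry.
    {"partition": 0, "offset": 0, "op": "c", "key": 101, "after": {"city": "NYC"}},
]

target, applied = {}, set()
results = [apply_once(target, applied, event) for event in delivered]
print(results, target)
```

Without the offset check, the redelivered create would overwrite the newer update and leave the row at its old value.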
Key Concepts Table
Mathematical Foundations
Definition: Change Data Capture
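CDC captures row-level changes (inserts, updates, deletes) from a source database $S$ and propagates them to a target $T$ such that:

$$T(t) = S_0 \oplus \Delta_1 \oplus \Delta_2 \oplus \cdots \oplus \Delta_t$$

where $S_0$ is the initial snapshot, $\Delta_t$ is the change set at time $t$, and $\oplus$ denotes applying a change set to the current state.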
Event Ordering
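For CDC events $e_i$ and $e_j$ affecting the same key $k$, the ordering must satisfy:

$$ts(e_i) < ts(e_j) \implies \mathrm{apply}(e_i) \prec \mathrm{apply}(e_j)$$

Out-of-order events require buffering with timeout $\tau$, so that an event $e$ is released for processing only once:

$$t_{\text{now}} \ge ts(e) + \tau$$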
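One way to picture buffering with a timeout is a small reorder buffer: events wait in a min-heap keyed by event timestamp and are released, in timestamp order, once they have waited for the configured timeout. The class and field names below are hypothetical; in Structured Streaming the analogous mechanism is the watermark.

```python
import heapq


class ReorderBuffer:
    """Hold events until `timeout_ms` has passed since their event timestamp."""

    def __init__(self, timeout_ms: int):
        self.timeout_ms = timeout_ms
        self._heap = []  # entries are (event_ts_ms, arrival_seq, event)
        self._seq = 0    # tie-breaker so heapq never has to compare dicts

    def add(self, event: dict) -> None:
        heapq.heappush(self._heap, (event["ts_ms"], self._seq, event))
        self._seq += 1

    def release(self, now_ms: int) -> list:
        """Return, in event-time order, every event whose timeout has expired."""
        ready = []
        while self._heap and self._heap[0][0] + self.timeout_ms <= now_ms:
            ready.append(heapq.heappop(self._heap)[2])
        return ready


buffer = ReorderBuffer(timeout_ms=100)
buffer.add({"key": 101, "ts_ms": 1020, "city": "LA"})
buffer.add({"key": 101, "ts_ms": 1000, "city": "NYC"})  # older event arrives second

too_early = buffer.release(now_ms=1050)
released = buffer.release(now_ms=1120)
print([event["city"] for event in released])
```

A longer timeout tolerates more disorder at the cost of added latency, which is the same trade-off a watermark delay makes.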
Merge Correctness Theorem
CDC merge is correct if the merge function $\oplus$ is commutative and associative:

$$a \oplus b = b \oplus a, \qquad (a \oplus b) \oplus c = a \oplus (b \oplus c)$$

This ensures that applying changes in any order produces the same result.
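A last-writer-wins merge is one example of a merge function with these properties. The sketch below, which assumes each row version carries a unique (`ts_ms`, `offset`) pair, merges three versions in every possible order and checks that the winner is always the same. The field names are hypothetical.

```python
from functools import reduce
from itertools import permutations


def lww_merge(a: dict, b: dict) -> dict:
    """Last-writer-wins: keep the version with the larger (ts_ms, offset) pair."""
    return a if (a["ts_ms"], a["offset"]) >= (b["ts_ms"], b["offset"]) else b


versions = [
    {"ts_ms": 1000, "offset": 7, "city": "NYC"},
    {"ts_ms": 1020, "offset": 8, "city": "LA"},
    {"ts_ms": 1020, "offset": 9, "city": "SF"},
]

# Merge the versions in all 6 possible arrival orders.
outcomes = {reduce(lww_merge, order)["city"] for order in permutations(versions)}
print(outcomes)
```

A merge that simply overwrites with whichever event arrives last does not have this property, which is why the ordering requirement above matters for such pipelines.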
Log Retention
For CDC with retention period $R$ (in seconds) and average change rate $r$ bytes/sec:

$$\text{Storage} = R \times r$$

Debezium log retention must exceed maximum replication lag $L_{\max}$, that is, $R > L_{\max}$.
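As a worked example of the sizing rule, the snippet below multiplies a retention period by an average change rate and checks the retention period against a replication lag. The 7-day retention, 2 MB/s change rate and 6-hour lag are made-up inputs for illustration.

```python
def retained_log_bytes(retention_seconds: int, change_rate_bytes_per_sec: int) -> int:
    """Storage needed to keep `retention_seconds` of log at the given change rate."""
    return retention_seconds * change_rate_bytes_per_sec


def retention_is_safe(retention_seconds: int, max_lag_seconds: int) -> bool:
    """The log must be kept longer than the worst-case replication lag."""
    return retention_seconds > max_lag_seconds


seven_days = 7 * 24 * 3600   # 604,800 seconds
change_rate = 2_000_000      # 2 MB/s, hypothetical
max_lag = 6 * 3600           # 6 hours, hypothetical

log_bytes = retained_log_bytes(seven_days, change_rate)
safe = retention_is_safe(seven_days, max_lag)
print(f"{log_bytes / 1e12:.2f} TB retained, safe={safe}")
```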
Throughput Capacity
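Maximum CDC throughput with $n$ parallel consumers:

$$\Theta_{\max} = \min\left(\Theta_{\text{source}},\ \Theta_{\text{broker}},\ n \cdot \Theta_{\text{consumer}},\ \Theta_{\text{sink}}\right)$$

Bottleneck is typically the slowest component.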
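The bottleneck rule can be applied directly to measured stage rates. The stage names and events-per-second figures below are hypothetical; the point is only that adding consumers helps until another stage becomes the minimum.

```python
def pipeline_throughput(stage_rates: dict, consumers: int, per_consumer_rate: float):
    """Return (bottleneck stage, sustainable events/sec) for the whole pipeline."""
    rates = dict(stage_rates)
    # Parallel consumers scale linearly in this simple model.
    rates["consumers"] = consumers * per_consumer_rate
    bottleneck = min(rates, key=rates.get)
    return bottleneck, rates[bottleneck]


stages = {"source_log": 50_000, "kafka": 200_000, "delta_merge": 30_000}

few = pipeline_throughput(stages, consumers=2, per_consumer_rate=10_000)
many = pipeline_throughput(stages, consumers=4, per_consumer_rate=10_000)
print(few, many)
```

With two consumers the consumers themselves limit the pipeline; with four, the limit moves to the MERGE stage and further consumers add nothing.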
Key Insight
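Event sourcing with CDC provides a complete audit trail but requires compaction to manage storage growth. With Debezium's outbox pattern, the application writes each event to an outbox table in the same transaction as the business data, so the state change and its event commit or roll back together. Delivery to consumers is still at-least-once, so consumers should deduplicate on an event ID.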
Summary
CDC enables real-time data synchronization with ordering guarantees. Merge operations must be commutative for correctness. Log retention must exceed replication lag. Throughput is bounded by the slowest component in the pipeline.
Key Concepts Table (cont.)
| Component | Description | Failure Mode | Recovery Strategy |
|---|---|---|---|
| Source DB Transaction Log | Binlog/WAL recording all changes | Log rotation/truncation | Snapshot + resume from log position |
| Debezium Connector | Reads log, produces Kafka events | Connector crash | Restart from last committed offset |
| Kafka Topic | Durable event buffer | Broker failure | Replication factor 3+ |
| PySpark Structured Streaming | Micro-batch CDC processor | Executor failure | Checkpoint recovery |
| Delta Lake MERGE | Atomic upsert to target | Write failure | Retry from checkpoint |
| Checkpoint Directory | Offset tracking for exactly-once | Corruption | Restore from backup |
| Watermark | Late event handling | Clock skew | Grace period configuration |
Code Examples
Example 1: Reading CDC Events from Kafka with PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, current_timestamp, from_json, lit
from pyspark.sql.types import (
    ArrayType, LongType, MapType, StringType, StructField, StructType,
)

spark = (
    SparkSession.builder
    .appName("CDC-From-Kafka")
    .getOrCreate()
)

# Define CDC event schema (Debezium envelope: "schema" + "payload")
cdc_event_schema = StructType([
    StructField("schema", StructType([
        StructField("type", StringType()),
        StructField("fields", ArrayType(StructType([
            StructField("name", StringType()),
            StructField("type", StringType()),
            StructField("fields", ArrayType(StructType([
                StructField("name", StringType()),
                StructField("type", StringType()),
            ]))),
        ]))),
    ])),
    StructField("payload", StructType([
        # Row images are kept as string maps so one schema fits every table
        StructField("before", MapType(StringType(), StringType())),
        StructField("after", MapType(StringType(), StringType())),
        StructField("source", StructType([
            StructField("version", StringType()),
            StructField("connector", StringType()),
            StructField("name", StringType()),
            StructField("ts_ms", LongType()),
            StructField("snapshot", StringType()),
            StructField("db", StringType()),
            StructField("table", StringType()),
        ])),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
        StructField("transaction", StructType([
            StructField("id", StringType()),
            StructField("total_order", LongType()),
            StructField("data_collection_order", LongType()),
        ])),
    ])),
])

# Read CDC events from Kafka
cdc_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
    .option("subscribe", "dbserver1.public.customers,dbserver1.public.orders")
    # "latest" skips changes published before the first start; use "earliest"
    # if the initial snapshot or earlier changes must be captured.
    .option("startingOffsets", "latest")
    # "false" keeps the query running if offsets were aged out, at the cost of
    # silently missing those events.
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", 100000)
    .load()
)

# Parse and flatten CDC events
parsed_cdc = (
    cdc_stream
    .select(
        col("key").cast("string").alias("kafka_key"),
        from_json(col("value").cast("string"), cdc_event_schema).alias("cdc"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp").alias("kafka_timestamp"),
    )
    .select(
        col("kafka_key"),
        col("cdc.payload.before").alias("before"),
        col("cdc.payload.after").alias("after"),
        col("cdc.payload.op").alias("operation"),
        col("cdc.payload.source.ts_ms").alias("source_ts_ms"),
        col("cdc.payload.source.db").alias("source_db"),
        col("cdc.payload.source.table").alias("source_table"),
        col("cdc.payload.source.connector").alias("connector"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"),
        # Deduplication key: topic-partition-offset identifies a Kafka record
        concat(
            col("topic"),
            lit("-"),
            col("partition").cast("string"),
            lit("-"),
            col("offset").cast("string"),
        ).alias("event_id"),
        # Processing timestamp
        current_timestamp().alias("processed_at"),
    )
)

# --- Write to Bronze Layer (Raw CDC Events) ---
def write_bronze_batch(batch_df, batch_id):
    """Append raw CDC events to Bronze.

    txnAppId/txnVersion (Delta Lake 2.0+) let Delta skip a micro-batch that was
    already committed, so a retried batch does not append duplicates.
    """
    if batch_df.isEmpty():  # DataFrame.isEmpty() requires PySpark 3.3+
        return
    (
        batch_df
        .write
        .format("delta")
        .mode("append")
        .option("txnAppId", "cdc_bronze")
        .option("txnVersion", batch_id)
        .save("/mnt/lakehouse/bronze/cdc_events")
    )

bronze_query = (
    parsed_cdc
    .writeStream
    .foreachBatch(write_bronze_batch)
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/cdc_bronze")
    .trigger(processingTime="30 seconds")
    .start()
)
bronze_query.awaitTermination()
Example 2: Applying CDC to Delta Lake with MERGE
from delta.tables import DeltaTable
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import coalesce, col, row_number

spark = (
    SparkSession.builder
    .appName("CDC-Apply-to-Delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .getOrCreate()
)

SILVER_CUSTOMERS_PATH = "/mnt/lakehouse/silver/customers"

def apply_cdc_to_delta(micro_batch_df, batch_id):
    """Apply CDC events to the Delta Lake target table with a single MERGE.

    Assumes the target table already exists with the columns selected below
    (id, name, city follow the sample envelope shown earlier).
    """
    if micro_batch_df.isEmpty():
        return

    # 'before' and 'after' are string maps, so fields are read by key.
    # The key lives in 'after' for c/u/r events and in 'before' for deletes.
    keyed = micro_batch_df.withColumn(
        "id",
        coalesce(col("after")["id"], col("before")["id"]).cast("long"),
    )

    # Keep only the latest event per key. Row-number windows are not supported
    # on a streaming DataFrame, so deduplication happens here, inside the
    # micro-batch, where the data is a static DataFrame.
    latest_first = Window.partitionBy("id").orderBy(
        col("source_ts_ms").desc(), col("offset").desc()
    )
    latest = (
        keyed
        .withColumn("row_num", row_number().over(latest_first))
        .filter(col("row_num") == 1)
        .select(
            col("id"),
            col("after")["name"].alias("name"),
            col("after")["city"].alias("city"),
            col("operation").alias("_cdc_operation"),
            # source_ts_ms is epoch milliseconds; a timestamp cast expects seconds
            (col("source_ts_ms") / 1000).cast("timestamp").alias("_cdc_timestamp"),
            col("event_id").alias("_event_id"),
        )
    )

    target_table = DeltaTable.forPath(spark, SILVER_CUSTOMERS_PATH)

    # One MERGE handles creates, updates and deletes atomically. The timestamp
    # guards make a replayed or stale event a no-op.
    (
        target_table.alias("target")
        .merge(latest.alias("source"), "target.id = source.id")
        .whenMatchedDelete(
            condition="source._cdc_operation = 'd' "
                      "AND source._cdc_timestamp >= target._cdc_timestamp"
        )
        .whenMatchedUpdateAll(
            condition="source._cdc_operation != 'd' "
                      "AND source._cdc_timestamp >= target._cdc_timestamp"
        )
        .whenNotMatchedInsertAll(condition="source._cdc_operation != 'd'")
        .execute()
    )

# --- Stream CDC Events and Apply to Target ---
cdc_stream = (
    spark.readStream
    .format("delta")
    .load("/mnt/lakehouse/bronze/cdc_events")
    # Only customer events belong in the customers table; tombstone records
    # (null payload after a delete) have no operation and are dropped.
    .filter((col("source_table") == "customers") & col("operation").isNotNull())
)

# Write with MERGE
merge_query = (
    cdc_stream
    .writeStream
    .foreachBatch(apply_cdc_to_delta)
    .option("checkpointLocation", "/mnt/checkpoints/cdc_merge")
    .trigger(processingTime="60 seconds")
    .start()
)
merge_query.awaitTermination()
Example 3: Event Sourcing Implementation with PySpark
import uuid
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, count, lit, struct, when
from pyspark.sql.functions import max as max_, min as min_
from pyspark.sql.types import (
    IntegerType, MapType, StringType, StructField, StructType, TimestampType,
)

spark = (
    SparkSession.builder
    .appName("Event-Sourcing-CDC")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()
)

EVENT_STORE_PATH = "/mnt/lakehouse/events"

# --- Event Store (Append-Only) ---
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("aggregate_id", StringType(), False),
    StructField("aggregate_type", StringType(), False),
    StructField("event_data", MapType(StringType(), StringType())),
    StructField("metadata", MapType(StringType(), StringType())),
    StructField("event_version", IntegerType()),
    StructField("created_at", TimestampType()),
])

# Create event store at the same path the writer and readers below use
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS event_store (
        event_id STRING NOT NULL,
        event_type STRING NOT NULL,
        aggregate_id STRING NOT NULL,
        aggregate_type STRING NOT NULL,
        event_data MAP<STRING, STRING>,
        metadata MAP<STRING, STRING>,
        event_version INT,
        created_at TIMESTAMP
    )
    USING DELTA
    PARTITIONED BY (aggregate_type)
    LOCATION '{EVENT_STORE_PATH}'
    TBLPROPERTIES (
        'delta.appendOnly' = 'true',
        'delta.logRetentionDuration' = 'interval 365 days'
    )
""")

# --- Emit Events ---
def emit_event(aggregate_id, aggregate_type, event_type, event_data, metadata=None):
    """Emit an event to the event store."""
    # An explicit schema is required: Spark cannot infer a map type from an
    # empty metadata dict.
    event = spark.createDataFrame(
        [(
            str(uuid.uuid4()),
            event_type,
            aggregate_id,
            aggregate_type,
            event_data,
            metadata or {},
            1,
            datetime.now(),
        )],
        schema=event_schema,
    )
    event.write.format("delta").mode("append").save(EVENT_STORE_PATH)

# Example: Customer lifecycle events
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerCreated",
    event_data={"name": "Alice Johnson", "email": "alice@email.com", "city": "NYC"},
    metadata={"source": "registration_api", "correlation_id": "req-123"},
)
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerAddressChanged",
    event_data={"old_city": "NYC", "new_city": "LA", "change_reason": "relocation"},
    metadata={"source": "profile_api", "correlation_id": "req-456"},
)
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerEmailUpdated",
    event_data={"old_email": "alice@email.com", "new_email": "alice.new@email.com"},
    metadata={"source": "profile_api", "correlation_id": "req-789"},
)

# --- Build Materialized View (Projection) ---
def latest_value(condition, value):
    """Value carried by the most recent event (by created_at) matching `condition`.

    orderBy() before groupBy() does not guarantee the order seen by last(), so
    the newest event is picked with max() over a (created_at, value) struct.
    """
    candidate = struct(col("created_at").alias("ts"), value.alias("value"))
    return max_(when(condition & value.isNotNull(), candidate)).getField("value")

def build_customer_view(events_df, aggregate_id=None):
    """Rebuild customer state from events (event replay)."""
    filtered = events_df if aggregate_id is None else \
        events_df.filter(col("aggregate_id") == aggregate_id)
    data = col("event_data")
    return filtered.groupBy("aggregate_id").agg(
        latest_value(col("event_type") == "CustomerCreated", data["name"]).alias("name"),
        # CustomerEmailUpdated carries the address in "new_email", not "email"
        latest_value(
            col("event_type").isin("CustomerCreated", "CustomerEmailUpdated"),
            coalesce(data["new_email"], data["email"]),
        ).alias("email"),
        latest_value(
            col("event_type").isin("CustomerCreated", "CustomerAddressChanged"),
            coalesce(data["new_city"], data["city"]),
        ).alias("city"),
        count("*").alias("total_events"),
        max_("created_at").alias("last_updated"),
        min_("created_at").alias("created_at"),
    )

# Build view from all events
events_df = spark.read.format("delta").load(EVENT_STORE_PATH)
customer_view = build_customer_view(events_df)
customer_view.show()

# Build view for specific customer (point-in-time query)
def build_state_at_time(events_df, aggregate_id, as_of_timestamp):
    """Build state as of a specific timestamp by replaying only earlier events."""
    visible = events_df.filter(
        col("created_at") <= lit(as_of_timestamp).cast("timestamp")
    )
    return build_customer_view(visible, aggregate_id)

# Query state at specific time
state_at_may = build_state_at_time(
    events_df, "C101",
    "2026-05-15"
)
state_at_may.show()
Performance Metrics
| Metric | Batch ETL (Polling) | CDC (Debezium) | Improvement |
|---|---|---|---|
| Latency (Source → Target) | 4-24 hours | 5-30 seconds | 99.9% reduction |
| Source DB Load | 15-30% CPU (full scans) | 1-3% CPU (log read) | 90% reduction |
| Data Transferred | 100% of table per run | Only changes (0.1-5%) | 95-99% reduction |
| Storage (Delta Lake) | Full copies per snapshot | Event log only | 80% reduction |
| Query Freshness | Stale by hours | Near real-time | Sub-minute |
| Delete Detection | Impossible (without flags) | Automatic | New capability |
| Ordering Guarantees | None (race conditions) | Transaction-ordered | Strict ordering |
| Exactly-Once | Difficult (dedup needed) | Checkpoint-based | Effectively exactly-once with an idempotent sink |
| Operational Complexity | Low | Medium-High | Trade-off |
| Cost (Compute) | | 0.15/hour (streaming) | 70% reduction |
Best Practices
- Use Debezium for database CDC: Debezium provides mature, well-tested CDC connectors for MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB. It handles log reading, snapshotting, and event serialization.
- Enable Kafka topic compaction for latest-state queries: For use cases that need the current state without replaying all events, configure Kafka topic compaction (cleanup.policy=compact) to retain only the latest value per key.
- Implement watermark-based deduplication: Use PySpark's watermark feature (`withWatermark`) together with `dropDuplicates` to handle late-arriving events and keep processing idempotent within a configurable time window.
- Separate bronze and silver CDC processing: Write raw CDC events to Bronze first, then process and apply them to Silver. This provides an audit trail and enables reprocessing if the logic changes.
- Use Delta Lake MERGE for idempotent applies: A MERGE keyed on the primary key is idempotent; running the same MERGE twice on the same input produces the same result. This is critical for exactly-once semantics in distributed systems.
- Monitor Kafka consumer lag: Set up alerts when consumer lag exceeds thresholds. High lag indicates the CDC processor cannot keep up with the source change rate.
- Handle schema evolution in CDC events: Use Delta Lake's `mergeSchema` option and Debezium's schema history tracking to handle source schema changes without breaking the CDC pipeline.
- Implement dead-letter queues: Route events that fail deserialization or validation to a dead-letter queue for manual inspection rather than blocking the entire pipeline.
- Snapshot before CDC starts: When setting up CDC for the first time, take a consistent snapshot of the source database and replay it through the CDC pipeline to establish a baseline.
- Tune micro-batch frequency: Balance latency against throughput by adjusting the trigger interval. For most use cases, 30-60 seconds provides a good balance. For lower latency, shorten the trigger interval; continuous processing mode is experimental and does not support `foreachBatch`, so it cannot drive a MERGE-based sink.
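The dead-letter practice above can be sketched in plain Python: events that fail to parse or lack required fields are set aside with the error that rejected them, and the rest continue through the pipeline. `REQUIRED_FIELDS` and `route_events` are hypothetical names for illustration; in a Spark job the two outputs would typically be written to separate tables or topics.

```python
import json

REQUIRED_FIELDS = ("op", "source")  # assumption: minimum fields a valid event needs


def route_events(raw_events):
    """Split raw messages into valid payloads and dead letters with error details."""
    valid, dead_letters = [], []
    for raw in raw_events:
        try:
            payload = json.loads(raw)["payload"]
            missing = [field for field in REQUIRED_FIELDS if field not in payload]
            if missing:
                raise KeyError(f"missing fields: {missing}")
            valid.append(payload)
        except (json.JSONDecodeError, KeyError, TypeError) as exc:
            # Keep the original message so it can be inspected and replayed later.
            dead_letters.append({"raw": raw, "error": repr(exc)})
    return valid, dead_letters


raw_events = [
    '{"payload": {"op": "c", "source": {"table": "customers"}, "after": {"id": 1}}}',
    "not json at all",
    '{"payload": {"after": {"id": 2}}}',
]

valid, dead_letters = route_events(raw_events)
print(len(valid), "valid;", len(dead_letters), "dead-lettered")
```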
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)