Change Data Capture (CDC) with PySpark

Architecture Diagram: CDC Pipeline Overview

╔════════════════════════════════════════════════════════════════════════════════════╗
║                       CHANGE DATA CAPTURE (CDC) ARCHITECTURE                       ║
╠════════════════════════════════════════════════════════════════════════════════════╣
║                                                                                    ║
║  ┌──────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                SOURCE SYSTEMS                                │  ║
║  │  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐   │  ║
║  │  │   MySQL   │  │ PostgreSQL│  │  Oracle   │  │  MongoDB  │  │    SQL    │   │  ║
║  │  │ Database  │  │ Database  │  │ Database  │  │ Database  │  │  Server   │   │  ║
║  │  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘   │  ║
║  │        │              │              │              │              │         │  ║
║  │        ▼              ▼              ▼              ▼              ▼         │  ║
║  │  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐   │  ║
║  │  │  Binlog   │  │   WAL /   │  │   Redo    │  │  Oplog /  │  │  Change   │   │  ║
║  │  │  Reader   │  │  Logical  │  │    Log    │  │  Change   │  │   Data    │   │  ║
║  │  │           │  │Replication│  │  Reader   │  │  Stream   │  │  Capture  │   │  ║
║  │  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘   │  ║
║  └────────┼──────────────┼──────────────┼──────────────┼──────────────┼─────────┘  ║
║           │              │              │              │              │            ║
║           ▼              ▼              ▼              ▼              ▼            ║
║  ╔══════════════════════════════════════════════════════════════════════════════╗  ║
║  ║                              CDC CAPTURE LAYER                               ║  ║
║  ║ ┌──────────────────────────────────────────────────────────────────────────┐ ║  ║
║  ║ │                                                                          │ ║  ║
║  ║ │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │ ║  ║
║  ║ │  │   Debezium   │  │   AWS DMS    │  │    Oracle    │  │  Custom Log  │  │ ║  ║
║  ║ │  │ (Open Source)│  │  (Managed)   │  │  GoldenGate  │  │   Tailing    │  │ ║  ║
║  ║ │  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘  │ ║  ║
║  ║ │         │                 │                 │                 │          │ ║  ║
║  ║ │         └─────────────────┴────────┬────────┴─────────────────┘          │ ║  ║
║  ║ │                                    │                                     │ ║  ║
║  ║ │                          ┌─────────▼─────────┐                           │ ║  ║
║  ║ │                          │ CDC Event Format  │                           │ ║  ║
║  ║ │                          │ ┌───────────────┐ │                           │ ║  ║
║  ║ │                          │ │ • before      │ │                           │ ║  ║
║  ║ │                          │ │ • after       │ │                           │ ║  ║
║  ║ │                          │ │ • op (c/u/d)  │ │                           │ ║  ║
║  ║ │                          │ │ • source      │ │                           │ ║  ║
║  ║ │                          │ │ • timestamp   │ │                           │ ║  ║
║  ║ │                          │ │ • transaction │ │                           │ ║  ║
║  ║ │                          │ └───────────────┘ │                           │ ║  ║
║  ║ │                          └─────────┬─────────┘                           │ ║  ║
║  ║ └────────────────────────────────────┬─────────────────────────────────────┘ ║  ║
║  ╚══════════════════════════════════════╪═══════════════════════════════════════╝  ║
║                                         │                                          ║
║                                         ▼                                          ║
║  ╔══════════════════════════════════════════════════════════════════════════════╗  ║
║  ║                         EVENT BROKER (Apache Kafka)                          ║  ║
║  ║ ┌──────────────────────────────────────────────────────────────────────────┐ ║  ║
║  ║ │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │ ║  ║
║  ║ │  │    Topic:    │  │    Topic:    │  │    Topic:    │  │    Topic:    │  │ ║  ║
║  ║ │  │   dbserver   │  │   dbserver   │  │   dbserver   │  │      __      │  │ ║  ║
║  ║ │  │    .users    │  │   .orders    │  │  .products   │  │    schema    │  │ ║  ║
║  ║ │  │     .cdc     │  │     .cdc     │  │     .cdc     │  │   changes    │  │ ║  ║
║  ║ │  └──────────────┘  └──────────────┘  └──────────────┘  └──────────────┘  │ ║  ║
║  ║ └──────────────────────────────────────────────────────────────────────────┘ ║  ║
║  ╚══════════════════════════════════════════════════════════════════════════════╝  ║
║                                         │                                          ║
║                                         ▼                                          ║
║  ╔══════════════════════════════════════════════════════════════════════════════╗  ║
║  ║                  PySpark Structured Streaming CDC Processor                  ║  ║
║  ║ ┌──────────────────────────────────────────────────────────────────────────┐ ║  ║
║  ║ │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │ ║  ║
║  ║ │  │ Deserialize  │  │  Transform   │  │    Dedup     │  │    Apply     │  │ ║  ║
║  ║ │  │  CDC Events  │  │   & Enrich   │  │   & Window   │  │    MERGE     │  │ ║  ║
║  ║ │  └──────────────┘  └──────────────┘  └──────────────┘  └──────────────┘  │ ║  ║
║  ║ └──────────────────────────────────────────────────────────────────────────┘ ║  ║
║  ╚══════════════════════════════════════════════════════════════════════════════╝  ║
║                                         │                                          ║
║                                         ▼                                          ║
║  ╔══════════════════════════════════════════════════════════════════════════════╗  ║
║  ║                      TARGET DATA LAKEHOUSE (Delta Lake)                      ║  ║
║  ║ ┌──────────────────────────────────────────────────────────────────────────┐ ║  ║
║  ║ │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │ ║  ║
║  ║ │  │    Bronze    │  │    Silver    │  │     Gold     │  │  Real-Time   │  │ ║  ║
║  ║ │  │  (Raw CDC)   │─▶│  (Cleansed)  │─▶│ (Aggregated) │─▶│  Dashboards  │  │ ║  ║
║  ║ │  └──────────────┘  └──────────────┘  └──────────────┘  └──────────────┘  │ ║  ║
║  ║ └──────────────────────────────────────────────────────────────────────────┘ ║  ║
║  ╚══════════════════════════════════════════════════════════════════════════════╝  ║
║                                                                                    ║
╚════════════════════════════════════════════════════════════════════════════════════╝

Architecture Diagram: Debezium CDC Event Flow

┌────────────────────────────────────────────────────────────────────────────────────┐
│                            DEBEZIUM CDC EVENT LIFECYCLE                            │
├────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                    │
│  SOURCE DATABASE (MySQL/PostgreSQL)                                                │
│  ┌──────────────────────────────────────────────────────────────────────────────┐  │
│  │                                                                              │  │
│  │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐                    │  │
│  │  │ Transaction  │───▶│ Binary Log   │───▶│  Debezium    │                    │  │
│  │  │ (INSERT/     │    │ (Binlog/WAL) │    │  Connector   │                    │  │
│  │  │  UPDATE/     │    │              │    │              │                    │  │
│  │  │  DELETE)     │    │ Position:    │    │  Reads log   │                    │  │
│  │  │              │    │ file+offset  │    │ sequentially │                    │  │
│  │  └──────────────┘    └──────────────┘    └──────┬───────┘                    │  │
│  │                                                 │                            │  │
│  └─────────────────────────────────────────────────┼────────────────────────────┘  │
│                                                    │                               │
│                                                    ▼                               │
│  ┌──────────────────────────────────────────────────────────────────────────────┐  │
│  │                          CDC EVENT ENVELOPE (JSON)                           │  │
│  │                                                                              │  │
│  │  {                                                                           │  │
│  │    "schema": { ... },                                                        │  │
│  │    "payload": {                                                              │  │
│  │      "before": {             ◄── State BEFORE change (null for INSERT)       │  │
│  │        "id": 101,                                                            │  │
│  │        "name": "Alice",                                                      │  │
│  │        "city": "NYC"                                                         │  │
│  │      },                                                                      │  │
│  │      "after": {              ◄── State AFTER change (null for DELETE)        │  │
│  │        "id": 101,                                                            │  │
│  │        "name": "Alice",                                                      │  │
│  │        "city": "LA"                                                          │  │
│  │      },                                                                      │  │
│  │      "source": {             ◄── Source metadata                             │  │
│  │        "version": "2.5.0",                                                   │  │
│  │        "connector": "mysql",                                                 │  │
│  │        "name": "dbserver1",                                                  │  │
│  │        "ts_ms": 1717200000000,                                               │  │
│  │        "snapshot": "false",                                                  │  │
│  │        "db": "production",                                                   │  │
│  │        "table": "customers"                                                  │  │
│  │      },                                                                      │  │
│  │      "op": "u",              ◄── Operation: c=create, u=update, d=delete     │  │
│  │      "ts_ms": 1717200000123,                                                 │  │
│  │      "transaction": null                                                     │  │
│  │    }                                                                         │  │
│  │  }                                                                           │  │
│  │                                                                              │  │
│  └──────────────────────────────────────────────────────────────────────────────┘  │
│                                                    │                               │
│                                                    ▼                               │
│  ┌──────────────────────────────────────────────────────────────────────────────┐  │
│  │                            PYSPARK CDC PROCESSING                            │  │
│  │                                                                              │  │
│  │  Step 1: Deserialize JSON → Structured DataFrame                             │  │
│  │          Extract before/after payloads, operation type, timestamps           │  │
│  │                                                                              │  │
│  │  Step 2: Deduplicate within micro-batch                                      │  │
│  │          Keep latest event per (table, key), ordered by timestamp            │  │
│  │                                                                              │  │
│  │  Step 3: Filter by operation type                                            │  │
│  │          CREATE → INSERT, UPDATE → MERGE, DELETE → DELETE                    │  │
│  │                                                                              │  │
│  │  Step 4: Apply to Delta Lake target                                          │  │
│  │          Use Delta Lake MERGE for atomic upserts                             │  │
│  │                                                                              │  │
│  │  Step 5: Update checkpoint for exactly-once semantics                        │  │
│  │                                                                              │  │
│  └──────────────────────────────────────────────────────────────────────────────┘  │
│                                                                                    │
└────────────────────────────────────────────────────────────────────────────────────┘

Architecture Diagram: Event Sourcing Pattern

┌────────────────────────────────────────────────────────────────────────────────────┐
│                           EVENT SOURCING + CQRS PATTERN                            │
├────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                    │
│  COMMAND SIDE (Write)                QUERY SIDE (Read)                             │
│  ┌──────────────────────────┐        ┌──────────────────────────┐                  │
│  │                          │        │                          │                  │
│  │  ┌────────────────────┐  │        │  ┌────────────────────┐  │                  │
│  │  │   Command API      │  │        │  │   Query API        │  │                  │
│  │  │   (REST/gRPC)      │  │        │  │   (REST/GraphQL)   │  │                  │
│  │  └─────────┬──────────┘  │        │  └─────────▲──────────┘  │                  │
│  │            │             │        │            │             │                  │
│  │            ▼             │        │            │             │                  │
│  │  ┌────────────────────┐  │        │  ┌────────────────────┐  │                  │
│  │  │  Command Handler   │  │        │  │  Query Handler     │  │                  │
│  │  │  • Validate        │  │        │  │  • Read from view  │  │                  │
│  │  │  • Business Rules  │  │        │  │  • Apply filters   │  │                  │
│  │  │  • Emit Events     │  │        │  │  • Return results  │  │                  │
│  │  └─────────┬──────────┘  │        │  └─────────▲──────────┘  │                  │
│  │            │             │        │            │             │                  │
│  │            ▼             │        │            │             │                  │
│  │  ┌────────────────────┐  │        │  ┌────────────────────┐  │                  │
│  │  │  Event Store       │  │        │  │  Materialized      │  │                  │
│  │  │  (Append-Only Log) │  │        │  │  View / Read Model │  │                  │
│  │  │                    │  │        │  │  (Projections)     │  │                  │
│  │  │  ┌──────────────┐  │  │        │  │                    │  │                  │
│  │  │  │ Event 1      │  │  │ ─────▶ │  │  ┌──────────────┐  │  │                  │
│  │  │  │ Created      │  │  │        │  │  │ Customer     │  │  │                  │
│  │  │  │ {name: Alice}│  │  │        │  │  │ View:        │  │  │                  │
│  │  │  ├──────────────┤  │  │        │  │  │ {id: 101,    │  │  │                  │
│  │  │  │ Event 2      │  │  │ ─────▶ │  │  │  name: Alice,│  │  │                  │
│  │  │  │ Updated      │  │  │        │  │  │  city: LA}   │  │  │                  │
│  │  │  │ {city: LA}   │  │  │        │  │  └──────────────┘  │  │                  │
│  │  │  ├──────────────┤  │  │        │  └────────────────────┘  │                  │
│  │  │  │ Event 3      │  │  │        │                          │                  │
│  │  │  │ Deleted      │  │  │        │  ┌────────────────────┐  │                  │
│  │  │  └──────────────┘  │  │        │  │  Audit Log View    │  │                  │
│  │  └────────────────────┘  │        │  │  (Complete History)│  │                  │
│  │                          │        │  └────────────────────┘  │                  │
│  └──────────────────────────┘        └──────────────────────────┘                  │
│                                                                                    │
│  ┌──────────────────────────────────────────────────────────────────────────────┐  │
│  │                         EVENT REPLAY (Rebuild State)                         │  │
│  │                                                                              │  │
│  │  Events: [Created, Updated, Updated, Deleted]                                │  │
│  │                                                                              │  │
│  │  Replay: Start   → Created → Updated    → Updated     → Deleted   → End      │  │
│  │          (state)   (Alice)   (Alice,LA)   (Alice,NYC)   (deleted)   (null)   │  │
│  │                                                                              │  │
│  │  Snapshot Optimization: Cache state at Event N, replay from N+1              │  │
│  │                                                                              │  │
│  └──────────────────────────────────────────────────────────────────────────────┘  │
│                                                                                    │
└────────────────────────────────────────────────────────────────────────────────────┘

Detailed Explanation

Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database, then delivers those changes to downstream systems in near real time. Unlike batch ETL, which periodically re-queries the source table, CDC captures only the delta (inserts, updates, deletes) as it occurs, so targets stay synchronized with minimal load on the source system.

The fundamental challenge CDC solves is data synchronization between operational systems (OLTP) and analytical systems (OLAP/data lakes). Traditional ETL approaches poll source tables using timestamp columns or checksums, which misses deletes, creates race conditions, and requires full-table scans that degrade source system performance. CDC eliminates these issues by reading the database's transaction log (binlog for MySQL, WAL for PostgreSQL, redo logs for Oracle), which already records every change atomically.
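To make the contrast concrete, here is a small pure-Python sketch (not tied to any real database) that compares timestamp polling with reading a change log. The in-memory `table` and `log`, the `updated_at` column, and the helper names are all hypothetical stand-ins.

```python
# Hypothetical in-memory "source table" and its change log.
table, log = {}, []

def upsert(key, city, ts):
    """Write a row and record the change in the log, as a database would."""
    op = "u" if key in table else "c"
    table[key] = {"id": key, "city": city, "updated_at": ts}
    log.append((ts, op, key))

def delete(key, ts):
    """Remove a row; only the log remembers that it ever existed."""
    del table[key]
    log.append((ts, "d", key))

upsert(1, "NYC", ts=1)
upsert(2, "SF", ts=2)
last_poll = 2                 # the previous poll saw everything up to ts=2

upsert(1, "LA", ts=3)         # intermediate update
upsert(1, "Boston", ts=4)     # final update
delete(2, ts=5)               # delete

# Timestamp polling: only rows that still exist and look newer than last_poll.
polled = [row["id"] for row in table.values() if row["updated_at"] > last_poll]

# Log-based capture: every change after last_poll, in order.
logged = [(op, key) for ts, op, key in log if ts > last_poll]

print(polled)   # [1]
print(logged)   # [('u', 1), ('u', 1), ('d', 2)]
```

Polling reports only that row 1 changed; the intermediate update and the delete of row 2 are visible only in the log.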

Debezium is the most widely used open-source CDC platform, built on Kafka Connect. It reads database transaction logs and produces change events in a standardized JSON envelope containing the before state (for updates and deletes), the after state (for creates and updates), the operation type (c for create, u for update, d for delete, plus r for rows read during the initial snapshot), source metadata (connector name, database, table, timestamp), and transaction information. This envelope lets downstream consumers reconstruct the exact sequence of changes.
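As an illustration of how a consumer might read this envelope, the following sketch flattens one event with the standard `json` module. The function name `flatten_event`, the `key_field` parameter, and the shape of the returned dictionary are assumptions made for this example; the sample event mirrors the fields described above.

```python
import json

def flatten_event(raw, key_field="id"):
    """Flatten one Debezium-style JSON envelope into a flat change record."""
    payload = json.loads(raw)["payload"]
    before, after = payload.get("before"), payload.get("after")
    # Deletes carry the row in "before"; creates and updates carry it in "after".
    row = after if after is not None else before
    return {
        "op": payload["op"],
        "key": row[key_field],
        "row": after,                     # None for deletes
        "ts_ms": payload["ts_ms"],
        "table": payload["source"]["table"],
    }

sample = json.dumps({
    "schema": {},
    "payload": {
        "before": {"id": 101, "name": "Alice", "city": "NYC"},
        "after": {"id": 101, "name": "Alice", "city": "LA"},
        "source": {"connector": "mysql", "db": "production", "table": "customers"},
        "op": "u",
        "ts_ms": 1717200000123,
        "transaction": None,
    },
})

event = flatten_event(sample)
print(event["op"], event["key"], event["row"]["city"])   # u 101 LA
```

Taking the key from `before` when `after` is null is what lets delete events be matched against the target row later.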

PySpark Structured Streaming integrates with CDC in several ways: reading Kafka topics that contain CDC events and applying them to Delta Lake tables with MERGE; consuming Delta Lake's built-in Change Data Feed (enabled with the delta.enableChangeDataFeed table property and read with the readChangeFeed option) downstream; or implementing watermark-based deduplication to handle out-of-order and duplicate events.
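The first approach (Kafka topic to Delta Lake MERGE) could look roughly like the sketch below. It is illustrative rather than a reference implementation: the topic name, broker address, checkpoint path, target table `customers`, and its columns (`id`, `name`, `city`) are all assumptions, and it presumes the Kafka source and Delta Lake packages are available on the cluster. Spark imports are deferred into the functions so that the small `merge_condition` helper can be used on its own.

```python
def merge_condition(key_cols):
    """Build the MERGE join predicate between target alias t and source alias s."""
    return " AND ".join(f"t.{c} = s.{c}" for c in key_cols)

def upsert_batch(batch_df, batch_id, target_table="customers", key_cols=("id",)):
    """foreachBatch handler: keep the newest event per key, then MERGE into Delta."""
    from pyspark.sql import Window, functions as F
    from delta.tables import DeltaTable

    # MERGE requires at most one source row per target row, so dedupe first.
    newest_first = Window.partitionBy(*key_cols).orderBy(F.col("ts_ms").desc())
    latest = (batch_df
              .withColumn("rn", F.row_number().over(newest_first))
              .filter("rn = 1")
              .drop("rn"))

    target = DeltaTable.forName(batch_df.sparkSession, target_table)
    (target.alias("t")
        .merge(latest.alias("s"), merge_condition(key_cols))
        .whenMatchedDelete(condition="s.op = 'd'")
        .whenMatchedUpdate(condition="s.op <> 'd'",
                           set={"name": "s.name", "city": "s.city"})
        .whenNotMatchedInsert(condition="s.op <> 'd'",
                              values={"id": "s.id", "name": "s.name", "city": "s.city"})
        .execute())

def start_cdc_stream(spark, topic="dbserver1.production.customers"):
    """Read Debezium JSON from Kafka and apply it to the Delta table."""
    from pyspark.sql import functions as F

    row = "STRUCT<id: BIGINT, name: STRING, city: STRING>"
    ddl = f"payload STRUCT<before: {row}, after: {row}, op: STRING, ts_ms: BIGINT>"

    events = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")   # assumed broker
              .option("subscribe", topic)
              .load()
              .select(F.from_json(F.col("value").cast("string"), ddl).alias("e"))
              .select(
                  # Deletes have a null "after", so fall back to "before" for the key.
                  F.coalesce(F.col("e.payload.after.id"),
                             F.col("e.payload.before.id")).alias("id"),
                  F.col("e.payload.after.name").alias("name"),
                  F.col("e.payload.after.city").alias("city"),
                  F.col("e.payload.op").alias("op"),
                  F.col("e.payload.ts_ms").alias("ts_ms"))
              # Tombstone records (null value) parse to null and are dropped here.
              .filter("id IS NOT NULL"))

    return (events.writeStream
            .foreachBatch(upsert_batch)
            .option("checkpointLocation", "/tmp/checkpoints/customers_cdc")  # assumed path
            .start())
```

Ordering by `ts_ms` alone can tie when several changes to one key land in the same millisecond; a production pipeline would more likely order by a log position taken from the `source` block.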

Event Sourcing, closely related to CDC, stores every state change as an immutable event rather than overwriting current state. This provides a complete audit trail, enables temporal queries ("what was the state at time T?"), supports event replay for rebuilding materialized views, and decouples write and read models (CQRS pattern). In PySpark, event sourcing implementations typically write events to an append-only Delta table and use streaming aggregations to maintain materialized views.
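The replay idea can be sketched in plain Python, independent of Spark. The `Event` class, the `apply_event` and `replay` helpers, and the sample log are hypothetical; they only illustrate how state is rebuilt from an append-only log and how a snapshot shortens the replay.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Event:
    seq: int
    kind: str      # "created", "updated", or "deleted"
    data: dict

def apply_event(state, event):
    """Fold one event into the current state (None means the entity does not exist)."""
    if event.kind == "deleted":
        return None
    return {**(state or {}), **event.data}

def replay(events, snapshot=None, snapshot_seq=0):
    """Rebuild state by applying, in sequence order, every event after the snapshot."""
    state = snapshot
    for event in sorted(events, key=lambda e: e.seq):
        if event.seq > snapshot_seq:
            state = apply_event(state, event)
    return state

log = [
    Event(1, "created", {"id": 101, "name": "Alice"}),
    Event(2, "updated", {"city": "LA"}),
    Event(3, "updated", {"city": "NYC"}),
    Event(4, "deleted", {}),
]

full = replay(log[:3])                                   # replay from the start
snap = replay(log[:2])                                   # cached state at event 2
from_snapshot = replay(log[:3], snapshot=snap, snapshot_seq=2)

print(full)            # {'id': 101, 'name': 'Alice', 'city': 'NYC'}
print(replay(log))     # None
```

Replaying from the snapshot yields the same state as replaying from the start, which is the property the snapshot optimization relies on.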

The key challenge in CDC processing is achieving exactly-once semantics in a distributed system. PySpark Structured Streaming approaches this through checkpoint-based offset tracking, idempotent MERGE operations on Delta Lake, and watermark-based handling of late-arriving events. Kafka delivers at least once, so events can be replayed after a failure; because a Delta Lake MERGE is an ACID transaction and re-applying the same keyed change leaves the table unchanged, the end result is effectively exactly-once processing.
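The effect of idempotent application can be shown with a small simulation in which a dictionary stands in for the Delta table. The event shape (`key`, `op`, `ts_ms`, `row`) and the `versions` map used as a guard are assumptions for this sketch; the point is that re-delivering a batch, as an at-least-once source may do, leaves the target unchanged.

```python
def latest_per_key(batch):
    """Keep only the newest event per key within one micro-batch."""
    latest = {}
    for ev in batch:
        current = latest.get(ev["key"])
        if current is None or ev["ts_ms"] >= current["ts_ms"]:
            latest[ev["key"]] = ev
    return list(latest.values())

def merge_into(target, versions, batch):
    """Apply a batch with a version guard so that re-delivery is a no-op."""
    for ev in latest_per_key(batch):
        key = ev["key"]
        if ev["ts_ms"] <= versions.get(key, -1):
            continue                      # already applied (or stale): skip
        versions[key] = ev["ts_ms"]       # remembered even for deleted keys
        if ev["op"] == "d":
            target.pop(key, None)
        else:
            target[key] = ev["row"]
    return target

batch = [
    {"key": 101, "op": "c", "ts_ms": 1, "row": {"city": "NYC"}},
    {"key": 101, "op": "u", "ts_ms": 2, "row": {"city": "LA"}},
    {"key": 102, "op": "c", "ts_ms": 3, "row": {"city": "SF"}},
    {"key": 102, "op": "d", "ts_ms": 4, "row": None},
]

target, versions = {}, {}
merge_into(target, versions, batch)
after_first = dict(target)
merge_into(target, versions, batch)       # simulated re-delivery after a failure

print(after_first)   # {101: {'city': 'LA'}}
print(target)        # {101: {'city': 'LA'}}
```

Keeping the version of a deleted key prevents a late, stale create for key 102 from resurrecting the row.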

Key Concepts Table

Mathematical Foundations

Definition: Change Data Capture

CDC captures row-level changes $\Delta R = \{I, U, D\}$ (inserts, updates, deletes) from a source database $S$ and propagates them to a target $T$ such that:

$$T(t) = T(t_0) \cup \bigcup_{t_i \leq t} \Delta R(t_i)$$

where $T(t_0)$ is the initial snapshot and $\Delta R(t_i)$ is the change set at time $t_i$.

Event Ordering

For CDC events $e_1, e_2$ affecting the same key $k$, the ordering must satisfy:

$$\text{Version}(e_1) < \text{Version}(e_2) \implies \text{Apply}(e_1) \text{ before } \text{Apply}(e_2)$$

Out-of-order events require buffering with timeout $T_{\text{buffer}}$:

$$\text{Buffer}(e) \iff \exists e': e'.\text{seq} = e.\text{seq} - 1 \land e' \text{ not yet received}$$
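A minimal sketch of the buffering rule for a single key, assuming gap-free integer sequence numbers starting at 1. The timeout is left out to keep the example short; a real buffer would also need a policy for events whose predecessor never arrives:

```python
class ReorderBuffer:
    """Release events for one key in sequence order, holding any that arrive early."""

    def __init__(self, first_seq: int = 1):
        self.next_seq = first_seq
        self.pending = {}   # seq -> event, held until its predecessor has been released

    def receive(self, seq: int, event) -> list:
        """Accept one event; return the events that are now safe to apply, in order."""
        if seq < self.next_seq:
            return []   # already released: duplicate delivery
        self.pending[seq] = event
        released = []
        while self.next_seq in self.pending:
            released.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return released


buf = ReorderBuffer()
applied = []
for seq, event in [(2, "u1"), (1, "c"), (4, "d"), (3, "u2")]:
    applied.extend(buf.receive(seq, event))
```

Event 2 is held until event 1 arrives, and event 4 is held until event 3 arrives, so `applied` ends up in sequence order.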

Merge Correctness Theorem

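CDC merge is correct regardless of arrival order if the merge function $M$ is commutative and associative:

$$M(M(A, b), c) = M(M(A, c), b) = M(A, b \cup c)$$

This ensures that applying changes in any order produces the same result. A merge that keeps the highest-version record per key has this property; a merge that blindly overwrites does not, which is why the event ordering rule matters for plain upserts.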
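One merge function with this property is last-writer-wins by version. The sketch below assumes versions are unique per key and keeps deletes as tombstone records; it then checks every possible arrival order:

```python
from itertools import permutations


def merge(state: dict, change: dict) -> dict:
    """Last-writer-wins: keep whichever record for the key has the higher version."""
    key = change["key"]
    current = state.get(key)
    if current is None or change["version"] > current["version"]:
        return {**state, key: change}
    return state


changes = [
    {"key": "k", "version": 1, "op": "I", "value": "a"},
    {"key": "k", "version": 2, "op": "U", "value": "b"},
    {"key": "k", "version": 3, "op": "D", "value": None},   # delete kept as a tombstone
]

results = []
for order in permutations(changes):
    state = {}
    for change in order:
        state = merge(state, change)
    results.append(state)

all_orders_agree = all(r == results[0] for r in results)
```

Keeping the delete as a tombstone is what lets a late-arriving version 2 lose against version 3; physically removing the row would let the stale update resurrect it.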

Log Retention

For CDC with retention period $R$ (in days) and average change rate $\lambda$ bytes/sec:

$$\text{Storage}_{\text{log}} = R \times \lambda \times 86400 \quad \text{(bytes)}$$

The source database's log retention (binlog/WAL) must exceed the maximum replication lag $\text{lag}_{\max}$ of the Debezium connector; otherwise log segments are purged before they are read and a new snapshot is needed.
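A short worked example of the formula, with the retention period in days and a hypothetical change rate of 50 KB/s:

```python
SECONDS_PER_DAY = 86_400


def log_storage_bytes(retention_days: float, change_rate_bytes_per_sec: float) -> float:
    """Storage_log = R * lambda * 86400, with R in days and lambda in bytes/sec."""
    return retention_days * change_rate_bytes_per_sec * SECONDS_PER_DAY


def retention_is_safe(retention_days: float, max_lag_seconds: float) -> bool:
    """True when the log is kept longer than the worst observed replication lag."""
    return retention_days * SECONDS_PER_DAY > max_lag_seconds


# Hypothetical workload: 7 days of retention at 50,000 bytes/sec of changes
storage = log_storage_bytes(7, 50_000)   # 30,240,000,000 bytes, about 30 GB
```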

Throughput Capacity

Maximum CDC throughput with $p$ parallel consumers:

$$T_{\text{max}} = p \times \min(T_{\text{source}}, T_{\text{network}}, T_{\text{sink}})$$

Here each $T$ is a per-consumer rate, and the formula assumes every stage scales linearly with $p$. The bottleneck is the slowest of the three stages.
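The formula can be evaluated directly. The rates below are hypothetical per-consumer figures in events/sec, chosen only to show how the bottleneck is identified:

```python
def max_throughput(parallelism: int, source: float, network: float, sink: float):
    """Return (T_max, bottleneck) for per-consumer stage rates in events/sec."""
    stages = {"source": source, "network": network, "sink": sink}
    bottleneck = min(stages, key=stages.get)
    return parallelism * stages[bottleneck], bottleneck


t_max, bottleneck = max_throughput(4, source=20_000, network=50_000, sink=8_000)
```

With these numbers the sink limits the pipeline, so adding source or network capacity would not raise `t_max`.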

Key Insight

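Event sourcing with CDC provides a complete audit trail but requires compaction to manage storage growth. The outbox pattern, which Debezium supports through its outbox event router, writes each event to an outbox table in the same transaction as the business data, so an event is published if and only if the change commits. Delivery to consumers is still at-least-once, so downstream processing must remain idempotent.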
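A minimal sketch of the transactional write at the heart of the outbox pattern, using Python's built-in sqlite3 as a stand-in for the source database. Table and column names are hypothetical, and publishing rows from the outbox table (the CDC connector's job) is not shown:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id TEXT PRIMARY KEY, city TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE outbox (event_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "aggregate_id TEXT NOT NULL, event_type TEXT NOT NULL, payload TEXT NOT NULL)"
)


def change_city(customer_id, city, event_type="CustomerCityChanged"):
    """Write the business row and its event in one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(
            "INSERT OR REPLACE INTO customers (id, city) VALUES (?, ?)",
            (customer_id, city),
        )
        conn.execute(
            "INSERT INTO outbox (aggregate_id, event_type, payload) VALUES (?, ?, ?)",
            (customer_id, event_type, json.dumps({"city": city})),
        )


change_city("C101", "NYC")

# Simulate a failure while writing the event: event_type violates NOT NULL,
# so the customer row written just before it is rolled back as well.
try:
    change_city("C102", "LA", event_type=None)
except sqlite3.IntegrityError:
    pass

customer_count = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
outbox_count = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
```

After the failed call, neither table contains anything for C102: there is never a business change without its event, or an event without its change.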

Summary

CDC enables real-time data synchronization with ordering guarantees. Merge operations must be commutative for correctness. Log retention must exceed replication lag. Throughput is bounded by the slowest component in the pipeline.

Key Concepts Table (cont.)

| Component | Description | Failure Mode | Recovery Strategy |
|---|---|---|---|
| Source DB Transaction Log | Binlog/WAL recording all changes | Log rotation/truncation | Snapshot + resume from log position |
| Debezium Connector | Reads log, produces Kafka events | Connector crash | Restart from last committed offset |
| Kafka Topic | Durable event buffer | Broker failure | Replication factor 3+ |
| PySpark Structured Streaming | Micro-batch CDC processor | Executor failure | Checkpoint recovery |
| Delta Lake MERGE | Atomic upsert to target | Write failure | Retry from checkpoint |
| Checkpoint Directory | Offset tracking for exactly-once | Corruption | Restore from backup |
| Watermark | Late event handling | Clock skew | Grace period configuration |

Code Examples

Example 1: Reading CDC Events from Kafka with PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("CDC-From-Kafka") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Define CDC event schema
cdc_event_schema = StructType([
    StructField("schema", StructType([
        StructField("type", StringType()),
        StructField("fields", ArrayType(StructType([
            StructField("name", StringType()),
            StructField("type", StringType()),
            StructField("fields", ArrayType(StructType([
                StructField("name", StringType()),
                StructField("type", StringType()),
            ])))
        ])))
    ])),
    StructField("payload", StructType([
        StructField("before", MapType(StringType(), StringType())),
        StructField("after", MapType(StringType(), StringType())),
        StructField("source", StructType([
            StructField("version", StringType()),
            StructField("connector", StringType()),
            StructField("name", StringType()),
            StructField("ts_ms", LongType()),
            StructField("snapshot", StringType()),
            StructField("db", StringType()),
            StructField("table", StringType()),
        ])),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
        StructField("transaction", StructType([
            StructField("id", StringType()),
            # Debezium emits these counters as 64-bit integers
            StructField("total_order", LongType()),
            StructField("data_collection_order", LongType()),
        ])),
    ]))
])

# Read CDC events from Kafka
cdc_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
    .option("subscribe", "dbserver1.public.customers,dbserver1.public.orders")
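    # "latest" skips events published before the first run; use "earliest" for a full backfill
    .option("startingOffsets", "latest")
    # "false" keeps the query running when offsets have expired, but those events are silently skipped
    .option("failOnDataLoss", "false")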
    .option("maxOffsetsPerTrigger", 100000)
    .load()
)

# Parse and flatten CDC events
parsed_cdc = (
    cdc_stream
    .select(
        col("key").cast("string").alias("kafka_key"),
        from_json(col("value").cast("string"), cdc_event_schema).alias("cdc"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp").alias("kafka_timestamp")
    )
    .select(
        col("kafka_key"),
        col("cdc.payload.before").alias("before"),
        col("cdc.payload.after").alias("after"),
        col("cdc.payload.op").alias("operation"),
        col("cdc.payload.source.ts_ms").alias("source_ts_ms"),
        col("cdc.payload.source.db").alias("source_db"),
        col("cdc.payload.source.table").alias("source_table"),
        col("cdc.payload.source.connector").alias("connector"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("kafka_timestamp"),
        # Deduplication key
        concat(
            col("topic"),
            lit("-"),
            col("partition"),
            lit("-"),
            col("offset")
        ).alias("event_id"),
        # Processing timestamp
        current_timestamp().alias("processed_at")
    )
)

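# ─── Write to Bronze Layer (Raw CDC Events) ───
def write_bronze_batch(batch_df, batch_id):
    """Append raw CDC events to the Bronze layer; safe to retry for the same batch_id."""
    if batch_df.isEmpty():  # DataFrame.isEmpty() requires PySpark 3.3+
        return

    (
        batch_df
        .write
        .format("delta")
        .mode("append")
        # A plain append is not idempotent. Delta skips a write whose
        # (txnAppId, txnVersion) pair was already committed, so a micro-batch
        # retried after a failure is not appended twice (Delta Lake 2.0+).
        .option("txnAppId", "cdc_bronze")
        .option("txnVersion", batch_id)
        .save("/mnt/lakehouse/bronze/cdc_events")
    )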

bronze_query = (
    parsed_cdc
    .writeStream
    .foreachBatch(write_bronze_batch)
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/cdc_bronze")
    .trigger(processingTime="30 seconds")
    .start()
)

bronze_query.awaitTermination()
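To make the schema above easier to read, here is a plain-Python sketch that flattens one record the same way the streaming job does. The sample event is hand-written in the shape of a Debezium JSON envelope; every value in it is illustrative, not captured output:

```python
import json

# Hand-written sample in the shape of a Debezium change event (illustrative values)
raw_value = json.dumps({
    "schema": {"type": "struct", "fields": []},
    "payload": {
        "before": None,
        "after": {"id": "101", "name": "Alice", "city": "NYC"},
        "source": {"connector": "postgresql", "db": "inventory",
                   "table": "customers", "ts_ms": 1700000000000},
        "op": "c",
        "ts_ms": 1700000000123,
    },
})


def flatten_event(value: str, topic: str, partition: int, offset: int) -> dict:
    """Mirror the column selection of the PySpark job for a single record."""
    payload = json.loads(value)["payload"]
    return {
        "before": payload["before"],
        "after": payload["after"],
        "operation": payload["op"],
        "source_ts_ms": payload["source"]["ts_ms"],
        "source_table": payload["source"]["table"],
        # Topic, partition and offset identify a Kafka record uniquely
        "event_id": f"{topic}-{partition}-{offset}",
    }


row = flatten_event(raw_value, "dbserver1.public.customers", 0, 42)
```

For an insert (`op` = "c") the `before` image is null and `after` holds the new row; for a delete the roles are reversed.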

Example 2: Applying CDC to Delta Lake with MERGE

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("CDC-Apply-to-Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

TARGET_PATH = "/mnt/lakehouse/silver/customers"

# Assumed data columns of the customers table (adjust to the real schema).
# 'before' and 'after' are string maps, so columns are read by key;
# select("after.*") only expands struct columns.
DATA_COLUMNS = ["name", "email", "city"]

def apply_cdc_to_delta(micro_batch_df, batch_id):
    """Apply CDC events to the Delta Lake target table with a single MERGE."""
    if micro_batch_df.isEmpty():
        return

    events = (
        micro_batch_df
        # Kafka tombstones and unparseable records have no operation code
        .filter(col("operation").isNotNull())
        # The key is in 'after' for c/u/r events and in 'before' for deletes
        .withColumn("id", coalesce(col("after")["id"], col("before")["id"]))
    )

    # Keep only the newest event per key. Row-number windows are not supported on
    # streaming DataFrames, so this runs inside foreachBatch. It also removes
    # duplicates and guarantees one source row per key, which MERGE requires.
    # Offsets are comparable here because a key always lands in the same partition.
    newest_first = Window.partitionBy("id").orderBy(
        col("source_ts_ms").desc(), col("offset").desc()
    )
    latest = (
        events
        .withColumn("row_num", row_number().over(newest_first))
        .filter(col("row_num") == 1)
        .select(
            "id",
            *[col("after")[c].alias(c) for c in DATA_COLUMNS],
            col("operation").alias("_cdc_operation"),
            # ts_ms is epoch milliseconds; a direct cast would treat it as seconds
            (col("source_ts_ms") / 1000).cast("timestamp").alias("_cdc_timestamp"),
            col("event_id").alias("_event_id"),
        )
    )

    (
        DeltaTable.forPath(spark, TARGET_PATH).alias("target")
        .merge(latest.alias("source"), "target.id = source.id")
        # Clauses are evaluated in order: delete first, otherwise upsert
        .whenMatchedDelete(condition="source._cdc_operation = 'd'")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll(condition="source._cdc_operation != 'd'")
        .execute()
    )

# ─── Stream CDC Events and Apply to Target ───
cdc_stream = (
    spark.readStream
    .format("delta")
    .load("/mnt/lakehouse/bronze/cdc_events")
    # This query maintains the customers table only; run a separate query per target table
    .filter(col("source_table") == "customers")
)

# Write with MERGE
merge_query = (
    cdc_stream
    .writeStream
    .foreachBatch(apply_cdc_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "/mnt/checkpoints/cdc_merge")
    .trigger(processingTime="60 seconds")
    .start()
)

merge_query.awaitTermination()

Example 3: Event Sourcing Implementation with PySpark

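from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Imported after the wildcard imports so these names cannot be shadowed by them
import uuid
from datetime import datetime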

spark = SparkSession.builder \
    .appName("Event-Sourcing-CDC") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

# ─── Event Store (Append-Only) ───
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("aggregate_id", StringType(), False),
    StructField("aggregate_type", StringType(), False),
    StructField("event_data", MapType(StringType(), StringType())),
    StructField("metadata", MapType(StringType(), StringType())),
    StructField("event_version", IntegerType()),
    StructField("created_at", TimestampType()),
])

# Create the event store at the path that events are written to and read from
spark.sql("""
    CREATE TABLE IF NOT EXISTS event_store (
        event_id STRING NOT NULL,
        event_type STRING NOT NULL,
        aggregate_id STRING NOT NULL,
        aggregate_type STRING NOT NULL,
        event_data MAP<STRING, STRING>,
        metadata MAP<STRING, STRING>,
        event_version INT,
        created_at TIMESTAMP
    )
    USING DELTA
    PARTITIONED BY (aggregate_type)
    LOCATION '/mnt/lakehouse/events'
    TBLPROPERTIES (
        'delta.appendOnly' = 'true',
        'delta.logRetentionDuration' = 'interval 365 days'
    )
""")

# ─── Emit Events ───
def emit_event(aggregate_id, aggregate_type, event_type, event_data, metadata=None):
    """Append one event to the event store."""
    # Pass event_schema explicitly: schema inference cannot type an empty
    # metadata dict and does not keep the declared column order.
    event = spark.createDataFrame(
        [(
            str(uuid.uuid4()),
            event_type,
            aggregate_id,
            aggregate_type,
            event_data,
            metadata or {},
            1,  # event_version is fixed at 1 in this example
            datetime.now(),
        )],
        schema=event_schema,
    )

    event.write.format("delta").mode("append").save("/mnt/lakehouse/events")

# Example: Customer lifecycle events
emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerCreated",
    event_data={"name": "Alice Johnson", "email": "alice@email.com", "city": "NYC"},
    metadata={"source": "registration_api", "correlation_id": "req-123"}
)

emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerAddressChanged",
    event_data={"old_city": "NYC", "new_city": "LA", "change_reason": "relocation"},
    metadata={"source": "profile_api", "correlation_id": "req-456"}
)

emit_event(
    aggregate_id="C101",
    aggregate_type="customer",
    event_type="CustomerEmailUpdated",
    event_data={"old_email": "alice@email.com", "new_email": "alice.new@email.com"},
    metadata={"source": "profile_api", "correlation_id": "req-789"}
)

# ─── Build Materialized View (Projection) ───
def build_customer_view(events_df, aggregate_id=None):
    """Rebuild customer state from events (event replay)."""
    filtered = events_df if aggregate_id is None else \
        events_df.filter(col("aggregate_id") == aggregate_id)

    def latest(value, event_types):
        """Newest non-null `value` among the given event types.

        groupBy does not preserve row order, so orderBy(...) followed by last(...)
        is not deterministic. Taking the max of a (version, time, value) struct
        selects the newest value explicitly.
        """
        applies = col("event_type").isin(*event_types) & value.isNotNull()
        newest = max(when(applies, struct(
            col("event_version"), col("created_at"), value.alias("value")
        )))
        return newest.getField("value")

    data = col("event_data")
    # max/min below are the Spark aggregates from the wildcard import
    return filtered.groupBy("aggregate_id").agg(
        latest(data["name"], ["CustomerCreated"]).alias("name"),
        # Update events carry new_email/new_city; the creation event carries email/city
        latest(coalesce(data["new_email"], data["email"]),
               ["CustomerCreated", "CustomerEmailUpdated"]).alias("email"),
        latest(coalesce(data["new_city"], data["city"]),
               ["CustomerCreated", "CustomerAddressChanged"]).alias("city"),
        count("*").alias("total_events"),
        max("created_at").alias("last_updated"),
        min("created_at").alias("created_at"),
    )

# Build view from all events
events_df = spark.read.format("delta").load("/mnt/lakehouse/events")
customer_view = build_customer_view(events_df)
customer_view.show()

# Build view for specific customer (point-in-time query)
def build_state_at_time(events_df, aggregate_id, as_of_timestamp):
    """Build state as of a specific timestamp by replaying only earlier events."""
    events_until = events_df.filter(
        col("created_at") <= lit(as_of_timestamp).cast("timestamp")
    )
    return build_customer_view(events_until, aggregate_id)

# Query state at specific time
state_at_may = build_state_at_time(
    events_df, "C101", 
    "2026-05-15"
)
state_at_may.show()

Performance Metrics

| Metric | Batch ETL (Polling) | CDC (Debezium) | Improvement |
|---|---|---|---|
| Latency (Source → Target) | 4-24 hours | 5-30 seconds | 99.9% reduction |
| Source DB Load | 15-30% CPU (full scans) | 1-3% CPU (log read) | 90% reduction |
| Data Transferred | 100% of table per run | Only changes (0.1-5%) | 95-99% reduction |
| Storage (Delta Lake) | Full copies per snapshot | Event log only | 80% reduction |
| Query Freshness | Stale by hours | Near real-time | Sub-minute |
| Delete Detection | Impossible (without flags) | Automatic | New capability |
| Ordering Guarantees | None (race conditions) | Transaction-ordered | Strict ordering |
| Exactly-Once | Difficult (dedup needed) | Checkpoint-based | Guaranteed |
| Operational Complexity | Low | Medium-High | Trade-off |
| Cost (Compute) | $0.50/hour (batch) | $0.15/hour (streaming) | 70% reduction |

Best Practices

  1. Use Debezium for database CDC: Debezium provides mature, widely used CDC connectors for MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB. It handles log reading, snapshotting, and event serialization.

  2. Enable Kafka topic compaction for latest-state queries: For use cases that need the current state without replaying all events, configure Kafka topic compaction (cleanup.policy=compact) to retain only the latest value per key.

  3. Implement watermark-based deduplication: Combine PySpark's withWatermark with dropDuplicates on the event ID and the event-time column, so that redelivered events are dropped while the deduplication state stays bounded by the watermark delay.

  4. Separate bronze and silver CDC processing: Write raw CDC events to Bronze first, then process and apply to Silver. This provides an audit trail and enables reprocessing if logic changes.

  5. Use Delta Lake MERGE for idempotent applies: A MERGE keyed on the primary key is idempotent, so re-running it on the same batch leaves the table unchanged. MERGE fails when several source rows match one target row, so collapse each batch to the latest event per key first. This is critical for exactly-once semantics in distributed systems.

  6. Monitor Kafka consumer lag: Set up alerts when consumer lag exceeds thresholds. High lag indicates the CDC processor cannot keep up with the source change rate.

  7. Handle schema evolution in CDC events: Use Delta Lake's schema evolution (the mergeSchema write option, or schema auto-merge for MERGE) and Debezium's schema history tracking to handle source schema changes without breaking the CDC pipeline.

  8. Implement dead-letter queues: Route events that fail deserialization or validation to a dead-letter queue for manual inspection rather than blocking the entire pipeline.

  9. Snapshot before CDC starts: When setting up CDC for the first time, take a consistent snapshot of the source database and replay it through the CDC pipeline to establish a baseline.

  10. Tune micro-batch frequency: Balance latency vs. throughput by adjusting the trigger interval. For most use cases, 30-60 seconds provides a good balance. For sub-second latency, continuous processing mode is an option, but it is experimental and does not support foreachBatch, so the MERGE-based apply shown here cannot use it.
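As an illustration of the dead-letter practice, here is a plain-Python sketch of the routing logic. The validation rule and record layout are hypothetical; in a streaming job the same split would typically be two filters on the parsed DataFrame, with the failures written to their own table or topic:

```python
import json

REQUIRED_PAYLOAD_FIELDS = ("op", "source")   # hypothetical validation rule


def route(raw_messages):
    """Split raw messages into (valid_events, dead_letters) without raising."""
    valid, dead_letters = [], []
    for raw in raw_messages:
        try:
            payload = json.loads(raw)["payload"]
            missing = [f for f in REQUIRED_PAYLOAD_FIELDS if f not in payload]
            if missing:
                raise ValueError(f"missing fields: {missing}")
            valid.append(payload)
        except (KeyError, TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError subclass, so bad JSON lands here too.
            # Keep the original message and the reason so it can be inspected later.
            dead_letters.append({"raw": raw, "error": f"{type(exc).__name__}: {exc}"})
    return valid, dead_letters


messages = [
    '{"payload": {"op": "c", "source": {"table": "customers"}}}',
    '{"payload": {"op": "u"}}',   # fails validation
    'not json at all',            # fails deserialization
]
valid, dead = route(messages)
```

One bad record ends up in `dead` with its error message while the valid record continues through the pipeline.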

See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)
