Apache Kafka Exactly-Once Semantics: Transactions, Idempotency, and Consistency

Architecture Diagram: Exactly-Once Semantics Components

╔════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                            ║
║                              EXACTLY-ONCE SEMANTICS ARCHITECTURE & COMPONENTS                              ║
║                                                                                                            ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │ PRODUCER LAYER                                                                                       │  ║
║  │                                                                                                      │  ║
║  │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  │  ║
║  │  │ Transaction    │  │ Idempotent     │  │ Sequence       │  │ Producer       │  │ Epoch          │  │  ║
║  │  │ Manager        │  │ Manager        │  │ Numbering      │  │ ID (PID)       │  │ Manager        │  │  ║
║  │  │                │  │                │  │                │  │                │  │                │  │  ║
║  │  │ begin/commit/  │  │ retries reuse  │  │ incremented    │  │ assigned via   │  │ bumped on init │  │  ║
║  │  │ abort txns     │  │ the same seq   │  │ per partition  │  │ InitProducerId │  │ fences zombies │  │  ║
║  │  └────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘  │  ║
║  │                                                                                                      │  ║
║  └──────────────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                    │                                                                                       ║
║                    │  InitProducerId, AddPartitionsToTxn, AddOffsetsToTxn, EndTxn                          ║
║                    ▼                                                                                       ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │ TRANSACTION COORDINATOR                                                                              │  ║
║  │                                                                                                      │  ║
║  │  ┌────────────────────┐  ┌────────────────┐  ┌──────────────────┐  ┌────────────────────────┐        │  ║
║  │  │ __transaction_state│  │ Transaction    │  │ Timeout Handler  │  │ State Machine          │        │  ║
║  │  │ topic (txn log)    │  │ Metadata Cache │  │                  │  │                        │        │  ║
║  │  │                    │  │                │  │ transaction.     │  │ Empty                  │        │  ║
║  │  │ States:            │  │ PID: 1234      │  │ timeout.ms       │  │   │                    │        │  ║
║  │  │ Empty, Ongoing,    │  │ Epoch: 1       │  │ (default 60 s)   │  │   ▼                    │        │  ║
║  │  │ PrepareCommit,     │  │ Partitions:    │  │                  │  │ Ongoing                │        │  ║
║  │  │ PrepareAbort,      │  │ [p0, p1, p2]   │  │ on expiry: bump  │  │   │                    │        │  ║
║  │  │ CompleteCommit,    │  │                │  │ epoch, abort txn │  │   ▼                    │        │  ║
║  │  │ CompleteAbort      │  │                │  │                  │  │ PrepareCommit/Abort    │        │  ║
║  │  │                    │  │                │  │                  │  │   │                    │        │  ║
║  │  │                    │  │                │  │                  │  │   ▼                    │        │  ║
║  │  │                    │  │                │  │                  │  │ CompleteCommit/Abort   │        │  ║
║  │  └────────────────────┘  └────────────────┘  └──────────────────┘  └────────────────────────┘        │  ║
║  │                                                                                                      │  ║
║  └──────────────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                    │                                                                                       ║
║                    │  WriteTxnMarkers: COMMIT / ABORT control records                                      ║
║                    ▼                                                                                       ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │ BROKER LAYER (partition leaders; the producer sends records to them directly)                        │  ║
║  │                                                                                                      │  ║
║  │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐                      │  ║
║  │  │ Broker 0       │  │ Broker 1       │  │ Broker 2       │  │ Broker 3       │                      │  ║
║  │  │ Part: 0, 4     │  │ Part: 1, 5     │  │ Part: 2, 6     │  │ Part: 3, 7     │                      │  ║
║  │  │                │  │                │  │                │  │                │                      │  ║
║  │  │ PID: 1234      │  │ PID: 1234      │  │ PID: 1234      │  │ PID: 1234      │                      │  ║
║  │  │ Epoch: 1       │  │ Epoch: 1       │  │ Epoch: 1       │  │ Epoch: 1       │                      │  ║
║  │  │ Seq: 0         │  │ Seq: 0         │  │ Seq: 0         │  │ Seq: 0         │                      │  ║
║  │  └────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘                      │  ║
║  │                                                                                                      │  ║
║  └──────────────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                            ║
╚════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

Architecture Diagram: Transaction Lifecycle State Machine

╔════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                            ║
║                                    TRANSACTION LIFECYCLE STATE MACHINE                                     ║
║                                                                                                            ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                                      │  ║
║  │          ┌───────────────────────────────────────────┐                                               │  ║
║  │          │                                           │                                               │  ║
║  │          │  EMPTY                                    │                                               │  ║
║  │          │  (initTransactions() done; no open txn)   │                                               │  ║
║  │          │                                           │                                               │  ║
║  │          └─────────────────────┬─────────────────────┘                                               │  ║
║  │                                │                                                                     │  ║
║  │                                │  beginTransaction(), then the first send()                          │  ║
║  │                                │  (producer sends AddPartitionsToTxn)                                │  ║
║  │                                ▼                                                                     │  ║
║  │          ┌───────────────────────────────────────────┐                                               │  ║
║  │          │                                           │                                               │  ║
║  │          │  ONGOING                                  │                                               │  ║
║  │          │  (transaction in progress)                │                                               │  ║
║  │          │  - Sending records to partition leaders   │                                               │  ║
║  │          │  - Adding partitions (AddPartitionsToTxn) │                                               │  ║
║  │          │  - Consumer offsets added via             │                                               │  ║
║  │          │    sendOffsetsToTransaction()             │                                               │  ║
║  │          │  - Streams: state-store changelog writes  │                                               │  ║
║  │          │                                           │                                               │  ║
║  │          └─────────────────────┬─────────────────────┘                                               │  ║
║  │                                │                                                                     │  ║
║  │                                │  commitTransaction() (EndTxn with COMMIT)                           │  ║
║  │                                ▼                                                                     │  ║
║  │          ┌───────────────────────────────────────────┐                                               │  ║
║  │          │                                           │                                               │  ║
║  │          │  PREPARE COMMIT                           │                                               │  ║
║  │          │  (decision durably logged by coordinator) │                                               │  ║
║  │          │  - PrepareCommit in __transaction_state   │                                               │  ║
║  │          │  - COMMIT markers sent to every partition │                                               │  ║
║  │          │    in the txn, incl. __consumer_offsets   │                                               │  ║
║  │          │  - Wait for all ISR acknowledgments       │                                               │  ║
║  │          │                                           │                                               │  ║
║  │          └─────────────────────┬─────────────────────┘                                               │  ║
║  │                                │                                                                     │  ║
║  │                                │  All markers written                                                │  ║
║  │                                ▼                                                                     │  ║
║  │          ┌───────────────────────────────────────────┐                                               │  ║
║  │          │                                           │                                               │  ║
║  │          │  COMPLETE COMMIT                          │                                               │  ║
║  │          │  (transaction committed)                  │                                               │  ║
║  │          │  - CompleteCommit in __transaction_state  │                                               │  ║
║  │          │  - Records and offsets now visible to     │                                               │  ║
║  │          │    read_committed consumers               │                                               │  ║
║  │          │                                           │                                               │  ║
║  │          └─────────────────────┬─────────────────────┘                                               │  ║
║  │                                │                                                                     │  ║
║  │                                │  Next transaction (first AddPartitionsToTxn)                        │  ║
║  │                                ▼                                                                     │  ║
║  │          ┌───────────────────────────────────────────┐                                               │  ║
║  │          │                                           │                                               │  ║
║  │          │  ONGOING (next transaction)               │                                               │  ║
║  │          │  (EMPTY instead if initTransactions() is  │                                               │  ║
║  │          │   called again, e.g. after a restart)     │                                               │  ║
║  │          │                                           │                                               │  ║
║  │          └───────────────────────────────────────────┘                                               │  ║
║  │                                                                                                      │  ║
║  │   ABORT PATH:                                                                                        │  ║
║  │   ONGOING ──► abortTransaction() ──► PREPARE ABORT ──► COMPLETE ABORT                                │  ║
║  │   (ABORT markers are written to each partition before COMPLETE ABORT)                                │  ║
║  │                                                                                                      │  ║
║  │   ┌──────────────────────────────────────────────────────────────────────────────────────────────┐   │  ║
║  │   │  TIMEOUT PATH:                                                                               │   │  ║
║  │   │  ONGOING ──► transaction.timeout.ms exceeded (producer default: 60 s)                        │   │  ║
║  │   │      ──► coordinator bumps epoch (fences the producer) ──► PREPARE ABORT ──► COMPLETE ABORT  │   │  ║
║  │   └──────────────────────────────────────────────────────────────────────────────────────────────┘   │  ║
║  │                                                                                                      │  ║
║  └──────────────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                            ║
╚════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

Architecture Diagram: Idempotent Producer Deduplication

╔════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                            ║
║                                IDEMPOTENT PRODUCER DEDUPLICATION MECHANISM                                 ║
║                                                                                                            ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                                      │  ║
║  │ PRODUCER SIDE                                                                                        │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                                                                                                │  │  ║
║  │  │  ┌──────────────────────────────────────────────────────────────────────────────────────────┐  │  │  ║
║  │  │  │ IN-FLIGHT REQUESTS TRACKER                                                               │  │  │  ║
║  │  │  │                                                                                          │  │  │  ║
║  │  │  │  ┌────────────────────────────────────────────────────────────────────────────────────┐  │  │  │  ║
║  │  │  │  │  Request 1: PID=1234, Epoch=1, Seq=0   [ACKED]   ──► Remove from tracker           │  │  │  │  ║
║  │  │  │  │  Request 2: PID=1234, Epoch=1, Seq=1   [PENDING] ──► In flight                     │  │  │  │  ║
║  │  │  │  │  Request 3: PID=1234, Epoch=1, Seq=2   [PENDING] ──► In flight                     │  │  │  │  ║
║  │  │  │  │  Request 4: PID=1234, Epoch=1, Seq=3   [QUEUED]  ──► Waiting for slot              │  │  │  │  ║
║  │  │  │  └────────────────────────────────────────────────────────────────────────────────────┘  │  │  │  ║
║  │  │  │                                                                                          │  │  │  ║
║  │  │  │  max.in.flight.requests.per.connection = 5   (must be ≤ 5 when idempotence is enabled)   │  │  │  ║
║  │  │  │  (With idempotence enabled, ordering is preserved per partition even with retries)       │  │  │  ║
║  │  │  │                                                                                          │  │  │  ║
║  │  │  └──────────────────────────────────────────────────────────────────────────────────────────┘  │  │  ║
║  │  │                                                                                                │  │  ║
║  │  └────────────────────────────────────────────────────────────────────────────────────────────────┘  │  ║
║  │                    │                                                                                 │  ║
║  │                    ▼                                                                                 │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │ BROKER SIDE DEDUPLICATION                                                                      │  │  ║
║  │  │                                                                                                │  │  ║
║  │  │  ┌──────────────────────────────────────────────────────────────────────────────────────────┐  │  │  ║
║  │  │  │                                                                                          │  │  │  ║
║  │  │  │ PER-PARTITION STATE:                                                                     │  │  │  ║
║  │  │  │  ┌────────────────────────────────────────────────────────────────────────────────────┐  │  │  │  ║
║  │  │  │  │  Partition 0:                                                                      │  │  │  │  ║
║  │  │  │  │    PID: 1234, Epoch: 1, Last Seq: 5                                                │  │  │  │  ║
β•‘  β”‚   β”‚   β”‚   β”‚    Deduplication Window: [Seq 6 - Seq 5 + max.in.flight]                                   β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚                                                                                             β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚  Incoming: PID=1234, Epoch=1, Seq=6  ──► ACCEPT (new)                                       β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚  Incoming: PID=1234, Epoch=1, Seq=6  ──► REJECT (duplicate)                                 β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚  Incoming: PID=1234, Epoch=1, Seq=4  ──► REJECT (too old, outside window)                   β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚  Incoming: PID=1234, Epoch=0, Seq=6  ──► REJECT (stale epoch)                               β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚                                                                                                     β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   DEDUPLICATION TABLE (per partition):                                                              β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚  PID   β”‚  Epoch  β”‚  Last Seq  β”‚  Buffer                                                      β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   │────────│─────────│────────────│──────────────────────────────────────────────────────────────│  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚  1234  β”‚    1    β”‚     5      β”‚  [6, 7, 8] (in-flight)                                      β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β”‚  5678  β”‚    2    β”‚    10      β”‚  [] (none)                                                   β”‚  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β”‚                                                                                                     β”‚   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚   β•‘
β•‘  β”‚   β”‚                                                                                                             β”‚   β•‘
β•‘  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β•‘
β•‘  β”‚                                                                                                                   β”‚   β•‘
β•‘  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β•‘
β•‘                                                                                                                            β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•

Formal Definitions

Definition: Exactly-Once Semantics (EOS)

Exactly-once semantics guarantees that each message is delivered to the consumer exactly once: no duplicates and no data loss. In Kafka, EOS is achieved through three coordinated mechanisms: (1) idempotent producers that prevent duplicate writes per partition, (2) transactional producers that enable atomic multi-partition writes, and (3) consumer isolation levels that filter out records from open and aborted transactions. End-to-end EOS requires all three components working together, plus idempotent writes at any sink outside Kafka.

Definition: Idempotent Producer

An idempotent producer ensures that a message is written to a partition exactly once, even if the producer retries. Each producer instance is assigned a Producer ID (PID) and an epoch, and each partition tracks the last sequence number it accepted from each PID. A retry that carries an already-accepted sequence number is recognized as a duplicate: it is acknowledged but not appended again. Formally: for producer P with PID=p, the broker appends message (p, seq) only if seq = lastAcceptedSeq(p) + 1.

Definition: Transactional Producer

A transactional producer enables atomic writes across multiple partitions and topics. A transaction groups multiple produce requests into a single atomic unit: either all messages in the transaction are committed (visible to read_committed consumers) or none are (aborted and invisible). The Transaction Coordinator manages the two-phase commit protocol using the __transaction_state topic.

Key Formulas

P(\text{deliver}(m) = 1) = \begin{cases} 1 & \text{if } m \text{ is produced and committed} \\ 0 & \text{otherwise} \end{cases}

Here, deliver(m) is the number of times a read_committed consumer observes message m.

Idempotent Deduplication Condition

\text{Accept}(\text{PID}, \text{seq}) = \begin{cases} \text{ACCEPT} & \text{if } \text{seq} = \text{lastSeq}(\text{PID}) + 1 \\ \text{REJECT} & \text{otherwise} \end{cases}

Here,

  • PID = Producer ID
  • seq = sequence number of the incoming message
  • lastSeq(PID) = last accepted sequence number for this PID on this partition
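The acceptance rule can be sketched as a small state machine. This is a toy model, not broker code: it assumes one sequence number per request and, as a stand-in for the broker's per-producer batch metadata, remembers the last five accepted sequence numbers so it can tell a retried duplicate (acknowledged, not appended again) from a gap. The class `DedupCheck` and its `Result` values are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of per-partition, per-PID sequence checking (not actual broker code).
// Assumption: one sequence number per request; the last RETAINED accepted
// sequence numbers are remembered for duplicate detection.
class DedupCheck {
    enum Result { ACCEPT, DUPLICATE, OUT_OF_ORDER }

    static final int RETAINED = 5;                  // mirrors the "last 5 batches" limit
    private int lastSeq = -1;                       // -1 means nothing accepted yet
    private final Deque<Integer> retained = new ArrayDeque<>();

    Result offer(int seq) {
        if (seq == lastSeq + 1) {                   // Accept(PID, seq) iff seq = lastSeq(PID) + 1
            lastSeq = seq;
            retained.addLast(seq);
            if (retained.size() > RETAINED) {
                retained.removeFirst();             // forget the oldest sequence number
            }
            return Result.ACCEPT;
        }
        if (retained.contains(seq)) {
            return Result.DUPLICATE;                // retry of a write that already landed
        }
        return Result.OUT_OF_ORDER;                 // gap, or too old to verify
    }

    int lastSeq() {
        return lastSeq;
    }

    public static void main(String[] args) {
        DedupCheck partition = new DedupCheck();
        for (int seq = 0; seq <= 5; seq++) {
            partition.offer(seq);                   // sequences 0..5 arrive in order
        }
        System.out.println(partition.offer(6));     // ACCEPT
        System.out.println(partition.offer(6));     // DUPLICATE
        System.out.println(partition.offer(9));     // OUT_OF_ORDER
    }
}
```

Only ACCEPT appends to the log. DUPLICATE falls under the REJECT branch of the formula from the log's point of view, even though the producer sees the retried request succeed.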

Deduplication Window

W_{\text{dedup}} = [\text{lastSeq} - N_{\text{in\_flight}} + 1,\; \text{lastSeq} + N_{\text{in\_flight}}]

Here,

  • N_in_flight = max.in.flight.requests.per.connection (default 5; must not exceed 5 when idempotence is enabled)
  • lastSeq = last accepted sequence number for this PID on this partition
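As a worked example, take the default N_in_flight = 5 and the partition state from the diagram (lastSeq = 5). Reading the lower half as the sequences the broker can still recognize as retried duplicates and the upper half as the sequences the producer may have in flight is an interpretation, not something the formula itself states:

```latex
W_{\text{dedup}} = [5 - 5 + 1,\; 5 + 5] = [1,\; 10]
  = \underbrace{[1,\; 5]}_{\text{recognizable retries}} \cup \underbrace{[6,\; 10]}_{\text{possibly in flight}}
```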

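For a transactional producer, the Producer ID (PID) is assigned by the Transaction Coordinator and persisted in __transaction_state under the producer's transactional.id. When an instance with the same transactional.id restarts, it receives the same PID with an incremented epoch, which fences (invalidates) the old producer instance. This prevents split-brain scenarios where two producer instances write to the same partition. A producer that is idempotent but has no transactional.id simply receives a new PID on each restart, so it is neither fenced nor protected against duplicates across restarts.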
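The fencing rule can be modelled with two maps, one on the coordinator side and one on the partition side. This is a simplified sketch (Java 16+ for the record type): `Coordinator.initTransactions` only mirrors the effect of the real `KafkaProducer.initTransactions()` call on (PID, epoch), and the toy partition learns about a new epoch only when a write carrying it arrives, whereas real Kafka also propagates it through transaction markers. All class names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based fencing (not real coordinator or broker code).
class FencingDemo {
    record ProducerIdAndEpoch(long pid, short epoch) {}

    // Coordinator side: transactional.id -> current (PID, epoch).
    static final class Coordinator {
        private final Map<String, ProducerIdAndEpoch> state = new HashMap<>();
        private long nextPid = 1000;

        // Mirrors the effect of initTransactions(): same PID, epoch + 1.
        ProducerIdAndEpoch initTransactions(String transactionalId) {
            ProducerIdAndEpoch current = state.get(transactionalId);
            ProducerIdAndEpoch next = (current == null)
                ? new ProducerIdAndEpoch(nextPid++, (short) 0)
                : new ProducerIdAndEpoch(current.pid(), (short) (current.epoch() + 1));
            state.put(transactionalId, next);
            return next;
        }
    }

    // Partition side: remembers the highest epoch seen per PID.
    static final class Partition {
        private final Map<Long, Short> latestEpoch = new HashMap<>();

        boolean append(ProducerIdAndEpoch producer) {
            short known = latestEpoch.getOrDefault(producer.pid(), (short) -1);
            if (producer.epoch() < known) {
                return false;                       // stale epoch: this writer is fenced
            }
            latestEpoch.put(producer.pid(), producer.epoch());
            return true;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        Partition partition = new Partition();

        ProducerIdAndEpoch zombie = coordinator.initTransactions("order-producer-1");
        ProducerIdAndEpoch restarted = coordinator.initTransactions("order-producer-1");

        System.out.println(partition.append(restarted)); // true
        System.out.println(partition.append(zombie));    // false: fenced
    }
}
```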

Theorem: Ordering Guarantee with Idempotence

With idempotence enabled, Kafka guarantees that each acknowledged record is appended exactly once and in order within a partition, even with retries and max.in.flight.requests.per.connection > 1 (up to 5). The broker does not buffer out-of-order batches: it appends a batch only if its sequence number is the next expected one and rejects a batch that would leave a gap, after which the producer re-sends its in-flight batches in sequence order. Formally: for a producer with PID=p sending messages s_1, s_2, ..., s_k to partition P, a consumer on P will observe them in order s_1, s_2, ..., s_k, each exactly once.
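The retry path can be simulated in a few lines. The sketch assumes one sequence number per request and a broker that appends only seq = lastSeq + 1: request 1 is lost once, requests 2 to 4 are rejected because they would leave a gap, and the producer re-sends everything from 1 in sequence order. `OrderedRetryDemo` is a hypothetical name, and none of this is client or broker code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (not real client or broker code). Assumptions: one sequence
// number per request, and a broker that appends only seq == lastSeq + 1.
class OrderedRetryDemo {
    private final List<Integer> log = new ArrayList<>(); // partition log of sequence numbers
    private int lastSeq = -1;

    // Broker side: append the next expected sequence, reject anything that leaves a gap.
    private boolean receive(int seq) {
        if (seq == lastSeq + 1) {
            log.add(seq);
            lastSeq = seq;
            return true;
        }
        return false;                                    // out of order: not appended
    }

    // Producer side: pipeline `inFlight` requests, lose `lostSeq` once, then retry in order.
    static List<Integer> run(int inFlight, int lostSeq) {
        OrderedRetryDemo broker = new OrderedRetryDemo();
        List<Integer> needsRetry = new ArrayList<>();
        for (int seq = 0; seq < inFlight; seq++) {
            boolean delivered = seq != lostSeq && broker.receive(seq);
            if (!delivered) {
                needsRetry.add(seq);                     // lost in transit or rejected
            }
        }
        for (int seq : needsRetry) {                     // retries go out in sequence order
            broker.receive(seq);
        }
        return broker.log;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 1));                   // [0, 1, 2, 3, 4]
    }
}
```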

Theorem: Exactly-Once Semantics (Proof Sketch)

Claim: Kafka achieves end-to-end exactly-once semantics when: (1) producer uses idempotence + transactions, (2) consumer uses isolation.level=read_committed, (3) sink operations are idempotent.

Proof sketch:

  1. Source deduplication: Idempotent producer ensures each message is written exactly once per partition (by sequence number tracking).
  2. Atomic multi-partition write: Transaction coordinator ensures all messages in a transaction are either all committed or all aborted (two-phase commit via __transaction_state).
  3. Consumer filtering: read_committed consumers only see messages from committed transactions (they never read past the LSO, the Last Stable Offset, and they skip records from aborted transactions).
  4. Sink idempotency: If the sink operation (database write, HTTP call) is idempotent, reprocessing the same committed message produces the same result.
  5. Combined: The composition of these four properties yields exactly-once end-to-end delivery. □
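Step 4 is the one Kafka cannot enforce for an external system. One common approach, sketched here with an in-memory map standing in for a database table (Java 16+; `IdempotentSink` is hypothetical), is to key each write by the record's (topic, partition, offset), so replaying a committed record overwrites its row instead of adding a second one:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy idempotent sink: an in-memory map stands in for a database table whose
// primary key is the record coordinate (topic, partition, offset).
class IdempotentSink {
    record Coordinate(String topic, int partition, long offset) {}

    private final Map<Coordinate, String> table = new LinkedHashMap<>();

    // Upsert: writing the same record twice leaves the table unchanged.
    void write(String topic, int partition, long offset, String value) {
        table.put(new Coordinate(topic, partition, offset), value);
    }

    int rowCount() {
        return table.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write("order-events", 0, 41L, "{\"orderId\":\"41\"}");
        sink.write("order-events", 0, 42L, "{\"orderId\":\"42\"}");
        // Replay after a crash: offset 42 is processed a second time.
        sink.write("order-events", 0, 42L, "{\"orderId\":\"42\"}");
        System.out.println(sink.rowCount());             // 2
    }
}
```

With a real database the same idea is an upsert (or an insert that ignores primary-key conflicts). Keying by a business identifier such as orderId works too, as long as it is unique per message.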

Transaction timeout (transaction.timeout.ms, default 60 s) determines how long a transaction can remain open. If the producer crashes during a transaction, the coordinator aborts it after the timeout. Keep transactions short: an open transaction holds back the LSO, and with it every read_committed consumer of the affected partitions, and long transactions are more likely to time out.
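The timeout rule amounts to a periodic sweep over open transactions. In the sketch below, `TimeoutSweep` is a hypothetical stand-in for the coordinator, times are passed in explicitly as milliseconds, and a transaction is treated as expired once it has been open strictly longer than the timeout; Kafka's exact comparison and sweep interval are not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the coordinator's timeout sweep (not Kafka's implementation).
class TimeoutSweep {
    private final long timeoutMs;                                // stands in for transaction.timeout.ms
    private final Map<String, Long> openSince = new HashMap<>(); // transactional.id -> start time

    TimeoutSweep(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    void begin(String transactionalId, long nowMs) {
        openSince.put(transactionalId, nowMs);
    }

    void complete(String transactionalId) {                      // producer committed or aborted
        openSince.remove(transactionalId);
    }

    // Aborts every transaction open for longer than the timeout and returns their IDs.
    List<String> sweep(long nowMs) {
        List<String> aborted = new ArrayList<>();
        openSince.entrySet().removeIf(e -> {
            boolean expired = nowMs - e.getValue() > timeoutMs;
            if (expired) {
                aborted.add(e.getKey());
            }
            return expired;
        });
        return aborted;
    }

    public static void main(String[] args) {
        TimeoutSweep coordinator = new TimeoutSweep(60_000);     // 60 s default
        coordinator.begin("order-producer-1", 0);
        coordinator.begin("order-producer-2", 30_000);
        coordinator.complete("order-producer-2");                // finished in time
        System.out.println(coordinator.sweep(70_000));           // [order-producer-1]
    }
}
```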

Detailed Explanation

Exactly-once semantics (EOS) in Kafka ensures that each message is delivered exactly once to the consumer, with no duplicates and no data loss. This is achieved through three coordinated mechanisms: idempotent producers, transactional producers, and consumer isolation levels. The implementation requires careful coordination between producers, brokers, and consumers to guarantee end-to-end exactly-once delivery.

Idempotent Producers prevent duplicate messages from being written during retries. When enabled (enable.idempotence=true), each producer is assigned a Producer ID (PID) and an epoch number, and every batch it sends carries the PID, the epoch, and a per-partition sequence number. Each partition tracks the last sequence number it appended for each PID and retains metadata for that PID's last five batches. A batch that starts at the next expected sequence number is appended; a batch that matches one of the retained batches is a retry of a write that already succeeded and is acknowledged without being appended again; a batch that would leave a gap is rejected with an OUT_OF_ORDER_SEQUENCE_NUMBER error, and the producer re-sends in order. Because only five batches are retained, max.in.flight.requests.per.connection (default 5) must not exceed 5 when idempotence is enabled.

The Producer ID and Epoch mechanism handles producer restarts. When a transactional producer restarts with the same transactional.id and calls initTransactions(), the coordinator returns the same PID with an incremented epoch. Brokers then reject writes that carry the old epoch, and the coordinator aborts any transaction the previous instance left open, so only one instance per transactional.id can write at a time. This fencing mechanism is critical for preventing split-brain ("zombie") scenarios in which two producer instances write conflicting data. A producer that is idempotent but not transactional simply receives a new PID on restart, so its duplicate protection does not extend across restarts.

Transactional Producers enable atomic writes across multiple partitions. A transactional producer begins a transaction, sends messages to multiple partitions, and then commits or aborts the transaction atomically. The Transaction Coordinator (a broker) manages the transaction state, which is stored in a special internal topic (__transaction_state). Records are appended to each partition as they are sent; when the transaction ends, the coordinator writes transaction markers (control records) to every affected partition, so either all of the transaction's records become visible to read_committed consumers (commit) or none do (abort, in which case the records remain in the log but are skipped).
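A toy log makes the "written but skipped" behavior concrete. It assumes each DATA entry belongs to the open transaction of its producer ID and that the next COMMIT or ABORT marker for that PID decides its fate. The entry types are hypothetical simplifications of Kafka's record batches and control records, the record type needs Java 16+, and the model answers what a read_committed reader sees once every transaction has ended (the LSO, described next, governs what it may read earlier).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy partition log (not Kafka's record format). Each DATA entry belongs to the open
// transaction of its producer ID; the next COMMIT or ABORT marker for that PID decides it.
class ReadCommittedDemo {
    enum Kind { DATA, COMMIT, ABORT }

    record Entry(Kind kind, long pid, String value) {}

    // Values a read_committed reader sees once every transaction in the log has ended.
    static List<String> visibleAfterCompletion(List<Entry> log) {
        Kind[] outcome = new Kind[log.size()];                // resolved outcome per DATA entry
        Map<Long, List<Integer>> pending = new HashMap<>();   // PID -> indexes awaiting a marker
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind() == Kind.DATA) {
                pending.computeIfAbsent(e.pid(), k -> new ArrayList<>()).add(i);
            } else {
                for (int idx : pending.getOrDefault(e.pid(), List.of())) {
                    outcome[idx] = e.kind();                  // COMMIT or ABORT
                }
                pending.remove(e.pid());
            }
        }
        List<String> visible = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {                // emit in offset order
            if (outcome[i] == Kind.COMMIT) {
                visible.add(log.get(i).value());
            }
        }
        return visible;                                       // aborted and unmarked entries stay hidden
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry(Kind.DATA, 1, "a1"),
            new Entry(Kind.DATA, 2, "b1"),
            new Entry(Kind.DATA, 1, "a2"),
            new Entry(Kind.ABORT, 2, null),
            new Entry(Kind.COMMIT, 1, null),
            new Entry(Kind.DATA, 2, "b2"));                   // still open: no marker yet
        System.out.println(visibleAfterCompletion(log));      // [a1, a2]
    }
}
```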

Consumer Isolation Levels determine how consumers read transactional data. The read_committed isolation level ensures that consumers only read records from committed transactions, while read_uncommitted (the default) reads all records regardless of transaction status. The broker tracks the Last Stable Offset (LSO), which is the offset of the first record that belongs to a still-open transaction, or the high watermark if no transaction is open. A read_committed fetch returns records only below the LSO, together with the list of aborted transactions in that range, and the consumer client drops the aborted records, providing a consistent view of the data.
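Under this definition, the LSO is a minimum over the first offsets of open transactions, capped by the high watermark. The sketch assumes those first offsets are already known; the class and method names are hypothetical, and aborted records below the LSO still have to be filtered separately, as described above.

```java
import java.util.Collection;
import java.util.List;

// Toy LSO computation (not broker code).
class LastStableOffset {
    // LSO = first offset of the earliest still-open transaction, or the high watermark if none.
    static long lso(Collection<Long> openTxnFirstOffsets, long highWatermark) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    // A read_committed fetch returns only offsets strictly below the LSO.
    static boolean readableByReadCommitted(long offset, long lso) {
        return offset < lso;
    }

    public static void main(String[] args) {
        long lso = lso(List.of(120L, 105L), 150L);
        System.out.println(lso);                               // 105
        System.out.println(readableByReadCommitted(104, lso)); // true
        System.out.println(readableByReadCommitted(130, lso)); // false
    }
}
```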

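Transaction Coordinator Protocol: The coordinator uses a two-phase commit protocol. During the transaction, the producer registers each partition it writes to with the coordinator, and consumed offsets are sent with sendOffsetsToTransaction() so that they are written to __consumer_offsets as part of the same transaction. Phase 1 (Prepare): when the producer calls commitTransaction(), the coordinator writes PrepareCommit to __transaction_state; from this point the outcome is decided. Phase 2 (Complete): the coordinator writes COMMIT markers to every partition involved, including the __consumer_offsets partitions, and then writes CompleteCommit to __transaction_state. If the coordinator fails between phases, a new coordinator reads __transaction_state and finishes writing the markers, so transactions are never left in an indeterminate state.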
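The commit path reduces to a short state machine. The sketch below covers only the commit (not abort or fencing) and uses hypothetical names: a list stands in for __transaction_state, and `__consumer_offsets-7` is an illustrative partition. Its point is that once PrepareCommit is durable, a recovering coordinator only has to finish Phase 2.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy coordinator for the commit path only (not Kafka's implementation).
// stateLog stands in for __transaction_state; markers records marker writes.
class CoordinatorDemo {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

    final List<State> stateLog = new ArrayList<>();
    final Set<String> partitions = new LinkedHashSet<>();  // partitions in this transaction
    final List<String> markers = new ArrayList<>();

    State current() {
        return stateLog.isEmpty() ? State.EMPTY : stateLog.get(stateLog.size() - 1);
    }

    // The producer registers a partition before its first write to it.
    void addPartition(String topicPartition) {
        partitions.add(topicPartition);
        if (current() != State.ONGOING) {
            stateLog.add(State.ONGOING);
        }
    }

    // Phase 1: durably record the decision; from here on the commit must complete.
    void prepareCommit() {
        stateLog.add(State.PREPARE_COMMIT);
    }

    // Phase 2: write a COMMIT marker to every partition, then record completion.
    void completeCommit() {
        for (String tp : partitions) {
            markers.add("COMMIT@" + tp);
        }
        stateLog.add(State.COMPLETE_COMMIT);
    }

    // Recovery after a coordinator failover: finish a commit that was prepared.
    void recover() {
        if (current() == State.PREPARE_COMMIT) {
            completeCommit();
        }
    }

    public static void main(String[] args) {
        CoordinatorDemo coordinator = new CoordinatorDemo();
        coordinator.addPartition("order-events-0");
        coordinator.addPartition("__consumer_offsets-7");  // hypothetical offsets partition
        coordinator.prepareCommit();
        // Crash here; the same object stands in for a new coordinator that reloaded stateLog.
        coordinator.recover();
        System.out.println(coordinator.current());         // COMPLETE_COMMIT
        System.out.println(coordinator.markers);
    }
}
```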

End-to-End Exactly-Once: Achieving exactly-once semantics end-to-end requires: (1) Idempotent producer (prevents duplicates at source), (2) Transactional producer (ensures atomicity of multi-partition writes and of the consumed offsets), (3) Consumer with read_committed (ensures only committed records are processed), and (4) Idempotent sink operations (prevents duplicates at a destination outside Kafka). For pipelines that read from and write to Kafka, Kafka Streams provides this out of the box with processing.guarantee=exactly_once_v2, which configures the producers, consumers, and offset commits accordingly.
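Why atomically committing output and input offsets matters can be shown with a toy read-process-write loop. Each input record is handled in its own "transaction", and the first run crashes after producing the output for the second record but before committing its offset. In transactional mode the output and offset become visible together (in the real client: producing inside a transaction and calling `sendOffsetsToTransaction()` before `commitTransaction()`); in non-transactional mode the output is visible immediately. `ReadProcessWriteDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy read-process-write loop (not Kafka client code).
class ReadProcessWriteDemo {
    final List<String> visibleOutput = new ArrayList<>(); // what read_committed readers see
    long committedOffset = 0;                             // next input offset to process

    // transactional=true: output and offset commit atomically.
    // transactional=false: output is visible at once; the offset is committed afterwards.
    void run(List<String> input, boolean transactional, int crashAfterProducingIndex) {
        for (int i = (int) committedOffset; i < input.size(); i++) {
            String output = input.get(i).toUpperCase();   // the "process" step
            if (!transactional) {
                visibleOutput.add(output);                // written outside any transaction
            }
            if (i == crashAfterProducingIndex) {
                return;                                   // crash before the offset commit
            }
            if (transactional) {
                visibleOutput.add(output);                // visible only at commit...
            }
            committedOffset = i + 1;                      // ...together with the input offset
        }
    }

    static List<String> simulate(boolean transactional) {
        ReadProcessWriteDemo app = new ReadProcessWriteDemo();
        List<String> input = List.of("a", "b", "c");
        app.run(input, transactional, 1);                 // crash while handling "b"
        app.run(input, transactional, -1);                // restart from the committed offset
        return app.visibleOutput;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false));              // [A, B, B, C]
        System.out.println(simulate(true));               // [A, B, C]
    }
}
```

The crash causes a duplicate only when output and offset are committed separately.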

Key Concepts Table

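Component | Description | Key Configuration | Behavior Under Failure
Producer ID (PID) | Broker-assigned identifier for a producer | enable.idempotence, transactional.id | New PID on restart, unless transactional.id is set (same PID, higher epoch)
Epoch | Fencing counter for a PID | Incremented by initTransactions() | Writes with a stale epoch are rejected
Sequence Number | Per-partition, per-PID record counter | Assigned automatically by the producer | Duplicates detected, gaps rejected
Transaction Coordinator | Broker that owns a transactional.id's state | Leader of the __transaction_state partition for that ID | New coordinator reloads state and finishes prepared commits or aborts
Transaction Markers | Control records written to each partition in a transaction | Automatic | Mark the transaction's records committed or aborted
LSO (Last Stable Offset) | First offset of the earliest open transaction (high watermark if none) | Automatic | read_committed consumers read only below it
Deduplication Table | Per-partition PID/epoch/sequence state (last 5 batches per PID) | max.in.flight.requests.per.connection ≤ 5 | Duplicate detection
Fencing | Rejecting writes from stale producer instances | Automatic | Split-brain prevention
Idempotent Producer | Prevents duplicate writes from retries | enable.idempotence | Retry safety
Transactional Producer | Atomic multi-partition writes | transactional.id | Atomicity guarantee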

Code Examples

Idempotent Producer with Detailed Configuration

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
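import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;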

public class IdempotentProducerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Bootstrap servers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class.getName());
        
        // Enable idempotence (the default since Kafka 3.0; set explicitly for clarity)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // Settings idempotence depends on: acks=all, retries > 0, max in-flight <= 5
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        
        // Setting transactional.id makes the producer transactional (and implies idempotence)
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");
        
        // Performance tuning
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Initialize transactions (required if transactional.id is set)
        producer.initTransactions();
        
        try {
            // Begin transaction for atomic writes
            producer.beginTransaction();
            
            for (int i = 0; i < 1000; i++) {
                String key = "order-" + (i % 100);
                String value = "{\"orderId\": \"" + i + "\", \"amount\": " + 
                    (Math.random() * 1000) + "}";
                
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("order-events", key, value);
                
                // Add metadata headers
                record.headers().add("correlation-id", 
                    java.util.UUID.randomUUID().toString().getBytes());
                record.headers().add("idempotent", "true".getBytes());
                
                // Send with callback for monitoring
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed: " + exception.getMessage());
                    } else {
                        System.out.printf("Sent to partition %d, offset %d, " +
                            "timestamp %d%n",
                            metadata.partition(), metadata.offset(),
                            metadata.timestamp());
                    }
                });
            }
            
            // Commit transaction atomically
            producer.commitTransaction();
            System.out.println("Transaction committed successfully");
            
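        } catch (ProducerFencedException | OutOfOrderSequenceException
                | AuthorizationException e) {
            // Fatal errors: this instance cannot continue, so close it without aborting
            System.err.println("Fatal producer error: " + e.getMessage());
            throw e;
        } catch (KafkaException e) {
            // Other Kafka errors: abort so none of this transaction's records become visible
            producer.abortTransaction();
            System.err.println("Transaction aborted: " + e.getMessage());
            throw e;
        } finally {
            producer.close();
        }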
    }
}

Consumer with Transactional Read Isolation

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class TransactionalConsumerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
        
        // Return only records from committed transactions (open and aborted ones are skipped)
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        // Disable auto-commit for manual offset management
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Fetch configuration
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // Subscribe with rebalance listener
        consumer.subscribe(Arrays.asList("order-events"), 
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions revoked: " + partitions);
                    // Commit positions of records already processed before losing the partitions
                    consumer.commitSync(Duration.ofSeconds(30));
                }
                
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions assigned: " + partitions);
                }
            });
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                if (records.isEmpty()) continue;
                
                // Process records
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                
                for (ConsumerRecord<String, String> record : records) {
                    // Only committed records arrive here; committing offsets after processing
                    // is at-least-once, so processRecord must tolerate redelivery
                    processRecord(record);
                    
                    // Track offset for manual commit
                    offsetsToCommit.put(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "processed")
                    );
                }
                
                // Commit offsets after successful processing
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
                }
            }
        } finally {
            consumer.close();
        }
    }
    
    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("Processing committed record: topic=%s, partition=%d, " +
            "offset=%d, key=%s%n",
            record.topic(), record.partition(), record.offset(), record.key());
        
        // Business logic here. The record is from a committed transaction, but it
        // may be redelivered after a crash, so side effects should be idempotent
    }
}

Kafka Streams with Exactly-Once Semantics

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Properties;

public class ExactlyOnceStreamExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        
        // Exactly-once semantics v2 (requires brokers 2.5 or later)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
            StreamsConfig.EXACTLY_ONCE_V2);
        
        // State store configuration
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source stream
        KStream<String, String> sourceStream = builder.stream("input-events",
            Consumed.with(Serdes.String(), Serdes.String()));
        
        // Stateful transformation with exactly-once guarantee
        KTable<Windowed<String>, Long> windowedCounts = sourceStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "windowed-count-store")
                .withValueSerde(Serdes.Long()));
        
        // Split into branches (outputs are committed atomically with the rest of the task's output)
        sourceStream
            .filter((key, value) -> value != null)
            .split()
            .branch((key, value) -> value.contains("HIGH"),
                Branched.withConsumer(stream -> 
                    stream.to("high-priority-events", 
                        Produced.with(Serdes.String(), Serdes.String()))))
            .branch((key, value) -> value.contains("NORMAL"),
                Branched.withConsumer(stream -> 
                    stream.to("normal-priority-events", 
                        Produced.with(Serdes.String(), Serdes.String()))))
            .noDefaultBranch();
        
        // Join operation (atomic)
        KTable<String, String> referenceData = builder.table("reference-data",
            Consumed.with(Serdes.String(), Serdes.String()));
        
        KStream<String, String> enrichedStream = sourceStream
            .join(referenceData,
                (event, ref) -> event + ":" + ref,
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
        
        enrichedStream.to("enriched-events", 
            Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Graceful shutdown; keep local state so a restart can reuse it
        // (calling cleanUp() here would force a full restore from the changelogs)
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
            streams.close(Duration.ofSeconds(30))));
        
        streams.start();
    }
}

Transaction Coordinator Monitoring Script

#!/bin/bash
# Monitor Kafka transaction coordinators and transaction state

BOOTSTRAP_SERVER="kafka-broker-0:9092"

echo "=== KAFKA TRANSACTION MONITORING ==="
echo "Timestamp: $(date)"
echo ""

# Dump records from the transaction state log
# (the formatter class name differs between Kafka versions)
echo "--- Transaction State Log ---"
kafka-console-consumer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  --topic __transaction_state \
  --from-beginning \
  --max-messages 100 \
  --timeout-ms 10000 \
  --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

echo ""
echo "--- Transaction Coordinator Status ---"
# Describe a specific transactional ID (kafka-transactions.sh uses subcommands, Kafka 3.0+)
kafka-transactions.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  describe \
  --transactional-id order-producer-1

echo ""
echo "--- Transactions ---"
# List transactional IDs known to the cluster and their state
kafka-transactions.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  list

echo ""
echo "--- Producer State (PID, epoch, last sequence) ---"
# Active producers on one partition; a higher epoch for the same PID means
# an older instance has been fenced
kafka-transactions.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  describe-producers \
  --topic order-events \
  --partition 0

Performance Metrics

Configuration | Throughput (msg/s) | p99 Latency | Duplicate Rate | Memory Usage
No Idempotence | 500,000 | 5 ms | 0.1% | 32 MB
Idempotent (acks=all) | 450,000 | 8 ms | 0% | 64 MB
Transactional (single partition) | 400,000 | 12 ms | 0% | 128 MB
Transactional (multi-partition) | 300,000 | 20 ms | 0% | 128 MB
Kafka Streams (EOS v1) | 200,000 | 25 ms | 0% | 256 MB
Kafka Streams (EOS v2) | 250,000 | 18 ms | 0% | 256 MB
Consumer (read_committed) | 100,000 | 2 ms | 0% | 32 MB
Consumer (read_uncommitted) | 150,000 | 1 ms | 0% | 32 MB

Best Practices

  1. Idempotence First: Always enable enable.idempotence=true (the default since Kafka 3.0) before considering transactions. Idempotence provides duplicate prevention per partition with minimal overhead.

  2. Transactional Scope: Keep transaction scope as small as possible. Long-running transactions increase latency, memory usage, and the risk of timeout. Aim for transactions that complete within seconds.

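  3. Fencing Awareness: Give each logical producer its own transactional.id and keep that ID stable across restarts (for example, derived from the instance's ordinal or its input partitions). Never share a transactional ID between instances that run concurrently, and do not generate a random ID at startup, because a new ID cannot fence the zombie instance it replaces. Fencing prevents split-brain only if instance identity is managed this way.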

  4. Consumer Isolation: Always use isolation.level=read_committed for consumers reading transactional data. Use read_uncommitted only when you need to read uncommitted data for debugging.

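  5. Error Handling: Implement proper error handling for transaction failures. Call abortTransaction() on recoverable errors so that no partial results become visible, but on fatal errors such as ProducerFencedException, OutOfOrderSequenceException, or AuthorizationException, close the producer instead of aborting. Monitor transaction coordinator health and handle coordinator failover gracefully.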

  6. Performance Tuning: Enable compression (compression.type=lz4) to reduce network overhead. Tune batch.size and linger.ms to balance latency vs. throughput. Keep max.in.flight.requests.per.connection at 5, the maximum allowed with idempotence, for full pipelining.

  7. Monitoring: Track producer metrics such as record-error-rate and record-retry-rate, consumer metrics such as records-lag-max, and on the coordinator side watch transaction abort rates and long-running or hanging transactions (kafka-transactions.sh find-hanging can help locate the latter).

  8. Testing: Test exactly-once semantics by writing a consumer that deduplicates by message ID. Verify that reprocessing the same data produces identical results. Use chaos testing to simulate producer restarts and network partitions.

  9. Migration: Idempotence is configured per producer rather than per topic, so producers can be migrated incrementally. Roll it out to a subset of producers first, and monitor for fencing events when introducing transactional.id.

  10. Kafka Streams EOS: Use processing.guarantee=exactly_once_v2 for new applications. It requires brokers on version 2.5 or later and uses one transactional producer per stream thread instead of one per task (as in v1), which lowers overhead. Ensure state.dir is on fast storage (SSD) for optimal performance.
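For practice 8, a duplicate checker can be as simple as counting IDs. The sketch assumes every produced message carries a unique ID (for example the orderId field in the producer example) and that the test harness collects the IDs a read_committed consumer observed; `DuplicateChecker` is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy test helper: detect duplicates and losses by message ID.
class DuplicateChecker {
    // Returns message IDs observed more than once, with their counts.
    static Map<String, Integer> duplicates(List<String> observedIds) {
        Map<String, Integer> counts = new HashMap<>();
        for (String id : observedIds) {
            counts.merge(id, 1, Integer::sum);
        }
        Map<String, Integer> dupes = new TreeMap<>();     // sorted for stable output
        counts.forEach((id, n) -> {
            if (n > 1) {
                dupes.put(id, n);
            }
        });
        return dupes;
    }

    // Data loss check: every expected ID was observed at least once.
    static boolean nothingLost(List<String> expectedIds, List<String> observedIds) {
        return observedIds.containsAll(expectedIds);
    }

    public static void main(String[] args) {
        List<String> observed = List.of("m1", "m2", "m2", "m3");
        System.out.println(duplicates(observed));                              // {m2=2}
        System.out.println(nothingLost(List.of("m1", "m2", "m3"), observed)); // true
    }
}
```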

Key Takeaways:

  • EOS = idempotent producers (per-partition dedup) + transactional producers (atomic multi-partition writes) + read_committed consumers
  • Idempotent deduplication: the broker appends a message only if seq = lastSeq(PID) + 1 and recognizes retries from the last 5 retained batches, which is why max.in.flight.requests.per.connection must be ≤ 5
  • Transaction coordinator uses two-phase commit: record PrepareCommit in __transaction_state, then write commit markers to every partition involved (including __consumer_offsets) and record CompleteCommit
  • Producer fencing via epoch increment prevents split-brain: restarting with the same transactional.id keeps the PID but bumps the epoch, invalidating the old instance
  • End-to-end EOS requires idempotent sink operations at the destination
  • EOS v2 (processing.guarantee=exactly_once_v2) uses one producer per stream thread instead of one per task, reducing overhead compared with v1

See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming)
