Apache Kafka Producer & Consumer: Advanced Configurations and Patterns

Architecture Diagram: Producer Message Flow

╔════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                                ║
║                              KAFKA PRODUCER MESSAGE FLOW & INTERNAL ARCHITECTURE                               ║
║                                                                                                                ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                                          │  ║
║  │  APPLICATION LAYER                                                                                       │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                                                                                                    │  │  ║
║  │  │   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐                     │  │  ║
║  │  │   │ Record 1 │    │ Record 2 │    │ Record 3 │    │ Record 4 │    │ Record 5 │                     │  │  ║
║  │  │   │ key=K1   │    │ key=K2   │    │ key=K1   │    │ key=K3   │    │ key=K2   │                     │  │  ║
║  │  │   │ val=V1   │    │ val=V2   │    │ val=V3   │    │ val=V4   │    │ val=V5   │                     │  │  ║
║  │  │   └────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘                     │  │  ║
║  │  │        │               │               │               │               │                           │  │  ║
║  │  │        └───────────────┴───────────────┼───────────────┴───────────────┘                           │  │  ║
║  │  │                                        │                                                           │  │  ║
║  │  └────────────────────────────────────────┼───────────────────────────────────────────────────────────┘  │  ║
║  │                                           │                                                              │  ║
║  │                                           ▼                                                              │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                              INTERCEPTORS & SERIALIZERS                                            │  │  ║
║  │  │  ┌───────────────────┐    ┌───────────────────┐    ┌───────────────────┐                           │  │  ║
║  │  │  │ Producer          │    │ Key               │    │ Value             │                           │  │  ║
║  │  │  │ Interceptor       │───►│ Serializer        │───►│ Serializer        │                           │  │  ║
║  │  │  │ (Logging/Metrics) │    │ (String/Avro)     │    │ (JSON/Avro)       │                           │  │  ║
║  │  │  └───────────────────┘    └───────────────────┘    └───────────────────┘                           │  │  ║
║  │  └────────────────────────────────────────┬───────────────────────────────────────────────────────────┘  │  ║
║  │                                           │                                                              │  ║
║  │                                           ▼                                                              │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                              PARTITIONING & ROUTING                                                │  │  ║
║  │  │  ┌───────────────────┐    ┌───────────────────┐    ┌───────────────────┐                           │  │  ║
║  │  │  │ Partitioner       │    │ Partition         │    │ Record            │                           │  │  ║
║  │  │  │ (Default/Custom)  │───►│ Selector          │───►│ Batch             │                           │  │  ║
║  │  │  │ MurmurHash2 (key) │    │ (no key: sticky)  │    │ Accumulator       │                           │  │  ║
║  │  │  └───────────────────┘    └───────────────────┘    └─────────┬─────────┘                           │  │  ║
║  │  └──────────────────────────────────────────────────────────────┼─────────────────────────────────────┘  │  ║
║  │                                                                 │                                        │  ║
║  │                                                                 ▼                                        │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                              NETWORK I/O LAYER                                                     │  │  ║
║  │  │  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐          │  │  ║
║  │  │  │ Sender Thread   │    │ In-flight       │    │ Network         │    │ Response        │          │  │  ║
║  │  │  │ (Async Send)    │───►│ Requests Queue  │───►│ Client (NIO)    │───►│ Handler         │          │  │  ║
║  │  │  │                 │    │ (max.in.flight) │    │                 │    │                 │          │  │  ║
║  │  │  └─────────────────┘    └─────────────────┘    └────────┬────────┘    └─────────────────┘          │  │  ║
║  │  │                                                         │                                          │  │  ║
║  │  │                                                         ▼                                          │  │  ║
║  │  │  ┌──────────────────────────────────────────────────────────────────────────────────────────────┐  │  │  ║
║  │  │  │  BROKER 0        BROKER 1        BROKER 2        BROKER 3                                    │  │  │  ║
║  │  │  │  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐                                │  │  │  ║
║  │  │  │  │Partition │    │Partition │    │Partition │    │Partition │                                │  │  │  ║
║  │  │  │  │    0     │    │    1     │    │    2     │    │    3     │                                │  │  │  ║
║  │  │  │  │ (Leader) │    │ (Leader) │    │ (Leader) │    │ (Leader) │                                │  │  │  ║
║  │  │  │  └──────────┘    └──────────┘    └──────────┘    └──────────┘                                │  │  │  ║
║  │  │  └──────────────────────────────────────────────────────────────────────────────────────────────┘  │  │  ║
║  │  └────────────────────────────────────────────────────────────────────────────────────────────────────┘  │  ║
║  │                                                                                                          │  ║
║  └──────────────────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                                ║
╚════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
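The partitioning stage above can be sketched in a few lines. The Python below is a stdlib-only illustration, not the Kafka client itself: it ports the 32-bit MurmurHash2 variant that, as far as we know, the Java client's default partitioner applies to keyed records (seed `0x9747b28c`, sign bit cleared, then modulo the partition count). The record list mirrors the five records in the diagram, and `murmur2`, `partition_for_key` and `route` are hypothetical helper names. Records with a null key are not hashed at all; recent Java clients place them on a "sticky" partition per batch instead, which this sketch does not model.

```python
"""Sketch: mapping a keyed record to a partition (assumed to mirror the
Java client's murmur2-based default partitioner; illustration only)."""

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit MurmurHash2 with Kafka's seed 0x9747b28c (assumed).

    Java ints wrap at 32 bits, so every multiply is masked to emulate that.
    Returns the hash as an unsigned 32-bit integer.
    """
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & MASK32

    # Body: mix the input four bytes at a time (little-endian words).
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k

    # Tail: fold in the last 1-3 bytes (mirrors Java's switch fall-through).
    tail, rem = length - length % 4, length % 4
    if rem >= 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Clear the sign bit (like Java's toPositive), then take the modulus."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def route(records, num_partitions):
    """Group (key, value) pairs by target partition, keeping send order."""
    by_partition = {}
    for key, value in records:
        p = partition_for_key(key.encode("utf-8"), num_partitions)
        by_partition.setdefault(p, []).append(value)
    return by_partition


RECORDS = [("K1", "V1"), ("K2", "V2"), ("K1", "V3"), ("K3", "V4"), ("K2", "V5")]

if __name__ == "__main__":
    for partition, values in sorted(route(RECORDS, 4).items()):
        print(f"partition {partition}: {values}")
```

Because the target depends only on the key bytes and the partition count, V1 and V3 (both K1) always land in the same partition in send order, which is what gives per-key ordering. Adding partitions to a topic changes the modulus and therefore remaps existing keys.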

Architecture Diagram: Consumer Group Rebalancing Process

╔════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                                ║
║                                    CONSUMER GROUP REBALANCING STATE MACHINE                                    ║
║                                                                                                                ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                                          │  ║
║  │  PHASE 1: PREPARE REBALANCE (Trigger: Consumer Join/Leave)                                               │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                                                                                                    │  │  ║
║  │  │   ┌─────────────┐      ┌─────────────┐      ┌─────────────┐      ┌─────────────┐                   │  │  ║
║  │  │   │  Consumer 1 │      │  Consumer 2 │      │  Consumer 3 │      │  Consumer 4 │                   │  │  ║
║  │  │   │  REJOINING  │      │  REJOINING  │      │  REJOINING  │      │   JOINING   │  ← New Consumer   │  │  ║
║  │  │   └──────┬──────┘      └──────┬──────┘      └──────┬──────┘      └──────┬──────┘                   │  │  ║
║  │  │          │                    │                    │                    │                          │  │  ║
║  │  │          ▼                    ▼                    ▼                    ▼                          │  │  ║
║  │  │   ┌────────────────────────────────────────────────────────────────────────────────────────────┐   │  │  ║
║  │  │   │                            JOIN GROUP REQUEST                                              │   │  │  ║
║  │  │   │                                                                                            │   │  │  ║
║  │  │   │  Members:      [consumer-1, consumer-2, consumer-3, consumer-4]                            │   │  │  ║
║  │  │   │  Assignor:     CooperativeStickyAssignor                                                   │   │  │  ║
║  │  │   │  Subscription: [order-events, payment-events]                                              │   │  │  ║
║  │  │   └────────────────────────────────────────────────────────────────────────────────────────────┘   │  │  ║
║  │  └────────────────────────────────────────┬───────────────────────────────────────────────────────────┘  │  ║
║  │                                           │                                                              │  ║
║  │                                           ▼                                                              │  ║
║  │  PHASE 2: SYNC GROUP (Partition Assignment)                                                              │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                                                                                                    │  │  ║
║  │  │   ┌────────────────────────────────────────────────────────────────────────────────────────────┐   │  │  ║
║  │  │   │                            GROUP COORDINATOR (Broker 0)                                    │   │  │  ║
║  │  │   │                                                                                            │   │  │  ║
║  │  │   │  Assignor: CooperativeStickyAssignor (runs on the group leader, a consumer)                │   │  │  ║
║  │  │   │  The coordinator relays each member's partitions in its SyncGroup response.                │   │  │  ║
║  │  │   │                                                                                            │   │  │  ║
║  │  │   │  Partition Assignments:                                                                    │   │  │  ║
║  │  │   │    Consumer 1 ◄── Partition 0, Partition 4                                                 │   │  │  ║
║  │  │   │    Consumer 2 ◄── Partition 1, Partition 5                                                 │   │  │  ║
║  │  │   │    Consumer 3 ◄── Partition 2                                                              │   │  │  ║
║  │  │   │    Consumer 4 ◄── Partition 3                                                              │   │  │  ║
║  │  │   └────────────────────────────────────────────────────────────────────────────────────────────┘   │  │  ║
║  │  └────────────────────────────────────────┬───────────────────────────────────────────────────────────┘  │  ║
║  │                                           │                                                              │  ║
║  │                                           ▼                                                              │  ║
║  │  PHASE 3: STABLE (Normal Operation)                                                                      │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                                                                                                    │  │  ║
║  │  │   ┌─────────────┐      ┌─────────────┐      ┌─────────────┐      ┌─────────────┐                   │  │  ║
║  │  │   │  Consumer 1 │      │  Consumer 2 │      │  Consumer 3 │      │  Consumer 4 │                   │  │  ║
║  │  │   │   STABLE    │      │   STABLE    │      │   STABLE    │      │   STABLE    │                   │  │  ║
║  │  │   │  Part: 0,4  │      │  Part: 1,5  │      │  Part: 2    │      │  Part: 3    │                   │  │  ║
║  │  │   │  Heartbeat  │      │  Heartbeat  │      │  Heartbeat  │      │  Heartbeat  │                   │  │  ║
║  │  │   │  every 3s   │      │  every 3s   │      │  every 3s   │      │  every 3s   │                   │  │  ║
║  │  │   └─────────────┘      └─────────────┘      └─────────────┘      └─────────────┘                   │  │  ║
║  │  │                                                                                                    │  │  ║
║  │  └────────────────────────────────────────────────────────────────────────────────────────────────────┘  │  ║
║  │                                                                                                          │  ║
║  └──────────────────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                                ║
╚════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
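The Phase 2 table follows from the "sticky" idea: reach an even spread while moving as few partitions as possible. The Python below is a toy sketch of that idea only, not Kafka's CooperativeStickyAssignor (which also deals with multiple topics, differing subscriptions and generation checks); `sticky_rebalance` and its return shape are hypothetical. It reports which partitions each surviving member must give up, because under the cooperative protocol only those partitions are revoked in a first rebalance and handed to their new owner in a follow-up rebalance, while every other partition keeps being consumed.

```python
"""Toy sticky rebalance (illustration only, not Kafka's assignor)."""
from typing import Dict, List, Tuple

Assignment = Dict[str, List[int]]


def sticky_rebalance(current: Assignment, members: List[str]) -> Tuple[Assignment, Assignment]:
    """Spread partitions evenly over `members`, keeping existing owners where possible.

    `current` maps each previous owner to its partitions; `members` is the new,
    non-empty member list. Returns (new_assignment, revoked), where `revoked`
    lists the partitions each surviving member has to give up.
    """
    total = sum(len(parts) for parts in current.values())
    base, extra = divmod(total, len(members))

    # Members that already own the most partitions get the "+1" slots,
    # so fewer partitions have to move.
    by_load = sorted(members, key=lambda m: (-len(current.get(m, [])), m))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(by_load)}

    new: Assignment = {m: [] for m in members}
    revoked: Assignment = {}
    free: List[int] = []

    for owner, parts in current.items():
        parts = sorted(parts)
        if owner not in quota:  # owner left the group: all its partitions are free
            free.extend(parts)
            continue
        new[owner] = parts[: quota[owner]]
        surplus = parts[quota[owner]:]
        if surplus:
            revoked[owner] = surplus
            free.extend(surplus)

    # Give each freed partition to the first member still below its quota.
    for p in sorted(free):
        target = next(m for m in members if len(new[m]) < quota[m])
        new[target].append(p)

    return new, revoked


if __name__ == "__main__":
    before = {"consumer-1": [0, 4], "consumer-2": [1, 5], "consumer-3": [2, 3]}
    after, revoked = sticky_rebalance(
        before, ["consumer-1", "consumer-2", "consumer-3", "consumer-4"]
    )
    print("revoked:", revoked)  # revoked: {'consumer-3': [3]}
    print("after:  ", after)
```

Running it reproduces the Phase 2 table: partitions 0, 4, 1, 5 and 2 stay with their owners and only partition 3 moves, from consumer-3 to consumer-4. An eager assignor would instead have every member revoke all of its partitions before the new assignment is handed out.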

Architecture Diagram: Offset Management Flow

╔════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                                ║
║                                  OFFSET MANAGEMENT & DUAL-WRITE ARCHITECTURE                                   ║
║                                                                                                                ║
║  ┌──────────────────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                                          │  ║
║  │  CONSUMER PROCESSING PIPELINE                                                                            │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                                                                                                    │  │  ║
║  │  │   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐     │  │  ║
║  │  │   │ POLL     │    │ DECODE   │    │ PROCESS  │    │ VALIDATE │    │ ENRICH   │    │ COMMIT   │     │  │  ║
║  │  │   │          │───►│          │───►│          │───►│          │───►│          │───►│          │     │  │  ║
║  │  │   │ fetch    │    │ deser    │    │ business │    │ schema   │    │ lookup   │    │ offsets  │     │  │  ║
║  │  │   │ records  │    │ records  │    │ logic    │    │ check    │    │ cache    │    │ to Kafka │     │  │  ║
║  │  │   └──────────┘    └──────────┘    └──────────┘    └──────────┘    └──────────┘    └────┬─────┘     │  │  ║
║  │  └────────────────────────────────────────────────────────────────────────────────────────┼───────────┘  │  ║
║  │                                                                                           │              │  ║
║  │                                                                                           ▼              │  ║
║  │  ┌────────────────────────────────────────────────────────────────────────────────────────────────────┐  │  ║
║  │  │                              OFFSET STORAGE OPTIONS                                                │  │  ║
║  │  │                                                                                                    │  │  ║
║  │  │  OPTION 1: AUTO-COMMIT            OPTION 2: MANUAL SYNC            OPTION 3: TRANSACTIONAL         │  │  ║
║  │  │  ┌───────────────────────────┐    ┌───────────────────────────┐    ┌───────────────────────────┐   │  │  ║
║  │  │  │ enable.auto.commit        │    │ commitSync()              │    │ beginTransaction()        │   │  │  ║
║  │  │  │ = true                    │    │ after processing,         │    │ process records           │   │  │  ║
║  │  │  │ auto.commit.interval.ms   │    │ every N records           │    │ sendOffsetsToTransaction()│   │  │  ║
║  │  │  │ = 5000                    │    │ or time-based             │    │ commitTransaction()       │   │  │  ║
║  │  │  └───────────────────────────┘    └───────────────────────────┘    └───────────────────────────┘   │  │  ║
║  │  │  ⚠️ May lose or duplicate         ✅ At-least-once, simple         ✅ Exactly-once (Kafka→Kafka)   │  │  ║
║  │  │                                                                                                    │  │  ║
║  │  │  OPTION 4: EXTERNAL STORAGE       OPTION 5: AT-LEAST-ONCE                                          │  │  ║
║  │  │  ┌───────────────────────────┐    ┌───────────────────────────┐                                    │  │  ║
║  │  │  │ Store offsets in          │    │ write results to DB       │                                    │  │  ║
║  │  │  │ database (MySQL,          │    │ (idempotent upsert),      │                                    │  │  ║
║  │  │  │ PostgreSQL, Redis)        │    │ then commitSync()         │                                    │  │  ║
║  │  │  │ + idempotent writes       │    │ replays are harmless      │                                    │  │  ║
║  │  │  └───────────────────────────┘    └───────────────────────────┘                                    │  │  ║
║  │  │  ✅ Complete control              ✅ Combined with DB writes                                       │  │  ║
║  │  │                                                                                                    │  │  ║
β•‘  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β•‘
β•‘  β”‚                                                                                                                   β”‚   β•‘
β•‘  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β•‘
β•‘                                                                                                                            β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•

Formal Definitions

Producer Batching

Producer batching is the accumulation of multiple records into a single network request before they are sent to the broker. The RecordAccumulator buffers records per partition, and the Sender thread drains a partition's buffer when its batch reaches batch.size or when linger.ms expires, whichever comes first. Batching amortizes network overhead and lets compression work across multiple records, significantly improving throughput at the cost of slightly higher latency.
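The drain rule can be modeled in a few lines of plain Java. This is a simplified sketch, not the client's internal RecordAccumulator: records are raw byte arrays, time is passed in explicitly, and the class and method names are hypothetical. It follows the same readiness idea, where a batch becomes sendable when it is full, when a newer batch has been opened behind it, or when linger.ms has elapsed since it was created.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-partition batching (not the client's
// internal RecordAccumulator). Time is passed in explicitly to keep it testable.
final class BatchingSketch {

    static final class Batch {
        final long createdMs;
        final List<byte[]> records = new ArrayList<>();
        int bytes;

        Batch(long createdMs) {
            this.createdMs = createdMs;
        }
    }

    private final int batchSize;  // plays the role of batch.size (bytes)
    private final long lingerMs;  // plays the role of linger.ms
    private final Map<Integer, Deque<Batch>> batches = new HashMap<>();

    BatchingSketch(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    // Producer side: append to the open batch for the partition, or open a new
    // batch if this record would overflow the current one.
    void append(int partition, byte[] record, long nowMs) {
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        if (last == null || last.bytes + record.length > batchSize) {
            last = new Batch(nowMs);
            deque.addLast(last);
        }
        last.records.add(record);
        last.bytes += record.length;
    }

    // Sender side: drain batches that are full (or have a newer batch behind them)
    // or whose linger time has expired; stop at the first batch that is neither.
    List<Batch> drainReady(int partition, long nowMs) {
        List<Batch> ready = new ArrayList<>();
        Deque<Batch> deque = batches.getOrDefault(partition, new ArrayDeque<>());
        while (!deque.isEmpty()) {
            Batch head = deque.peekFirst();
            boolean full = deque.size() > 1 || head.bytes >= batchSize;
            boolean lingered = nowMs - head.createdMs >= lingerMs;
            if (!full && !lingered) {
                break;
            }
            ready.add(deque.pollFirst());
        }
        return ready;
    }
}
```

With batchSize=100 and lingerMs=10, two 40-byte records stay buffered until either a third record overflows the batch or 10 ms pass since the batch was opened.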

Consumer Lag

Consumer lag is the difference between the latest offset in a partition log and the consumer's last committed offset. It measures how far behind a consumer is from the producer's write position. Consumer lag = log_end_offset - committed_offset. Persistent lag indicates the consumer cannot keep up with the production rate, requiring scaling or optimization.
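A minimal sketch of the lag formula, assuming the log end offsets and committed offsets have already been fetched (in practice, for example, through the Admin client's listOffsets and listConsumerGroupOffsets calls). Partitions are plain integers here, and treating a missing commit as offset 0 is a simplifying assumption; a real log may start at a higher offset after retention deletes old segments.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: consumer lag = log_end_offset - committed_offset, per partition.
// Inputs are plain maps keyed by partition number (assumption); a missing
// commit is treated as offset 0, which is a simplification.
final class ConsumerLagSketch {

    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> logEndOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            // Clamp at 0 in case the two snapshots were taken at slightly different times
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    static long totalLag(Map<Integer, Long> lagPerPartition) {
        return lagPerPartition.values().stream().mapToLong(Long::longValue).sum();
    }
}
```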

Idempotent Producer

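An idempotent producer prevents duplicate messages during retries by assigning each producer a Producer ID (PID) and tracking per-partition sequence numbers. The broker discards any batch whose sequence number it has already written (while still acknowledging it), ensuring exactly-once writes per partition across retries within a producer session. A restarted producer receives a new PID, so this guarantee does not span producer restarts; cross-session guarantees require transactions (a transactional.id).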
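The broker-side duplicate check can be pictured as a small state machine. This sketch is a hypothetical simplification: it keeps only the last sequence number per (producer ID, partition), whereas a real broker also tracks producer epochs and the last several batches, and sequence numbers are assigned per batch rather than per message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the broker's duplicate check for an
// idempotent producer: only the last sequence number per (producer ID,
// partition) is kept; epochs and multi-batch windows are ignored.
final class IdempotenceSketch {

    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final Map<String, Integer> lastSequence = new HashMap<>();

    Result append(long producerId, int partition, int sequence) {
        String key = producerId + "/" + partition;
        int expected = lastSequence.getOrDefault(key, -1) + 1;
        if (sequence < expected) {
            return Result.DUPLICATE;     // already written: report success, do not append again
        }
        if (sequence > expected) {
            return Result.OUT_OF_ORDER;  // a gap means an earlier batch is missing
        }
        lastSequence.put(key, sequence);
        return Result.APPENDED;
    }
}
```

Calling append(42L, 0, 0) twice returns APPENDED and then DUPLICATE, which is what makes a retried send harmless. A producer that restarts without a transactional.id shows up under a new producer ID, so its sequence numbers start again at 0 and are never compared with the old session's.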

Key Formulas

Producer Throughput

$$T_{\text{producer}} = \frac{B_{\text{batch}} \times N_{\text{records}}}{\max(B_{\text{batch}}, L_{\text{linger}}) + L_{\text{network}}}$$

Consumer Processing Rate

$$R_{\text{consumer}} = \min\left(\frac{N_{\text{poll}}}{T_{\text{process}}}, \frac{B_{\text{fetch}}}{T_{\text{fetch}}}\right)$$

Here,

  • $N_{\text{poll}}$ = Records per poll (max.poll.records)
  • $T_{\text{process}}$ = Time to process one poll batch
  • $B_{\text{fetch}}$ = Fetch buffer size (fetch.max.bytes), converted to records by dividing by the average record size so that both terms are in records/sec
  • $T_{\text{fetch}}$ = Fetch round-trip time
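Plugging numbers into the formula shows which bound dominates. The sketch below assumes $B_{\text{fetch}}$ is converted to records by dividing by an average record size, so both terms are in records/sec; the input values are illustrative, not measurements (52,428,800 bytes is the default fetch.max.bytes).

```java
// Worked example of R_consumer = min(N_poll / T_process, B_fetch / T_fetch).
// Assumption: B_fetch (bytes) is turned into records via an average record size.
final class ConsumerRateSketch {

    static double consumerRate(int maxPollRecords, double processSecondsPerPoll,
                               long fetchMaxBytes, int avgRecordBytes, double fetchSeconds) {
        double processingBound = maxPollRecords / processSecondsPerPoll;  // records/sec
        double recordsPerFetch = (double) fetchMaxBytes / avgRecordBytes;
        double fetchBound = recordsPerFetch / fetchSeconds;               // records/sec
        return Math.min(processingBound, fetchBound);
    }

    public static void main(String[] args) {
        // 500 records per poll processed in 0.25 s -> 2,000 records/sec.
        // 50 MiB fetches of 1 KiB records every 0.1 s -> 512,000 records/sec.
        System.out.println(consumerRate(500, 0.25, 52_428_800L, 1024, 0.1)); // prints 2000.0
    }
}
```

Here processing, not fetching, is the bottleneck, so raising fetch.max.bytes would not help; reducing $T_{\text{process}}$ or adding consumers would.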

Consumer Lag Growth Rate

$$\frac{d\,\text{Lag}}{dt} = R_{\text{produce}} - R_{\text{consume}}$$

Here,

  • $R_{\text{produce}}$ = Message production rate (msg/sec)
  • $R_{\text{consume}}$ = Message consumption rate (msg/sec)

When consumer lag grows continuously (dLag/dt > 0), the consumer is falling behind. Remedies: (1) add partitions and consumer instances (a group uses at most one consumer per partition), (2) optimize processing logic to cut per-record processing time, (3) increase max.poll.records to amortize per-poll overhead.
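The lag-growth equation also gives quick sizing arithmetic. This sketch assumes throughput scales linearly with consumer count, which is optimistic, and the method names are hypothetical.

```java
// Sizing arithmetic from dLag/dt = R_produce - R_consume (hypothetical helpers).
// Assumes each consumer contributes the same, constant throughput.
final class LagGrowthSketch {

    // Seconds to clear an existing backlog; infinite if consumers are not faster than producers.
    static double secondsToDrain(long currentLag, double produceRate, double consumeRate) {
        double drainRate = consumeRate - produceRate;  // equals -dLag/dt
        return drainRate <= 0 ? Double.POSITIVE_INFINITY : currentLag / drainRate;
    }

    // Consumers needed just to keep lag from growing.
    static int consumersNeeded(double produceRate, double perConsumerRate) {
        return (int) Math.ceil(produceRate / perConsumerRate);
    }

    // A group uses at most one consumer per partition, so needing more
    // consumers than partitions means the topic needs more partitions first.
    static boolean needsMorePartitions(int consumersNeeded, int partitions) {
        return consumersNeeded > partitions;
    }
}
```

For example, 100,000 records of lag with $R_{\text{produce}}$ = 4,000 and $R_{\text{consume}}$ = 5,000 msg/sec drains in 100 seconds, and a 9,000 msg/sec stream at 2,000 msg/sec per consumer needs 5 consumers, and therefore at least 5 partitions.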

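The acks setting controls the durability-latency tradeoff: acks=0 (fire-and-forget: lowest latency, no durability guarantee), acks=1 (leader acknowledgment: a write can be lost if the leader fails before followers replicate it), and acks=all (every in-sync replica acknowledges: highest durability and highest latency, usually paired with min.insync.replicas of 2 or more). Idempotence, and therefore exactly-once delivery, requires acks=all.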

Detailed Explanation

The Kafka Producer API is a sophisticated client library designed for high-throughput, low-latency message publishing. At its core, the producer operates on an asynchronous, batching model where records are accumulated in a memory buffer before being sent in batches to the appropriate brokers. The producer maintains a RecordAccumulator that buffers records per partition, and a background Sender thread that drains these buffers and sends batched requests to the brokers. This architecture minimizes network overhead and enables efficient compression, as multiple records can be compressed together in a single batch.

The Partitioning Strategy is critical for both ordering and load distribution. For records with a key, the default partitioner takes a murmur2 hash of the serialized key, masks it to a non-negative value, and takes it modulo the number of partitions, so records with the same key always go to the same partition (and therefore keep their relative order) as long as the partition count does not change. Records without a key use sticky partitioning (the default since Kafka 2.4), which fills a batch for one partition before switching to another. Custom partitioners can implement other routing, such as region-aware partitioning. The partitioner is invoked after serialization but before batching, and its result determines which partition's accumulator buffer receives the record.
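The keyed path can be reproduced outside the client to check where a key will land. The murmur2 function below is written to mirror Kafka's Utils.murmur2; verify it against your client version before relying on exact partition numbers, and in real code call org.apache.kafka.common.utils.Utils.murmur2 directly. Keys are assumed to be serialized as UTF-8, which is what StringSerializer does by default.

```java
import java.nio.charset.StandardCharsets;

// Sketch of keyed partition selection: toPositive(murmur2(keyBytes)) % numPartitions.
// The hash is written to mirror Kafka's Utils.murmur2 (an assumption to verify).
final class KeyPartitionSketch {

    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit (like Utils.toPositive) so the modulo is never negative.
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because the result depends on numPartitions, adding partitions to a topic changes where existing keys land, which is why per-key ordering only holds while the partition count is fixed.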

Producer Configurations heavily influence behavior. The acks setting determines durability guarantees: acks=0 provides no acknowledgment (fire-and-forget), acks=1 acknowledges once the leader writes the record to its local log, and acks=all acknowledges once all in-sync replicas have the record. The max.in.flight.requests.per.connection setting controls how many unacknowledged requests can be outstanding per broker connection; with values greater than 1, a retried batch can land after a later one and reorder records, unless idempotence is enabled (which preserves ordering for up to 5 in-flight requests). The retries and retry.backoff.ms settings control automatic retries, and with idempotence enabled the broker deduplicates retried batches, so retries do not create duplicates.
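These constraints can be expressed as a small check. The property keys are the real producer configuration names, but the helper methods are hypothetical; they mirror the validation the Java client performs when enable.idempotence is explicitly set to true, where it rejects acks other than all, retries of 0, or more than 5 in-flight requests with a ConfigException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helpers: real producer config keys, illustrative validation that
// mirrors the client's idempotence checks.
final class IdempotentProducerConfigSketch {

    static Properties reliableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        props.setProperty("max.in.flight.requests.per.connection", "5");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        props.setProperty("delivery.timeout.ms", "120000"); // upper bound on send + retries
        return props;
    }

    // Returns the violated constraints; an empty list means the settings are compatible.
    static List<String> checkIdempotence(Properties props) {
        List<String> problems = new ArrayList<>();
        if (!"true".equals(props.getProperty("enable.idempotence"))) {
            return problems; // constraints only apply when idempotence is requested
        }
        String acks = props.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all");
        }
        int inFlight = Integer.parseInt(
            props.getProperty("max.in.flight.requests.per.connection", "5"));
        if (inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be <= 5");
        }
        int retries = Integer.parseInt(
            props.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        if (retries <= 0) {
            problems.add("retries must be > 0");
        }
        return problems;
    }
}
```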

Consumer Group Coordination is managed by a Group Coordinator broker that maintains the group's membership state and partition assignments. When a consumer joins or leaves, a rebalance runs in two phases. In the JoinGroup phase, every consumer sends a JoinGroup request with its subscription, and the coordinator elects a group leader (typically the first member to join). In the SyncGroup phase, the leader computes the partition assignment and sends it in its SyncGroup request, and every member receives its own assignment in the SyncGroup response. The CooperativeStickyAssignor (recommended) performs incremental rebalancing: only partitions that need to move are revoked, so the remaining partitions keep processing during the rebalance.
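To see why incremental rebalancing revokes so little, here is a simplified single-topic model of a sticky assignment. It is not the CooperativeStickyAssignor algorithm, only a hypothetical sketch of its goal: each member keeps as many of its previously owned partitions as a balanced quota allows, and only the surplus moves.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified sticky assignment for one topic (NOT the real
// CooperativeStickyAssignor): keep prior ownership where the balanced quota allows.
final class StickyRebalanceSketch {

    static Map<String, List<Integer>> assign(List<String> members, int numPartitions,
                                             Map<String, List<Integer>> previous) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        int base = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        Map<String, Integer> quota = new HashMap<>();
        Map<String, List<Integer>> result = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            quota.put(sorted.get(i), base + (i < extra ? 1 : 0));
            result.put(sorted.get(i), new ArrayList<>());
        }
        boolean[] taken = new boolean[numPartitions];

        // Pass 1: each current member keeps its previous partitions, up to its quota.
        for (String m : sorted) {
            for (int p : previous.getOrDefault(m, List.of())) {
                if (p < numPartitions && !taken[p] && result.get(m).size() < quota.get(m)) {
                    result.get(m).add(p);
                    taken[p] = true;
                }
            }
        }
        // Pass 2: hand every unowned partition to the first member below quota.
        for (int p = 0; p < numPartitions; p++) {
            if (taken[p]) {
                continue;
            }
            for (String m : sorted) {
                if (result.get(m).size() < quota.get(m)) {
                    result.get(m).add(p);
                    taken[p] = true;
                    break;
                }
            }
        }
        return result;
    }

    // Partitions each member must give up: owned before the rebalance but not after.
    static Map<String, List<Integer>> revoked(Map<String, List<Integer>> before,
                                              Map<String, List<Integer>> after) {
        Map<String, List<Integer>> out = new TreeMap<>();
        before.forEach((member, owned) -> {
            List<Integer> lost = new ArrayList<>(owned);
            lost.removeAll(after.getOrDefault(member, List.of()));
            if (!lost.isEmpty()) {
                out.put(member, lost);
            }
        });
        return out;
    }
}
```

With six partitions split as A=[0,1,2] and B=[3,4,5], adding member C yields A=[0,1], B=[3,4], C=[2,5]: only partitions 2 and 5 are revoked, while the other four keep processing.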

Offset Management is crucial for at-least-once and exactly-once processing semantics. Offsets can be committed automatically (enable.auto.commit=true) or manually via commitSync() or commitAsync(). Manual commits give more control, allowing consumers to commit only after successful processing. The committed value is the offset of the next record to read, i.e. the last processed offset + 1. In transactional Kafka-to-Kafka pipelines, offsets can be committed atomically with the produced output records using sendOffsetsToTransaction(). The __consumer_offsets topic stores committed offsets, and on startup (or after a rebalance) the consumer fetches them to resume from the last committed position. The auto.offset.reset setting determines behavior when no valid committed offset exists: earliest starts from the oldest available record, latest starts from the end of the log (only new records), and none throws an exception.
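The two offset conventions in this paragraph, committing last processed + 1 and falling back to auto.offset.reset only when no committed offset exists, fit in a short sketch. The enum and method names are hypothetical; the real consumer raises NoOffsetForPartitionException for none and also applies the reset policy when a committed offset is out of range, which this sketch omits.

```java
import java.util.OptionalLong;

// Hypothetical sketch of where a consumer starts reading one partition.
final class StartOffsetSketch {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // The committed value is the next offset to read, not the last one processed.
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // auto.offset.reset only matters when there is no committed offset
    // (out-of-range commits, which also trigger a reset, are not modeled).
    static long startOffset(OptionalLong committed, ResetPolicy policy,
                            long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;  // oldest record still retained
            case LATEST:
                return logEndOffset;    // only records produced from now on
            default:
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
        }
    }
}
```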

Consumer Polling and Processing follows a single-threaded model: poll() fetches batches of records, and the application processes them sequentially on the same thread (KafkaConsumer is not thread-safe). The max.poll.records setting caps how many records each poll returns, while max.poll.interval.ms sets the maximum time between polls before the consumer is considered failed and removed from the group. This lets the group reassign the partitions of a stuck consumer, but it requires tuning max.poll.records so that processing one poll's records finishes well within max.poll.interval.ms. The isolation.level setting determines transactional visibility: read_uncommitted returns all records, including those from aborted or still-open transactions, while read_committed returns only records from committed transactions (plus non-transactional records).
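A rough budget check helps pick max.poll.records. The sketch assumes a constant per-record processing cost and uses a safety factor to leave headroom; the method names are hypothetical. With the default max.poll.interval.ms of 300000 and 50 ms per record, half the interval fits 3000 records, while 10,000 records per poll would exceed the interval.

```java
// Rough max.poll.records budget, assuming a constant per-record processing cost.
final class PollBudgetSketch {

    // Largest max.poll.records whose processing fits in `safety` (0 < safety <= 1)
    // of max.poll.interval.ms.
    static int maxSafePollRecords(long maxPollIntervalMs, double perRecordMs, double safety) {
        return (int) Math.floor(maxPollIntervalMs * safety / perRecordMs);
    }

    // True if one full poll can be processed before the interval expires.
    static boolean fitsInterval(int maxPollRecords, double perRecordMs, long maxPollIntervalMs) {
        return maxPollRecords * perRecordMs < maxPollIntervalMs;
    }
}
```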

Key Concepts Table

Concept | Description | Configuration | Impact
Record Accumulator | In-memory buffer for batching records | buffer.memory, batch.size | Throughput vs. memory
Sender Thread | Background thread sending batches | linger.ms, request.timeout.ms | Latency vs. throughput
Partitioner | Determines the target partition for a record | partitioner.class | Ordering, load distribution
Idempotence | Prevents duplicate records on retry | enable.idempotence | Exactly-once per partition
Transactional | Atomic multi-partition writes | transactional.id | Exactly-once across partitions
Group Coordinator | Broker managing consumer groups | N/A | Rebalance coordination
Heartbeat | Consumer liveness signal | heartbeat.interval.ms | Failure detection speed
Session Timeout | Consumer failure threshold | session.timeout.ms | False positives vs. detection speed
Rebalance Listener | Callback for partition changes | ConsumerRebalanceListener passed to subscribe() | Rebalance handling
Isolation Level | Transactional read semantics | isolation.level | Consistent reads

Code Examples

Advanced Producer with Custom Partitioner

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

public class CustomPartitionerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // Use custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
            RegionAwarePartitioner.class.getName());
        
        String[] regions = {"us", "eu", "ap"};
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String region = regions[i % regions.length];
                
                // The partitioner sees only key and value (not headers), so the
                // region is encoded in the key as "<region>:<userId>"
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    "user-events",
                    region + ":user-" + (i % 10),
                    "{\"userId\": " + i + ", \"action\": \"click\"}"
                );
                
                // Headers carry metadata for downstream consumers
                record.headers().add("region", region.getBytes(StandardCharsets.UTF_8));
                record.headers().add("priority",
                    (i % 5 == 0 ? "high" : "normal").getBytes(StandardCharsets.UTF_8));
                
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Error: " + exception.getMessage());
                    } else {
                        System.out.printf("Partition: %d, Offset: %d%n", 
                            metadata.partition(), metadata.offset());
                    }
                });
            }
            producer.flush();
        } // close() waits for outstanding sends to complete
    }
}

// Custom partitioner: each region owns a contiguous slice of partitions,
// and the key hash picks a partition within that slice
class RegionAwarePartitioner implements Partitioner {
    
    private static final List<String> REGIONS = List.of("us", "eu", "ap");
    
    @Override
    public void configure(Map<String, ?> configs) {}
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // No key: pick a random partition (no ordering guarantee)
        if (keyBytes == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        
        // Region selects a slice; unknown regions fall back to the first slice.
        // With 10 partitions and 3 regions the slices are 0-2, 3-5, 6-8
        // (remainder partitions are unused in this example).
        int regionIndex = Math.max(0, REGIONS.indexOf(extractRegionFromKey(key.toString())));
        int sliceSize = Math.max(1, numPartitions / REGIONS.size());
        int sliceStart = (regionIndex * sliceSize) % numPartitions;
        
        // Within the slice, use a non-negative murmur2 hash of the key bytes
        int offsetInSlice = Utils.toPositive(Utils.murmur2(keyBytes)) % sliceSize;
        return (sliceStart + offsetInSlice) % numPartitions;
    }
    
    private String extractRegionFromKey(String key) {
        // Keys look like "<region>:<userId>"
        int separator = key.indexOf(':');
        return separator > 0 ? key.substring(0, separator) : "other";
    }
    
    @Override
    public void close() {}
}

Consumer with Manual Offset Management and Retry

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

public class ManualOffsetConsumerExample {
    
    private static final int MAX_RETRIES = 3;
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "retry-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("order-events"), new RebalanceHandler());
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                
                // Handle each partition independently so a failure in one
                // partition does not skip unprocessed records in the others
                for (TopicPartition partition : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(partition)) {
                        try {
                            processRecordWithRetry(record);
                            
                            // Committed offset = next offset to read
                            offsetsToCommit.put(partition,
                                new OffsetAndMetadata(record.offset() + 1, "processed"));
                            
                        } catch (Exception e) {
                            System.err.printf("Failed to process %s at offset %d: %s%n",
                                partition, record.offset(), e.getMessage());
                            
                            // Rewind so the failed record is redelivered by the next poll().
                            // A record that always fails loops here; route it to a DLQ instead.
                            consumer.seek(partition, record.offset());
                            break; // skip the rest of this partition's batch
                        }
                    }
                }
                
                // Commit only offsets of successfully processed records
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
                }
            }
        }
    }
    
    private static void processRecordWithRetry(ConsumerRecord<String, String> record) 
            throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                // Simulate a transient failure on roughly 10% of attempts
                if (ThreadLocalRandom.current().nextInt(10) == 0) {
                    throw new RuntimeException("Simulated processing error");
                }
                System.out.printf("Processed: %s%n", record.value());
                return;
            } catch (RuntimeException e) {
                if (attempt >= MAX_RETRIES) {
                    throw e;
                }
                Thread.sleep(1000L << (attempt - 1)); // exponential backoff: 1s, 2s, ...
            }
        }
    }
    
    static class RebalanceHandler implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Offsets are committed synchronously after every batch, so nothing is
            // pending here; with commitAsync(), flush a final commitSync() in this callback
            System.out.println("Partitions revoked: " + partitions);
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Partitions assigned: " + partitions);
        }
    }
}

Transactional Producer-Consumer Example

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;

public class ExactlyOnceProducerConsumerExample {
    
    public static void main(String[] args) {
        // Transactional producer that reads from one topic and writes to another
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-transformer-1");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-transformer-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            
            producer.initTransactions();
            consumer.subscribe(List.of("input-events"));
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // Transform the record
                        String transformedValue = transform(record.value());
                        
                        // Write to output topic
                        ProducerRecord<String, String> outputRecord = new ProducerRecord<>(
                            "output-events",
                            record.key(),
                            transformedValue
                        );
                        
                        // Add transformation metadata
                        outputRecord.headers().add("source-topic",
                            record.topic().getBytes(StandardCharsets.UTF_8));
                        outputRecord.headers().add("source-offset", 
                            String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
                        outputRecord.headers().add("transformed-at", 
                            String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
                        
                        producer.send(outputRecord);
                        
                        // Track the next offset to consume for this partition
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    }
                    
                    // Commit the consumed offsets inside the same transaction.
                    // Do not also call consumer.commitSync(): that would commit
                    // outside the transaction and break exactly-once.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                    System.out.printf("Committed transaction for %d records%n", records.count());
                    
                } catch (ProducerFencedException | OutOfOrderSequenceException
                         | AuthorizationException e) {
                    // Fatal: this producer cannot recover; exit and let try-with-resources close it
                    throw e;
                } catch (KafkaException e) {
                    producer.abortTransaction();
                    // Rewind to the start of the aborted batch so it is reprocessed
                    for (TopicPartition tp : records.partitions()) {
                        consumer.seek(tp, records.records(tp).get(0).offset());
                    }
                    System.err.println("Transaction aborted: " + e.getMessage());
                }
            }
        }
    }
    
    private static String transform(String value) {
        // Example transformation: insert a processedAt field before the final '}'
        int end = value.lastIndexOf('}');
        if (end < 0) {
            return value;
        }
        return value.substring(0, end) + ", \"processedAt\": " + System.currentTimeMillis()
            + value.substring(end);
    }
}

Performance Metrics

Scenario | Throughput (msg/sec) | Latency (p99) | Memory Usage | CPU Usage
Async Batch (16KB) | 500,000 | 5ms | 64MB | 30%
Sync (acks=all) | 50,000 | 50ms | 32MB | 15%
Idempotent | 450,000 | 8ms | 64MB | 35%
Transactional | 300,000 | 15ms | 128MB | 40%
Compression (gzip) | 200,000 | 10ms | 16MB | 50%
Compression (lz4) | 400,000 | 7ms | 32MB | 25%
Consumer (single) | 100,000 | 2ms | 32MB | 20%
Consumer (group=4) | 400,000 | 5ms | 128MB | 80%

Best Practices

  1. Batch Size Optimization: Increase batch.size to 64KB-128KB for higher throughput, and use linger.ms=10-50 to allow batching. This trades slight latency for significantly higher throughput.

  2. Idempotent Producers: Enable enable.idempotence=true in production (it has been the default since Kafka 3.0). It provides exactly-once writes per partition at a small throughput cost, and it requires acks=all and max.in.flight.requests.per.connection of at most 5.

  3. Transaction Management: Use transactional producers when writes must be atomic across multiple partitions or topics. Keep transactions small and short: an open transaction holds back the last stable offset, so read_committed consumers cannot read past it until it commits or aborts.

  4. Consumer Group Sizing: Run at most as many consumers as there are partitions; consumers beyond the partition count receive no assignment and sit idle. Use CooperativeStickyAssignor for minimal disruption during rebalances.

  5. Offset Commit Strategy: Disable auto-commit and commit manually after processing. A common pattern is commitAsync() inside the poll loop for throughput, plus commitSync() on shutdown and in onPartitionsRevoked() so the final offsets are not lost. Never commit offsets for unprocessed records.

  6. Error Handling: Implement retry logic with exponential backoff for transient failures. Use a dead-letter queue (DLQ) for records that fail after max retries. Monitor consumer lag to detect processing bottlenecks.

  7. Memory Management: Configure max.partition.fetch.bytes to control memory usage per partition. Set fetch.min.bytes and fetch.max.wait.ms to batch fetch requests efficiently.

  8. Monitoring: Track producer metrics: record-send-rate, batch-size-avg, compression-rate-avg, request-latency-avg. Track consumer metrics: records-lag-max, records-consumed-rate, poll-rate.

  9. Security: Use SASL/SCRAM for authentication and SSL for encryption. Implement ACLs to restrict topic access. Use separate producers for different security zones.

  10. Testing: Use Testcontainers or embedded Kafka for integration tests. Test with realistic volumes and verify exactly-once semantics with idempotent consumer writes.
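The retry advice in practice 6 can be sketched as a capped exponential backoff plus a dead-letter routing decision. The base delay, cap, and the "<topic>.DLQ" naming are assumptions for illustration; the Kafka clients do not define a DLQ convention, so frameworks and teams choose their own.

```java
// Capped exponential backoff plus a dead-letter decision (hypothetical helpers).
final class RetryPolicySketch {

    // attempt 1 -> baseMs, 2 -> 2*baseMs, 3 -> 4*baseMs, ..., never above maxMs.
    static long backoffMs(int attempt, long baseMs, long maxMs) {
        long delay = baseMs << Math.min(attempt - 1, 30); // capped shift keeps typical bases from overflowing
        return Math.min(delay, maxMs);
    }

    // Where a record goes after `attempts` failed tries: the topic it is
    // (re)processed from, or an assumed "<topic>.DLQ" dead-letter topic.
    static String route(String topic, int attempts, int maxAttempts) {
        return attempts >= maxAttempts ? topic + ".DLQ" : topic;
    }
}
```

With a 100 ms base and a 10 s cap, attempts 1, 2, and 3 wait 100, 200, and 400 ms, and by attempt 10 the delay is held at 10 s.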

Key Takeaways:

  • Producer batching (batch.size + linger.ms) trades latency for throughput; 64KB-128KB is a common starting point for batch.size
  • Consumer lag = log_end_offset - committed_offset; sustained growth indicates the consumer cannot keep up
  • Idempotent producers (enable.idempotence=true) prevent duplicates per partition within a producer session, without transactions
  • Partition assignment via CooperativeStickyAssignor minimizes rebalance disruption
  • Manual offset commits after processing prevent data loss; auto-commit can duplicate records after a crash (or lose them if processing is handed off to other threads)
  • Consumer throughput is bounded by min(poll_rate × records_per_poll, fetch_rate)

See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming)
