Kafka Streams & Connect API: Stream Processing and Data Integration

Architecture Diagram: Kafka Streams Topology

╔══════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                  ║
║                         KAFKA STREAMS TOPOLOGY & PROCESSING ARCHITECTURE                         ║
║                                                                                                  ║
║  ┌────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                            │  ║
║  │   SOURCE TOPICS        PROCESSOR TOPOLOGY                         SINK TOPICS              │  ║
║  │   ┌──────────────┐     ┌──────────────────────────────┐                                    │  ║
║  │   │ raw-events   │────►│ SOURCE (KStream)             │                                    │  ║
║  │   │ 6 partitions │     │ builder.stream(topic)        │                                    │  ║
║  │   │              │     └──────────────┬───────────────┘                                    │  ║
║  │   │ user-data    │                    ▼                                                    │  ║
║  │   │ 3 partitions │     ┌──────────────────────────────┐                                    │  ║
║  │   └──────────────┘     │ FILTER                       │                                    │  ║
║  │                        │ filter((k, v) -> v.isValid())│                                    │  ║
║  │                        └──────────────┬───────────────┘                                    │  ║
║  │                                       ▼                                                    │  ║
║  │                        ┌──────────────────────────────┐                                    │  ║
║  │                        │ JOIN                         │                                    │  ║
║  │                        │ KStream-KTable join          │                                    │  ║
║  │                        └──────────────┬───────────────┘                                    │  ║
║  │                                       ▼                                                    │  ║
║  │                        ┌──────────────────────────────┐                                    │  ║
║  │                        │ AGGREGATE                    │                                    │  ║
║  │                        │ groupByKey()                 │                                    │  ║
║  │                        │ .windowedBy(TimeWindows)     │                                    │  ║
║  │                        └──────────────┬───────────────┘           ┌─────────────────┐      │  ║
║  │                                       ▼                           │ enriched-events │      │  ║
║  │                        ┌──────────────────────────────┐           │ 6 partitions    │      │  ║
║  │                        │ TO (branch)                  │──────────►│                 │      │  ║
║  │                        └──────────────────────────────┘           │ alerts          │      │  ║
║  │                                                                   │ 3 partitions    │      │  ║
║  │                                                                   └─────────────────┘      │  ║
║  │                                                                                            │  ║
║  │   AVAILABLE OPERATORS                                                                      │  ║
║  │   ┌──────────────────────────────────────────────────────────┐                             │  ║
║  │   │ KStream   map, filter, flatMap, groupBy, join            │                             │  ║
║  │   │ KTable    filter, mapValues, join                        │                             │  ║
║  │   │ KGrouped  count, reduce, aggregate                       │                             │  ║
║  │   └──────────────────────────────────────────────────────────┘                             │  ║
║  │                                                                                            │  ║
║  └────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                  ║
║  ┌────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                            │  ║
║  │   LOCAL STATE STORES (RocksDB by default)                                                  │  ║
║  │                                                                                            │  ║
║  │   ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐   │  ║
║  │   │ user-state-store │  │ window-agg-store │  │ session-store    │  │ global-kv-store  │   │  ║
║  │   ├──────────────────┤  ├──────────────────┤  ├──────────────────┤  ├──────────────────┤   │  ║
║  │   │ Key: user_id     │  │ Key: Windowed<K> │  │ Key: Windowed<K> │  │ Key: any_key     │   │  ║
║  │   │ Val: UserState   │  │ Val: AggResult   │  │ Val: Session     │  │ Val: any_val     │   │  ║
║  │   │ DB: RocksDB      │  │ DB: RocksDB      │  │ DB: RocksDB      │  │ DB: In-Memory    │   │  ║
║  │   └──────────────────┘  └──────────────────┘  └──────────────────┘  └──────────────────┘   │  ║
║  │                                                                                            │  ║
║  └────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                  ║
╚══════════════════════════════════════════════════════════════════════════════════════════════════╝

Architecture Diagram: Kafka Connect Framework

╔══════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                  ║
║                               KAFKA CONNECT FRAMEWORK ARCHITECTURE                               ║
║                                                                                                  ║
║  ┌────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                            │  ║
║  │   EXTERNAL SYSTEMS                    CONNECT CLUSTER                     KAFKA CLUSTER    │  ║
║  │                                                                                            │  ║
║  │   ┌───────────────┐    ┌──────────┐   ┌───────────────────────────────┐   ┌──────────────┐ │  ║
║  │   │ MySQL         │───►│ SOURCE   │   │ REST API / CLI                │   │ topic-1      │ │  ║
║  │   │ PostgreSQL    │───►│ Connector│   │ POST /connectors              │   │ 3 partitions │ │  ║
║  │   │ MongoDB       │    │ Task 0   │   │ GET /connectors               │   │              │ │  ║
║  │   │ Elasticsearch │    │ Task 1   │   │ PUT /connectors/{name}/config │   │ topic-2      │ │  ║
║  │   │ S3 bucket     │    │ Task 2   │   │ DELETE /connectors/{name}     │   │ 6 partitions │ │  ║
║  │   └───────────────┘    └──────────┘   ├───────────────────────────────┤   │              │ │  ║
║  │                                       │ CONFIG PROVIDERS              │   │ topic-3      │ │  ║
║  │                                       │ Vault, AWS Secrets Manager,   │   │ 3 partitions │ │  ║
║  │   ┌───────────────┐    ┌──────────┐   │ Azure Key Vault               │   └──────────────┘ │  ║
║  │   │ Files         │◄───│ SINK     │   ├───────────────────────────────┤                    │  ║
║  │   │ (CSV/JSON)    │    │ Connector│   │ CONVERTERS                    │                    │  ║
║  │   │ JDBC target   │◄───│ Task 0   │   │ Avro, JSON, Protobuf          │                    │  ║
║  │   │ HDFS          │◄───│ Task 1   │   ├───────────────────────────────┤                    │  ║
║  │   │ (data lake)   │    └──────────┘   │ TRANSFORMS (SMTs)             │                    │  ║
║  │   └───────────────┘                   │ InsertField, ReplaceField,    │                    │  ║
║  │                                       │ MaskField, TimestampRouter,   │                    │  ║
║  │                                       │ Flatten                       │                    │  ║
║  │                                       ├───────────────────────────────┤                    │  ║
║  │                                       │ DEAD LETTER QUEUE             │                    │  ║
║  │                                       │ errors-topic (failed records, │                    │  ║
║  │                                       │ sink connectors only)         │                    │  ║
║  │                                       └───────────────────────────────┘                    │  ║
║  │                                                                                            │  ║
║  └────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                  ║
╚══════════════════════════════════════════════════════════════════════════════════════════════════╝

Architecture Diagram: Stream Processing Patterns

╔══════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                                                  ║
║                          STREAM PROCESSING PATTERNS & STATE MANAGEMENT                           ║
║                                                                                                  ║
║  PATTERN 1: EVENT TIME WINDOWING                                                                 ║
║  ┌────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                            │  ║
║  │   Input Stream:  ──e1 (t=100)──e2 (t=150)──e3 (t=200)──e4 (t=250)──e5 (t=300)──►           │  ║
║  │                                                                                            │  ║
║  │   Tumbling Window (size=100ms):                                                            │  ║
║  │   ┌───────────────────┬───────────────────┬───────────────────┐                            │  ║
║  │   │  [100-200)        │  [200-300)        │  [300-400)        │                            │  ║
║  │   │  e1, e2           │  e3, e4           │  e5               │                            │  ║
║  │   │  count=2          │  count=2          │  count=1          │                            │  ║
║  │   └───────────────────┴───────────────────┴───────────────────┘                            │  ║
║  │   Windows are [start, end), so an event at t=200 belongs to [200-300).                     │  ║
║  │                                                                                            │  ║
║  │   Hopping Window (size=100ms, advance=50ms), first four non-empty windows:                 │  ║
║  │   ┌───────────────────┬───────────────────┬───────────────────┬───────────────────┐        │  ║
║  │   │  [50-150)         │  [100-200)        │  [150-250)        │  [200-300)        │        │  ║
║  │   │  e1               │  e1, e2           │  e2, e3           │  e3, e4           │        │  ║
║  │   │  count=1          │  count=2          │  count=2          │  count=2          │        │  ║
║  │   └───────────────────┴───────────────────┴───────────────────┴───────────────────┘        │  ║
║  │                                                                                            │  ║
║  │   Session Window (inactivity gap=50ms):                                                    │  ║
║  │   ┌───────────────────────────────────────┐                                                │  ║
║  │   │  Session [100-300]                    │                                                │  ║
║  │   │  e1, e2, e3, e4, e5                   │                                                │  ║
║  │   │  count=5                              │                                                │  ║
║  │   └───────────────────────────────────────┘                                                │  ║
║  │   Each event is within 50ms of the previous one, so all five merge into one                │  ║
║  │   session; with a gap under 50ms, each event would form its own session.                   │  ║
║  │                                                                                            │  ║
║  └────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                  ║
║  PATTERN 2: KSTREAM-KTABLE JOIN                                                                  ║
║  ┌────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                            │  ║
║  │   KStream (orders):      KTable (users):        Joined result (tier, discount):            │  ║
║  │   ┌──────────────────┐   ┌──────────────────┐   ┌──────────────────────────┐               │  ║
║  │   │ order-1: user-A  │   │ user-A: premium  │   │ order-1: {premium, 10%}  │               │  ║
║  │   │ order-2: user-B  │   │ user-B: basic    │   │ order-2: {basic, 0%}     │               │  ║
║  │   │ order-3: user-A  │   │ user-C: premium  │   │ order-3: {premium, 10%}  │               │  ║
║  │   └──────────────────┘   └──────────────────┘   └──────────────────────────┘               │  ║
║  │   Orders must be keyed by user id (co-partitioned with users) for the lookup.              │  ║
║  │                                                                                            │  ║
║  └────────────────────────────────────────────────────────────────────────────────────────────┘  ║
║                                                                                                  ║
║  PATTERN 3: EXACTLY-ONCE STREAM PROCESSING                                                       ║
║  ┌────────────────────────────────────────────────────────────────────────────────────────────┐  ║
║  │                                                                                            │  ║
║  │   ┌────────────────────────────────────────────────────────────────────────────────────┐   │  ║
║  │   │                                TRANSACTION BOUNDARY                                │   │  ║
║  │   │  ┌──────────────────────────────────────────────────────────────────────────────┐  │   │  ║
║  │   │  │ 1. Read from source topic        ──►  position tracked                       │  │   │  ║
║  │   │  │ 2. Process / transform records   ──►  local state + changelog updated        │  │   │  ║
║  │   │  │ 3. Write to sink topic           ──►  output produced (uncommitted)          │  │   │  ║
║  │   │  │ 4. Commit source offsets         ──►  offsets + output committed atomically  │  │   │  ║
║  │   │  └──────────────────────────────────────────────────────────────────────────────┘  │   │  ║
║  │   │                                                                                    │   │  ║
β•‘  β”‚   β”‚  At any point: transaction can be aborted, and all effects (writes + offsets) are rolled back.            β”‚  β”‚   β•‘
β•‘  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β•‘
β•‘  β”‚                                                                                                                   β”‚   β•‘
β•‘  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β•‘
β•‘                                                                                                                            β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•

Formal Definitions

Stream Processing Topology

A stream processing topology is a directed acyclic graph (DAG) of processors in which nodes represent operations (source, transform, sink) and edges represent data flow. The topology defines how data moves from source topics through transformations to sink topics. Kafka Streams parallelizes a topology by creating one task per input partition of each sub-topology and distributing those tasks across application instances and their stream threads.

Windowing

Windowing groups records by time into finite buckets for aggregation. The main window types in Kafka Streams are tumbling windows (fixed-size, non-overlapping), hopping windows (fixed-size, overlapping with a configurable advance), and session windows (activity-based, variable-size, closed by an inactivity gap); sliding windows for aggregations and join windows for stream-stream joins are also available. Windowing enables time-bounded aggregations on unbounded streams.

KTable

A KTable is an abstraction of a changelog stream in which each key has at most one current value. It represents the latest state for each key, enabling point lookups and joins. Unlike a KStream (append-only), a KTable treats each record as an upsert: a new value for a key replaces the previous one, and a record with a null value (a tombstone) deletes the key. When a KTable is materialized, its state lives in a local state store (RocksDB by default) backed by a changelog topic for fault tolerance.
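The upsert rule can be illustrated without any Kafka dependency. This is a minimal plain-Java sketch (Java 16+ for records); StreamVsTable and Update are hypothetical names that model the semantics only, not the KTable API. The same update records are read once as a stream, where every record is kept, and once as a table, where the last value per key wins and a null value deletes the key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of reading the same records as a KStream vs. as a KTable.
class StreamVsTable {

    // One record from the topic; a null value is a tombstone.
    record Update(String key, String value) {}

    // Stream view: every record is an independent event, so all are kept.
    static List<Update> asStream(List<Update> updates) {
        return new ArrayList<>(updates);
    }

    // Table view: each record upserts its key; a tombstone removes the key.
    static Map<String, String> asTable(List<Update> updates) {
        Map<String, String> table = new HashMap<>();
        for (Update u : updates) {
            if (u.value() == null) {
                table.remove(u.key());
            } else {
                table.put(u.key(), u.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Update> topic = List.of(
            new Update("user-A", "basic"),
            new Update("user-B", "basic"),
            new Update("user-A", "premium"),  // replaces user-A's earlier value
            new Update("user-B", null));      // tombstone: deletes user-B
        System.out.println(asStream(topic).size()); // 4
        System.out.println(asTable(topic));         // {user-A=premium}
    }
}
```

The table view corresponds to what a state store holds after consuming the topic; the stream view is what a KStream processor sees record by record.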

Key Formulas

End-to-End Latency

$$T_{\text{e2e}} \leq T_{\text{produce}} + T_{\text{poll}} + T_{\text{process}} + T_{\text{commit}}$$
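A worked example with hypothetical component latencies (illustrative values, not measurements) shows why the commit term often dominates under exactly-once processing: read_committed consumers see output only after the transaction commits, and exactly_once_v2 lowers the default commit.interval.ms to 100 ms.

```latex
T_{\text{e2e}} \leq \underbrace{5}_{T_{\text{produce}}} + \underbrace{10}_{T_{\text{poll}}} + \underbrace{2}_{T_{\text{process}}} + \underbrace{100}_{T_{\text{commit}}} = 117\ \text{ms}
```

Lowering commit.interval.ms shrinks the last term at the cost of more frequent transaction commits.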

Window Aggregation (Tumbling)

$$A_{\text{window}} = \bigoplus_{e \in W_i} f(e)$$

Here,

  • $W_i$ = tumbling window $[t_i, t_i + s)$, with $t_i = t_0 + i \cdot s$
  • $s$ = window size (e.g., 5 minutes)
  • $f(e)$ = aggregation function applied to each event $e$
  • $\bigoplus$ = aggregation operator (count, sum, reduce, etc.)
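The tumbling formula can be sketched in plain Java with $f(e) = 1$ and $\bigoplus$ = sum, i.e. a count per window. The sketch assumes epoch-aligned windows ($t_0 = 0$), which is how Kafka Streams aligns time windows, and millisecond timestamps; TumblingCount is a hypothetical helper, not a Kafka class.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tumbling-window count: A_window = sum over e in W_i of 1.
class TumblingCount {

    // Start t_i of the single window [t_i, t_i + s) that contains ts (t_0 = 0).
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Counts events per window start; TreeMap keeps windows in time order.
    static Map<Long, Long> countPerWindow(List<Long> timestamps, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60_000L;
        List<Long> events = List.of(10_000L, 290_000L, 300_000L, 610_000L);
        System.out.println(countPerWindow(events, fiveMinutes)); // {0=2, 300000=1, 600000=1}
    }
}
```

Because the windows do not overlap, each event contributes to exactly one bucket.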

Window Aggregation (Hopping)

$$A_i = \bigoplus_{e \in W_i} f(e), \quad W_i = [t_i, t_i + s)$$

Here,

  • $s$ = window size
  • $a$ = advance interval ($a < s$ for overlap; $a = s$ gives tumbling windows)
  • $t_i$ = window start time, $t_i = t_0 + i \cdot a$
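For hopping windows, one record lands in several windows: every $W_i$ with $t - s < t_i \le t$. The plain-Java sketch below computes those window starts under the same epoch alignment ($t_0 = 0$, non-negative starts); it is modelled on, but not copied from, Kafka Streams' own window assignment, and HoppingWindows is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hopping-window assignment with t_i = i * a (epoch-aligned).
class HoppingWindows {

    // All window starts t_i with t - s < t_i <= t and t_i >= 0.
    static List<Long> windowStartsFor(long t, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of a that is greater than t - s (clamped at 0).
        long first = Math.max(0, t - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= t; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 300_000L;
        long oneMinute = 60_000L;
        System.out.println(windowStartsFor(250_000L, fiveMinutes, oneMinute));
        // [0, 60000, 120000, 180000, 240000]
    }
}
```

With $a = s$ the method returns exactly one start, which is the tumbling case; with $s = 5$ minutes and $a = 1$ minute, a record far enough from the epoch falls into $s / a = 5$ windows.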

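Kafka Streams uses event-time semantics by default: the default extractor, FailOnInvalidTimestamp, returns the timestamp embedded in each record (set by the producer, or by the broker when the topic uses LogAppendTime). WallclockTimestampExtractor switches to processing-time semantics, and a custom implementation of the TimestampExtractor interface can read a timestamp from the payload. Rather than watermarks, Kafka Streams tracks stream time, the largest timestamp a task has observed; a record whose window end plus grace period has already been passed by stream time is dropped. Configure the grace period with TimeWindows.ofSizeWithNoGrace() or TimeWindows.ofSizeAndGrace().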
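The drop rule can be modelled in a few lines of plain Java. This sketch assumes tumbling windows and a single task; GracePeriod is a hypothetical class, not part of Kafka Streams. Stream time only moves forward, and a record is accepted while its window end plus the grace period is still ahead of stream time.

```java
// Hypothetical model of stream time and grace period for tumbling windows.
class GracePeriod {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // highest timestamp seen so far

    GracePeriod(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record is aggregated, false if dropped as too late.
    boolean accept(long timestamp) {
        streamTime = Math.max(streamTime, timestamp); // stream time never moves backwards
        long windowEnd = timestamp - Math.floorMod(timestamp, sizeMs) + sizeMs;
        return windowEnd + graceMs > streamTime;      // is the window still open?
    }

    public static void main(String[] args) {
        GracePeriod oneMinuteWindows = new GracePeriod(60_000L, 10_000L);
        System.out.println(oneMinuteWindows.accept(125_000L)); // true: in order
        System.out.println(oneMinuteWindows.accept(185_000L)); // true: advances stream time
        System.out.println(oneMinuteWindows.accept(178_000L)); // true: late, but within grace
        System.out.println(oneMinuteWindows.accept(119_000L)); // false: window [60s, 120s) closed at 130s
    }
}
```

With a grace period of zero (the ofSizeWithNoGrace() case), the third record would also be dropped, because its window closed as soon as stream time reached 180 s.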

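For exactly-once stream processing, set processing.guarantee=exactly_once_v2 (requires brokers 2.5 or newer). This automatically configures idempotent producers, transactional writes, and read_committed consumers within the Streams library, and lowers the default commit.interval.ms to 100 ms. Compared with the original exactly_once mode, EOS v2 uses one transactional producer per stream thread instead of one per task, which reduces the number of producers and transactions the brokers must manage and improves scalability.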
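What the transaction buys can be shown with a toy model in plain Java. ReadProcessWrite is a hypothetical class that only mimics the visibility rules of a Kafka transaction and implements none of the real protocol: output records and the consumed input offset are staged together, become visible together on commit, and are discarded together on abort, so reprocessing after an abort produces no duplicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical toy model of a read-process-write transaction's visibility rules.
class ReadProcessWrite {
    final List<String> committedOutput = new ArrayList<>(); // what read_committed consumers see
    long committedOffset = 0;                               // next input offset to consume

    private final List<String> pendingOutput = new ArrayList<>();
    private long pendingOffset = 0;

    void begin() {
        pendingOutput.clear();
        pendingOffset = committedOffset;
    }

    // "Process" one input record: transform it and stage both output and offset.
    void process(String input) {
        pendingOutput.add(input.toUpperCase(Locale.ROOT));
        pendingOffset++;
    }

    // Output and offset become visible together...
    void commit() {
        committedOutput.addAll(pendingOutput);
        committedOffset = pendingOffset;
        pendingOutput.clear();
    }

    // ...or are discarded together, so the same inputs are simply reprocessed.
    void abort() {
        pendingOutput.clear();
        pendingOffset = committedOffset;
    }

    public static void main(String[] args) {
        ReadProcessWrite app = new ReadProcessWrite();
        app.begin();
        app.process("a");
        app.process("b");
        app.abort(); // e.g. the instance failed mid-transaction
        System.out.println(app.committedOutput + " @ " + app.committedOffset); // [] @ 0

        app.begin(); // retry from the last committed offset
        app.process("a");
        app.process("b");
        app.commit();
        System.out.println(app.committedOutput + " @ " + app.committedOffset); // [A, B] @ 2
    }
}
```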

Detailed Explanation

Kafka Streams is a client library for building stream processing applications on top of Kafka. Unlike traditional stream processing frameworks (Apache Flink, Apache Spark Streaming), Kafka Streams runs within the application process, requiring no separate cluster management. This embedded architecture simplifies deployment and scaling, as each instance of the application is a stream processor that participates in the consumer group for the input topics. The library provides both a high-level DSL (Domain Specific Language) for common operations and a Processor API for custom processing logic.

Topology and Processor DAG: A Kafka Streams application defines a processing topology as a directed acyclic graph (DAG) of processors. Each processor is either a source (reads from topics), a transformation (map, filter, aggregate), or a sink (writes to topics). The topology is automatically distributed across instances by assigning different partitions to different instances. The StreamsBuilder DSL automatically constructs this topology, while the Processor API allows building custom processor nodes with fine-grained control over state management and scheduling.
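The partition-to-task mapping can be sketched in plain Java. This is a simplified illustration under stated assumptions: a single sub-topology (task IDs follow Kafka Streams' subtopology_partition naming, e.g. 0_3) and a round-robin spread over instances. Kafka Streams' real assignor is sticky, also spreads tasks over stream threads, and places standby replicas, so TaskAssignment is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified task assignment: one task per input partition,
// dealt round-robin across instances (the real assignor is sticky).
class TaskAssignment {

    static Map<String, List<String>> assign(int partitions, List<String> instances) {
        Map<String, List<String>> byInstance = new TreeMap<>();
        for (String instance : instances) {
            byInstance.put(instance, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            String task = "0_" + p; // sub-topology 0, partition p
            byInstance.get(instances.get(p % instances.size())).add(task);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, List.of("app-1", "app-2", "app-3", "app-4")));
        // {app-1=[0_0, 0_4], app-2=[0_1, 0_5], app-3=[0_2], app-4=[0_3]}
        System.out.println(assign(2, List.of("app-1", "app-2", "app-3")));
        // {app-1=[0_0], app-2=[0_1], app-3=[]}: more instances than partitions leaves one idle
    }
}
```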

Stateful Operations: Kafka Streams supports several stateful operations that require local state storage. These include aggregations (count, reduce, aggregate), joins (KStream-KTable, KStream-KStream, KTable-KTable), and windowed aggregations (tumbling, hopping, session windows). State is stored in a local RocksDB instance by default, backed by a changelog topic in Kafka. If an instance fails, the task that owns the state is reassigned and its store is rebuilt by replaying the changelog. The state.dir configuration determines where local state is stored; RocksDB itself (block cache, write buffers, compaction threads) is tuned through a custom RocksDBConfigSetter class registered with rocksdb.config.setter.
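Changelog-based recovery can be sketched in plain Java. CountStore, Entry, restore, and compact are hypothetical names: the local map stands in for RocksDB and the list for the changelog topic. Replaying the changelog rebuilds the same state, and log compaction (keeping only the latest record per key) shortens the replay without changing the result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a state store backed by a changelog topic.
class ChangelogRestore {

    // One changelog entry: the store key and its new value (null = delete).
    record Entry(String key, Long value) {}

    // Local store for a per-user count; every update is also appended to the changelog.
    static final class CountStore {
        final Map<String, Long> local = new HashMap<>();
        final List<Entry> changelog = new ArrayList<>();

        void increment(String key) {
            long updated = local.getOrDefault(key, 0L) + 1;
            local.put(key, updated);
            changelog.add(new Entry(key, updated));
        }
    }

    // Rebuild a store by replaying the changelog in order.
    static Map<String, Long> restore(List<Entry> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (Entry e : changelog) {
            if (e.value() == null) store.remove(e.key());
            else store.put(e.key(), e.value());
        }
        return store;
    }

    // Compaction keeps only the latest entry per key, ordered by its last write.
    static List<Entry> compact(List<Entry> changelog) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : changelog) {
            latest.remove(e.key());
            latest.put(e.key(), e);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        CountStore store = new CountStore();
        for (String user : List.of("user-A", "user-B", "user-A", "user-A")) {
            store.increment(user);
        }
        // Simulate losing the local state directory and restoring from Kafka.
        System.out.println(restore(store.changelog).equals(store.local));          // true
        System.out.println(compact(store.changelog).size());                       // 2
        System.out.println(restore(compact(store.changelog)).equals(store.local)); // true
    }
}
```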

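Windowing Strategies: Event-time windowing groups records by their event timestamp rather than by processing time. Kafka Streams supports tumbling windows (fixed-size, non-overlapping), hopping windows (fixed-size, overlapping), and session windows (activity-based, variable-size). Instead of watermarks, Kafka Streams tracks stream time (the largest timestamp a task has observed) and uses the grace period to decide how long a window keeps accepting late-arriving records. The cache.max.bytes.buffering setting sizes the record cache (split across the instance's stream threads) that sits in front of the state stores and deduplicates consecutive updates to the same key before they are forwarded downstream and written to the changelog; commit.interval.ms determines how often that cache is flushed and offsets are committed.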
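Session windows can be sketched in plain Java for a single key. This assumes records are handled in timestamp order (the sketch sorts them first, whereas Kafka Streams merges sessions incrementally as out-of-order records arrive) and that a record joins a session when it is no more than the inactivity gap after that session's last event; SessionWindowsSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical session-window grouping for one key.
class SessionWindowsSketch {

    record Session(long start, long end, long count) {}

    static List<Session> sessionize(List<Long> timestamps, long gapMs) {
        List<Long> sorted = new ArrayList<>(timestamps);
        sorted.sort(null); // natural (ascending) order
        List<Session> sessions = new ArrayList<>();
        for (long ts : sorted) {
            int last = sessions.size() - 1;
            if (last >= 0 && ts - sessions.get(last).end() <= gapMs) {
                // Within the inactivity gap: extend the current session.
                Session s = sessions.get(last);
                sessions.set(last, new Session(s.start(), ts, s.count() + 1));
            } else {
                // Gap exceeded (or first event): start a new session.
                sessions.add(new Session(ts, ts, 1));
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long thirtyMinutes = 30 * 60_000L;
        List<Long> orders = List.of(0L, 10 * 60_000L, 35 * 60_000L, 90 * 60_000L);
        // Two sessions: [0, 35 min] with 3 orders and [90 min, 90 min] with 1 order
        sessionize(orders, thirtyMinutes).forEach(System.out::println);
    }
}
```

The 30-minute gap mirrors SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)) in the DSL example; unlike tumbling or hopping windows, session boundaries come from the data rather than from the clock.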

Kafka Connect provides a framework for moving data between Kafka and external systems. Connectors are plugins that extend either SourceConnector (reading from external systems and writing to Kafka) or SinkConnector (reading from Kafka and writing to external systems). The Connect framework handles offset management, fault tolerance, and parallelism. Each connector instance splits its work into one or more tasks (at most tasks.max), and the framework distributes those tasks across worker nodes. Workers run in standalone mode (a single process that stores offsets in a local file) or distributed mode (a cluster of workers that share configuration, offsets, and status through Kafka topics); distributed mode adds automatic task rebalancing and fault tolerance.
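How a connector fans work out into tasks can be sketched in plain Java. The helper splits a connector's units of work (tables, files, partitions) into at most tasks.max contiguous groups, one per task configuration, which is why a connector's taskConfigs(maxTasks) may return fewer configurations than tasks.max. It follows the idea behind Kafka Connect's ConnectorUtils.groupPartitions, but TaskGrouping is a hypothetical reimplementation, and each real connector decides its own split.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical grouping of work units into at most maxTasks task configs.
class TaskGrouping {

    static <T> List<List<T>> group(List<T> units, int maxTasks) {
        if (units.isEmpty() || maxTasks <= 0) {
            return List.of();
        }
        int numGroups = Math.min(units.size(), maxTasks); // never more tasks than units
        int base = units.size() / numGroups;
        int extra = units.size() % numGroups;             // first `extra` groups get one more
        List<List<T>> groups = new ArrayList<>();
        int index = 0;
        for (int g = 0; g < numGroups; g++) {
            int size = base + (g < extra ? 1 : 0);
            groups.add(new ArrayList<>(units.subList(index, index + size)));
            index += size;
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(group(List.of("orders", "users", "products", "payments", "refunds"), 3));
        // [[orders, users], [products, payments], [refunds]]
        System.out.println(group(List.of("orders", "users"), 3));
        // [[orders], [users]]: only two tasks despite tasks.max = 3
    }
}
```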

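Connect Configuration and Transforms: In distributed mode, connectors are created and updated through the REST API with JSON payloads such as the configuration examples in this lesson; standalone mode takes connector configuration files at startup. The framework supports Single Message Transforms (SMTs) for in-flight record transformation, converters for serialization format handling, and dead letter queues (sink connectors only) for error handling. SMTs are applied in the order listed in the transforms property: for source connectors they run before the converter serializes the record, and for sink connectors they run after the converter has deserialized it. Common SMTs include InsertField, ReplaceField, MaskField, TimestampRouter, and Flatten. Converters translate between Connect's internal data representation (schema plus value) and the bytes stored in Kafka (JSON, Avro, Protobuf).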
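Why SMT order matters can be shown with a toy chain in plain Java. Records are modelled as field maps, and rename and mask are hypothetical helpers that only imitate what ReplaceField (renames) and MaskField do; they are not the Connect classes. The same two transforms in opposite orders either mask the card number or leak it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical SMT chain: each transform maps a record to a new record, in order.
class SmtChain {

    // Imitates a ReplaceField rename.
    static UnaryOperator<Map<String, Object>> rename(String from, String to) {
        return record -> {
            Map<String, Object> out = new LinkedHashMap<>(record);
            if (out.containsKey(from)) out.put(to, out.remove(from));
            return out;
        };
    }

    // Imitates MaskField for a string field (replaced with an empty string).
    static UnaryOperator<Map<String, Object>> mask(String field) {
        return record -> {
            Map<String, Object> out = new LinkedHashMap<>(record);
            if (out.containsKey(field)) out.put(field, "");
            return out;
        };
    }

    // Applies the transforms in list order, like the "transforms" property.
    static Map<String, Object> apply(Map<String, Object> record,
                                     List<UnaryOperator<Map<String, Object>>> chain) {
        Map<String, Object> current = record;
        for (UnaryOperator<Map<String, Object>> transform : chain) {
            current = transform.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("order_id", "o-1");
        record.put("card", "4111-1111");
        // rename,mask: the mask sees the renamed field
        System.out.println(apply(record, List.of(rename("card", "card_number"), mask("card_number"))));
        // {order_id=o-1, card_number=}
        // mask,rename: the mask finds no card_number yet, so the value leaks
        System.out.println(apply(record, List.of(mask("card_number"), rename("card", "card_number"))));
        // {order_id=o-1, card_number=4111-1111}
    }
}
```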

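Exactly-Once Semantics in Connect: Since Kafka 3.3 (KIP-618), source connectors running on distributed workers can provide exactly-once delivery. The worker property exactly.once.source.support=enabled turns the feature on, and a connector can demand it with exactly.once.support=required. The framework, not the connector, owns the transactional producer: each task's records and source offsets are written in the same Kafka transaction, with boundaries controlled by transaction.boundary (poll, interval, or connector). Connectors declare what they support by overriding exactlyOnceSupport() and canDefineTransactionBoundaries(). Sink connectors achieve exactly-once results differently: by writing idempotently (for example, upserts keyed by the record key) or by storing consumed offsets in the target system atomically with the data and managing commits through preCommit(). Either way, records are not duplicated during task rebalancing or connector restarts.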
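The offsets-in-target approach for sinks can be sketched as a toy model in plain Java. OffsetTrackingSink and Incoming are hypothetical; an append-only ledger stands in for the target system, and in a real database the row insert and the offset update would share one transaction. On restart, a real connector would hand the stored offsets back to Connect through SinkTaskContext.offset().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink that keeps consumed offsets in the target alongside the data.
class OffsetTrackingSink {

    record Incoming(int partition, long offset, String value) {}

    final List<String> ledger = new ArrayList<>();             // append-only target
    final Map<Integer, Long> storedOffsets = new HashMap<>();  // offsets kept in the target

    void put(List<Incoming> batch) {
        for (Incoming r : batch) {
            long lastApplied = storedOffsets.getOrDefault(r.partition(), -1L);
            if (r.offset() <= lastApplied) {
                continue; // already written before the restart: skip the redelivery
            }
            // In a real target, these two writes would be one atomic transaction.
            ledger.add(r.value());
            storedOffsets.put(r.partition(), r.offset());
        }
    }

    // Offset to resume consuming from after a restart.
    long resumeOffset(int partition) {
        return storedOffsets.getOrDefault(partition, -1L) + 1;
    }

    public static void main(String[] args) {
        OffsetTrackingSink sink = new OffsetTrackingSink();
        sink.put(List.of(new Incoming(0, 0, "pay-1"), new Incoming(0, 1, "pay-2"), new Incoming(0, 2, "pay-3")));
        // Crash before the consumer offset was committed: Kafka redelivers from offset 1.
        sink.put(List.of(new Incoming(0, 1, "pay-2"), new Incoming(0, 2, "pay-3"), new Incoming(0, 3, "pay-4")));
        System.out.println(sink.ledger);          // [pay-1, pay-2, pay-3, pay-4]
        System.out.println(sink.resumeOffset(0)); // 4
    }
}
```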

Key Concepts Table

| Component | Description | Key Configurations | Use Cases |
|---|---|---|---|
| KStream | Unbounded stream of records | processing.guarantee | Event processing, real-time analytics |
| KTable | Changelog stream (latest value per key) | state.dir, cache.max.bytes.buffering | Materialized views, join tables |
| GlobalKTable | Full copy on every instance | state.dir, global.consumer.* overrides | Reference data, dimension tables |
| Topology | DAG of processors | application.id, client.id | Processing pipeline definition |
| State Store | Local RocksDB persistence | state.dir, rocksdb.config.setter | Stateful operations, windowing |
| Changelog Topic | Fault-tolerant state backup | topic.* overrides, Materialized.withLoggingEnabled() | State recovery, fault tolerance |
| Source Connector | Read from external systems | tasks.max, connector.class | CDC, file ingestion |
| Sink Connector | Write to external systems | topics, tasks.max | Data export, search indexing |
| Converter | Serialization format handling | key.converter, value.converter | JSON, Avro, Protobuf |
| SMT | Single Message Transform | transforms, transforms.* | Record modification, routing |

Code Examples

Kafka Streams DSL Application

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

// Order and OrderSerde are application classes (not shown): Order exposes
// getUserId(), getAmount(), getType(), withUser(), applyDiscount(), and toJson();
// OrderSerde implements Serde<Order>.
public class OrderProcessingStream {
    
    private static final Serde<Order> orderSerde = new OrderSerde();
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        
        // Exactly-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
            StreamsConfig.EXACTLY_ONCE_V2);
        
        // State store configuration
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Deprecated since Kafka 3.4 in favor of STATESTORE_CACHE_MAX_BYTES_CONFIG
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        
        // Thread and task configuration
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source: raw order events (JSON strings). The default timestamp extractor
        // uses each record's embedded timestamp, so the windows below are event-time.
        KStream<String, String> orders = builder.stream("raw-orders",
            Consumed.with(Serdes.String(), Serdes.String()));
        
        // Source: user data keyed by user ID (KTable for joins)
        KTable<String, String> users = builder.table("user-data",
            Consumed.with(Serdes.String(), Serdes.String()));
        
        // Transform, re-key by user ID (the join key), and enrich.
        // selectKey triggers a repartition, which uses the serdes given in Joined.
        KStream<String, Order> enrichedOrders = orders
            .filter((key, value) -> value != null && !value.isEmpty())
            .mapValues(value -> parseOrder(value))
            .filter((key, order) -> order.getAmount() > 0)
            .selectKey((key, order) -> order.getUserId())
            .join(users,
                (order, user) -> order.withUser(user),
                Joined.with(Serdes.String(), orderSerde, Serdes.String()));
        
        // Branch by order type; each branch writes to its own topic
        Map<String, KStream<String, Order>> branches = enrichedOrders
            .split(Named.as("orders-"))
            .branch((key, order) -> order.getType().equals("PREMIUM"),
                Branched.withFunction(stream -> {
                    stream.mapValues(o -> o.applyDiscount(0.10))
                          .to("premium-orders", Produced.with(Serdes.String(), orderSerde));
                    return stream;
                }, "premium"))
            .branch((key, order) -> order.getType().equals("STANDARD"),
                Branched.withFunction(stream -> {
                    stream.mapValues(o -> o.applyDiscount(0.0))
                          .to("standard-orders", Produced.with(Serdes.String(), orderSerde));
                    return stream;
                }, "standard"))
            .noDefaultBranch();
        
        // Aggregation: count orders per user in 1-hour tumbling windows
        KTable<Windowed<String>, Long> orderCounts = enrichedOrders
            .groupByKey(Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "order-count-store")
                .withValueSerde(Serdes.Long()));
        
        // Aggregation: total revenue per user in 5-minute windows advancing every minute
        KTable<Windowed<String>, Double> revenueByUser = enrichedOrders
            .mapValues(order -> order.getAmount())
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1)))
            .reduce(Double::sum, Materialized.<String, Double, WindowStore<Bytes, byte[]>>as(
                    "revenue-store")
                .withValueSerde(Serdes.Double()));
        
        // Session windowing for activity tracking (the stream is already keyed by user ID)
        KTable<Windowed<String>, Long> sessionCounts = enrichedOrders
            .groupByKey(Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
            .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as(
                    "session-store")
                .withValueSerde(Serdes.Long()));
        
        // Sink: write enriched orders as JSON strings
        enrichedOrders
            .mapValues(order -> order.toJson())
            .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
        
        Topology topology = builder.build();
        System.out.println(topology.describe());
        
        KafkaStreams streams = new KafkaStreams(topology, props);
        // Register the shutdown hook before starting so the instance always closes cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
    
    private static Order parseOrder(String json) {
        // JSON parsing implementation
        return new Order(); // Placeholder
    }
}

Kafka Connect Source Connector Configuration

{
  "name": "mysql-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${file:/opt/kafka/secrets/mysql.properties:password}",
    "database.server.id": "184054",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.orders,ecommerce.users,ecommerce.products",
    
    "topic.prefix": "cdc",
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 12,
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.retention.ms": 604800000,
    
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    
    "transforms": "route,unwrap,addTimestamp",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.ecommerce\\.(.*)",
    "transforms.route.replacement": "cdc-$1",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "processed_at",
    
    "heartbeat.interval.ms": "30000",
    "poll.interval.ms": "1000",
    "max.batch.size": "8192",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    
    "tasks.max": "1"
  }
}

Kafka Connect Sink Connector Configuration

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "connection.username": "${file:/opt/kafka/secrets/es.properties:username}",
    "connection.password": "${file:/opt/kafka/secrets/es.properties:password}",
    
    "topics": "enriched-orders,enriched-users,enriched-products",
    "key.ignore": false,
    "schema.ignore": false,
    
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    
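    "transforms": "valueToKey,extractKey,indexByDate,typeRoute",
    "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.valueToKey.fields": "order_id",
    "transforms.valueToKey.predicate": "isOrders",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id",
    "transforms.extractKey.predicate": "isOrders",
    "transforms.indexByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.indexByDate.timestamp.format": "yyyy-MM-dd",
    "transforms.indexByDate.topic.format": "${topic}-${timestamp}",
    "transforms.typeRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.typeRoute.regex": "enriched-(.*)",
    "transforms.typeRoute.replacement": "kafka-$1",
    
    "predicates": "isOrders",
    "predicates.isOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isOrders.pattern": "enriched-orders",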
    
    "batch.size": 200,
    "max.buffered.records": 500,
    "linger.ms": 1,
    "flush.timeout.ms": 10000,
    "max.retries": 5,
    "retry.backoff.ms": 100,
    
    "behavior.on.null.values": "delete",
    "behavior.on.malformed.documents": "warn",
    
    "write.method": "upsert",
    "document.id.strategy": "record_key",
    
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-elasticsearch",
    "errors.deadletterqueue.topic.replication.factor": 3,
    
    "tasks.max": "3"
  }
}

Kafka Streams with Processor API (Custom Processor)

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Duration;
import java.time.Instant;

// OrderEvent, Alert, and UserTransactionSummary are application classes (not shown).
// Both stores must be added to the topology and connected to this processor; build
// "recent-orders-window" with retainDuplicates=true so that orders sharing a
// timestamp do not overwrite each other.
public class FraudDetectionProcessor implements Processor<String, OrderEvent, String, Alert> {
    
    private ProcessorContext<String, Alert> context;
    private KeyValueStore<String, UserTransactionSummary> stateStore;
    private WindowStore<String, OrderEvent> recentOrders;
    
    @Override
    public void init(ProcessorContext<String, Alert> context) {
        this.context = context;
        this.stateStore = context.getStateStore("user-transaction-summary");
        this.recentOrders = context.getStateStore("recent-orders-window");
        
        // Schedule a periodic punctuation. Old window-store entries need no manual
        // cleanup: they expire once they fall outside the store's retention period.
        context.schedule(Duration.ofMinutes(1), 
            PunctuationType.WALL_CLOCK_TIME, 
            this::checkVelocity);
    }
    
    @Override
    public void process(Record<String, OrderEvent> record) {
        OrderEvent event = record.value();
        String userId = event.getUserId();
        
        // Store in the window store, keyed by user ID, for velocity checks
        recentOrders.put(userId, event, event.getTimestamp());
        
        // Get or initialize user summary
        UserTransactionSummary summary = stateStore.get(userId);
        if (summary == null) {
            summary = new UserTransactionSummary(userId);
        }
        
        // Fraud detection logic (if several rules match, the last one wins)
        Alert alert = null;
        
        // Rule 1: Amount threshold
        if (event.getAmount() > summary.getAverageAmount() * 3) {
            alert = new Alert(userId, "HIGH_AMOUNT", 
                "Transaction amount 3x above average", event);
        }
        
        // Rule 2: Velocity check (more than 5x the usual count for a 5-minute span)
        long recentCount = countRecentTransactions(userId, event.getTimestamp(), Duration.ofMinutes(5));
        double expectedIn5Minutes = summary.getTransactionsPerMinute() * 5;
        if (recentCount > expectedIn5Minutes * 5) {
            alert = new Alert(userId, "HIGH_VELOCITY", 
                "Transaction velocity 5x above normal", event);
        }
        
        // Rule 3: Geographic anomaly
        if (summary.getLastLocation() != null && 
            !summary.getLastLocation().equals(event.getLocation())) {
            alert = new Alert(userId, "GEO_ANOMALY", 
                "Location changed from " + summary.getLastLocation() + 
                " to " + event.getLocation(), event);
        }
        
        // Update summary
        summary.update(event);
        stateStore.put(userId, summary);
        
        // Forward alert if fraud detected
        if (alert != null) {
            context.forward(new Record<>(userId, alert, record.timestamp()));
        }
    }
    
    // Counts the user's orders in [eventTime - window, eventTime], using event time
    // rather than the wall clock so that reprocessing gives the same answer.
    private long countRecentTransactions(String userId, long eventTimeMs, Duration window) {
        Instant to = Instant.ofEpochMilli(eventTimeMs);
        Instant from = to.minus(window);
        
        long count = 0;
        try (WindowStoreIterator<OrderEvent> iterator = 
                recentOrders.fetch(userId, from, to)) {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        }
        return count;
    }
    
    // Punctuator callback: receives the punctuation time in epoch milliseconds
    private void checkVelocity(long timestamp) {
        // Periodic velocity check logic
    }
    
    @Override
    public void close() {}
}

Performance Metrics

| Metric | Kafka Streams (single instance) | Kafka Streams (4 instances) | Kafka Connect (distributed) |
|---|---|---|---|
| Throughput (msg/sec) | 100K | 400K | 300K |
| Latency (p99) | 10 ms | 20 ms | 50 ms |
| State Store Size | 1 GB | 1 GB per instance | N/A |
| Memory (JVM Heap) | 4 GB | 4 GB per instance | 8 GB per worker |
| CPU Usage | 40% | 30% per instance | 50% per worker |
| Changelog Topic Throughput | 50K msg/sec | 200K msg/sec | N/A |
| Rebalance Time | 30 s | 60 s | 120 s |
| Startup Time | 10 s | 15 s | 30 s |

Best Practices

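  1. Application ID and Instance Management: Use a unique application.id per stream processing application; it doubles as the consumer group ID and as the prefix for internal topics. Give each instance a distinct client.id to make monitoring easier. Scale by adding instances with the same application.id; parallelism is capped by the number of input partitions, so instances beyond that sit idle.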

  2. State Store Optimization: Configure cache.max.bytes.buffering to balance memory usage against write amplification (downstream and changelog traffic). Tune RocksDB options such as block cache size, write buffer size, and bloom filters through a RocksDBConfigSetter implementation registered with rocksdb.config.setter.

  3. Windowing Configuration: Choose window type based on use case: tumbling for fixed intervals, hopping for overlapping analysis, session for activity tracking. Configure grace periods to handle late events. Use TimeWindows.ofSizeWithNoGrace() when you want to reject late events.

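  4. Exactly-Once Semantics: Enable processing.guarantee=exactly_once_v2 for critical applications, and expect somewhat higher latency and lower throughput than at-least-once because of transaction commits. The guarantee covers only reads from and writes to Kafka; side effects on external systems (for example, calling an HTTP API from a processor) must be made idempotent separately.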

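  5. Connect Task Scaling: For sink connectors, keep tasks.max at or below the total number of partitions in the consumed topics, since extra tasks receive no partitions, and consider the capacity of the external system. For source connectors, how work is split depends on the connector (Debezium's MySQL connector, for example, always runs a single task). Monitor connector health via the REST API. For sink connectors, use dead letter queues to route bad records instead of failing tasks.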

  6. Converter Selection: Use Avro with Schema Registry for schema evolution and compatibility. JSON converters are simpler but lack schema enforcement. Protobuf is efficient but requires additional tooling.

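  7. SMT Ordering: SMTs run in the order listed in transforms, and each receives the output of the previous one, so place a transform after any transform it depends on (for example, ExtractField$Key only after ValueToKey has turned a value field into the key). Limit the number of SMTs to avoid performance overhead. Use custom SMTs only when built-in options are insufficient.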

  8. Monitoring: Track commit-latency-avg, poll-latency-avg, process-latency-avg for Streams. Track connector-failed-task-count, connector-total-task-count for Connect. Use Confluent Control Center or JMX metrics.

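  9. Error Handling: Use errors.tolerance=all with dead letter queues (sink connectors only) for non-critical failures; for source connectors, errors.tolerance covers converter and transform failures, and errors.log.enable records them. Implement circuit breakers for external system dependencies. Monitor DLQ size to detect systemic issues.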

  10. Testing: Use TopologyTestDriver for unit testing Kafka Streams topologies. Use Testcontainers for integration testing with real Kafka and external systems. Test with realistic data volumes and edge cases.

Key Takeaways:

  • Kafka Streams runs embedded in the application, so no separate cluster is needed; scale by adding instances to the same consumer group
  • End-to-end latency = produce + poll + process + commit; optimize each component for real-time requirements
  • Tumbling windows partition time into non-overlapping buckets; hopping windows allow overlap; session windows are activity-based
  • KTable provides latest-value-per-key semantics for joins and materialized views
  • Exactly-once via processing.guarantee=exactly_once_v2 handles idempotent production + transactional offsets automatically
  • Kafka Connect handles offset management and parallelism; set tasks.max based on the connector's units of work (source) or the number of consumed partitions and target capacity (sink)

See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming)
