Real-Time Analytics with PySpark

Architecture Diagram: Real-Time Analytics Stack

╔══════════════════════════════════════════════════════════════════════════════════════════╗
║                             REAL-TIME ANALYTICS ARCHITECTURE                             ║
╠══════════════════════════════════════════════════════════════════════════════════════════╣
║                                                                                          ║
║  DATA SOURCES (Ingestion)                                                                ║
║  ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐            ║
║  │  IoT   │ │ Click  │ │  App   │ │  Log   │ │   DB   │ │  API   │ │  File  │            ║
║  │ Sensors│ │ Stream │ │ Events │ │ Files  │ │  CDC   │ │ Feeds  │ │ Drops  │            ║
║  └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘            ║
║      │          │          │          │          │          │          │                 ║
║      └──────────┴──────────┴──────────┴──┬───────┴──────────┴──────────┘                 ║
║                                          │                                               ║
║                                          ▼                                               ║
║  ╔════════════════════════════════════════════════════════════════════════════════════╗  ║
║  ║                      EVENT STREAMING PLATFORM (Apache Kafka)                       ║  ║
║  ║  ┌──────────────────────────────────────────────────────────────────────────────┐  ║  ║
║  ║  │                                                                              │  ║  ║
║  ║  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │  ║  ║
║  ║  │  │ Topic:   │  │ Topic:   │  │ Topic:   │  │ Topic:   │  │ Topic:   │        │  ║  ║
║  ║  │  │ events   │  │ metrics  │  │ alerts   │  │ commands │  │ cdc      │        │  ║  ║
║  ║  │  │ (raw)    │  │ (typed)  │  │ (urgent) │  │ (ordered)│  │ (change) │        │  ║  ║
║  ║  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘  └──────────┘        │  ║  ║
║  ║  │                                                                              │  ║  ║
║  ║  │  Retention: 7 days (raw), 30 days (aggregated)                               │  ║  ║
║  ║  │  Partitions: 12-24 per topic (parallelism)                                   │  ║  ║
║  ║  └──────────────────────────────────────────────────────────────────────────────┘  ║  ║
║  ╚════════════════════════════════════════════════════════════════════════════════════╝  ║
║                                          │                                               ║
║                                          ▼                                               ║
║  ╔════════════════════════════════════════════════════════════════════════════════════╗  ║
║  ║                        PySpark Structured Streaming Engine                         ║  ║
║  ║  ┌──────────────────────────────────────────────────────────────────────────────┐  ║  ║
║  ║  │                                                                              │  ║  ║
║  ║  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │  ║  ║
║  ║  │  │   Micro-    │  │   Window    │  │   State     │  │   Output    │          │  ║  ║
║  ║  │  │   Batch     │  │  Operations │  │  Management │  │   Mode      │          │  ║  ║
║  ║  │  │  Processing │  │             │  │             │  │             │          │  ║  ║
║  ║  │  │             │  │             │  │             │  │             │          │  ║  ║
║  ║  │  │ • Trigger   │  │ • Tumbling  │  │ • MapState  │  │ • Append    │          │  ║  ║
║  ║  │  │ • Batch Size│  │ • Sliding   │  │ • ListState │  │ • Update    │          │  ║  ║
║  ║  │  │ • Watermark │  │ • Session   │  │ • ValueState│  │ • Complete  │          │  ║  ║
║  ║  │  │             │  │ • Global    │  │ • TTL       │  │             │          │  ║  ║
║  ║  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘          │  ║  ║
║  ║  │                                                                              │  ║  ║
║  ║  │  ┌────────────────────────────────────────────────────────────────────────┐  │  ║  ║
║  ║  │  │  ADAPTIVE QUERY EXECUTION (AQE)                                        │  │  ║  ║
║  ║  │  │  • Dynamic partition coalescing                                        │  │  ║  ║
║  ║  │  │  • Skew join optimization                                              │  │  ║  ║
║  ║  │  │  • spark.sql.adaptive.coalescePartitions.enabled                       │  │  ║  ║
║  ║  │  └────────────────────────────────────────────────────────────────────────┘  │  ║  ║
║  ║  └──────────────────────────────────────────────────────────────────────────────┘  ║  ║
║  ╚════════════════════════════════════════════════════════════════════════════════════╝  ║
║                                          │                                               ║
║               ┌──────────────────────────┼──────────────────────────┐                    ║
║               ▼                          ▼                          ▼                    ║
║  ┌────────────────────────┐ ┌────────────────────────┐ ┌────────────────────────┐        ║
║  │  DELTA LAKE TABLES     │ │  MATERIALIZED VIEWS    │ │  REAL-TIME             │        ║
║  │  (Persistent State)    │ │  (Pre-computed Aggs)   │ │  DASHBOARDS            │        ║
║  │                        │ │                        │ │                        │        ║
║  │  • Streaming Sink      │ │  • Tumbling Windows    │ │  • Grafana             │        ║
║  │  • Exactly-Once        │ │  • Session Windows     │ │  • Apache Superset     │        ║
║  │  • Time Travel         │ │  • Running Aggregates  │ │  • Custom Dashboards   │        ║
║  │  • ACID Transactions   │ │  • Incremental Refresh │ │  • Alerting Rules      │        ║
║  └────────────────────────┘ └────────────────────────┘ └────────────────────────┘        ║
║                                                                                          ║
╚══════════════════════════════════════════════════════════════════════════════════════════╝

Architecture Diagram: Streaming Window Operations

┌──────────────────────────────────────────────────────────────────────────────────────┐
│                    STREAMING WINDOW OPERATIONS                                       │
├──────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                      │
│  INPUT STREAM (one event per time unit; event value = t)                             │
│  ─────────────────────────────────────────────────────                               │
│  t=1    t=2    t=3    t=4    t=5    t=6    t=7    t=8    t=9    t=10                 │
│  │      │      │      │      │      │      │      │      │      │                    │
│  ●      ●      ●      ●      ●      ●      ●      ●      ●      ●                    │
│                                                                                      │
│  ════════════════════════════════════════════════════════════════════════════════    │
│                                                                                      │
│  TUMBLING WINDOW (Non-overlapping, fixed-size)                                       │
│  ─────────────────────────────────────────────────────────────────────────           │
│  Window Size: 3, Slide: 3 (same as size = tumbling)                                  │
│                                                                                      │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐            │
│  │ Window [1-3]│    │ Window [4-6]│    │ Window [7-9]│    │ Window [10+]│            │
│  │ ● ● ●       │    │ ● ● ●       │    │ ● ● ●       │    │ ●           │            │
│  │ Count: 3    │    │ Count: 3    │    │ Count: 3    │    │ Count: 1    │            │
│  │ Sum: 6      │    │ Sum: 15     │    │ Sum: 24     │    │ Sum: 10     │            │
│  └─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘            │
│                                                                                      │
│  ════════════════════════════════════════════════════════════════════════════════    │
│                                                                                      │
│  SLIDING WINDOW (Overlapping, fixed-size)                                            │
│  ─────────────────────────────────────────────────────────────────────────           │
│  Window Size: 4, Slide: 2                                                            │
│                                                                                      │
│  ┌───────────────────┐  ┌───────────────────┐  ┌───────────────────┐                 │
│  │ Window [1-4]      │  │ Window [3-6]      │  │ Window [5-8]      │                 │
│  │ ● ● ● ●           │  │   ● ● ● ●         │  │     ● ● ● ●       │                 │
│  │ Count: 4          │  │ Count: 4          │  │ Count: 4          │                 │
│  │ Sum: 10           │  │ Sum: 18           │  │ Sum: 26           │                 │
│  └───────────────────┘  └───────────────────┘  └───────────────────┘                 │
│                                                                                      │
│  ════════════════════════════════════════════════════════════════════════════════    │
│                                                                                      │
│  SESSION WINDOW (Dynamic size based on activity gaps)                                │
│  ─────────────────────────────────────────────────────────────────────────           │
│  Gap Threshold: 2 (a gap of >= 2 time units ends a session)                          │
│                                                                                      │
│  Event time:  1  2  3  4  5  6  7  8  9 10 11 12                                     │
│  Events:      ●  ●  ●     ●  ●  ●  ●  ●        ●                                     │
│               └──┬──┘     └─────┬─────┘        │                                     │
│                  │              │              │                                     │
│              Session 1      Session 2      Session 3                                 │
│              (t=1-3)        (t=5-9)        (t=12)                                    │
│              Count: 3       Count: 5       Count: 1                                  │
│              Duration: 2    Duration: 4    Duration: 0                               │
│                                                                                      │
│  ════════════════════════════════════════════════════════════════════════════════    │
│                                                                                      │
│  WATERMARK (Late event handling)                                                     │
│  ─────────────────────────────────────────────────────────────────────────           │
│                                                                                      │
│  Allowed lateness: 4                                                                 │
│  Event Time:  t=1  t=2  t=3  t=4  t=5  t=6  t=7  t=8  t=9  t=10                      │
│  Watermark:   ─    ─    ─    ─    w=1  w=2  w=3  w=4  w=5  w=6                       │
│                                                                                      │
│  Events arriving (in processing order):                                              │
│  t=1 → OK   (event time >= watermark)                                                │
│  t=2 → OK   (event time >= watermark)                                                │
│  t=7 → OK   (watermark advances to 7 - 4 = 3)                                        │
│  t=1 → LATE (event time < watermark 3; may be dropped)                               │
│                                                                                      │
│  Watermark = max event time seen - allowed lateness                                  │
│  A window is considered complete once the watermark passes its end time              │
│                                                                                      │
└──────────────────────────────────────────────────────────────────────────────────────┘

Architecture Diagram: Materialized View Incremental Refresh

┌──────────────────────────────────────────────────────────────────────────────────────┐
│                    INCREMENTAL MATERIALIZED VIEW REFRESH                             │
├──────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                      │
│  FULL REFRESH (Traditional - Inefficient)                                            │
│  ─────────────────────────────────────────────────────                               │
│  ┌─────────────────────────────────────────────────────────────────────┐             │
│  │                                                                     │             │
│  │  Every refresh:                                                     │             │
│  │  1. Read ENTIRE source table (100M rows)                            │             │
│  │  2. Compute ALL aggregations from scratch                           │             │
│  │  3. Overwrite ENTIRE materialized view                              │             │
│  │                                                                     │             │
│  │  Cost: $50/refresh × 24 refreshes/day = $1,200/day                  │             │
│  │  Latency: 45 minutes per refresh                                    │             │
│  │                                                                     │             │
│  └─────────────────────────────────────────────────────────────────────┘             │
│                                                                                      │
│  INCREMENTAL REFRESH (PySpark + Delta Lake - Efficient)                              │
│  ─────────────────────────────────────────────────────                               │
│  ┌─────────────────────────────────────────────────────────────────────┐             │
│  │                                                                     │             │
│  │  Every refresh:                                                     │             │
│  │  1. Read ONLY new/changed records (delta)                           │             │
│  │  2. Compute aggregations on DELTA only                              │             │
│  │  3. MERGE changes into existing materialized view                   │             │
│  │                                                                     │             │
│  │  Cost: $2/refresh × 24 refreshes/day = $48/day                      │             │
│  │  Latency: 30 seconds per refresh                                    │             │
│  │                                                                     │             │
│  │  Savings: 96% cost reduction, 90x latency improvement               │             │
│  │                                                                     │             │
│  └─────────────────────────────────────────────────────────────────────┘             │
│                                                                                      │
│  ┌────────────────────────────────────────────────────────────────────────────────┐  │
│  │                    INCREMENTAL REFRESH FLOW                                    │  │
│  │                                                                                │  │
│  │  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐                    │  │
│  │  │ Source Table │────▶│ Change       │────▶│ Aggregate    │                    │  │
│  │  │ (Delta Lake) │     │ Detection    │     │ Delta Only   │                    │  │
│  │  │              │     │              │     │              │                    │  │
│  │  │ 100M rows    │     │ 50K new rows │     │ 10K new aggs │                    │  │
│  │  └──────────────┘     └──────────────┘     └──────┬───────┘                    │  │
│  │                                                   │                            │  │
│  │                                                   ▼                            │  │
│  │                                            ┌──────────────┐                    │  │
│  │                                            │ MERGE into   │                    │  │
│  │                                            │ Materialized │                    │  │
│  │                                            │ View         │                    │  │
│  │                                            │              │                    │  │
│  │                                            │ 2M rows      │                    │  │
│  │                                            │ (updated)    │                    │  │
│  │                                            └──────────────┘                    │  │
│  │                                                                                │  │
│  └────────────────────────────────────────────────────────────────────────────────┘  │
│                                                                                      │
│  ┌────────────────────────────────────────────────────────────────────────────────┐  │
│  │                    MATERIALIZED VIEW PATTERNS                                  │  │
│  │                                                                                │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐        │  │
│  │  │  Tumbling    │  │  Sliding     │  │  Session     │  │  Running     │        │  │
│  │  │  Window      │  │  Window      │  │  Window      │  │  Aggregate   │        │  │
│  │  │              │  │              │  │              │  │              │        │  │
│  │  │ Fixed-size   │  │ Overlapping  │  │ Dynamic-size │  │ Cumulative   │        │  │
│  │  │ non-overlap  │  │ fixed-size   │  │ gap-based    │  │ over time    │        │  │
│  │  │              │  │              │  │              │  │              │        │  │
│  │  │ USE: Hourly  │  │ USE: 5-min   │  │ USE: User    │  │ USE: Daily   │        │  │
│  │  │ reports      │  │ rolling avg  │  │ sessions     │  │ totals       │        │  │
│  │  └──────────────┘  └──────────────┘  └──────────────┘  └──────────────┘        │  │
│  │                                                                                │  │
│  └────────────────────────────────────────────────────────────────────────────────┘  │
│                                                                                      │
└──────────────────────────────────────────────────────────────────────────────────────┘

Detailed Explanation

Real-time analytics with PySpark is achieved through Structured Streaming, a micro-batch processing engine built on the Spark SQL engine that processes streaming data as a series of small batch queries. Unlike traditional batch processing that operates on complete datasets, streaming analytics processes data incrementally, maintaining state across micro-batches to provide continuously updated results.

The fundamental abstraction in Structured Streaming is the streaming DataFrame: an unbounded table that grows as new data arrives. Each micro-batch processes a chunk of new data, applies transformations (select, filter, join, aggregate), and writes results to a sink (console, Kafka, Delta Lake, etc.). The engine manages fault tolerance by recording each micro-batch's offsets in a WAL (Write-Ahead Log) and persisting offsets and state in a checkpoint. Combined with a replayable source (such as Kafka) and an idempotent or transactional sink (such as Delta Lake), this yields end-to-end exactly-once semantics.
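The incremental model described above can be illustrated with a small pure-Python simulation. This is a conceptual sketch under stated assumptions, not how Spark implements it. An in-memory list stands in for a replayable source such as a Kafka topic, and a dict stands in for the checkpoint. run_micro_batches and max_records_per_batch are hypothetical names; the latter loosely plays the role of the Kafka source's maxOffsetsPerTrigger option. Because offsets and state are committed only after the sink write, a failure would replay a batch rather than skip it. Turning that replay into exactly-once output additionally requires an idempotent sink.

```python
# Conceptual model of a micro-batch loop (not Spark's implementation).
# The source is an append-only list standing in for a log such as a Kafka topic.
# The checkpoint stores the committed offset and the aggregation state together.

def run_micro_batches(source, checkpoint, max_records_per_batch):
    """Process records after the committed offset; return one sink write per batch."""
    sink_writes = []
    while checkpoint["offset"] < len(source):
        start = checkpoint["offset"]
        end = min(start + max_records_per_batch, len(source))
        counts = dict(checkpoint["state"])      # resume from the last committed state
        for page in source[start:end]:          # only the new records are read
            counts[page] = counts.get(page, 0) + 1
        sink_writes.append(dict(counts))        # write this batch's result to the sink
        # Commit offset and state only after the sink write, so a crash before this
        # point would restart from the same offset range instead of skipping it.
        checkpoint["offset"] = end
        checkpoint["state"] = counts
    return sink_writes


clicks = ["home", "cart", "home"]
ckpt = {"offset": 0, "state": {}}
run_micro_batches(clicks, ckpt, max_records_per_batch=2)    # batches [0, 2) and [2, 3)

clicks += ["checkout", "home"]                               # the unbounded table grows
out = run_micro_batches(clicks, ckpt, max_records_per_batch=2)
print(out)  # [{'home': 3, 'cart': 1, 'checkout': 1}]
```

The second call reads only the two new records, yet the running counts match a recomputation over all five clicks.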

Windowed aggregations are the cornerstone of streaming analytics. PySpark supports three window types:

- Tumbling windows are fixed-size and non-overlapping, which makes them ideal for hourly or daily reports.
- Sliding windows are fixed-size and overlapping, which makes them ideal for rolling averages.
- Session windows have a dynamic size based on activity gaps, which makes them ideal for user session analysis.

Each window is defined over a time column (event time). It is normally paired with a watermark, which sets the maximum tolerated lateness, so that late-arriving events are handled predictably and window state can eventually be discarded.
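The tumbling and sliding cases can be checked with a pure-Python sketch that assigns integer timestamps to half-open windows [start, start + size). It is not Spark's implementation. It assumes one event per time unit with value equal to its timestamp, as in the window diagram. assign_windows, windowed_sums, and origin are illustrative names. origin plays roughly the role of the startTime offset accepted by PySpark's window() function.

```python
# Conceptual sketch of window assignment; integer timestamps stand in for event time.
# Windows are half-open intervals [start, start + size) on a grid origin + k * slide.

def assign_windows(t, size, slide, origin=0):
    """Return the sorted (start, end) windows that contain timestamp t."""
    windows = []
    start = origin + ((t - origin) // slide) * slide   # latest grid start <= t
    while start > t - size:                            # every start in (t - size, t]
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

def windowed_sums(events, size, slide, origin=0):
    """Sum event values per window, like a groupBy on window(...) followed by sum."""
    sums = {}
    for t, value in events:
        for w in assign_windows(t, size, slide, origin):
            sums[w] = sums.get(w, 0) + value
    return dict(sorted(sums.items()))


events = [(t, t) for t in range(1, 11)]   # one event per time unit, value = t

tumbling = windowed_sums(events, size=3, slide=3, origin=1)
print(tumbling)  # {(1, 4): 6, (4, 7): 15, (7, 10): 24, (10, 13): 10}

sliding = windowed_sums(events, size=4, slide=2, origin=1)
print(sliding[(1, 5)], sliding[(3, 7)], sliding[(5, 9)])  # 10 18 26
```

With slide equal to size, every event lands in exactly one window. With a slide of half the size, each event is counted in two windows, which is why sliding windows cost more state.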

Watermarks are critical for bounded state management in streaming. Without a watermark, the engine must retain state for every window indefinitely, because a late event could still update any of them. A watermark defines the threshold beyond which late events may be dropped, which lets the engine finalize windows and discard their state. For example, a watermark of "10 minutes" means that an event whose event time is more than 10 minutes older than the maximum event time seen so far is considered late and may be dropped from window aggregations.
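A minimal sketch of the lateness rule, assuming integer event times and a watermark recomputed after every event. Spark instead recomputes it between micro-batches. classify_events is a hypothetical helper. In PySpark, the same threshold is declared with withWatermark("event_time", "10 minutes").

```python
# Conceptual sketch of watermark-based lateness (not Spark's implementation).
# Simplification: the watermark is recomputed after every event; Spark advances it
# between micro-batches. In both cases it never moves backwards.

def classify_events(arrivals, allowed_lateness):
    """Label each arriving event time 'ok' or 'late' against the current watermark."""
    max_event_time = None
    watermark = None
    labels = []
    for t in arrivals:
        if watermark is not None and t < watermark:
            labels.append((t, "late"))   # older than the watermark: may be dropped
        else:
            labels.append((t, "ok"))
            max_event_time = t if max_event_time is None else max(max_event_time, t)
            watermark = max_event_time - allowed_lateness
    return labels, watermark


labels, wm = classify_events([1, 2, 7, 1], allowed_lateness=4)
print(labels, wm)  # [(1, 'ok'), (2, 'ok'), (7, 'ok'), (1, 'late')] 3

# Same rule in minutes since midnight: with a 10-minute delay, 12:09 is late once 12:20 is seen.
minutes_labels, _ = classify_events([12 * 60 + 5, 12 * 60 + 20, 12 * 60 + 9], allowed_lateness=10)
print(minutes_labels[-1][1])  # late
```

Lateness is judged by how far an event's own timestamp trails the maximum seen, not by its wall-clock arrival delay.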

Materialized views in the streaming context are pre-computed aggregations that are incrementally updated as new data arrives. Instead of recomputing the entire aggregation on every refresh (full refresh), the engine tracks changes since the last refresh and applies only the delta (incremental refresh). When the delta is small relative to the source, this can cut latency and cost by orders of magnitude. The approach applies most directly to decomposable aggregates such as count and sum; an average can be maintained as a sum and a count. Aggregates such as distinct counts or medians need the affected groups recomputed. Delta Lake's MERGE operation enables this with atomic upserts: it inserts new aggregation groups and updates existing ones in a single transaction.
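The upsert at the heart of incremental refresh can be sketched in plain Python. The sketch assumes a decomposable aggregate (count and sum per group) and uses a dict in place of the materialized view. aggregate and merge_delta are illustrative names. In a real pipeline the upsert would typically be a Delta Lake MERGE (whenMatchedUpdate / whenNotMatchedInsert); this sketch makes no claim about how Delta executes it.

```python
# Conceptual sketch of incremental refresh for decomposable aggregates (count, sum).
# The "view" is a dict keyed by group; merge_delta mimics an upsert:
# matched groups are updated, unmatched groups are inserted.

def aggregate(rows):
    """Compute {group: (count, total)} for (group, amount) rows."""
    agg = {}
    for group, amount in rows:
        count, total = agg.get(group, (0, 0))
        agg[group] = (count + 1, total + amount)
    return agg

def merge_delta(view, delta_rows):
    """Aggregate only the new rows, then upsert the partial results into the view."""
    for group, (count, total) in aggregate(delta_rows).items():
        if group in view:                       # like WHEN MATCHED THEN UPDATE
            old_count, old_total = view[group]
            view[group] = (old_count + count, old_total + total)
        else:                                   # like WHEN NOT MATCHED THEN INSERT
            view[group] = (count, total)
    return view


history = [("mobile", 10), ("desktop", 5), ("mobile", 7)]
new_rows = [("mobile", 3), ("tablet", 4)]

view = aggregate(history)       # initial full computation
merge_delta(view, new_rows)     # incremental refresh touches only two groups
print(view)  # {'mobile': (3, 20), 'desktop': (1, 5), 'tablet': (1, 4)}
```

Only the groups present in the new rows are touched, and the result equals a full recomputation over all rows. A distinct count could not be merged this way without keeping extra state per group.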

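State management in Structured Streaming relies on a state store that keeps intermediate aggregation state across micro-batches. The default provider (HDFSBackedStateStoreProvider) keeps state in executor memory and checkpoints it to fault-tolerant storage. A RocksDB-based provider (RocksDBStateStoreProvider) can be enabled for large state. For simple aggregations (count, sum), state per key is small. For complex stateful operations (session windows, stream-stream joins), state can grow significantly. It must be bounded either with watermarks, which let the engine evict expired state, or with timeouts and TTL in arbitrary stateful operators.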
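The following is a conceptual pure-Python sketch of tumbling-window counts with eviction, showing why watermarks keep state bounded. It assumes integer timestamps and a watermark that advances between micro-batches. process_with_eviction is a hypothetical name. In Spark, the state itself would live in the configured state store provider rather than in a dict.

```python
# Conceptual sketch of watermark-driven state cleanup for tumbling-window counts.
# Not Spark's implementation: state is a dict of open windows, and the watermark
# advances between micro-batches (max event time seen - allowed lateness).

def process_with_eviction(batches, size, allowed_lateness):
    state = {}                       # window start -> running count (open windows)
    finalized = []                   # (window start, final count), emitted once
    watermark = float("-inf")
    max_event_time = None
    peak_state = 0
    for batch in batches:
        for t in batch:
            if t < watermark:
                continue             # late event: its window may already be gone
            start = (t // size) * size
            state[start] = state.get(start, 0) + 1
            max_event_time = t if max_event_time is None else max(max_event_time, t)
        peak_state = max(peak_state, len(state))
        if max_event_time is not None:
            watermark = max(watermark, max_event_time - allowed_lateness)
        for start in sorted(state):  # sorted() copies the keys, so popping is safe
            if start + size <= watermark:
                finalized.append((start, state.pop(start)))
    return finalized, state, peak_state


batches = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 3, 10, 11], [12, 13]]
finalized, open_windows, peak = process_with_eviction(batches, size=4, allowed_lateness=2)
print(finalized)     # [(0, 4), (4, 4)]
print(open_windows)  # {8: 4, 12: 2}
print(peak)          # 2
```

Four windows are processed, but at most two are ever held in state. The late event at t=3 in the third batch is ignored because its window had already been finalized.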

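The output mode determines how results are written:

- Append mode writes only rows that will never change again. For aggregations this requires a watermark, and each window is emitted once, after it is finalized.
- Update mode writes only the rows that changed since the last trigger, which makes it a natural fit for aggregation queries.
- Complete mode rewrites the entire result table on every trigger. It is supported only for aggregation queries and is practical only for small result sets.

The choice of output mode affects both correctness and performance.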
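Below is a small pure-Python model of the three output modes. It assumes a windowed count whose result table is known before and after one trigger, and a watermark that has finalized window w0. rows_to_emit and the window names are placeholders; the point is only which rows each mode would hand to the sink.

```python
# Conceptual sketch of what each output mode sends to the sink after one trigger.
# Assumption: a watermark has marked some windows as final (needed for append mode
# with aggregations); window names are placeholders.

def rows_to_emit(mode, before, after, newly_finalized):
    """before/after: result table {window: count} at the previous and current trigger."""
    if mode == "complete":
        return dict(sorted(after.items()))                     # whole result table
    if mode == "update":
        return {k: v for k, v in sorted(after.items()) if before.get(k) != v}  # changed
    if mode == "append":
        return {k: after[k] for k in sorted(newly_finalized)}  # final rows, emitted once
    raise ValueError(f"unknown output mode: {mode}")


before = {"w0": 3, "w1": 1}              # result table after trigger 1
after = {"w0": 3, "w1": 4, "w2": 2}      # after trigger 2
newly_finalized = {"w0"}                 # watermark passed w0's end during trigger 2

for mode in ("append", "update", "complete"):
    print(mode, rows_to_emit(mode, before, after, newly_finalized))
# append {'w0': 3}
# update {'w1': 4, 'w2': 2}
# complete {'w0': 3, 'w1': 4, 'w2': 2}
```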

Mathematical Foundations

Definition: Streaming Window

A streaming window $W$ assigns each event $e$ with timestamp $t$ to a set of windows:

$$W(e) = \{\, w : t \in w.\text{interval} \,\}$$

Types: Tumbling (non-overlapping), Sliding (overlapping), Session (activity-based).
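Session windows do not fit the per-event definition of $W(e)$ directly, because an event's window depends on its neighbours. The following is a pure-Python sketch of the gap-based merge; it is not Spark's implementation. It assumes one key and integer timestamps. session_windows is a hypothetical helper that follows the same convention as PySpark's session_window(), where a session ends at the last event's time plus the gap. With events at t = 1-3, 5-9, and 12 and a gap of 2, it yields three sessions:

```python
# Conceptual sketch of gap-based session windows for one key (not Spark's code).
# Each event opens a tentative window [t, t + gap); overlapping windows merge,
# so a session ends when no event arrives within `gap` of the previous one.

def session_windows(timestamps, gap):
    """Return (start, end, count, duration) per session, with end = last event + gap."""
    sessions = []                               # [first_event, last_event, count]
    for t in sorted(timestamps):
        if sessions and t < sessions[-1][1] + gap:
            sessions[-1][1] = t                 # extend the open session
            sessions[-1][2] += 1
        else:
            sessions.append([t, t, 1])          # gap reached: start a new session
    return [(first, last + gap, count, last - first) for first, last, count in sessions]


events = [1, 2, 3, 5, 6, 7, 8, 9, 12]
for s in session_windows(events, gap=2):
    print(s)
# (1, 5, 3, 2)
# (5, 11, 5, 4)
# (12, 14, 1, 0)
```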

Watermark Definition

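A watermark $w(t)$ is a monotonically non-decreasing function of processing time $t$ that acts as a lower bound on the event times still to arrive:

$$w(t) \leq \min_{e \in \text{unprocessed}} e.\text{event\_time}$$

In practice the bound is heuristic. Structured Streaming computes it as $w(t) = \max_{e \in \text{seen}(t)} e.\text{event\_time} - \delta$, where $\delta$ is the allowed lateness threshold. Events that arrive with an event time below $w(t)$ violate the bound and are considered late. They may be dropped, or routed to a side output in engines that support one.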

Window Correctness Criteria

A window computation is correct if:

  1. Every event assigned to window $w$ is processed exactly once.
  2. No on-time event is missed: $\forall e:\ e.\text{event\_time} \in w.\text{interval} \implies e \in \text{processed}(w)$ for every event that is not late, meaning its event time is at or above the watermark when it arrives.
  3. Watermark progression guarantees eventual completion: for every window $w$ there is a time $t$ at which $w(t) \geq w.\text{end}$, after which $w$ is final.

Materialized View Refresh

Incremental refresh cost for materialized view $V$ with source $S$:

$$\text{Cost}_{\text{refresh}} = |\Delta S| \times C_{\text{transform}} + |V| \times C_{\text{merge}}$$

Full recomputation costs $C_{\text{full}} = |S| \times C_{\text{transform}}$. Incremental refresh is cheaper when $\text{Cost}_{\text{refresh}} < C_{\text{full}}$. Dividing both sides by $|S| \times C_{\text{transform}}$ gives the condition:

$$\frac{|\Delta S|}{|S|} < 1 - \frac{|V| \times C_{\text{merge}}}{|S| \times C_{\text{transform}}}$$
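Comparing the two cost formulas numerically shows how sensitive the result is to the merge term. The row counts below reuse the figures from the refresh diagram (|S| = 100M, |ΔS| = 50K, |V| = 2M). The per-row costs c_transform and c_merge are hypothetical unit costs chosen for illustration, and both helper functions are illustrative names.

```python
# Worked comparison of incremental vs. full refresh cost, in abstract cost units.
# Row counts come from the refresh diagram; per-row costs are hypothetical.

def refresh_costs(source_rows, delta_rows, view_rows, c_transform, c_merge):
    """Return (incremental_cost, full_cost)."""
    incremental = delta_rows * c_transform + view_rows * c_merge
    full = source_rows * c_transform
    return incremental, full

def max_delta_fraction(source_rows, view_rows, c_transform, c_merge):
    """Largest |dS| / |S| for which incremental refresh is still cheaper."""
    return 1 - (view_rows * c_merge) / (source_rows * c_transform)


S, DELTA_S, V = 100_000_000, 50_000, 2_000_000
for c_merge in (1, 60):
    inc, full = refresh_costs(S, DELTA_S, V, c_transform=1, c_merge=c_merge)
    print(c_merge, inc, full, inc < full)
# 1 2050000 100000000 True
# 60 120050000 100000000 False

print(round(max_delta_fraction(S, V, c_transform=1, c_merge=1), 4))  # 0.98
```

With equal per-row costs, incremental refresh stays cheaper for any delta below 98% of the source. If merging a view row cost 60 times as much as transforming a source row, the merge term alone would exceed a full recomputation, and the threshold would drop below zero.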

End-to-End Latency

Total latency from event arrival to query result:

$$L_{\text{e2e}} = L_{\text{ingest}} + L_{\text{process}} + L_{\text{commit}} + L_{\text{query}}$$

Target: $L_{\text{e2e}} < \text{SLA}$ (typically seconds for real-time).
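A hypothetical latency budget makes the sum concrete. All component values are placeholders. Splitting L_process into a queueing wait plus batch run time is an assumption about micro-batch scheduling: an event can wait up to one trigger interval before the batch that picks it up starts. e2e_latency_ms is an illustrative helper.

```python
# Hypothetical latency budget for the L_e2e formula; all numbers are placeholders.
# Assumption: in micro-batch mode an event can wait up to one trigger interval
# before its batch starts (about half on average if arrivals are uniform).

def e2e_latency_ms(ingest, trigger_interval, batch_duration, commit, query, worst_case=True):
    wait = trigger_interval if worst_case else trigger_interval / 2
    process = wait + batch_duration          # L_process = queueing wait + batch run time
    return ingest + process + commit + query


SLA_MS = 5_000
for trigger in (10_000, 1_000):              # 10 s vs. 1 s processing-time trigger
    worst = e2e_latency_ms(200, trigger, 1_500, 800, 500)
    print(trigger, worst, worst < SLA_MS)
# 10000 13000 False
# 1000 4000 True
```

Under these assumptions, shortening the trigger interval from 10 s to 1 s is what brings the worst case under a 5 s SLA.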

Key Insight

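Structured Streaming uses micro-batch execution by default, which provides exactly-once guarantees with end-to-end latencies of about 100 ms at best. Continuous processing mode is experimental and enabled with trigger(continuous="1 second"). It can reach latencies of about 1 ms, but it offers only at-least-once guarantees and supports only map-like operations (projections and selections, no aggregations). The trade-off is exactly-once guarantees vs. latency.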

Summary

Real-time analytics relies on windowing for temporal aggregation, watermarks for late data handling, and incremental refresh for materialized views. End-to-end latency is the sum of ingestion, processing, commit, and query phases. Window correctness requires exactly-once processing with watermark-driven completion.

Key Concepts Table (cont.)

| Concept | Description | Configuration | Use Case |
|---|---|---|---|
| Micro-Batch | Discrete processing intervals | trigger(processingTime="10 seconds") | General streaming |
| Continuous | Low-latency processing (at-least-once, map-like operations only) | trigger(continuous="1 second") | Sub-second latency |
| Watermark | Late event tolerance | withWatermark("event_time", "10 minutes") | Window aggregations |
| Tumbling Window | Fixed non-overlapping windows | window("event_time", "5 minutes") | Periodic reports (e.g., 5-minute counts) |
| Sliding Window | Fixed overlapping windows | window("event_time", "10 minutes", "5 minutes") | Rolling averages |
| Session Window | Dynamic gap-based windows | session_window("event_time", "30 minutes") (Spark 3.2+) | User sessions |
| State Store | Pluggable state backend (RocksDB optional) | spark.sql.streaming.stateStore.providerClass | Stateful operations |
| Checkpoint | Offset + state persistence | .option("checkpointLocation", "/path") | Fault tolerance |
| Trigger | Micro-batch frequency | .trigger(processingTime="30 seconds") | Throughput tuning |
| Output Mode | Result writing strategy | .outputMode("append"), "update", or "complete" | Result correctness |
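A session window has no fixed length: the session stays open until the key has been inactive for the gap duration. The following is a minimal pure-Python sketch of that rule. It assumes numeric timestamps for a single key, and it treats a gap equal to or longer than the session gap as the start of a new session. sessionize is a hypothetical helper, not a Spark API.

```python
def sessionize(event_times, gap):
    """Group one key's event times into sessions separated by inactivity >= gap.

    Returns a list of (first_event, last_event, event_count) tuples.
    A session's window end is last_event + gap, so the next event starts a
    new session when it arrives at or after that end.
    """
    sessions = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][1] < gap:
            first, _, n = sessions[-1]
            sessions[-1] = (first, t, n + 1)  # extend the open session
        else:
            sessions.append((t, t, 1))  # gap reached (or first event): new session
    return sessions


# Minutes since midnight for one user, with a 30-minute inactivity gap.
print(sessionize([0, 10, 25, 70, 80, 200], gap=30))
# [(0, 25, 3), (70, 80, 2), (200, 200, 1)]
```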

Code Examples

Example 1: Real-Time Click Stream Analytics with Windowed Aggregations

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTime-ClickAnalytics") \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()

# Define click event schema
click_schema = StructType([
    StructField("user_id", StringType()),
    StructField("page_url", StringType()),
    StructField("action", StringType()),  # click, scroll, hover
    StructField("timestamp", TimestampType()),
    StructField("session_id", StringType()),
    StructField("device_type", StringType()),
    StructField("geo_location", StringType()),
])

# Read click stream from Kafka
click_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "click-events")
    .option("startingOffsets", "latest")
    .load()
    .select(
        from_json(col("value").cast("string"), click_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
)

# ─── Tumbling Window: Page views per 5-minute window ───
page_views_5min = (
    click_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("page_url")
    )
    .agg(
        count("*").alias("view_count"),
        approx_count_distinct("user_id").alias("unique_users"),
        count(when(col("action") == "click", 1)).alias("click_count"),
    )
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("page_url"),
        col("view_count"),
        col("unique_users"),
        col("click_count"),
        (col("click_count") / col("view_count")).alias("click_through_rate"),
    )
)

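# Write to Delta Lake. The Delta sink supports append and complete modes (not update);
# with a watermark, append mode writes each window once, after the watermark closes it.
(
    page_views_5min
    .writeStream
    .format("delta")
    .outputMode("append")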
    .option("checkpointLocation", "/mnt/checkpoints/page_views_5min")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/page_views_5min")
)

# ─── Sliding Window: Rolling 10-minute average with 5-minute slide ───
rolling_metrics = (
    click_stream
    .withWatermark("event_time", "15 minutes")
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),
        col("device_type")
    )
    .agg(
        count("*").alias("total_events"),
        avg(when(col("action") == "click", 1).otherwise(0)).alias("avg_click_rate"),
        # Exact distinct aggregations are not supported on streaming DataFrames.
        approx_count_distinct("session_id").alias("active_sessions"),
    )
)

(
    rolling_metrics
    .writeStream
    .format("delta")
    .outputMode("append")  # Delta sink: append or complete only
    .option("checkpointLocation", "/mnt/checkpoints/rolling_metrics")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/rolling_metrics")
)

# ─── Session Window: User session analytics ───
# Native session windows (Spark 3.2+): a user's session closes after 30 minutes of
# inactivity. Non-time-based window functions (Window.partitionBy(...).over(...))
# are not supported on streaming DataFrames, so they cannot sessionize a stream.
session_analytics = (
    click_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        session_window(col("event_time"), "30 minutes"),
        col("user_id"),
    )
    .agg(
        min("event_time").alias("first_event_time"),
        max("event_time").alias("last_event_time"),
        count("*").alias("event_count"),
        approx_count_distinct("page_url").alias("pages_visited"),
    )
    .select(
        col("user_id"),
        col("session_window.start").alias("session_start"),
        col("session_window.end").alias("session_end"),  # last event + 30-minute gap
        col("event_count"),
        col("pages_visited"),
        (col("last_event_time").cast("long") - col("first_event_time").cast("long")).alias(
            "session_duration_seconds"
        ),
    )
)

Example 2: Real-Time Anomaly Detection with Stateful Processing

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTime-AnomalyDetection") \
    .getOrCreate()

# Read sensor data stream
sensor_schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("metric_name", StringType()),
    StructField("value", DoubleType()),
    StructField("timestamp", TimestampType()),
])

sensor_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "sensor-metrics")
    .load()
    .select(from_json(col("value").cast("string"), sensor_schema).alias("data"))
    .select("data.*")
)

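# ─── Rolling Statistics for Anomaly Detection ───
# Query 1: maintain a rolling baseline per sensor/metric and persist it to Delta.
# Joining raw readings back onto an aggregation of the same stream would chain
# stateful operators and delay detection until each window closes; splitting the
# work into one stateful operation per query is the workaround the Spark docs suggest.
BASELINE_PATH = "/mnt/analytics/silver/sensor_baselines"

windowed_stats = (
    sensor_stream
    .withWatermark("timestamp", "5 minutes")
    .groupBy(
        window(col("timestamp"), "15 minutes", "1 minute"),
        col("sensor_id"),
        col("metric_name"),
    )
    .agg(
        avg("value").alias("avg_value"),
        stddev("value").alias("stddev_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value"),
        count("*").alias("reading_count"),
    )
    .select(
        col("window.end").alias("window_end"),
        "sensor_id", "metric_name", "avg_value", "stddev_value",
        "min_value", "max_value", "reading_count",
        (col("avg_value") + 3 * col("stddev_value")).alias("upper_bound"),
        (col("avg_value") - 3 * col("stddev_value")).alias("lower_bound"),
    )
)

(
    windowed_stats
    .writeStream
    .format("delta")
    .outputMode("append")  # a window is written once, after the watermark closes it
    .option("checkpointLocation", "/mnt/checkpoints/sensor_baselines")
    .trigger(processingTime="1 minute")
    .start(BASELINE_PATH)
)

# ─── Detect Anomalies in Real-Time ───
# Query 2: compare each micro-batch of raw readings with the latest finalized baseline.
# Assumes the baseline query has committed at least once before the first batch runs.
from pyspark.sql.window import Window

def detect_anomalies(batch_df, batch_id):
    latest_baseline = (
        spark.read.format("delta").load(BASELINE_PATH)  # re-read on every micro-batch
        .withColumn("rn", row_number().over(
            Window.partitionBy("sensor_id", "metric_name").orderBy(desc("window_end"))))
        .filter(col("rn") == 1)
        .drop("rn")
    )
    anomalies = (
        batch_df
        .join(latest_baseline, ["sensor_id", "metric_name"], "inner")
        .filter((col("value") > col("upper_bound")) | (col("value") < col("lower_bound")))
        .select(
            "sensor_id", "metric_name", "value", "timestamp",
            col("avg_value").alias("expected_value"),
            col("stddev_value").alias("expected_stddev"),
            (abs(col("value") - col("avg_value")) / col("stddev_value")).alias("z_score"),
        )
    )
    # txnAppId/txnVersion let Delta skip this append if the batch is replayed.
    (
        anomalies.write.format("delta").mode("append")
        .option("txnAppId", "anomaly_alerts")
        .option("txnVersion", batch_id)
        .save("/mnt/analytics/gold/anomaly_alerts")
    )

# Run detection on every micro-batch of raw readings
(
    sensor_stream
    .writeStream
    .foreachBatch(detect_anomalies)
    .option("checkpointLocation", "/mnt/checkpoints/anomaly_alerts")
    .trigger(processingTime="10 seconds")
    .start()
)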

# ─── Real-Time Dashboard Query (Read from Materialized View) ───
dashboard_metrics = spark.read.format("delta").load("/mnt/analytics/gold/page_views_5min")

# Top pages in last hour
top_pages = (
    dashboard_metrics
    .filter(col("window_start") >= current_timestamp() - expr("INTERVAL 1 HOUR"))
    .groupBy("page_url")
    .agg(
        sum("view_count").alias("total_views"),
        sum("click_count").alias("total_clicks"),
        avg("click_through_rate").alias("avg_ctr"),
    )
    .orderBy(desc("total_views"))
    .limit(20)
)

top_pages.show()
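The anomaly rule in Example 2 flags a reading when it falls outside the baseline mean plus or minus three standard deviations. The same check is sketched below in plain Python, which can be handy for unit-testing thresholds. The sketch assumes the sample standard deviation, matching PySpark's stddev (an alias of stddev_samp). It treats a zero-variance baseline as inconclusive, whereas the Spark filter would flag any deviation from a constant baseline. three_sigma_check and the readings are hypothetical.

```python
from statistics import mean, stdev


def three_sigma_check(baseline, value, k=3.0):
    """Return (is_anomaly, z_score) for value against a list of baseline readings.

    Uses the sample standard deviation. A zero-variance baseline returns
    (False, None) because the z-score is undefined.
    """
    mu = mean(baseline)
    sigma = stdev(baseline)  # requires at least two readings
    if sigma == 0:
        return False, None
    z = abs(value - mu) / sigma
    return z > k, z  # same as value > mu + k*sigma or value < mu - k*sigma


baseline = [10.0, 10.0, 10.0, 10.0, 11.0, 9.0]  # hypothetical readings
print(three_sigma_check(baseline, 15.0)[0])  # True  (z is about 7.9)
print(three_sigma_check(baseline, 11.0)[0])  # False (z is about 1.6)
```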

Example 3: Streaming JOIN between Two Data Streams

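from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("StreamStream-Join") \
    .getOrCreate()

# Same click schema as Example 1
click_schema = StructType([
    StructField("user_id", StringType()),
    StructField("page_url", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("session_id", StringType()),
    StructField("device_type", StringType()),
    StructField("geo_location", StringType()),
])

# Assumed purchase schema: fields inferred from the columns used in the join below
purchase_schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])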

# Stream 1: Click events
clicks = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "click-events")
    .load()
    .select(
        from_json(col("value").cast("string"), click_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
    .withWatermark("event_time", "10 minutes")
)

# Stream 2: Purchase events
purchases = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "purchase-events")
    .load()
    .select(
        from_json(col("value").cast("string"), purchase_schema).alias("data")
    )
    .select("data.*")
    .withColumn("event_time", col("timestamp"))
    .withWatermark("event_time", "10 minutes")
)

# Stream-Stream Join: Match clicks to purchases within 30-minute window
click_purchase_joined = (
    clicks.alias("c")
    .join(
        purchases.alias("p"),
        expr("""
            c.user_id = p.user_id AND
            p.event_time BETWEEN c.event_time AND c.event_time + INTERVAL 30 MINUTES
        """),
        "left"
    )
    .select(
        col("c.user_id"),
        col("c.page_url").alias("clicked_page"),
        col("c.event_time").alias("click_time"),
        col("p.product_id"),
        col("p.amount").alias("purchase_amount"),
        col("p.event_time").alias("purchase_time"),
        (col("p.event_time").cast("long") - col("c.event_time").cast("long")).alias(
            "time_to_purchase_seconds"
        ),
    )
)

# Write joined stream to Delta Lake
(
    click_purchase_joined
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/click_purchase_join")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/click_purchase_attribution")
)
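The sketch below makes the join condition concrete with a pure-Python version of the same left outer, time-bounded match. A purchase matches a click from the same user if it occurs within 30 minutes after the click, with both ends inclusive as in SQL BETWEEN. The sketch works on in-memory lists, so it leaves out Spark's timing: in the streaming join, an unmatched click is emitted with nulls only after the watermark has passed the end of its 30-minute range. attribute_purchases and the sample rows are hypothetical.

```python
from datetime import datetime, timedelta


def attribute_purchases(clicks, purchases, horizon=timedelta(minutes=30)):
    """Left outer join of clicks to purchases by the same user within the horizon.

    clicks:    (user_id, page_url, click_time) tuples
    purchases: (user_id, product_id, amount, purchase_time) tuples
    Returns (user_id, clicked_page, click_time, product_id, amount,
    time_to_purchase_seconds) rows; unmatched clicks get None in purchase fields.
    """
    rows = []
    for user_id, page_url, click_time in clicks:
        matched = False
        for p_user, product_id, amount, purchase_time in purchases:
            if p_user == user_id and click_time <= purchase_time <= click_time + horizon:
                matched = True
                seconds = int((purchase_time - click_time).total_seconds())
                rows.append((user_id, page_url, click_time, product_id, amount, seconds))
        if not matched:
            rows.append((user_id, page_url, click_time, None, None, None))
    return rows


base = datetime(2024, 1, 1, 12, 0)
clicks = [("u1", "/home", base), ("u2", "/cart", base)]
purchases = [
    ("u1", "p9", 19.99, base + timedelta(minutes=12)),  # within 30 minutes: matched
    ("u1", "p7", 5.00, base + timedelta(minutes=45)),   # too late: ignored
]
rows = attribute_purchases(clicks, purchases)
for row in rows:
    print(row)
```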

Performance Metrics

| Metric | Batch Processing | Streaming (Micro-Batch) | Streaming (Continuous) | Improvement |
|---|---|---|---|---|
| End-to-End Latency | 1-24 hours | 10-60 seconds | 100-500 ms | 99.9% reduction |
| Throughput (events/sec) | 100K (batch) | 500K (streaming) | 1M (continuous) | 10x improvement |
| Cost per Million Events | $0.50 | $0.10 | $0.08 | 84% reduction |
| State Size (1B events) | N/A (stateless) | 2-10 GB (RocksDB) | N/A (no stateful operators) | Bounded (with watermarks) |
| Recovery Time | Re-run entire batch | Replay from checkpoint | Replay from checkpoint | Same |
| Late Event Handling | N/A (re-process) | Watermark-based | N/A (no stateful operators) | New capability |
| Concurrent Queries | 1 (batch) | Multiple (streaming) | Multiple (streaming) | Scaling |
| Exactly-Once | Difficult | Checkpoint-based | At-least-once only | Guaranteed (micro-batch only) |
| Resource Utilization | 100% during batch | 30-50% (micro-batch) | 60-80% (continuous) | More efficient |
| Complex Event Processing | Not supported | Window aggregations | Not supported (map-like operations only) | New capability (micro-batch) |

Best Practices

  1. Use watermarks for all windowed aggregations: Without watermarks, aggregation state grows without bound. Set the watermark delay somewhat above the maximum lateness you expect (e.g., 10 minutes when events almost always arrive within 5 minutes).

  2. Tune micro-batch size for latency vs. throughput: Smaller trigger intervals (1-10 seconds) reduce latency but increase overhead; larger intervals (30-60 seconds) improve throughput but increase latency. Profile your workload to find the optimal balance.

  3. Use Delta Lake as the primary streaming sink: Delta Lake provides exactly-once semantics, ACID transactions, time travel, and schema evolution. It integrates natively with Structured Streaming as both source and sink.

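  4. Implement idempotent writes: Design streaming writes to be idempotent so that replaying from checkpoints produces the same results. A Delta Lake MERGE that upserts on a unique key is idempotent because it overwrites matched rows instead of incrementing them. Inside foreachBatch, the txnAppId and txnVersion writer options let Delta skip a batch that was already committed.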

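  5. Monitor state size growth: For stateful operations (windowed aggregations, stream-stream joins), track state store size through the stateOperators metrics (such as numRowsTotal and memoryUsedBytes) in each query's progress report. State for windowed aggregations and joins is evicted only as the watermark advances, so bound it with a watermark. For arbitrary stateful operations, use state timeouts, or TTL where your Spark version supports it. The RocksDB state store provider keeps state outside the JVM heap, which lowers the risk of OOM errors.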

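  6. Size shuffle partitions before the first run: Adaptive Query Execution is disabled inside streaming queries. For stateful queries, the number of shuffle partitions (spark.sql.shuffle.partitions) is recorded in the checkpoint and cannot change without starting from a new checkpoint. Choose a value that matches your cluster and expected state size, and handle data skew in streaming aggregations explicitly (for example, by salting hot keys).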

  7. Separate raw and aggregated streams: Write raw events to a Bronze layer first, then process aggregations in a separate streaming query. This provides fault isolation and enables reprocessing.

  8. Implement dead-letter queues: Route events that fail deserialization or validation to a dead-letter topic/table for manual inspection rather than blocking the entire pipeline.

  9. Use Structured Streaming for CDC: Combine Debezium CDC with Structured Streaming to apply database changes to Delta Lake targets in near-real-time with exactly-once semantics.

  10. Optimize for your query pattern: For dashboards that query recent data, partition by time and use Z-ORDER on frequently filtered columns. For point-in-time queries, leverage Delta Lake time travel.
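An upsert and an increment behave differently once a batch is replayed after a failure. The pure-Python sketch below models a target table as a dict keyed on window start. The helper names are hypothetical. write_once only imitates the skip-if-already-committed behaviour of Delta's txnAppId/txnVersion options; it does not call Delta.

```python
def merge_upsert(target, batch):
    """Like MERGE ... WHEN MATCHED UPDATE SET count = s.count WHEN NOT MATCHED INSERT."""
    for key, count in batch:
        target[key] = count
    return target


def merge_increment(target, batch):
    """Like MERGE ... WHEN MATCHED UPDATE SET count = t.count + s.count: not idempotent."""
    for key, count in batch:
        target[key] = target.get(key, 0) + count
    return target


def write_once(committed, app_id, batch_id, write):
    """Run write() unless batch_id <= the last batch committed for app_id (assumed semantics)."""
    if batch_id <= committed.get(app_id, -1):
        return False  # replayed batch: skip
    write()
    committed[app_id] = batch_id
    return True


batch = [("12:00", 5), ("12:05", 3)]

upserted = {}
for _ in range(2):  # the same batch applied twice, as after a replay
    merge_upsert(upserted, batch)

incremented = {}
for _ in range(2):
    merge_increment(incremented, batch)

guarded, committed = {}, {}
for _ in range(2):
    write_once(committed, "page_views", 7, lambda: merge_increment(guarded, batch))

print(upserted)     # {'12:00': 5, '12:05': 3}
print(incremented)  # {'12:00': 10, '12:05': 6}  double-counted
print(guarded)      # {'12:00': 5, '12:05': 3}
```

The upsert survives the replay unchanged. The increment double-counts unless a batch-level guard skips the replayed batch.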

See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)
