11. Structured Streaming in PySpark

Definition: Structured Streaming

Structured Streaming is a stream processing engine built on the Spark SQL engine that treats a live data stream as a continuously appended unbounded table. It provides exactly-once fault tolerance guarantees and supports event-time processing via watermarks.

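Definition: Trigger

A trigger defines when processing occurs. With no trigger set (the default), each micro-batch starts as soon as the previous one finishes. The alternatives are FixedInterval (micro-batches start at a user-specified interval), Once (process all available data in a single batch, then stop), AvailableNow (process all available data, possibly in several batches, then stop), and Continuous (low-latency continuous processing).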

Definition: Watermark

A watermark is a threshold that tracks the progress of event-time in a stream. It determines when to stop waiting for late-arriving data and trigger window aggregation. Watermark = max event time seen - delay threshold.

W = \max(E_{time}) - D_{threshold}
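As a quick numeric check of the formula, the sketch below tracks the watermark over two hypothetical micro-batches. It is a plain-Python toy model, not Spark's implementation; the function name `advance_watermark` and the sample timestamps are assumptions made for illustration. It also models the fact that the watermark never moves backward when only older events arrive.

```python
from datetime import datetime, timedelta

def advance_watermark(current, batch_event_times, delay):
    """Return the watermark after a micro-batch.

    W = max(E_time) - D_threshold, and the watermark never moves backward.
    """
    if not batch_event_times:
        return current
    candidate = max(batch_event_times) - delay
    return candidate if current is None or candidate > current else current

delay = timedelta(minutes=10)  # D_threshold
batch_1 = [datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 12, 7)]
batch_2 = [datetime(2024, 1, 1, 12, 3)]  # only older events arrive

wm = advance_watermark(None, batch_1, delay)
print(wm)  # 2024-01-01 11:57:00
wm = advance_watermark(wm, batch_2, delay)
print(wm)  # unchanged: 2024-01-01 11:57:00
```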

Micro-Batch Processing Time

T_{batch} = \max(T_{source}, T_{processing}, T_{sink})

Here,

  • T_{batch} = total micro-batch processing time
  • T_{source} = time to read input data from the source
  • T_{processing} = time to execute query transformations
  • T_{sink} = time to write output to the sink
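The formula can be turned into a small capacity check: a query keeps up only if each batch finishes within its trigger interval. The sketch below follows the lesson's model, in which the slowest stage dominates; the function names and timings are hypothetical. If the stages ran strictly one after another, a sum would be the more conservative estimate.

```python
def batch_time(t_source, t_processing, t_sink):
    """T_batch under the lesson's model: the slowest stage dominates."""
    return max(t_source, t_processing, t_sink)

def keeps_up(t_batch, trigger_interval):
    """A query keeps up when each batch finishes within the trigger interval."""
    return t_batch <= trigger_interval

# Hypothetical stage timings in seconds.
t = batch_time(t_source=1.2, t_processing=4.5, t_sink=2.0)
print(t)                  # 4.5
print(keeps_up(t, 10.0))  # True
print(keeps_up(t, 2.0))   # False
```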

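Structured Streaming provides end-to-end exactly-once semantics in micro-batch mode by coordinating source offsets, state updates, and sink writes within each micro-batch. This requires a replayable source and an idempotent or transactional sink. With an at-least-once sink (e.g., Kafka), a replayed batch can write duplicates, so the pipeline needs deduplication (for example, on a unique event ID together with event time) or idempotent writes.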
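To make "idempotent writes" concrete, here is a toy sink that remembers which batch IDs it has already committed, so a batch replayed after a failure is not written twice. The class `IdempotentSink` is a hypothetical illustration in plain Python, not a Spark API; in PySpark the same idea is usually built inside a `foreachBatch` function, which receives the batch DataFrame and its batch ID.

```python
class IdempotentSink:
    """Toy sink that ignores a batch it has already committed."""

    def __init__(self):
        self.rows = []
        self.committed_batches = set()

    def write(self, batch_id, rows):
        if batch_id in self.committed_batches:
            return False  # replay after a failure: skip, so no duplicates
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True

sink = IdempotentSink()
sink.write(0, ["a", "b"])
sink.write(1, ["c"])
sink.write(1, ["c"])  # the engine replays batch 1 after a restart
print(sink.rows)  # ['a', 'b', 'c']
```

A real sink would have to persist the committed batch IDs atomically with the data; the in-memory set here only shows the control flow.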

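For low-latency use cases (< 100 ms), use trigger(continuous="1 second") instead of micro-batch mode; the argument is the checkpoint interval, not the latency. Continuous mode is experimental, provides at-least-once rather than exactly-once guarantees, and supports only a limited set of map-like operations such as select and filter (no aggregations).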

Theorem: Late Data Handling

With a delay threshold of D_{threshold}, an event whose event time is older than the watermark, that is, more than D_{threshold} behind the maximum event time seen so far, may be dropped from window aggregations. Events that are at most D_{threshold} behind the maximum event time are guaranteed to be included in the window that their event time falls into.
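The rule above can be exercised with a small simulation. The sketch below is a simplified pure-Python model, not Spark's implementation: it counts events per tumbling window, updates the watermark after each micro-batch, and drops an event once its window has already been finalized. The function `process`, the integer minute timestamps, and the window size are assumptions for illustration.

```python
def process(batches, delay, window_size):
    """Count events per tumbling window, dropping events that arrive too late.

    The watermark applied to a batch is computed from the batches before it,
    mirroring how the watermark is updated between micro-batches.
    """
    counts, dropped = {}, []
    max_event_time, watermark = None, None
    for batch in batches:
        for t in batch:
            start = (t // window_size) * window_size
            window = (start, start + window_size)
            if watermark is not None and window[1] <= watermark:
                dropped.append(t)  # its window is already finalized
            else:
                counts[window] = counts.get(window, 0) + 1
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
    return counts, dropped

# Event times in minutes; 10-minute delay threshold, 5-minute windows.
batches = [[1, 3, 4], [21, 22], [12, 2]]
counts, dropped = process(batches, delay=10, window_size=5)
print(counts)   # {(0, 5): 3, (20, 25): 2, (10, 15): 1}
print(dropped)  # [2]
```

After the second batch the watermark is 22 - 10 = 12. Event 12 is exactly 10 minutes behind the maximum and is still counted, while event 2 belongs to a window that ended before the watermark and is dropped.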

  • Structured Streaming = unbounded table with exactly-once guarantees
  • Triggers: default (next micro-batch starts as soon as the previous one finishes), FixedInterval, Once, AvailableNow, Continuous
  • Watermark = max(event_time) - delay_threshold; controls late data handling
  • Output modes: Append (new rows only), Complete (full result table), Update (changed rows)

🏗️ Streaming Architecture Diagram

┌──────────────────────────────────────────────────────────────┐
│              STRUCTURED STREAMING ARCHITECTURE               │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│  │   SOURCE     │    │  PROCESSING  │    │   SINK       │    │
│  │  (Kafka,     │───▶│  (Continuous │───▶│ (Console,    │    │
│  │   Files)     │    │   Queries)   │    │  Delta, etc) │    │
│  └──────────────┘    └──────────────┘    └──────────────┘    │
│         │                   │                   │            │
│         ▼                   ▼                   ▼            │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│  │  INPUT       │    │  EXECUTION   │    │  OUTPUT      │    │
│  │  STREAM      │    │  ENGINE      │    │  MODE        │    │
│  │  (Micro-     │    │  (DAG        │    │ (Append,     │    │
│  │   batch)     │    │   Planning)  │    │  Complete,   │    │
│  └──────────────┘    └──────────────┘    │  Update)     │    │
│         │                   │            └──────────────┘    │
│         ▼                   ▼                                │
│  ┌──────────────┐    ┌──────────────┐                        │
│  │  DATAFRAME   │    │  CATALOG     │                        │
│  │  API         │    │  (Schema     │                        │
│  │  (Unified)   │    │   Registry)  │                        │
│  └──────────────┘    └──────────────┘                        │
│                                                              │
│  ┌──────────────────────────────────────────────┐            │
│  │         CHECKPOINT & WRITE-AHEAD LOG         │            │
│  │  ┌──────────┐  ┌──────────┐  ┌────────────┐  │            │
│  │  │ Offset   │  │ Commit   │  │ Data       │  │            │
│  │  │ Log      │  │ Log      │  │ Checkpoint │  │            │
│  │  └──────────┘  └──────────┘  └────────────┘  │            │
│  └──────────────────────────────────────────────┘            │
└──────────────────────────────────────────────────────────────┘

🔄 Trigger Mechanisms Diagram

┌──────────────────────────────────────────────────────────────────────┐
│                      TRIGGER TYPES IN STREAMING                      │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                 DEFAULT TRIGGER (Micro-batch)                  │  │
│  │                                                                │  │
│  │  ┌────┐  ┌────┐  ┌────┐  ┌────┐  ┌────┐                        │  │
│  │  │B 1 │  │B 2 │  │B 3 │  │B 4 │  │B 5 │  ...                   │  │
│  │  └──┬─┘  └──┬─┘  └──┬─┘  └──┬─┘  └──┬─┘                        │  │
│  │     └───────┴───────┴───────┴───────┘                          │  │
│  │     Each batch processes all available data                    │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                     FIXED INTERVAL TRIGGER                     │  │
│  │                                                                │  │
│  │  ──[10s]──▶  ──[10s]──▶  ──[10s]──▶  ──[10s]──▶                │  │
│  │     │          │          │          │                         │  │
│  │  ┌────┐     ┌────┐     ┌────┐     ┌────┐                       │  │
│  │  │B 1 │     │B 2 │     │B 3 │     │B 4 │                       │  │
│  │  └────┘     └────┘     └────┘     └────┘                       │  │
│  │  Starts batches on a fixed schedule when new data is available │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                  ONCE TRIGGER (Single Batch)                   │  │
│  │                                                                │  │
│  │  ┌──────────────────────────────────────────────────────────┐  │  │
│  │  │                    ONE SHOT EXECUTION                    │  │  │
│  │  │                                                          │  │  │
│  │  │  Process all available data once, then terminate         │  │  │
│  │  │                                                          │  │  │
│  │  │  Input ──▶ [Process] ──▶ Output ──▶ STOP                 │  │  │
│  │  └──────────────────────────────────────────────────────────┘  │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                       CONTINUOUS TRIGGER                       │  │
│  │                                                                │  │
│  │  ┌──────────────────────────────────────────────────────────┐  │  │
│  │  │  ══════════════════════════════════════════════════════  │  │  │
│  │  │  Continuous processing with ~1ms latency                 │  │  │
│  │  │  ──────────────────────────────────────────────────────  │  │  │
│  │  │  Records processed as they arrive                        │  │  │
│  │  │  ══════════════════════════════════════════════════════  │  │  │
│  │  └──────────────────────────────────────────────────────────┘  │  │
│  └────────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────────┘

💧 Watermark Handling Diagram

┌──────────────────────────────────────────────────────────────────────┐
│                     WATERMARK HANDLING STRATEGY                      │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Event Time ──────────────────────────────────────────────────────▶  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                       WITHOUT WATERMARK                        │  │
│  │                                                                │  │
│  │  Events:  ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐                  │  │
│  │           │ 1 │ │ 3 │ │ 2 │ │ 5 │ │ 4 │ │ 6 │                  │  │
│  │           └───┘ └───┘ └───┘ └───┘ └───┘ └───┘                  │  │
│  │                                                                │  │
│  │  State grows indefinitely ──▶ Potential OOM                    │  │
│  │  ┌──────────────────────────────────────────────────────────┐  │  │
│  │  │  ██████████████████████████████████████████████████████  │  │  │
│  │  │  STATE: [1,2,3,4,5,6,...] - Never cleaned                │  │  │
│  │  └──────────────────────────────────────────────────────────┘  │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │              WITH WATERMARK (delayThreshold = 10)              │  │
│  │                                                                │  │
│  │  Events:  ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐                  │  │
│  │           │ 1 │ │ 3 │ │ 2 │ │ 5 │ │ 4 │ │ 6 │                  │  │
│  │           └───┘ └───┘ └───┘ └───┘ └───┘ └───┘                  │  │
│  │                                                                │  │
│  │  Watermark: ═══════════════════════════════════▶               │  │
│  │             Max Event Time - 10 units                          │  │
│  │                                                                │  │
│  │  State: ┌──────────────────────────────────────────────────┐   │  │
│  │         │  ████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░  │   │  │
│  │         │  Only keeps relevant events                      │   │  │
│  │         └──────────────────────────────────────────────────┘   │  │
│  │                                                                │  │
│  │  Late data (>10 units late) ──▶ Dropped or handled separately  │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                     WATERMARK PROPAGATION                      │  │
│  │                                                                │  │
│  │  Source ──▶ Event Time Watermark ──▶ Processing Time Watermark │  │
│  │                                                                │  │
│  │  ┌─────────┐    ┌──────────────┐    ┌──────────────┐           │  │
│  │  │ Source  │───▶│ 15:30:00     │───▶│ 15:30:05     │           │  │
│  │  │ Event   │    │ (Event Time) │    │(Process Time)│           │  │
│  │  │ Time    │    │              │    │              │           │  │
│  │  └─────────┘    └──────────────┘    └──────────────┘           │  │
│  │                                                                │  │
│  │  Watermark = max(event_time) - threshold                       │  │
│  └────────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────────┘

📚 Detailed Explanation

Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. It treats the streaming data as an unbounded table being continuously appended, allowing developers to use the same DataFrame/Dataset API for both batch and streaming queries. This unification dramatically simplifies the development of real-time data pipelines.

The core abstraction in Structured Streaming is the concept of an "input stream" that continuously receives data and a "result table" that is updated as new data arrives. The engine processes data in micro-batches or continuously, depending on the trigger configuration, and maintains exactly-once processing guarantees through checkpointing and write-ahead logs.

The trigger mechanism determines when the streaming query processes the next batch. The default trigger starts each batch as soon as the previous one completes, which maximizes throughput but can cause variable latency. Fixed-interval triggers give predictable processing intervals: if a batch overruns its interval, the next one starts as soon as it finishes, and if no new data has arrived, no batch is started. The Once trigger is useful for batch-style processing of streaming sources (newer Spark releases favor AvailableNow for this purpose), while the continuous trigger provides the lowest latency at the cost of some functionality limitations.
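The trigger choices map onto keyword arguments of `DataStreamWriter.trigger()`. The helper below is a hypothetical convenience function (its name and the mode strings are assumptions) that returns the keyword arguments for each case, so the mapping can be checked without a running Spark session.

```python
def trigger_kwargs(mode, interval=None):
    """Map a trigger name to keyword arguments for DataStreamWriter.trigger()."""
    if mode == "default":
        return {}  # do not call trigger(): next batch starts when the previous ends
    if mode == "fixed":
        return {"processingTime": interval}  # e.g. "10 seconds"
    if mode == "once":
        return {"once": True}
    if mode == "available_now":
        return {"availableNow": True}  # Spark 3.3+
    if mode == "continuous":
        return {"continuous": interval}  # checkpoint interval, e.g. "1 second"
    raise ValueError(f"unknown trigger mode: {mode}")

print(trigger_kwargs("fixed", "10 seconds"))  # {'processingTime': '10 seconds'}

# Hypothetical usage with a streaming DataFrame `df`:
# writer = df.writeStream.format("console")
# kwargs = trigger_kwargs("available_now")
# query = (writer.trigger(**kwargs) if kwargs else writer).start()
```

The guard in the usage comment matters because `trigger()` expects exactly one trigger argument; the default behavior is obtained by not calling it at all.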

Watermarking is a critical mechanism for handling late-arriving data. Without watermarks, the state for event-time windowing operations would grow indefinitely as the system waits for late events. Watermarks define a threshold beyond which events are considered too late to be included in aggregations, allowing the system to clean up state and provide memory guarantees.

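The checkpoint mechanism is essential for fault tolerance. It persists the progress of the streaming query: the offset range planned for each batch (offset log), the batches that completed (commit log), and the accumulated state of stateful operators. After a failure, the query resumes from the last checkpoint and replays any batch that was started but not committed. Combined with a replayable source and an idempotent sink, this yields exactly-once semantics.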
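The recovery decision can be pictured with a toy model of the two logs. This is a simplified sketch in plain Python, not the on-disk checkpoint format; the function name and the dictionary layout are assumptions. The idea is that a batch whose offsets were logged but never committed is replayed with exactly the same offset range.

```python
def next_batch_on_restart(offset_log, commit_log):
    """Decide what to run after a restart.

    offset_log: {batch_id: (start_offset, end_offset)}, written before a batch runs.
    commit_log: set of batch_ids, written after a batch finishes.
    Returns (batch_id, offsets); offsets is None when a new range must be planned.
    """
    if not offset_log:
        return 0, None
    last = max(offset_log)
    if last not in commit_log:
        return last, offset_log[last]  # replay the same offset range
    return last + 1, None

offset_log = {0: (0, 100), 1: (100, 250), 2: (250, 400)}
commit_log = {0, 1}  # the query crashed while batch 2 was running
print(next_batch_on_restart(offset_log, commit_log))  # (2, (250, 400))
```

Because the replayed batch reads the same offsets, an idempotent sink sees the same data again and can safely ignore or overwrite it.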

Output modes determine how the results are written to the sink. Append mode only outputs new rows since the last trigger, Complete mode outputs the entire result table each time, and Update mode outputs only rows that were updated since the last trigger. The choice of output mode depends on the operation type and the sink's capabilities.
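The difference between the three modes is easiest to see on a concrete result table. The sketch below is a plain-Python illustration, not Spark code: it compares the result table before and after a trigger and returns the rows each mode would emit. The function `emit` and the word-count data are assumptions for illustration.

```python
def emit(mode, previous, current):
    """Rows written to the sink for one trigger, with result tables as dicts."""
    if mode == "complete":
        return dict(current)
    if mode == "update":
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only rows that did not exist before; valid when existing rows never change.
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")

previous = {"spark": 2, "kafka": 1}
current = {"spark": 3, "kafka": 1, "delta": 1}
print(emit("complete", previous, current))  # {'spark': 3, 'kafka': 1, 'delta': 1}
print(emit("update", previous, current))    # {'spark': 3, 'delta': 1}
print(emit("append", previous, current))    # {'delta': 1}
```

In Spark itself, append mode is rejected for a running count like this one because existing rows change; the append branch only shows which rows would count as new.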

Performance tuning involves several considerations: partitioning of the source data, parallelism of the query, state store configuration for stateful operations, and memory management for large state. Monitoring streaming queries requires attention to processing times, input rates, and state sizes to detect potential bottlenecks.
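A simple way to act on these metrics is to inspect the progress record a running query reports. The sketch below works on a dictionary shaped like `StreamingQuery.lastProgress`; the field names follow the progress JSON, but the sample values, the function `health_report`, and its thresholds are assumptions for illustration.

```python
def health_report(progress, trigger_interval_ms):
    """Summarize one progress record (shaped like StreamingQuery.lastProgress)."""
    duration = progress["durationMs"]["triggerExecution"]
    state_rows = sum(op["numRowsTotal"] for op in progress.get("stateOperators", []))
    return {
        "falling_behind": progress["inputRowsPerSecond"] > progress["processedRowsPerSecond"],
        "batch_exceeds_interval": duration > trigger_interval_ms,
        "state_rows": state_rows,
    }

# Hypothetical progress record for a query with a 10-second trigger.
sample = {
    "batchId": 42,
    "inputRowsPerSecond": 1200.0,
    "processedRowsPerSecond": 900.0,
    "durationMs": {"triggerExecution": 12500},
    "stateOperators": [{"numRowsTotal": 50000}],
}
print(health_report(sample, trigger_interval_ms=10000))
# {'falling_behind': True, 'batch_exceeds_interval': True, 'state_rows': 50000}
```

With a live query, the same function could be fed `query.lastProgress`, provided the record is available (it is empty before the first batch completes).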

Advanced features include mixing SQL and DataFrame operations, for example by registering a streaming DataFrame as a temporary view and querying it with SQL, and streaming joins, which join a stream with static (batch) data or with another stream. These features expand the use cases for Structured Streaming beyond simple transformations.

📊 Key Concepts Table

| Concept | Description | Use Case |
| --- | --- | --- |
| Micro-batch Processing | Processes data in small batches at intervals | High throughput, moderate latency |
| Continuous Processing | Processes records individually as they arrive | Ultra-low latency (~1 ms) |
| Trigger | Determines when a new batch is processed | Control batch timing and frequency |
| Watermark | Threshold for handling late-arriving data | Event-time windowing operations |
| Output Mode | How results are written to the sink | Append, Complete, Update modes |
| Checkpoint | Persists query progress for fault tolerance | Exactly-once processing guarantees |
| State Store | Stores state for stateful operations | Window aggregations, joins |
| Event Time | Timestamp embedded in the data | Time-based windowing and ordering |
| Processing Time | Time when the record is processed | Monitoring and debugging |
| Late Data | Events arriving after their watermark threshold | Requires special handling |

💻 Code Examples

Basic Structured Streaming Setup

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoint/path") \
    .getOrCreate()

# Read from a socket stream
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Count words in each batch
word_counts = words.groupBy("word").count()

# Start the streaming query
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()

Windowed Aggregation with Watermark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window

spark = SparkSession.builder \
    .appName("WindowedAggregation") \
    .getOrCreate()

# Read from Kafka
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse JSON data
events = lines.select(
    col("value").cast("string").alias("json")
).select(
    from_json(col("json"), "timestamp TIMESTAMP, value DOUBLE").alias("data")
).select("data.*")

# Windowed aggregation with watermark
windowed_counts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute"),
        "value"
    ).count()

# Write to console
query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
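The call `window("timestamp", "5 minutes", "1 minute")` in the example above defines sliding windows, so each event is counted in several overlapping windows. The plain-Python sketch below lists the windows that contain a given event time; it is an illustration that assumes integer minute timestamps and windows aligned to multiples of the slide, and the function name is hypothetical.

```python
def sliding_windows(t, size, slide):
    """Return all [start, end) windows of length `size`, sliding by `slide`, that contain t."""
    last_start = (t // slide) * slide
    starts = range(last_start, t - size, -slide)
    return sorted((s, s + size) for s in starts if s <= t < s + size)

# An event at minute 7 with 5-minute windows sliding every minute.
print(sliding_windows(t=7, size=5, slide=1))
# [(3, 8), (4, 9), (5, 10), (6, 11), (7, 12)]
```

An event therefore contributes to size / slide = 5 windows here, which is why sliding windows keep more state than tumbling windows of the same length.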

Streaming Join with Batch Data

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark = SparkSession.builder \
    .appName("StreamingJoin") \
    .getOrCreate()

# Read streaming data from Kafka
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING) as event_json") \
    .select(from_json(col("event_json"), "user_id INT, action STRING, timestamp TIMESTAMP").alias("data")) \
    .select("data.*")

# Read batch reference data
batch_df = spark.read.parquet("/path/to/user_profiles")

# Stream-static join: each micro-batch is joined against the batch DataFrame
joined_df = stream_df.alias("s").join(
    batch_df.alias("b"),
    col("s.user_id") == col("b.user_id"),
    "inner"
).select("s.user_id", "s.action", "b.profile_name", "s.timestamp")

# Write to Delta sink
query = joined_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoint/join") \
    .start("/output/path")

query.awaitTermination()

Advanced: Multi-Query Streaming

from pyspark.sql import SparkSession
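# Note: this import shadows Python's built-in sum within this script
from pyspark.sql.functions import col, count, from_json, sum, window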

spark = SparkSession.builder \
    .appName("MultiQueryStreaming") \
    .getOrCreate()

# Single source
source_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Multiple transformations from same source
parsed_df = source_df.select(
    col("value").cast("string").alias("json"),
    col("timestamp").alias("event_time")
).select(
    from_json(col("json"), "user_id INT, action STRING, amount DOUBLE").alias("data"),
    "event_time"
).select("data.*", "event_time")

# Query 1: Real-time aggregation
aggregated = parsed_df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        window("event_time", "1 minute"),
        "action"
    ).agg(
        sum("amount").alias("total_amount"),
        count("*").alias("event_count")
    )

# Query 2: Detailed event log
detailed_log = parsed_df.select(
    "user_id", "action", "amount", "event_time"
)

# Start multiple queries
query1 = aggregated.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query2 = detailed_log.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoint/detailed") \
    .start("/detailed_log_path")

# Wait for all queries
spark.streams.awaitAnyTermination()

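📈 Performance Metrics

| Metric | Micro-batch | Continuous | Notes |
| --- | --- | --- | --- |
| Latency | 100 ms to 10 s | ~1 ms | Micro-batch latency depends on the batch interval |
| Throughput | 100K-1M records/sec | 10K-100K records/sec | Micro-batch has higher throughput |
| Fault Recovery | seconds to minutes | seconds | Checkpoint-based recovery |
| State Management | Optimized | Limited | Micro-batch has better state support; continuous mode does not support aggregations |
| Exactly-once | Yes | No (at-least-once) | Only micro-batch mode guarantees exactly-once |
| Resource Usage | Higher | Lower | Micro-batch uses more resources |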

🏆 Best Practices

  1. Always set checkpoint location - Critical for fault tolerance and exactly-once processing
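  2. Use appropriate output mode - Append for queries whose emitted rows never change (including watermarked window aggregations), Complete for full result tables of aggregations, Update for changed rows only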
  3. Configure watermarks - Essential for event-time windowing and state management
  4. Monitor streaming metrics - Track processing times, input rates, and state sizes
  5. Tune micro-batch interval - Balance between latency and throughput requirements
  6. Use schema inference carefully - Can be expensive; prefer explicit schemas when possible
  7. Handle late data explicitly - Define watermark thresholds based on data patterns
  8. Optimize state store - Configure RocksDB or other state stores for large state
  9. Use multiple queries - Separate different processing logic into multiple queries
  10. Test with realistic data - Validate performance with production-like data volumes

🔗 Related Topics

  • 12-state-management.mdx: Advanced stateful operations and checkpointing
  • 13-window-operations.mdx: Detailed windowing strategies and implementations
  • 14-merge-upsert.mdx: Delta Lake merge operations for streaming data
  • 15-data-quality.mdx: Data validation in streaming pipelines

See Also

  • Kafka Streams (kafka/03): Kafka integration with Structured Streaming
  • Data Engineering Streaming (data-engineering/022): End-to-end streaming pipeline architecture
