11. Structured Streaming in PySpark
Definition: Structured Streaming
Structured Streaming is a stream processing engine built on the Spark SQL engine that treats a live data stream as a continuously appended unbounded table. It provides exactly-once fault tolerance guarantees and supports event-time processing via watermarks.
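Definition: Trigger
A trigger defines when the engine starts the next micro-batch. With no trigger set (the default), each micro-batch starts as soon as the previous one finishes. The explicit options are a fixed processing-time interval, Once (process all available data in a single batch, then stop), AvailableNow (process all available data, possibly across several batches, then stop), and Continuous (experimental low-latency processing with a checkpoint interval).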
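In PySpark these options map onto keyword arguments of `DataStreamWriter.trigger()`. The sketch below is illustrative rather than authoritative: `trigger_kwargs` and `apply_trigger` are hypothetical helpers and the mode names are made up for this example; only the keyword names (`processingTime`, `once`, `availableNow`, `continuous`) come from the PySpark API.

```python
def trigger_kwargs(mode, interval=None):
    """Map an (illustrative) mode name to keyword arguments for trigger()."""
    if mode == "default":
        return {}  # no trigger call at all: next batch starts when the previous ends
    if mode == "once":
        return {"once": True}
    if mode == "available_now":
        return {"availableNow": True}
    if mode in ("fixed_interval", "continuous"):
        if interval is None:
            raise ValueError(f"mode {mode!r} needs an interval such as '10 seconds'")
        key = "processingTime" if mode == "fixed_interval" else "continuous"
        return {key: interval}
    raise ValueError(f"unknown trigger mode: {mode!r}")


def apply_trigger(writer, mode, interval=None):
    """Configure a DataStreamWriter-like object with the chosen trigger."""
    kwargs = trigger_kwargs(mode, interval)
    # trigger() requires exactly one option, so the default means "do not call it".
    return writer.trigger(**kwargs) if kwargs else writer
```

With a real streaming DataFrame `df`, a call such as `apply_trigger(df.writeStream, "fixed_interval", "10 seconds")` would be equivalent to `df.writeStream.trigger(processingTime="10 seconds")`.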
Definition: Watermark
A watermark is a moving event-time threshold that tracks how far a stream has progressed. It determines how long the engine waits for late-arriving data before it finalizes a window aggregation and discards that window's state. Watermark = max event time seen - delay threshold.
Micro-Batch Processing Time
$$T_{batch} = T_{read} + T_{exec} + T_{write}$$
Here,
- $T_{batch}$ = Total micro-batch processing time
- $T_{read}$ = Time to read input data from source
- $T_{exec}$ = Time to execute query transformations
- $T_{write}$ = Time to write output to sink
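As a quick sanity check, the three components can be summed and compared against a trigger interval. This is a minimal sketch with made-up timings; `batch_time` and `keeps_up` are hypothetical helper names, not part of any Spark API.

```python
def batch_time(read_s, exec_s, write_s):
    """Total micro-batch time as the sum of read, execute, and write time."""
    return read_s + exec_s + write_s


def keeps_up(total_s, trigger_interval_s):
    """True when a batch finishes within the trigger interval."""
    return total_s <= trigger_interval_s


# Illustrative timings in seconds (not measurements).
total = batch_time(read_s=1.5, exec_s=4.0, write_s=2.5)
print(total)                  # 8.0
print(keeps_up(total, 10.0))  # True
print(keeps_up(total, 5.0))   # False
```

When the total exceeds the interval, batches run back to back and end-to-end latency grows, which is usually the first signal that a query needs tuning.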
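In micro-batch mode, Structured Streaming coordinates source offsets, state updates, and sink writes within each micro-batch. End-to-end exactly-once semantics additionally require a replayable source and an idempotent (or transactional) sink. For at-least-once sinks (e.g., Kafka), a replayed batch can produce duplicates, so downstream consumers need to deduplicate, for example on a unique key carried in each record.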
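One common way to make a sink idempotent is to remember the last committed batch id and ignore replays. The toy class below sketches that idea in plain Python; `IdempotentSink` is a hypothetical name. In PySpark the analogous hook would be a `foreachBatch` function, which receives the batch DataFrame and a batch id.

```python
class IdempotentSink:
    """Toy sink that ignores batches it has already committed."""

    def __init__(self):
        self.rows = []
        self.last_committed_batch = -1

    def write(self, batch_id, rows):
        # A replayed batch carries an id that was already committed: skip it.
        if batch_id <= self.last_committed_batch:
            return False
        self.rows.extend(rows)
        self.last_committed_batch = batch_id
        return True


sink = IdempotentSink()
sink.write(0, ["a", "b"])
sink.write(1, ["c"])
sink.write(1, ["c"])  # replay of batch 1 after a simulated failure
print(sink.rows)      # ['a', 'b', 'c']
```

A real sink would have to persist the rows and the batch id atomically; the in-memory version only shows the control flow.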
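For low-latency use cases (< 100ms), use trigger(continuous="1 second") instead of micro-batch mode; the interval controls how often progress is checkpointed, not a batch size. However, continuous mode is experimental, provides at-least-once rather than exactly-once guarantees, and supports only a limited set of operations (no aggregations).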
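Theorem: Late Data Handling
With a watermark threshold of $D_{threshold}$, any event whose event time is within $D_{threshold}$ of the maximum event time seen so far is guaranteed to be included in the window it belongs to. An event that lags the maximum event time by more than $D_{threshold}$ may be dropped from window aggregations. The guarantee is one-sided: because the watermark only advances at micro-batch boundaries, such an event is not certain to be dropped.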
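The rule can be traced by hand with a small simulation. This is a simplified sketch, not Spark's implementation: event times are plain integers (think minutes), the check is per event rather than per window, and the watermark is advanced once at the end of each micro-batch. `process_batches` is a hypothetical helper.

```python
def process_batches(batches, delay):
    """Return (accepted, dropped, final_watermark) for integer event times."""
    watermark = None       # undefined until the first batch completes
    max_event_time = None
    accepted, dropped = [], []
    for batch in batches:
        for event_time in batch:
            # Events older than the watermark from earlier batches are too late.
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)
            else:
                accepted.append(event_time)
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
        # Advance the watermark at the batch boundary; it never moves backwards.
        candidate = max_event_time - delay
        watermark = candidate if watermark is None else max(watermark, candidate)
    return accepted, dropped, watermark


accepted, dropped, watermark = process_batches([[0, 5], [20], [2, 15]], delay=10)
print(accepted)   # [0, 5, 20, 15]
print(dropped)    # [2]
print(watermark)  # 10
```

In the third batch the event at time 2 is 18 units behind the maximum (20) and is dropped, while the event at time 15 is only 5 units behind and is kept.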
- Structured Streaming = unbounded table with exactly-once guarantees
- Triggers: default (next micro-batch starts as soon as the previous one finishes), fixed interval, Once, AvailableNow, Continuous
- Watermark = max(event_time) - delay_threshold; controls late data handling
- Output modes: Append (new rows only), Complete (full result table), Update (changed rows)
Streaming Architecture Diagram
SOURCE (Kafka, files) → PROCESSING (continuous queries) → SINK (console, Delta, etc.)
- Source: feeds the input stream (micro-batch), which is exposed through the unified DataFrame API
- Processing: runs on the execution engine (DAG planning), which relies on the catalog (schema registry)
- Sink: receives results according to the output mode (Append, Complete, Update)
- Checkpoint and write-ahead log, underlying all stages: offset log, commit log, data checkpoint
Trigger Mechanisms Diagram
- Default trigger (micro-batch): B1 → B2 → B3 → B4 → B5 → ... Each batch starts as soon as the previous one finishes and processes all data available at that point.
- Fixed interval trigger: B1 →[10s]→ B2 →[10s]→ B3 →[10s]→ B4. The engine checks for new data at a fixed interval and starts a batch only when there is data to process.
- Once trigger (single batch): Input → [Process] → Output → STOP. One-shot execution: process all available data once, then terminate.
- Continuous trigger: records are processed as they arrive, with ~1ms latency.
Watermark Handling Diagram
Events arrive out of order along the event-time axis: 1, 3, 2, 5, 4, 6.
- Without watermark: state grows indefinitely (STATE: [1,2,3,4,5,6,...], never cleaned), which can lead to out-of-memory errors.
- With watermark (delayThreshold = 10): the watermark trails the maximum event time by 10 units, so state keeps only the relevant events. Late data (>10 units late) is dropped or handled separately.
- Watermark propagation: Source event time → 15:30:00 (event time) → 15:30:05 (process time).
Watermark = max(event_time) - threshold
Detailed Explanation
Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. It treats the streaming data as an unbounded table being continuously appended, allowing developers to use the same DataFrame/Dataset API for both batch and streaming queries. This unification dramatically simplifies the development of real-time data pipelines.
The core abstraction in Structured Streaming is an "input stream" that continuously receives data and a "result table" that is updated as new data arrives. The engine processes data in micro-batches or continuously, depending on the trigger configuration. In micro-batch mode it maintains exactly-once processing guarantees through checkpointing and write-ahead logs; continuous mode provides at-least-once guarantees.
The trigger mechanism determines when the streaming query processes the next batch. The default trigger starts each batch as soon as the previous batch completes, maximizing throughput but potentially causing variable latency. Fixed interval triggers provide predictable processing intervals: if a batch finishes early the engine waits for the interval to elapse, if a batch overruns the next one starts immediately, and if no new data has arrived no batch is started. The Once trigger is useful for batch-style processing of streaming sources (AvailableNow does the same across multiple batches), while the continuous trigger provides the lowest latency at the cost of some functionality limitations.
Watermarking is a critical mechanism for handling late-arriving data. Without watermarks, the state for event-time windowing operations would grow indefinitely as the system waits for late events. Watermarks define a threshold beyond which events are considered too late to be included in aggregations, allowing the system to clean up state and provide memory guarantees.
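The checkpoint mechanism is essential for fault tolerance. It persists the progress of the streaming query: the offsets planned for each batch, the accumulated state, and a record of which batches have been committed to the sink. After a failure, the query resumes from the last checkpoint and re-runs any batch that was planned but not committed, which is the basis for exactly-once semantics when the source is replayable and the sink is idempotent.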
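The restart decision can be sketched with two small logs. This is a simplified model under stated assumptions, not Spark's code: the offset log is written before a batch runs, the commit log after it succeeds, and `plan_restart` is a hypothetical function name.

```python
def plan_restart(offset_log, commit_log):
    """Decide which batch to run first after a restart.

    offset_log: dict of batch_id -> (start_offset, end_offset), written BEFORE a batch runs.
    commit_log: set of batch ids, written AFTER the sink write succeeds.
    """
    if not offset_log:
        # Nothing was ever planned: start from the first batch.
        return {"batch_id": 0, "offsets": None, "replay": False}
    latest = max(offset_log)
    if latest not in commit_log:
        # Planned but never committed: re-run with exactly the same offsets.
        return {"batch_id": latest, "offsets": offset_log[latest], "replay": True}
    # Everything planned was committed: plan a fresh batch.
    return {"batch_id": latest + 1, "offsets": None, "replay": False}


# Batch 2 was planned (offsets 200..300) but the query died before committing it.
print(plan_restart({0: (0, 100), 1: (100, 200), 2: (200, 300)}, {0, 1}))
# {'batch_id': 2, 'offsets': (200, 300), 'replay': True}
```

Replaying the same offset range is what makes the re-run deterministic on the source side; the sink still has to tolerate seeing batch 2 twice.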
Output modes determine how the results are written to the sink. Append mode only outputs new rows since the last trigger, Complete mode outputs the entire result table each time, and Update mode outputs only rows that were updated since the last trigger. The choice of output mode depends on the operation type and the sink's capabilities.
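The difference between Complete and Update mode is easiest to see on a running word count. The following is a plain-Python simulation of what a sink would receive after each micro-batch, not PySpark code; `run_word_count` is a hypothetical helper and the input batches are made up.

```python
from collections import Counter


def run_word_count(batches, mode):
    """Return the rows a sink would receive after each micro-batch."""
    table = Counter()  # the running result table
    outputs = []
    for batch in batches:
        changed = Counter(batch)  # keys touched by this batch
        table.update(changed)
        if mode == "complete":
            outputs.append(dict(table))                       # whole table every time
        elif mode == "update":
            outputs.append({w: table[w] for w in changed})    # only changed rows
        else:
            raise ValueError(f"unsupported mode: {mode!r}")
    return outputs


batches = [["a", "b", "a"], ["b", "c"]]
print(run_word_count(batches, "complete"))
# [{'a': 2, 'b': 1}, {'a': 2, 'b': 2, 'c': 1}]
print(run_word_count(batches, "update"))
# [{'a': 2, 'b': 1}, {'b': 2, 'c': 1}]
```

Append mode is left out on purpose: for an aggregation it can only emit a row once a watermark guarantees the row will no longer change.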
Performance tuning involves several considerations: partitioning of the source data, parallelism of the query, state store configuration for stateful operations, and memory management for large state. Monitoring streaming queries requires attention to processing times, input rates, and state sizes to detect potential bottlenecks.
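A monitoring check along these lines might look as follows. It is a sketch that assumes a dictionary shaped like the one `query.lastProgress` returns in PySpark 3.x (keys such as `inputRowsPerSecond`, `processedRowsPerSecond`, `durationMs`, and `stateOperators`); `summarize_progress` is a hypothetical helper and the sample values are invented.

```python
def summarize_progress(progress):
    """Extract a few health indicators from a progress dictionary."""
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    durations = progress.get("durationMs") or {}
    state_ops = progress.get("stateOperators") or []
    return {
        # Data arrives faster than it is processed: the query is falling behind.
        "falling_behind": input_rate > processed_rate,
        # Wall-clock time of the whole trigger, in milliseconds.
        "trigger_ms": durations.get("triggerExecution"),
        # Total rows held in state across all stateful operators.
        "state_rows": sum(op.get("numRowsTotal", 0) for op in state_ops),
    }


# Invented sample, shaped like a progress report.
sample = {
    "inputRowsPerSecond": 1200.0,
    "processedRowsPerSecond": 900.0,
    "durationMs": {"triggerExecution": 8000},
    "stateOperators": [{"numRowsTotal": 5000}, {"numRowsTotal": 250}],
}
print(summarize_progress(sample))
# {'falling_behind': True, 'trigger_ms': 8000, 'state_rows': 5250}
```

With a running query, the same function could be called as `summarize_progress(query.lastProgress)` once at least one batch has completed.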
Advanced features include mixing SQL and DataFrame operations in the same query, and streaming joins, which enable joining streaming data with static (batch) data or with other streams. These features expand the use cases for Structured Streaming beyond simple transformations.
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Micro-batch Processing | Processes data in small batches at intervals | High throughput, moderate latency |
| Continuous Processing | Processes records individually as they arrive | Ultra-low latency (~1ms) |
| Trigger | Determines when a new batch is processed | Control batch timing and frequency |
| Watermark | Threshold for handling late-arriving data | Event-time windowing operations |
| Output Mode | How results are written to the sink | Append, Complete, Update modes |
| Checkpoint | Persists query progress for fault tolerance | Exactly-once processing guarantees |
| State Store | Stores state for stateful operations | Window aggregations, joins |
| Event Time | Timestamp embedded in the data | Time-based windowing and ordering |
| Processing Time | Time when the record is processed | Monitoring and debugging |
| Late Data | Events arriving after their watermark threshold | Requires special handling |
Code Examples
Basic Structured Streaming Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Initialize the Spark session.
# schemaInference only affects file-based sources; it is not needed for the socket source.
# checkpointLocation here sets the default checkpoint root for all queries in this session.
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoint/path") \
    .getOrCreate()

# Read from a socket stream (for testing only; the socket source is not fault tolerant)
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words, one row per word
words = lines.select(
    explode(split(lines.value, " ")).alias("word")
)

# Running count per word across all batches
word_counts = words.groupBy("word").count()

# Start the streaming query; complete mode rewrites the full result table every 10 seconds
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()
Windowed Aggregation with Watermark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window

spark = SparkSession.builder \
    .appName("WindowedAggregation") \
    .getOrCreate()

# Read from Kafka (requires the spark-sql-kafka connector package on the classpath)
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse the JSON payload into typed columns
events = lines.select(
    col("value").cast("string").alias("json")
).select(
    from_json(col("json"), "timestamp TIMESTAMP, value DOUBLE").alias("data")
).select("data.*")

# Sliding 5-minute windows every 1 minute; events more than 10 minutes late may be dropped
windowed_counts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute"),
        "value"
    ).count()

# Write changed windows to the console
query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
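With a 5-minute window sliding every 1 minute, each event falls into five overlapping windows. The arithmetic can be sketched in plain Python, assuming integer timestamps in seconds and windows aligned to the Unix epoch (Spark's default when no `startTime` is given); `sliding_windows` is a hypothetical helper, not a Spark function.

```python
def sliding_windows(ts, size, slide):
    """Return the [start, end) windows of length `size` that contain `ts`."""
    # Latest window start at or before ts, aligned to a multiple of `slide`.
    last_start = ts - (ts % slide)
    # Walk backwards while the window still covers ts (start > ts - size).
    starts = range(last_start, ts - size, -slide)
    return [(start, start + size) for start in sorted(starts)]


# An event at t = 1000 s with 300 s windows sliding every 60 s.
for window_bounds in sliding_windows(1000, size=300, slide=60):
    print(window_bounds)
# (720, 1020)
# (780, 1080)
# (840, 1140)
# (900, 1200)
# (960, 1260)
```

Each of those windows keeps its own aggregation state until the watermark passes the window end, which is why small slide intervals multiply state size.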
Streaming Join with Batch Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark = SparkSession.builder \
    .appName("StreamingJoin") \
    .getOrCreate()

# Read streaming data from Kafka and parse the JSON payload
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING) as event_json") \
    .select(from_json(col("event_json"), "user_id INT, action STRING, timestamp TIMESTAMP").alias("data")) \
    .select("data.*")

# Read batch (static) reference data
batch_df = spark.read.parquet("/path/to/user_profiles")

# Stream-static join: each micro-batch of events is joined against the static profiles
joined_df = stream_df.alias("s").join(
    batch_df.alias("b"),
    col("s.user_id") == col("b.user_id"),
    "inner"
).select("s.user_id", "s.action", "b.profile_name", "s.timestamp")

# Write to Delta sink
query = joined_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoint/join") \
    .start("/output/path")

query.awaitTermination()
Advanced: Multi-Query Streaming
from pyspark.sql import SparkSession
# Alias sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, count, from_json, sum as sum_, window

spark = SparkSession.builder \
    .appName("MultiQueryStreaming") \
    .getOrCreate()

# Single source definition; each started query reads from Kafka independently
source_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Shared parsing step; the Kafka record timestamp is used as the event time
parsed_df = source_df.select(
    col("value").cast("string").alias("json"),
    col("timestamp").alias("event_time")
).select(
    from_json(col("json"), "user_id INT, action STRING, amount DOUBLE").alias("data"),
    "event_time"
).select("data.*", "event_time")

# Query 1: Real-time aggregation
aggregated = parsed_df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        window("event_time", "1 minute"),
        "action"
    ).agg(
        sum_("amount").alias("total_amount"),
        count("*").alias("event_count")
    )

# Query 2: Detailed event log
detailed_log = parsed_df.select(
    "user_id", "action", "amount", "event_time"
)

# Start multiple queries
query1 = aggregated.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query2 = detailed_log.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoint/detailed") \
    .start("/detailed_log_path")

# Block until any one of the active queries terminates
spark.streams.awaitAnyTermination()
Performance Metrics
| Metric | Micro-batch | Continuous | Notes |
|---|---|---|---|
| Latency | 100ms-10s | ~1ms | Depends on batch interval |
| Throughput | 100K-1M records/sec | 10K-100K records/sec | Micro-batch higher throughput |
| Fault Recovery | seconds-minutes | seconds | Checkpoint-based recovery |
| State Management | Optimized | Limited | Micro-batch has better state support |
| Exactly-once | Yes (with replayable sources and idempotent sinks) | No (at-least-once) | Continuous mode provides at-least-once guarantees |
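| Resource Usage | Tasks scheduled per batch | Long-running tasks, one per source partition | Continuous mode needs enough free cores to run all partitions concurrently |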
Best Practices
- Always set checkpoint location - Critical for fault tolerance and exactly-once processing
- Use appropriate output mode - Append for rows that never change once emitted (including watermarked aggregations), Update for changed aggregate rows, Complete for small full result tables
- Configure watermarks - Essential for event-time windowing and state management
- Monitor streaming metrics - Track processing times, input rates, and state sizes
- Tune micro-batch interval - Balance between latency and throughput requirements
- Use schema inference carefully - Can be expensive; prefer explicit schemas when possible
- Handle late data explicitly - Define watermark thresholds based on data patterns
- Optimize state store - Configure RocksDB or other state stores for large state
- Use multiple queries - Separate different processing logic into multiple queries
- Test with realistic data - Validate performance with production-like data volumes
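Several of these practices come down to session configuration. The fragment below is a sketch, assuming Spark 3.2 or later for the RocksDB state store provider; the partition count is a placeholder rather than a recommendation, and `apply_conf` is a hypothetical helper that works with any builder exposing a `config(key, value)` method.

```python
STREAMING_CONF = {
    # Keep large state in RocksDB instead of the default in-memory state store.
    "spark.sql.streaming.stateStore.providerClass":
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    # Also determines the number of state store partitions; placeholder value.
    "spark.sql.shuffle.partitions": "64",
}


def apply_conf(builder, conf):
    """Apply each setting to a SparkSession.builder-like object and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

With PySpark installed, this could be used as `apply_conf(SparkSession.builder.appName("app"), STREAMING_CONF).getOrCreate()`.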
Related Topics
- 12-state-management.mdx: Advanced stateful operations and checkpointing
- 13-window-operations.mdx: Detailed windowing strategies and implementations
- 14-merge-upsert.mdx: Delta Lake merge operations for streaming data
- 15-data-quality.mdx: Data validation in streaming pipelines
See Also
- Kafka Streams (kafka/03): Kafka integration with Structured Streaming
- Data Engineering Streaming (data-engineering/022): End-to-end streaming pipeline architecture