
Structured Streaming: Triggers, Watermarks, State Management

Difficulty: Expert | Companies: Uber, Lyft, Netflix, Airbnb, Stripe

Interview Context

Structured Streaming is increasingly critical for real-time data pipelines. Interviewers focus on exactly-once semantics, watermark handling, and state management for windowed aggregations.

Question

Explain the complete lifecycle of a Structured Streaming query. How does watermarking handle late-arriving data? Compare different trigger modes and their impact on latency vs. throughput. Describe state management strategies for windowed aggregations and how to handle state store bloat.


Detailed Answer

1. Structured Streaming Architecture

Micro-Batch Engine (default): Source (Kafka, File) → Batch Generator (micro-batch framing) → Operation (map, groupBy, window) → Sink (Kafka, memory)

2. Complete Streaming Query Lifecycle

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, DoubleType
)

spark = SparkSession.builder \
    .appName("StructuredStreamingDeepDive") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoint") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("amount", DoubleType(), True)
])

# Create streaming source
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 100000) \
    .load()

# Parse and transform
parsed = stream_df.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")

# Windowed aggregation with watermark
windowed = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    ).agg(
        F.count("*").alias("event_count"),
        F.sum("amount").alias("total_amount")
    )

# Write to sink
query = windowed.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/windowed") \
    .trigger(processingTime="30 seconds") \
    .start()

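# Query lifecycle (conceptual):
# 1. Initializing: start() was called; sources, sink, and checkpoint are being set up
# 2. Active: micro-batches are running (query.isActive is True)
# 3. Terminated: stopped via query.stop(), or finished on its own (one-shot triggers)
# 4. Failed: terminated with an error; query.exception() returns the cause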
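The `F.window("timestamp", "5 minutes", "1 minute")` call above assigns each event to several overlapping windows. As a rough illustration in plain Python (not Spark's implementation; the helper name `sliding_windows` and the sample timestamp are made up here, and windows are assumed to be aligned to the Unix epoch), the windows that contain a single event can be enumerated like this:

```python
from datetime import datetime, timedelta

def sliding_windows(event_time, window, slide):
    """Return the [start, end) windows that contain event_time, earliest first."""
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before the event, aligned to the slide interval
    start = event_time - (event_time - epoch) % slide
    windows = []
    # Walk backwards one slide at a time while the window still covers the event
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)

event = datetime(2024, 1, 1, 10, 7, 30)  # hypothetical event time
wins = sliding_windows(event, timedelta(minutes=5), timedelta(minutes=1))
for win_start, win_end in wins:
    print(win_start.time(), "->", win_end.time())
```

With a 5-minute window and a 1-minute slide, each event lands in five windows, which is why sliding windows hold more state than tumbling windows of the same length.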

3. Trigger Modes: Detailed Comparison

# `df` stands for any streaming DataFrame; sink options are omitted for brevity.

# Trigger 1: Default (micro-batch, no delay)
# Starts the next batch as soon as the previous one completes.
# PySpark's trigger() requires an argument, so the default is selected by omitting the call.
query1 = df.writeStream.start()

# Trigger 2: Fixed interval
# Starts a batch every 30 seconds; an overrunning batch delays the next one
query2 = (
    df.writeStream
    .trigger(processingTime="30 seconds")
    .start()
)

# Trigger 3: Once (deprecated since Spark 3.4 in favor of availableNow)
# Processes all available data in a single batch, then stops
query3 = (
    df.writeStream
    .trigger(once=True)
    .start()
)

# Trigger 4: Available Now (Spark 3.3+)
# Processes all available data in one or more batches, honoring source
# rate limits such as maxOffsetsPerTrigger, then stops
query4 = (
    df.writeStream
    .trigger(availableNow=True)
    .start()
)

# Trigger 5: Continuous (experimental)
# Lowest latency (~1 ms), but limited to map-like operations and a few sources/sinks
query5 = (
    df.writeStream
    .trigger(continuous="1 second")  # checkpoint interval, not a batch interval
    .start()
)

# Latency vs Throughput analysis:
# Trigger       | Latency | Throughput | Use case
# Default       | ~100ms  | Highest    | Real-time
# Fixed (30s)   | ~30s    | High       | Batch-like
# Once          | N/A     | Highest    | Backfill
# Available Now | ~100ms  | High       | Migration
# Continuous    | ~1ms    | Medium     | Ultra-low latency
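One practical difference between the two one-shot triggers is how they drain a backlog. The sketch below is a back-of-the-envelope model in plain Python, not Spark code; `batches_to_drain` and the backlog figure are hypothetical. It assumes `once` reads everything in a single batch and ignores rate limits, while `availableNow` honors `maxOffsetsPerTrigger`.

```python
import math

def batches_to_drain(backlog_records, max_offsets_per_trigger, trigger):
    """Estimate how many micro-batches a one-shot trigger needs to drain a backlog."""
    if trigger == "once":
        # Single batch; the per-batch rate limit is ignored
        return 1
    if trigger == "availableNow":
        # Rate limit is honored, so the backlog is split into bounded batches
        return math.ceil(backlog_records / max_offsets_per_trigger)
    raise ValueError(f"not a one-shot trigger: {trigger}")

backlog = 2_500_000  # hypothetical number of unread Kafka records
once_batches = batches_to_drain(backlog, 100_000, "once")
available_now_batches = batches_to_drain(backlog, 100_000, "availableNow")
print(once_batches, available_now_batches)
```

Bounded batches keep per-batch memory predictable and let the query checkpoint progress between batches, which is the main reason to prefer `availableNow` for large backfills.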

4. Watermarking: Late Data Handling

# Watermark definition:
# A watermark is a threshold that tells Spark how late data can be
# and still be included in aggregation results.

# Mathematical model:
# Let W = watermark threshold (e.g., "10 minutes")
# Let T_max = maximum event time seen so far
# Watermark value = T_max - W
# Data with event_time < watermark is considered "too late" and may be dropped.
# Spark recomputes the watermark at the end of each micro-batch and applies it to the next one.

# Example:
# Event times seen: [10:00, 10:05, 10:15, 10:20]
# Watermark threshold: 10 minutes
# Current max event time: 10:20
# Watermark value: 10:20 - 10 minutes = 10:10
# Late data (event_time < 10:10) may be dropped; data at or after 10:10 is always kept

# Implementation:
stream_with_watermark = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    ).agg(F.count("*").alias("count"))

# Watermark behavior:
# Event time | Watermark | Action
# 10:25      | 10:10     | Included (within window)
# 10:08      | 10:10     | DROPPED (before watermark)
# 10:12      | 10:10     | Included (after watermark)
# 10:05      | 10:10     | DROPPED (too late)

# Late data in append mode:
# A window is emitted exactly once, after the watermark passes the window end.
# Late data that arrives within the watermark is folded into the aggregate
# before emission. Results are never retracted; data arriving later is dropped.

# Late data in update mode:
# Windows touched by late (but within-watermark) records are re-emitted with
# updated values at the next trigger. More responsive, but the sink must handle upserts.
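The watermark rule can be traced by hand with a small simulation. This is a simplified model in plain Python, not Spark internals: it applies the per-record rule from this section (`event_time < watermark` means late) and advances the watermark only after each batch finishes. The helper names `t` and `classify` are illustrative, and the event times reuse the example above.

```python
from datetime import datetime, timedelta

def t(hhmm):
    """Build a datetime on an arbitrary fixed day from an 'HH:MM' string."""
    hours, minutes = hhmm.split(":")
    return datetime(2024, 1, 1, int(hours), int(minutes))

def classify(batches, delay):
    """Label every event as kept or dropped under a batch-updated watermark."""
    max_seen = None
    watermark = None  # no watermark exists until the first batch completes
    decisions = []
    for batch in batches:
        for event_time in batch:
            late = watermark is not None and event_time < watermark
            decisions.append((event_time, "dropped" if late else "kept"))
        # The watermark only moves forward, and only between batches
        batch_max = max(batch)
        max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
        watermark = max_seen - delay
    return decisions, watermark

batches = [
    [t("10:00"), t("10:05"), t("10:15"), t("10:20")],  # sets watermark to 10:10
    [t("10:25"), t("10:08"), t("10:12"), t("10:05")],  # judged against 10:10
]
decisions, final_watermark = classify(batches, timedelta(minutes=10))
for event_time, action in decisions:
    print(event_time.strftime("%H:%M"), action)
```

For windowed aggregations, Spark evaluates lateness against the window end rather than the raw event time, so a record slightly behind the watermark can still update a sliding window that has not yet closed.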

5. State Management Strategies

# State Management in Structured Streaming:
# Spark maintains state for operations that require memory of past data:
# - Window aggregations
# - Streaming joins
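# - MapGroupsWithState / FlatMapGroupsWithState (applyInPandasWithState in PySpark)

# State store backends:
# 1. HDFSBackedStateStoreProvider (default): in-memory hash map on the JVM heap, backed by checkpoint files
# 2. RocksDBStateStoreProvider (Spark 3.2+): native memory and local disk, less JVM GC pressure for large state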

# Configure state store:
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# State management for windowed aggregation:
# State key: (window_start, window_end, group_key)
# State value: aggregation buffer (count, sum, etc.)

# State cleanup via watermark:
# When watermark advances past window end, state is cleaned up
# This prevents state from growing indefinitely

# Example: sliding window aggregation
windowed_aggregated = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "10 minutes", "5 minutes"),  # 10-min window, 5-min slide
        "user_id"
    ).agg(
        F.sum("amount").alias("total"),
        F.count("*").alias("count")
    )

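# State size estimation (rough):
# Windows overlapping any instant = window_duration / slide_duration
# For a 10-min window with a 5-min slide: 2 overlapping windows per key
# Each window also stays in state until the watermark passes its end, so
# windows held per key ≈ (window_duration + watermark_delay) / slide_duration = (10 + 10) / 5 = 4
# State per entry: ~200 bytes (key + aggregation buffer)
# Total state ≈ num_keys × windows_held × 200 bytes

# Example: 1M unique users, all active in every window
# Overlapping windows only: 1,000,000 × 2 × 200 = 400 MB
# Including watermark retention: 1,000,000 × 4 × 200 = 800 MB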
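The estimate above can be turned into a small calculator. This is only a sizing sketch under the same assumptions (about 200 bytes per entry, every key active in every window); the function names are made up for illustration. Passing a watermark delay also counts the windows that are closed to new on-time data but still held until the watermark passes them.

```python
import math

def windows_in_state(window_min, slide_min, watermark_delay_min=0):
    """Windows held per key: overlapping ones plus those awaiting the watermark."""
    return math.ceil((window_min + watermark_delay_min) / slide_min)

def state_bytes(num_keys, windows_per_key, bytes_per_entry=200):
    """Total state, assuming every key has an entry in every held window."""
    return num_keys * windows_per_key * bytes_per_entry

overlapping = windows_in_state(10, 5)                          # overlap only
held = windows_in_state(10, 5, watermark_delay_min=10)         # plus watermark retention
print(overlapping, state_bytes(1_000_000, overlapping) / 1e6, "MB")
print(held, state_bytes(1_000_000, held) / 1e6, "MB")
```

The gap between the two figures shows that a longer watermark delay buys tolerance for late data at the direct cost of state size.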

6. State Store Optimization

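# Problem: State bloat in long-running queries
# Solution 1: Use watermark to bound state
# Solution 2: Rely on state store maintenance (delta consolidation) and compression
# Solution 3: Use custom state management with explicit timeouts
#             (FlatMapGroupsWithState; applyInPandasWithState in PySpark)

# State store maintenance (automatic):
# A background task periodically consolidates state delta files into snapshots
# and deletes old versions. Separately, the codec used to compress state files can be changed:
spark.conf.set("spark.sql.streaming.stateStore.compression.codec", "zstd")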

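# Custom state management with applyInPandasWithState (PySpark 3.4+).
# groupByKey/mapGroupsWithState are Scala/Java Dataset APIs and do not exist on PySpark DataFrames.
import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def update_state(key, pdf_iter, state: GroupState):
    """Keep a running (total, count) per user_id with explicit control over state."""
    (user_id,) = key
    if state.hasTimedOut:
        # No events for this user within the timeout: drop its state to bound growth
        state.remove()
        return
    # Load existing state or initialize it
    total, count = state.get if state.exists else (0.0, 0)
    for pdf in pdf_iter:
        total += float(pdf["amount"].sum())
        count += len(pdf)
    state.update((total, count))
    state.setTimeoutDuration(60 * 60 * 1000)  # expire idle keys after 1 hour (illustrative value)
    # Emit result
    yield pd.DataFrame({"user_id": [user_id], "total": [total], "count": [count]})

# Apply stateful transformation
result = parsed.groupBy("user_id").applyInPandasWithState(
    update_state,
    outputStructType="user_id string, total double, count long",
    stateStructType="total double, count long",
    outputMode="update",
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
)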
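To see why explicit timeouts matter for state bloat, here is a toy, Spark-free model of per-key state with an idle timeout. The class `KeyedState` and its methods are hypothetical and only mimic the idea behind a processing-time timeout: keys that stop receiving events are evicted instead of living in state forever.

```python
class KeyedState:
    """Toy per-key running aggregate with idle-key expiry (not a Spark API)."""

    def __init__(self, timeout_s):
        self.timeout_s = timeout_s
        self.entries = {}  # key -> (total, count, last_seen_seconds)

    def update(self, key, amounts, now):
        """Fold new amounts into the key's aggregate and refresh its last-seen time."""
        total, count, _ = self.entries.get(key, (0.0, 0, now))
        total += sum(amounts)
        count += len(amounts)
        self.entries[key] = (total, count, now)
        return key, total, count

    def expire(self, now):
        """Remove keys idle for at least timeout_s seconds; return the evicted keys."""
        expired = [k for k, (_, _, seen) in self.entries.items()
                   if now - seen >= self.timeout_s]
        for k in expired:
            del self.entries[k]
        return expired

state = KeyedState(timeout_s=60)
first = state.update("u1", [10.0, 5.0], now=0)
state.update("u2", [1.0], now=50)
evicted = state.expire(now=70)  # u1 idle for 70s, u2 idle for 20s
print(first, evicted, sorted(state.entries))
```

Without the `expire` step the dictionary grows with every distinct key ever seen, which is the same failure mode as an unbounded state store.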

7. Streaming Joins

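# Stream-stream join with watermarks
# Watermarks on both inputs, plus an event-time range condition, let Spark bound
# join state. They are optional for inner joins but required for outer joins.

def read_events(topic):
    """Illustrative helper: read a Kafka topic and parse it with the schema from section 2."""
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", topic)  # topic names below are placeholders
        .load()
        .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )

# Watermark each side on its event-time column, then alias for the join condition
orders = read_events("orders").withWatermark("timestamp", "10 minutes").alias("o")
users = read_events("users").withWatermark("timestamp", "15 minutes").alias("u")

joined = orders.join(
    users,
    on=F.expr("o.user_id = u.user_id AND "
              "o.timestamp BETWEEN u.timestamp - INTERVAL 10 MINUTES AND "
              "u.timestamp + INTERVAL 5 MINUTES")
)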

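# Join state size estimation (rough):
# State retained ≈ retention_period × arrival_rate, where the retention period
# is driven by the watermark delay plus the time-range condition
# For ~10 minutes of retention at 1000 events/sec:
# State = 600 seconds × 1000 events/sec = 600K events
# At 1 KB per event: 600 MB of state per side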
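Both the time-range condition and the state arithmetic can be checked in plain Python. This is a sketch with made-up helper names (`in_join_range`, `join_state_bytes`), not Spark code; it assumes 1 KB means 1000 bytes and that retention is exactly the 10-minute figure used above.

```python
from datetime import datetime, timedelta

def in_join_range(left_ts, right_ts,
                  before=timedelta(minutes=10), after=timedelta(minutes=5)):
    """True when left_ts is BETWEEN right_ts - before AND right_ts + after (inclusive)."""
    return right_ts - before <= left_ts <= right_ts + after

def join_state_bytes(events_per_sec, retention_sec, bytes_per_event):
    """Rough per-side join state: everything that arrived during the retention period."""
    return events_per_sec * retention_sec * bytes_per_event

right_event = datetime(2024, 1, 1, 10, 30)  # hypothetical right-side event time
print(in_join_range(datetime(2024, 1, 1, 10, 20), right_event))  # at the lower bound
print(in_join_range(datetime(2024, 1, 1, 10, 36), right_event))  # past the upper bound
per_side = join_state_bytes(1000, 600, 1000)
print(per_side / 1e6, "MB per side")
```

Tightening the time-range condition shrinks the retention period, which is usually the cheapest way to reduce join state.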

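# Memory optimization for joins:
# State format version 2 is already the default in Spark 3.x; the larger win
# is moving state off the JVM heap with the RocksDB provider
spark.conf.set("spark.sql.streaming.join.stateFormatVersion", "2")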
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

8. Fault Tolerance and Exactly-Once Semantics

# Structured Streaming provides exactly-once guarantees via:
# 1. Checkpointing (state + offsets)
# 2. Idempotent sinks
# 3. Transactional writes (where supported)
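The idempotent-sink idea in the list above can be sketched without Spark. The toy class below (`IdempotentSink` is a hypothetical name) records which batch IDs it has applied and skips replays, which is the pattern usually built on the `batch_id` argument that `foreachBatch` passes to its callback. A real sink would need to persist the rows and the batch ID atomically; this in-memory version only shows the control flow.

```python
class IdempotentSink:
    """Toy sink that applies each batch ID at most once (in-memory, for illustration)."""

    def __init__(self):
        self.rows = []
        self.applied_batches = set()

    def write(self, batch_id, rows):
        """Apply a batch unless it was already applied; return True if it was written."""
        if batch_id in self.applied_batches:
            return False  # replay after a failure: skip, output stays unchanged
        self.rows.extend(rows)
        self.applied_batches.add(batch_id)
        return True

sink = IdempotentSink()
first_write = sink.write(0, ["a", "b"])
replayed_write = sink.write(0, ["a", "b"])  # same batch re-run after recovery
next_write = sink.write(1, ["c"])
print(first_write, replayed_write, next_write, sink.rows)
```

Because a recovered query re-runs the uncommitted batch with the same ID and the same offsets, deduplicating on batch ID turns at-least-once delivery into effectively exactly-once output.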

# Checkpoint directory layout:
# /checkpoint/
#   commits/   Completed batch IDs (the highest-numbered file is the last completed batch)
#   offsets/   Source offsets per batch
#   metadata   Query metadata
#   state/     State store delta and snapshot files

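# Recovery process:
# 1. Read the latest planned batch from offsets/ (written before the batch runs)
# 2. Read the last committed batch from commits/ (written after the batch completes)
# 3. If the latest planned batch was never committed, re-run it with the same source offsets
# 4. Restore state from the state store version that matches the last committed batch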

# Checkpoint configuration:
query = df.writeStream \
    .option("checkpointLocation", "/checkpoint/query1") \
    .start()

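# Checkpoint retention (prevent unbounded growth):
# Spark purges old offset/commit log entries and state versions automatically,
# keeping the most recent minBatchesToRetain batches (default 100).
# Lower values save space but limit how far back the query can recover.
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "10")  # illustrative value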
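The recovery decision can be modeled as a comparison between the two logs in the checkpoint. This is a simplified sketch, not Spark's code; `plan_restart` is a made-up helper that takes the batch IDs found in `offsets/` and `commits/`, and it assumes offsets are written before a batch runs and commits after it finishes.

```python
def plan_restart(offset_log, commit_log):
    """Decide what a restarted query should do, given batch IDs found in each log."""
    if not offset_log:
        return ("start", 0)  # empty checkpoint: begin with batch 0
    latest_planned = max(offset_log)
    latest_committed = max(commit_log) if commit_log else None
    if latest_committed == latest_planned:
        # Everything that was planned also finished: move on to the next batch
        return ("continue", latest_planned + 1)
    # The last planned batch never committed: re-run it with the same offsets
    return ("rerun", latest_planned)

fresh = plan_restart([], [])
crashed_mid_batch = plan_restart([0, 1, 2], [0, 1])
clean_shutdown = plan_restart([0, 1, 2], [0, 1, 2])
print(fresh, crashed_mid_batch, clean_shutdown)
```

Re-running a batch with identical offsets is what makes the replay deterministic, so an idempotent or transactional sink can safely discard the duplicate.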

Common Pitfall

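A critical mistake is using append output mode with windowed aggregations without a watermark. Without a watermark, Spark cannot determine when a window is "complete", so it rejects the query with an AnalysisException at start. In update or complete mode the query does run without a watermark, but window state is never evicted and grows indefinitely.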
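The reason append mode depends on a watermark shows up in a toy model of window finalization. The sketch below is plain Python with a hypothetical helper `emittable_windows`; it assumes a window can be emitted once its end is at or before the watermark, and that no watermark means no window can ever be declared complete.

```python
from datetime import datetime, timedelta

def emittable_windows(window_ends, max_event_time, delay):
    """Window ends that append mode could emit under the given watermark delay."""
    if delay is None:
        return []  # no watermark: nothing is ever known to be complete
    watermark = max_event_time - delay
    return [end for end in window_ends if end <= watermark]

ends = [
    datetime(2024, 1, 1, 10, 5),
    datetime(2024, 1, 1, 10, 10),
    datetime(2024, 1, 1, 10, 15),
]
latest_event = datetime(2024, 1, 1, 10, 20)  # watermark is 10:10 with a 10-minute delay
with_watermark = emittable_windows(ends, latest_event, timedelta(minutes=10))
without_watermark = emittable_windows(ends, latest_event, None)
print(len(with_watermark), len(without_watermark))
```

The same condition that lets a window be emitted also lets its state be dropped, so the watermark bounds both output latency and state size.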

Interview Tip

When discussing watermarks, always clarify that they are event-time based, not processing-time based. A watermark of "10 minutes" means "allow up to 10 minutes of lateness in event time", not "wait 10 minutes of processing time".


Summary

Concept | Purpose | Configuration
Trigger | Control batch frequency | trigger(processingTime="30 seconds")
Watermark | Handle late data | withWatermark("col", "10 minutes")
State Store | Maintain aggregation state | RocksDB provider for performance
Checkpointing | Fault tolerance | option("checkpointLocation", "...")
Output Mode | Control result emission | update, append, complete

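Structured Streaming provides a unified API for batch and streaming. Combined with replayable sources, checkpointing, and idempotent or transactional sinks, it delivers end-to-end exactly-once guarantees, which makes it a strong default for modern streaming workloads on Spark.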
