πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Structured Streaming

Apache SparkStreaming⭐ Premium

Advertisement

Structured Streaming

Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb

Structured Streaming Fundamentals

Structured Streaming treats a live data stream as an unbounded table that is continuously appended. This model simplifies streaming applications with DataFrame-like operations.

Basic Streaming Application

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("StructuredStreaming") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .config("spark.sql.streaming.checkpointLocation", "hdfs://checkpoints/streaming-app") \
    .getOrCreate()

# Read from Kafka
raw_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 1000000) \
    .load()

# Parse JSON messages
events = raw_stream \
    .select(
        F.col("key").cast("string").alias("event_key"),
        F.from_json(F.col("value").cast("string"), schema).alias("data"),
        F.col("timestamp").alias("event_time")
    ) \
    .select("event_key", "data.*", "event_time")

# Perform transformations
processed = events \
    .filter(F.col("event_type") == "click") \
    .withColumn("processing_time", F.current_timestamp()) \
    .withColumn("window", F.window("event_time", "1 hour"))

# Write to sink
query = processed \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "hdfs://checkpoints/click-aggregation") \
    .option("path", "hdfs://output/click-aggregations") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

ℹ️

Interview Insight: Structured Streaming provides exactly-once semantics through checkpointing and write-ahead logs. Always configure checkpointing for production workloads.

Output Modes

# Three output modes for streaming queries

# 1. Append Mode: Only new rows added since last trigger
# Best for: Stateless operations, filtering, projections
query_append = events \
    .filter(F.col("amount") > 100) \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

# 2. Complete Mode: Entire result table output each trigger
# Best for: Stateless aggregations
query_complete = events \
    .groupBy("event_type") \
    .count() \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

# 3. Update Mode: Only rows that were updated since last trigger
# Best for: Stateful aggregations
query_update = events \
    .groupBy("user_id") \
    .agg(F.count("*").alias("event_count")) \
    .writeStream \
    .format("console") \
    .outputMode("update") \
    .start()

Windowed Aggregations

# Tumbling windows: Fixed-size, non-overlapping
tumbling = events \
    .withColumn("window", F.window("event_time", "10 minutes")) \
    .groupBy("window", "event_type") \
    .agg(F.count("*").alias("count")) \
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "event_type",
        "count"
    )

# Sliding windows: Fixed-size, overlapping
sliding = events \
    .withColumn("window", F.window("event_time", "1 hour", "15 minutes")) \
    .groupBy("window", "event_type") \
    .agg(F.count("*").alias("count"))

# Session windows: Activity-based, dynamic size
# Spark 3.2+ supports session windows
session = events \
    .withColumn("window", F.session_window("event_time", "30 minutes")) \
    .groupBy("window", "user_id") \
    .agg(F.count("*").alias("event_count"))

⚠️

Warning: Window operations require watermarking for state management. Without watermarks, state grows unboundedly and causes OOM.

Watermarking for Late Data

# Watermark tells Spark to discard state older than watermark
watermarked = events \
    .withWatermark("event_time", "10 minutes") \
    .withColumn("window", F.window("event_time", "1 hour")) \
    .groupBy("window", "event_type") \
    .agg(F.count("*").alias("count"))

# Watermark handling:
# - Data arriving after watermark is dropped
# - State older than watermark is cleaned up
# - Late data within watermark is included in results

# Configure watermark
watermarked = events \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        F.window("event_time", "10 minutes"),
        "event_type"
    ) \
    .agg(F.count("*").alias("count"))

# Write with watermark
query = watermarked \
    .writeStream \
    .format("delta") \
    .outputMode("update") \
    .option("checkpointLocation", "hdfs://checkpoints/watermarked") \
    .start()

State Management

# Use mapGroupsWithFlatMap for custom state management
from pyspark.sql.streaming.group import GroupState, GroupStateTimeout

def update_user_state(user_id, events, state: GroupState):
    """Custom state management for user sessions"""
    if state.hasTimedOut:
        # State timed out, emit final result
        if state.exists:
            current_state = state.get
            yield (user_id, current_state["count"], "timeout")
        state.remove()
    elif state.exists:
        # Update existing state
        current_state = state.get
        for event in events:
            current_state["count"] += 1
            current_state["last_event"] = event["event_time"]
        state.update(current_state)
        state.setTimeoutTimestamp(
            state.getCurrentProcessingTime() + 3600000  # 1 hour timeout
        )
    else:
        # Initialize new state
        new_state = {"count": len(events), "last_event": events[0]["event_time"]}
        state.update(new_state)
        state.setTimeoutTimestamp(
            state.getCurrentProcessingTime() + 3600000
        )

# Apply stateful operation
user_sessions = events \
    .groupByKey(lambda row: row["user_id"]) \
    .mapGroupsWithFlatMap(update_user_state, GroupStateTimeout.ProcessingTimeTimeout)

Trigger Strategies

# Different trigger options
# Default: Process all available data before next trigger
trigger_default = events.writeStream.trigger()

# Fixed interval: Process every N seconds
trigger_interval = events.writeStream.trigger(processingTime="30 seconds")

# Once: Process all data and stop
trigger_once = events.writeStream.trigger(once=True)

# Available now: Process all available data and stop (Spark 3.3+)
trigger_available = events.writeStream.trigger(availableNow=True)

# Continuous: Low-latency processing (experimental)
trigger_continuous = events.writeStream.trigger(continuous="1 second")

# Choose based on latency requirements:
# - Batch-like: trigger(processingTime="1 minute")
# - Near real-time: trigger(processingTime="10 seconds")
# - Low-latency: trigger(continuous="1 second")

ℹ️

Pro Tip: Use trigger(availableNow=True) for backfill scenarios. It processes all available data and stops, unlike trigger(once=True) which may miss data.

Fault Tolerance and Checkpointing

# Checkpointing is essential for fault tolerance
query = events \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "hdfs://checkpoints/my-streaming-app") \
    .start()

# Checkpoint stores:
# 1. offsets (where we are in the stream)
# 2. state (for stateful operations)
# 3. committed offsets (for exactly-once)

# Recovery after failure:
# Spark automatically recovers from checkpoints
# Just restart the query with the same checkpoint location

# Monitor checkpoint size
def monitor_checkpoint(checkpoint_path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark._jsc.hadoopConfiguration()
    )
    status = fs.listStatus(
        spark._jvm.org.apache.hadoop.fs.Path(checkpoint_path)
    )
    for s in status:
        print(f"{s.getPath().getName()}: {s.getLen() / 1024 / 1024:.2f} MB")

Performance Optimization

# Optimize streaming performance
spark = SparkSession.builder \
    .appName("StreamingOptimization") \
    .config("spark.sql.streaming.numPartitions", "200") \
    .config("spark.sql.streaming.concurrentOps.pollingDelay", "10") \
    .config("spark.sql.streaming.noDataMicroBatches.enabled", "true") \
    .getOrCreate()

# Optimize Kafka integration
kafka_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", 1000000) \
    .option("minPartitions", "100") \
    .option("failOnDataLoss", "false") \
    .load()

# Process in batches for better performance
batched = kafka_stream \
    .repartition(200) \
    .withColumn("processing_time", F.current_timestamp()) \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(processingTime="60 seconds") \
    .start()

ℹ️

Key Takeaway: Structured Streaming provides a unified API for batch and streaming. Use watermarking for state management, checkpointing for fault tolerance, and appropriate triggers for latency requirements.

Follow-Up Questions

  • How does Structured Streaming achieve exactly-once semantics?
  • Explain the difference between processing time and event time.
  • How would you handle schema evolution in a streaming application?
  • Describe strategies for backfilling data in a streaming pipeline.
  • How does continuous processing differ from micro-batch processing?

Advertisement