Real-Time Analytics with PySpark
Architecture Diagram: Real-Time Analytics Stack
REAL-TIME ANALYTICS ARCHITECTURE

DATA SOURCES (Ingestion)
  IoT Sensors | Click Stream | App Events | Log Files | DB CDC | API Feeds | File Drops
        ↓
EVENT STREAMING PLATFORM (Apache Kafka)
  Topics: events (raw) | metrics (typed) | alerts (urgent) | commands (ordered) | cdc (change)
  Retention: 7 days (raw), 30 days (aggregated)
  Partitions: 12-24 per topic (parallelism)
        ↓
PySpark Structured Streaming Engine
  Micro-Batch Processing: Trigger, Batch Size, Watermark
  Window Operations: Tumbling, Sliding, Session, Global
  State Management: MapState, ListState, ValueState, TTL
  Output Mode: Append, Update, Complete
  ADAPTIVE QUERY EXECUTION (AQE): dynamic partition coalescing, skew join optimization, runtime partition coalescing (spark.sql.adaptive.coalescePartitions.enabled). Spark disables AQE for streaming query plans, so it applies to batch reads and to the batch DataFrame inside foreachBatch.
        ↓
OUTPUTS (fan-out from the streaming engine)
  DELTA LAKE TABLES (Persistent State): Streaming Sink, Exactly-Once, Time Travel, ACID Transactions
  MATERIALIZED VIEWS (Pre-computed Aggs): Tumbling Windows, Session Windows, Running Aggregates, Incremental Refresh
  REAL-TIME DASHBOARDS: Grafana, Apache Superset, Custom Dashboards, Alerting Rules
Architecture Diagram: Streaming Window Operations
STREAMING WINDOW OPERATIONS

INPUT STREAM (events with timestamps; each event's value equals its timestamp)
  t=1  t=2  t=3  t=4  t=5  t=6  t=7  t=8  t=9  t=10

TUMBLING WINDOW (non-overlapping, fixed-size)
  Window size: 3, slide: 3 (slide equal to size makes the window tumbling)
  Window [1-3]:  count 3, sum 6
  Window [4-6]:  count 3, sum 15
  Window [7-9]:  count 3, sum 24
  Window [10+]:  count 1, sum 10

SLIDING WINDOW (overlapping, fixed-size)
  Window size: 4, slide: 2
  Window [1-4]:  count 4, sum 10
  Window [3-6]:  count 4, sum 18
  Window [5-8]:  count 4, sum 26

SESSION WINDOW (dynamic size based on activity gaps)
  Gap threshold: 2 (a session closes when the next event is 2 or more time units later)
  Events for this panel: t=1, 2, 3, 5, 6, 7, 8, 9, 12
  Session 1 (t=1-3):  count 3, duration 2
  Session 2 (t=5-9):  count 5, duration 4
  Session 3 (t=12):   count 1, duration 0
WATERMARK (late event handling)
  Event time:  t=1  t=2  t=3  t=4  t=5  t=6  t=7  t=8  t=9  t=10
  Watermark:    -    -    -    -   w=1  w=2  w=3  w=4  w=5  w=6
  (allowed lateness 4: the watermark trails the maximum event time by 4)

  Events arriving, in order:
    t=1 → OK (no watermark yet)
    t=2 → OK (not older than the watermark)
    t=7 → OK; the watermark advances to 7 - 4 = 3
    t=1 → LATE (older than watermark 3; dropped or handled separately)

  Watermark = max event time seen - allowed lateness
  Once the watermark passes a window's end, that window is considered complete
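Gap-based sessionization can be sketched in plain Python. This is a minimal illustration of the merging rule, not Spark's implementation. It assumes `session_window`-style semantics: each event opens the half-open interval `[t, t + gap)` and overlapping intervals merge, so two events share a session only when they are less than `gap` apart. The helper `sessionize` and the event times are hypothetical. The events were chosen to produce the counts and durations of the Session Window panel.

```python
def sessionize(event_times, gap):
    """Group event times into sessions.

    An event joins the current session if it arrives less than `gap` after
    the previous event, i.e. its interval [t, t + gap) overlaps the session;
    otherwise it starts a new session.
    """
    sessions = []
    for t in sorted(event_times):
        if sessions and t < sessions[-1][-1] + gap:
            sessions[-1].append(t)   # overlaps the open session: extend it
        else:
            sessions.append([t])     # gap reached: start a new session
    return sessions


events = [1, 2, 3, 5, 6, 7, 8, 9, 12]  # hypothetical event times with gaps
for s in sessionize(events, gap=2):
    # "duration" here is last event minus first event, as in the diagram;
    # Spark itself reports a session's end as last event + gap.
    print(f"session t={s[0]}-{s[-1]} count={len(s)} duration={s[-1] - s[0]}")
```

Note that an evenly spaced stream such as t=1..10 with gap 2 collapses into a single session. Session windows only split where activity actually pauses.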
Architecture Diagram: Materialized View Incremental Refresh
INCREMENTAL MATERIALIZED VIEW REFRESH

FULL REFRESH (Traditional, inefficient)
  Every refresh:
    1. Read the ENTIRE source table (100M rows)
    2. Compute ALL aggregations from scratch
    3. Overwrite the ENTIRE materialized view
  Cost: $50/refresh × 24 refreshes/day = $1,200/day
  Latency: 45 minutes per refresh

INCREMENTAL REFRESH (PySpark + Delta Lake, efficient)
  Every refresh:
    1. Read ONLY new/changed records (the delta)
    2. Compute aggregations on the delta only
    3. MERGE the changes into the existing materialized view
  Cost: $2/refresh × 24 refreshes/day = $48/day
  Latency: 30 seconds per refresh
  Savings: 96% cost reduction, 90x latency improvement

INCREMENTAL REFRESH FLOW
  Source Table (Delta Lake, 100M rows)
    → Change Detection (50K new rows)
    → Aggregate Delta Only (10K new aggs)
    → MERGE into Materialized View (2M rows, updated)

MATERIALIZED VIEW PATTERNS
  Tumbling window: fixed-size, non-overlapping. Use: hourly reports
  Sliding window: overlapping, fixed-size. Use: 5-min rolling avg
  Session window: dynamic-size, gap-based. Use: user sessions
  Running aggregate: cumulative over time. Use: daily totals
Detailed Explanation
Real-time analytics with PySpark is achieved through Structured Streaming, a micro-batch processing engine built on the Spark SQL engine that processes streaming data as a series of small batch queries. Unlike traditional batch processing that operates on complete datasets, streaming analytics processes data incrementally, maintaining state across micro-batches to provide continuously updated results.
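The fundamental abstraction in Structured Streaming is the streaming DataFrame: an unbounded table that grows as new data arrives. Each micro-batch processes a chunk of new data, applies transformations (select, filter, join, aggregate), and writes results to a sink (console, Kafka, Delta Lake, etc.). For fault tolerance, the engine records each micro-batch's source offsets in a write-ahead log in the checkpoint directory before processing it. It marks the batch in a commit log afterwards, so after a failure it can replay exactly the same data. Combined with a replayable source (such as Kafka) and an idempotent sink (such as Delta Lake), this gives end-to-end exactly-once semantics.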
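The checkpoint-and-replay contract can be illustrated with a toy micro-batch loop in plain Python. This is a hypothetical model, not Spark's code. The source is a replayable list and the checkpoint holds the next offset and batch ID. The sink skips batch IDs it has already committed, which turns at-least-once replay into exactly-once output. Spark pins each batch's offset range in its write-ahead log so that a replay covers the same records; the fixed `batch_size` plays that role here. The names `IdempotentSink` and `run_micro_batches` are illustrative only.

```python
class IdempotentSink:
    """Toy sink that remembers committed batch IDs and ignores replays."""

    def __init__(self):
        self.rows = []
        self.committed_batches = set()

    def write(self, batch_id, rows):
        if batch_id in self.committed_batches:
            return  # replayed batch: already written, do nothing
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)


def run_micro_batches(source, checkpoint, sink, batch_size, fail_before_commit=None):
    """Process `source` in micro-batches, advancing `checkpoint` after each one.

    If `fail_before_commit` equals a batch ID, stop after writing that batch
    but before updating the checkpoint (a simulated crash).
    """
    while checkpoint["offset"] < len(source):
        start = checkpoint["offset"]
        end = min(start + batch_size, len(source))
        batch_id = checkpoint["batch_id"]
        sink.write(batch_id, source[start:end])
        if batch_id == fail_before_commit:
            return False  # crash: this batch's progress was never committed
        checkpoint["offset"] = end
        checkpoint["batch_id"] = batch_id + 1
    return True


events = list(range(10))
checkpoint = {"offset": 0, "batch_id": 0}
sink = IdempotentSink()
run_micro_batches(events, checkpoint, sink, batch_size=3, fail_before_commit=1)
# Restart from the same checkpoint: batch 1 is replayed but not duplicated.
run_micro_batches(events, checkpoint, sink, batch_size=3)
print(sink.rows)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Without the batch-ID check in `write`, the restart would append events 3, 4 and 5 a second time. That duplication is the at-least-once behavior an idempotent sink removes.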
Windowed aggregations are the cornerstone of streaming analytics. PySpark supports three window types:
- Tumbling windows: fixed-size and non-overlapping, ideal for hourly or daily reports.
- Sliding windows: fixed-size and overlapping, ideal for rolling averages.
- Session windows: dynamic size based on activity gaps, ideal for user session analysis. They are available natively through `session_window` since Spark 3.2.

Each window is defined over an event-time column. In practice each should also be paired with a watermark (a maximum lateness tolerance), so that late-arriving events are handled and window state can be cleaned up.
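How an event maps to tumbling or sliding windows can be shown with a small pure-Python helper. It is a sketch under these assumptions:
- Timestamps are integers.
- Windows are half-open `[start, end)`.
- Window starts are aligned to `offset + k * slide`, similar in spirit to the `startTime` argument of Spark's `window` function.

With `offset=1` and event values equal to their timestamps, it reproduces the tumbling and sliding counts and sums from the diagram. The diagram's inclusive `[1-3]` corresponds to `[1, 4)` here. The helpers `assign_windows` and `aggregate` are hypothetical.

```python
from collections import defaultdict


def assign_windows(t, size, slide, offset=0):
    """Return the [start, end) windows of length `size` containing time `t`.

    Window starts are aligned to offset + k * slide for integer k.
    Tumbling windows are the special case slide == size.
    """
    windows = []
    start = t - ((t - offset) % slide)  # latest aligned start at or before t
    while start > t - size:             # window [start, start + size) still contains t
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def aggregate(events, size, slide, offset=0):
    """Count and sum event values per window; events are (time, value) pairs."""
    stats = defaultdict(lambda: [0, 0])
    for t, value in events:
        for w in assign_windows(t, size, slide, offset):
            stats[w][0] += 1
            stats[w][1] += value
    return {w: tuple(cs) for w, cs in sorted(stats.items())}


events = [(t, t) for t in range(1, 11)]  # value == timestamp, as in the diagram
tumbling = aggregate(events, size=3, slide=3, offset=1)
sliding = aggregate(events, size=4, slide=2, offset=1)
print(tumbling)  # {(1, 4): (3, 6), (4, 7): (3, 15), (7, 10): (3, 24), (10, 13): (1, 10)}
print(sliding[(1, 5)], sliding[(3, 7)], sliding[(5, 9)])  # (4, 10) (4, 18) (4, 26)
```

With `size=4, slide=2` every event lands in two windows, which is why sliding aggregations keep more state than tumbling ones.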
Watermarks are critical for bounded state management in streaming. Without a watermark, the engine must keep state for every window indefinitely, because a late event could still update any of them. A watermark sets the threshold below which events are treated as too late: it trails the maximum event time seen so far by the allowed lateness. For example, take a watermark of "10 minutes" when the latest event time seen is 12:30. The watermark is then 12:20, and an event stamped 12:15 that arrives now is late and may be dropped from window aggregations. Windows that end before 12:20 can be finalized and their state discarded.
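The watermark rule can be traced in a few lines of plain Python. This sketch follows the definition above (watermark = maximum event time seen minus the allowed lateness). For simplicity it updates the watermark after every event, whereas Spark advances it once per micro-batch, so the exact point at which an event is dropped can differ. The helper `apply_watermark`, the lateness of 4, and the arrival order are hypothetical. They were chosen to mirror the watermark panel of the diagram.

```python
def apply_watermark(arrivals, allowed_lateness):
    """Classify event times (in arrival order) as accepted or late.

    An event is late if its event time is below the current watermark,
    where watermark = max event time seen so far - allowed_lateness.
    """
    max_seen = None
    accepted, late = [], []
    for t in arrivals:
        watermark = None if max_seen is None else max_seen - allowed_lateness
        if watermark is not None and t < watermark:
            late.append(t)  # older than the watermark: excluded from window state
        else:
            accepted.append(t)
            max_seen = t if max_seen is None else max(max_seen, t)
    return accepted, late


accepted, late = apply_watermark([1, 2, 7, 1], allowed_lateness=4)
print(accepted, late)  # [1, 2, 7] [1]
```

The second `t=1` is rejected only because `t=7` arrived first and pushed the watermark to 3. The same event arriving earlier would have been accepted.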
Materialized views in the streaming context are pre-computed aggregations that are incrementally updated as new data arrives. Instead of recomputing the entire aggregation on every refresh (full refresh), the engine tracks changes since the last refresh and applies only the delta (incremental refresh). When the delta is small relative to the source, this can reduce refresh latency and cost by orders of magnitude. Delta Lake's MERGE operation enables this by performing atomic upserts: it inserts new aggregation groups and updates existing ones in a single transaction.
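The incremental refresh idea is to aggregate only the new rows and then merge them into the stored view. It can be checked against a full recompute in plain Python. This is a toy model in which dicts stand in for Delta tables. The hypothetical `merge_increment` helper only mimics the semantics of a MERGE with update-when-matched and insert-when-not-matched, for additive aggregates (count and sum).

```python
from collections import Counter


def aggregate(rows):
    """Full aggregation: per-key (count, sum) over all rows."""
    counts, sums = Counter(), Counter()
    for key, value in rows:
        counts[key] += 1
        sums[key] += value
    return {k: (counts[k], sums[k]) for k in counts}


def merge_increment(view, new_rows):
    """Incremental refresh: aggregate only the new rows, then upsert.

    Matched keys combine partial aggregates (count and sum are additive);
    unmatched keys are inserted.
    """
    delta = aggregate(new_rows)
    merged = dict(view)
    for key, (cnt, total) in delta.items():
        if key in merged:  # WHEN MATCHED THEN UPDATE
            old_cnt, old_total = merged[key]
            merged[key] = (old_cnt + cnt, old_total + total)
        else:              # WHEN NOT MATCHED THEN INSERT
            merged[key] = (cnt, total)
    return merged


history = [("home", 1), ("cart", 5), ("home", 2)]
increment = [("home", 4), ("checkout", 9)]

view = aggregate(history)                      # initial materialization
view = merge_increment(view, increment)        # touches only 2 keys
assert view == aggregate(history + increment)  # same result as a full recompute
print(view)  # {'home': (3, 7), 'cart': (1, 5), 'checkout': (1, 9)}
```

Two design points follow from this sketch:
- Averages are not additive, so the view stores `sum` and `count` and derives the average at read time.
- Because this merge adds partial counts, applying the same increment twice would double-count. An incremental MERGE therefore needs a guard against replays, such as a processed-batch check.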
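State management in Structured Streaming is handled by a state store. It keeps intermediate aggregation state across micro-batches and checkpoints that state alongside the offsets. The default provider (HDFSBackedStateStoreProvider) keeps state in JVM memory. For large state, the RocksDB provider (available since Spark 3.2) keeps it in native memory and on local disk. For simple aggregations (count, sum), state is small. For complex stateful operations (session windows, stream-stream joins), state can grow significantly and must be bounded. Watermarks let the engine evict expired windows and join buffers; for arbitrary stateful processing, use timeouts or TTL settings.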
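The output mode determines how results are written, and the choice affects correctness, latency, and which sinks can be used:
- Append mode writes only rows that will never change. For non-aggregation queries that is every new row; for aggregations with a watermark, it is each window once the window is finalized.
- Update mode writes only rows that changed since the last trigger. This gives the lowest latency for aggregations, but the sink must support it; the Delta Lake streaming sink, for example, accepts only append and complete.
- Complete mode rewrites the entire result table on every trigger and is only allowed for aggregations, so it suits small result sets.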
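The three output modes can be made concrete by simulating what a tumbling-window count would emit after each micro-batch. This pure-Python sketch is a simplified model, not Spark's exact triggering logic. It makes three assumptions:
- The watermark advances at the end of each batch.
- A window is final once the watermark reaches its end.
- Late-event dropping is omitted.

The helpers `emit` and `simulate` are hypothetical.

```python
def emit(mode, prev_counts, counts, watermark, window_size, emitted_final):
    """Return the rows a windowed count would emit under an output mode.

    counts / prev_counts map window start -> count after / before the batch.
    A window [start, start + window_size) is final once watermark >= its end.
    """
    if mode == "complete":
        return dict(counts)  # the whole result table, every time
    if mode == "update":
        return {w: c for w, c in counts.items() if prev_counts.get(w) != c}
    if mode == "append":
        final = {w: c for w, c in counts.items()
                 if watermark >= w + window_size and w not in emitted_final}
        emitted_final.update(final)  # each window is emitted exactly once
        return final
    raise ValueError(f"unknown mode: {mode}")


def simulate(mode, batches, window_size=10, lateness=5):
    """Run a tumbling-window count over batches of event times and collect
    what `mode` would emit after each batch."""
    counts, emitted_final, outputs = {}, {}, []
    max_seen = float("-inf")
    for events in batches:
        prev_counts = dict(counts)
        for t in events:
            start = t - t % window_size  # tumbling window [start, start + size)
            counts[start] = counts.get(start, 0) + 1
            max_seen = max(max_seen, t)
        watermark = max_seen - lateness
        outputs.append(emit(mode, prev_counts, counts, watermark, window_size, emitted_final))
    return outputs


for mode in ("append", "update", "complete"):
    print(mode, simulate(mode, [[1, 4, 12], [13, 21]]))
# append [{}, {0: 2}]
# update [{0: 2, 10: 1}, {10: 2, 20: 1}]
# complete [{0: 2, 10: 1}, {0: 2, 10: 2, 20: 1}]
```

Append trades latency for finality. Window 0 appears only after the watermark (16) passes its end (10), but it is never revised. This is why sinks that cannot update rows in place pair naturally with watermarked aggregations.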
Mathematical Foundations
Definition: Streaming Window
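A streaming window assigns each event $e$ with timestamp $t_e$ to a set of windows. For windows of length $\ell$ that start every $\delta$ time units:
$$W(e) = \{\, [k\delta,\; k\delta + \ell) \;:\; k \in \mathbb{Z},\; k\delta \le t_e < k\delta + \ell \,\}$$
Types: Tumbling (non-overlapping, $\delta = \ell$), Sliding (overlapping, $\delta < \ell$), Session (activity-based: events closer together than a gap $g$ share a window).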
Watermark Definition
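A watermark is a monotonically non-decreasing function of processing time that estimates a lower bound on the event times still to arrive. Structured Streaming computes it as the maximum event time seen so far minus the allowed lateness $\tau$:
$$WM(t) = \max_{e \text{ seen by } t} t_e \;-\; \tau$$
Late events, those with $t_e < WM(t)$ when they arrive, are dropped or routed to a side output.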
Window Correctness Theorem
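A window computation for window $w$ is correct if:
- Every event assigned to $w$ is processed exactly once: each $e$ with $w \in W(e)$ contributes to $\mathrm{agg}(w)$ exactly once
- No event is missed: $\mathrm{agg}(w) = \bigoplus_{e \in E_w} v_e$, where $E_w = \{\, e : w \in W(e),\; t_e \ge WM(\mathrm{arrival}(e)) \,\}$ is the set of on-time events for $w$
- Watermark progression guarantees eventual completion: there is some time $t^{*}$ with $WM(t^{*}) \ge \mathrm{end}(w)$, after which $w$ is final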
Materialized View Refresh
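Incremental refresh cost for a materialized view $V$ with source $S$, where $\Delta S$ is the set of changed source rows and $\Delta V$ the set of view rows they affect:
$$C_{\text{inc}} = O(|\Delta S| + |\Delta V|)$$
vs. full recomputation: $C_{\text{full}} = O(|S| + |V|)$. Break-even when $|\Delta S| + |\Delta V| \approx |S| + |V|$.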
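As a rough worked example, consider a simple cost model where refresh work is proportional to rows touched. Incremental refresh then costs about $|\Delta S| + |\Delta V|$ and full recomputation about $|S| + |V|$. Plugging in the figures from the incremental refresh diagram gives the ratio below. Those figures are $|S| = 100\text{M}$ source rows, $|\Delta S| = 50\text{K}$ changed rows, $|\Delta V| = 10\text{K}$ affected view rows, and $|V| = 2\text{M}$ view rows.

```latex
\frac{C_{\text{inc}}}{C_{\text{full}}}
  \approx \frac{|\Delta S| + |\Delta V|}{|S| + |V|}
  = \frac{5\times10^{4} + 1\times10^{4}}{1\times10^{8} + 2\times10^{6}}
  = \frac{6\times10^{4}}{1.02\times10^{8}}
  \approx 5.9\times10^{-4}
```

That is roughly 1/1700 of the work. The diagram's per-refresh cost ratio ($50 vs. $2, about 25x) is much smaller, presumably because fixed costs such as job startup, file listing, and rewriting the files that MERGE touches do not shrink with the delta.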
End-to-End Latency
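Total latency from event arrival to query result is the sum of the ingestion, processing, commit, and query phases:
$$L_{\text{e2e}} = L_{\text{ingest}} + L_{\text{process}} + L_{\text{commit}} + L_{\text{query}}$$
Target: $L_{\text{e2e}} \le L_{\text{SLA}}$ (typically seconds for real-time).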
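A latency budget is this sum with numbers attached. The sketch below uses hypothetical phase timings. As an extra assumption not stated in the formula, it counts the worst-case wait for the next micro-batch trigger as part of processing time. The function name `end_to_end_latency` is illustrative.

```python
def end_to_end_latency(ingest_s, trigger_interval_s, batch_runtime_s, commit_s, query_s):
    """Worst-case event-to-query latency in seconds for a micro-batch pipeline.

    Processing is modeled as the worst-case wait for the next trigger plus
    the batch's own runtime (an assumption of this sketch).
    """
    processing_s = trigger_interval_s + batch_runtime_s
    return ingest_s + processing_s + commit_s + query_s


# Hypothetical budget: 0.5 s ingest, 30 s trigger, 8 s batch, 1.5 s commit, 2 s query
latency = end_to_end_latency(0.5, 30, 8, 1.5, 2)
print(f"{latency:.1f} s")  # 42.0 s
```

With a 30-second trigger, the trigger wait dominates the budget. Shortening the trigger interval is usually the first lever to pull when latency is too high.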
Key Insight
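Structured Streaming uses micro-batch execution by default, with end-to-end latencies as low as about 100 ms and exactly-once guarantees. For lower latency, Continuous Processing mode (experimental) can reach about 1 ms. However, it provides only at-least-once guarantees and supports only map-like operations (projections and selections), not aggregations. The trade-off is exactly-once guarantees and operator coverage vs. latency.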
Summary
Real-time analytics relies on windowing for temporal aggregation, watermarks for late data handling, and incremental refresh for materialized views. End-to-end latency is the sum of ingestion, processing, commit, and query phases. Window correctness requires exactly-once processing with watermark-driven completion.
Key Concepts Table
| Concept | Description | Configuration | Use Case |
|---|---|---|---|
| Micro-Batch | Discrete processing intervals | trigger(processingTime="10 seconds") | General streaming |
| Continuous | Low-latency processing (at-least-once, map-like operations only) | trigger(continuous="1 second") | Sub-second latency |
| Watermark | Late event tolerance | withWatermark("event_time", "10 minutes") | Window aggregations |
| Tumbling Window | Fixed non-overlapping windows | window("event_time", "5 minutes") | Periodic reports (5-minute, hourly) |
| Sliding Window | Fixed overlapping windows | window("event_time", "10 minutes", "5 minutes") | Rolling averages |
| Session Window | Dynamic gap-based windows | session_window("event_time", "30 minutes") (Spark 3.2+) | User sessions |
| State Store | Pluggable state backend (HDFS-backed by default, RocksDB optional) | spark.sql.streaming.stateStore.providerClass | Stateful operations |
| Checkpoint | Offset + state persistence | .option("checkpointLocation", "/path") | Fault tolerance |
| Trigger | Micro-batch frequency | .trigger(processingTime="30 seconds") | Throughput tuning |
| Output Mode | Result writing strategy | .outputMode("update"/"append"/"complete") | Result correctness |
Code Examples
Example 1: Real-Time Click Stream Analytics with Windowed Aggregations
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("RealTime-ClickAnalytics") \
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
.getOrCreate()
# Define click event schema
click_schema = StructType([
StructField("user_id", StringType()),
StructField("page_url", StringType()),
StructField("action", StringType()), # click, scroll, hover
StructField("timestamp", TimestampType()),
StructField("session_id", StringType()),
StructField("device_type", StringType()),
StructField("geo_location", StringType()),
])
# Read click stream from Kafka
click_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "click-events")
.option("startingOffsets", "latest")
.load()
.select(
from_json(col("value").cast("string"), click_schema).alias("data")
)
.select("data.*")
.withColumn("event_time", col("timestamp"))
)
# ─── Tumbling Window: Page views per 5-minute window ───
page_views_5min = (
click_stream
.withWatermark("event_time", "10 minutes")
.groupBy(
window(col("event_time"), "5 minutes"),
col("page_url")
)
.agg(
count("*").alias("view_count"),
approx_count_distinct("user_id").alias("unique_users"),
count(when(col("action") == "click", 1)).alias("click_count"),
)
.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("page_url"),
col("view_count"),
col("unique_users"),
col("click_count"),
(col("click_count") / col("view_count")).alias("click_through_rate"),
)
)
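# Write to Delta Lake. The Delta streaming sink supports append and complete
# output modes (not update); in append mode each 5-minute window is written
# once, after the watermark passes its end.
(
    page_views_5min
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/page_views_5min")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/page_views_5min")
)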
# ─── Sliding Window: Rolling 10-minute average with 5-minute slide ───
rolling_metrics = (
click_stream
.withWatermark("event_time", "15 minutes")
.groupBy(
window(col("event_time"), "10 minutes", "5 minutes"),
col("device_type")
)
    .agg(
        count("*").alias("total_events"),
        avg(when(col("action") == "click", 1).otherwise(0)).alias("avg_click_rate"),
        # Exact distinct counts are not supported in streaming aggregations
        approx_count_distinct("session_id").alias("active_sessions"),
    )
)
# Append mode: the Delta streaming sink does not support update mode
(
    rolling_metrics
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/rolling_metrics")
    .trigger(processingTime="30 seconds")
    .start("/mnt/analytics/gold/rolling_metrics")
)
# ─── Session Window: User session analytics ───
# Native gap-based sessions via session_window (Spark 3.2+). Window functions
# such as first().over(...) are not supported on streaming DataFrames, and
# distinct counts must be approximate. Session windows require append mode.
session_analytics = (
    click_stream
    .withWatermark("event_time", "30 minutes")
    .groupBy(
        col("user_id"),
        session_window(col("event_time"), "30 minutes"),  # new session after 30 min of inactivity
    )
    .agg(
        min("event_time").alias("first_event"),
        max("event_time").alias("last_event"),
        count("*").alias("event_count"),
        approx_count_distinct("page_url").alias("pages_visited"),
    )
    .select(
        col("user_id"),
        col("session_window.start").alias("session_start"),
        col("session_window.end").alias("session_end"),  # last event + gap
        col("event_count"),
        col("pages_visited"),
        (col("last_event").cast("long") - col("first_event").cast("long")).alias(
            "session_duration_seconds"
        ),
    )
)
Example 2: Real-Time Anomaly Detection with Stateful Processing
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("RealTime-AnomalyDetection") \
.getOrCreate()
# Read sensor data stream
sensor_schema = StructType([
StructField("sensor_id", StringType()),
StructField("metric_name", StringType()),
StructField("value", DoubleType()),
StructField("timestamp", TimestampType()),
])
sensor_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "sensor-metrics")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"))
.select("data.*")
)
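# ─── Rolling Statistics for Anomaly Detection ───
# 15-minute windows sliding every minute. The Delta sink needs append mode, so
# each window's statistics are written once, after the watermark passes its
# end; the stats table therefore holds finalized baselines.
# (The sensor_stats paths are hypothetical, following the naming used above.)
windowed_stats = (
    sensor_stream
    .withWatermark("timestamp", "5 minutes")
    .groupBy(
        window(col("timestamp"), "15 minutes", "1 minute"),
        col("sensor_id"),
        col("metric_name"),
    )
    .agg(
        avg("value").alias("avg_value"),
        stddev("value").alias("stddev_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value"),
        count("*").alias("reading_count"),
    )
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "sensor_id", "metric_name", "avg_value", "stddev_value",
        "min_value", "max_value", "reading_count",
    )
)
(
    windowed_stats
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/sensor_stats")
    .trigger(processingTime="1 minute")
    .start("/mnt/analytics/gold/sensor_stats")
)
# ─── Detect Anomalies in Real-Time ───
# Joining a stream with its own streaming aggregation is not a supported
# pattern here, so each micro-batch of readings is scored against the latest
# finalized baseline per sensor/metric (a stream-static join in foreachBatch).
from delta.tables import DeltaTable
from pyspark.sql.window import Window

STATS_PATH = "/mnt/analytics/gold/sensor_stats"


def flag_anomalies(batch_df, batch_id):
    if not DeltaTable.isDeltaTable(spark, STATS_PATH):
        return  # no finalized baseline has been written yet
    # Static read: window functions are allowed on batch DataFrames
    latest = (
        spark.read.format("delta").load(STATS_PATH)
        .withColumn("rn", row_number().over(
            Window.partitionBy("sensor_id", "metric_name").orderBy(desc("window_end"))))
        .filter(col("rn") == 1)
        .drop("rn")
    )
    anomalies = (
        batch_df
        .join(latest, ["sensor_id", "metric_name"], "inner")
        .filter(col("stddev_value") > 0)
        # |z| > 3 is equivalent to value outside avg_value ± 3 * stddev_value
        .withColumn("z_score", abs(col("value") - col("avg_value")) / col("stddev_value"))
        .filter(col("z_score") > 3)
        .select(
            "sensor_id", "metric_name", "value", "timestamp",
            col("avg_value").alias("expected_value"),
            col("stddev_value").alias("expected_stddev"),
            "z_score",
        )
    )
    # txnAppId/txnVersion make the append idempotent if a batch is replayed
    (
        anomalies.write.format("delta").mode("append")
        .option("txnAppId", "anomaly_alerts")
        .option("txnVersion", batch_id)
        .save("/mnt/analytics/gold/anomaly_alerts")
    )


# Write anomalies to Delta Lake (alert delivery can hook into flag_anomalies)
(
    sensor_stream
    .writeStream
    .foreachBatch(flag_anomalies)
    .option("checkpointLocation", "/mnt/checkpoints/anomaly_alerts")
    .trigger(processingTime="10 seconds")
    .start()
)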
# ─── Real-Time Dashboard Query (batch read of the materialized view) ───
dashboard_metrics = spark.read.format("delta").load("/mnt/analytics/gold/page_views_5min")
# Top pages in last hour
top_pages = (
dashboard_metrics
.filter(col("window_start") >= current_timestamp() - expr("INTERVAL 1 HOUR"))
.groupBy("page_url")
.agg(
sum("view_count").alias("total_views"),
sum("click_count").alias("total_clicks"),
avg("click_through_rate").alias("avg_ctr"),
)
.orderBy(desc("total_views"))
.limit(20)
)
top_pages.show()
Example 3: Streaming JOIN between Two Data Streams
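from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
    .appName("StreamStream-Join") \
    .getOrCreate()
# Click event schema (same as Example 1)
click_schema = StructType([
    StructField("user_id", StringType()),
    StructField("page_url", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("session_id", StringType()),
    StructField("device_type", StringType()),
    StructField("geo_location", StringType()),
])
# Purchase event schema: the fields are assumed, inferred from the columns
# used in the join below (user_id, product_id, amount, timestamp)
purchase_schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])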
# Stream 1: Click events
clicks = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "click-events")
.load()
.select(
from_json(col("value").cast("string"), click_schema).alias("data")
)
.select("data.*")
.withColumn("event_time", col("timestamp"))
.withWatermark("event_time", "10 minutes")
)
# Stream 2: Purchase events
purchases = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "purchase-events")
.load()
.select(
from_json(col("value").cast("string"), purchase_schema).alias("data")
)
.select("data.*")
.withColumn("event_time", col("timestamp"))
.withWatermark("event_time", "10 minutes")
)
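# Stream-stream left outer join: match each click to the same user's purchases
# within 30 minutes. Unmatched clicks are emitted (with null purchase columns)
# only after the watermarks guarantee that no later match can arrive.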
click_purchase_joined = (
clicks.alias("c")
.join(
purchases.alias("p"),
expr("""
c.user_id = p.user_id AND
p.event_time BETWEEN c.event_time AND c.event_time + INTERVAL 30 MINUTES
"""),
"left"
)
.select(
col("c.user_id"),
col("c.page_url").alias("clicked_page"),
col("c.event_time").alias("click_time"),
col("p.product_id"),
col("p.amount").alias("purchase_amount"),
col("p.event_time").alias("purchase_time"),
(col("p.event_time").cast("long") - col("c.event_time").cast("long")).alias(
"time_to_purchase_seconds"
),
)
)
# Write joined stream to Delta Lake
(
click_purchase_joined
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/click_purchase_join")
.trigger(processingTime="30 seconds")
.start("/mnt/analytics/gold/click_purchase_attribution")
)
Performance Metrics
| Metric | Batch Processing | Streaming (Micro-Batch) | Streaming (Continuous) | Improvement |
|---|---|---|---|---|
| End-to-End Latency | 1-24 hours | 10-60 seconds | 100-500 ms | 99.9% reduction |
| Throughput (events/sec) | 100K (batch) | 500K (streaming) | 1M (continuous) | 10x improvement |
| Cost per Million Events | | $0.10 | $0.08 | 84% reduction |
| State Size (1B events) | N/A (stateless) | 2-10 GB (RocksDB) | N/A (no stateful operators) | Bounded |
| Recovery Time | Re-run entire batch | Replay from checkpoint | Replay from checkpoint | Same |
| Late Event Handling | N/A (re-process) | Watermark-based | Not supported (no aggregations) | New capability |
| Concurrent Queries | 1 (batch) | Multiple (streaming) | Multiple (streaming) | Scaling |
| Exactly-Once | Difficult | Checkpoint-based | At-least-once only | Guaranteed (micro-batch) |
| Resource Utilization | 100% during batch | 30-50% (micro-batch) | 60-80% (continuous) | More efficient |
| Complex Event Processing | Not supported | Window aggregations | Map-like operations only | New capability |
Best Practices
- Use watermarks for all windowed aggregations: without watermarks, state grows unbounded. Set the watermark to the maximum event lateness you need to tolerate plus a margin, e.g. 10 minutes for events that almost always arrive within 5 minutes. A larger watermark keeps more state and, in append mode, delays results.
- Tune the trigger interval for latency vs. throughput: shorter intervals (1-10 seconds) reduce latency but add per-batch overhead; longer intervals (30-60 seconds) improve throughput at the cost of latency. Profile your workload to find the right balance.
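- Use Delta Lake as the primary streaming sink: Delta Lake provides exactly-once writes, ACID transactions, time travel, and schema evolution, and it integrates natively with Structured Streaming as both source and sink. Its streaming sink supports the append and complete output modes; for update-style results, use foreachBatch with MERGE.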
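- Implement idempotent writes: design streaming writes so that replaying a batch from the checkpoint produces the same result. A Delta Lake MERGE that overwrites matched rows with recomputed values is idempotent, but one that increments counters is not. For appends inside foreachBatch, Delta's txnAppId and txnVersion options let the writer skip batches it has already committed.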
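- Monitor state size growth: for stateful operations (windowed aggregations, stream-stream joins), track state metrics such as the stateOperators section of each query's progress report. Watermarks drive eviction of expired state. The RocksDB state store keeps large state off the JVM heap, reducing GC pressure and OOM risk. For arbitrary stateful processing, use timeouts or TTL to expire idle keys.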
- Size shuffle partitions up front: Spark disables Adaptive Query Execution for streaming query plans. For stateful queries, the number of shuffle partitions (spark.sql.shuffle.partitions) is fixed in the checkpoint when the query first starts, so choose it before the first run to match expected state size and parallelism. AQE still applies to batch reads and to the batch DataFrame inside foreachBatch.
- Separate raw and aggregated streams: write raw events to a Bronze layer first, then process aggregations in a separate streaming query. This provides fault isolation and enables reprocessing.
- Implement dead-letter queues: route events that fail deserialization or validation to a dead-letter topic or table for manual inspection rather than blocking the entire pipeline.
- Use Structured Streaming for CDC: combine Debezium CDC with Structured Streaming to apply database changes to Delta Lake targets in near-real time, typically via foreachBatch and MERGE, with exactly-once semantics.
- Optimize for your query pattern: for dashboards that query recent data, partition by time and use Z-ORDER on frequently filtered columns. For point-in-time queries, use Delta Lake time travel.
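The dead-letter practice in the list above comes down to splitting each batch into records that parse and validate and records that do not. A minimal pure-Python sketch follows. The `route` helper and the required field names are hypothetical; in a Spark job the same predicate would typically be expressed as filters on the parsed columns.

```python
import json

REQUIRED_FIELDS = ("user_id", "timestamp")  # hypothetical validation rule


def route(raw_records):
    """Split raw JSON strings into (valid, dead_letter) lists.

    Records that fail to parse, are not JSON objects, or lack a required
    field go to the dead-letter list with a reason attached, instead of
    failing the whole batch.
    """
    valid, dead_letter = [], []
    for raw in raw_records:
        try:
            record = json.loads(raw)
        except json.JSONDecodeError as exc:
            dead_letter.append({"raw": raw, "error": f"parse error: {exc.msg}"})
            continue
        if not isinstance(record, dict):
            dead_letter.append({"raw": raw, "error": "not a JSON object"})
            continue
        missing = [f for f in REQUIRED_FIELDS if record.get(f) is None]
        if missing:
            dead_letter.append({"raw": raw, "error": f"missing fields: {missing}"})
        else:
            valid.append(record)
    return valid, dead_letter


batch = [
    '{"user_id": "u1", "timestamp": "2024-01-01T00:00:00Z"}',
    '{"user_id": "u2"}',
    'not json',
]
valid, dlq = route(batch)
print(len(valid), [d["error"] for d in dlq])
```

Keeping the raw payload next to the error reason is what makes the dead-letter table useful for manual inspection and later replay.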
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)