13. Window Operations in PySpark
Definition: Tumbling Window
A tumbling window is a fixed-size, non-overlapping window that partitions the event timeline into equal intervals. Each event belongs to exactly one window: window_start = floor(event_time / window_size) × window_size.
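The window-start formula can be checked with a few lines of plain Python. This is a minimal sketch, not Spark's implementation: the function name `tumbling_window` is hypothetical, and times are assumed to be integer seconds.

```python
from typing import Tuple


def tumbling_window(event_time: int, window_size: int) -> Tuple[int, int]:
    """Return the [start, end) tumbling window that contains event_time."""
    # floor(event_time / window_size) * window_size
    start = (event_time // window_size) * window_size
    return start, start + window_size


# 5-minute (300 s) windows: the end of a window is exclusive
print(tumbling_window(299, 300))  # (0, 300)
print(tumbling_window(300, 300))  # (300, 600)
```

Because the end bound is exclusive, an event at exactly 300 s falls into the second window, so every event maps to exactly one window.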
Definition: Sliding Window
A sliding window has a fixed size but advances by a slide interval smaller than the window size. Events can belong to multiple overlapping windows, which increases computation in proportion to window_size / slide_interval.
Definition: Session Window
A session window groups events based on activity gaps. The window closes when no events arrive within a specified gap threshold. Window size is dynamic and depends on event arrival patterns.
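Gap-based grouping can be sketched in plain Python. The helper names `sessionize` and `session_bounds` are hypothetical, and the sketch assumes a single key and integer event times in minutes. It follows the convention that an event at exactly `previous + gap` starts a new session.

```python
from typing import List, Tuple


def sessionize(event_times: List[int], gap: int) -> List[List[int]]:
    """Group event times into sessions separated by at least `gap` of inactivity."""
    sessions: List[List[int]] = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][-1] < gap:
            sessions[-1].append(t)   # still within the gap: extend the session
        else:
            sessions.append([t])     # gap exceeded: start a new session
    return sessions


def session_bounds(sessions: List[List[int]], gap: int) -> List[Tuple[int, int]]:
    """A session spans from its first event to its last event plus the gap."""
    return [(s[0], s[-1] + gap) for s in sessions]


sessions = sessionize([1, 3, 4, 20, 22, 45], gap=10)
print(sessions)                      # [[1, 3, 4], [20, 22], [45]]
print(session_bounds(sessions, 10))  # [(1, 14), (20, 32), (45, 55)]
```

The three sessions have different lengths (13, 12, and 10 minutes), which is what makes the window size dynamic.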
Sliding Window Overlap Factor
$$F_{overlap} = \frac{S_{window}}{S_{slide}}$$
Here,
- $F_{overlap}$ = Number of windows each event appears in (on average)
- $S_{window}$ = Window size
- $S_{slide}$ = Slide interval
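To make the overlap factor concrete, the sketch below lists every sliding window that contains a given event. The function names are hypothetical, times are integer minutes, and windows are assumed to start at multiples of the slide interval.

```python
from typing import List, Tuple


def sliding_windows(event_time: int, window_size: int, slide: int) -> List[Tuple[int, int]]:
    """Return every [start, end) sliding window that contains event_time."""
    windows = []
    start = (event_time // slide) * slide      # latest window starting at or before the event
    while start > event_time - window_size:    # the window must still cover the event
        windows.append((start, start + window_size))
        start -= slide
    return sorted(windows)


def overlap_factor(window_size: int, slide: int) -> float:
    """Average number of windows each event appears in."""
    return window_size / slide


# 5-minute window, 2-minute slide
print(sliding_windows(4, 5, 2))  # [(0, 5), (2, 7), (4, 9)]
print(sliding_windows(5, 5, 2))  # [(2, 7), (4, 9)]
print(overlap_factor(5, 2))      # 2.5
```

When the window size is not a multiple of the slide, individual events land in either 2 or 3 windows here, and the factor 2.5 is the average.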
Tumbling windows are the most efficient because each event maps to exactly one window. Sliding windows cause each event to appear in multiple windows, increasing computation and state size proportionally.
Use watermarks with window operations to bound state growth. Without a watermark, Spark must retain state indefinitely to handle arbitrarily late data, which causes unbounded memory growth.
Theorem: Window State Growth
For sliding windows with overlap factor $F_{overlap}$, the number of windows active at any instant is at most $\lceil S_{window} / S_{slide} \rceil$, and state storage grows proportionally. State for a window is cleaned up only after the watermark passes the window's end time.
- Tumbling: non-overlapping, fixed size, most efficient (1 window per event)
- Sliding: overlapping, fixed size, higher computation proportional to overlap factor
- Session: dynamic size, gap-based, highest complexity
- Watermarks bound state growth by triggering window cleanup
Window Types Architecture
WINDOW TYPES IN PYSPARK
=======================

TUMBLING WINDOWS

Time ------------------------------------------------------------>

+----------+ +----------+ +----------+ +----------+ +----------+
| Window 1 | | Window 2 | | Window 3 | | Window 4 | | Window 5 |
| 0-5min   | | 5-10min  | | 10-15min | | 15-20min | | 20-25min |
+----------+ +----------+ +----------+ +----------+ +----------+

• Fixed-size, non-overlapping windows
• Each event belongs to exactly one window
• Simple and efficient

SLIDING WINDOWS

Time ------------------------------------------------------------>

+-----------------+
|    Window 1     |
|    0-10min      |
+-----------------+
   +-----------------+
   |    Window 2     |
   |    2-12min      |
   +-----------------+
      +-----------------+
      |    Window 3     |
      |    4-14min      |
      +-----------------+

• Overlapping windows with configurable slide interval
• Each event can belong to multiple windows
• Useful for rolling aggregations

SESSION WINDOWS

Time ------------------------------------------------------------>

Events:  *  **  *         ***  *  **           *
+-------------+    +------------------+    +-------------+
|  Session 1  |    |    Session 2     |    |  Session 3  |
| (gap <10m)  |    |    (gap <10m)    |    | (gap <10m)  |
+-------------+    +------------------+    +-------------+

• Dynamic window size based on activity gaps
• A session closes when the gap threshold is exceeded; events within the gap extend it
• Ideal for user session analysis
Window Aggregation Flow
WINDOW AGGREGATION FLOW
=======================

Input Events (t in minutes):

(t=1, v=10)   (t=2, v=20)   (t=3, v=15)   (t=4, v=25)
      |
      v
WINDOW ASSIGNMENT

Tumbling (5min):
  Window [0-5]: (t=1,v=10), (t=2,v=20), (t=3,v=15), (t=4,v=25)

Sliding (5min window, 2min slide; windows starting before t=0 not shown):
  Window [0-5]: (t=1,v=10), (t=2,v=20), (t=3,v=15), (t=4,v=25)
  Window [2-7]: (t=2,v=20), (t=3,v=15), (t=4,v=25)
  Window [4-9]: (t=4,v=25)
      |
      v
AGGREGATION RESULTS (tumbling window)

+--------------+------------+-----+-------+---------+
| Window Start | Window End | Sum | Count | Average |
+--------------+------------+-----+-------+---------+
| 00:00        | 00:05      | 70  | 4     | 17.5    |
+--------------+------------+-----+-------+---------+

Output Mode: Complete (all results), Update (changes only), or Append (finalized windows only)
Watermark Integration Diagram
WATERMARK INTEGRATION WITH WINDOWS
==================================

Event Timeline:

Time ------------------------------------------------------------>

Events:  *    *    *    *    *    *    *    *    *    *
         t=1  t=2  t=3  t=4  t=5  t=6  t=7  t=8  t=9  t=10

Watermark (threshold=3): watermark = max_event_time - 3
Events older than (max_event_time - 3) are considered late

State Management with Watermark:

Without Watermark:
  State: [t=1, t=2, t=3, t=4, t=5, t=6, t=7, t=8, ...]
  Size:  Grows indefinitely

With Watermark (threshold=3):
  At t=6: State = [t=3, t=4, t=5, t=6] (cleaned t=1,2)
  At t=8: State = [t=5, t=6, t=7, t=8] (cleaned t=3,4)
  Size:  Bounded by watermark threshold

Late Data Handling:

Late Event (event time is behind the watermark):
  Event at t=2 arrives when max_event_time is t=7
  Watermark: 7 - 3 = 4, and 2 < 4
  Action: Event is too late and is dropped from the aggregation

Options for late data:
• Drop events older than the watermark (default)
• Update existing windows with events that are late but still within the watermark
• Route too-late events elsewhere (not built in; requires custom logic)
Detailed Explanation
Window operations in PySpark are fundamental for time-based aggregations in streaming data. They allow you to group events into temporal windows and perform aggregations like sum, count, average, and more over these windows. Understanding the different window types and their characteristics is crucial for building effective real-time analytics pipelines.
Tumbling windows are the simplest form of windowing. They divide time into fixed-size, non-overlapping intervals. Each event belongs to exactly one window based on its event time. Tumbling windows are ideal for periodic reporting, such as calculating hourly sales totals or minute-by-minute sensor readings. The window size determines the granularity of the aggregation.
Sliding windows provide more flexibility by allowing overlapping windows. A sliding window is defined by two parameters: the window size and the slide interval. The window size determines the duration of each window, while the slide interval determines how often a new window starts. When the slide interval is smaller than the window size, windows overlap, and events can belong to multiple windows. This is useful for rolling averages or moving statistics.
Session windows are dynamic windows that group events based on activity patterns. They define a maximum gap between events; if the gap exceeds this threshold, a new session starts. Session windows are particularly useful for analyzing user behavior, where sessions represent periods of continuous activity. They automatically handle varying session lengths, and two sessions are merged when a late-arriving event falls within the gap of both.
Watermarking is essential for window operations in streaming scenarios. The watermark trails the maximum event time seen so far by a fixed threshold, which defines how long the engine waits for late-arriving data. Events whose event time falls behind the watermark are considered too late to be included in aggregations. Once the watermark passes the end of a window, the system can clean up that window's state, which keeps memory usage bounded. The watermark threshold should be chosen based on the expected lateness of events in the data.
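The lateness rule can be sketched in plain Python. This is a simplified model, not Spark's implementation: the class `WatermarkTracker` is hypothetical, times are unitless integers, and the watermark is advanced after every event, whereas Spark advances it between micro-batches.

```python
from typing import Optional


class WatermarkTracker:
    """Track the maximum event time and classify events as on-time or late."""

    def __init__(self, threshold: int) -> None:
        self.threshold = threshold
        self.max_event_time: Optional[int] = None

    def watermark(self) -> Optional[int]:
        """Watermark = max event time seen so far minus the threshold."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.threshold

    def observe(self, event_time: int) -> bool:
        """Return True if the event is accepted, False if it is too late."""
        wm = self.watermark()
        if wm is not None and event_time < wm:
            return False  # behind the watermark: too late
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


tracker = WatermarkTracker(threshold=3)
for t in range(1, 8):          # events t=1..7 arrive in order
    tracker.observe(t)
print(tracker.watermark())     # 4
print(tracker.observe(2))      # False: 2 < 4, too late
print(tracker.observe(5))      # True: late, but still within the threshold
```

An out-of-order event is not automatically discarded; it is dropped only when it falls behind the watermark.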
Window aggregation in Structured Streaming involves several steps: event time extraction, window assignment, state management, and output generation. The system maintains state for each window and updates it as new events arrive. In append mode, a window's aggregated result is emitted to the sink once the watermark passes the window's end; in update mode, every window that changed is emitted on each trigger.
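These steps can be traced in a small pure-Python simulation of a tumbling-window aggregation with append-style output. It is a sketch under simplifying assumptions: the function `aggregate_tumbling` is hypothetical, there is a single key, times are integer minutes, and the watermark advances after every event rather than per micro-batch.

```python
from typing import Dict, Iterable, List, Tuple


def aggregate_tumbling(events: Iterable[Tuple[int, float]], window_size: int, threshold: int):
    """Return (emitted_results, remaining_state) for (event_time, value) pairs."""
    state: Dict[int, List[float]] = {}   # window_start -> [sum, count]
    emitted = []
    max_event_time = None
    for event_time, value in events:
        # 1. Window assignment
        start = (event_time // window_size) * window_size
        # 2. Late data: skip events whose window was already finalized
        if max_event_time is not None and start + window_size <= max_event_time - threshold:
            continue
        # 3. State management
        acc = state.setdefault(start, [0, 0])
        acc[0] += value
        acc[1] += 1
        # 4. Output generation: emit windows the watermark has passed
        max_event_time = event_time if max_event_time is None else max(max_event_time, event_time)
        watermark = max_event_time - threshold
        for closed in sorted(s for s in state if s + window_size <= watermark):
            total, count = state.pop(closed)
            emitted.append((closed, closed + window_size, total, count, total / count))
    return emitted, state


events = [(1, 10), (2, 20), (3, 15), (4, 25), (9, 5)]
emitted, state = aggregate_tumbling(events, window_size=5, threshold=3)
print(emitted)  # [(0, 5, 70, 4, 17.5)]
print(state)    # {5: [5, 1]}
```

The event at t=9 moves the watermark to 6, which passes the end of window [0, 5), so that window is emitted and its state is removed. Window [5, 10) stays in state until a later event advances the watermark past 10.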
The choice of window type depends on the use case. Tumbling windows are best for periodic reporting, sliding windows for rolling calculations, and session windows for activity-based analysis. Each type has different characteristics in terms of state management, late data handling, and output patterns.
Performance considerations include window size, state size, and watermark configuration. Larger windows or longer watermark thresholds keep more windows open at once, which increases state size and can impact memory usage and checkpoint duration. It's important to balance the window configuration with the available resources and the expected data volume.
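A back-of-the-envelope estimate shows how window size, slide interval, and watermark threshold interact. The function below is a rough sketch, not a Spark API: it assumes a window stays in state from its start until the watermark passes its end, and it ignores micro-batch timing, so real state can be somewhat larger.

```python
def max_open_windows(window_size: int, slide: int, watermark_delay: int) -> int:
    """Rough upper bound on windows held in state per key at any instant.

    A window with start s is retained while s <= max_event_time and
    s + window_size > max_event_time - watermark_delay, so the starts fall in
    a half-open interval of length window_size + watermark_delay.
    """
    span = window_size + watermark_delay
    return -(-span // slide)  # integer ceiling of span / slide


# All durations in seconds
print(max_open_windows(300, 300, 600))  # 3  (5-min tumbling, 10-min watermark)
print(max_open_windows(600, 120, 900))  # 13 (10-min window, 2-min slide, 15-min watermark)
```

Multiplying the result by the number of distinct grouping keys gives an order-of-magnitude estimate of rows in the state store.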
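Advanced features include multi-window aggregations, where the same stream is aggregated at several window sizes, and window joins, where two streams are joined on matching time windows. Spark does not allow more than one time window expression in a single groupBy, so a multi-window aggregation is typically implemented as one aggregation (and one streaming query) per window size. These features enable complex real-time analytics use cases.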
Key Concepts Table
| Window Type | Size | Overlap | Use Case | State Management |
|---|---|---|---|---|
| Tumbling | Fixed | No | Periodic reporting | Simple, bounded |
| Sliding | Fixed | Yes | Rolling calculations | Moderate, multiple windows |
| Session | Dynamic | No (per key) | User activity analysis | Complex, variable size |
| Global | Entire stream | N/A | Cumulative statistics | Unbounded without watermark |
Code Examples
Tumbling Window Aggregation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing the built-ins min, max, sum

spark = (
    SparkSession.builder
    .appName("TumblingWindow")
    .getOrCreate()
)

# Read JSON sensor readings from Kafka and parse them into typed columns
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sensor_data")
    .load()
    .selectExpr("CAST(value AS STRING) as json")
    .select(
        F.from_json(F.col("json"), "device_id INT, temperature DOUBLE, timestamp TIMESTAMP").alias("data")
    )
    .select("data.*")
)

# Tumbling window aggregation (5-minute windows, 10-minute watermark)
tumbling_agg = (
    stream_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        F.window("timestamp", "5 minutes"),  # 5-minute tumbling window
        "device_id",
    )
    .agg(
        F.avg("temperature").alias("avg_temp"),
        F.min("temperature").alias("min_temp"),
        F.max("temperature").alias("max_temp"),
        F.count("*").alias("reading_count"),
    )
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "device_id", "avg_temp", "min_temp", "max_temp", "reading_count",
    )
)

# Write to console; update mode prints each window whenever its result changes
query = (
    tumbling_agg.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", "/checkpoint/tumbling")
    .option("truncate", "false")
    .start()
)
query.awaitTermination()
Sliding Window with Multiple Aggregations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing the built-in sum

spark = (
    SparkSession.builder
    .appName("SlidingWindow")
    .getOrCreate()
)

# Generate synthetic streaming data with the built-in rate source
stream_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 100)
    .load()
    .withColumn("event_time", F.col("timestamp"))
    .withColumn("user_id", (F.col("value") % 10).cast("int"))
    .withColumn("amount", (F.col("value") * 1.5).cast("double"))
)

# Sliding window aggregation (10-minute window, 2-minute slide)
sliding_agg = (
    stream_df
    .withWatermark("event_time", "15 minutes")
    .groupBy(
        F.window("event_time", "10 minutes", "2 minutes"),  # 10min window, 2min slide
        "user_id",
    )
    .agg(
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount"),
        # Exact distinct counts are not supported on streaming DataFrames, and a
        # distinct count of the grouping key would always be 1, so count events instead
        F.count("*").alias("event_count"),
    )
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "user_id", "total_amount", "avg_amount", "event_count",
    )
)

# Write to console
query = (
    sliding_agg.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", "/checkpoint/sliding")
    .option("truncate", "false")
    .start()
)
query.awaitTermination()
Session Window Analysis
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing the built-ins min, max

spark = (
    SparkSession.builder
    .appName("SessionWindow")
    .getOrCreate()
)

# Read streaming data (user events)
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "user_events")
    .load()
    .selectExpr("CAST(value AS STRING) as json")
    .select(
        F.from_json(F.col("json"), "user_id INT, action STRING, event_time TIMESTAMP").alias("data")
    )
    .select("data.*")
)

# Session window aggregation (10-minute session gap); session_window requires Spark 3.2+
session_agg = (
    stream_df
    .withWatermark("event_time", "20 minutes")
    .groupBy(
        F.session_window("event_time", "10 minutes"),  # 10-minute session gap
        "user_id",
    )
    .agg(
        F.collect_list("action").alias("actions"),
        F.count("*").alias("event_count"),
        F.min("event_time").alias("session_start"),
        F.max("event_time").alias("session_end"),
    )
    .select(
        "session_start", "session_end", "user_id", "actions", "event_count",
        (F.unix_timestamp("session_end") - F.unix_timestamp("session_start")).alias("session_duration_seconds"),
    )
)

# Write to console; streaming session windows do not support update mode,
# so each session is emitted once, after the watermark passes its end
query = (
    session_agg.writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "/checkpoint/session")
    .option("truncate", "false")
    .start()
)
query.awaitTermination()
Advanced: Multi-Window Aggregation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder
    .appName("MultiWindowAggregation")
    .getOrCreate()
)

# Generate synthetic streaming data with the built-in rate source
stream_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 50)
    .load()
    .withColumn("event_time", F.col("timestamp"))
    .withColumn("metric_value", F.col("value").cast("double"))
)

# Apply the watermark once; every aggregation below reuses it
watermarked_df = stream_df.withWatermark("event_time", "30 minutes")

# Spark rejects a groupBy with more than one time window expression (it would
# produce a cartesian product of rows), so run one aggregation and one
# streaming query per window size instead.
window_durations = {"1min": "1 minute", "5min": "5 minutes", "15min": "15 minutes"}

for name, duration in window_durations.items():
    agg = (
        watermarked_df
        .groupBy(F.window("event_time", duration))
        .agg(
            F.avg("metric_value").alias("avg_value"),
            F.count("*").alias("count"),
        )
        .select(
            F.lit(duration).alias("window_size"),
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            "avg_value", "count",
        )
    )
    # Each query needs its own checkpoint location
    (
        agg.writeStream
        .queryName(f"window_{name}")
        .outputMode("update")
        .format("console")
        .option("checkpointLocation", f"/checkpoint/multi_window/{name}")
        .option("truncate", "false")
        .start()
    )

# Block until any of the three queries terminates
spark.streams.awaitAnyTermination()
Performance Metrics
| Window Type | Latency | Throughput | State Size | Memory Usage |
|---|---|---|---|---|
| Tumbling (1min) | ~1min | High | Small | Low |
| Tumbling (5min) | ~5min | High | Medium | Medium |
| Sliding (10min/2min) | ~10min | Medium | Large | High |
| Session (10min gap) | Variable | Medium | Variable | Variable |
| Global | Unbounded | Low | Very Large | Very High |
Best Practices
- Use appropriate window size - Balance between granularity and performance
- Configure watermarks properly - Essential for state management and late data handling
- Monitor state sizes - Track state growth to prevent memory issues
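- Choose output mode wisely - Append emits each window once, after the watermark passes its end; Update emits only the windows that changed in each trigger; Complete re-emits the entire result table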
- Handle late data explicitly - Define strategy for events arriving after watermark
- Optimize window intervals - Use slide intervals that match business requirements
- Test with realistic data - Validate window behavior with production-like event patterns
- Consider session merging - For session windows, tune gap threshold based on activity patterns
- Use checkpointing - Essential for fault tolerance in windowed aggregations
- Profile performance - Monitor processing times and state sizes for optimization
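For the state-monitoring practices above, one option is to summarize the `stateOperators` section of a streaming query's progress report. The sketch below works on a plain dictionary so that it runs without a cluster. It assumes `query.lastProgress` yields a dict-like object with the fields `numRowsTotal`, `memoryUsedBytes`, and `numRowsDroppedByWatermark`; the helper name and the sample values are invented for illustration.

```python
from typing import Any, Dict


def summarize_state(progress: Dict[str, Any]) -> Dict[str, int]:
    """Sum state-store metrics across all stateful operators in a progress report."""
    operators = progress.get("stateOperators", [])
    return {
        "rows_in_state": sum(op.get("numRowsTotal", 0) for op in operators),
        "memory_bytes": sum(op.get("memoryUsedBytes", 0) for op in operators),
        "rows_dropped_by_watermark": sum(op.get("numRowsDroppedByWatermark", 0) for op in operators),
    }


# Hypothetical sample shaped like a progress report (values are made up)
sample_progress = {
    "batchId": 42,
    "stateOperators": [
        {"numRowsTotal": 1200, "memoryUsedBytes": 350000, "numRowsDroppedByWatermark": 7},
    ],
}
print(summarize_state(sample_progress))
```

With a running query this could be called as `summarize_state(query.lastProgress)` on a schedule. A steadily growing `rows_in_state` usually points to a missing or overly long watermark, while a high `rows_dropped_by_watermark` suggests the threshold is too short for the data's lateness.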
Related Topics
- 11-structured-streaming.mdx: Core streaming architecture and triggers
- 12-state-management.mdx: Stateful operations and checkpointing
- 14-merge-upsert.mdx: Delta Lake merge operations for windowed data
- 15-data-quality.mdx: Data validation in windowed aggregations
See Also
- Kafka Streams (kafka/03): Window operations in Kafka Streams
- Data Engineering Streaming (data-engineering/022): Windowed aggregation patterns in streaming