13. Window Operations in PySpark

Tumbling Window

A tumbling window is a fixed-size, non-overlapping window that partitions the event timeline into equal intervals. Each event belongs to exactly one window: window_start = floor(event_time / window_size) × window_size.
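The assignment rule can be sketched in a few lines of plain Python. This is a minimal illustration. It assumes event times are plain numbers (for example, seconds since the epoch) and that windows are aligned to zero. That alignment mirrors Spark's default alignment to 1970-01-01 00:00:00 UTC when `window()` is called without a `startTime`. `tumbling_window` is a hypothetical helper, not part of the Spark API.

```python
def tumbling_window(event_time, window_size):
    """Return the half-open [start, end) tumbling window containing event_time."""
    # Floor division rounds toward negative infinity, so this equals
    # floor(event_time / window_size) * window_size, even for negative times.
    start = (event_time // window_size) * window_size
    return (start, start + window_size)


# Each event maps to exactly one 5-unit window
for t in [1, 4, 5, 9, 12]:
    print(t, tumbling_window(t, 5))
```

Events at t=1 and t=4 share the window (0, 5). The event at t=5 opens the next window, because windows include their start and exclude their end.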

Sliding Window

A sliding window has a fixed size but advances by a slide interval that is smaller than the window size. Events can therefore belong to multiple overlapping windows, which increases computation in proportion to window_size / slide_interval.
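The sketch below enumerates every sliding window that contains a given event time, which makes the overlap visible. It assumes, as Spark does by default, that window starts fall on multiples of the slide interval counted from time zero. `sliding_windows` is a hypothetical helper written only for illustration.

```python
def sliding_windows(event_time, window_size, slide):
    """Return all [start, end) windows of length window_size, starting at
    multiples of slide, that contain event_time."""
    windows = []
    # Latest window start at or before the event
    start = (event_time // slide) * slide
    # Walk backwards while the window still covers the event (start + size > t)
    while start > event_time - window_size:
        windows.append((start, start + window_size))
        start -= slide
    return sorted(windows)


# 10-unit window, 2-unit slide: the event at t=7 lands in 10 / 2 = 5 windows
print(sliding_windows(7, 10, 2))  # [(-2, 8), (0, 10), (2, 12), (4, 14), (6, 16)]
```

When the window size is not a multiple of the slide, the count varies from event to event. With a 5-unit window and a 2-unit slide, an event at t=1 lands in 2 windows and an event at t=4 lands in 3. Over time this averages 5 / 2 = 2.5 windows per event.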

Session Window

A session window groups events based on activity gaps. The window closes when no events arrive within a specified gap threshold. Window size is dynamic and depends on event arrival patterns.
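A session window can be approximated as a single pass over one key's sorted event times: a new session starts whenever the gap since the previous event reaches the threshold. This is a simplified, batch-style sketch using a hypothetical `sessionize` helper. It reports each session's end as its last event plus the gap, mirroring how Spark's `session_window` sizes sessions. It does not model Spark's incremental merging of sessions across micro-batches. Here a gap exactly equal to the threshold starts a new session.

```python
def sessionize(event_times, gap):
    """Group event times into (start, end, count) sessions.

    A new session starts when the time since the previous event is >= gap.
    Each session ends at its last event time plus the gap.
    """
    sessions = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1]["last"] < gap:
            # Still inside the inactivity gap: extend the current session
            sessions[-1]["last"] = t
            sessions[-1]["count"] += 1
        else:
            # First event, or the gap was exceeded: open a new session
            sessions.append({"start": t, "last": t, "count": 1})
    return [(s["start"], s["last"] + gap, s["count"]) for s in sessions]


# Times in minutes, 10-minute gap
print(sessionize([0, 3, 4, 20, 25, 26, 40], gap=10))
# [(0, 14, 3), (20, 36, 3), (40, 50, 1)]
```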

Tumbling Window Assignment

$$W_i = \left\lfloor \frac{E_{time}}{S_{window}} \right\rfloor \times S_{window}$$

where $W_i$ is the start of the window that contains an event with timestamp $E_{time}$, and $S_{window}$ is the window size.

Sliding Window Overlap Factor

$$F_{overlap} = \frac{S_{window}}{S_{slide}}$$

Here,

  • $F_{overlap}$ = number of windows each event appears in (on average)
  • $S_{window}$ = window size
  • $S_{slide}$ = slide interval

Tumbling windows are the most efficient because each event maps to exactly one window. Sliding windows cause each event to appear in multiple windows, increasing computation and state size proportionally.

Use watermarks with window operations to bound state growth. Without a watermark, Spark must retain state indefinitely to handle arbitrarily late data, which causes unbounded memory growth.

Window State Growth

Theorem: For sliding windows, each event falls into at most ⌈S_{window} / S_{slide}⌉ windows. It falls into exactly S_{window} / S_{slide} windows when the window size is a multiple of the slide interval. Per-key state storage therefore grows in proportion to F_{overlap}. State for a window is cleaned up only when the watermark passes the window's end time.

  • Tumbling: non-overlapping, fixed size, most efficient (1 window per event)
  • Sliding: overlapping, fixed size, higher computation proportional to overlap factor
  • Session: dynamic size, gap-based, highest complexity
  • Watermarks bound state growth by triggering window cleanup

🏗️ Window Types Architecture

Architecture Diagram
┌────────────────────────────────────────────────────────────────────┐
│                      WINDOW TYPES IN PYSPARK                       │
├────────────────────────────────────────────────────────────────────┤
│  TUMBLING WINDOWS                                                  │
│                                                                    │
│  Time ────────────────────────────────────────────────────────▶    │
│                                                                    │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐       │
│  │ Window 1│ │ Window 2│ │ Window 3│ │ Window 4│ │ Window 5│       │
│  │ 0-5min  │ │ 5-10min │ │ 10-15min│ │ 15-20min│ │ 20-25min│       │
│  └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘       │
│                                                                    │
│  • Fixed-size, non-overlapping windows                             │
│  • Each event belongs to exactly one window                        │
│  • Simple and efficient                                            │
├────────────────────────────────────────────────────────────────────┤
│  SLIDING WINDOWS                                                   │
│                                                                    │
│  Time ────────────────────────────────────────────────────────▶    │
│                                                                    │
│  ┌─────────────────┐                                               │
│  │    Window 1     │                                               │
│  │    0-10min      │                                               │
│  └─────────────────┘                                               │
│       ┌─────────────────┐                                          │
│       │    Window 2     │                                          │
│       │    2-12min      │                                          │
│       └─────────────────┘                                          │
│            ┌─────────────────┐                                     │
│            │    Window 3     │                                     │
│            │    4-14min      │                                     │
│            └─────────────────┘                                     │
│                                                                    │
│  • Overlapping windows with configurable slide interval            │
│  • Each event can belong to multiple windows                       │
│  • Useful for rolling aggregations                                 │
├────────────────────────────────────────────────────────────────────┤
│  SESSION WINDOWS                                                   │
│                                                                    │
│  Time ────────────────────────────────────────────────────────▶    │
│                                                                    │
│  Events:     *  **  *            ***   *              **   *       │
│                                                                    │
│           ┌─────────────┐     ┌─────────────┐     ┌─────────────┐  │
│           │  Session 1  │     │  Session 2  │     │  Session 3  │  │
│           │  (gap <10m) │     │  (gap <10m) │     │  (gap <10m) │  │
│           └─────────────┘     └─────────────┘     └─────────────┘  │
│                                                                    │
│  • Dynamic window size based on activity gaps                      │
│  • A new session starts when the gap threshold is exceeded         │
│  • Ideal for user session analysis                                 │
└────────────────────────────────────────────────────────────────────┘

🔄 Window Aggregation Flow

Architecture Diagram
┌────────────────────────────────────────────────────────────────────┐
│                      WINDOW AGGREGATION FLOW                       │
├────────────────────────────────────────────────────────────────────┤
│  Input Events (t in minutes):                                      │
│                                                                    │
│  (t=1, v=10)   (t=2, v=20)   (t=3, v=15)   (t=4, v=25)             │
│                                 │                                  │
│                                 ▼                                  │
├────────────────────────────────────────────────────────────────────┤
│  WINDOW ASSIGNMENT                                                 │
│  Windows are half-open: [start, end)                               │
│                                                                    │
│  Tumbling (5 min):                                                 │
│    Window [0, 5): (t=1,v=10), (t=2,v=20), (t=3,v=15), (t=4,v=25)   │
│                                                                    │
│  Sliding (5 min window, 2 min slide):                              │
│    Window [0, 5): (t=1,v=10), (t=2,v=20), (t=3,v=15), (t=4,v=25)   │
│    Window [2, 7): (t=2,v=20), (t=3,v=15), (t=4,v=25)               │
│    ([-2, 3) and [4, 9) also contain some of these events)          │
│                                 │                                  │
│                                 ▼                                  │
├────────────────────────────────────────────────────────────────────┤
│  AGGREGATION RESULTS (tumbling window)                             │
│                                                                    │
│  Window Start │ Window End │ Sum │ Count │ Average                 │
│  ─────────────┼────────────┼─────┼───────┼────────                 │
│  00:00        │ 00:05      │ 70  │ 4     │ 17.5                    │
│                                                                    │
│  Output modes: Append (each window once, after the watermark       │
│  passes its end), Update (changed windows), Complete (all results) │
└────────────────────────────────────────────────────────────────────┘

💧 Watermark Integration Diagram

Architecture Diagram
┌────────────────────────────────────────────────────────────────────┐
│                 WATERMARK INTEGRATION WITH WINDOWS                 │
├────────────────────────────────────────────────────────────────────┤
│  Event Timeline:                                                   │
│                                                                    │
│  Time ────────────────────────────────────────────────────────▶    │
│                                                                    │
│  Events:  *    *    *    *    *    *    *    *    *    *           │
│          t=1  t=2  t=3  t=4  t=5  t=6  t=7  t=8  t=9  t=10         │
│                                                                    │
│  Watermark (threshold = 3):                                        │
│  ════════════════════════════════════════════════════════════      │
│  Events older than (max_event_time - 3) are considered late        │
├────────────────────────────────────────────────────────────────────┤
│  State Management with Watermark:                                  │
│                                                                    │
│  Without Watermark:                                                │
│    State: [t=1, t=2, t=3, t=4, t=5, t=6, t=7, t=8, ...]            │
│    Size: Grows indefinitely                                        │
│                                                                    │
│  With Watermark (threshold = 3):                                   │
│    At t=6: State = [t=3, t=4, t=5, t=6] (cleaned t=1,2)            │
│    At t=8: State = [t=5, t=6, t=7, t=8] (cleaned t=3,4)            │
│    Size: Bounded by watermark threshold                            │
├────────────────────────────────────────────────────────────────────┤
│  Late Data Handling:                                               │
│                                                                    │
│  Late Event (arrives after watermark):                             │
│    Event at t=2 arrives when the max event time is t=7             │
│    Watermark: 7 - 3 = 4, and t=2 < 4, so the event is late         │
│    Action: event is dropped or handled separately                  │
│                                                                    │
│  Options for late data:                                            │
│  • Drop late events (default)                                      │
│  • Side output for late events (custom logic; not built in)        │
│  • Update open windows: events within the delay still count        │
└────────────────────────────────────────────────────────────────────┘

📚 Detailed Explanation

Window operations in PySpark are fundamental for time-based aggregations in streaming data. They allow you to group events into temporal windows and perform aggregations like sum, count, average, and more over these windows. Understanding the different window types and their characteristics is crucial for building effective real-time analytics pipelines.

Tumbling windows are the simplest form of windowing. They divide time into fixed-size, non-overlapping intervals. Each event belongs to exactly one window based on its event time. Tumbling windows are ideal for periodic reporting, such as calculating hourly sales totals or minute-by-minute sensor readings. The window size determines the granularity of the aggregation.

Sliding windows provide more flexibility by allowing overlapping windows. A sliding window is defined by two parameters: the window size and the slide interval. The window size determines the duration of each window, while the slide interval determines how often a new window starts. When the slide interval is smaller than the window size, windows overlap, and events can belong to multiple windows. This is useful for rolling averages or moving statistics.

Session windows are dynamic windows that group events based on activity patterns. They define a maximum gap between events; if the gap exceeds this threshold, a new session starts. Session windows are particularly useful for analyzing user behavior, where sessions represent periods of continuous activity. They automatically handle varying session lengths and can merge sessions that are close together.

Watermarking is essential for window operations in streaming scenarios. A watermark is the maximum event time seen so far minus a configured delay threshold. Events whose timestamps are older than the watermark are considered late and may be excluded from aggregations. Spark expects no further updates for windows that end before the watermark, so it can clean up their state, which keeps memory usage bounded. Choose the delay threshold based on how late events realistically arrive in your data.
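The lateness rule can be expressed as a small tracker. This is a toy model that assumes Spark's definition of the watermark: the maximum event time seen so far minus the configured delay. `WatermarkTracker` is a hypothetical class. Spark advances its watermark between micro-batches rather than after every event. It also only guarantees to keep data that falls within the delay. Real behavior around the boundary is therefore coarser than this sketch.

```python
class WatermarkTracker:
    """Toy per-event watermark: max event time seen minus a fixed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None  # no events yet, so no watermark
        return self.max_event_time - self.delay

    def observe(self, event_time):
        """Record an event and return True if it was late (older than the watermark)."""
        wm = self.watermark
        is_late = wm is not None and event_time < wm
        # The watermark only moves forward: late events never pull it back
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return is_late


tracker = WatermarkTracker(delay=3)
for t in [1, 2, 3, 4, 5, 6, 7]:
    tracker.observe(t)
print(tracker.watermark)   # 4
print(tracker.observe(2))  # True: 2 < 4, so the event is late
print(tracker.observe(5))  # False: 5 >= 4, still accepted
```

With a delay of 3 and a maximum event time of 7, the watermark is 4. An event stamped t=2 is classified as late, while one stamped t=5 is still accepted.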

Window aggregation in Spark Structured Streaming involves several steps: event time extraction, window assignment, state management, and output generation. Spark keeps state for each open window and updates it as new events arrive. In append mode, a window's final result is emitted to the sink once the watermark passes the window's end. In update mode, changed windows are emitted on every trigger.
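The four steps can be simulated end to end in plain Python. This sketch makes several assumptions:

- tumbling windows and integer event times;
- append-mode semantics, where a window is emitted once the watermark reaches its end and its state is then evicted;
- one event processed at a time instead of in Spark's micro-batches.

`run_append_mode` is a hypothetical function. Dropping every event whose window was already emitted is a simplification of Spark's behavior.

```python
from collections import defaultdict


def run_append_mode(events, window_size, delay):
    """Toy tumbling-window sum/count with a watermark and append-mode output.

    events: (event_time, value) pairs in arrival order.
    Returns (emitted, dropped); emitted holds (start, end, sum, count) tuples.
    """
    state = defaultdict(lambda: [0, 0])  # window start -> [sum, count]
    max_time = None
    emitted, dropped = [], []
    for t, v in events:  # 1. event time is the first field of each pair
        watermark = None if max_time is None else max_time - delay
        start = (t // window_size) * window_size  # 2. window assignment
        if watermark is not None and start + window_size <= watermark:
            dropped.append((t, v))  # window already finalized: too late
            continue
        state[start][0] += v  # 3. state management: update running aggregates
        state[start][1] += 1
        max_time = t if max_time is None else max(max_time, t)
        watermark = max_time - delay
        for s in sorted(state):  # 4. output: emit and evict finished windows
            if s + window_size <= watermark:
                total, count = state.pop(s)
                emitted.append((s, s + window_size, total, count))
    return emitted, dropped


events = [(1, 10), (2, 20), (3, 15), (4, 25), (12, 5), (2, 99)]
emitted, dropped = run_append_mode(events, window_size=5, delay=3)
print(emitted)  # [(0, 5, 70, 4)]
print(dropped)  # [(2, 99)]
```

The window [0, 5) holds a sum of 70 over 4 events (average 17.5). Its result appears only after the event at t=12 pushes the watermark to 9. The replayed event at t=2 is then dropped, because its window has already been emitted.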

The choice of window type depends on the use case. Tumbling windows are best for periodic reporting, sliding windows for rolling calculations, and session windows for activity-based analysis. Each type has different characteristics in terms of state management, late data handling, and output patterns.

Performance considerations include window size, slide interval, state size, and watermark configuration. Larger windows, smaller slide intervals, or longer watermark delays all increase the amount of state Spark must keep, which raises memory usage and checkpoint duration. Balance the window configuration against the available resources and the expected data volume.
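A back-of-the-envelope estimate makes this trade-off concrete. It rests on these assumptions:

- a single key receives events continuously;
- window starts fall on multiples of the slide;
- a window is evicted once the watermark reaches its end.

Under these assumptions, a key holds at most ⌈(S_window + delay) / S_slide⌉ windows in state. The hypothetical helpers below compute that bound and check it by brute force. Multiply the result by the number of keys for a rough total.

```python
import math


def max_open_windows_per_key(window_size, slide, delay):
    """Upper bound on window states held per key (all values in one time unit)."""
    return math.ceil((window_size + delay) / slide)


def open_windows_at(max_event_time, window_size, slide, delay):
    """Brute-force count of windows still in state for one key.

    Assumes an event at every integer time from 0 to max_event_time, window
    starts at multiples of slide, and eviction once end <= watermark.
    """
    watermark = max_event_time - delay
    return sum(
        1
        for start in range(0, max_event_time + 1, slide)
        if start + window_size > watermark
    )


# 10-minute window, 2-minute slide, 15-minute watermark delay
print(max_open_windows_per_key(10, 2, 15))  # 13
print(open_windows_at(100, 10, 2, 15))      # 13
print(open_windows_at(101, 10, 2, 15))      # 12
```

Doubling the watermark delay to 30 minutes raises the bound to ⌈40 / 2⌉ = 20 windows per key.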

Advanced features include multi-window aggregations, where multiple window types are applied simultaneously, and window joins, where data from different windows is joined together. These features enable complex real-time analytics use cases.

📊 Key Concepts Table

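| Window Type | Size | Overlap | Use Case | State Management |
|---|---|---|---|---|
| Tumbling | Fixed | No | Periodic reporting | Simple, bounded |
| Sliding | Fixed | Yes | Rolling calculations | Moderate, multiple windows per event |
| Session | Dynamic | No (events within the gap merge into one session) | User activity analysis | Complex, variable size |
| Global | Entire stream | N/A | Cumulative statistics | One entry per key; not cleaned up by the watermark |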

💻 Code Examples

Tumbling Window Aggregation

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("TumblingWindow") \
    .getOrCreate()

# Read streaming data from Kafka (requires the spark-sql-kafka-0-10 connector package)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load() \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(
        from_json(col("json"), "device_id INT, temperature DOUBLE, timestamp TIMESTAMP").alias("data")
    ).select("data.*")

# Tumbling window aggregation (5-minute windows)
tumbling_agg = stream_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes"),  # 5-minute tumbling window
        "device_id"
    ).agg(
        avg("temperature").alias("avg_temp"),
        min("temperature").alias("min_temp"),
        max("temperature").alias("max_temp"),
        count("*").alias("reading_count")
    ).select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "device_id", "avg_temp", "min_temp", "max_temp", "reading_count"
    )

# Write to console
query = tumbling_agg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/tumbling") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Sliding Window with Multiple Aggregations

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType

spark = SparkSession.builder \
    .appName("SlidingWindow") \
    .getOrCreate()

# Read streaming data
stream_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load() \
    .withColumn("event_time", col("timestamp")) \
    .withColumn("user_id", (col("value") % 10).cast(IntegerType())) \
    .withColumn("amount", (col("value") * 1.5).cast(DoubleType()))

# Sliding window aggregation (10-minute window, 2-minute slide)
# Grouped by window only, so unique_users is meaningful. Exact distinct
# counts are not supported in streaming aggregations, so
# approx_count_distinct is used instead of count_distinct.
sliding_agg = stream_df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        window("event_time", "10 minutes", "2 minutes")  # 10min window, 2min slide
    ).agg(
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount"),
        approx_count_distinct("user_id").alias("unique_users")
    ).select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "total_amount", "avg_amount", "unique_users"
    )

# Write to console
query = sliding_agg.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/sliding") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Session Window Analysis

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("SessionWindow") \
    .getOrCreate()

# Read streaming data (user events) from Kafka (requires the spark-sql-kafka-0-10 connector package)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(
        from_json(col("json"), "user_id INT, action STRING, event_time TIMESTAMP").alias("data")
    ).select("data.*")

# Session window aggregation (10-minute session gap)
session_agg = stream_df \
    .withWatermark("event_time", "20 minutes") \
    .groupBy(
        session_window("event_time", "10 minutes"),  # 10-minute session gap (Spark 3.2+)
        "user_id"
    ).agg(
        collect_list("action").alias("actions"),
        count("*").alias("event_count"),
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end")
    ).select(
        col("session_start"),
        col("session_end"),
        "user_id", "actions", "event_count",
        (unix_timestamp("session_end") - unix_timestamp("session_start")).alias("session_duration_seconds")
    )

# Write to console (streaming session windows support append mode, not update)
query = session_agg.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/checkpoint/session") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Advanced: Multi-Window Aggregation

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, window
from pyspark.sql.types import DoubleType

spark = SparkSession.builder \
    .appName("MultiWindowAggregation") \
    .getOrCreate()

# Read streaming data
stream_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 50) \
    .load() \
    .withColumn("event_time", col("timestamp")) \
    .withColumn("metric_value", col("value").cast(DoubleType()))

# Apply watermark
watermarked_df = stream_df.withWatermark("event_time", "30 minutes")

# Spark rejects several window() expressions in one groupBy (they would
# produce a cartesian product of windows), so each granularity gets its own
# aggregation and its own streaming query.
def windowed_agg(df, duration):
    return df \
        .groupBy(window("event_time", duration)) \
        .agg(
            avg("metric_value").alias("avg_value"),
            count("*").alias("count")
        ).select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            "avg_value", "count"
        )

# Start one query per window size, each with its own checkpoint directory
queries = []
for name, duration in [("1min", "1 minute"), ("5min", "5 minutes"), ("15min", "15 minutes")]:
    query = windowed_agg(watermarked_df, duration).writeStream \
        .queryName(f"multi_window_{name}") \
        .outputMode("update") \
        .format("console") \
        .option("checkpointLocation", f"/checkpoint/multi_window_{name}") \
        .option("truncate", "false") \
        .start()
    queries.append(query)

# Block until any of the queries terminates
spark.streams.awaitAnyTermination()

📈 Performance Metrics

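| Window Type | Latency | Throughput | State Size | Memory Usage |
|---|---|---|---|---|
| Tumbling (1 min) | ~1 min | High | Small | Low |
| Tumbling (5 min) | ~5 min | High | Medium | Medium |
| Sliding (10 min window / 2 min slide) | ~10 min | Medium | Large | High |
| Session (10 min gap) | Variable | Medium | Variable | Variable |
| Global | Unbounded | Low | Very Large | Very High |

These are rough, relative figures. Latency here means the time until a window's final result is available. In append mode, the watermark delay adds to it.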

🏆 Best Practices

  1. Use appropriate window size - Balance between granularity and performance
  2. Configure watermarks properly - Essential for state management and late data handling
  3. Monitor state sizes - Track state growth to prevent memory issues
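  4. Choose output mode wisely - Append emits each window once, after the watermark passes its end; Update emits changed windows on every trigger; Complete re-emits all results and cannot drop old state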
  5. Handle late data explicitly - Define strategy for events arriving after watermark
  6. Optimize window intervals - Use slide intervals that match business requirements
  7. Test with realistic data - Validate window behavior with production-like event patterns
  8. Consider session merging - For session windows, tune gap threshold based on activity patterns
  9. Use checkpointing - Essential for fault tolerance in windowed aggregations
  10. Profile performance - Monitor processing times and state sizes for optimization
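Practices 3, 5, and 10 can be partly automated by inspecting a query's progress report. In PySpark 3.x, `StreamingQuery.lastProgress` returns the latest progress as a dictionary, or `None` before the first batch completes. Its `stateOperators` entries carry fields such as `numRowsTotal` and `memoryUsedBytes`. Recent releases also include `numRowsDroppedByWatermark`. The `summarize_state` helper below is a hypothetical sketch that works on such a dictionary, so it can be tried without a cluster. The sample values are made up for illustration.

```python
def summarize_state(progress):
    """Sum state metrics across stateful operators in a lastProgress-style dict."""
    if not progress:
        return None  # no completed micro-batch yet
    ops = progress.get("stateOperators", [])
    return {
        "batch_id": progress.get("batchId"),
        "state_rows": sum(op.get("numRowsTotal", 0) for op in ops),
        "state_bytes": sum(op.get("memoryUsedBytes", 0) for op in ops),
        "dropped_late_rows": sum(op.get("numRowsDroppedByWatermark", 0) for op in ops),
    }


# Hand-written sample shaped like a progress report (not real output)
sample = {
    "batchId": 42,
    "stateOperators": [
        {"numRowsTotal": 1300, "memoryUsedBytes": 524288, "numRowsDroppedByWatermark": 7},
    ],
}
print(summarize_state(sample))
# With a running query: summarize_state(query.lastProgress)
```

Logging this summary on every batch makes growing state and rising counts of dropped late rows easy to spot over time.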

🔗 Related Topics

  • 11-structured-streaming.mdx: Core streaming architecture and triggers
  • 12-state-management.mdx: Stateful operations and checkpointing
  • 14-merge-upsert.mdx: Delta Lake merge operations for windowed data
  • 15-data-quality.mdx: Data validation in windowed aggregations

See Also

  • Kafka Streams (kafka/03): Window operations in Kafka Streams
  • Data Engineering Streaming (data-engineering/022): Windowed aggregation patterns in streaming
