Time Series Analysis in PySpark

Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                  TIME SERIES PROCESSING PIPELINE                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            │
│   │  Raw Time    │────▶│  Temporal    │────▶│  Window      │            │
│   │  Series Data │     │  Alignment   │     │  Functions   │            │
│   └──────────────┘     └──────┬───────┘     └──────┬───────┘            │
│                               │                    │                    │
│                               ▼                    ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  Gap Detection   │   │  Rolling         │          │
│                    │  & Filling       │   │  Statistics      │          │
│                    │  ─────────────   │   │  ─────────────   │          │
│                    │  Forward Fill    │   │  Moving Average  │          │
│                    │  Backward Fill   │   │  Exponential     │          │
│                    │  Interpolation   │   │  Weighted Avg    │          │
│                    └────────┬─────────┘   └────────┬─────────┘          │
│                             │                      │                    │
│                             ▼                      ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  Resampling      │   │  Seasonal        │          │
│                    │  (Down/Up)       │   │  Decomposition   │          │
│                    │  ─────────────   │   │  ─────────────   │          │
│                    │  Hourly→Daily    │   │  Trend           │          │
│                    │  Daily→Weekly    │   │  Seasonality     │          │
│                    │  Irregular→Reg   │   │  Residual        │          │
│                    └────────┬─────────┘   └────────┬─────────┘          │
│                             │                      │                    │
│                             ▼                      ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  Anomaly         │   │  Forecasting     │          │
│                    │  Detection       │   │  Features        │          │
│                    │  ─────────────   │   │  ─────────────   │          │
│                    │  Z-Score         │   │  Lag Features    │          │
│                    │  IQR             │   │  Time Features   │          │
│                    │  Isolation Forest│   │  Fourier Terms   │          │
│                    └──────────────────┘   └──────────────────┘          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                    WINDOW FUNCTION TYPES                                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   TUMBLING WINDOW (Non-overlapping)                                     │
│   ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐                           │
│   │ 00-06  │ │ 06-12  │ │ 12-18  │ │ 18-24  │                           │
│   └────────┘ └────────┘ └────────┘ └────────┘                           │
│   Each row belongs to exactly ONE window                                │
│                                                                         │
│   SLIDING WINDOW (Overlapping)                                          │
│   ┌────────────┐                                                        │
│   │  00-06     │                                                        │
│   └────────────┘                                                        │
│         ┌────────────┐                                                  │
│         │  02-08     │                                                  │
│         └────────────┘                                                  │
│               ┌────────────┐                                            │
│               │  04-10     │                                            │
│               └────────────┘                                            │
│   Each row belongs to MULTIPLE windows                                  │
│                                                                         │
│   SESSION WINDOW (Activity-based)                                       │
│   ┌─────┐  ┌───────┐  ┌──┐  ┌────────────┐                              │
│   │  A  │  │   B   │  │C │  │     D      │                              │
│   └─────┘  └───────┘  └──┘  └────────────┘                              │
│   Windows defined by gaps in activity (gap threshold)                   │
│                                                                         │
│   GLOBAL WINDOW (All rows)                                              │
│   ┌────────────────────────────────────────────────────────────────┐    │
│   │  All data in a single window                                   │    │
│   └────────────────────────────────────────────────────────────────┘    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                 TEMPORAL DATA ALIGNMENT PATTERNS                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   Irregular Time Series          Regular Time Series                    │
│   ┌─────────────────────┐       ┌─────────────────────┐                 │
│   │ 10:00  10:03  10:15 │       │ 10:00 10:05 10:10   │                 │
│   │   ↓      ↓      ↓   │       │   ↓     ↓     ↓     │                 │
│   │   ●      ●      ●   │       │   ●     ●     ●     │                 │
│   │     ●       ●       │  ───▶ │   ●     ●     ●     │                 │
│   │  10:02  10:08  10:20│       │ 10:15 10:20 10:25   │                 │
│   │    ↓      ↓      ↓  │       │   ↓     ↓     ↓     │                 │
│   │    ●      ●      ●  │       │   ●     ●     ●     │                 │
│   └─────────────────────┘       └─────────────────────┘                 │
│   Timestamps vary                Fixed interval (5-min)                 │
│                                                                         │
│   Interpolation Methods:                                                │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Linear:  ──────────/──────────/──────────                    │     │
│   │  Nearest: ──────────|──────────|──────────                    │     │
│   │  Cubic:   ~~~~~~~~~~/~~~~~~~~~~/~~~~~~~~~~                    │     │
│   │  Zero:    ──────────|──────────|──────────                    │     │
│   │  Spline:  ~~~~~~~~~~/~~~~~~~~~~/~~~~~~~~~~                    │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Detailed Explanation

Time series analysis in PySpark requires careful handling of temporal semantics that differ significantly from standard relational operations. The fundamental challenge is that time series data has an inherent ordering that must be preserved throughout the processing pipeline. Unlike standard DataFrame operations, which carry no guaranteed row order, time series operations must respect the temporal sequence and handle gaps, irregularities, and varying frequencies.

Window functions are the primary tool for time series computation in PySpark. A Window specification defines the partitioning, ordering, and frame over which an aggregate or analytic function operates. For time series, the usual pattern is to partition by series ID, order by the timestamp, and define a range-based or row-based frame. Range-based frames define the window by the values of the ordering column: Window.orderBy(col("timestamp").cast("long")).rangeBetween(-3600, 0) reaches one hour back, because the ordering column is then in epoch seconds. rangeBetween takes numeric bounds, which is why the timestamp is cast to long first. Row-based frames such as Window.orderBy("timestamp").rowsBetween(-5, 0) define the window by row count: the current row plus the 5 rows before it.

The distinction between range and row frames is critical. Range frames handle irregular time series correctly: when timestamps are not evenly spaced, a range frame of one hour includes all records within the past hour regardless of how many records fall in that period. Row frames include the specified number of rows (fewer only at the start of a partition), which may span very different time durations depending on the data density. For most time series applications, range frames provide the correct semantics.
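The difference can be seen without Spark at all. The following is a minimal plain-Python sketch of the two frame semantics, not PySpark code; the readings and the helper names `range_frame` and `row_frame` are made up for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical irregular readings: (timestamp, value)
base = datetime(2024, 1, 15, 8, 0)
readings = [
    (base + timedelta(minutes=m), v)
    for m, v in [(0, 10.0), (3, 12.0), (15, 14.0), (20, 16.0), (95, 30.0)]
]

def range_frame(rows, i, seconds):
    """Values of rows at or before row i whose timestamp is within `seconds` of it."""
    t_i = rows[i][0]
    return [v for t, v in rows[: i + 1] if (t_i - t).total_seconds() <= seconds]

def row_frame(rows, i, preceding):
    """Values of row i plus up to `preceding` rows before it, whatever their timestamps."""
    return [v for _, v in rows[max(0, i - preceding): i + 1]]

last_idx = len(readings) - 1
range_values = range_frame(readings, last_idx, 3600)  # analogous to rangeBetween(-3600, 0)
row_values = row_frame(readings, last_idx, 2)         # analogous to rowsBetween(-2, 0)

print(range_values)  # [30.0]
print(row_values)    # [14.0, 16.0, 30.0]
```

For the last reading, the one-hour range frame holds only that reading, while the three-row frame reaches back 80 minutes and pulls in values from before the gap.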

Gap detection and filling is essential for preparing time series data for analysis. Real-world sensor data, transaction logs, and event streams frequently have missing timestamps. In PySpark the common strategies are built from window functions: forward fill (carry the last known value forward), backward fill (propagate the next known value backward), and interpolation (estimate missing values from the surrounding values). The choice depends on the data characteristics: forward fill is appropriate for slowly changing metrics, while interpolation is better for continuously varying signals.
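As a rough illustration of how the three strategies differ, here is a plain-Python sketch on a single series with `None` marking missing values. The function names and sample values are hypothetical, and the code only mirrors the semantics; it is not how Spark executes them.

```python
def forward_fill(values):
    """Carry the last known value forward; leading gaps stay None."""
    out, last_seen = [], None
    for v in values:
        if v is not None:
            last_seen = v
        out.append(last_seen)
    return out

def backward_fill(values):
    """Propagate the next known value backward; trailing gaps stay None."""
    return forward_fill(values[::-1])[::-1]

def linear_fill(times, values):
    """Fill None entries by interpolating between the nearest known neighbours."""
    known = [(t, v) for t, v in zip(times, values) if v is not None]
    out = []
    for t, v in zip(times, values):
        if v is not None:
            out.append(v)
            continue
        before = [(tk, vk) for tk, vk in known if tk < t]
        after = [(tk, vk) for tk, vk in known if tk > t]
        if not before or not after:
            out.append(None)  # no neighbour on one side: leave the gap
            continue
        (t0, v0), (t1, v1) = before[-1], after[0]
        out.append(v0 + (v1 - v0) * (t - t0) / (t1 - t0))
    return out

times = [0, 5, 10, 15, 20]                 # minutes
temps = [20.0, None, None, 23.0, None]

print(forward_fill(temps))        # [20.0, 20.0, 20.0, 23.0, 23.0]
print(backward_fill(temps))       # [20.0, 23.0, 23.0, 23.0, None]
print(linear_fill(times, temps))  # [20.0, 21.0, 22.0, 23.0, None]
```

Forward fill holds 20.0 until the next reading, while interpolation ramps towards 23.0, which is why the choice depends on whether the metric changes in steps or continuously.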

Resampling involves changing the frequency of a time series, either upsampling (increasing frequency by adding interpolated values) or downsampling (decreasing frequency by aggregating). In PySpark, downsampling is a groupBy over a tumbling time bucket (for example window() or date_trunc) followed by an aggregation, while upsampling requires generating the target timestamps and joining them to the data before filling or interpolating. The aggregation function for downsampling (mean, sum, last, first) must be chosen based on the metric semantics: sum is appropriate for counts, while mean or last is better for levels.
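The effect of the aggregation choice is easy to see on a toy example. The sketch below is plain Python with invented data and a hypothetical `downsample` helper; it buckets rows the way a tumbling window would and applies either a sum or a mean.

```python
from collections import defaultdict

def downsample(rows, bucket_seconds, agg):
    """Group (seconds, value) rows into tumbling buckets and aggregate each bucket."""
    buckets = defaultdict(list)
    for t, value in rows:
        buckets[t - t % bucket_seconds].append(value)  # floor to the bucket start
    return {start: agg(vals) for start, vals in sorted(buckets.items())}

def mean(vals):
    return sum(vals) / len(vals)

# Hypothetical readings: (seconds since start, value)
rows = [(0, 4), (600, 6), (3000, 2), (3600, 10), (5400, 20)]

hourly_sum = downsample(rows, 3600, sum)    # suits counts, e.g. events per hour
hourly_mean = downsample(rows, 3600, mean)  # suits levels, e.g. temperature

print(hourly_sum)   # {0: 12, 3600: 30}
print(hourly_mean)  # {0: 4.0, 3600: 15.0}
```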

Seasonal decomposition separates a time series into trend, seasonal, and residual components. While PySpark doesn't have built-in decomposition like statsmodels, the components can be computed using window functions. The trend is typically a centered moving average, the seasonal component is the average deviation from the trend at each seasonal position, and the residual is what remains after removing trend and seasonal components. This decomposition is useful for anomaly detection, forecasting preprocessing, and pattern identification.
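The three steps described above can be sketched in plain Python. This is a simplified illustration, assuming an additive model and an odd seasonal period; the function name `decompose_additive` and the synthetic series are made up, and the sketch skips refinements such as re-centering the seasonal pattern.

```python
def decompose_additive(y, period):
    """Return (trend, seasonal, residual); trend is None where the window is incomplete."""
    if period % 2 != 1:
        raise ValueError("this sketch only handles odd periods")
    half = period // 2
    n = len(y)

    # Trend: centered moving average spanning one full seasonal period
    trend = [None] * n
    for t in range(half, n - half):
        trend[t] = sum(y[t - half: t + half + 1]) / period

    # Seasonal: average detrended value at each position within the period
    sums, counts = [0.0] * period, [0] * period
    for t in range(n):
        if trend[t] is not None:
            sums[t % period] += y[t] - trend[t]
            counts[t % period] += 1
    pattern = [s / c for s, c in zip(sums, counts)]
    seasonal = [pattern[t % period] for t in range(n)]

    # Residual: what is left after removing trend and seasonal components
    residual = [
        None if trend[t] is None else y[t] - trend[t] - seasonal[t]
        for t in range(n)
    ]
    return trend, seasonal, residual

# Synthetic series: linear trend t plus a period-3 pattern (-1, 0, +1), no noise
y = [t + (-1.0, 0.0, 1.0)[t % 3] for t in range(12)]
trend, seasonal, residual = decompose_additive(y, 3)

print(trend[1:4])    # [1.0, 2.0, 3.0]
print(seasonal[:3])  # [-1.0, 0.0, 1.0]
```

Because the synthetic series has no noise, the recovered trend equals `t`, the seasonal pattern is recovered exactly, and the residual is zero wherever the trend is defined.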

Mathematical Foundations

Definition: Time Series Decomposition

A time series $Y_t$ can be decomposed as:

$$Y_t = T_t + S_t + R_t \quad \text{(additive)} \qquad \text{or} \qquad Y_t = T_t \times S_t \times R_t \quad \text{(multiplicative)}$$

where $T_t$ is the trend component, $S_t$ is the seasonal component with period $p$ ($S_{t+p} = S_t$), and $R_t$ is the residual.

Moving Average Smoothing

For window size $w$ (odd), the centered moving average is:

$$\text{MA}_t = \frac{1}{w} \sum_{i=-(w-1)/2}^{(w-1)/2} Y_{t+i}$$

This removes seasonal variation when $w$ equals the seasonal period.
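A short derivation shows why the seasonal term drops out. It assumes the additive model from the decomposition definition and writes $c$ for the sum of the seasonal component over one full period; the symbol $c$ is introduced here for illustration only.

```latex
\begin{aligned}
\text{MA}_t
  &= \frac{1}{w} \sum_{i=-(w-1)/2}^{(w-1)/2} \left( T_{t+i} + S_{t+i} + R_{t+i} \right) \\
  &= \frac{1}{w} \sum_{i} T_{t+i}
   + \frac{1}{w} \sum_{i} S_{t+i}
   + \frac{1}{w} \sum_{i} R_{t+i}.
\end{aligned}

% When w = p, the window contains each seasonal position exactly once, so
\sum_{i=-(p-1)/2}^{(p-1)/2} S_{t+i} = \sum_{k=1}^{p} S_k = c
\quad \text{for every } t,

% and the seasonal contribution to MA_t is the constant c / p
% (zero if the seasonal component is normalised to sum to zero over a period).
```

What remains is the local average of the trend plus the averaged residual, which is why the centered moving average serves as the trend estimate.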

Stationarity Theorem (Wold's Decomposition)

A weakly stationary process $X_t$ (constant mean $\mu$, autocovariance $\gamma(h)$ depending only on lag $h$) with no deterministic component can be represented as:

$$X_t = \mu + \sum_{j=0}^{\infty} \psi_j \epsilon_{t-j}$$

where $\epsilon_t$ is white noise, $\psi_0 = 1$, and $\sum_j \psi_j^2 < \infty$. A series that is integrated of order $d$ becomes stationary after differencing $d$ times: $(1-B)^d X_t$, where $B$ is the backshift operator ($B X_t = X_{t-1}$).
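To make the differencing operator concrete, here is a small plain-Python sketch; the helper name `difference` and the sample series are illustrative. Applying $(1-B)$ once to a quadratic trend leaves a linear trend, and applying it twice leaves a constant.

```python
def difference(x, d=1):
    """Apply (1 - B)^d: difference the series d times."""
    for _ in range(d):
        x = [b - a for a, b in zip(x, x[1:])]
    return x

quadratic = [t * t for t in range(6)]  # 0, 1, 4, 9, 16, 25

first_diff = difference(quadratic, 1)
second_diff = difference(quadratic, 2)

print(first_diff)   # [1, 3, 5, 7, 9]
print(second_diff)  # [2, 2, 2, 2]
```

Each pass shortens the series by one element, so a series differenced $d$ times has $d$ fewer observations.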

ARIMA Model

The ARIMA$(p, d, q)$ model is:

$$\phi(B)(1-B)^d X_t = \theta(B)\epsilon_t$$

where $\phi(B) = 1 - \phi_1 B - \cdots - \phi_p B^p$ and $\theta(B) = 1 + \theta_1 B + \cdots + \theta_q B^q$.
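As a worked example, the operator form can be expanded for the smallest non-trivial case, ARIMA$(1, 1, 1)$, using the same sign conventions for $\phi(B)$ and $\theta(B)$ as above.

```latex
\begin{aligned}
(1 - \phi_1 B)(1 - B) X_t &= (1 + \theta_1 B)\,\epsilon_t \\
\left(1 - (1 + \phi_1) B + \phi_1 B^2\right) X_t &= \epsilon_t + \theta_1 \epsilon_{t-1} \\
X_t &= (1 + \phi_1) X_{t-1} - \phi_1 X_{t-2} + \epsilon_t + \theta_1 \epsilon_{t-1}
\end{aligned}
```

The last line is the recursion a forecasting routine would iterate: the next value depends on the two previous values and on the current and previous noise terms.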

Interpolation Error Bound

For linear interpolation between points $(t_i, Y_i)$ and $(t_{i+1}, Y_{i+1})$ at time $t \in [t_i, t_{i+1}]$, assuming $Y$ is twice continuously differentiable on that interval:

$$|Y(t) - \hat{Y}(t)| \leq \frac{(t_{i+1} - t_i)^2}{8} \max_{s \in [t_i, t_{i+1}]} |Y''(s)|$$
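The bound can be checked numerically. The sketch below is plain Python and uses $Y(t) = \sin t$ on $[0, 0.5]$ as an arbitrary test signal; the helper `max_interp_error` is hypothetical and estimates the worst-case error by sampling the interval.

```python
import math

def max_interp_error(f, t0, t1, samples=1000):
    """Largest |f(t) - linear interpolant| on [t0, t1], sampled on a uniform grid."""
    worst = 0.0
    for k in range(samples + 1):
        t = t0 + (t1 - t0) * k / samples
        interp = f(t0) + (f(t1) - f(t0)) * (t - t0) / (t1 - t0)
        worst = max(worst, abs(f(t) - interp))
    return worst

t0, t1 = 0.0, 0.5
observed = max_interp_error(math.sin, t0, t1)

# |sin''(s)| = |sin(s)| is increasing on [0, 0.5], so its maximum there is sin(0.5)
bound = (t1 - t0) ** 2 / 8 * math.sin(t1)

print(f"observed max error: {observed:.5f}")
print(f"theoretical bound:  {bound:.5f}")
```

Because the gap length enters squared, halving the spacing between observations cuts the bound by a factor of four for the same curvature.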

Key Insight

Spark's Window.rowsBetween() enables distributed moving average computation, but only if the window is partitioned by time series ID. A window with orderBy and no partitionBy moves every row into a single partition, and a frame that slides over interleaved rows from different series mixes their values, producing incorrect results.
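The failure mode is easy to reproduce in plain Python. The sketch below uses two invented series, "A" and "B", and a hypothetical `trailing_mean` helper to contrast one frame sliding over all rows with one frame per series.

```python
from collections import defaultdict

# Hypothetical rows: (series_id, time, value); A hovers at 10, B at 100
rows = [
    ("A", 0, 10.0), ("B", 0, 100.0),
    ("A", 1, 10.0), ("B", 1, 100.0),
    ("A", 2, 10.0), ("B", 2, 100.0),
]

def trailing_mean(values, size):
    """Mean of each value and up to size - 1 values before it."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - size + 1): i + 1]
        out.append(sum(frame) / len(frame))
    return out

# No partitioning: one frame slides over every series at once
ordered = sorted(rows, key=lambda r: (r[1], r[0]))
mixed = trailing_mean([v for _, _, v in ordered], 2)

# Partition by series ID first, then order by time inside each partition
by_series = defaultdict(list)
for sid, _, v in sorted(rows, key=lambda r: r[1]):
    by_series[sid].append(v)
partitioned = {sid: trailing_mean(vals, 2) for sid, vals in by_series.items()}

print(mixed)        # [10.0, 55.0, 55.0, 55.0, 55.0, 55.0]
print(partitioned)  # {'A': [10.0, 10.0, 10.0], 'B': [100.0, 100.0, 100.0]}
```

The unpartitioned average of 55.0 describes neither series, which is the kind of silent error that partitioning by series ID prevents.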

Summary

Time series analysis on Spark leverages window functions for decomposition, moving averages, and lag/lead operations. ARMA-type models assume a stationary series; ARIMA reaches stationarity by differencing, and the ADF and KPSS tests are the usual checks. Interpolation accuracy depends on the gap size relative to the underlying signal's smoothness.

Key Concepts Table

| Concept | Description | PySpark Implementation |
| --- | --- | --- |
| Tumbling Window | Non-overlapping fixed-size windows | groupBy(window("ts", "30 minutes")) |
| Sliding Window | Overlapping windows with fixed step | groupBy(window("ts", "6 hours", "2 hours")) |
| Session Window | Activity-based windows with gap threshold | session_window("ts", "5 minutes") (Spark 3.2+) or custom gap detection |
| Range Frame | Window defined by value range | rangeBetween(start, end) |
| Row Frame | Window defined by row count | rowsBetween(start, end) |
| Forward Fill | Carry last known value forward | last("value", ignorenulls=True) over rowsBetween(Window.unboundedPreceding, Window.currentRow) |
| Backward Fill | Propagate next known value backward | first("value", ignorenulls=True) over rowsBetween(Window.currentRow, Window.unboundedFollowing) |
| Linear Interpolation | Estimate missing values linearly | Weighted blend of previous and next known values (window functions or a UDF) |
| Resampling | Change frequency of time series | groupBy on date_trunc or window() + aggregation |
| Seasonal Decomposition | Separate trend/seasonal/residual | Centered MA + seasonal averaging |

Code Examples

Window Functions for Time Series

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("TimeSeriesAnalysis") \
    .getOrCreate()

# Create sensor data with irregular timestamps
data = [
    ("sensor_1", "2024-01-15 08:00:00", 22.5, 45.0),
    ("sensor_1", "2024-01-15 08:03:00", 22.7, 44.8),
    ("sensor_1", "2024-01-15 08:15:00", 23.1, 43.5),
    ("sensor_1", "2024-01-15 08:20:00", 22.9, 44.2),
    ("sensor_1", "2024-01-15 08:35:00", 23.5, 42.0),
    ("sensor_1", "2024-01-15 08:40:00", 23.2, 43.1),
    ("sensor_1", "2024-01-15 09:00:00", 24.0, 41.5),
    ("sensor_1", "2024-01-15 09:10:00", 23.8, 42.3),
]

df = spark.createDataFrame(data, ["sensor_id", "timestamp", "temperature", "humidity"])
df = df.withColumn("ts", col("timestamp").cast("timestamp"))

# Define window specifications
# Trailing 30-minute range window (overlapping: one frame per row)
trailing_30min = Window \
    .partitionBy("sensor_id") \
    .orderBy(col("ts").cast("long")) \
    .rangeBetween(-1800, 0)  # 30 minutes in seconds

# 5-row sliding window
sliding_window = Window \
    .partitionBy("sensor_id") \
    .orderBy("ts") \
    .rowsBetween(-4, 0)  # Current row + 4 previous

# Trailing 1-hour range window
hourly_range = Window \
    .partitionBy("sensor_id") \
    .orderBy(col("ts").cast("long")) \
    .rangeBetween(-3600, 0)

# True tumbling windows put each row in exactly one bucket, which is a
# groupBy over window() rather than a Window frame
tumbling_df = df \
    .groupBy("sensor_id", window("ts", "30 minutes")) \
    .agg(avg("temperature").alias("avg_temp"))

# Apply window functions
result = df \
    .withColumn("rolling_avg_temp", avg("temperature").over(trailing_30min)) \
    .withColumn("rolling_max_temp", max("temperature").over(trailing_30min)) \
    .withColumn("rolling_min_humidity", min("humidity").over(trailing_30min)) \
    .withColumn("row_avg_temp", avg("temperature").over(sliding_window)) \
    .withColumn("hourly_stddev", stddev("temperature").over(hourly_range)) \
    .withColumn("temp_rank", rank().over(
        Window.partitionBy("sensor_id").orderBy(desc("temperature"))
    ))

result.show(truncate=False)

Gap Detection and Filling

# Align readings onto a regular 5-minute grid, then detect and fill gaps
from pyspark.sql.functions import (
    sequence, explode, expr, floor, first, last, when
)

# Bucket each reading by flooring its timestamp to a 5-minute boundary;
# readings that share a bucket are averaged
bucketed_df = df \
    .withColumn("expected_ts",
        (floor(col("ts").cast("long") / 300) * 300).cast("timestamp")
    ) \
    .groupBy("sensor_id", "expected_ts") \
    .agg(
        avg("temperature").alias("temperature"),
        avg("humidity").alias("humidity")
    )

# Create reference time series (regular 5-minute intervals) per sensor
reference_df = bucketed_df \
    .groupBy("sensor_id") \
    .agg(min("expected_ts").alias("start_ts"), max("expected_ts").alias("end_ts")) \
    .select(
        "sensor_id",
        explode(sequence("start_ts", "end_ts", expr("interval 5 minutes"))).alias("expected_ts")
    )

# Left join: grid points without a reading get null values
aligned_df = reference_df.join(bucketed_df, ["sensor_id", "expected_ts"], "left")

# Detect gaps: expected timestamps with no reading in their bucket
gaps_df = aligned_df.filter(col("temperature").isNull())

print("Detected gaps:")
gaps_df.groupBy("sensor_id").count().show()

# Forward fill missing values
forward_fill_window = Window \
    .partitionBy("sensor_id") \
    .orderBy("expected_ts") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

filled_df = aligned_df \
    .withColumn("temp_filled", last("temperature", ignorenulls=True).over(forward_fill_window)) \
    .withColumn("humidity_filled", last("humidity", ignorenulls=True).over(forward_fill_window))

# Linear interpolation for temperature using built-in window functions
next_known_window = Window \
    .partitionBy("sensor_id") \
    .orderBy("expected_ts") \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

# Epoch seconds of the row, kept only where a reading exists
known_ts = when(col("temperature").isNotNull(), col("expected_ts").cast("long"))

# t0/v0: previous known time and value; t1/v1: next known time and value.
# The grid starts and ends on a reading, so every gap has both neighbours.
interpolated_df = aligned_df \
    .withColumn("ts_long", col("expected_ts").cast("long")) \
    .withColumn("t0", last(known_ts, ignorenulls=True).over(forward_fill_window)) \
    .withColumn("v0", last("temperature", ignorenulls=True).over(forward_fill_window)) \
    .withColumn("t1", first(known_ts, ignorenulls=True).over(next_known_window)) \
    .withColumn("v1", first("temperature", ignorenulls=True).over(next_known_window)) \
    .withColumn("temp_interpolated",
        when(col("temperature").isNotNull(), col("temperature"))
        .otherwise(
            col("v0") + (col("v1") - col("v0"))
            * (col("ts_long") - col("t0")) / (col("t1") - col("t0"))
        )
    )

Resampling and Seasonal Analysis

# Resample to hourly frequency
hourly_df = df \
    .withColumn("hour", date_trunc("hour", col("ts"))) \
    .groupBy("sensor_id", "hour") \
    .agg(
        avg("temperature").alias("avg_temp"),
        max("temperature").alias("max_temp"),
        min("temperature").alias("min_temp"),
        avg("humidity").alias("avg_humidity"),
        count("*").alias("readings_count")
    )

# Compute daily aggregates
daily_df = df \
    .withColumn("date", to_date(col("ts"))) \
    .groupBy("sensor_id", "date") \
    .agg(
        avg("temperature").alias("daily_avg_temp"),
        stddev("temperature").alias("daily_stddev_temp"),
        (max("temperature") - min("temperature")).alias("daily_range")
    )

# Seasonal decomposition (hour-of-day pattern, additive model)
# Trend: centered moving average over +/- 30 minutes (a 1-hour range frame).
# For a real daily cycle the frame should span one full day instead.
trend_window = Window \
    .partitionBy("sensor_id") \
    .orderBy(col("ts").cast("long")) \
    .rangeBetween(-1800, 1800)

# Seasonal: average detrended value at each hour of day, per sensor
hour_of_day = Window.partitionBy("sensor_id", "hour_of_day")

seasonal_df = df \
    .withColumn("hour_of_day", hour(col("ts"))) \
    .withColumn("daily_avg", avg("temperature").over(
        Window.partitionBy("sensor_id", to_date(col("ts")))
    )) \
    .withColumn("trend", avg("temperature").over(trend_window)) \
    .withColumn("detrended", col("temperature") - col("trend")) \
    .withColumn("hourly_seasonal", avg("detrended").over(hour_of_day)) \
    .withColumn("residual", col("temperature") - col("trend") - col("hourly_seasonal"))

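# Anomaly detection using z-score over the trailing 1-hour window
stats_df = df \
    .withColumn("rolling_mean", avg("temperature").over(hourly_range)) \
    .withColumn("rolling_std", stddev("temperature").over(hourly_range)) \
    .withColumn("z_score",
        # stddev is null for a single-row frame and 0 for constant data;
        # leave the z-score null in both cases instead of dividing
        when(col("rolling_std") > 0,
             (col("temperature") - col("rolling_mean")) / col("rolling_std"))
    ) \
    .withColumn("is_anomaly", coalesce(abs(col("z_score")) > 3, lit(False)))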

anomalies = stats_df.filter(col("is_anomaly"))
print(f"Detected {anomalies.count()} anomalies")
anomalies.show(truncate=False)

Performance Metrics

| Operation | 1K Records | 100K Records | 10M Records | 100M Records |
| --- | --- | --- | --- | --- |
| Tumbling Window Aggregation | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min |
| Sliding Window (size 100) | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min |
| Range Window (1 hour) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Gap Detection | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Forward Fill | < 1 sec | 1-2 sec | 8-15 sec | 1-3 min |
| Linear Interpolation | < 1 sec | 5-10 sec | 60-120 sec | 10-20 min |
| Resampling (Downsample) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Seasonal Decomposition | < 1 sec | 3-8 sec | 30-60 sec | 5-10 min |
| Z-Score Anomaly Detection | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Timezone Conversion | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min |

Best Practices

  1. Always cast timestamps to proper TimestampType before window operations to ensure correct time arithmetic
  2. Use range-based frames for irregular time series so that a frame covers a fixed time span regardless of how many rows fall in it
  3. Cache intermediate results when performing multiple window operations on the same DataFrame to avoid recomputation
  4. Partition window specifications by entity (sensor_id, user_id) and order by time inside each partition, so every series is processed independently
  5. With rangeBetween, order by the timestamp cast to long (epoch seconds) and express the bounds in seconds; rangeBetween takes numeric bounds, not interval strings
  6. Implement watermarks for streaming time series to handle late-arriving data gracefully
  7. Avoid UDFs for interpolation when possible; built-in window functions such as last and first with ignorenulls=True usually perform better
  8. Set spark.sql.shuffle.partitions appropriately for time series joins to avoid skew from temporal patterns
  9. Use date_trunc for resampling instead of custom UDFs to leverage Spark's optimized datetime functions
  10. Monitor for temporal skew when windowing: concentrated timestamps in certain periods can cause partition imbalance
  11. Use the sessionize pattern for session windows: sort by timestamp, compute the gap to the previous event, flag gaps above the threshold, and assign session IDs as the cumulative sum of those flags
  12. Pre-compute time features (hour, day_of_week, month) and persist them to avoid repeated extraction across analyses
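The sessionize pattern from item 11 can be sketched in a few lines of plain Python. The helper name `sessionize`, the event times, and the 300-second threshold are illustrative assumptions; in PySpark the same steps would typically map to lag, a comparison against the threshold, and a running sum over a window.

```python
from itertools import accumulate

def sessionize(timestamps, gap_threshold):
    """Pair each timestamp with a session ID; a gap above the threshold starts a new session."""
    ts = sorted(timestamps)
    # 1 where the gap to the previous event exceeds the threshold, else 0
    new_session = [0] + [1 if b - a > gap_threshold else 0 for a, b in zip(ts, ts[1:])]
    # The running sum of the flags is the session ID
    return list(zip(ts, accumulate(new_session)))

events = [0, 30, 50, 700, 720, 2000]  # hypothetical event times in seconds
sessions = sessionize(events, 300)

print(sessions)
# [(0, 0), (30, 0), (50, 0), (700, 1), (720, 1), (2000, 2)]
```

Summing flags rather than raw gap lengths is what keeps the session IDs consecutive integers.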

See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)
