β° Time Series Analysis in PySpark
Architecture Diagram
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β TIME SERIES PROCESSING PIPELINE β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β
β β Raw Time ββββββΆβ Temporal ββββββΆβ Window β β
β β Series Data β β Alignment β β Functions β β
β ββββββββββββββββ ββββββββ¬ββββββββ ββββββββ¬ββββββββ β
β β β β
β βΌ βΌ β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β Gap Detection β β Rolling β β
β β & Filling β β Statistics β β
β β βββββββββββββ β β βββββββββββββ β β
β β Forward Fill β β Moving Average β β
β β Backward Fill β β Exponential β β
β β Interpolation β β Weighted Avg β β
β ββββββββββ¬ββββββββββ ββββββββββ¬ββββββββββ β
β β β β
β βΌ βΌ β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β Resampling β β Seasonal β β
β β (Down/Up) β β Decomposition β β
β β βββββββββββββ β β βββββββββββββ β β
β β HourlyβDaily β β Trend β β
β β DailyβWeekly β β Seasonality β β
β β IrregularβReg β β Residual β β
β ββββββββββ¬ββββββββββ ββββββββββ¬ββββββββββ β
β β β β
β βΌ βΌ β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β Anomaly β β Forecasting β β
β β Detection β β Features β β
β β βββββββββββββ β β βββββββββββββ β β
β β Z-Score β β Lag Features β β
β β IQR β β Time Features β β
β β Isolation Forestβ β Fourier Terms β β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β WINDOW FUNCTION TYPES β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β TUMbling WINDOW (Non-overlapping) β
β ββββββββββ ββββββββββ ββββββββββ ββββββββββ β
β β 00-06 β β 06-12 β β 12-18 β β 18-24 β β
β ββββββββββ ββββββββββ ββββββββββ ββββββββββ β
β Each row belongs to exactly ONE window β
β β
β SLIDING WINDOW (Overlapping) β
β ββββββββββββββ β
β β 00-06 β β
β β ββββββββββββββ β
β β β 02-08 β β
β β β ββββββββββββββ β
β β β β 04-10 β β
β βββββββββββββββββββββββββββ β
β Each row belongs to MULTIPLE windows β
β β
β SESSION Window (Activity-based) β
β βββββββ βββββββββ ββββ ββββββββββββββ β
β β A β β B β βC β β D β β
β βββββββ βββββββββ ββββ ββββββββββββββ β
β Windows defined by gaps in activity (gap threshold) β
β β
β GLOBAL Window (All rows) β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β All data in a single window β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β TEMPORAL DATA ALIGNMENT PATTERNS β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Irregular Time Series Regular Time Series β
β βββββββββββββββββββββββ βββββββββββββββββββββββ β
β β 10:00 10:03 10:15 β β 10:00 10:05 10:10 β β
β β β β β β β β β β β β
β β β β β β β β β β β β
β β β β β ββββΆ β β β β β β
β β 10:02 10:08 10:20β β 10:15 10:20 10:25 β β
β β β β β β β β β β β β
β β β β β β β β β β β β
β βββββββββββββββββββββββ βββββββββββββββββββββββ β
β Timestamps vary Fixed interval (5-min) β
β β
β Interpolation Methods: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Linear: ββββββββββ/ββββββββββ/ββββββββββ β β
β β Nearest: βββββββββ|ββββββββββ|ββββββββββ β β
β β Cubic: ~~~~~~~~~/~~~~~~~~~~/~~~~~~~~~~ β β
β β Zero: βββββββββ|ββββββββββ|ββββββββββ β β
β β Spline: ~~~~~~~~~/~~~~~~~~~~/~~~~~~~~~~ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Detailed Explanation
Time series analysis in PySpark requires careful handling of temporal semantics that differ significantly from standard relational operations. The fundamental challenge is that time series data has an inherent ordering that must be preserved throughout the processing pipeline. Unlike standard DataFrame operations which are unordered by default, time series operations must respect the temporal sequence and handle gaps, irregularities, and varying frequencies.
Window functions are the primary tool for time series computation in PySpark. The Window specification defines a frame over which aggregate functions operate. For time series, the most common pattern is to order by a timestamp column and define a range-based or row-based frame. Range-based frames (Window.orderBy("timestamp").rangeBetween(-3600, 0)) define the window in terms of the actual values (e.g., one hour back), while row-based frames (Window.orderBy("timestamp").rowsBetween(-5, 0)) define the window in terms of row counts (e.g., 5 rows back).
The distinction between range and row frames is critical. Range frames handle irregular time series correctlyβwhen timestamps are not evenly spaced, a range frame of one hour will include all records within the past hour regardless of how many records fall in that period. Row frames always include exactly the specified number of rows, which may span different time durations depending on the data density. For most time series applications, range frames provide the correct semantics.
Gap detection and filling is essential for preparing time series data for analysis. Real-world sensor data, transaction logs, and event streams frequently have missing timestamps. PySpark provides multiple strategies for handling gaps: forward fill (carry the last known value forward), backward fill (propagate the next known value backward), and interpolation (estimate missing values based on surrounding values). The choice depends on the data characteristicsβforward fill is appropriate for slowly changing metrics, while interpolation is better for continuously varying signals.
Resampling involves changing the frequency of a time series, either upsampling (increasing frequency by adding interpolated values) or downsampling (decreasing frequency by aggregating). In PySpark, downsampling is accomplished using window functions with tumbling windows, while upsampling requires creating the target timestamps and joining with interpolated values. The aggregation function for downsampling (mean, sum, last, first) must be chosen based on the metric semanticsβsum is appropriate for counts, while mean or last is better for levels.
Seasonal decomposition separates a time series into trend, seasonal, and residual components. While PySpark doesn't have built-in decomposition like statsmodels, the components can be computed using window functions. The trend is typically a centered moving average, the seasonal component is the average deviation from the trend at each seasonal position, and the residual is what remains after removing trend and seasonal components. This decomposition is useful for anomaly detection, forecasting preprocessing, and pattern identification.
Mathematical Foundations
Definition: Time Series Decomposition
A time series can be decomposed as:
where is the trend component, is the seasonal component with period (), and is the residual.
Moving Average Smoothing
For window size (odd), the centered moving average is:
This removes seasonal variation when equals the seasonal period.
Stationarity Theorem (Wold's Decomposition)
A weakly stationary process (constant mean , autocovariance depending only on lag ) can be represented as:
where are white noise and . Non-stationary series require differencing times: where is the backshift operator.
ARIMA Model
The ARIMA model is:
where and .
Interpolation Error Bound
For linear interpolation between points and at time :
Key Insight
Spark's Window.rowsBetween() enables distributed moving average computation, but partitioning by time series ID is critical. Without proper partitioning, rows from different series mix across partitions, producing incorrect results.
Summary
Time series analysis on Spark leverages window functions for decomposition, moving averages, and lag/lead operations. Stationarity is required for ARIMA models and is tested via ADF/KPSS tests. Interpolation accuracy depends on the gap size relative to the underlying signal's smoothness.
Key Concepts Table
| Concept | Description | PySpark Implementation |
|---|---|---|
| Tumbling Window | Non-overlapping fixed-size windows | Window.orderBy("ts").rangeBetween(-interval, 0) |
| Sliding Window | Overlapping windows with fixed step | Window.orderBy("ts").rowsBetween(-size, 0) |
| Session Window | Activity-based windows with gap threshold | Custom implementation with gap detection |
| Range Frame | Window defined by value range | rangeBetween(start, end) |
| Row Frame | Window defined by row count | rowsBetween(start, end) |
| Forward Fill | Carry last known value forward | last("value", ignorenulls=True) |
| Backward Fill | Propagate next known value backward | first("value", ignorenulls=True) |
| Linear Interpolation | Estimate missing values linearly | Custom UDF with linear regression |
| Resampling | Change frequency of time series | Tumbling window + aggregation |
| Seasonal Decomposition | Separate trend/seasonal/residual | Centered MA + seasonal averaging |
Code Examples
Window Functions for Time Series
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName("TimeSeriesAnalysis") \
.getOrCreate()
# Create sensor data with irregular timestamps
data = [
("sensor_1", "2024-01-15 08:00:00", 22.5, 45.0),
("sensor_1", "2024-01-15 08:03:00", 22.7, 44.8),
("sensor_1", "2024-01-15 08:15:00", 23.1, 43.5),
("sensor_1", "2024-01-15 08:20:00", 22.9, 44.2),
("sensor_1", "2024-01-15 08:35:00", 23.5, 42.0),
("sensor_1", "2024-01-15 08:40:00", 23.2, 43.1),
("sensor_1", "2024-01-15 09:00:00", 24.0, 41.5),
("sensor_1", "2024-01-15 09:10:00", 23.8, 42.3),
]
df = spark.createDataFrame(data, ["sensor_id", "timestamp", "temperature", "humidity"])
df = df.withColumn("ts", col("timestamp").cast("timestamp"))
# Define window specifications
# 30-minute tumbling window
tumbling_window = Window \
.partitionBy("sensor_id") \
.orderBy(col("ts").cast("long")) \
.rangeBetween(-1800, 0) # 30 minutes in seconds
# 5-row sliding window
sliding_window = Window \
.partitionBy("sensor_id") \
.orderBy("ts") \
.rowsBetween(-4, 0) # Current row + 4 previous
# 1-hour range window
hourly_range = Window \
.partitionBy("sensor_id") \
.orderBy(col("ts").cast("long")) \
.rangeBetween(-3600, 0)
# Apply window functions
result = df \
.withColumn("rolling_avg_temp", avg("temperature").over(tumbling_window)) \
.withColumn("rolling_max_temp", max("temperature").over(tumbling_window)) \
.withColumn("rolling_min_humidity", min("humidity").over(tumbling_window)) \
.withColumn("row_avg_temp", avg("temperature").over(sliding_window)) \
.withColumn("hourly_stddev", stddev("temperature").over(hourly_range)) \
.withColumn("temp_rank", rank().over(
Window.partitionBy("sensor_id").orderBy(desc("temperature"))
))
result.show(truncate=False)
Gap Detection and Filling
# Generate expected timestamps (every 5 minutes)
from pyspark.sql.functions import sequence, explode, collect_list
# Create reference time series (regular 5-minute intervals)
min_ts = df.agg(min("ts")).first()[0]
max_ts = df.agg(max("ts")).first()[0]
reference_df = spark.sql(f"""
SELECT explode(sequence(
'{min_ts}',
'{max_ts}',
interval 5 minutes
)) AS expected_ts
""")
# Detect gaps
gaps_df = df.select("sensor_id", "ts") \
.withColumnRenamed("ts", "actual_ts") \
.crossJoin(reference_df) \
.withColumn("diff_seconds",
abs(col("expected_ts").cast("long") - col("actual_ts").cast("long"))
) \
.filter(col("diff_seconds") > 120) # More than 2 minutes off
print("Detected gaps:")
gaps_df.groupBy("sensor_id").count().show()
# Forward fill missing values
forward_fill_window = Window \
.partitionBy("sensor_id") \
.orderBy("ts") \
.rowsBetween(Window.unboundedPreceding, 0)
filled_df = df \
.withColumn("temp_filled", last("temperature", ignorenulls=True).over(forward_fill_window)) \
.withColumn("humidity_filled", last("humidity", ignorenulls=True).over(forward_fill_window))
# Linear interpolation for temperature
@udf(returnType=DoubleType())
def linear_interpolate(values, timestamps, target_ts):
"""Linear interpolation between two points."""
if values is None or len(values) < 2:
return values[0] if values else None
for i in range(len(timestamps) - 1):
if timestamps[i] <= target_ts <= timestamps[i + 1]:
t0, t1 = timestamps[i], timestamps[i + 1]
v0, v1 = values[i], values[i + 1]
if t1 == t0:
return v0
ratio = (target_ts - t0) / (t1 - t0)
return v0 + ratio * (v1 - v0)
return values[-1]
# Apply interpolation using window functions
interpolated_df = df \
.withColumn("ts_long", col("ts").cast("long")) \
.withColumn("temp_array", collect_list("temperature").over(
Window.partitionBy("sensor_id").orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)) \
.withColumn("ts_array", collect_list("ts_long").over(
Window.partitionBy("sensor_id").orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)) \
.withColumn("temp_interpolated",
linear_interpolate(col("temp_array"), col("ts_array"), col("ts_long"))
)
Resampling and Seasonal Analysis
# Resample to hourly frequency
hourly_df = df \
.withColumn("hour", date_trunc("hour", col("ts"))) \
.groupBy("sensor_id", "hour") \
.agg(
avg("temperature").alias("avg_temp"),
max("temperature").alias("max_temp"),
min("temperature").alias("min_temp"),
avg("humidity").alias("avg_humidity"),
count("*").alias("readings_count")
)
# Compute daily aggregates
daily_df = df \
.withColumn("date", to_date(col("ts"))) \
.groupBy("sensor_id", "date") \
.agg(
avg("temperature").alias("daily_avg_temp"),
stddev("temperature").alias("daily_stddev_temp"),
(max("temperature") - min("temperature")).alias("daily_range")
)
# Seasonal decomposition (hourly pattern)
hour_of_day = Window \
.partitionBy("sensor_id") \
.orderBy(hour(col("ts"))) \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
seasonal_df = df \
.withColumn("hour_of_day", hour(col("ts"))) \
.withColumn("daily_avg", avg("temperature").over(
Window.partitionBy("sensor_id", to_date(col("ts")))
)) \
.withColumn("hourly_seasonal", avg("temperature").over(hour_of_day)) \
.withColumn("trend", avg("temperature").over(
Window.partitionBy("sensor_id").orderBy("ts")
.rowsBetween(-12, 12) # 1-hour centered moving average
)) \
.withColumn("residual", col("temperature") - col("trend") - col("hourly_seasonal"))
# Anomaly detection using z-score
stats_df = df \
.withColumn("rolling_mean", avg("temperature").over(hourly_range)) \
.withColumn("rolling_std", stddev("temperature").over(hourly_range)) \
.withColumn("z_score",
(col("temperature") - col("rolling_mean")) / col("rolling_std")
) \
.withColumn("is_anomaly", abs(col("z_score")) > 3)
anomalies = stats_df.filter(col("is_anomaly"))
print(f"Detected {anomalies.count()} anomalies")
anomalies.show(truncate=False)
Performance Metrics
| Operation | 1K Records | 100K Records | 10M Records | 100M Records |
|---|---|---|---|---|
| Tumbling Window Aggregation | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min |
| Sliding Window (size 100) | < 1 sec | 3-8 sec | 45-90 sec | 8-15 min |
| Range Window (1 hour) | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Gap Detection | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Forward Fill | < 1 sec | 1-2 sec | 8-15 sec | 1-3 min |
| Linear Interpolation | < 1 sec | 5-10 sec | 60-120 sec | 10-20 min |
| Resampling (Downsample) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Seasonal Decomposition | < 1 sec | 3-8 sec | 30-60 sec | 5-10 min |
| Z-Score Anomaly Detection | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Timezone Conversion | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min |
Best Practices
- Always cast timestamps to proper TimestampType before window operations to ensure correct time arithmetic
- Use range-based frames for irregular time series to handle missing timestamps correctly
- Cache intermediate results when performing multiple window operations on the same DataFrame to avoid recomputation
- Partition by time first, then by entity (sensor_id, user_id) for optimal data locality in window functions
- Use
rangeBetweenwith seconds (long values) rather than string intervals for better performance - Implement watermarks for streaming time series to handle late-arriving data gracefully
- Avoid UDFs for interpolation when possibleβuse built-in window functions with collect_list for better performance
- Set
spark.sql.shuffle.partitionsappropriately for time series joins to avoid skew from temporal patterns - Use
date_truncfor resampling instead of custom UDFs to leverage Spark's optimized datetime functions - Monitor for temporal skew when windowingβconcentrated timestamps in certain periods can cause partition imbalance
- Use
Sessionizepattern for session windows: sort by timestamp, compute gap, assign session ID using cumulative sum of gaps - Pre-compute time features (hour, day_of_week, month) and persist them to avoid repeated extraction across analyses
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)