PySpark Caching and Persistence
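Definition: Caching
Caching stores the results of an RDD or DataFrame in memory (or on disk) so that later actions reuse them instead of recomputing them from lineage. For DataFrames, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK); for RDDs, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
Definition: Storage Level
A storage level defines where and how data is persisted: MEMORY_ONLY (deserialized objects on the JVM heap), MEMORY_AND_DISK (memory first, spilling to disk), MEMORY_ONLY_SER (serialized bytes on the heap; Scala/Java API only), DISK_ONLY, or OFF_HEAP (serialized, outside the JVM heap).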
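One way to read the level names is as combinations of a few flags. The sketch below models them with a plain dataclass; the field names mirror the arguments of PySpark's StorageLevel constructor (useDisk, useMemory, useOffHeap, deserialized, replication), and the flag values are those of the Scala/Java constants. The LevelFlags class and the describe helper are illustrative names, not part of Spark.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LevelFlags:
    """Illustrative stand-in for a storage level (not a Spark class)."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


# Flag combinations behind the level names discussed above
LEVELS = {
    "MEMORY_ONLY": LevelFlags(False, True, False, True),
    "MEMORY_ONLY_SER": LevelFlags(False, True, False, False),
    "MEMORY_AND_DISK": LevelFlags(True, True, False, True),
    "DISK_ONLY": LevelFlags(True, False, False, False),
    "OFF_HEAP": LevelFlags(True, True, True, False),
}


def describe(name):
    """Summarize where a level keeps data and in which format."""
    flags = LEVELS[name]
    places = []
    if flags.use_memory:
        places.append("off-heap memory" if flags.use_off_heap else "heap memory")
    if flags.use_disk:
        places.append("disk")
    fmt = "deserialized in memory" if flags.deserialized else "serialized"
    return f"{name}: {' + '.join(places)} ({fmt})"


for level_name in LEVELS:
    print(describe(level_name))
```

The replicated variants (for example MEMORY_ONLY_2) differ only in replication=2.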
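Storage Memory Requirement
M_{cache} = N_{partitions} × S_{partition} × R_{compression}
Here,
- M_{cache} = Total memory needed to cache the dataset
- N_{partitions} = Number of partitions
- S_{partition} = Size per partition (serialized or deserialized)
- R_{compression} = Compression ratio (1.0 if uncompressed, 0.3–0.7 typical)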
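As a quick worked example of the estimate described by the terms above (number of partitions times size per partition times compression ratio), here is a minimal sketch. The function name and the 200-partition, 128 MB dataset are hypothetical values chosen for illustration.

```python
def estimate_cache_memory_mb(num_partitions, partition_size_mb, compression_ratio=1.0):
    """Estimated cache footprint in MB: partitions x size per partition x ratio."""
    if not 0 < compression_ratio <= 1.0:
        raise ValueError("compression_ratio must be in (0, 1]")
    return num_partitions * partition_size_mb * compression_ratio


# Hypothetical dataset: 200 partitions of 128 MB each
uncompressed_mb = estimate_cache_memory_mb(200, 128)
compressed_mb = estimate_cache_memory_mb(200, 128, compression_ratio=0.5)

print(f"Uncompressed: {uncompressed_mb / 1024:.1f} GB")
print(f"Compressed:   {compressed_mb / 1024:.1f} GB")
```

Comparing the result with the storage memory actually available per executor gives a first indication of whether a memory-only level is realistic.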
Use MEMORY_AND_DISK for datasets that may not fully fit in memory: Spark will spill partitions to disk rather than recompute them. Use MEMORY_ONLY only when you are certain the data fits and recomputation cost is low.
Always call unpersist() on DataFrames that are no longer reused to free cache memory for other datasets. Spark does evict cached blocks under memory pressure (least recently used first), but until that happens a stale cache competes with live datasets and with execution memory.
Theorem: Cache vs Recompute Trade-off
Caching is beneficial when N_{reuses} × Cost_{recompute} > Cost_{cache}, where N_{reuses} is the number of times the data is accessed after caching, Cost_{recompute} is the time to recompute from lineage, and Cost_{cache} is the cost of caching (materializing the data and holding its memory footprint), expressed in the same units.
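The inequality can be checked with a few lines of arithmetic. The sketch below assumes both costs are expressed in seconds, which the text does not specify; the function names and the example figures are hypothetical.

```python
import math


def caching_is_beneficial(n_reuses, cost_recompute_s, cost_cache_s):
    """True when n_reuses x recompute cost exceeds the cost of caching."""
    return n_reuses * cost_recompute_s > cost_cache_s


def break_even_reuses(cost_recompute_s, cost_cache_s):
    """Smallest whole number of reuses for which caching wins."""
    return math.floor(cost_cache_s / cost_recompute_s) + 1


# Hypothetical costs: 5 s to recompute, 12 s equivalent cost to cache
print(caching_is_beneficial(2, 5.0, 12.0))
print(caching_is_beneficial(3, 5.0, 12.0))
print(break_even_reuses(5.0, 12.0))
```

With these figures two reuses are not enough (10 s saved against 12 s spent), while three are.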
- cache() = persist(MEMORY_AND_DISK) for DataFrames; use persist() for custom storage levels
- Cache only datasets reused 2+ times; always unpersist() when done
- Target cache hit ratio > 80%; monitor via Spark UI Storage tab
- Checkpointing truncates lineage for very deep DAGs
Architecture Diagram
CACHING ARCHITECTURE OVERVIEW

DATAFRAME / RDD
    Logical Plan → Physical Plan → RDD lineage
        │
        ▼
CACHE / PERSIST
    MEMORY (Heap):  Partition 0, Partition 1
    DISK (Local):   Partition 2, Partition 3
        │
        ▼
STORAGE LEVEL DECISION TREE
    Is data smaller than memory?
    ├── YES → MEMORY_ONLY (deserialized)
    │         MEMORY_ONLY_SER (serialized)
    └── NO  → MEMORY_AND_DISK (spill to disk)
              DISK_ONLY (only on disk)
    Need off-heap?
    └── YES → OFF_HEAP (off-heap memory; Tachyon/Alluxio in Spark 1.x)

UNIFIED MEMORY MANAGEMENT
    Executor Memory (JVM Heap)
    ├── Reserved (300MB)
    └── Usable
        └── Unified Memory
            ├── Storage Memory (50%): Cached RDDs, Broadcasts
            └── Execution Memory (50%): Shuffles, Joins, Sorts
    Storage and Execution can borrow from each other (soft boundary)

STORAGE LEVELS COMPARISON

MEMORY_ONLY
    Data Format: Deserialized Java/Python objects
    Storage:     JVM Heap Memory
    Speed:       ★★★★★ (Fastest access)
    Space:       ★★☆☆☆ (3-5x overhead)
    CPU:         ★★★★★ (No ser/deser)
    GC:          ★★☆☆☆ (High GC pressure)

MEMORY_ONLY_SER
    Data Format: Serialized byte arrays
    Storage:     JVM Heap Memory
    Speed:       ★★★★☆ (Serialization overhead)
    Space:       ★★★★☆ (1.5-2x overhead)
    CPU:         ★★★☆☆ (Ser/deser per access)
    GC:          ★★★★☆ (Less GC pressure)

MEMORY_AND_DISK
    Data Format: Deserialized (memory) / Serialized (disk)
    Storage:     JVM Heap + Local Disk
    Speed:       ★★★★☆ (Disk spill for overflow)
    Space:       ★★★☆☆ (Memory + Disk)
    CPU:         ★★★★☆ (No ser for memory partitions)
    GC:          ★★★☆☆ (Medium GC pressure)

DISK_ONLY
    Data Format: Serialized byte arrays
    Storage:     Local Disk only
    Speed:       ★★☆☆☆ (Disk I/O required)
    Space:       ★★★★★ (Minimal memory usage)
    CPU:         ★★★☆☆ (Ser/deser per access)
    GC:          ★★★★★ (No GC pressure)

OFF_HEAP
    Data Format: Serialized
    Storage:     Off-heap memory (outside the JVM heap)
    Speed:       ★★★★☆ (Ser/deser per access, but no GC)
    Space:       ★★★★★ (Outside JVM heap)
    CPU:         ★★☆☆☆ (Ser/deser per access)
    GC:          ★★★★★ (Zero GC pressure)

CACHE EVICTION AND SPILL FLOW

MEMORY PRESSURE DETECTION
    Storage Memory Usage:  80%
    Execution Memory Need: 60%
    Threshold: storage held above 50% of unified memory can be evicted when execution needs it
        │
        ▼
EVICTION DECISION
    LRU (Least Recently Used) Policy
    Partitions accessed:
        P0: 100 times (recent)
        P1: 50 times (recent)
        P2: 10 times (old) → EVICT
        P3: 5 times (old) → EVICT
        │
        ▼
SPILL TO DISK (MEMORY_AND_DISK)
    Step 1: Serialize partition to byte array
    Step 2: Write to local disk
    Step 3: Update partition metadata
    Step 4: Free memory
    Disk Path: /tmp/blockmgr-xxx/
        │
        ▼
RECOMPUTATION (MEMORY_ONLY)
    Step 1: Detect partition not in cache
    Step 2: Recompute using lineage
    Step 3: Store in cache
    Step 4: Return to caller
    Cost: Depends on lineage complexity
Detailed Explanation
1. Why Cache?
Caching pays off when a DataFrame or RDD is reused by several actions. Without it, each action re-runs the lineage from the source, repeating every read and transformation along the way.
Benefits:
- Avoids recomputation of expensive transformations
- Reduces I/O (read from cache instead of source)
- Improves iterative algorithm performance
- Enables interactive data exploration
2. Cache vs Persist
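For DataFrames, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK); for RDDs, it is shorthand for persist(StorageLevel.MEMORY_ONLY):
- Keeps data in memory and, for DataFrames, spills partitions that do not fit to disk
- Fast access at the price of the highest memory overhead
- Good for small-to-medium datasets that fit in memory
persist() allows choosing a storage level:
- MEMORY_ONLY: Memory only; partitions that do not fit are recomputed (the RDD cache() default)
- MEMORY_ONLY_SER: Serialized, saves space (Scala/Java API only)
- MEMORY_AND_DISK: Spill to disk if memory full (the DataFrame cache() default)
- DISK_ONLY: Only on disk
- OFF_HEAP: Outside the JVM heap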
3. Storage Levels Deep Dive
MEMORY_ONLY:
- Data stored as Java/Python objects
- Fastest access (no deserialization)
- Highest memory overhead (3-5x)
- Best for small datasets, repeated access
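MEMORY_ONLY_SER (Scala/Java API; PySpark always serializes stored Python objects and does not define this constant):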
- Data stored as serialized byte arrays
- Lower memory overhead (1.5-2x)
- Serialization cost per access
- Good for memory-constrained environments
MEMORY_AND_DISK:
- Memory-first, spill to disk if needed
- Ensures all data is persisted
- Disk spill adds I/O overhead
- Good for datasets larger than memory
DISK_ONLY:
- All data on local disk
- Minimal memory usage
- Disk I/O required for access
- Good for very large datasets
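OFF_HEAP:
- Data stored serialized in off-heap memory, outside the JVM heap (Spark 1.x delegated this level to Tachyon, now Alluxio)
- Zero GC pressure from cached blocks
- Requires spark.memory.offHeap.enabled=true and a non-zero spark.memory.offHeap.size
- Good for large caches where GC pauses dominate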
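The decision tree from the architecture diagram can be written as a small helper. This is a sketch under stated assumptions: the function name is hypothetical, the overhead factors are the upper ends of the 3-5x and 1.5-2x ranges quoted above, and DISK_ONLY is left out because the text gives no threshold for "very large". Levels are returned as plain strings, so the sketch makes no claim about which constants a given Spark API exposes.

```python
DESERIALIZED_OVERHEAD = 5.0  # assumed: upper end of the 3-5x range
SERIALIZED_OVERHEAD = 2.0    # assumed: upper end of the 1.5-2x range


def choose_storage_level(raw_data_gb, storage_memory_gb, need_off_heap=False):
    """Pick a storage level name from data size and available storage memory."""
    if need_off_heap:
        return "OFF_HEAP"
    if raw_data_gb * DESERIALIZED_OVERHEAD <= storage_memory_gb:
        return "MEMORY_ONLY"       # fits even as deserialized objects
    if raw_data_gb * SERIALIZED_OVERHEAD <= storage_memory_gb:
        return "MEMORY_ONLY_SER"   # fits only in serialized form
    return "MEMORY_AND_DISK"       # does not fit: spill the overflow to disk


for size_gb in (1, 4, 8):
    print(f"{size_gb} GB raw, 10 GB storage memory ->", choose_storage_level(size_gb, 10))
```

In PySpark, where MEMORY_ONLY_SER is not available, the middle branch would fall back to MEMORY_ONLY or MEMORY_AND_DISK.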
4. Unified Memory Management
Spark 1.6+ introduced unified memory management where storage and execution memory share a common pool with a soft boundary:
Memory Layout:
Executor Memory
├── Reserved (300MB)
└── Unified Memory
    ├── Storage Memory (50%)
    │   ├── Cached RDDs
    │   ├── Broadcast variables
    │   └── Unroll memory
    └── Execution Memory (50%)
        ├── Shuffle buffers
        ├── Join hash tables
        ├── Sort buffers
        └── Aggregation buffers
Key Behaviors:
- Storage can borrow from execution (when execution is idle)
- Execution can borrow from storage (forces eviction)
- Execution has higher priority for memory
- Soft boundary allows flexible memory sharing
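To put numbers on the layout, the sketch below computes the region sizes for a given executor heap. It assumes Spark's default settings spark.memory.fraction = 0.6 (not mentioned in the text above) and spark.memory.storageFraction = 0.5, plus the 300MB reserved region. The function name is hypothetical, and the result is approximate because the JVM reports slightly less usable heap than the configured size.

```python
RESERVED_MB = 300  # fixed reserved region


def unified_memory_regions(executor_heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate sizes (MB) of the unified, storage and execution regions."""
    usable_mb = executor_heap_mb - RESERVED_MB
    unified_mb = usable_mb * memory_fraction
    storage_mb = unified_mb * storage_fraction
    execution_mb = unified_mb - storage_mb
    return {
        "usable_mb": usable_mb,
        "unified_mb": unified_mb,
        "storage_mb": storage_mb,      # share protected from eviction by execution
        "execution_mb": execution_mb,  # initial share; can grow into unused storage
    }


regions = unified_memory_regions(4096)  # hypothetical 4 GB executor heap
for region_name, size_mb in regions.items():
    print(f"{region_name}: {size_mb:.0f}")
```

Because the boundary is soft, the storage figure is a floor below which execution cannot evict cached blocks, not a hard cap on cache size.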
5. Cache Eviction Policies
When memory pressure occurs, Spark evicts cached data using LRU (Least Recently Used) policy:
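Eviction Process:
- Monitor memory usage
- Detect that execution (or a new block) needs memory while storage holds more than its protected share (50% of unified memory by default)
- Identify least recently used partitions
- Evict or spill to disk based on storage level
- Update block manager metadata
Factors Affecting Eviction:
- Access recency (LRU considers when a partition was last used, not how often)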
- Partition size
- Storage level (memory vs disk)
- Memory pressure from execution
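The LRU behaviour can be illustrated with a toy block store built on OrderedDict. This is a simplified model for intuition only: the class and its methods are hypothetical, and Spark's real memory store applies additional rules that the sketch ignores.

```python
from collections import OrderedDict


class LruBlockStore:
    """Toy LRU cache of partitions (not Spark's memory store)."""

    def __init__(self, capacity_mb):
        self.capacity_mb = capacity_mb
        self.blocks = OrderedDict()  # block id -> size in MB, least recent first
        self.evicted = []

    def used_mb(self):
        return sum(self.blocks.values())

    def get(self, block_id):
        """Return True on a hit and mark the block as most recently used."""
        if block_id not in self.blocks:
            return False
        self.blocks.move_to_end(block_id)
        return True

    def put(self, block_id, size_mb):
        """Store a block, evicting least recently used blocks to make room."""
        if size_mb > self.capacity_mb:
            return False  # the block can never fit
        self.blocks.pop(block_id, None)
        while self.used_mb() + size_mb > self.capacity_mb:
            victim, _ = self.blocks.popitem(last=False)
            self.evicted.append(victim)
        self.blocks[block_id] = size_mb
        return True


store = LruBlockStore(capacity_mb=300)
for pid in ("P0", "P1", "P2"):
    store.put(pid, 100)
store.get("P0")       # P0 becomes the most recently used block
store.put("P3", 100)  # the store is full, so the least recently used block goes
print("Evicted:", store.evicted)
print("Cached: ", list(store.blocks))
```

Here P1 is evicted rather than P0, because the read of P0 refreshed its position even though it was inserted first.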
6. Cache Metrics and Monitoring
Key Metrics to Monitor:
- storageLevel: Current storage level
- cachedPartitions: Number of cached partitions
- cacheSize: Total cached data size
- cacheMemory: Memory used for caching
- cacheDisk: Disk used for spilling
- cacheHitRatio: Percentage of cache hits
Spark UI Locations:
- Storage tab: Cached RDDs and DataFrames
- Executors tab: Memory usage per executor
- SQL tab: Cache usage in query plans
7. Common Cache Pitfalls
Pitfall 1: Caching too much
- Fills up memory, causes eviction
- Slows down execution (memory pressure)
Pitfall 2: Not caching when needed
- Recomputes expensive transformations
- Wastes CPU and I/O
Pitfall 3: Wrong storage level
- MEMORY_ONLY for too-large datasets → constant recomputation
- DISK_ONLY for frequently accessed data → disk I/O overhead
Pitfall 4: Forgetting to unpersist
- Memory leaks over time
- Slows down other applications
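One way to make forgetting impossible is to tie the cache's lifetime to a with-block. The helper below is a sketch, not a Spark API: cached is a hypothetical name, and it relies only on the object having cache(), persist() and unpersist() methods, as DataFrames do. A small stand-in class keeps the example runnable without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def cached(df, storage_level=None):
    """Persist df for the duration of a with-block, then unpersist it."""
    if storage_level is None:
        df.cache()
    else:
        df.persist(storage_level)
    try:
        yield df
    finally:
        df.unpersist()  # runs even if the block raises


class FakeFrame:
    """Stand-in with the same three methods, used only for this demo."""

    def __init__(self):
        self.is_cached = False

    def cache(self):
        self.is_cached = True
        return self

    def persist(self, storage_level):
        self.is_cached = True
        return self

    def unpersist(self):
        self.is_cached = False
        return self


frame = FakeFrame()
with cached(frame) as f:
    print("Inside block:", f.is_cached)
print("After block: ", frame.is_cached)
```

With a real DataFrame the call would look the same: with cached(df): followed by the actions that reuse it.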
8. Cache Optimization Strategies
Strategy 1: Cache selectively
# Cache only what's reused
df.cache() # Cache this one
result = df.filter(...).groupBy(...).agg(...) # Don't cache intermediate
Strategy 2: Choose right storage level
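# Small dataset, repeated access, cheap to recompute
df.persist(StorageLevel.MEMORY_ONLY)
# Large dataset that may not fit: memory first, spill to disk
df.persist(StorageLevel.MEMORY_AND_DISK)  # same level as df.cache() for DataFrames
# Dataset much larger than memory
df.persist(StorageLevel.DISK_ONLY)
# Note: MEMORY_ONLY_SER exists only in the Scala/Java API; PySpark does not define it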
Strategy 3: Monitor and adjust
# Check cache status
print(f"Cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")
# Unpersist when done
df.unpersist()
Key Concepts Table
| Storage Level | Location | Format | Speed | Space | GC | Use Case |
|---|---|---|---|---|---|---|
| MEMORY_ONLY | Heap | Deserialized | ★★★★★ | ★★☆☆☆ | ★★☆☆☆ | Small, repeated access |
| MEMORY_ONLY_SER | Heap | Serialized | ★★★★☆ | ★★★★☆ | ★★★★☆ | Memory-constrained |
| MEMORY_AND_DISK | Heap + Disk | Mixed | ★★★★☆ | ★★★☆☆ | ★★★☆☆ | Large datasets |
| DISK_ONLY | Disk | Serialized | ★★☆☆☆ | ★★★★★ | ★★★★★ | Very large datasets |
| OFF_HEAP | Off-heap memory | Serialized | ★★★★☆ | ★★★★★ | ★★★★★ | GC-sensitive caching |
| MEMORY_ONLY_2 | Heap | Deserialized | ★★★★★ | ★★☆☆☆ | ★★☆☆☆ | 2 replicas |
| MEMORY_AND_DISK_2 | Heap + Disk | Mixed | ★★★★☆ | ★★★☆☆ | ★★★☆☆ | Fault tolerance |
Code Examples
Example 1: Basic Caching
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark import StorageLevel

spark = SparkSession.builder.appName("CachingPersistence").getOrCreate()

# Create a DataFrame with two derived columns
df = (
    spark.range(1000000)
    .withColumn("key", col("id") % 1000)
    .withColumn("value", col("id") * 1.0)
)

# Mark the DataFrame for caching (lazy; the DataFrame default is MEMORY_AND_DISK)
df.cache()

# First action triggers computation and caching
count = df.count()
print(f"Count: {count}")

# Subsequent actions read from the cache
num_groups = df.groupBy("key").sum("value").count()
print(f"GroupBy count: {num_groups}")

# Check cache status
print(f"Is cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")

# Unpersist when done
df.unpersist()
Example 2: Different Storage Levels
import time

# MEMORY_ONLY_SER exists only in the Scala/Java API, so this comparison
# uses levels that PySpark defines.
levels = {
    "MEMORY_ONLY": StorageLevel.MEMORY_ONLY,
    "MEMORY_AND_DISK": StorageLevel.MEMORY_AND_DISK,
    "DISK_ONLY": StorageLevel.DISK_ONLY,
}

for name, level in levels.items():
    # persist() on an already-persisted DataFrame keeps the old level,
    # so each level is applied to a freshly unpersisted df
    df.persist(level)
    df.count()  # Materialize the cache before timing
    print(f"{name}: {df.storageLevel}")

    # Test performance
    start = time.time()
    for _ in range(5):
        df.count()
    print(f"{name} time: {time.time() - start:.2f}s")

    df.unpersist(blocking=True)
Example 3: Cache Monitoring
# INFO logging surfaces block manager messages as partitions are cached
spark.sparkContext.setLogLevel("INFO")

# Create and cache DataFrame
df = spark.range(1000000).cache()

# Trigger caching
df.count()

# Check storage info (spark._jsc is an internal handle to the JVM
# SparkContext and may change between Spark versions)
storage_info = spark._jsc.sc().getRDDStorageInfo()
for rdd_info in storage_info:
    print(f"RDD ID: {rdd_info.id()}")
    print(f"RDD Name: {rdd_info.name()}")
    print(f"Storage Level: {rdd_info.storageLevel()}")
    print(f"Number of Cached Partitions: {rdd_info.numCachedPartitions()}")
    print(f"Memory Size: {rdd_info.memSize() / 1024 / 1024:.2f} MB")
    print(f"Disk Size: {rdd_info.diskSize() / 1024 / 1024:.2f} MB")
Example 4: Cache in Iterative Algorithms
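from pyspark.sql.functions import col

# Example: PageRank-like iterative computation
def iterative_computation(df, num_iterations=10):
    # Cache the input and keep a handle to it, because df is rebound below
    base = df.cache()
    base.count()  # Trigger caching

    for i in range(num_iterations):
        # Each iteration extends the lineage on top of the cached input
        df = df.withColumn("value", col("value") * 0.85 + 0.15)
        # Trigger computation
        df.count()
        print(f"Iteration {i + 1} complete")

    # Unpersist the DataFrame that was actually cached; later actions on
    # the returned df recompute from the source
    base.unpersist()
    return df

# Run iterative computation on an input that has a "value" column
input_df = spark.range(1000000).withColumn("value", col("id") * 1.0)
result = iterative_computation(input_df)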
Performance Metrics
| Scenario | No Cache | MEMORY_ONLY | MEMORY_ONLY_SER | MEMORY_AND_DISK |
|---|---|---|---|---|
| 1GB, 5 actions | 25.0s | 5.0s | 7.5s | 6.0s |
| 10GB, 5 actions | 250.0s | 50.0s | 75.0s | 60.0s |
| 1GB, 10 actions | 50.0s | 5.5s | 8.0s | 6.5s |
| 10GB, 10 actions | 500.0s | 55.0s | 80.0s | 65.0s |
| Memory Usage | 0 | 3.5GB | 2.0GB | 3.5GB + Disk |
| GC Time | 0 | 500ms | 200ms | 500ms |
| Disk I/O | 0 | 0 | 0 | 2GB |
Best Practices
1. Cache Selectively
# Cache DataFrame reused multiple times
df = expensive_computation()
df.cache()
# Use cached DataFrame
result1 = df.filter(...).collect()
result2 = df.groupBy(...).agg(...).collect()
# Unpersist when done
df.unpersist()
2. Choose Appropriate Storage Level
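# Small dataset, fits in memory
df.persist(StorageLevel.MEMORY_ONLY)
# Large dataset that may not fit: memory first, spill to disk
df.persist(StorageLevel.MEMORY_AND_DISK)  # same level as df.cache() for DataFrames
# Very large dataset: keep it on disk only
df.persist(StorageLevel.DISK_ONLY)
# Note: MEMORY_ONLY_SER exists only in the Scala/Java API; PySpark does not define it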
3. Monitor Cache Usage
# Check cache status
print(f"Cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")
# Check storage info in Spark UI
# Storage tab shows cached RDDs/DataFrames
4. Unpersist When Done
# Always unpersist when DataFrame no longer needed
df.unpersist()
# DataFrames are not context managers, so use try/finally to guarantee cleanup
df.cache()
try:
    result = df.filter(...).collect()
finally:
    df.unpersist()
5. Avoid Cache Bloat
# Don't cache everything
df1.cache() # Cache this
df2 = expensive_computation(df1) # Don't cache intermediate
df3 = another_computation(df2) # Don't cache this either
# Cache only what's reused
result = df3.filter(...).collect()
6. Use Checkpointing for Long Lineages
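# For very long lineages, checkpoint to truncate
spark.sparkContext.setCheckpointDir("hdfs:///checkpoint")
# checkpoint() is eager by default and returns a new DataFrame, so keep the result
df = df.checkpoint()
# The checkpointed DataFrame reads from the checkpoint files instead of replaying the lineage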
See Also
- Kafka Streams (kafka/03): Caching strategies in stream processing state stores
- Data Engineering Streaming (data-engineering/022): Persistence patterns in streaming pipelines