Memory Management & Tuning
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
Spark's Unified Memory Model
Spark 1.6+ introduced a Unified Memory Model that shares memory between execution and storage. Understanding this model is crucial for avoiding out-of-memory errors.
Memory Regions in an Executor
from pyspark.sql import SparkSession

# Configure memory regions explicitly
spark = (
    SparkSession.builder
    .appName("MemoryTuning")
    .config("spark.executor.memory", "16g")          # JVM heap per executor
    .config("spark.executor.memoryOverhead", "4g")   # non-heap overhead (VM, native)
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .getOrCreate()
)

# Per the Spark 3.x configuration docs, the executor container is sized as
# executor.memory + memoryOverhead + offHeap.size (+ pyspark.memory) = 24g here.
#
# Spark first sets aside ~300 MB of reserved heap, then:
#   unified region    = (16g - 300 MB) * 0.6  ~= 9.4g  (execution + storage)
#   storage threshold = 9.4g * 0.5            ~= 4.7g  (cached data immune to eviction)
# Execution and storage may each borrow the other's FREE memory. Execution can
# evict cached blocks until storage shrinks back to the threshold, but storage
# can never evict memory that execution is using.
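ℹ️
Interview Insight: Spark first sets aside about 300 MB of reserved heap, so the unified (execution + storage) region is usableMemory = (executorMemory − 300 MB) × spark.memory.fraction. Within that region, execution gets priority: it can evict cached blocks until storage falls back to spark.memory.storageFraction of the region, whereas storage can never evict memory that execution is using.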
Diagnosing Memory Issues
Reading the Spark UI Storage and Executors Tabs
# Create a memory-intensive operation to inspect
df = spark.read.parquet("hdfs://data/large-dataset")

# cache() registers the DataFrame for STORAGE memory; once an action
# materializes it, its size appears in the Spark UI "Storage" tab
cached_df = df.cache()
cached_df.count()  # Materializes the cache

# Check how much memory the *local Python process* is using
import psutil
import os

_BYTES_PER_MB = 1024 * 1024


def get_executor_memory_usage():
    """Return the memory footprint of the current Python process, in MB.

    Despite the name, this inspects the process that calls it. Called from the
    driver script (as below), it measures the PySpark driver's Python process,
    not the executor JVMs. JVM-side executor metrics (storage memory, GC time)
    are in the Spark UI "Executors" tab.

    Returns:
        dict with "rss_mb" (resident set size) and "vms_mb" (virtual memory size).
    """
    mem_info = psutil.Process(os.getpid()).memory_info()
    return {
        "rss_mb": mem_info.rss / _BYTES_PER_MB,
        "vms_mb": mem_info.vms / _BYTES_PER_MB,
    }


print(get_executor_memory_usage())
Identifying Memory-Intensive Operations
from pyspark.sql import functions as F

# Problem: Collecting large datasets to driver
# BAD: Can cause OOM on driver
# large_list = df.collect()  # DON'T DO THIS

# GOOD: Use take() or limit() for inspection
sample = df.take(10)
print(f"Schema: {df.schema}")

# Problem: Unnecessary caching
# If 'result' is consumed by only ONE action, don't cache it: it is computed
# once either way, and the cache just occupies storage memory.
# The aggregate is aliased so later steps can refer to it by name; without the
# alias Spark names the column "sum(amount)" and F.col("sum_amount") fails.
result = (
    df.filter(F.col("status") == "active")
    .groupBy("category")
    .agg(F.sum("amount").alias("sum_amount"))
)

# GOOD: Cache only when the result is reused by several actions
result.cache()
try:
    result.count()  # Materialize
    # Use in multiple operations
    result.filter(F.col("sum_amount") > 1000).show()
    result.groupBy().agg(F.avg("sum_amount")).show()
finally:
    # Unpersist when done, even if an action above failed, to free storage memory
    result.unpersist()
GC Tuning Strategies
Understanding GC Pressure
# NOTE: executor settings are read only when the SparkContext is created. If a
# session already exists, getOrCreate() returns it and these JVM options are
# not applied; call spark.stop() first to try them.
spark = (
    SparkSession.builder
    .appName("GCTuning")
    .config("spark.executor.memory", "16g")
    .config(
        "spark.executor.extraJavaOptions",
        "-XX:+UseG1GC "
        "-XX:G1HeapRegionSize=16m "
        "-XX:MaxGCPauseMillis=200 "
        "-XX:InitiatingHeapOccupancyPercent=35 "
        "-XX:ConcGCThreads=4 "
        # JDK 9+ unified GC logging. The JDK 8 flags (-verbose:gc,
        # -XX:+PrintGCDetails, -XX:+PrintGCTimeStamps, -Xloggc:<file>) are
        # deprecated or removed there; -XX:+PrintGCTimeStamps stops a JDK 9+
        # JVM from starting. Use those flags instead only on JDK 8.
        # %p adds the process id so executors on one host don't share a file.
        "-Xlog:gc*:file=/tmp/spark-gc-%p.log:time,uptime",
    )
    .getOrCreate()
)


# Monitor executor memory (GC time itself is not available from this call)
def monitor_gc(spark_context):
    """Return the driver's view of executor block-manager memory.

    NOTE: despite the name, SparkContext.getExecutorMemoryStatus() reports
    memory, not GC activity: it maps each block manager ("host:port") to a
    (max memory available for caching, remaining memory for caching) pair.
    Per-executor GC time is in the Spark UI "Executors" tab and in Spark's
    metrics system.

    This goes through the private ``_jsc`` py4j handle, so the result is a
    py4j proxy for a Scala Map and may change between Spark versions.
    """
    # In production, prefer Spark's metrics system over this internal API
    return spark_context._jsc.sc().getExecutorMemoryStatus()
Reducing Object Creation
# Problem: Frequent object creation in UDFs
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
import hashlib
# BAD: each row pays for a Python UDF round trip plus several intermediate
# objects (stripped str, upper-cased str, encoded bytes, digest, result)
def process_string_bad(s):
    """Return ``"<NORMALIZED>_<md5 hex>"`` for *s*, or None when *s* is None.

    Normalization is ``s.strip().upper()``; the MD5 digest is computed over the
    UTF-8 encoding of the normalized text.
    """
    if s is None:
        return None
    processed = s.strip().upper()
    hash_val = hashlib.md5(processed.encode()).hexdigest()
    return f"{processed}_{hash_val}"


# "GOOD" only in the sense that it does less per-row work: it drops the hash,
# so it returns a DIFFERENT value than process_string_bad. The bigger win is
# avoiding the Python UDF altogether with built-in column functions
# (F.trim / F.upper / F.md5 / F.concat), which run inside the JVM.
def process_string_good(s):
    """Return ``s.strip().upper()``, or None when *s* is None."""
    if s is None:
        return None
    return s.strip().upper()
udf_bad = F.udf(process_string_bad, StringType())
udf_good = F.udf(process_string_good, StringType())

df = spark.read.parquet("hdfs://data/text-data")
df.withColumn("processed", udf_good(F.col("text"))).show(5)

# BEST: built-in functions run inside the JVM, so rows are never serialized
# out to a Python worker. This mirrors process_string_bad, except that F.trim
# strips only spaces while str.strip() strips all whitespace. F.concat (unlike
# F.concat_ws) returns null when any input is null, matching the UDF's None.
normalized = F.upper(F.trim(F.col("text")))
df.withColumn(
    "processed",
    F.concat(normalized, F.lit("_"), F.md5(normalized)),
).show(5)
Off-Heap Memory Management
from pyspark import StorageLevel

# Off-heap memory lives outside the JVM heap, so data kept there does not add
# to GC pressure.
# NOTE: if a session already exists, getOrCreate() returns it and these
# settings do not take effect; call spark.stop() first.
spark = (
    SparkSession.builder
    .appName("OffHeapMemory")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "8g")
    # Both values below are Spark's defaults; set here only for visibility.
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .config("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
    .getOrCreate()
)

df = spark.read.parquet("hdfs://data/analytics")

# df.cache() uses the default on-heap level (MEMORY_AND_DISK, or
# MEMORY_AND_DISK_DESER in newer PySpark). Enabling off-heap memory alone does
# not move cached blocks off-heap; request the OFF_HEAP level explicitly.
df.persist(StorageLevel.OFF_HEAP)
df.count()  # Materialize the cache

# Monitor off-heap usage via Spark UI
# Check "Storage" tab for cached data size
Storage Level Optimization
from pyspark import StorageLevel

# Different storage levels trade memory for CPU.
# A DataFrame holds ONE storage level at a time: calling persist() again on
# already-cached data is ignored, so unpersist before trying another level.
# PySpark has no *_SER constants (MEMORY_ONLY_SER, MEMORY_AND_DISK_SER exist
# only in the Scala/Java API); PySpark always stores Python data serialized.
storage_levels = [
    # MEMORY_ONLY - Fastest access; partitions that don't fit are recomputed
    ("MEMORY_ONLY", StorageLevel.MEMORY_ONLY),
    # MEMORY_AND_DISK - Spills to disk if memory insufficient
    ("MEMORY_AND_DISK", StorageLevel.MEMORY_AND_DISK),
    # OFF_HEAP - Outside the JVM heap, no GC (needs spark.memory.offHeap.enabled)
    ("OFF_HEAP", StorageLevel.OFF_HEAP),
]

df.unpersist()  # start from a clean slate in case df is already cached
for level_name, level in storage_levels:
    df.persist(level)  # lazy: an action such as df.count() materializes it
    print(f"{level_name}: {df.storageLevel}")
    df.unpersist()

# Choose based on workload
# For analytical queries: MEMORY_AND_DISK (tolerates spills)
# For iterative ML: MEMORY_ONLY (fast repeated access)
# For large caches: OFF_HEAP (avoids GC)
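⚠️
Warning: Serialized storage levels such as MEMORY_ONLY_SER (Scala/Java API) can save substantial memory. Figures around 50% are often quoted, but the savings depend on the data, and they cost extra CPU for deserialization. PySpark does not expose the *_SER constants because it always stores Python data serialized. Profile before switching storage levels.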
Broadcast Variable Memory Considerations
# Broadcast variables are replicated to all executors
# Large broadcasts can cause memory pressure

# BAD: Broadcasting a large table. collect() first pulls the whole table into
# driver memory as Python objects, then a full copy is sent to EVERY executor.
# large_lookup = spark.table("huge_reference_table").collect()  # DON'T DO THIS
# broadcast_large = spark.sparkContext.broadcast(large_lookup)

# GOOD: Broadcast only the SMALL side of a join with the broadcast() hint.
# The data stays in the JVM (no Python copy on the driver), and only the small
# table is replicated to executors.
from pyspark.sql.functions import broadcast

small_df = spark.table("small_reference_table")
large_df = spark.table("huge_fact_table")
result = large_df.join(broadcast(small_df), "key")

# Cap AUTOMATIC broadcast joins at 10 MB (-1 disables them). Explicit
# broadcast() hints, like the one above, are not limited by this threshold.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")
Memory-Aware Partitioning
from pyspark.sql import functions as F

# Load the events data whose partition sizes we want to keep within executor memory
df = spark.read.format("parquet").load("hdfs://data/events")
# Calculate optimal partition count
# Rule of thumb: ~200MB per partition for large datasets
import math


def recommended_partition_count(size_gb, target_partition_mb=200):
    """Return how many partitions keep each one at or below *target_partition_mb*.

    Rounds UP so that the average partition does not exceed the target, and
    always returns at least 1, because repartition(0) is rejected by Spark.

    Args:
        size_gb: Estimated dataset size in GB. On-disk Parquet is compressed,
            so the in-memory size is typically larger; estimate accordingly.
        target_partition_mb: Desired size of one partition in MB.

    Raises:
        ValueError: if target_partition_mb is not positive.
    """
    if target_partition_mb <= 0:
        raise ValueError("target_partition_mb must be positive")
    return max(1, math.ceil(size_gb * 1024 / target_partition_mb))


estimated_size_gb = 50  # GB
optimal_partitions = recommended_partition_count(estimated_size_gb)
print(f"Recommended partitions: {optimal_partitions}")
# Repartition so each partition lands near the target size computed above
df_optimized = df.repartition(optimal_partitions)

# For joins, hash-partition BOTH sides on the join key with the same partition
# count; check result.explain() to confirm no extra shuffle is added before
# the join.
left = df.repartition(200, "user_id")
right = (
    spark.table("user_profiles")
    .repartition(200, "user_id")
)
result = left.join(right, on="user_id")
ℹ️
Key Takeaway: Memory tuning requires understanding your workload patterns. Cache data that iterative workloads reuse (and unpersist it when done), persist large caches with the OFF_HEAP storage level to keep them out of the GC's reach, and always monitor GC activity in production.
Follow-Up Questions
- How does Spark's memory allocation differ between YARN and Kubernetes deployments?
- Explain the eviction policy when execution needs more memory than is currently free in the unified memory region.
- What are the trade-offs between MEMORY_ONLY_SER and OFF_HEAP storage levels?
- How would you diagnose a memory leak in a long-running Spark Streaming application?
- Describe the impact of spark.sql.shuffle.partitions on executor memory usage.