πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Spark Memory: Execution, Storage, Off-Heap, Unified Memory

Apache SparkMemory Management⭐ Premium

Advertisement

Spark Memory: Execution, Storage, Off-Heap, Unified Memory

Difficulty: Expert | Companies: Netflix, LinkedIn, Twitter, Apple, Amazon

ℹ️Interview Context

Memory management is critical for Spark performance tuning. Interviewers expect understanding of the unified memory model, spill mechanics, and how to diagnose memory-related issues from Spark UI.

Question

Explain Spark's unified memory model in detail. How do execution memory and storage memory interact? What happens when execution needs more memory than allocated? Describe the complete memory layout of a Spark executor with both on-heap and off-heap regions.


Detailed Answer

1. Spark Executor Memory Layout

Executor JVM Heap (e.g., 8 GB)Reserved Memory (300 MB) β€” fixed, not configurableUsed by Spark internals (metadata, accumulator state)User Memory (40% of usable) β€” for user data structuresUDF objects Β· Accumulators Β· User-defined data structuresSize = (heap βˆ’ 300MB) Γ— (1 βˆ’ spark.memory.fraction)Unified Memory Region (60% of usable) β€” managed by SparkExecution Memoryβ€’ Shuffle buffersβ€’ Join hash tablesβ€’ Sort buffersβ€’ Aggregation buffersStorage Memoryβ€’ Cached RDD/DataFrame partitionsβ€’ Broadcast variablesβ€’ Unroll memoryβ€’ Uncompressed cache storage

2. Unified Memory Model β€” Mathematical Analysis

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("MemoryManagement") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

# Memory calculation example:
executor_heap = 8 * 1024  # 8 GB in MB
reserved_memory = 300  # MB
usable_memory = executor_heap - reserved_memory  # 7,744 MB
unified_memory = usable_memory * 0.6  # 4,646 MB
user_memory = usable_memory * 0.4  # 3,098 MB
storage_memory = unified_memory * 0.5  # 2,323 MB
execution_memory = unified_memory * 0.5  # 2,323 MB

print(f"Executor Heap: {executor_heap} MB")
print(f"Reserved: {reserved_memory} MB")
print(f"User Memory: {user_memory} MB")
print(f"Unified Memory: {unified_memory} MB")
print(f"  Storage: {storage_memory} MB")
print(f"  Execution: {execution_memory} MB")

3. Memory Borrowing Rules

The key innovation of unified memory is bidirectional borrowing:

# Rule 1: Execution can borrow from Storage
# If execution needs more memory than its partition,
# it can evict cached data to make room.

# Rule 2: Storage CANNOT borrow from Execution
# Once execution memory is allocated for an operation,
# storage cannot evict it.

# Rule 3: Execution blocks are harder to evict than storage blocks
# Execution: must complete operation before releasing memory
# Storage: can be dropped (LRU eviction) without recomputation

# The eviction policy:
# Storage can use up to: min(unified_memory, storage_memory + execution_memory_free)
# If execution needs more: evict storage blocks (LRU order)
# If storage needs more: cannot evict execution blocks
# Mathematical model for memory allocation:
# 
# Let:
# U = unified memory total (e.g., 4,646 MB)
# S = storage memory threshold (U Γ— storageFraction)
# E = execution memory threshold (U Γ— (1 - storageFraction))
# s = current storage usage
# e = current execution usage
#
# Available for storage: min(S, U - e)
# Available for execution: min(E, U - s)
#
# If execution needs X bytes:
#   If X ≀ (E - e): allocate from execution pool
#   If X > (E - e): evict storage to make room
#     Evict: min(s, X - (E - e)) bytes from storage
#     New storage: max(0, s - (X - (E - e)))

4. Memory Allocation Strategies

# Strategy 1: Dynamic Allocation (default)
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "100")
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")

# Dynamic allocation adjusts executor count based on workload
# Memory per executor scales with executor count

# Strategy 2: Static Allocation
spark.conf.set("spark.dynamicAllocation.enabled", "false")
spark.conf.set("spark.executor.instances", "50")
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.cores", "4")

# Strategy 3: Off-Heap for Large Datasets
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "16g")

# Off-heap advantages:
# 1. No GC pressure β€” memory managed by Spark, not JVM
# 2. Predictable performance β€” no GC pauses
# 3. Larger memory capacity β€” not limited by JVM heap

5. GC Tuning for Spark Executors

# Garbage Collection is the #1 cause of performance issues
# in Spark applications with large heaps

# G1GC Configuration (recommended for heaps > 4GB):
spark.conf.set("spark.executor.extraJavaOptions", 
    "-XX:+UseG1GC "
    "-XX:G1HeapRegionSize=16m "
    "-XX:InitiatingHeapOccupancyPercent=35 "
    "-XX:ConcGCThreads=4 "
    "-XX:+ParallelRefProcEnabled "
    "-XX:MaxGCPauseMillis=200 "
    "-XX:+UnlockDiagnosticVMOptions "
    "-XX:+G1SummarizeConcMark "
    "-XX:G1MixedGCCountTarget=8 "
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

<div className="my-6 flex justify-center">
<svg viewBox="0 0 800 100" width="100%" style={{ maxWidth: 700 }} xmlns="http://www.w3.org/2000/svg">
  <defs>
    <linearGradient id="mm-g1-eden" x1="0" y1="0" x2="0" y2="1">
      <stop offset="0%" stopColor="#22c55e"/>
      <stop offset="100%" stopColor="#16a34a"/>
    </linearGradient>
    <linearGradient id="mm-g1-surr" x1="0" y1="0" x2="0" y2="1">
      <stop offset="0%" stopColor="#3b82f6"/>
      <stop offset="100%" stopColor="#2563eb"/>
    </linearGradient>
    <linearGradient id="mm-g1-old" x1="0" y1="0" x2="0" y2="1">
      <stop offset="0%" stopColor="#8b5cf6"/>
      <stop offset="100%" stopColor="#7c3aed"/>
    </linearGradient>
    <filter id="mm-g1-shadow">
      <feDropShadow dx="0" dy="2" stdDeviation="3" floodOpacity="0.15"/>
    </filter>
  </defs>
  <rect x="10" y="10" width="780" height="80" rx="14" fill="#f8fafc" filter="url(#mm-g1-shadow)" stroke="#cbd5e1" strokeWidth="1"/>
  <text x="400" y="32" textAnchor="middle" fill="#334155" fontFamily="Inter,system-ui,sans-serif" fontSize="12" fontWeight="700">G1 Heap</text>
  <rect x="30" y="42" width="180" height="40" rx="10" fill="url(#mm-g1-eden)" filter="url(#mm-g1-shadow)"/>
  <text x="120" y="67" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="11" fontWeight="600">Region 0 (Eden)</text>
  <rect x="220" y="42" width="180" height="40" rx="10" fill="url(#mm-g1-surr)" filter="url(#mm-g1-shadow)"/>
  <text x="310" y="67" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="11" fontWeight="600">Region 1 (Survivor)</text>
  <rect x="410" y="42" width="180" height="40" rx="10" fill="url(#mm-g1-old)" filter="url(#mm-g1-shadow)"/>
  <text x="500" y="67" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="11" fontWeight="600">Region 2 (Old)</text>
  <text x="650" y="67" textAnchor="middle" fill="#6b7280" fontFamily="Inter,system-ui,sans-serif" fontSize="14" fontWeight="700">...</text>
</svg>
</div>
# Each region: 1MB - 32MB (G1HeapRegionSize)

# GC metrics to monitor:
# - Full GC count: should be 0 (if > 0, increase heap)
# - Young GC count: < 10 per task is good
# - GC pause time: < 200ms is acceptable
# - Total GC time: < 5% of task time is healthy

# Diagnostic commands:
# spark.executor.extraJavaOptions += "-XX:+PrintAdaptiveSizePolicy"
# Shows memory allocation decisions

6. Memory Spill Analysis

# Memory spill occurs when execution memory exceeds allocation
# Data is sorted and written to disk

# Spill metrics (available in Spark UI β†’ Stages tab):
# - Shuffle spill (memory): bytes evicted from memory to disk
# - Shuffle spill (disk): bytes actually written to disk
# - Memory vs disk ratio indicates memory pressure

# To minimize spills:
# 1. Increase executor memory
# 2. Reduce number of concurrent tasks per executor
# 3. Optimize data structures (use primitive types)
# 4. Use Kryo serialization for cached data

# Spill detection in code:
import psutil
import os

def get_executor_memory_usage():
    """Monitor memory usage during task execution."""
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return {
        'rss_mb': mem_info.rss / 1024 / 1024,
        'vms_mb': mem_info.vms / 1024 / 1024
    }

# In executor:
# memory_before = get_executor_memory_usage()
# ... perform operation ...
# memory_after = get_executor_memory_usage()
# memory_delta = memory_after['rss_mb'] - memory_before['rss_mb']

7. Off-Heap Memory Management

# Off-heap memory bypasses JVM heap entirely
# Managed by Spark's MemoryManager, not garbage collector

# Off-heap allocation:
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "8g")

<div className="my-6 flex justify-center">
<svg viewBox="0 0 800 100" width="100%" style={{ maxWidth: 650 }} xmlns="http://www.w3.org/2000/svg">
  <defs>
    <linearGradient id="mm-off-exec" x1="0" y1="0" x2="0" y2="1">
      <stop offset="0%" stopColor="#3b82f6"/>
      <stop offset="100%" stopColor="#2563eb"/>
    </linearGradient>
    <linearGradient id="mm-off-store" x1="0" y1="0" x2="0" y2="1">
      <stop offset="0%" stopColor="#10b981"/>
      <stop offset="100%" stopColor="#059669"/>
    </linearGradient>
    <filter id="mm-off-shadow">
      <feDropShadow dx="0" dy="2" stdDeviation="3" floodOpacity="0.15"/>
    </filter>
  </defs>
  <rect x="10" y="10" width="780" height="80" rx="14" fill="#f8fafc" filter="url(#mm-off-shadow)" stroke="#cbd5e1" strokeWidth="1"/>
  <text x="400" y="32" textAnchor="middle" fill="#334155" fontFamily="Inter,system-ui,sans-serif" fontSize="12" fontWeight="700">Off-Heap Region (8 GB)</text>
  <rect x="30" y="45" width="370" height="38" rx="10" fill="url(#mm-off-exec)" filter="url(#mm-off-shadow)"/>
  <text x="215" y="69" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="11" fontWeight="600">Execution (4.8 GB)</text>
  <rect x="410" y="45" width="370" height="38" rx="10" fill="url(#mm-off-store)" filter="url(#mm-off-shadow)"/>
  <text x="595" y="69" textAnchor="middle" fill="#fff" fontFamily="Inter,system-ui,sans-serif" fontSize="11" fontWeight="600">Storage (3.2 GB)</text>
</svg>
</div>

# Advantages over on-heap:
# 1. No GC overhead β€” memory freed manually by Spark
# 2. No object headers β€” UnsafeRow is compact
# 3. Predictable latency β€” no GC pauses
# 4. Larger capacity β€” not limited by JVM heap size

# Disadvantages:
# 1. Memory leaks are harder to detect
# 2. No JVM safety β€” buffer overflow possible
# 3. Debugging more complex β€” standard JVM tools don't work
# 4. Requires careful configuration

# Verify off-heap usage:
spark.catalog.clearCache()
df.cache()  # Caches in storage memory (on-heap or off-heap)
df.count()  # Materializes cache

# Check via Spark UI β†’ Executors tab β†’ Storage Memory
# Or programmatically:
storage_info = spark._jsc.sc().getExecutorStorageStatus()
for status in storage_info:
    print(f"Executor {status.executorId()}: "
          f"Used={status.memUsed() / 1024 / 1024:.1f}MB, "
          f"Max={status.maxMem() / 1024 / 1024:.1f}MB")

8. Memory Diagnostics

# Enable memory debugging:
spark.conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
spark.conf.set("spark.memory.debugFill", "true")

# Memory leak detection:
# Spark tracks all memory allocations via MemoryManager
# If allocations exceed deallocations β†’ memory leak
# Exception thrown: "Memory leak detected in task executor"

# Common memory leak causes:
# 1. Accumulators not being reset between stages
# 2. Broadcast variables not being unpersisted
# 3. Custom AccumulatorV2 implementations
# 4. ThreadLocal variables in executors

# Memory profiling:
spark.conf.set("spark.executor.memoryOverhead", "1g")
# memoryOverhead is for off-heap JVM overhead (stack, code cache, etc.)

# Total executor memory requirement:
# Total = spark.executor.memory + spark.executor.memoryOverhead
# For 8GB heap + 1GB overhead = 9GB container
# On YARN: must allocate 9GB container
# On K8s: pod memory limit should be 9GB

⚠️Common Pitfall

A frequent mistake is setting spark.executor.memory without accounting for spark.memory.offHeap.size. If both are enabled, the total container memory must accommodate both. For 8GB on-heap + 8GB off-heap, you need a 16GB+ container.

πŸ’‘Interview Tip

When discussing memory management, mention that Spark 3.x introduced improved memory accounting for complex types (arrays, maps, structs) which previously caused underestimation of memory usage.


Summary

Memory RegionPurposeConfigurable?Borrowable?
ReservedSpark internalsNo (300MB fixed)No
UserUser data structuresYes (via memory.fraction)No
ExecutionSort/join/agg buffersYes (via memory.fraction)Yes (from storage)
StorageCached dataYes (via storageFraction)No (cannot borrow)
Off-HeapTungsten operationsYes (via offHeap.size)Bidirectional

The unified memory model provides flexible memory management while preventing OOM errors through intelligent eviction and borrowing policies.

Advertisement