πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Memory Management & Tuning

Apache SparkPerformance⭐ Premium

Advertisement

Memory Management & Tuning

Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb

Spark's Unified Memory Model

Spark 1.6+ introduced a Unified Memory Model that shares memory between execution and storage. Understanding this model is crucial for avoiding out-of-memory errors.

Memory Regions in an Executor

from pyspark.sql import SparkSession

# Configure memory regions explicitly
spark = SparkSession.builder \
    .appName("MemoryTuning") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .getOrCreate()

# Memory fraction = 0.6 means 60% of heap (9.6g) for execution+storage
# Storage fraction = 0.5 means 50% of unified (4.8g) reserved for storage
# Execution can borrow from storage, but not vice versa

ℹ️

Interview Insight: The formula for usable memory is: usableMemory = executorMemory * spark.memory.fraction. Execution memory gets priority over storage memory when competing for resources.

Diagnosing Memory Issues

Reading the Spark UI Memory Tab

# Create a memory-intensive operation to inspect
df = spark.read.parquet("hdfs://data/large-dataset")

# This will show memory usage in the Spark UI Storage tab
cached_df = df.cache()  # Uses STORAGE memory
cached_df.count()  # Materializes the cache

# Check how much memory is being used
import psutil
import os

def get_executor_memory_usage():
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return {
        "rss_mb": mem_info.rss / 1024 / 1024,
        "vms_mb": mem_info.vms / 1024 / 1024
    }

print(get_executor_memory_usage())

Identifying Memory-Intensive Operations

from pyspark.sql import functions as F

# Problem: Collecting large datasets to driver
# BAD: Can cause OOM on driver
# large_list = df.collect()  # DON'T DO THIS

# GOOD: Use take() or limit() for inspection
sample = df.take(10)
print(f"Schema: {df.schema}")

# Problem: Unnecessary caching
# BAD: Caching when not reusing
result = df.filter(F.col("status") == "active") \
    .groupBy("category") \
    .agg(F.sum("amount"))

# If 'result' is used only once, don't cache it

# GOOD: Cache only when reused multiple times
result.cache()
result.count()  # Materialize

# Use in multiple operations
result.filter(F.col("sum_amount") > 1000).show()
result.groupBy().agg(F.avg("sum_amount")).show()

# Unpersist when done
result.unpersist()

GC Tuning Strategies

Understanding GC Pressure

spark = SparkSession.builder \
    .appName("GCTuning") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.extraJavaOptions", 
            "-XX:+UseG1GC "
            "-XX:G1HeapRegionSize=16m "
            "-XX:MaxGCPauseMillis=200 "
            "-XX:InitiatingHeapOccupancyPercent=35 "
            "-XX:ConcGCThreads=4 "
            "-verbose:gc "
            "-XX:+PrintGCDetails "
            "-XX:+PrintGCTimeStamps "
            "-Xloggc:/tmp/spark-gc.log") \
    .getOrCreate()

# Monitor GC activity
def monitor_gc(spark_context):
    # Access JVM metrics through Spark's internal API
    # In production, use Spark's metrics system
    gc_stats = spark_context._jsc.sc().getExecutorMemoryStatus()
    return gc_stats

Reducing Object Creation

# Problem: Frequent object creation in UDFs
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
import hashlib

# BAD: Creates new strings and objects per row
def process_string_bad(s):
    if s is None:
        return None
    processed = s.strip().upper()
    hash_val = hashlib.md5(processed.encode()).hexdigest()
    return f"{processed}_{hash_val}"

# GOOD: Minimize object creation
def process_string_good(s):
    if s is None:
        return None
    # Reuse operations, minimize intermediate objects
    return s.strip().upper()

udf_bad = F.udf(process_string_bad, StringType())
udf_good = F.udf(process_string_good, StringType())

df = spark.read.parquet("hdfs://data/text-data")
df.withColumn("processed", udf_good(F.col("text"))).show(5)

Off-Heap Memory Management

# Off-heap memory avoids GC overhead
spark = SparkSession.builder \
    .appName("OffHeapMemory") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "8g") \
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
    .config("spark.sql.inMemoryColumnarStorage.batchSize", "10000") \
    .getOrCreate()

# Columnar storage benefits from off-heap
df = spark.read.parquet("hdfs://data/analytics")

# Cache in off-heap memory
df.cache()  # Uses off-heap when enabled
df.count()

# Monitor off-heap usage via Spark UI
# Check "Storage" tab for cached data size

Storage Level Optimization

from pyspark import StorageLevel

# Different storage levels trade memory for CPU
# MEMORY_ONLY - Fastest access, uses heap memory
df.persist(StorageLevel.MEMORY_ONLY)

# MEMORY_AND_DISK - Spills to disk if memory insufficient
df.persist(StorageLevel.MEMORY_AND_DISK)

# MEMORY_ONLY_SER - Serialized, saves space but slower
df.persist(StorageLevel.MEMORY_ONLY_SER)

# MEMORY_AND_DISK_SER - Serialized with disk spill
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

# OFF_HEAP - Uses off-heap memory, no GC
df.persist(StorageLevel.OFF_HEAP)

# Choose based on workload
# For analytical queries: MEMORY_AND_DISK (tolerates spills)
# For iterative ML: MEMORY_ONLY (fast repeated access)
# For large caches: OFF_HEAP (avoids GC)

⚠️

Warning: Serializing data with MEMORY_ONLY_SER saves ~50% memory but adds CPU overhead for deserialization. Profile before switching storage levels.

Broadcast Variable Memory Considerations

# Broadcast variables are replicated to all executors
# Large broadcasts can cause memory pressure

# BAD: Broadcasting a large table
large_lookup = spark.table("huge_reference_table").collect()
broadcast_large = sc.broadcast(large_lookup)  # Sent to ALL executors

# GOOD: Use broadcast join hint instead
from pyspark.sql.functions import broadcast
small_df = spark.table("small_reference_table")
large_df = spark.table("huge_fact_table")

# Spark handles the broadcast efficiently
result = large_df.join(broadcast(small_df), "key")

# Limit broadcast size
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")  # 10MB limit

Memory-Aware Partitioning

from pyspark.sql import functions as F

# Ensure partition sizes fit in executor memory
df = spark.read.parquet("hdfs://data/events")

# Calculate optimal partition count
# Rule of thumb: 200MB per partition for large datasets
estimated_size_gb = 50  # GB
optimal_partitions = int(estimated_size_gb * 1024 / 200)
print(f"Recommended partitions: {optimal_partitions}")

# Repartition to optimal count
df_optimized = df.repartition(optimal_partitions)

# For joins, repartition both sides by join key
left = df.repartition(200, "user_id")
right = spark.table("user_profiles").repartition(200, "user_id")
result = left.join(right, "user_id")

ℹ️

Key Takeaway: Memory tuning requires understanding your workload patterns. Cache aggressively for iterative workloads, use off-heap for large caches, and always monitor GC activity in production.

Follow-Up Questions

  • How does Spark's memory allocation differ between YARN and Kubernetes deployments?
  • Explain the eviction policy when execution memory exceeds the unified memory region.
  • What are the trade-offs between MEMORY_ONLY_SER and OFF_HEAP storage levels?
  • How would you diagnose a memory leak in a long-running Spark Streaming application?
  • Describe the impact of spark.sql.shuffle.partitions on executor memory usage.

Advertisement