18. Garbage Collection Tuning in PySpark
Definition: Garbage Collection (GC)
Garbage collection is the JVM's automatic memory management process that reclaims memory occupied by objects that are no longer referenced. Long GC pauses delay tasks, and a heap that stays nearly full despite repeated collections can end in out-of-memory (OOM) errors.
Definition: G1 Garbage Collector
The G1 (Garbage-First) collector divides the heap into regions and prioritizes collecting the regions with the most garbage. It aims for predictable pause times through the -XX:MaxGCPauseMillis target, which is a soft goal rather than a hard limit, and is recommended for Spark 2.3+ workloads.
Optimal Young Generation Size
The sizing formula relates the following quantities:
- Young generation size (NewGen)
- Number of short-lived objects per task
- Average object size in bytes
- Survival ratio (fraction surviving to old gen)
Use G1GC for Spark 2.3+: -XX:+UseG1GC -XX:MaxGCPauseMillis=200. Parallel GC, the default collector on JDK 8, can cause long pauses (>1s), which can trigger heartbeat timeouts and speculative re-execution of tasks.
Monitor GC via Spark UI → Executors → GC Time. If GC time exceeds 10% of task time, increase executor memory or reduce the number of cached objects. Use spark.executor.extraJavaOptions to tune GC parameters.
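The 10% rule of thumb above is simple arithmetic on the two figures shown in the Executors page (task time and GC time). The following is a minimal sketch; the function names `gc_overhead` and `needs_tuning` and the sample readings are hypothetical, not part of any Spark API.

```python
def gc_overhead(gc_time_ms: float, task_time_ms: float) -> float:
    """Return GC time as a fraction of task time (0.0 when no task time is recorded)."""
    if task_time_ms <= 0:
        return 0.0
    return gc_time_ms / task_time_ms


def needs_tuning(gc_time_ms: float, task_time_ms: float, threshold: float = 0.10) -> bool:
    """True when GC overhead exceeds the threshold (10% by default, as in the text)."""
    return gc_overhead(gc_time_ms, task_time_ms) > threshold


if __name__ == "__main__":
    # Hypothetical readings: 45 s of GC over 300 s of task time is 15% overhead.
    overhead = gc_overhead(45_000, 300_000)
    print(f"GC overhead: {overhead:.0%}, needs tuning: {needs_tuning(45_000, 300_000)}")
```

A result above the threshold points to the remedies named above: more executor memory or fewer cached objects.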
Theorem: GC Pause Impact
A GC pause of duration $P$ stalls every task running on the executor for the length of that pause. If $P$ exceeds the executor heartbeat timeout (spark.network.timeout, 120s by default), the driver marks the executor as lost and reschedules its tasks, causing redundant computation. The total wasted work is $P \times N_{\text{tasks\_affected}}$.
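To make the estimate concrete, the sketch below multiplies a pause duration by the number of tasks running on the executor. The helper names are hypothetical; the number of concurrent tasks is taken as executor cores divided by spark.task.cpus, and the sample values are made up.

```python
def concurrent_tasks(executor_cores: int, task_cpus: int = 1) -> int:
    """Tasks that can run at once on one executor (executor cores // spark.task.cpus)."""
    if executor_cores <= 0 or task_cpus <= 0:
        raise ValueError("cores and task_cpus must be positive")
    return executor_cores // task_cpus


def wasted_work_seconds(pause_seconds: float, tasks_affected: int) -> float:
    """Total stalled task time: P x N_tasks_affected."""
    if pause_seconds < 0 or tasks_affected < 0:
        raise ValueError("pause and task count must be non-negative")
    return pause_seconds * tasks_affected


if __name__ == "__main__":
    # Hypothetical executor: 4 cores, 1 CPU per task, stalled by a 2.5 s pause.
    n = concurrent_tasks(executor_cores=4)
    print(wasted_work_seconds(2.5, n))  # 10.0
```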
- G1GC recommended for Spark 2.3+; target GC overhead < 10%
- Young gen: short-lived objects; Old gen: long-lived (cached) objects
- Monitor GC time in Spark UI; tune via spark.executor.extraJavaOptions
- Off-heap storage (Tungsten) reduces GC pressure by moving data outside JVM heap
Memory Architecture Diagram
PYSPARK MEMORY ARCHITECTURE

EXECUTOR MEMORY LAYOUT
- Total Executor Memory (spark.executor.memory)
  - Reserved Memory (300MB)
    - System overhead
    - Internal structures
  - User Memory (40% of remaining)
    - User data structures
    - UDFs, accumulators
  - Unified Memory (60% of remaining)
    - Storage Memory
      - Cached data
      - Broadcast variables
    - Execution Memory
      - Shuffles
      - Joins
      - Sorts

MEMORY OVERHEAD
- Off-Heap Memory (spark.memory.offHeap.enabled)
  - Native memory allocation
  - Reduces GC pressure
  - Better for large datasets
- Memory Overhead (spark.executor.memoryOverhead)
  - Off-heap, native memory
  - Default: max(384MB, 0.10 * executorMemory)
  - For Py4J, Netty, and other native code
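The proportions in the diagram can be turned into numbers for a given executor size. The sketch below is a rough calculator, assuming the defaults spark.memory.fraction=0.6 and spark.memory.storageFraction=0.5; the function name `executor_memory_layout` is hypothetical, and the storage/execution split is only a starting boundary because the two regions can borrow from each other at runtime.

```python
RESERVED_MB = 300  # fixed reserved memory shown in the diagram


def executor_memory_layout(executor_memory_mb: float,
                           memory_fraction: float = 0.6,
                           storage_fraction: float = 0.5) -> dict:
    """Approximate sizes (in MB) of the regions carved out of spark.executor.memory."""
    usable = executor_memory_mb - RESERVED_MB
    if usable <= 0:
        raise ValueError("executor memory must exceed the 300MB reserved region")
    unified = usable * memory_fraction
    storage = unified * storage_fraction
    return {
        "reserved_mb": RESERVED_MB,
        "user_mb": usable - unified,
        "unified_mb": unified,
        "storage_mb": storage,
        "execution_mb": unified - storage,
        # Allocated outside the heap, in addition to spark.executor.memory.
        "overhead_mb": max(384, 0.10 * executor_memory_mb),
    }


if __name__ == "__main__":
    for name, size in executor_memory_layout(16 * 1024).items():
        print(f"{name:>13}: {size:,.1f}")
```

For a 16g executor this gives roughly 9.4 GB of unified memory and 1.6 GB of overhead, which is a useful sanity check before changing the fractions.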
Garbage Collection Process
GARBAGE COLLECTION PROCESS

GENERATIONAL GC MODEL
- Young Generation (Eden + Survivor Spaces)
  - Eden Space: new objects
  - Survivor Spaces: surviving objects
  - Minor GC: fast, frequent (ms)
    - Scans only the young generation
    - Copies live objects to a survivor space and promotes those that reach the tenuring threshold to the old generation
- Old Generation (Tenured Space)
  - Long-lived objects
  - Major GC: slow, infrequent (seconds)
    - Scans the entire heap
    - Can cause stop-the-world pauses

GC PAUSE TIMELINE
- Minor GC: short, frequent pauses
- Major GC: long, infrequent pauses
- Target: minimize major GC pauses
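The generational model can be illustrated with a toy simulation. This is a deliberately simplified sketch, not how HotSpot is implemented: the `ToyHeap` class, its fields, and the object names are all hypothetical, and the only behavior modeled is that unreachable young objects are reclaimed while survivors age and are eventually promoted.

```python
from dataclasses import dataclass, field


@dataclass
class ToyHeap:
    """Toy generational heap: young objects age on each minor GC and are promoted."""
    tenuring_threshold: int = 3
    young: list = field(default_factory=list)  # entries are [name, age]
    old: list = field(default_factory=list)    # names of promoted objects

    def allocate(self, name: str) -> None:
        """New objects always start in the young generation with age 0."""
        self.young.append([name, 0])

    def minor_gc(self, live: set) -> None:
        """Collect the young generation; `live` holds the names still referenced."""
        survivors = []
        for name, age in self.young:
            if name not in live:
                continue  # unreachable object: its memory is reclaimed
            if age + 1 >= self.tenuring_threshold:
                self.old.append(name)  # survived long enough: promote
            else:
                survivors.append([name, age + 1])
        self.young = survivors


if __name__ == "__main__":
    heap = ToyHeap(tenuring_threshold=2)
    heap.allocate("cache")
    heap.allocate("tmp1")
    heap.minor_gc(live={"cache"})   # tmp1 reclaimed, cache survives with age 1
    heap.allocate("tmp2")
    heap.minor_gc(live={"cache"})   # cache reaches the threshold and is promoted
    print(heap.young, heap.old)     # [] ['cache']
```

Short-lived temporaries never leave the young generation, which is why workloads dominated by them mostly pay for cheap minor collections.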
OOM Prevention Strategy
OOM PREVENTION STRATEGY

MEMORY PRESSURE INDICATORS
Warning Signs:
- Frequent Full GC (> 5% of time)
- GC pause times increasing
- Heap usage consistently > 80%
- OOM errors in logs
- Task failures due to memory

PREVENTION TECHNIQUES
1. Memory Configuration
   - Increase executor memory
   - Enable off-heap memory
   - Tune memory fractions
2. Data Skew Handling
   - Detect and handle skewed partitions
   - Use salting techniques
   - Repartition data
3. GC Algorithm Selection
   - Use G1GC for large heaps
   - Configure GC threads
   - Tune GC intervals
4. Code Optimization
   - Reduce object creation
   - Use primitive types
   - Implement proper caching

MONITORING AND ALERTING
Key Metrics:
- Heap usage (used vs max)
- GC count and time
- GC overhead percentage
- Memory allocation rate
- Object promotion rate

Alert Thresholds:
- Heap usage > 80% for 5 minutes
- GC overhead > 10%
- Full GC count > 10 per hour
- GC pause time > 1 second
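The "heap usage > 80% for 5 minutes" threshold is a sustained condition, so a single high sample should not fire an alert. The sketch below shows one way to check it over a series of samples; the function name `sustained_above` and the sample format (timestamp in seconds, usage ratio) are assumptions made for illustration.

```python
def sustained_above(samples, limit=0.80, window_seconds=300):
    """Return True if usage stayed above `limit` for a continuous stretch
    of at least `window_seconds`.

    `samples` is a time-ordered list of (timestamp_seconds, usage_ratio) pairs.
    """
    start = None  # timestamp at which the current high stretch began
    for ts, ratio in samples:
        if ratio > limit:
            if start is None:
                start = ts
            if ts - start >= window_seconds:
                return True
        else:
            start = None  # any dip below the limit resets the stretch
    return False


if __name__ == "__main__":
    steady = [(t, 0.85) for t in range(0, 301, 60)]
    dipped = [(0, 0.85), (60, 0.9), (120, 0.88), (180, 0.5), (240, 0.9), (300, 0.9)]
    print(sustained_above(steady), sustained_above(dipped))  # True False
```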
Detailed Explanation
Garbage collection (GC) tuning is a critical aspect of PySpark performance optimization, directly impacting application responsiveness, throughput, and stability. Understanding the JVM memory model, GC algorithms, and their configuration options is essential for preventing out-of-memory (OOM) errors and minimizing GC-related performance degradation.
The JVM uses a generational garbage collection model, dividing the heap into young and old generations. The young generation contains newly created objects and is collected frequently by minor GC cycles. The old generation stores long-lived objects and is collected by major GC cycles, which are more expensive but less frequent. PySpark applications typically create many temporary objects during data processing, making GC behavior particularly important.
Memory configuration in PySpark involves several parameters: executor memory, memory overhead, memory fractions for execution and storage, and off-heap memory settings. Executor memory (spark.executor.memory) sets the JVM heap size, while memory overhead accounts for memory outside the heap, such as the Python worker processes, Py4J, and Netty buffers. Memory fractions control how the unified memory region is divided between execution and storage, impacting both performance and stability.
GC algorithm selection significantly affects performance. Parallel GC (the default on JDK 8; JDK 9 and later default to G1) is optimized for throughput but can cause long pause times. G1GC (the Garbage-First Garbage Collector) is designed for large heaps and provides more predictable pause times, making it suitable for most PySpark workloads. ZGC and Shenandoah offer very low pause times but may have higher overhead, and their availability depends on the JDK version and build.
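Switching collectors comes down to passing a different JVM flag through spark.executor.extraJavaOptions. The sketch below builds that option string; the `GC_FLAGS` mapping and `build_java_options` helper are hypothetical conveniences, and whether ZGC or Shenandoah is actually available depends on the JDK in use.

```python
# Standard HotSpot selection flags for each collector discussed above.
GC_FLAGS = {
    "parallel": "-XX:+UseParallelGC",
    "g1": "-XX:+UseG1GC",
    "zgc": "-XX:+UseZGC",
    "shenandoah": "-XX:+UseShenandoahGC",
}


def build_java_options(collector: str, max_pause_ms=None) -> str:
    """Build a value for spark.executor.extraJavaOptions for the chosen collector."""
    if collector not in GC_FLAGS:
        raise ValueError(f"unknown collector: {collector!r}")
    options = [GC_FLAGS[collector]]
    # The pause-time target is only added for G1 in this sketch.
    if collector == "g1" and max_pause_ms is not None:
        options.append(f"-XX:MaxGCPauseMillis={max_pause_ms}")
    return " ".join(options)


if __name__ == "__main__":
    print(build_java_options("g1", max_pause_ms=200))
    # -XX:+UseG1GC -XX:MaxGCPauseMillis=200
```

The resulting string can be passed as the value of spark.executor.extraJavaOptions when building the session or on the spark-submit command line.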
OOM prevention requires a multi-faceted approach. Memory configuration should be tuned based on workload characteristics, with sufficient headroom for peak usage. Data skew handling techniques, such as salting and repartitioning, prevent memory imbalances across partitions. Code optimization reduces unnecessary object creation and memory consumption.
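Salting can be illustrated without a cluster. The pure-Python sketch below spreads one hot key over several salted keys, aggregates per salted key, then strips the salt and aggregates again. The helper names are hypothetical and the salt is assigned round-robin to keep the example deterministic; in a Spark job the salt is more commonly a random column and the two aggregation steps are two groupBy stages.

```python
from collections import Counter


def salt_keys(keys, num_salts):
    """Append a round-robin salt so one hot key becomes `num_salts` distinct keys."""
    return [f"{key}_{i % num_salts}" for i, key in enumerate(keys)]


def unsalt(salted_key):
    """Strip the salt suffix added by salt_keys."""
    return salted_key.rsplit("_", 1)[0]


def two_stage_count(keys, num_salts):
    """Count per salted key first, then merge the partial counts per original key."""
    partial = Counter(salt_keys(keys, num_salts))
    final = Counter()
    for salted_key, count in partial.items():
        final[unsalt(salted_key)] += count
    return partial, final


if __name__ == "__main__":
    keys = ["hot"] * 1000 + ["cold"] * 8
    partial, final = two_stage_count(keys, num_salts=8)
    # Largest group before salting vs. after salting.
    print(max(Counter(keys).values()), max(partial.values()))  # 1000 125
    print(dict(final))  # {'hot': 1000, 'cold': 8}
```

The largest group any single task has to hold shrinks by the number of salts, while the final result is unchanged.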
Monitoring GC behavior is essential for proactive tuning. Key metrics include heap usage, GC count and time, GC overhead percentage, and object promotion rates. These metrics should be tracked over time to identify trends and potential issues. Alerting thresholds should be set based on application requirements and historical behavior.
Advanced techniques include off-heap memory allocation, which reduces GC pressure by moving data outside the JVM heap, and memory-mapped files, which leverage the operating system's virtual memory management. These techniques can significantly improve performance for memory-intensive workloads.
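Off-heap allocation is switched on with two settings: spark.memory.offHeap.enabled and spark.memory.offHeap.size, and the size must be positive when the feature is enabled. The sketch below builds those settings and renders them as spark-submit arguments; the helper names and the 4 GB figure are hypothetical.

```python
def offheap_conf(size_gb: int) -> dict:
    """Spark settings that enable off-heap memory with the given size."""
    if size_gb <= 0:
        raise ValueError("off-heap size must be positive when off-heap is enabled")
    return {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": f"{size_gb}g",
    }


def to_submit_args(conf: dict) -> list:
    """Render a settings dict as --conf arguments for spark-submit."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(offheap_conf(4))))
```

The same key/value pairs can be passed to `.config(...)` calls on the session builder instead of the command line.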
Best practices for GC tuning include: starting with default configurations and tuning based on observed behavior, using appropriate GC algorithms for the workload, monitoring GC metrics continuously, implementing proper memory management in code, and planning for failure scenarios with sufficient memory reserves.
Key Concepts Table
| Concept | Description | Configuration |
|---|---|---|
| Young Generation | Area for new objects | -XX:NewSize, -XX:MaxNewSize |
| Old Generation | Area for long-lived objects promoted from the young generation | -XX:MaxTenuringThreshold (promotion age) |
| GC Algorithm | Strategy for garbage collection | -XX:+UseG1GC, -XX:+UseParallelGC |
| GC Threads | Number of GC worker threads | -XX:ParallelGCThreads |
| Heap Size | Total memory allocation | -Xmx, -Xms |
| Off-Heap Memory | Memory outside JVM heap | spark.memory.offHeap.enabled |
Code Examples
Basic GC Configuration
import psutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Configure Spark with GC settings.
# If the driver JVM is already running (for example, under spark-submit in client mode),
# the driver settings below have no effect; pass them on the command line instead.
spark = SparkSession.builder \
    .appName("GCTuning") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.memoryOverhead", "2g") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35") \
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:MaxGCPauseMillis=200") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Monitor JVM memory
def monitor_gc():
    """Report resident memory of the first local Java process.

    This is a coarse proxy for memory pressure, not a GC metric.
    """
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            # The process name can be None, so guard before lowercasing
            if 'java' in (proc.info['name'] or '').lower():
                rss_gb = proc.memory_info().rss / 1024 ** 3
                print(f"JVM Process: {proc.info['pid']}")
                print(f"Memory Usage: {rss_gb:.2f} GB")
                break
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

# Run test workload
def test_workload():
    """Test workload to generate GC activity"""
    df = spark.range(10000000).repartition(100)
    # Perform operations that generate garbage
    result = df.withColumn("squared", col("id") * col("id")) \
        .filter(col("id") % 2 == 0) \
        .groupBy((col("id") % 10).alias("bucket")).count() \
        .collect()
    print(f"Test workload completed: {len(result)} groups")

# Execute test
monitor_gc()
test_workload()
spark.stop()
Advanced GC Tuning
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Advanced GC configuration for large heaps.
# G1NewSizePercent and G1MaxNewSizePercent are experimental HotSpot options,
# so they must be unlocked or the executor JVM will refuse to start.
spark = SparkSession.builder \
    .appName("AdvancedGCTuning") \
    .config("spark.executor.memory", "32g") \
    .config("spark.executor.memoryOverhead", "8g") \
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC "
            "-XX:+UnlockExperimentalVMOptions "
            "-XX:G1HeapRegionSize=32m "
            "-XX:MaxGCPauseMillis=300 "
            "-XX:InitiatingHeapOccupancyPercent=30 "
            "-XX:G1ReservePercent=15 "
            "-XX:G1NewSizePercent=30 "
            "-XX:G1MaxNewSizePercent=60 "
            "-XX:+ParallelRefProcEnabled "
            "-XX:ParallelGCThreads=8 "
            "-XX:ConcGCThreads=4") \
    .config("spark.driver.extraJavaOptions",
            "-XX:+UseG1GC "
            "-XX:G1HeapRegionSize=16m "
            "-XX:MaxGCPauseMillis=200") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()

# checkpoint() requires a checkpoint directory; this path is a placeholder
spark.sparkContext.setCheckpointDir("/path/to/checkpoints")

# Memory-efficient data processing
def process_large_dataset():
    """Process large dataset with memory optimization"""
    # Read data
    df = spark.read.parquet("/path/to/large/dataset")
    # Optimize partitions to reduce memory pressure
    df_optimized = df.repartition(200, "partition_key")
    # Checkpoint to truncate lineage; checkpoint() returns a new DataFrame
    df_optimized = df_optimized.checkpoint()
    # Perform operations (F.sum avoids shadowing Python's built-in sum)
    result = df_optimized \
        .withColumn("processed", F.col("value") * 2) \
        .filter(F.col("processed") > 100) \
        .groupBy("category") \
        .agg(
            F.sum("processed").alias("total"),
            F.count("*").alias("count")
        )
    # Write results
    result.write.mode("overwrite").parquet("/path/to/results")
    print("Large dataset processing completed")

# Execute processing
process_large_dataset()
spark.stop()
Memory Monitoring and Alerting
import json
import threading
from urllib.request import urlopen

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("MemoryMonitoring") \
    .getOrCreate()

class MemoryMonitor:
    """Polls the Spark monitoring REST API for executor memory and GC figures."""

    def __init__(self, warning_threshold=0.8, critical_threshold=0.9):
        self.warning_threshold = warning_threshold
        self.critical_threshold = critical_threshold
        self._stop_event = threading.Event()
        self.thread = None

    def start_monitoring(self, interval=5):
        """Start memory monitoring in background thread"""
        self._stop_event.clear()
        self.thread = threading.Thread(target=self._monitor_loop, args=(interval,))
        self.thread.daemon = True
        self.thread.start()

    def stop_monitoring(self):
        """Stop memory monitoring"""
        self._stop_event.set()
        if self.thread:
            self.thread.join()

    def _monitor_loop(self, interval):
        """Main monitoring loop"""
        while not self._stop_event.is_set():
            self._check_memory()
            # wait() returns early when stop_monitoring() is called
            self._stop_event.wait(interval)

    def _check_memory(self):
        """Check memory usage and alert if needed"""
        try:
            sc = spark.sparkContext
            # uiWebUrl is None when the Spark UI is disabled
            if sc.uiWebUrl is None:
                print("Spark UI is disabled; executor metrics are unavailable")
                return
            url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/executors"
            with urlopen(url, timeout=5) as response:
                executors = json.load(response)
            for ex in executors:
                name = f"executor {ex['id']}"
                # memoryUsed / maxMemory describe storage memory, not the whole heap
                max_size = ex["maxMemory"]
                if max_size > 0:
                    usage_ratio = ex["memoryUsed"] / max_size
                    if usage_ratio > self.critical_threshold:
                        print(f"CRITICAL: {name} memory usage: {usage_ratio:.2%}")
                    elif usage_ratio > self.warning_threshold:
                        print(f"WARNING: {name} memory usage: {usage_ratio:.2%}")
                # GC time and total task time are both reported in milliseconds
                gc_time = ex["totalGCTime"]
                duration = ex["totalDuration"]
                overhead = gc_time / duration if duration > 0 else 0.0
                print(f"{name}: GC Time: {gc_time}ms, GC overhead: {overhead:.2%}")
        except Exception as e:
            print(f"Error monitoring memory: {e}")

# Create and start monitor
monitor = MemoryMonitor(warning_threshold=0.7, critical_threshold=0.85)
monitor.start_monitoring(interval=10)

# Run workload
def run_workload():
    """Run workload to test monitoring"""
    df = spark.range(10000000).repartition(100)
    result = df.groupBy((col("id") % 100).alias("bucket")).count().collect()
    print(f"Workload completed: {len(result)} groups")

run_workload()
# Stop monitor
monitor.stop_monitoring()
spark.stop()
Object Pool for Reduced GC Pressure
from pyspark.sql import SparkSession
import threading

spark = SparkSession.builder \
    .appName("ObjectPool") \
    .getOrCreate()

class ObjectPool:
    """Thread-safe object pool to reduce allocation churn"""

    def __init__(self, create_func, max_size=100):
        self.create_func = create_func
        self.max_size = max_size
        self.pool = []
        self.lock = threading.Lock()
        self.stats = {"created": 0, "reused": 0, "discarded": 0}

    def acquire(self):
        """Acquire object from pool"""
        with self.lock:
            if self.pool:
                self.stats["reused"] += 1
                return self.pool.pop()
            self.stats["created"] += 1
            return self.create_func()

    def release(self, obj):
        """Release object back to pool"""
        with self.lock:
            if len(self.pool) < self.max_size:
                self.pool.append(obj)
            else:
                self.stats["discarded"] += 1

    def get_stats(self):
        """Get pool statistics"""
        with self.lock:
            return self.stats.copy()

# Create object pool for reusable objects
def create_reusable_object():
    """Create a reusable object"""
    return {"data": [], "metadata": {}}

# Accumulators carry pool statistics from the Python workers back to the driver.
# Updates made inside transformations can be counted more than once if a task is retried.
sc = spark.sparkContext
created_acc = sc.accumulator(0)
reused_acc = sc.accumulator(0)
discarded_acc = sc.accumulator(0)

# Process data with object pooling
def process_with_pooling(df):
    """Process data using object pooling"""
    def process_partition(iterator):
        """Process partition with object reuse"""
        # Build the pool on the worker: a pool created on the driver holds a
        # threading.Lock, which cannot be pickled and shipped to executors.
        # Note that this pools Python objects in the worker process; it does
        # not change JVM heap behavior.
        pool = ObjectPool(create_reusable_object, max_size=50)
        try:
            for row in iterator:
                obj = pool.acquire()
                try:
                    # Reuse object for each row
                    obj["data"].clear()
                    obj["data"].append(row.id)
                    obj["metadata"]["processed"] = True
                    value = obj["data"][0] * 2
                finally:
                    pool.release(obj)
                yield (row.id, value)
        finally:
            stats = pool.get_stats()
            created_acc.add(stats["created"])
            reused_acc.add(stats["reused"])
            discarded_acc.add(stats["discarded"])

    # Apply partition processing; an explicit schema avoids an extra inference job
    result_rdd = df.rdd.mapPartitions(process_partition)
    return spark.createDataFrame(result_rdd, "id long, processed_value long")

# Test object pooling
df = spark.range(1000000)
result = process_with_pooling(df)
# Transformations are lazy: run an action so the partitions are actually processed
print(f"Rows processed: {result.count()}")

# Get pool statistics
total = created_acc.value + reused_acc.value
print("Object Pool Statistics:")
print(f"  Created: {created_acc.value}")
print(f"  Reused: {reused_acc.value}")
print(f"  Discarded: {discarded_acc.value}")
if total > 0:
    print(f"  Reuse Rate: {reused_acc.value / total:.2%}")
spark.stop()
Performance Metrics
| Metric | Target | Warning | Critical | Optimization |
|---|---|---|---|---|
| GC Overhead | < 5% | 5-10% | > 10% | Increase memory, tune GC |
| GC Pause Time | < 100ms | 100-500ms | > 500ms | Use G1GC, tune region size |
| Full GC Count | < 1/hour | 1-5/hour | > 5/hour | Increase heap, reduce allocation |
| Heap Usage | < 70% | 70-85% | > 85% | Increase memory, optimize code |
| Object Promotion Rate | < 10% | 10-20% | > 20% | Tune tenuring threshold |
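The table can be applied mechanically once the metrics are collected. The sketch below encodes the warning and critical boundaries from the table and classifies a reading; the metric keys and the `classify` function are hypothetical names chosen for this example.

```python
# (warning_lower_bound, critical_lower_bound) for each metric, taken from the table above.
THRESHOLDS = {
    "gc_overhead_pct": (5, 10),
    "gc_pause_ms": (100, 500),
    "full_gc_per_hour": (1, 5),
    "heap_usage_pct": (70, 85),
    "promotion_rate_pct": (10, 20),
}


def classify(metric: str, value: float) -> str:
    """Return 'target', 'warning', or 'critical' for a metric reading."""
    warning, critical = THRESHOLDS[metric]
    if value > critical:
        return "critical"
    if value >= warning:
        return "warning"
    return "target"


if __name__ == "__main__":
    readings = {"gc_overhead_pct": 12, "gc_pause_ms": 250, "heap_usage_pct": 60}
    for metric, value in readings.items():
        print(f"{metric}: {value} -> {classify(metric, value)}")
```

With these sample readings, GC overhead is critical, pause time is in the warning band, and heap usage is on target.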
Best Practices
- Start with default GC settings - Only tune after observing performance issues
- Use G1GC for large heaps - Better pause time predictability than Parallel GC
- Monitor GC metrics continuously - Track heap usage, GC count, and pause times
- Tune memory fractions carefully - Balance between execution and storage memory
- Handle data skew proactively - Prevent memory imbalances across partitions
- Reduce object creation - Use object pools, primitive types, and efficient data structures
- Enable off-heap memory - Reduce GC pressure for large datasets
- Plan for peak loads - Ensure sufficient memory for worst-case scenarios
- Test with realistic workloads - Validate GC settings with production-like data
- Document GC configurations - Maintain clear documentation of settings and rationale
Related Topics
- 17-cluster-management.mdx: Cluster resource allocation and management
- 19-spark-submit.mdx: Deployment configurations and parameters
- 20-monitoring-metrics.mdx: Monitoring GC and memory metrics
- 12-state-management.mdx: State management and memory usage
See Also
- Kafka Streams (kafka/03): JVM memory management in Kafka Streams
- Data Engineering Streaming (data-engineering/022): Memory tuning for streaming applications