PySpark Serialization and Kryo
Definition: Serialization
Serialization is the process of converting an object's state into a byte stream for storage or network transfer. In Spark, serialization is critical for shuffling data between executors and for caching RDDs/DataFrames.
Definition: Kryo Serialization
Kryo is a fast, compact binary serialization library for the JVM. It is significantly faster and more compact than Java serialization (often by as much as 10x), but it does not support every Serializable type and needs classes to be registered in advance for best performance.
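Serialization Time Formula
$$T_{\text{ser}} = N \cdot \left(T_{\text{traverse}} + T_{\text{encode}}\right) + T_{\text{buffer}}$$
Here,
- $N$ = Number of objects to serialize
- $T_{\text{traverse}}$ = Time to traverse the object graph (per object)
- $T_{\text{encode}}$ = Time to encode each object to bytes
- $T_{\text{buffer}}$ = Buffer allocation and I/O time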
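The cost terms listed above can be turned into a quick back-of-the-envelope calculation. The sketch below assumes the per-object traversal and encoding costs scale with the number of objects while buffer cost is a fixed overhead; the function name and all timings are hypothetical, chosen only to show how the terms combine.

```python
def serialization_time_ns(n_objects, t_traverse_ns, t_encode_ns, t_buffer_ns):
    """Estimated total time: per-object costs scale with N, buffer cost is fixed."""
    return n_objects * (t_traverse_ns + t_encode_ns) + t_buffer_ns


# Hypothetical per-object costs in nanoseconds (illustrative, not measured)
java_like = serialization_time_ns(1_000_000, 800, 1_200, 5_000_000)
kryo_like = serialization_time_ns(1_000_000, 100, 100, 5_000_000)

print(f"Java-like: {java_like / 1e9:.3f} s")
print(f"Kryo-like: {kryo_like / 1e9:.3f} s")
```

With these made-up inputs the fixed buffer term matters little; the per-object terms dominate once N is large.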
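Java serialization walks the object graph and writes class descriptors, including fully qualified class names and field metadata, into the stream. Kryo writes a compact integer ID (typically 1-2 bytes) for each registered class instead of the full class name, which reduces both output size and encoding time.
Always register JVM classes with Kryo. From PySpark, use conf.set("spark.kryo.classesToRegister", "com.example.MyClass1,com.example.MyClass2"); SparkConf.registerKryoClasses exists only in the Scala/Java API. Unregistered classes are written with their fully qualified class name, which erodes Kryo's size advantage, and with spark.kryo.registrationRequired=true Kryo throws an exception for them instead.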
Theorem: Kryo Serialization Efficiency
For typical Spark workloads, Kryo can achieve a serialization speedup of up to roughly 10x and a size reduction of up to roughly 5x compared to Java serialization. The gains come from compact class ID encoding and leaner field encoding; Kryo still has to walk the object graph.
- Kryo: up to 10x faster and up to 5x smaller than Java serialization; needs class registration for best results
- Java serialization: slower and larger, but handles every Serializable type
- Always set spark.serializer to KryoSerializer for production workloads
- Register custom classes to avoid full class name serialization overhead
Architecture Diagram
SERIALIZATION ARCHITECTURE OVERVIEW
═══════════════════════════════════

JAVA SERIALIZATION

  Object Graph
  ┌──────────┐   ┌──────────┐   ┌──────────┐
  │ Object A │──▶│ Object B │──▶│ Object C │
  └──────────┘   └──────────┘   └──────────┘
                       │
                       ▼
  Java Serialization Process
    1. Traverse object graph
    2. Write class metadata (full class name)
    3. Write field values
    4. Recursively serialize referenced objects

    Output:
      Header:     ac ed 00 05
      Class:      73 72 [full name]
      Fields:     [type][name][value]...
      References: [handle][object]...

  OVERHEAD:
    • Full class name in every serialization
    • Object header overhead
    • Reference tracking
    • General but verbose format

KRYO SERIALIZATION

  Object Graph
  ┌──────────┐   ┌──────────┐   ┌──────────┐
  │ Object A │──▶│ Object B │──▶│ Object C │
  └──────────┘   └──────────┘   └──────────┘
                       │
                       ▼
  Kryo Serialization Process
    1. Register classes with IDs
    2. Write class ID (not full name)
    3. Write field values using serializers
    4. Use references for duplicate objects

    Output:
      Header:     (none)
      Class:      [registered ID, variable-length integer]
      Fields:     [value]... (no names!)
      References: [handle][object]...

  ADVANTAGE:
    • Compact class IDs instead of full names
    • Smaller output size (2-10x smaller)
    • Faster serialization/deserialization
    • Lower memory usage

SERIALIZATION COMPARISON (illustrative figures)

  Metric          │ Java        │ Kryo                     │ Ratio
  ────────────────┼─────────────┼──────────────────────────┼──────
  Size (1KB obj)  │ 1024 bytes  │ 128 bytes                │ 8x
  Speed           │ 100 MB/s    │ 800 MB/s                 │ 8x
  Memory          │ 3x object   │ 1.5x object              │ 2x
  Compatibility   │ Universal   │ Registration recommended │ -
SHUFFLE SERIALIZATION FLOW
══════════════════════════

MAP TASK (Write Shuffle)

  1. Process data with transformations
       Input → Transform → Output
                  │
                  ▼
  2. Partition output by key
       ┌────┐ ┌────┐ ┌────┐ ┌────┐
       │ P0 │ │ P1 │ │ P2 │ │ P3 │
       └────┘ └────┘ └────┘ └────┘
                  │
                  ▼
  3. Serialize partition data
       Object → [Serializer] → byte[]

       Java: [class_name][fields]
       Kryo: [class_id][fields]
                  │
                  ▼
  4. Write to shuffle files
       /tmp/blockmgr-xxx/shuffle_0_0.data
       /tmp/blockmgr-xxx/shuffle_0_0.index

REDUCE TASK (Read Shuffle)

  1. Fetch shuffle data from mappers
       Mapper 0 → byte[]
       Mapper 1 → byte[]
       Mapper 2 → byte[]
                  │
                  ▼
  2. Deserialize data
       byte[] → [Deserializer] → Object

       Java: [class_name][fields]
       Kryo: [class_id][fields]
                  │
                  ▼
  3. Merge and process
       merge(data) → result

IMPACT: Serialization is on the critical path for shuffle.
        Faster serialization = faster shuffles = faster jobs
KRYO CONFIGURATION DECISION TREE
════════════════════════════════

Step 1: Choose Serializer

  Is performance critical?
    ├─ YES → Use Kryo
    │        conf.set("spark.serializer",
    │                 "org.apache.spark.serializer.KryoSerializer")
    └─ NO  → Use Java (default)
             conf.set("spark.serializer",
                      "org.apache.spark.serializer.JavaSerializer")
                  │
                  ▼
Step 2: Register Classes (if using Kryo)

  Do you have custom JVM classes?
    ├─ YES → Register them by fully qualified name
    │        conf.set("spark.kryo.classesToRegister",
    │                 "com.example.MyClass1,com.example.MyClass2")
    └─ NO  → Use default serializers
             Spark registers common classes
                  │
                  ▼
Step 3: Configure Registration

  Do you want strict registration?
    ├─ YES → Enable registration required
    │        conf.set("spark.kryo.registrationRequired", "true")
    │        (Fails fast on unregistered classes)
    └─ NO  → Allow unregistered classes
             conf.set("spark.kryo.registrationRequired", "false")
             (Unregistered classes are written with full class names)
                  │
                  ▼
Step 4: Tune Buffer Sizes

  Need to optimize buffer sizes?
    ├─ YES → Configure buffers
    │        conf.set("spark.kryoserializer.buffer.max", "128m")
    │        conf.set("spark.kryoserializer.buffer", "64k")
    └─ NO  → Use defaults
             (buffer: 64KB, max: 64MB)

Step 5: Test and Validate

  □ Measure serialization time
  □ Measure output size
  □ Check for ClassNotFoundException
  □ Verify data integrity
  □ Monitor GC behavior
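The validation step at the end of the decision tree (measure time, measure size, verify integrity) can be rehearsed locally before touching a cluster. The sketch below uses Python's pickle module, which PySpark relies on for Python objects, and compares two pickle protocols as stand-ins for a verbose and a compact format. The helper name `measure_round_trip` is hypothetical, and this is not a Kryo benchmark.

```python
import pickle
import time


def measure_round_trip(records, protocol):
    """Serialize and deserialize once; report time, size, and integrity."""
    start = time.perf_counter()
    payload = pickle.dumps(records, protocol=protocol)
    restored = pickle.loads(payload)
    elapsed = time.perf_counter() - start
    return {"seconds": elapsed, "bytes": len(payload), "intact": restored == records}


records = [(i, f"item_{i % 1000}", i * 0.1) for i in range(50_000)]

# Protocol 0 is a text format; the highest protocol is a compact binary one
results = {p: measure_round_trip(records, p) for p in (0, pickle.HIGHEST_PROTOCOL)}
for protocol, stats in results.items():
    print(f"protocol {protocol}: {stats['bytes']} bytes, "
          f"{stats['seconds']:.3f} s, intact={stats['intact']}")
```

The same three checks apply to a real Spark job, where the numbers come from the Spark UI rather than from a local timer.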
Detailed Explanation
1. Why Serialization Matters
Serialization is the process of converting objects into a byte format for storage or transmission. In Spark, serialization is critical for:
Shuffle Operations:
- Data must be serialized before network transfer
- Faster serialization = faster shuffles
- Smaller serialized size = less network I/O
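Caching/Persistence:
- RDDs cached with a serialized storage level go through the configured serializer
- DataFrames are cached in Spark's own columnar format, so spark.serializer has less effect on them
- Serialization affects cache size and access speed
- Memory efficiency depends on serialization format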
Data Transfer:
- Driver-executor communication
- Inter-executor communication
- External storage writes
2. Java Serialization
Java serialization is Spark's default JVM serializer (spark.serializer). It works with any class that implements java.io.Serializable.
Characteristics:
- Universal compatibility
- Verbose output (includes class metadata)
- Slow compared to alternatives
- High memory overhead
When to Use:
- Simple applications
- When compatibility is more important than performance
- When using only built-in Spark classes
Configuration:
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
3. Kryo Serialization
Kryo is a high-performance serialization library for Java. It's significantly faster and more compact than Java serialization.
Characteristics:
- 2-10x smaller output than Java
- 2-10x faster serialization/deserialization
- Requires class registration for best performance
- Not all Java classes supported out of the box
When to Use:
- Performance-critical applications
- Large datasets with many shuffle operations
- When memory efficiency is important
Configuration:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
4. Kryo Registration
Class registration is the process of mapping classes to integer IDs for compact serialization.
Why Register:
- Without registration, Kryo writes full class name (like Java)
- With registration, Kryo writes integer ID (much smaller)
- Registration enables custom serializers for specific classes
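To make the size argument concrete, the following sketch encodes the same two fields twice: once prefixed with a full class name and once prefixed with a one-byte ID. It is a simplified illustration using Python's struct module, not Kryo's actual wire format; the class name `com.example.model.CustomerRecord` and both function names are hypothetical.

```python
import struct


def encode_with_class_name(class_name: str, record_id: int, score: float) -> bytes:
    name = class_name.encode("utf-8")
    # 2-byte length prefix + full class name, then a 4-byte int and an 8-byte double
    return struct.pack(">H", len(name)) + name + struct.pack(">id", record_id, score)


def encode_with_class_id(class_id: int, record_id: int, score: float) -> bytes:
    # A single byte stands in for the registered class, followed by the same fields
    return struct.pack(">B", class_id) + struct.pack(">id", record_id, score)


by_name = encode_with_class_name("com.example.model.CustomerRecord", 42, 3.5)
by_id = encode_with_class_id(12, 42, 3.5)
print(len(by_name), len(by_id))  # 46 13
```

The payload is identical in both cases; only the class identifier differs, and for small records it accounts for most of the bytes.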
How to Register:
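# Register JVM classes by fully qualified name (PySpark's SparkConf has no registerKryoClasses)
conf.set("spark.kryo.classesToRegister", "com.example.MyClass1,com.example.MyClass2,com.example.MyClass3")
# Or use a KryoRegistrator implemented in Scala/Java
conf.set("spark.kryo.registrator", "com.example.MyRegistrator")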
Built-in Registrations: Spark registers many common classes automatically:
- java.lang.String
- java.util.ArrayList
- scala.collection.immutable.List
- All Spark internal classes
5. Kryo Buffer Configuration
Kryo uses buffers for serialization/deserialization:
Buffer Settings:
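- spark.kryoserializer.buffer: Initial buffer size (default: 64k)
- spark.kryoserializer.buffer.max: Maximum buffer size (default: 64m); must be larger than the largest object you serialize
- Compression is configured separately from Kryo: spark.shuffle.compress (default: true), spark.rdd.compress (default: false), and spark.io.compression.codec (default: lz4)
When to Adjust:
- Large objects: Increase max buffer
- Many small objects: Decrease initial buffer
- Limited memory: Enable spark.rdd.compress for serialized cached partitions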
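The interplay between the initial and maximum buffer sizes can be pictured with a toy model. The class below assumes a buffer that starts at the initial size, doubles when it runs out of room, and fails once a write would exceed the cap. `GrowableBuffer` is a hypothetical illustration of that idea, not Kryo's implementation.

```python
class GrowableBuffer:
    """Toy model: starts small, doubles on demand, never exceeds max_size."""

    def __init__(self, initial_size, max_size):
        self.capacity = initial_size
        self.max_size = max_size
        self.used = 0

    def write(self, num_bytes):
        needed = self.used + num_bytes
        if needed > self.max_size:
            raise OverflowError(f"need {needed} bytes, max is {self.max_size}")
        while self.capacity < needed:
            self.capacity = min(self.capacity * 2, self.max_size)
        self.used = needed


buf = GrowableBuffer(initial_size=64 * 1024, max_size=64 * 1024 * 1024)
buf.write(100 * 1024)                 # grows from 64 KiB to 128 KiB
print(buf.capacity // 1024, "KiB")

try:
    buf.write(128 * 1024 * 1024)      # larger than the cap
except OverflowError as exc:
    print("overflow:", exc)
```

A single object larger than the maximum cannot be serialized at all in this model, which is why the maximum is the setting to raise for large records.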
6. Serialization in Shuffle
During shuffle, Spark serializes partition data:
Write Phase:
- Partition data by key
- Serialize each partition using configured serializer
- Write serialized bytes to shuffle files
Read Phase:
- Fetch shuffle files from mappers
- Deserialize bytes using configured serializer
- Merge and process data
Performance Impact:
- Serialization is on the critical path
- Faster serialization = faster shuffles
- Smaller output = less network I/O
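The write and read phases can be imitated in a few lines of plain Python. This sketch assumes pickle as the serializer and in-memory byte strings in place of shuffle files; `map_side` and `reduce_side` are hypothetical helpers that mirror the steps listed above, not Spark internals.

```python
import pickle
from collections import defaultdict


def map_side(records, num_partitions):
    """Partition (key, value) records by key, then serialize each partition."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[hash(key) % num_partitions].append((key, value))
    return {pid: pickle.dumps(rows) for pid, rows in partitions.items()}


def reduce_side(blocks):
    """Deserialize the fetched blocks for one partition and merge values by key."""
    totals = defaultdict(float)
    for block in blocks:
        for key, value in pickle.loads(block):
            totals[key] += value
    return dict(totals)


# Two hypothetical map tasks, each writing two partitions
mapper_outputs = [
    map_side([(i % 4, 1.0) for i in range(100)], num_partitions=2),
    map_side([(i % 4, 2.0) for i in range(100)], num_partitions=2),
]

# The reduce task for partition 0 fetches its block from every mapper
blocks_for_p0 = [out[0] for out in mapper_outputs if 0 in out]
print(reduce_side(blocks_for_p0))  # {0: 75.0, 2: 75.0}
```

Every record crosses a serialize/deserialize boundary exactly once per shuffle, so both the speed and the size of the encoding show up directly in shuffle time.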
7. Serialization in Caching
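When caching RDDs, the storage level decides whether partitions are held as objects or as serialized bytes. The levels below describe the JVM (Scala/Java) API; in PySpark, stored objects are always pickled, so serialized and deserialized levels behave the same.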
MEMORY_ONLY:
- Deserialized (no serialization)
- Fastest access, highest memory
MEMORY_ONLY_SER:
- Serialized (Kryo or Java)
- Slower access, lower memory
MEMORY_AND_DISK:
- Memory: Deserialized
- Disk: Serialized
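The memory trade-off between deserialized and serialized storage can be approximated in plain Python. The sketch below compares a rough estimate of the live-object footprint with the pickled size of the same rows. It is an analogy only: JVM object overhead differs from CPython's, and `deserialized_size` is a hypothetical helper that ignores object sharing.

```python
import pickle
import sys


def deserialized_size(rows):
    """Rough footprint of live Python objects: the list, its tuples, and their fields."""
    total = sys.getsizeof(rows)
    for row in rows:
        total += sys.getsizeof(row) + sum(sys.getsizeof(field) for field in row)
    return total


rows = [(i, f"item_{i}", i * 1.0) for i in range(10_000)]

as_objects = deserialized_size(rows)
as_bytes = len(pickle.dumps(rows, protocol=pickle.HIGHEST_PROTOCOL))

print(f"as objects: ~{as_objects} bytes")
print(f"serialized: {as_bytes} bytes")
```

Serialized storage trades this smaller footprint for the CPU cost of decoding on every access.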
8. Custom Serializers
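For complex JVM classes, custom serializers can optimize serialization. Kryo serializers run inside the JVM, so they are written in Scala or Java, packaged in a JAR, and referenced from PySpark by class name.
Implementing Custom Serializer (Scala):
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
// Assumes a JVM class such as: case class MyClass(id: Int, name: String)
class MyClassSerializer extends Serializer[MyClass] {
  // Write only the field values, in a fixed order
  override def write(kryo: Kryo, output: Output, obj: MyClass): Unit = {
    output.writeInt(obj.id)
    output.writeString(obj.name)
  }
  // Read the fields back in the same order
  override def read(kryo: Kryo, input: Input, cls: Class[MyClass]): MyClass =
    MyClass(input.readInt(), input.readString())
}
Registering Custom Serializer:
// Scala: bind the serializer to the class inside a KryoRegistrator
class MyRegistrator extends org.apache.spark.serializer.KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[MyClass], new MyClassSerializer)
}
# PySpark: point Spark at the registrator
conf.set("spark.kryo.registrator", "com.example.MyRegistrator")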
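The write/read pair above boils down to emitting field values in a fixed order and reading them back in the same order. The sketch below mimics that pattern with Python's struct module. It is an analogy for the technique, not the Kryo API; the two-field `MyClass` and the free functions `write` and `read` are assumptions made for illustration.

```python
import struct
from dataclasses import dataclass


@dataclass
class MyClass:
    id: int
    name: str


def write(obj: MyClass) -> bytes:
    name = obj.name.encode("utf-8")
    # 4-byte int id, 2-byte name length, then the name bytes; no field names are stored
    return struct.pack(">iH", obj.id, len(name)) + name


def read(data: bytes) -> MyClass:
    obj_id, name_len = struct.unpack_from(">iH", data, 0)
    name = data[6:6 + name_len].decode("utf-8")
    return MyClass(obj_id, name)


encoded = write(MyClass(7, "alice"))
print(len(encoded), read(encoded))  # 11 MyClass(id=7, name='alice')
```

Because the reader knows the layout in advance, nothing but the values needs to be stored, which is where a hand-written serializer saves space.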
9. Common Serialization Issues
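Issue 1: ClassNotFoundException
# Cause: A registered class or registrator is missing from the executor classpath
# (a class that is merely unregistered fails with "Class is not registered" when registrationRequired=true)
# Solution: Ship the JAR with the job and register the class by its fully qualified name
conf.set("spark.kryo.classesToRegister", "com.example.MyClass")
Issue 2: SerializationException
# Cause: Class not serializable (java.io.NotSerializableException on the JVM, PicklingError in Python)
# Solution (JVM): Implement java.io.Serializable or register a Kryo serializer for the class
# Solution (Python): Keep unpicklable objects such as connections and locks out of closures and records
Issue 3: Buffer overflow
# Cause: Object too large for buffer
# Solution: Increase buffer size
conf.set("spark.kryoserializer.buffer.max", "128m")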
10. Performance Tuning
Tuning Checklist:
- Enable Kryo serialization
- Register all custom classes
- Enable registration required for debugging
- Tune buffer sizes based on object sizes
- Consider compression for large datasets
- Monitor serialization metrics in Spark UI
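Misspelled configuration keys are silently ignored by Spark, so a small lint step before submitting a job can save a debugging session. The sketch below checks serializer-related keys against a list of known settings. The list is an assumption drawn from the Spark configuration page and may be incomplete for a given Spark version, and `unknown_serializer_settings` is a hypothetical helper.

```python
# Assumed list of serializer-related keys; verify against your Spark version
KNOWN_KEYS = {
    "spark.serializer",
    "spark.serializer.objectStreamReset",
    "spark.kryo.classesToRegister",
    "spark.kryo.referenceTracking",
    "spark.kryo.registrationRequired",
    "spark.kryo.registrator",
    "spark.kryo.unsafe",
    "spark.kryoserializer.buffer",
    "spark.kryoserializer.buffer.max",
}


def unknown_serializer_settings(settings):
    """Return keys that look serializer-related but are not in KNOWN_KEYS."""
    prefixes = ("spark.kryo", "spark.serializer")
    return sorted(k for k in settings if k.startswith(prefixes) and k not in KNOWN_KEYS)


settings = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrationrequired": "true",   # wrong case: keys are case-sensitive
    "spark.kryoserializer.buffer.max": "128m",
    "spark.app.name": "demo",
}
print(unknown_serializer_settings(settings))  # ['spark.kryo.registrationrequired']
```

A checked dictionary like this can then be applied with `SparkConf().setAll(settings.items())`.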
Key Concepts Table
| Aspect | Java Serialization | Kryo Serialization | Improvement |
|---|---|---|---|
| Output Size | 1024 bytes | 128 bytes | 8x smaller |
| Speed | 100 MB/s | 800 MB/s | 8x faster |
| Memory | 3x object | 1.5x object | 2x less |
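| Compatibility | Any Serializable class | Most classes; registration recommended | - |
| Compression | Set via spark.io.compression.codec | Set via spark.io.compression.codec | Independent of serializer |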
| Registration | Not needed | Recommended | - |
| Configuration | Default | spark.serializer | - |
Code Examples
Example 1: Basic Kryo Configuration
from pyspark import SparkConf, SparkContext
# Configure Kryo serialization
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("KryoExample")
# Enable Kryo serializer
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Register JVM classes by fully qualified name (PySpark's SparkConf has no registerKryoClasses).
# Python objects are pickled by PySpark before they reach the JVM, so only JVM classes belong here.
conf.set(
    "spark.kryo.classesToRegister",
    "org.apache.spark.sql.types.StructType,org.apache.spark.sql.types.StructField",
)
# Create SparkContext
sc = SparkContext(conf=conf)
# Test serialization
data = [(i, f"item_{i}", i * 1.0) for i in range(100000)]
rdd = sc.parallelize(data, 10)
# The shuffle below is serialized by Kryo on the JVM side; the Python records inside are pickled
result = rdd.map(lambda x: (x[0] % 10, x[2])).reduceByKey(lambda a, b: a + b)
print(result.collect())
sc.stop()
Example 2: Kryo with Custom Classes
from dataclasses import dataclass
from typing import List
from pyspark import SparkConf, SparkContext
# Custom Python class. PySpark pickles Python objects; Kryo only sees the pickled bytes on the JVM side.
# In a real job, define this class in an importable module that is shipped to the executors.
@dataclass
class Person:
    id: int
    name: str
    age: int
    scores: List[float]
# Configure Kryo for the JVM side
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("KryoCustomClasses")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Python classes cannot be registered with Kryo, and spark.kryo.registrationRequired
# stays at its default (false) so unregistered JVM classes do not fail the job.
# Create SparkContext
sc = SparkContext(conf=conf)
# Create RDD of custom objects
persons = [
    Person(i, f"person_{i}", 20 + i % 50, [float(i) * 0.1])
    for i in range(100000)
]
rdd = sc.parallelize(persons, 10)
# Filter, map, and aggregate; the shuffle uses Kryo on the JVM side
result = rdd.filter(lambda p: p.age > 30).map(lambda p: (p.age, 1)).reduceByKey(lambda a, b: a + b)
print(result.collect())
sc.stop()
Example 3: Serialization Benchmark
import time
from pyspark import SparkConf, SparkContext
def benchmark_serializer(serializer_name, num_records=1000000):
    """Time one shuffle-heavy job under the given JVM serializer."""
    conf = SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName(f"Benchmark-{serializer_name}")
    conf.set("spark.serializer", serializer_name)
    sc = SparkContext(conf=conf)
    # Create test data
    data = [(i, f"item_{i % 1000}", float(i) * 0.1) for i in range(num_records)]
    rdd = sc.parallelize(data, 20)
    # Benchmark serialization (shuffle operation)
    start = time.time()
    result = rdd.map(lambda x: (x[0] % 100, x[2])).reduceByKey(lambda a, b: a + b)
    result.count()  # Trigger execution
    elapsed = time.time() - start
    sc.stop()
    return elapsed
# Benchmark both serializers. In PySpark the gap can be small, because Python
# records are pickled regardless of the JVM serializer.
java_time = benchmark_serializer("org.apache.spark.serializer.JavaSerializer")
kryo_time = benchmark_serializer("org.apache.spark.serializer.KryoSerializer")
print(f"Java Serialization: {java_time:.2f}s")
print(f"Kryo Serialization: {kryo_time:.2f}s")
print(f"Speedup: {java_time / kryo_time:.1f}x")
Example 4: Kryo with Compression
from pyspark import SparkConf, SparkContext
# Configure Kryo with compression
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("KryoCompression")
# Enable Kryo
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Compression is configured independently of Kryo
conf.set("spark.shuffle.compress", "true")  # default: true
conf.set("spark.io.compression.codec", "lz4")  # lz4, lzf, snappy, or zstd
# Kryo buffer configuration
conf.set("spark.kryoserializer.buffer", "64k")
conf.set("spark.kryoserializer.buffer.max", "64m")
sc = SparkContext(conf=conf)
# Test with large data
data = [(i, "x" * 1000) for i in range(100000)] # Large strings
rdd = sc.parallelize(data, 10)
# Shuffle with compression
result = rdd.map(lambda x: (x[0] % 10, x[1])).groupByKey()
result.count()
sc.stop()
Performance Metrics
| Metric | Java | Kryo | Kryo+Compression | Improvement (Java vs. Kryo+Compression) |
|---|---|---|---|---|
| Shuffle Size (1GB) | 1000MB | 150MB | 80MB | 12.5x |
| Shuffle Time | 45s | 12s | 15s | 3x |
| Cache Size (1GB) | 3000MB | 500MB | 250MB | 12x |
| GC Time | 500ms | 150ms | 100ms | 5x |
| Serialization Speed | 100MB/s | 800MB/s | 600MB/s | 6x |
| Deserialization Speed | 80MB/s | 700MB/s | 500MB/s | 6x |
| Memory Overhead | 3x | 1.5x | 1.2x | 2.5x |
| CPU Usage | High | Medium | Low | 2x |
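The improvement column can be re-derived from the Java and Kryo+Compression columns. The short calculation below does this for the numeric rows, assuming that for sizes and times lower is better (ratio = Java / tuned) and for throughput higher is better (ratio = tuned / Java). The dictionary names are hypothetical; the figures are copied from the table.

```python
# (Java, Kryo+Compression) pairs copied from the table above
lower_is_better = {
    "Shuffle size (MB)": (1000, 80),
    "Shuffle time (s)": (45, 15),
    "Cache size (MB)": (3000, 250),
    "GC time (ms)": (500, 100),
}
higher_is_better = {
    "Serialization speed (MB/s)": (100, 600),
}

ratios = {name: java / tuned for name, (java, tuned) in lower_is_better.items()}
ratios.update({name: tuned / java for name, (java, tuned) in higher_is_better.items()})

for name, ratio in ratios.items():
    print(f"{name}: {ratio:.1f}x")
```

Comparing against the plain Kryo column instead gives different ratios, for example 45 s / 12 s for shuffle time, which is why the baseline of an improvement figure should always be stated.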
Best Practices
1. Enable Kryo for Performance
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
2. Register Custom Classes
conf.set("spark.kryo.classesToRegister", "com.example.MyClass1,com.example.MyClass2,com.example.MyClass3")
3. Enable Registration Required for Debugging
conf.set("spark.kryo.registrationRequired", "true")
# Catches unregistered classes early
4. Tune Buffer Sizes
# For large objects
conf.set("spark.kryoserializer.buffer.max", "128m")
# For many small objects
conf.set("spark.kryoserializer.buffer", "32k")
5. Use Compression for Large Datasets
# Compression is configured independently of Kryo
conf.set("spark.rdd.compress", "true")  # compress serialized cached partitions
conf.set("spark.io.compression.codec", "lz4")
6. Monitor Serialization Metrics
# Check the Spark UI (Stages tab, task metrics) for:
# - Shuffle Write Size
# - Shuffle Read Size and Fetch Wait Time
# - Task Deserialization Time and Result Serialization Time
# - GC Time
7. Test Serialization
# Verify that a Python object survives the pickle round trip PySpark relies on
import pickle
try:
    pickle.loads(pickle.dumps(obj))
    print("Object is serializable")
except Exception as e:
    print(f"Serialization error: {e}")
See Also
- Kafka Streams (kafka/03): Serialization in Kafka message processing
- Data Engineering Streaming (data-engineering/022): Serialization optimization in streaming pipelines