🔄 PySpark RDD Fundamentals
Definition: Resilient Distributed Dataset (RDD)
An RDD is an immutable, partitioned collection of elements that can be operated on in parallel. Each RDD is defined by five properties: a list of partitions, a function to compute each split, a list of dependencies on parent RDDs, an optional partitioner (for key-value RDDs), and an optional list of preferred locations for each split.
Definition: Partition
A partition is a logical chunk of data stored on a single node. The number of partitions determines parallelism — each partition is processed by one task on one executor core.
Definition: Lineage
Lineage is the complete record of transformations used to build an RDD. It is stored as a DAG and enables fault tolerance by allowing Spark to recompute only the lost partitions without data replication.
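Recovery Cost (Lineage Recomputation)
$$C_{\text{recovery}} = \sum_{i=1}^{L} C(T_i) \cdot D_i$$
Here,
- $C_{\text{recovery}}$ = Total cost to recompute lost partitions
- $C(T_i)$ = Cost of transformation $T_i$ in the lineage
- $D_i$ = Data size processed at step $i$
- $L$ = Number of transformations in the lineage path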
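As a rough illustration of this cost model, the sketch below sums a per-step cost multiplied by the data size at that step over one lineage path. The function name `recovery_cost`, the "cost units per MB" scale, and the sample numbers are assumptions made for illustration; Spark does not expose such a function.

```python
def recovery_cost(steps):
    """Sum cost_per_mb * data_mb over every transformation in the lineage path.

    `steps` is a list of (cost_per_mb, data_mb) pairs, one per transformation.
    """
    return sum(cost_per_mb * data_mb for cost_per_mb, data_mb in steps)


# Hypothetical lineage for one lost partition: map -> filter -> map.
# The filter halves the data, so the last step processes less.
lineage = [
    (2.0, 128.0),  # map: 2 cost units per MB over 128 MB
    (1.0, 128.0),  # filter: 1 cost unit per MB over 128 MB
    (3.0, 64.0),   # map on the remaining 64 MB
]

total = recovery_cost(lineage)
print(total)  # 576.0
```

Longer lineage paths add more terms to the sum, which is why truncating lineage with a checkpoint lowers the worst-case recovery cost.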
Narrow transformations (map, filter, flatMap) have 1:1 parent-child partition mapping and can be pipelined without shuffle. Wide transformations (groupByKey, reduceByKey, join) have M:N mapping and require a shuffle barrier — they cannot be pipelined.
A commonly recommended target partition size is 128MB–200MB. Too few partitions leave cores underutilized; too many add excessive task scheduling overhead. Use repartition() to increase the partition count (this performs a full shuffle) or coalesce() to decrease it without a full shuffle.
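The sizing rule used in these notes, partition count = max(dataSize/partitionSize, totalCores), can be sketched as a small helper. The function name `suggest_partition_count` and the 128 MB default are assumptions for illustration, not part of the Spark API.

```python
import math

MB = 1024 * 1024


def suggest_partition_count(data_size_bytes, total_cores, target_partition_bytes=128 * MB):
    """Return max(ceil(data size / target partition size), total cores)."""
    by_size = math.ceil(data_size_bytes / target_partition_bytes)
    return max(by_size, total_cores)


print(suggest_partition_count(10 * 1024 * MB, total_cores=16))  # 80
print(suggest_partition_count(1 * 1024 * MB, total_cores=16))   # 16
```

For the 1 GB input the size-based count is only 8, so the core count wins and every core still gets a task.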
Theorem: Fault Tolerance via Lineage
Any lost partition of an RDD can be recomputed from its lineage in at most O(L × D) time, where L is the lineage depth (number of transformations) and D is the data size at that partition. Provided the transformations are deterministic and the source data is still available, this guarantees correctness without data replication, unlike HDFS, which uses 3× replication by default.
- RDDs are immutable, partitioned, distributed collections with lineage-based fault tolerance
- Narrow transformations can be pipelined; wide transformations require shuffle barriers
- Target partition size is 128MB–200MB; partition count = max(dataSize/partitionSize, totalCores)
- Recovery cost is proportional to lineage depth × data size per partition
- Use persist() for reuse; use coalesce() to reduce partitions without a full shuffle
🏗️ Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│ RDD ARCHITECTURE OVERVIEW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Partition │ │ Partition │ │ Partition │ │ Partition │ │
│ │ 0 │ │ 1 │ │ 2 │ │ 3 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Resilient Distributed Dataset │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Block 0 │ │ Block 1 │ │ Block 2 │ │ Block 3 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Lineage Graph │ │
│ │ RDD_0 ──► RDD_1 ──► RDD_2 ──► RDD_3 (checkpoint) │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ RDD TRANSFORMATION DAG (Directed Acyclic Graph) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Input RDD ──► map() ──► filter() ──► flatMap() ──► RDD_4 │
│ │ │ │
│ │ ┌────────────────────────────┘ │
│ │ ▼ │
│ │ RDD_4 ──► reduceByKey() ──► RDD_5 (Action) │
│ │ │ │
│ ▼ ▼ │
│ sc.textFile() collect() │
│ (Lazy Build) (Trigger) │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ LINEAGE = Complete record of transformations │ │
│ │ • Enables fault tolerance (recompute lost partitions) │ │
│ │ • No data duplication needed │ │
│ │ • Optimization via DAG scheduler │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ RDD MEMORY MODEL & STORAGE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────── Executor Memory ──────────────────────┐ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Storage │ │ Execution │ │ User │ │ │
│ │ │ Memory │ │ Memory │ │ Memory │ │ │
│ │ │ (50%) │ │ (50%) │ │ │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ │ Storage Memory: │ │
│ │ ├── Cached RDDs (MEMORY_ONLY) │ │
│ │ ├── Broadcast variables │ │
│ │ └── Unroll memory for deserialization │ │
│ │ │ │
│ │ Execution Memory: │ │
│ │ ├── Shuffle read/write buffers │ │
│ │ ├── Join hash tables │ │
│ │ ├── Sort operations │ │
│ │ └── Aggregation buffers │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Storage/Execution memory can borrow from each other │
│ (unified memory management in Spark 1.6+) │
└─────────────────────────────────────────────────────────────────┘
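To make the memory split in the diagram concrete, here is a small calculator for the unified memory regions. It assumes the defaults documented for recent Spark releases (`spark.memory.fraction` = 0.6, `spark.memory.storageFraction` = 0.5, and roughly 300 MB of reserved memory); the helper name `unified_memory_regions` is hypothetical, and the values should be checked against the configuration of the Spark version in use.

```python
RESERVED_MB = 300  # assumed amount Spark sets aside before splitting the heap


def unified_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate executor memory regions in MB from the JVM heap size."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction    # storage + execution together
    storage = unified * storage_fraction  # storage share protected from eviction
    execution = unified - storage
    user = usable - unified               # user data structures and metadata
    return {"storage": storage, "execution": execution, "user": user}


regions = unified_memory_regions(4096)
for name, size in regions.items():
    print(f"{name}: {size:.1f} MB")
```

The storage and execution figures are starting shares rather than hard limits, since either side can borrow unused memory from the other.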
📚 Detailed Explanation
1. What is an RDD?
An RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel. The key properties that define an RDD are:
- Resilient: Fault-tolerant with lineage information, enabling recomputation of lost partitions
- Distributed: Data is distributed across multiple nodes in the cluster
- Dataset: A collection of partitioned data holding primitive values or composite values (e.g., tuples, objects)
2. RDD Internals: Partitioning and Blocks
Each RDD is divided into partitions, which are logical chunks of data that can be processed independently. The number of partitions determines the degree of parallelism. A partition maps to a block on a single node, and each block is stored in memory or on disk. HDFS uses a default block size of 128 MB, and Spark typically creates one partition per HDFS block when reading files.
3. Lineage and Fault Tolerance
The lineage of an RDD is the complete sequence of transformations used to build it. Spark stores this information as a Directed Acyclic Graph (DAG). When a partition is lost due to node failure, Spark uses the lineage to recompute only the lost partition, rather than replicating the entire dataset. This avoids the storage and network overhead of the data replication that systems like Hadoop MapReduce rely on (through HDFS).
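The recovery idea can be sketched in plain Python as a toy model. This is not how Spark is implemented: the names `source_partitions`, `lineage`, and `compute_partition` are illustrative assumptions, and the source data is assumed to remain available after the failure.

```python
# Source data split into three partitions (stands in for stable input storage).
source_partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# Lineage: an ordered list of per-partition transformations.
lineage = [
    lambda part: [x * 10 for x in part],            # like map()
    lambda part: [x for x in part if x % 20 == 0],  # like filter()
]


def compute_partition(index):
    """Rebuild one partition by replaying the lineage on its source data."""
    part = source_partitions[index]
    for transform in lineage:
        part = transform(part)
    return part


# Materialize every partition once, then simulate losing partition 1.
materialized = {i: compute_partition(i) for i in range(len(source_partitions))}
del materialized[1]

# Recover only the missing partition; partitions 0 and 2 are untouched.
materialized[1] = compute_partition(1)
print(materialized[1])  # [40, 60]
```

Only the lost partition's source data is read again, which is the property that makes lineage cheaper than keeping extra copies of every partition.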
4. Lazy Evaluation Model
Spark uses lazy evaluation, meaning transformations are not executed immediately. Instead, Spark builds a DAG of transformations and only executes them when an action is called. This allows Spark to optimize the execution plan by:
- Combining multiple transformations into a single stage
- Pipelining narrow transformations
- Reducing data shuffling
- Planning shuffle operations efficiently
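A minimal plain-Python sketch of lazy evaluation follows. The `LazyDataset` class is a hypothetical stand-in, not a Spark class: it only records transformations and runs them, pipelined element by element, when the `collect()` action is called.

```python
class LazyDataset:
    """Records transformations and runs them only when an action is called."""

    def __init__(self, data, ops=()):
        self._data = data
        self._ops = tuple(ops)  # recorded (kind, function) pairs

    def map(self, fn):
        return LazyDataset(self._data, self._ops + (("map", fn),))

    def filter(self, fn):
        return LazyDataset(self._data, self._ops + (("filter", fn),))

    def collect(self):
        """Action: push each element through the whole chain in one pass."""
        out = []
        for item in self._data:
            keep = True
            for kind, fn in self._ops:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                out.append(item)
        return out


calls = []


def traced_square(x):
    calls.append(x)  # record that the function actually ran
    return x * x


ds = LazyDataset([1, 2, 3, 4]).map(traced_square).filter(lambda x: x > 4)
print(len(calls))    # 0: nothing has executed yet
print(ds.collect())  # [9, 16]
print(len(calls))    # 4: the action triggered the work
```

Because the chain is known before anything runs, the map and filter are applied in a single pass with no intermediate list, which mirrors how narrow transformations are pipelined.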
5. Narrow vs Wide Transformations
Transformations are divided into two categories:
- Narrow transformations: Each partition of the parent RDD is used by at most one partition of the child RDD (e.g., map, filter, union). These can be pipelined.
- Wide transformations: Each partition of the parent RDD may be depended on by multiple child partitions (e.g., groupByKey, reduceByKey, join). These require a shuffle across the network.
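The difference between the two dependency types can be modeled with lists of partitions in plain Python. This is a toy sketch under stated assumptions: `narrow_map`, `wide_reduce_by_key`, and the CRC32-based `stable_partition` helper are illustrative names, not Spark's partitioner or shuffle implementation.

```python
import zlib


def narrow_map(partitions, fn):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[fn(x) for x in part] for part in partitions]


def stable_partition(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (hypothetical helper)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def wide_reduce_by_key(partitions, fn, num_partitions):
    """Wide: records move between partitions so equal keys meet in one place."""
    shuffled = [dict() for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            target = shuffled[stable_partition(key, num_partitions)]
            target[key] = fn(target[key], value) if key in target else value
    return [sorted(bucket.items()) for bucket in shuffled]


parts = [[("a", 1), ("b", 2)], [("a", 3), ("c", 5)]]
doubled = narrow_map(parts, lambda kv: (kv[0], kv[1] * 2))
print(doubled)  # [[('a', 2), ('b', 4)], [('a', 6), ('c', 10)]]

reduced = wide_reduce_by_key(doubled, lambda x, y: x + y, num_partitions=2)
totals = dict(kv for bucket in reduced for kv in bucket)
print(totals == {"a": 8, "b": 4, "c": 10})  # True
```

In the narrow case the partition layout is unchanged, while in the wide case the two `"a"` records from different input partitions must be brought together before they can be summed.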
6. RDD Actions and Execution
Actions trigger computation and return results to the driver or write data to external storage. Examples include collect(), count(), first(), take(), reduce(), and saveAsTextFile(). When an action is called, Spark's DAG scheduler creates stages based on shuffle boundaries, and the task scheduler executes tasks across the cluster.
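How stages follow from shuffle boundaries can be sketched for a simple linear chain of operations. The `split_into_stages` function and the `WIDE_OPS` set are assumptions for illustration; the real DAG scheduler works on the RDD dependency graph rather than on operation names.

```python
WIDE_OPS = {"groupByKey", "reduceByKey", "join"}  # wide examples from these notes


def split_into_stages(ops):
    """Group a linear chain of operations into stages, cutting at each shuffle."""
    stages, current = [], []
    for op in ops:
        if op in WIDE_OPS and current:
            stages.append(current)  # the shuffle closes the upstream stage
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


plan = ["textFile", "map", "filter", "reduceByKey", "map", "collect"]
print(split_into_stages(plan))
# [['textFile', 'map', 'filter'], ['reduceByKey', 'map', 'collect']]
```

Everything inside one stage can be pipelined within a task; the second stage cannot start until the shuffle output of the first is available.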
7. RDD Persistence and Caching
RDDs can be persisted or cached in memory or disk using various storage levels:
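- MEMORY_ONLY: Store as deserialized objects in the JVM heap
- MEMORY_AND_DISK: Spill to disk if there is not enough memory
- MEMORY_ONLY_SER: Store as serialized objects (space efficient)
- MEMORY_AND_DISK_SER: Serialized, with disk spill
- DISK_ONLY: Store only on disk
- OFF_HEAP: Store outside the JVM heap (Tachyon/Alluxio)
Note that the _SER levels belong to the Scala/Java API. PySpark always serializes (pickles) data on the Python side, and recent PySpark releases do not define the _SER constants; use MEMORY_ONLY or MEMORY_AND_DISK there instead.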
8. RDD vs DataFrame vs Dataset
While RDDs provide low-level control, they lack optimizations available in DataFrames and Datasets:
- RDD: No schema, no query optimization, JVM objects only
- DataFrame: Schema-aware, Catalyst optimizer, Tungsten execution engine
- Dataset: Type-safe API with Catalyst optimizations (Scala/Java only)
9. Performance Considerations
- Serialization overhead is significant; use Kryo serialization
- Avoid collect() on large datasets; use take() or foreach()
- Partition data appropriately to minimize shuffle
- Cache RDDs that are reused across multiple actions
- Use persist() with appropriate storage levels
🔑 Key Concepts Table
| Concept | Description | Example |
|---|---|---|
| Partition | Logical chunk of data for parallel processing | rdd.getNumPartitions() |
| Lineage | DAG of transformations for fault tolerance | rdd.toDebugString() |
| Narrow Transform | 1:1 partition mapping, no shuffle | map(), filter(), flatMap() |
| Wide Transform | M:N partition mapping, requires shuffle | groupByKey(), reduceByKey(), join() |
| Lazy Evaluation | Transforms built but not executed until action | Build DAG → Action triggers execution |
| Action | Triggers computation, returns result | collect(), count(), first() |
| Cache/Persist | Store RDD in memory/disk for reuse | rdd.cache() or rdd.persist() |
| Checkpoint | Write RDD to reliable storage, truncate lineage | rdd.checkpoint() |
| Broadcast | Read-only variable cached on each executor | sc.broadcast(variable) |
| Accumulator | Write-only variable for aggregations | sc.accumulator(0) |
💻 Code Examples
Example 1: Basic RDD Operations
from pyspark import SparkContext, SparkConf
# Initialize Spark Context
conf = SparkConf().setAppName("RDDFundamentals").setMaster("local[*]")
sc = SparkContext(conf=conf)
# Create RDD from collection
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 4) # 4 partitions
# Basic transformations
squared_rdd = rdd.map(lambda x: x ** 2)
even_rdd = rdd.filter(lambda x: x % 2 == 0)
flattened = rdd.flatMap(lambda x: [x, x * 10])
# Actions
print(f"Count: {rdd.count()}") # 10
print(f"First: {rdd.first()}") # 1
print(f"Sum: {rdd.reduce(lambda a, b: a + b)}") # 55
print(f"Collected: {squared_rdd.collect()}") # [1, 4, 9, ..., 100]
# Inspect partitions
print(f"Number of partitions: {rdd.getNumPartitions()}")
print(f"Debug string:\n{rdd.toDebugString().decode()}")
Example 2: Advanced Transformations with Key-Value Pairs
# Create key-value RDD
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
# GroupByKey - collects all values for each key (expensive!)
grouped = pairs.groupByKey()
for key, values in grouped.collect():
    print(f"{key}: {list(values)}")  # a: [1, 3], b: [2, 4], c: [5] (key order may vary)
# ReduceByKey - combines values per partition first (more efficient)
reduced = pairs.reduceByKey(lambda a, b: a + b)
print(reduced.collect()) # [('a', 4), ('b', 6), ('c', 5)]
# CombineByKey - most flexible aggregation
combined = pairs.combineByKey(
    lambda v: (v, 1),  # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),  # mergeCombiners
)
# Calculate average
averages = combined.mapValues(lambda x: x[0] / x[1])
print(averages.collect()) # [('a', 2.0), ('b', 3.0), ('c', 5.0)]
Example 3: RDD Persistence and Checkpointing
from pyspark import StorageLevel
# Create expensive RDD
expensive_rdd = sc.parallelize(range(1000000)).map(lambda x: x ** 3).filter(lambda x: x % 7 == 0)
# Cache with different storage levels
expensive_rdd.cache() # MEMORY_ONLY (default)
# expensive_rdd.persist(StorageLevel.MEMORY_AND_DISK)
# expensive_rdd.persist(StorageLevel.MEMORY_ONLY_SER)  # Scala/Java level; not defined in recent PySpark releases
# First action triggers computation and caching
count = expensive_rdd.count()
print(f"Count: {count}")
# Second action uses cached version (much faster!)
sum_val = expensive_rdd.reduce(lambda a, b: a + b)
print(f"Sum: {sum_val}")
# Checkpointing - truncates lineage
sc.setCheckpointDir("hdfs:///checkpoint_dir")
expensive_rdd.checkpoint()
expensive_rdd.count() # Triggers checkpoint
# After checkpoint, lineage is truncated
print(f"Debug after checkpoint:\n{expensive_rdd.toDebugString().decode()}")
Example 4: Broadcast Variables and Accumulators
# Broadcast variable - large lookup table
lookup = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
broadcast_lookup = sc.broadcast(lookup)
# Use broadcast in transformation
data = sc.parallelize(["a", "b", "c", "d", "e", "a", "b"])
mapped = data.map(lambda x: broadcast_lookup.value.get(x, 0))
print(mapped.collect()) # [1, 2, 3, 4, 5, 1, 2]
# Accumulator - distributed counter
counter = sc.accumulator(0)
def increment_counter(x):
    global counter
    counter.add(1)
    return x
data.foreach(increment_counter)
print(f"Counter value: {counter.value}") # 7
📊 Performance Metrics
| Metric | RDD (Python) | RDD (Scala) | DataFrame |
|---|---|---|---|
| Serialization Speed | 100 MB/s | 500 MB/s | 800 MB/s |
| Deserialization Speed | 80 MB/s | 400 MB/s | 700 MB/s |
| Memory Usage | 3x object size | 1.5x object size | 1x (Tungsten) |
| Shuffle Speed | 50 MB/s | 200 MB/s | 350 MB/s |
| GC Overhead | High | Medium | Low |
| Type Safety | Runtime | Compile-time | Runtime |
| Query Optimization | None | None | Catalyst |
| Python Function Call Overhead | ~100ns | N/A | N/A |
| Partition Processing | Per-record | Per-record | Vectorized |
✅ Best Practices
1. Minimize Data Shuffling
# BAD: Shuffles every value across the network before summing
grouped = rdd.groupByKey().mapValues(sum)
# GOOD: Combines values within each partition before the shuffle
reduced = rdd.reduceByKey(lambda a, b: a + b)
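The saving from combining per partition first can be estimated by counting how many records would cross the shuffle in each approach. The sketch below is a plain-Python model with hypothetical function names; it counts records only and ignores record size and serialization.

```python
def shuffled_without_combine(partitions):
    """groupByKey-style: every (key, value) record crosses the shuffle."""
    return sum(len(part) for part in partitions)


def shuffled_with_combine(partitions):
    """reduceByKey-style: at most one record per distinct key per partition."""
    return sum(len({key for key, _ in part}) for part in partitions)


parts = [
    [("a", 1), ("a", 2), ("a", 3), ("b", 4)],
    [("a", 5), ("b", 6), ("b", 7), ("b", 8)],
]
print(shuffled_without_combine(parts))  # 8
print(shuffled_with_combine(parts))     # 4
```

The gap grows with the number of repeated keys per partition, so the benefit is largest for low-cardinality keys.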
2. Use Appropriate Serialization
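# Configure Kryo serialization (affects JVM-side data; Python objects are still pickled)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
# PySpark's SparkConf has no registerKryoClass(); register JVM classes by name instead.
# com.example.MyClass1 and com.example.MyClass2 are placeholder class names.
conf.set("spark.kryo.classesToRegister", "com.example.MyClass1,com.example.MyClass2")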
3. Partition Wisely
# Repartition for parallelism
rdd = rdd.repartition(100) # Increase partitions for more parallelism
# Coalesce to reduce partitions without full shuffle
rdd = rdd.coalesce(10) # Reduce partitions
4. Avoid collect() on Large Data
# BAD: Brings all data to driver
all_data = rdd.collect()
# GOOD: Use take or foreach
first_10 = rdd.take(10)
rdd.foreach(process_record) # Process on executors
5. Cache Strategically
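# Only cache if the RDD is reused multiple times.
# A storage level cannot be changed once assigned, so unpersist first.
if rdd.is_cached:
    rdd.unpersist()
# Use an appropriate storage level
rdd.persist(StorageLevel.MEMORY_AND_DISK)  # For large datasets; PySpark data is always stored serialized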
6. Monitor and Tune
# Check partition distribution
print(f"Partition count: {rdd.getNumPartitions()}")
print(f"Partition sizes: {rdd.mapPartitionsWithIndex(lambda i, it: [(i, sum(1 for _ in it))]).collect()}")
See Also
- Kafka Streams (kafka/03): Stream processing with RDD-based DStreams
- Data Engineering Streaming (data-engineering/022): RDD-based streaming pipeline architecture