PySpark Transformation Types
Narrow Transformation
A narrow transformation is one where each partition of the parent RDD contributes to at most one partition of the child RDD. No data movement (shuffle) is required. Examples: map, filter, flatMap, mapPartitions, union.
Wide Transformation
A wide transformation (shuffle transformation) is one where each partition of the parent RDD may be depended on by multiple child partitions. Data must be redistributed across the network via shuffle. Examples: groupByKey, reduceByKey, join, distinct, repartition.
Shuffle Partition Size Estimate
$$S_{partition} = \frac{N \times W \times F_{skew}}{P}$$
Here,
- $S_{partition}$ = Estimated size of each output partition after shuffle
- $N$ = Total number of rows
- $W$ = Average row width in bytes
- $F_{skew}$ = Skew factor (1.0 = uniform, >1.0 = skewed)
- $P$ = Number of output partitions
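The estimate above is plain arithmetic, so it can be checked with a few lines of Python. This is a sketch under stated assumptions: the workload (1 billion rows of about 100 bytes) and the 128 MiB target are hypothetical, 200 is the default value of `spark.sql.shuffle.partitions`, and `F_skew` is read as the ratio of the largest partition to the average one, so a skewed result estimates the worst-case partition.

```python
import math

def estimated_partition_bytes(num_rows, avg_row_bytes, skew_factor, num_partitions):
    """S_partition = N * W * F_skew / P, in bytes."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return num_rows * avg_row_bytes * skew_factor / num_partitions

def partitions_for_target(num_rows, avg_row_bytes, skew_factor, target_bytes):
    """Smallest P for which the estimate stays at or below target_bytes."""
    return math.ceil(num_rows * avg_row_bytes * skew_factor / target_bytes)

# Hypothetical workload: 1 billion rows of ~100 bytes, 200 shuffle partitions
uniform = estimated_partition_bytes(1_000_000_000, 100, 1.0, 200)
skewed = estimated_partition_bytes(1_000_000_000, 100, 3.0, 200)
print(f"uniform: {uniform / 1e6:.0f} MB, skewed: {skewed / 1e6:.0f} MB")
# uniform: 500 MB, skewed: 1500 MB

# Partitions needed to keep the estimate at or below a hypothetical 128 MiB target
print(partitions_for_target(1_000_000_000, 100, 1.0, 128 * 1024**2))  # 746
```

Solving the same formula for `P` is a quick way to pick a partition count before a large shuffle.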
Wide transformations introduce shuffle barriers in the DAG. The DAG Scheduler splits the execution plan into stages at each shuffle boundary. Within each stage, narrow transformations are pipelined, so a single task applies the whole chain of narrow operations to one partition.
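Prefer reduceByKey over groupByKey when possible. reduceByKey performs the aggregation map-side (before the shuffle), so only one partial result per key per partition crosses the network. When many records in a partition share a key, this can cut shuffle volume by 10x to 100x; when most keys are unique, the savings are small.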
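The effect of map-side aggregation can be measured by counting the records that would cross the shuffle. The pure-Python model below is not Spark code. `combine_within_partition` and `records_shuffled` are hypothetical helpers, and the two inputs are made up to show both ends of the range: few distinct keys versus all-unique keys.

```python
from operator import add

def combine_within_partition(partition, func):
    """Map-side combine: fold together values that share a key in one partition."""
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())

def records_shuffled(partitions, map_side_combine, func=add):
    """Number of (key, value) records that would cross the shuffle."""
    if map_side_combine:  # reduceByKey-style: one partial result per key per partition
        return sum(len(combine_within_partition(p, func)) for p in partitions)
    return sum(len(p) for p in partitions)  # groupByKey-style: every record moves

# Hypothetical inputs: 4 partitions x 1,000 records each
repetitive = [[(i % 10, 1) for i in range(1000)] for _ in range(4)]    # 10 distinct keys
unique = [[(p * 1000 + i, 1) for i in range(1000)] for p in range(4)]  # all keys distinct

for name, data in [("repetitive", repetitive), ("unique", unique)]:
    print(name, records_shuffled(data, False), records_shuffled(data, True))
# repetitive 4000 40
# unique 4000 4000
```

The 100x reduction only appears when keys repeat within a partition. With unique keys, map-side combining saves nothing.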
Shuffle Bottleneck Theorem
Theorem: The running time of any wide transformation is bounded below by max(partition_write_time, network_transfer_time, partition_read_time), and a stage completes only when its slowest task completes. This is why data skew (F_{skew} >> 1) causes stragglers: the largest partition determines overall stage completion time.
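A toy calculation shows the straggler effect. The throughput figures below (200 MB/s disk, 100 MB/s network) are hypothetical. The model takes the max of the three phases, the optimistic lower bound from the theorem, as if the phases fully overlapped. It also assumes there are enough executor slots to run every task at once.

```python
def task_seconds(partition_bytes, write_mb_s=200.0, network_mb_s=100.0, read_mb_s=200.0):
    """Bound from the theorem: a task is at least as slow as its slowest phase."""
    mb = partition_bytes / 1e6
    return max(mb / write_mb_s, mb / network_mb_s, mb / read_mb_s)

def stage_seconds(partition_sizes, **rates):
    """A stage finishes when its slowest task finishes (one slot per task assumed)."""
    return max(task_seconds(size, **rates) for size in partition_sizes)

uniform = [500e6] * 8               # 8 partitions of 500 MB (4 GB total)
skewed = [250e6] * 7 + [2250e6]     # same 4 GB, but one hot partition (F_skew = 4.5)
print(stage_seconds(uniform), stage_seconds(skewed))  # 5.0 22.5
```

The total data is identical, but the skewed stage takes 4.5 times longer because every other task waits on the one hot partition.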
- Narrow transformations: 1:1 partition mapping, no shuffle, pipelineable
- Wide transformations: M:N partition mapping, shuffle required, define stage boundaries
- Shuffle cost = write + network + read + sort per partition
- reduceByKey reduces shuffle volume by aggregating map-side before the shuffle
- Data skew (F_{skew}) causes stragglers and out-of-memory errors in shuffle operations
Architecture Diagram
TRANSFORMATION CLASSIFICATION
NARROW TRANSFORMATIONS (no data movement between partitions)
  Partition 0 [A B C] --map(), filter()--> Partition 0' [a b c]
  Partition 1 [D E F] --------------------> Partition 1' [d e f]
  Partition 2 [G H I] --------------------> Partition 2' [g h i]
  KEY: Each parent partition maps to ONE child partition
  PIPELINE: Can be executed in a single task
WIDE TRANSFORMATIONS (data movement between partitions = SHUFFLE)
  Input:  Partition 0 [A B C], Partition 1 [D E F], Partition 2 [G H I]
  --shuffle-->
  Output: Partition 0' [A C], Partition 1' [B D F], Partition 2' [E G H I]
  KEY: One parent partition maps to MULTIPLE children
  SHUFFLE: Data must be redistributed across network
SHUFFLE OPERATION FLOW
STAGE 1: Map Phase (Write Shuffle)
  Partition 0 -> Shuffle Writer | Partition 1 -> Shuffle Writer | Partition 2 -> Shuffle Writer
  -> Shuffle Write Files: [reduce 0] [reduce 1] [reduce 2] [reduce 3]
  |
  v
STAGE 2: Reduce Phase (Read Shuffle)
  Shuffle Read: [reduce 0] [reduce 1] [reduce 2] [reduce 3]
  reduce 0 -> Shuffle Reader -> Partition 0
  reduce 1 -> Shuffle Reader -> Partition 1
  reduce 2 -> Shuffle Reader -> Partition 2
  reduce 3 -> Shuffle Reader -> Partition 3
SHUFFLE BOUNDARY: Marks separation between stages
EXPENSIVE: Network I/O + Disk I/O + Serialization
TRANSFORMATION PIPELINE EXAMPLE
Input: sc.textFile("logs.txt") -> partitions log_0, log_1, log_2, log_3
  | (each partition flows independently)
  v
NARROW: flatMap(lambda line: line.split(" ")) -> words
  v
NARROW: filter(lambda word: len(word) > 3) -> long words
  v
NARROW: map(lambda word: (word, 1)) -> (word, 1) pairs
  v
WIDE: reduceByKey(lambda a, b: a + b) -> SHUFFLE BOUNDARY -> (word, count) in each output partition
PIPELINE: NARROW transforms are pipelined (no shuffle)
STAGE BOUNDARY: Created at WIDE transformations
Detailed Explanation
1. Narrow Transformations
Narrow transformations are operations where each input partition contributes to at most one output partition. Data stays within the same partition and no network shuffling is required. These operations can be pipelined together, meaning multiple narrow transformations can be executed in a single task without intermediate disk writes.
Characteristics:
- 1:1 mapping between input and output partitions
- No data movement across the network
- Can be pipelined (executed together in one stage)
- Efficient memory usage
- Low latency
Examples:
- map(func): Apply a function to each element
- flatMap(func): Apply a function that returns an iterable, then flatten the results
- filter(func): Keep elements where the function returns True
- mapPartitions(func): Apply a function to each partition's iterator
- mapPartitionsWithIndex(func): Like mapPartitions, but the function also receives the partition index
- union(other): Return the union of two RDDs
- sample(withReplacement, fraction): Return a random sample
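Pipelining means records are pushed through the whole narrow chain one at a time, and no intermediate collection is built between steps. The sketch below models this with Python generators. It is not Spark's implementation, although PySpark does chain consecutive Python narrow operations into a single `PipelinedRDD`. The `traced_map`, `traced_filter` and `run_task` helpers are hypothetical and log the order in which elements are processed.

```python
def traced_map(func, iterator, log):
    """Lazy map that records when each element passes through."""
    for x in iterator:
        log.append(f"map({x})")
        yield func(x)

def traced_filter(pred, iterator, log):
    """Lazy filter that records when each element is tested."""
    for x in iterator:
        log.append(f"filter({x})")
        if pred(x):
            yield x

def run_task(partition, log):
    """One task: map then filter fused into a single pass over one partition."""
    doubled = traced_map(lambda x: x * 2, iter(partition), log)
    return list(traced_filter(lambda x: x % 4 == 0, doubled, log))

log = []
first = run_task([1, 2, 3], log)
print(first)  # [4]
print(log)    # ['map(1)', 'filter(2)', 'map(2)', 'filter(4)', 'map(3)', 'filter(6)']

# Each partition is processed by its own task; no records move between partitions
partitions = [[1, 2, 3], [4, 5, 6]]
print([run_task(p, []) for p in partitions])  # [[4], [8, 12]]
```

The interleaved log (map, filter, map, filter, ...) is the signature of pipelining: each element finishes the chain before the next one starts.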
2. Wide Transformations
Wide transformations are operations where a single input partition may be needed by multiple output partitions. This requires data to be redistributed across the network, a process called shuffle. Each wide transformation creates a new stage in the execution plan.
Characteristics:
- M:N mapping between input and output partitions
- Data movement across the network (shuffle)
- Creates stage boundaries
- Expensive (network I/O, disk I/O, serialization)
- Required for key-based aggregation, joins, sorting, and global deduplication
Examples:
- groupByKey(): Group values by key
- reduceByKey(func): Reduce values by key
- join(other): Join two RDDs
- repartition(n): Reshuffle into n partitions
- distinct(): Remove duplicates
- sortByKey() / sortBy(func): Sort elements
- coalesce(n): Reduce partitions; narrow by default, wide with shuffle=True
3. Shuffle Deep Dive
A shuffle is the process of redistributing data across partitions, typically across the cluster network. Shuffles are the most expensive operations in Spark and should be minimized.
Shuffle Stages:
- Map Phase: Each executor writes shuffle files to local disk
- Fetch Phase: Executors fetch shuffle data from other executors
- Reduce Phase: Process fetched data
Shuffle Write:
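- Each map task writes a single data file, with one contiguous block per reduce partition, plus an index file (sort-based shuffle, the default since Spark 1.2; the older hash-based shuffle wrote one file per reducer)
- Data is partitioned by key, using hash partitioning by default (sortByKey uses range partitioning)
- Output is buffered in memory, spilled as needed, and finally written to the executor's local disk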
Shuffle Read:
- Each reducer fetches from all mappers
- Data is deserialized and merged
- Memory pressure can cause spill to disk
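The write, fetch and read steps can be modeled in a few lines of plain Python. This sketch loosely follows the sort-based layout described above: each map task produces one output ordered by reducer id plus an index of block offsets. Several simplifications apply. `zlib.crc32` stands in for Spark's partitioner, the helper names are hypothetical, and disk, network, serialization and spilling are not modeled.

```python
import zlib

def partition_for(key, num_reducers):
    """Deterministic hash partitioner (stand-in for Spark's partitioner)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_reducers

def shuffle_write(map_partition, num_reducers):
    """Map task: one output ordered by reducer id, plus an index of block offsets."""
    data = sorted(map_partition, key=lambda kv: partition_for(kv[0], num_reducers))
    offsets = [0] * (num_reducers + 1)
    for key, _ in data:
        offsets[partition_for(key, num_reducers) + 1] += 1
    for r in range(num_reducers):          # prefix sums: block r is data[offsets[r]:offsets[r+1]]
        offsets[r + 1] += offsets[r]
    return data, offsets

def shuffle_read(map_outputs, reducer_id):
    """Reduce task: fetch block reducer_id from every map output, then group by key."""
    groups = {}
    for data, offsets in map_outputs:
        for key, value in data[offsets[reducer_id]:offsets[reducer_id + 1]]:
            groups.setdefault(key, []).append(value)
    return groups

map_partitions = [
    [("spark", 1), ("hello", 1), ("world", 1)],
    [("hello", 1), ("spark", 1), ("spark", 1)],
]
num_reducers = 3
map_outputs = [shuffle_write(p, num_reducers) for p in map_partitions]   # map phase

word_counts = {}
for r in range(num_reducers):                                          # fetch + reduce phase
    for key, values in shuffle_read(map_outputs, r).items():
        word_counts[key] = sum(values)
print(sorted(word_counts.items()))  # [('hello', 2), ('spark', 3), ('world', 1)]
```

Every reducer reads from every map output (M:N), but each key lands in exactly one reducer, which is what makes the per-key reduction correct.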
4. Shuffle Optimization Techniques
Broadcast Join:
# Avoid shuffle by broadcasting small table
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
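A broadcast join avoids the shuffle because the large side never moves: each task joins its own partition against a full copy of the small side. The pure-Python model below illustrates this idea. It is not Spark's `BroadcastHashJoin` operator, and `broadcast_hash_join` and the sample rows are hypothetical. Spark SQL also broadcasts a side automatically when its estimated size is below `spark.sql.autoBroadcastJoinThreshold`, which defaults to 10 MB.

```python
def broadcast_hash_join(large_partitions, small_table):
    """Inner join where every task gets its own copy of the small side."""
    lookup = {}                              # built once on the "driver"
    for key, value in small_table:
        lookup.setdefault(key, []).append(value)
    joined = []
    for partition in large_partitions:       # one task per large-side partition
        joined.append([(key, (value, small_value))
                       for key, value in partition
                       for small_value in lookup.get(key, [])])
    return joined

orders = [[(1, "order-a"), (2, "order-b")], [(1, "order-c"), (3, "order-d")]]
countries = [(1, "DE"), (2, "FR")]
result = broadcast_hash_join(orders, countries)
print(result)
# [[(1, ('order-a', 'DE')), (2, ('order-b', 'FR'))], [(1, ('order-c', 'DE'))]]
```

The output keeps the large side's partitioning: two input partitions give two output partitions, and no order record changes partition.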
Partitioning:
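# Repartition by key once (this step itself shuffles) so later key-based
# operations on the same DataFrame in this job can reuse the partitioning
df = df.repartition(100, "key")
# Note: plain Parquet output does not record this partitioning for future reads;
# bucketing (below) persists it in the table metadata
df.write.parquet("output/")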
Bucketing:
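# Bucket data by join key; later joins on user_id with a table bucketed
# the same way (same column, same bucket count) can skip the shuffle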
df.write.bucketBy(100, "user_id").saveAsTable("users")
Coalesce vs Repartition:
# Coalesce: Reduce partitions without full shuffle (narrow)
df.coalesce(10)
# Repartition: Increase or decrease with full shuffle (wide)
df.repartition(100)
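The difference between the two calls is which output partitions each input partition feeds. The following is a simplified pure-Python model. Spark's real coalesce groups partitions by data locality, not simply by neighbour, and `RDD.repartition(n)` is `coalesce(n, shuffle=True)`, which spreads records roughly round-robin. The helper names are hypothetical.

```python
def coalesce_model(partitions, n):
    """Merge neighbouring partitions into at most n outputs; no record is split off."""
    n = min(n, len(partitions))            # without a shuffle, the count cannot grow
    outputs = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        outputs[i * n // len(partitions)].extend(part)
    return outputs

def repartition_model(partitions, n):
    """Full shuffle: records are dealt round-robin across n new partitions."""
    outputs = [[] for _ in range(n)]
    position = 0
    for part in partitions:
        for record in part:
            outputs[position % n].append(record)
            position += 1
    return outputs

def outputs_fed_by(input_part, outputs):
    """Which output partitions receive records from one input partition."""
    return {i for i, out in enumerate(outputs) if any(r in out for r in input_part)}

parts = [[0, 1], [2, 3], [4, 5], [6, 7]]
print(coalesce_model(parts, 2))        # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(repartition_model(parts, 3))     # [[0, 3, 6], [1, 4, 7], [2, 5]]
print(outputs_fed_by(parts[0], coalesce_model(parts, 2)))     # {0}
print(outputs_fed_by(parts[0], repartition_model(parts, 3)))  # {0, 1}
```

Under coalesce, each input partition feeds exactly one output, which is a narrow dependency. Under repartition, one input partition feeds several outputs, which is a wide dependency and therefore a shuffle.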
5. Stage Boundaries
Spark divides execution into stages at shuffle boundaries:
- Stage 0: Input + narrow transformations
- Stage 1: After first shuffle + narrow transformations
- Stage N: After N-th shuffle + narrow transformations
- Final Stage: Last shuffle + output
Task Scheduling:
- One task per partition per stage
- Tasks are scheduled with data locality preferences (on or near the node holding their input, when possible)
- A stage runs only after all of its parent stages complete
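The stage-splitting rule can be sketched for a simple linear lineage. This toy model is not the DAG Scheduler, which walks a full DAG backwards from the final RDD and handles operations with several parents, such as joins. It also glosses over details: reduceByKey's map-side combine actually runs at the end of the parent stage, and sortByKey first runs a small sampling job. `Op` and `split_into_stages` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Op:
    name: str
    wide: bool = False  # True if the op needs a shuffle of its parent's output

def split_into_stages(lineage: List[Op]) -> List[List[str]]:
    """Cut a linear chain of operations into stages at every shuffle."""
    stages: List[List[str]] = [[]]
    for op in lineage:
        if op.wide and stages[-1]:
            stages.append([])          # shuffle boundary: the wide op opens a new stage
        stages[-1].append(op.name)
    return stages

lineage = [
    Op("textFile"), Op("flatMap"), Op("filter"), Op("map"),
    Op("reduceByKey", wide=True), Op("mapValues"),
    Op("sortByKey", wide=True),
]
for i, stage in enumerate(split_into_stages(lineage)):
    print(f"Stage {i}: {stage}")
# Stage 0: ['textFile', 'flatMap', 'filter', 'map']
# Stage 1: ['reduceByKey', 'mapValues']
# Stage 2: ['sortByKey']
```

With one task per partition per stage, this job would launch (partitions in stage 0) + (partitions in stage 1) + (partitions in stage 2) tasks.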
6. Performance Implications
Narrow Transformations:
- Low latency (no network overhead)
- High throughput (can process data in-place)
- Good cache locality
- Minimal memory overhead
Wide Transformations:
- High latency (network round trips)
- Disk I/O for shuffle files
- Serialization/deserialization overhead
- Memory pressure from buffering
7. Common Shuffle Patterns
GroupBy + Aggregate:
# BAD: groupByKey ships every (key, value) pair through the shuffle, then sums
rdd.groupByKey().mapValues(sum)
# GOOD: reduceByKey sums within each partition first, so less data is shuffled
rdd.reduceByKey(lambda a, b: a + b)
Join:
# Join shuffles both sides unless one side is broadcast or both are already co-partitioned
joined = rdd1.join(rdd2)
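If both inputs are already partitioned by the same function into the same number of partitions, matching keys are guaranteed to sit in partitions with the same index. In that case Spark can join partition i with partition i and skip the shuffle. The pure-Python model below shows why. `hash_partition` and `copartitioned_join` are hypothetical helpers, `key % n` stands in for the shared partitioner, and the data mirrors Example 2.

```python
def hash_partition(pairs, n):
    """Place each pair in partition key % n (non-negative int keys assumed)."""
    parts = [[] for _ in range(n)]
    for key, value in pairs:
        parts[key % n].append((key, value))
    return parts

def copartitioned_join(left_parts, right_parts):
    """Inner join that only ever pairs partition i with partition i."""
    if len(left_parts) != len(right_parts):
        raise ValueError("both sides need the same number of partitions")
    joined = []
    for left, right in zip(left_parts, right_parts):
        index = {}
        for key, value in right:
            index.setdefault(key, []).append(value)
        joined.append([(k, (v, w)) for k, v in left for w in index.get(k, [])])
    return joined

left = hash_partition([(1, "a"), (2, "b"), (3, "c")], 2)
right = hash_partition([(1, "x"), (2, "y"), (3, "z")], 2)
print(copartitioned_join(left, right))
# [[(2, ('b', 'y'))], [(1, ('a', 'x')), (3, ('c', 'z'))]]
```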
Distinct:
# Requires shuffle to deduplicate across partitions
distinct = rdd.distinct()
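Copies of the same value can start in different partitions, so only a shuffle can bring them together. PySpark's `RDD.distinct()` is built, roughly, from `map(lambda x: (x, None))`, `reduceByKey`, and a final map back to `x`. The pure-Python model below follows that shape, with `hash(x)` standing in for the partitioner (for small ints, CPython's `hash(x) == x`). It uses the values `[1, 2, 2, 3, 3, 3]` from Example 2, split across two hypothetical partitions.

```python
def distinct_model(partitions, num_partitions):
    """distinct(): pair each x with None, dedupe locally, shuffle by key, keep keys."""
    # Map side: drop duplicates inside each partition (map-side combine)
    map_outputs = [list(dict.fromkeys(part)) for part in partitions]
    # Shuffle: equal values hash to the same reducer wherever they started
    reducers = [{} for _ in range(num_partitions)]
    for out in map_outputs:
        for x in out:
            reducers[hash(x) % num_partitions][x] = None
    return [list(r) for r in reducers]

print(distinct_model([[1, 2, 3], [2, 3, 3]], 2))  # [[2], [1, 3]]
```

The values 2 and 3 appear in both input partitions. Local deduplication alone would keep both copies, and only the shuffle step collapses them.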
Key Concepts Table
| Concept | Type | Shuffle? | Example |
|---|---|---|---|
| map | Narrow | No | rdd.map(lambda x: x * 2) |
| filter | Narrow | No | rdd.filter(lambda x: x > 0) |
| flatMap | Narrow | No | rdd.flatMap(lambda x: x.split()) |
| mapPartitions | Narrow | No | rdd.mapPartitions(process_partition) |
| union | Narrow | No | rdd1.union(rdd2) |
| groupByKey | Wide | Yes | rdd.groupByKey() |
| reduceByKey | Wide | Yes | rdd.reduceByKey(lambda a,b: a+b) |
| join | Wide | Yes (unless co-partitioned) | rdd1.join(rdd2) |
| repartition | Wide | Yes | rdd.repartition(100) |
| distinct | Wide | Yes | rdd.distinct() |
| coalesce | Mixed | No (decreasing, default) / Yes (shuffle=True, required to increase) | rdd.coalesce(10) |
| sortByKey | Wide | Yes | rdd.sortByKey() |
Code Examples
Example 1: Narrow Transformations
from pyspark import SparkContext
sc = SparkContext("local", "TransformationTypes")
# Create RDD
rdd = sc.parallelize(range(1, 101), 4)
# Narrow transformations (no shuffle)
mapped = rdd.map(lambda x: x * 2) # Double each element
filtered = rdd.filter(lambda x: x % 2 == 0) # Keep even numbers
flatMapped = rdd.flatMap(lambda x: [x, x * 10]) # Expand each element
sampled = rdd.sample(False, 0.1) # 10% sample
mappedPartitions = rdd.mapPartitions(lambda it: [sum(it)]) # Sum per partition
# Execute actions
print(f"Mapped sum: {mapped.sum()}")
print(f"Filtered count: {filtered.count()}")
print(f"FlatMapped count: {flatMapped.count()}")
print(f"Sampled count: {sampled.count()}")
print(f"Per-partition sums: {mappedPartitions.collect()}")
# No shuffle: each action above runs as a one-stage job, with the
# narrow transformations pipelined inside each task
Example 2: Wide Transformations and Shuffle
# Wide transformations (require shuffle)
words = sc.parallelize(["hello world", "hello spark", "world spark"], 2)
# Split words (narrow)
word_pairs = words.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1))
# ReduceByKey (wide) - single shuffle
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
print(f"Word counts: {word_counts.collect()}")
# GroupByKey (wide) - less efficient
word_groups = word_pairs.groupByKey()
for word, counts in word_groups.collect():
    print(f"{word}: {list(counts)}")
# Join (wide)
rdd1 = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
rdd2 = sc.parallelize([(1, "x"), (2, "y"), (3, "z")])
joined = rdd1.join(rdd2) # Shuffle required
print(f"Joined: {joined.collect()}")
# Distinct (wide)
duplicated = sc.parallelize([1, 2, 2, 3, 3, 3])
distinct = duplicated.distinct() # Shuffle required
print(f"Distinct: {distinct.collect()}")
Example 3: Shuffle Optimization
# Create large dataset
large_rdd = sc.parallelize(range(1000000), 8)
# BAD: groupByKey ships every (key, value) pair through the shuffle
key_values = large_rdd.map(lambda x: (x % 100, x))
grouped = key_values.groupByKey()
result_bad = grouped.mapValues(sum)
# GOOD: reduceByKey pre-aggregates within each partition before the shuffle
result_good = key_values.reduceByKey(lambda a, b: a + b)
# Compare execution plans (each has one shuffle; the difference is map-side combining)
print("=== BAD (groupByKey) ===")
print(result_bad.toDebugString().decode()[:500])
print("\n=== GOOD (reduceByKey) ===")
print(result_good.toDebugString().decode()[:500])
# Performance comparison
import time
start = time.time()
_ = result_bad.collect()
print(f"groupByKey: {time.time() - start:.2f}s")
start = time.time()
_ = result_good.collect()
print(f"reduceByKey: {time.time() - start:.2f}s")
Example 4: Partitioning Strategies
# Repartition vs Coalesce
rdd = sc.parallelize(range(1000), 10)
print(f"Initial partitions: {rdd.getNumPartitions()}")
# Coalesce: Reduce partitions without full shuffle
coalesced = rdd.coalesce(5)
print(f"After coalesce(5): {coalesced.getNumPartitions()}")
# Repartition: Increase or decrease with full shuffle
repartitioned = rdd.repartition(20)
print(f"After repartition(20): {repartitioned.getNumPartitions()}")
# Partition by key
key_value_rdd = sc.parallelize([(i % 10, i) for i in range(100)])
partitioned = key_value_rdd.partitionBy(5, lambda k: k % 5)
# Check partition distribution
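def count_per_partition(rdd):
    # Return (partition_index, record_count) for every partition
    return rdd.mapPartitionsWithIndex(
        lambda idx, it: [(idx, sum(1 for _ in it))]
    ).collect()
# Keys 0-9 hold 10 values each and k % 5 puts two keys in each partition,
# so this should print [(0, 20), (1, 20), (2, 20), (3, 20), (4, 20)]
print(f"Partition counts: {count_per_partition(partitioned)}")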
Performance Metrics
| Operation | Type | Time (ms) | Shuffle Size (MB) | Network Throughput (MB/s) |
|---|---|---|---|---|
| map() | Narrow | 45 | 0 | 0 |
| filter() | Narrow | 35 | 0 | 0 |
| flatMap() | Narrow | 50 | 0 | 0 |
| groupByKey() | Wide | 850 | 250 | 80 |
| reduceByKey() | Wide | 620 | 180 | 120 |
| join() | Wide | 1200 | 400 | 100 |
| repartition() | Wide | 750 | 300 | 90 |
| distinct() | Wide | 680 | 200 | 110 |
| sortByKey() | Wide | 900 | 350 | 85 |
Best Practices
1. Minimize Wide Transformations
# BAD: An explicit repartition before a keyed aggregation adds a second shuffle
result = rdd.repartition(100).reduceByKey(lambda a, b: a + b)
# GOOD: Let the aggregation's own shuffle set the partition count
result = rdd.reduceByKey(lambda a, b: a + b, numPartitions=100)
2. Use reduceByKey Instead of groupByKey
# BAD: groupByKey collects every value for a key into memory on one executor
grouped = rdd.groupByKey().mapValues(sum)
# GOOD: reduceByKey combines per partition first
reduced = rdd.reduceByKey(lambda a, b: a + b)
3. Broadcast Small Tables
from pyspark.sql.functions import broadcast
# Avoids shuffle for join with small table
result = large_df.join(broadcast(small_df), "key")
4. Repartition for Parallelism
# Increase partitions for more parallelism
rdd = rdd.repartition(100)
# Decrease partitions with coalesce (no shuffle)
rdd = rdd.coalesce(10)
5. Cache Intermediate Results
# Cache if RDD is reused across multiple actions
rdd.cache() # or persist() with specific storage level
# Uncache when no longer needed
rdd.unpersist()
6. Monitor Shuffle Metrics
# Check shuffle metrics in Spark UI
# Look for:
# - Shuffle Read Size / Records
# - Shuffle Write Size / Records
# - Shuffle Spill (Memory/Disk)
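The same numbers shown in the Spark UI can also be pulled programmatically from Spark's monitoring REST API. The sketch below assumes the `/api/v1/applications/<app-id>/stages` endpoint and the stage fields `shuffleReadBytes`, `shuffleWriteBytes`, `memoryBytesSpilled` and `diskBytesSpilled` from the monitoring documentation, so check them against your Spark version. `fetch_stages` and `summarize_shuffle` are hypothetical helpers. Port 4040 is the driver UI's default.

```python
import json
from urllib.request import urlopen

def fetch_stages(ui_url, app_id):
    """Fetch stage summaries from Spark's monitoring REST API."""
    with urlopen(f"{ui_url}/api/v1/applications/{app_id}/stages") as resp:
        return json.load(resp)

def summarize_shuffle(stages):
    """Shuffle read/write in MB per stage, heaviest first, with a spill flag."""
    rows = [{
        "stage": s["stageId"],
        "read_mb": s.get("shuffleReadBytes", 0) / 1e6,
        "write_mb": s.get("shuffleWriteBytes", 0) / 1e6,
        "spilled": s.get("memoryBytesSpilled", 0) + s.get("diskBytesSpilled", 0) > 0,
    } for s in stages]
    return sorted(rows, key=lambda r: r["read_mb"] + r["write_mb"], reverse=True)

# Usage against a live driver UI:
# for row in summarize_shuffle(fetch_stages("http://localhost:4040", sc.applicationId)):
#     print(row)
```

Stages that sit at the top of this list and are flagged as spilled are the first candidates for the techniques above: map-side aggregation, broadcast joins, or more shuffle partitions.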
See Also
- Kafka Streams (kafka/03): Transformation types in stream processing
- Data Engineering Streaming (data-engineering/022): Shuffle optimization in streaming pipelines