PySpark Transformation Types: Narrow vs Wide, Shuffle Operations

🔄 PySpark Transformation Types

Definition: Narrow Transformation

A narrow transformation is one where each partition of the parent RDD contributes to at most one partition of the child RDD. No data movement (shuffle) is required. Examples: map, filter, flatMap, mapPartitions, union.

Definition: Wide Transformation

A wide transformation (shuffle transformation) is one where each partition of the parent RDD may be depended on by multiple child partitions. Data must be redistributed across the network via shuffle. Examples: groupByKey, reduceByKey, join, distinct, repartition.

$$Cost_{shuffle} = N_{partitions} \times (S_{write} + S_{network} + S_{read}) + S_{sort}$$

Shuffle Partition Size Estimate

$$S_{partition} = \frac{N_{rows} \times W_{row} \times F_{skew}}{P_{output}}$$

Here,

  • $S_{partition}$ = Estimated size of each output partition after shuffle
  • $N_{rows}$ = Total number of rows
  • $W_{row}$ = Average row width in bytes
  • $F_{skew}$ = Skew factor (1.0 = uniform, >1.0 = skewed)
  • $P_{output}$ = Number of output partitions
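To make the estimate concrete, the following sketch evaluates the formula for a made-up workload. The function name `estimate_partition_bytes` and all input values are assumptions chosen for illustration, not measurements from a real cluster.

```python
def estimate_partition_bytes(n_rows, row_width_bytes, skew_factor, n_output_partitions):
    """Apply S_partition = (N_rows * W_row * F_skew) / P_output."""
    if n_output_partitions <= 0:
        raise ValueError("n_output_partitions must be positive")
    return n_rows * row_width_bytes * skew_factor / n_output_partitions


# Hypothetical workload: 100 million rows of ~200 bytes shuffled into 200 partitions.
uniform_bytes = estimate_partition_bytes(100_000_000, 200, 1.0, 200)
skewed_bytes = estimate_partition_bytes(100_000_000, 200, 4.0, 200)

print(f"uniform: {uniform_bytes / 1e6:.0f} MB per partition")
print(f"skewed:  {skewed_bytes / 1e6:.0f} MB in the heaviest partition")
```

With these assumed inputs the estimate grows from 100 MB to 400 MB once the skew factor reaches 4, which is the kind of signal that suggests raising the partition count or dealing with hot keys.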

Wide transformations introduce shuffle barriers in the DAG. The DAG Scheduler splits the execution plan into stages at each shuffle boundary. Within each stage, narrow transformations are pipelined into a single task.

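Prefer reduceByKey over groupByKey when possible. reduceByKey aggregates map-side (before the shuffle), so each map task sends at most one record per key instead of every raw value. How much this shrinks the network transfer depends on how often keys repeat within a partition; a 10x–100x reduction is plausible when keys repeat heavily.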
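The effect of map-side aggregation can be illustrated without a cluster. The sketch below is a plain-Python model, not Spark code: it counts how many records would cross the shuffle for a groupByKey-style plan versus a reduceByKey-style plan. The helper names and the sample partitions are hypothetical.

```python
from collections import defaultdict


def records_shuffled_group_by_key(map_partitions):
    """groupByKey-style: every (key, value) record is sent over the network."""
    return sum(len(partition) for partition in map_partitions)


def records_shuffled_reduce_by_key(map_partitions):
    """reduceByKey-style: values are summed per key inside each partition first."""
    total = 0
    for partition in map_partitions:
        combined = defaultdict(int)
        for key, value in partition:
            combined[key] += value
        total += len(combined)  # one record per distinct key per partition
    return total


# Two hypothetical map-side partitions of (word, 1) pairs.
map_partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("b", 1), ("b", 1), ("c", 1), ("a", 1)],
]

group_count = records_shuffled_group_by_key(map_partitions)
reduce_count = records_shuffled_reduce_by_key(map_partitions)
print(f"groupByKey-style records shuffled:  {group_count}")
print(f"reduceByKey-style records shuffled: {reduce_count}")
```

For these two partitions the model reports 8 shuffled records without map-side combining and 5 with it; the gap widens as keys repeat more often within a partition.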

Shuffle Bottleneck Theorem

Theorem: The completion time of any wide transformation is bounded below by max(partition_write_time, network_transfer_time, partition_read_time) of its slowest partition. This is why data skew ($F_{skew} \gg 1$) causes stragglers: the slowest partition determines overall stage completion time.
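A small numeric sketch shows why the slowest partition dominates. It assumes one task per partition, all tasks running in parallel at the same throughput, and it defines the skew factor as the largest partition size divided by the mean partition size. These modelling choices and the helper names are assumptions, not values Spark reports directly.

```python
def skew_factor(partition_sizes_mb):
    """Largest partition divided by the mean partition size (assumed definition)."""
    mean = sum(partition_sizes_mb) / len(partition_sizes_mb)
    return max(partition_sizes_mb) / mean


def stage_time_seconds(partition_sizes_mb, throughput_mb_per_s):
    """All tasks run in parallel, so the slowest task ends the stage."""
    return max(size / throughput_mb_per_s for size in partition_sizes_mb)


uniform = [100, 100, 100, 100]  # 400 MB spread evenly
skewed = [25, 25, 25, 325]      # the same 400 MB with one hot partition

for name, sizes in (("uniform", uniform), ("skewed", skewed)):
    print(
        f"{name}: skew factor {skew_factor(sizes):.2f}, "
        f"stage time {stage_time_seconds(sizes, 50):.1f} s"
    )
```

Both layouts hold 400 MB, yet under this model the skewed stage takes 6.5 s instead of 2.0 s, the same 3.25x ratio as its skew factor.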

  • Narrow transformations: 1:1 partition mapping, no shuffle, pipelineable
  • Wide transformations: M:N partition mapping, shuffle required, define stage boundaries
  • Shuffle cost = (write + network + read) per partition, plus sort
  • reduceByKey reduces shuffle volume by aggregating map-side before shuffle
  • Data skew ($F_{skew} > 1$) causes stragglers and OOM in shuffle operations

πŸ—οΈ Architecture Diagram

Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                  TRANSFORMATION CLASSIFICATION                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                  NARROW TRANSFORMATIONS                  │   │
│  │  (No data movement between partitions)                   │   │
│  │                                                          │   │
│  │  Partition 0 ──────────────────────────► Partition 0'    │   │
│  │  ┌─────────┐    map(), filter()      ┌─────────┐         │   │
│  │  │ A B C   │ ─────────────────────►  │ a b c   │         │   │
│  │  └─────────┘                         └─────────┘         │   │
│  │                                                          │   │
│  │  Partition 1 ──────────────────────────► Partition 1'    │   │
│  │  ┌─────────┐                         ┌─────────┐         │   │
│  │  │ D E F   │ ─────────────────────►  │ d e f   │         │   │
│  │  └─────────┘                         └─────────┘         │   │
│  │                                                          │   │
│  │  Partition 2 ──────────────────────────► Partition 2'    │   │
│  │  ┌─────────┐                         ┌─────────┐         │   │
│  │  │ G H I   │ ─────────────────────►  │ g h i   │         │   │
│  │  └─────────┘                         └─────────┘         │   │
│  │                                                          │   │
│  │  KEY: Each parent partition maps to ONE child partition  │   │
│  │  PIPELINE: Can be executed in a single task              │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                   WIDE TRANSFORMATIONS                   │   │
│  │  (Data movement between partitions = SHUFFLE)            │   │
│  │                                                          │   │
│  │  Partition 0 ────────┬──────────► Partition 0'           │   │
│  │  ┌─────────┐         │           ┌─────────┐             │   │
│  │  │ A B C   │ ────────┼─────────► │ A C     │             │   │
│  │  └─────────┘         │           └─────────┘             │   │
│  │                      │                                   │   │
│  │  Partition 1 ────────┼───┬──────► Partition 1'           │   │
│  │  ┌─────────┐         │   │       ┌─────────┐             │   │
│  │  │ D E F   │ ────────┼───┼─────► │ B D F   │             │   │
│  │  └─────────┘         │   │       └─────────┘             │   │
│  │                      │   │                               │   │
│  │  Partition 2 ────────┴───┼──────► Partition 2'           │   │
│  │  ┌─────────┐             │       ┌─────────┐             │   │
│  │  │ G H I   │ ────────────┴─────► │ E G H I │             │   │
│  │  └─────────┘                     └─────────┘             │   │
│  │                                                          │   │
│  │  KEY: One parent partition maps to MULTIPLE children     │   │
│  │  SHUFFLE: Data must be redistributed across network      │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                     SHUFFLE OPERATION FLOW                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ STAGE 1: Map Phase (Write Shuffle)                       │   │
│  │                                                          │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐                   │   │
│  │  │Partition│  │Partition│  │Partition│                   │   │
│  │  │    0    │  │    1    │  │    2    │                   │   │
│  │  └────┬────┘  └────┬────┘  └────┬────┘                   │   │
│  │       │            │            │                        │   │
│  │       ▼            ▼            ▼                        │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐                   │   │
│  │  │ Shuffle │  │ Shuffle │  │ Shuffle │                   │   │
│  │  │ Writer  │  │ Writer  │  │ Writer  │                   │   │
│  │  └────┬────┘  └────┬────┘  └────┬────┘                   │   │
│  │       │            │            │                        │   │
│  │       ▼            ▼            ▼                        │   │
│  │  ┌─────────────────────────────────────────────────┐     │   │
│  │  │               Shuffle Write Files               │     │   │
│  │  │  ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐        │     │   │
│  │  │  │ reduce│ │ reduce│ │ reduce│ │ reduce│        │     │   │
│  │  │  │   0   │ │   1   │ │   2   │ │   3   │        │     │   │
│  │  │  └───────┘ └───────┘ └───────┘ └───────┘        │     │   │
│  │  └─────────────────────────────────────────────────┘     │   │
│  └──────────────────────────────────────────────────────────┘   │
│                          │                                      │
│                          ▼                                      │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ STAGE 2: Reduce Phase (Read Shuffle)                     │   │
│  │                                                          │   │
│  │  ┌─────────────────────────────────────────────────┐     │   │
│  │  │                  Shuffle Read                   │     │   │
│  │  │  ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐        │     │   │
│  │  │  │ reduce│ │ reduce│ │ reduce│ │ reduce│        │     │   │
│  │  │  │   0   │ │   1   │ │   2   │ │   3   │        │     │   │
│  │  │  └───────┘ └───────┘ └───────┘ └───────┘        │     │   │
│  │  └─────────────────────────────────────────────────┘     │   │
│  │       │            │            │            │           │   │
│  │       ▼            ▼            ▼            ▼           │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐      │   │
│  │  │ Shuffle │  │ Shuffle │  │ Shuffle │  │ Shuffle │      │   │
│  │  │ Reader  │  │ Reader  │  │ Reader  │  │ Reader  │      │   │
│  │  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘      │   │
│  │       │            │            │            │           │   │
│  │       ▼            ▼            ▼            ▼           │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐      │   │
│  │  │Partition│  │Partition│  │Partition│  │Partition│      │   │
│  │  │    0    │  │    1    │  │    2    │  │    3    │      │   │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘      │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  SHUFFLE BOUNDARY: Marks separation between stages              │
│  EXPENSIVE: Network I/O + Disk I/O + Serialization              │
└─────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                 TRANSFORMATION PIPELINE EXAMPLE                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Input: textFile("logs.txt")                                    │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐                │
│  │ log_0   │ │ log_1   │ │ log_2   │ │ log_3   │                │
│  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘                │
│       │           │           │           │                     │
│       ▼           ▼           ▼           ▼                     │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ NARROW: flatMap(lambda line: line.split(" "))            │   │
│  │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                          │   │
│  │ │words│ │words│ │words│ │words│                          │   │
│  │ └─────┘ └─────┘ └─────┘ └─────┘                          │   │
│  └──────────────────────────────────────────────────────────┘   │
│       │           │           │           │                     │
│       ▼           ▼           ▼           ▼                     │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ NARROW: filter(lambda word: len(word) > 3)               │   │
│  │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                          │   │
│  │ │long │ │long │ │long │ │long │                          │   │
│  │ └─────┘ └─────┘ └─────┘ └─────┘                          │   │
│  └──────────────────────────────────────────────────────────┘   │
│       │           │           │           │                     │
│       ▼           ▼           ▼           ▼                     │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ NARROW: map(lambda word: (word, 1))                      │   │
│  │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                          │   │
│  │ │pair │ │pair │ │pair │ │pair │                          │   │
│  │ └─────┘ └─────┘ └─────┘ └─────┘                          │   │
│  └──────────────────────────────────────────────────────────┘   │
│       │           │           │           │                     │
│       ▼           ▼           ▼           ▼                     │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ WIDE: reduceByKey(lambda a, b: a + b) ← SHUFFLE BOUNDARY │   │
│  │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                          │   │
│  │ │     │ │     │ │     │ │     │                          │   │
│  │ └─────┘ └─────┘ └─────┘ └─────┘                          │   │
│  │      ▼           ▼           ▼           ▼               │   │
│  │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐          │   │
│  │ │ (word,  │ │ (word,  │ │ (word,  │ │ (word,  │          │   │
│  │ │  count) │ │  count) │ │  count) │ │  count) │          │   │
│  │ └─────────┘ └─────────┘ └─────────┘ └─────────┘          │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  PIPELINE: NARROW transforms are pipelined (no shuffle)         │
│  STAGE BOUNDARY: Created at WIDE transformations                │
└─────────────────────────────────────────────────────────────────┘

📚 Detailed Explanation

1. Narrow Transformations

Narrow transformations are operations where each input partition contributes to at most one output partition. Data stays within the same partition and no network shuffling is required. These operations can be pipelined together, meaning multiple narrow transformations can be executed in a single task without intermediate disk writes.

Characteristics:

  • 1:1 mapping between input and output partitions
  • No data movement across the network
  • Can be pipelined (executed together in one stage)
  • Efficient memory usage
  • Low latency

Examples:

  • map(func): Apply function to each element
  • flatMap(func): Apply a function that returns an iterable, then flatten the results
  • filter(func): Keep elements where function returns True
  • mapPartitions(func): Apply function to each partition
  • mapPartitionsWithIndex(func): Like mapPartitions with partition index
  • union(other): Return union of RDDs
  • sample(withReplacement, fraction): Random sample
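
Pipelining can be pictured with Python generators. The sketch below is an analogy in plain Python rather than Spark's actual execution engine: three narrow steps are chained lazily, so each record flows through all of them in a single pass over the partition, with no intermediate list. All function names here are hypothetical.

```python
def split_words(lines):
    """flatMap-like step: one line in, many words out."""
    for line in lines:
        yield from line.split()


def keep_long(words):
    """filter-like step: keep words longer than three characters."""
    return (word for word in words if len(word) > 3)


def to_pairs(words):
    """map-like step: turn each word into a (word, 1) pair."""
    return ((word, 1) for word in words)


def run_pipelined(partition, *steps):
    """Chain the steps lazily; the partition is only traversed by list()."""
    stream = iter(partition)
    for step in steps:
        stream = step(stream)  # nothing is computed yet
    return list(stream)        # the single pass happens here


partition = ["spark makes big data simple", "narrow ops are pipelined"]
result = run_pipelined(partition, split_words, keep_long, to_pairs)
print(result)
```

Because no step needs records from another partition, the same chain could run independently on every partition, which is what makes narrow transformations cheap to combine.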

2. Wide Transformations

Wide transformations are operations where a single input partition may be needed by multiple output partitions. This requires data to be redistributed across the network, a process called shuffle. Each wide transformation creates a new stage in the execution plan.

Characteristics:

  • M:N mapping between input and output partitions
  • Data movement across the network (shuffle)
  • Creates stage boundaries
  • Expensive (network I/O, disk I/O, serialization)
  • Required for many important operations

Examples:

  • groupByKey(): Group values by key
  • reduceByKey(func): Reduce values by key
  • join(other): Join two RDDs
  • repartition(n): Reshuffle into n partitions
  • distinct(): Remove duplicates
  • sortByKey() / sortBy(func): Sort elements
  • coalesce(n): Reduce partitions (narrow by default; a shuffle occurs only with shuffle=True)

3. Shuffle Deep Dive

A shuffle is the process of redistributing data across partitions, typically across the cluster network. Shuffles are among the most expensive operations in Spark and should be minimized.

Shuffle Stages:

  1. Map Phase: Each map task writes its shuffle output to local disk
  2. Fetch Phase: Executors fetch shuffle data from other executors
  3. Reduce Phase: Process fetched data

Shuffle Write:

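  • With the default sort-based shuffle, each map task writes one data file (ordered by target reduce partition) plus an index file; the legacy hash-based shuffle wrote one file per reducer
  • Records are assigned to reduce partitions by key, using hash partitioning by default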
  • Written to local disk (not memory)

Shuffle Read:

  • Each reducer fetches from all mappers
  • Data is deserialized and merged
  • Memory pressure can cause spill to disk
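
The write and read sides can be modelled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: `crc32` stands in for the partitioner's hash function, in-memory lists stand in for shuffle files, and the helper names are made up for illustration.

```python
from zlib import crc32


def reducer_for(key, num_reducers):
    """Deterministic stand-in for a hash partitioner (assumption: string keys)."""
    return crc32(key.encode("utf-8")) % num_reducers


def shuffle_write(map_partition, num_reducers):
    """Map side: split one partition's records into per-reducer buckets."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in map_partition:
        buckets[reducer_for(key, num_reducers)].append((key, value))
    return buckets


def shuffle_read(map_outputs, reducer_id):
    """Reduce side: fetch this reducer's bucket from every map output."""
    fetched = []
    for buckets in map_outputs:
        fetched.extend(buckets[reducer_id])
    return fetched


NUM_REDUCERS = 3
map_partitions = [
    [("hello", 1), ("world", 1), ("hello", 1)],
    [("spark", 1), ("world", 1), ("spark", 1)],
]

map_outputs = [shuffle_write(p, NUM_REDUCERS) for p in map_partitions]
reduce_inputs = [shuffle_read(map_outputs, r) for r in range(NUM_REDUCERS)]

for reducer_id, records in enumerate(reduce_inputs):
    print(reducer_id, records)
```

Whatever bucket a key hashes to, every record with that key lands on the same reducer, which is the property that key-based aggregations and joins rely on.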

4. Shuffle Optimization Techniques

Broadcast Join:

# Avoid shuffle by broadcasting small table
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

Partitioning:

# Repartition by key once (this step itself shuffles) so rows with the same key are co-located
df = df.repartition(100, "key")
df.write.parquet("output/")

Bucketing:

# Bucket data by join key
df.write.bucketBy(100, "user_id").saveAsTable("users")

Coalesce vs Repartition:

# Coalesce: Reduce partitions without full shuffle (narrow)
df.coalesce(10)

# Repartition: Increase or decrease with full shuffle (wide)
df.repartition(100)
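
The difference between the two calls can be sketched with lists standing in for partitions. This plain-Python model is a deliberate simplification (Spark's actual assignment logic differs in detail): the hypothetical `coalesce_model` merges whole parent partitions, so each parent feeds exactly one child, while `repartition_model` scatters individual records round-robin, so each parent feeds many children.

```python
def coalesce_model(partitions, n):
    """Merge neighbouring parent partitions; no record leaves its group."""
    n = min(n, len(partitions))  # without a shuffle the count cannot grow
    merged = [[] for _ in range(n)]
    for index, partition in enumerate(partitions):
        merged[index * n // len(partitions)].extend(partition)
    return merged


def repartition_model(partitions, n):
    """Scatter records round-robin across n new partitions (a full shuffle)."""
    scattered = [[] for _ in range(n)]
    position = 0
    for partition in partitions:
        for record in partition:
            scattered[position % n].append(record)
            position += 1
    return scattered


parents = [[1, 2], [3], [4, 5, 6], [7]]
coalesced = coalesce_model(parents, 2)
repartitioned = repartition_model(parents, 3)
print("coalesce(2):   ", coalesced)
print("repartition(3):", repartitioned)
```

In the model, coalescing keeps records from the same parent together but can leave uneven sizes, whereas repartitioning evens out the sizes at the price of moving nearly every record.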

5. Stage Boundaries

Spark divides execution into stages at shuffle boundaries:

  • Stage 0: Input + narrow transformations
  • Stage 1: After first shuffle + narrow transformations
  • Stage N: After N-th shuffle + narrow transformations
  • Final Stage: Last shuffle + output

Task Scheduling:

  • One task per partition per stage
  • Tasks are scheduled with a preference for data locality
  • Stages are executed when parent stages complete
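
As a rough illustration of how shuffle boundaries carve a job into stages, the sketch below splits a list of operation names at every wide operation. It is a simplified model with hypothetical names (`WIDE_OPS`, `split_into_stages`); the real DAG Scheduler works on RDD dependencies, and the map side of a wide operation actually runs in the preceding stage.

```python
# Operation names treated as wide for this model (illustrative, not exhaustive).
WIDE_OPS = {"groupByKey", "reduceByKey", "join", "distinct", "repartition", "sortByKey"}


def split_into_stages(operations):
    """Start a new stage whenever a wide operation is encountered."""
    stages = [[]]
    for operation in operations:
        if operation in WIDE_OPS:
            stages.append([])  # shuffle boundary
        stages[-1].append(operation)
    return stages


pipeline = [
    "textFile", "flatMap", "filter", "map",  # stage 0: input + narrow ops
    "reduceByKey", "map",                    # stage 1: after the first shuffle
    "sortByKey", "collect",                  # stage 2: after the second shuffle
]

stages = split_into_stages(pipeline)
for number, stage in enumerate(stages):
    print(f"Stage {number}: {' -> '.join(stage)}")
```

Two wide operations produce three stages here, matching the pattern of one stage for the input plus one stage per shuffle.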

6. Performance Implications

Narrow Transformations:

  • Low latency (no network overhead)
  • High throughput (can process data in-place)
  • Good cache locality
  • Minimal memory overhead

Wide Transformations:

  • High latency (network round trips)
  • Disk I/O for shuffle files
  • Serialization/deserialization overhead
  • Memory pressure from buffering

7. Common Shuffle Patterns

GroupBy + Aggregate:

# BAD: groupByKey shuffles every value before summing (one shuffle, but no map-side combine)
rdd.groupByKey().mapValues(sum)

# GOOD: reduceByKey combines map-side first (one shuffle, far less data moved)
rdd.reduceByKey(lambda a, b: a + b)

Join:

# Join requires a shuffle unless the inputs are already co-partitioned (or, for DataFrames, one side is broadcast)
joined = rdd1.join(rdd2)

Distinct:

# Requires shuffle to deduplicate across partitions
distinct = rdd.distinct()

🔑 Key Concepts Table

| Concept | Type | Shuffle? | Example |
|---|---|---|---|
| map | Narrow | No | rdd.map(lambda x: x * 2) |
| filter | Narrow | No | rdd.filter(lambda x: x > 0) |
| flatMap | Narrow | No | rdd.flatMap(lambda x: x.split()) |
| mapPartitions | Narrow | No | rdd.mapPartitions(process_partition) |
| union | Narrow | No | rdd1.union(rdd2) |
| groupByKey | Wide | Yes | rdd.groupByKey() |
| reduceByKey | Wide | Yes | rdd.reduceByKey(lambda a,b: a+b) |
| join | Wide | Yes | rdd1.join(rdd2) |
| repartition | Wide | Yes | rdd.repartition(100) |
| distinct | Wide | Yes | rdd.distinct() |
| coalesce | Mixed | No by default / Yes with shuffle=True | rdd.coalesce(10) |
| sort | Wide | Yes | rdd.sortByKey() |

💻 Code Examples

Example 1: Narrow Transformations

from pyspark import SparkContext

sc = SparkContext("local", "TransformationTypes")

# Create RDD
rdd = sc.parallelize(range(1, 101), 4)

# Narrow transformations (no shuffle)
mapped = rdd.map(lambda x: x * 2)  # Double each element
filtered = rdd.filter(lambda x: x % 2 == 0)  # Keep even numbers
flatMapped = rdd.flatMap(lambda x: [x, x * 10])  # Expand each element
sampled = rdd.sample(False, 0.1)  # 10% sample
mappedPartitions = rdd.mapPartitions(lambda it: [sum(it)])  # Sum per partition

# Execute actions
print(f"Mapped sum: {mapped.sum()}")
print(f"Filtered count: {filtered.count()}")
print(f"FlatMapped count: {flatMapped.count()}")
print(f"Sampled count: {sampled.count()}")
print(f"Per-partition sums: {mappedPartitions.collect()}")

# No shuffle means these can be pipelined
# Single stage execution

Example 2: Wide Transformations and Shuffle

# Wide transformations (require shuffle)
words = sc.parallelize(["hello world", "hello spark", "world spark"], 2)

# Split words (narrow)
word_pairs = words.flatMap(lambda line: line.split()) \
                  .map(lambda word: (word, 1))

# ReduceByKey (wide) - single shuffle
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
print(f"Word counts: {word_counts.collect()}")

# GroupByKey (wide) - less efficient
word_groups = word_pairs.groupByKey()
for word, counts in word_groups.collect():
    print(f"{word}: {list(counts)}")

# Join (wide)
rdd1 = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
rdd2 = sc.parallelize([(1, "x"), (2, "y"), (3, "z")])
joined = rdd1.join(rdd2)  # Shuffle required
print(f"Joined: {joined.collect()}")

# Distinct (wide)
duplicated = sc.parallelize([1, 2, 2, 3, 3, 3])
distinct = duplicated.distinct()  # Shuffle required
print(f"Distinct: {distinct.collect()}")

Example 3: Shuffle Optimization

# Create large dataset
large_rdd = sc.parallelize(range(1000000), 8)

# BAD: groupByKey shuffles every value, then sums after the shuffle
# (mapValues is narrow, so this is still a single shuffle)
word_pairs = large_rdd.map(lambda x: (x % 100, x))
grouped = word_pairs.groupByKey()
result_bad = grouped.mapValues(sum)

# GOOD: reduceByKey combines map-side, so far fewer records are shuffled
result_good = word_pairs.reduceByKey(lambda a, b: a + b)

# Compare execution plans (print them so they show up when run as a script)
print("=== BAD (groupByKey) ===")
print(result_bad.toDebugString().decode()[:500])

print("\n=== GOOD (reduceByKey) ===")
print(result_good.toDebugString().decode()[:500])

# Performance comparison
import time
start = time.time()
_ = result_bad.collect()
print(f"groupByKey: {time.time() - start:.2f}s")

start = time.time()
_ = result_good.collect()
print(f"reduceByKey: {time.time() - start:.2f}s")

Example 4: Partitioning Strategies

# Repartition vs Coalesce
rdd = sc.parallelize(range(1000), 10)
print(f"Initial partitions: {rdd.getNumPartitions()}")

# Coalesce: Reduce partitions without full shuffle
coalesced = rdd.coalesce(5)
print(f"After coalesce(5): {coalesced.getNumPartitions()}")

# Repartition: Increase or decrease with full shuffle
repartitioned = rdd.repartition(20)
print(f"After repartition(20): {repartitioned.getNumPartitions()}")

# Partition by key
key_value_rdd = sc.parallelize([(i % 10, i) for i in range(100)])
partitioned = key_value_rdd.partitionBy(5, lambda k: k % 5)

# Check partition distribution
def count_per_partition(rdd):
    return rdd.mapPartitionsWithIndex(
        lambda idx, it: [(idx, sum(1 for _ in it))]
    ).collect()

print(f"Partition counts: {count_per_partition(partitioned)}")

📊 Performance Metrics

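| Operation | Narrow (ms) | Wide (ms) | Shuffle Size (MB) | Network (MB/s) |
|---|---|---|---|---|
| map() | 45 | N/A | 0 | 0 |
| filter() | 35 | N/A | 0 | 0 |
| flatMap() | 50 | N/A | 0 | 0 |
| groupByKey() | N/A | 850 | 250 | 80 |
| reduceByKey() | N/A | 620 | 180 | 120 |
| join() | N/A | 1200 | 400 | 100 |
| repartition() | N/A | 750 | 300 | 90 |
| distinct() | N/A | 680 | 200 | 110 |
| sort() | N/A | 900 | 350 | 85 |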

✅ Best Practices

1. Minimize Wide Transformations

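# BAD: groupByKey pushes every value through the first shuffle before sortByKey adds a second
result = rdd.groupByKey().mapValues(sum).sortByKey()

# GOOD: still two shuffles, but reduceByKey shrinks the data before either one
result = rdd.reduceByKey(lambda a, b: a + b).sortByKey()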

2. Use reduceByKey Instead of groupByKey

# BAD: groupByKey shuffles every value and holds all values for a key in memory
grouped = rdd.groupByKey().mapValues(sum)

# GOOD: reduceByKey combines within each partition before the shuffle
reduced = rdd.reduceByKey(lambda a, b: a + b)

3. Broadcast Small Tables

from pyspark.sql.functions import broadcast
# Avoids shuffle for join with small table
result = large_df.join(broadcast(small_df), "key")

4. Repartition for Parallelism

# Increase partitions for more parallelism
rdd = rdd.repartition(100)

# Decrease partitions with coalesce (no shuffle)
rdd = rdd.coalesce(10)

5. Cache Intermediate Results

# Cache if RDD is reused across multiple actions
rdd.cache()  # or persist() with specific storage level

# Uncache when no longer needed
rdd.unpersist()

6. Monitor Shuffle Metrics

# Check shuffle metrics in Spark UI
# Look for:
# - Shuffle Read Size / Records
# - Shuffle Write Size / Records
# - Shuffle Spill (Memory/Disk)

See Also

  • Kafka Streams (kafka/03): Transformation types in stream processing
  • Data Engineering Streaming (data-engineering/022): Shuffle optimization in streaming pipelines
