PySpark Partitioning Strategies: Hash, Range, and Round-Robin


🎯 PySpark Partitioning Strategies

Partitioning

Partitioning is the process of distributing data across multiple partitions for parallel processing. The goal is to minimize shuffle during joins and aggregations while maintaining balanced partition sizes.

Hash Partitioning

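Hash partitioning assigns each record to a partition based on hash(key) % num_partitions, so all records with the same key land in the same partition. Distribution is uniform when keys are well distributed. Skew appears when a few keys account for a large share of the records, because every record of such a hot key goes to the same partition.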
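A minimal pure-Python sketch (not Spark code) makes the rule concrete. It assumes hash(key) = key, which holds for Python's built-in hash() on small non-negative integers. Spark SQL uses a Murmur3 hash instead, so real partition indices differ. The helper names are hypothetical.

```python
from collections import Counter


def hash_partition(key: int, num_partitions: int) -> int:
    """Partition index = hash(key) mod P.

    For small non-negative ints, Python's hash() returns the int itself,
    so this models the identity-hash assumption.
    """
    return hash(key) % num_partitions


def partition_counts(keys, num_partitions):
    """Number of records that land in each partition."""
    counts = Counter(hash_partition(k, num_partitions) for k in keys)
    return [counts.get(i, 0) for i in range(num_partitions)]


uniform_keys = list(range(1000))               # 1,000 distinct keys
skewed_keys = [0] * 900 + list(range(1, 101))  # key 0 is a "hot" key

print(partition_counts(uniform_keys, 4))  # [250, 250, 250, 250]
print(partition_counts(skewed_keys, 4))   # [925, 25, 25, 25]
```

The second call shows the skew case. All 900 records of the hot key 0 land in partition 0, no matter how many partitions are added.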

Range Partitioning

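Range partitioning assigns records to partitions based on key ranges. Each partition covers a contiguous range of keys, and the boundaries are estimated from a sample of the data. It is used for ORDER BY operations and range-based queries. Sampling adapts the boundaries to clustered data, so the main skew risk is a single key value with very many records: that key cannot be split across partitions.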
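The assignment step can be sketched in pure Python with the standard bisect module. Given sorted boundaries, a key's partition index is the number of boundaries less than or equal to it. Boundaries are treated as the lower edge of the next partition, matching "< 12" / ">= 32" style labels; Spark's internal convention may differ. The boundaries here are fixed by hand rather than sampled, and the function names are hypothetical.

```python
from bisect import bisect_right


def range_partition(key, boundaries):
    """Partition index for key, given sorted boundaries [b0, b1, ..., b(k-1)].

    Partition 0 holds keys < b0, partition j holds b(j-1) <= key < bj,
    and the last partition holds keys >= b(k-1).
    """
    return bisect_right(boundaries, key)


def assign_ranges(keys, boundaries):
    """Group keys into len(boundaries) + 1 range partitions."""
    parts = [[] for _ in range(len(boundaries) + 1)]
    for k in keys:
        parts[range_partition(k, boundaries)].append(k)
    return parts


boundaries = [12, 22, 32]
print(assign_ranges([1, 5, 10, 15, 20, 25, 30, 35], boundaries))
# [[1, 5, 10], [15, 20], [25, 30], [35]]

# A heavily repeated key cannot be split: all copies of 7 share one partition
print(assign_ranges([7] * 6 + [40, 50], boundaries))
# [[7, 7, 7, 7, 7, 7], [], [], [40, 50]]
```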

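In symbols, hash partitioning sends a record to partition index

$$i = \operatorname{hash}(\mathit{key}) \bmod P$$

where P is the total number of partitions.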

Partition Balance Metric

$$B = \frac{\max(S_1, \ldots, S_P)}{\frac{1}{P} \sum_{i=1}^{P} S_i}$$

Here,

  • B = balance factor (1.0 = perfectly balanced, higher = more skewed)
  • S_i = size (in rows or bytes) of partition i
  • P = total number of partitions
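A small helper computes B from a list of partition sizes. The name is hypothetical, not a Spark API. B ranges from 1.0, when every partition is the same size, up to P, when all data sits in one partition.

```python
def balance_factor(sizes):
    """B = max(S_1..S_P) / mean(S_1..S_P); 1.0 is perfect balance, P is the worst case."""
    if not sizes or sum(sizes) == 0:
        raise ValueError("need at least one non-empty partition")
    mean = sum(sizes) / len(sizes)
    return max(sizes) / mean


print(balance_factor([250, 250, 250, 250]))  # 1.0
print(balance_factor([925, 25, 25, 25]))     # 3.7
print(balance_factor([1000, 0, 0, 0]))       # 4.0, i.e. B = P
```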

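Round-robin partitioning (repartition(n)) distributes records evenly across partitions without regard to key values. It is the best choice for evening out partition sizes, for example after a selective filter. Because it ignores keys, however, it does not fix skew in a later join or aggregation, which reshuffles the data by key.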
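The round-robin rule is simple enough to sketch in plain Python. The helper is hypothetical. The optional start offset is included because Spark's round-robin exchange does not necessarily begin at partition 0 for every input partition. Treat this as a simplified model of the idea, not Spark's implementation.

```python
def round_robin(records, num_partitions, start=0):
    """Deal records to partitions in circular order, beginning at index `start`."""
    parts = [[] for _ in range(num_partitions)]
    for offset, record in enumerate(records):
        parts[(start + offset) % num_partitions].append(record)
    return parts


print(round_robin(list("ABCDEFGH"), 4))
# [['A', 'E'], ['B', 'F'], ['C', 'G'], ['D', 'H']]

# Sizes never differ by more than one, whatever the starting offset
print([len(p) for p in round_robin(list(range(10)), 4, start=3)])  # [3, 2, 2, 3]
```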

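When joining two datasets, repartition both by the join key, with the same number of partitions, so that matching records land in the same partition index: df1.repartition("key").join(df2.repartition("key"), "key"). The repartitions are themselves shuffles. The gain is that the join adds no further exchange, which pays off when the repartitioned DataFrames are cached and reused across several joins or aggregations.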

Co-Partitioning Theorem

Theorem: If two datasets are partitioned by the same partitioner with the same number of partitions, joining them requires no additional shuffle. Partition i of one dataset can only match partition i of the other, so each task joins one pair of partitions independently. Assuming equally sized partitions, the total join work is P × (C_{local} + I_{local}), where P is the partition count, C_{local} is the local compute cost, and I_{local} is the local I/O cost per partition pair.
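The key step of the theorem can be checked with a pure-Python sketch: matching keys always share a partition index. Both sides are split with the same hash(key) mod P rule, and each partition pair is joined on its own. The helpers are hypothetical and do not model how Spark executes joins internally (sort-merge or hash joins per task). They only show that no record ever needs data from another partition index.

```python
from collections import defaultdict


def hash_partition_rows(rows, num_partitions):
    """Split (key, value) rows into partitions by hash(key) mod P."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[hash(key) % num_partitions].append((key, value))
    return parts


def local_join(left_part, right_part):
    """Inner-join two partitions with a small in-memory hash table."""
    index = defaultdict(list)
    for key, value in right_part:
        index[key].append(value)
    return [(key, lv, rv) for key, lv in left_part for rv in index.get(key, [])]


def co_partitioned_join(left, right, num_partitions):
    """Join partition i of left only with partition i of right."""
    left_parts = hash_partition_rows(left, num_partitions)
    right_parts = hash_partition_rows(right, num_partitions)
    result = []
    for i in range(num_partitions):
        result.extend(local_join(left_parts[i], right_parts[i]))
    return result


users = [(1, "ann"), (2, "bob"), (3, "cy"), (4, "dee")]
orders = [(1, 9.5), (3, 20.0), (3, 7.25), (5, 1.0)]

print(sorted(co_partitioned_join(users, orders, 2)))
# [(1, 'ann', 9.5), (3, 'cy', 7.25), (3, 'cy', 20.0)]
```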

  • Hash partitioning: uniform distribution for well-distributed keys
  • Range partitioning: contiguous key ranges for ORDER BY and range queries
  • Round-robin: best for evening out partition sizes (use repartition(n))
  • Co-partition by join key to avoid an extra shuffle at join time
  • Balance factor B = max_partition_size / avg_partition_size; target B ≈ 1.0

šŸ—ļø Architecture Diagram

┌─────────────────────────────────────────────────────────────────┐
│                PARTITIONING STRATEGIES OVERVIEW                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │              ROUND-ROBIN PARTITIONING                    │   │
│  │  (Equal distribution, no key dependency)                 │   │
│  │                                                          │   │
│  │  Input Data: [A, B, C, D, E, F, G, H]                    │   │
│  │              ↓                                           │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐         │   │
│  │  │ Part 0  │ │ Part 1  │ │ Part 2  │ │ Part 3  │         │   │
│  │  │ [A, E]  │ │ [B, F]  │ │ [C, G]  │ │ [D, H]  │         │   │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘         │   │
│  │                                                          │   │
│  │  • Each record assigned round-robin                      │   │
│  │  • Near-equal partition sizes                            │   │
│  │  • No data locality                                      │   │
│  │  • Used by repartition(n)                                │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │              HASH PARTITIONING                           │   │
│  │  (Key-based distribution)                                │   │
│  │                                                          │   │
│  │  Input Data: [(1,A), (2,B), (3,C), (4,D),                │   │
│  │               (5,E), (6,F), (7,G), (8,H)]                │   │
│  │                                                          │   │
│  │  hash(key) % 4, assuming hash(key) = key                 │   │
│  │                                                          │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐         │   │
│  │  │ Part 0  │ │ Part 1  │ │ Part 2  │ │ Part 3  │         │   │
│  │  │ hash%4=0│ │ hash%4=1│ │ hash%4=2│ │ hash%4=3│         │   │
│  │  │ (4,D)   │ │ (1,A)   │ │ (2,B)   │ │ (3,C)   │         │   │
│  │  │ (8,H)   │ │ (5,E)   │ │ (6,F)   │ │ (7,G)   │         │   │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘         │   │
│  │                                                          │   │
│  │  • Same key always goes to same partition                │   │
│  │  • Enables partition pruning                             │   │
│  │  • May cause skew                                        │   │
│  │  • Used by repartition(n, col)                           │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │              RANGE PARTITIONING                          │   │
│  │  (Ordered distribution)                                  │   │
│  │                                                          │   │
│  │  Input Data: [1, 5, 10, 15, 20, 25, 30, 35]              │   │
│  │                                                          │   │
│  │  Boundaries: [12, 22, 32]                                │   │
│  │                                                          │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐         │   │
│  │  │ Part 0  │ │ Part 1  │ │ Part 2  │ │ Part 3  │         │   │
│  │  │ < 12    │ │ 12-22   │ │ 22-32   │ │ >= 32   │         │   │
│  │  │ [1,5,10]│ │ [15,20] │ │ [25,30] │ │ [35]    │         │   │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘         │   │
│  │                                                          │   │
│  │  • Data ordered across partitions                        │   │
│  │  • Enables range pruning                                 │   │
│  │  • Requires sampling for boundaries                      │   │
│  │  • Used by repartitionByRange(), orderBy()               │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │              BUCKET PARTITIONING                         │   │
│  │  (File-based partitioning for joins)                     │   │
│  │                                                          │   │
│  │  Table A (bucketed by key, 4 buckets):                   │   │
│  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                         │   │
│  │  │  0  │ │  1  │ │  2  │ │  3  │                         │   │
│  │  └─────┘ └─────┘ └─────┘ └─────┘                         │   │
│  │                                                          │   │
│  │  Table B (bucketed by key, 4 buckets):                   │   │
│  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                         │   │
│  │  │  0  │ │  1  │ │  2  │ │  3  │                         │   │
│  │  └─────┘ └─────┘ └─────┘ └─────┘                         │   │
│  │                                                          │   │
│  │  Join: Bucket i joins with Bucket i (no shuffle!)        │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                 PARTITION OPTIMIZATION PIPELINE                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ STEP 1: ANALYZE DATA DISTRIBUTION                        │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  • Sample data to understand key distribution      │  │   │
│  │  │  • Identify skew (some keys have 10x more data)    │  │   │
│  │  │  • Calculate partition size estimates              │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  └──────────────────────────────────────────────────────────┘   │
│                          │                                      │
│                          ▼                                      │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ STEP 2: CHOOSE PARTITIONING STRATEGY                     │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  • Round-robin: Equal sizes, no key dependency     │  │   │
│  │  │  • Hash: Key-based, enables pruning                │  │   │
│  │  │  • Range: Ordered data, range queries              │  │   │
│  │  │  • Bucket: Pre-partitioned for joins               │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  └──────────────────────────────────────────────────────────┘   │
│                          │                                      │
│                          ▼                                      │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ STEP 3: DETERMINE PARTITION COUNT                        │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  • Target: 128MB-200MB per partition               │  │   │
│  │  │  • Formula: partitions = total_size / target_size  │  │   │
│  │  │  • Consider executor cores (2-4 partitions/core)   │  │   │
│  │  │  • Leave headroom for shuffle                      │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  └──────────────────────────────────────────────────────────┘   │
│                          │                                      │
│                          ▼                                      │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ STEP 4: APPLY PARTITIONING                               │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  • repartition(n): Round-robin, full shuffle       │  │   │
│  │  │  • repartition(n, col): Hash by column             │  │   │
│  │  │  • repartitionByRange(n, col): Range by column     │  │   │
│  │  │  • partitionBy(col): File-based partitioning       │  │   │
│  │  │  • bucketBy(n, col): File-based bucketing          │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  └──────────────────────────────────────────────────────────┘   │
│                          │                                      │
│                          ▼                                      │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ STEP 5: MONITOR AND TUNE                                 │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  • Check partition sizes in Spark UI               │  │   │
│  │  │  • Look for skewed partitions                      │  │   │
│  │  │  • Adjust partition count if needed                │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│               PARTITION PRUNING AND DATA LOCALITY               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  HASH PARTITIONED TABLE (4 partitions)                   │   │
│  │                                                          │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐         │   │
│  │  │ Part 0  │ │ Part 1  │ │ Part 2  │ │ Part 3  │         │   │
│  │  │ key%4=0 │ │ key%4=1 │ │ key%4=2 │ │ key%4=3 │         │   │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘         │   │
│  │                                                          │   │
│  │  Query: WHERE key = 5                                    │   │
│  │                                                          │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │ PARTITION PRUNING:                                 │  │   │
│  │  │ 5 % 4 = 1 → Only scan Part 1                       │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  │                                                          │   │
│  │  Before pruning: Scan 4 partitions (100%)                │   │
│  │  After pruning:  Scan 1 partition  (25%)                 │   │
│  │                                                          │   │
│  │  I/O SAVED: ~75% (speedup up to ~4x)                     │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  RANGE PARTITIONED TABLE (4 partitions)                  │   │
│  │                                                          │   │
│  │  ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │   │
│  │  │ Part 0    │ │ Part 1    │ │ Part 2    │ │ Part 3    │ │   │
│  │  │ 0-999     │ │ 1000-1999 │ │ 2000-2999 │ │ 3000+     │ │   │
│  │  └───────────┘ └───────────┘ └───────────┘ └───────────┘ │   │
│  │                                                          │   │
│  │  Query: WHERE key BETWEEN 1500 AND 2500                  │   │
│  │                                                          │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │ PARTITION PRUNING:                                 │  │   │
│  │  │ 1500-2500 spans Part 1 and Part 2                  │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  │                                                          │   │
│  │  Before pruning: Scan 4 partitions (100%)                │   │
│  │  After pruning:  Scan 2 partitions (50%)                 │   │
│  │                                                          │   │
│  │  I/O SAVED: ~50% (speedup up to ~2x)                     │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  DATA LOCALITY LEVELS                                    │   │
│  │                                                          │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  PROCESS_LOCAL: Data in same JVM                   │  │   │
│  │  │  • Best performance (no serialization)             │  │   │
│  │  │  • Cache hits, broadcast variables                 │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  NODE_LOCAL: Data on same node, different JVM      │  │   │
│  │  │  • Good performance (local disk read)              │  │   │
│  │  │  • e.g., local HDFS block, other executor          │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  RACK_LOCAL: Data on same rack                     │  │   │
│  │  │  • Moderate performance (network within rack)      │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  │  ┌────────────────────────────────────────────────────┐  │   │
│  │  │  ANY: Data on different rack                       │  │   │
│  │  │  • Worst performance (cross-rack network)          │  │   │
│  │  └────────────────────────────────────────────────────┘  │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

📚 Detailed Explanation

1. Why Partitioning Matters

Partitioning determines how data is distributed across the cluster. Proper partitioning is critical for:

  • Performance: Reduces data shuffling and network I/O
  • Parallelism: Enables concurrent processing across executors
  • Data locality: Keeps related data together for efficient joins
  • Query optimization: Enables partition pruning

2. Round-Robin Partitioning

Round-robin partitioning distributes data evenly across partitions regardless of key values. Each record is assigned to partitions in a circular order.

Characteristics:

  • Near-equal partition sizes
  • No data locality
  • Simple distribution
  • Used by repartition(n)

When to use:

  • When existing partitions are unevenly sized (e.g., after a selective filter) or you need more partitions
  • When you don't have a natural partition key
  • For load balancing across executors

3. Hash Partitioning

Hash partitioning assigns records to partitions based on the hash of a key value. The same key always goes to the same partition.

Formula: partition = hash(key) % num_partitions (Spark SQL hashes with Murmur3 and takes a non-negative modulo)
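One detail behind the formula is that 32-bit hash functions, including the ones Spark uses, can return negative values. In Java or Scala the % operator keeps the sign of the dividend, so a plain % could yield a negative partition index. Spark therefore applies a non-negative modulo (pmod). The pure-Python sketch below emulates both behaviours with hypothetical helper names. Python's own % already returns a non-negative result for a positive divisor.

```python
def java_mod(a: int, n: int) -> int:
    """Remainder with the sign of the dividend, like Java/Scala's % operator."""
    r = abs(a) % n
    return -r if a < 0 else r


def pmod(a: int, n: int) -> int:
    """Non-negative modulo: always returns a value in [0, n)."""
    r = java_mod(a, n)
    return r + n if r < 0 else r


signed_hash = -7  # e.g., a negative 32-bit hash value
print(java_mod(signed_hash, 4))  # -3, not a valid partition index
print(pmod(signed_hash, 4))      # 1
print(signed_hash % 4)           # 1 (Python's % is already non-negative here)
```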

Characteristics:

  • Same key → same partition
  • Enables partition pruning
  • May cause data skew
  • Used by repartition(n, col) (and by RDD.partitionBy(n) for pair RDDs)

When to use:

  • When you frequently filter or join on a specific key
  • When you want to co-locate related data
  • For partition-level operations

4. Range Partitioning

Range partitioning divides data into ordered ranges. Each partition contains records within a specific value range.

Characteristics:

  • Data ordered across partitions
  • Enables range pruning
  • Requires boundary calculation
  • Used by repartitionByRange(n, col) and orderBy()

When to use:

  • When you frequently query ranges
  • For time-series data
  • For ordered analytics
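Range boundaries come from a sample rather than a full scan. The simplified pure-Python sketch below sorts a random sample and takes evenly spaced elements as split points. It is not Spark's actual algorithm, which is more involved, and the function names are hypothetical. The keys model hourly epoch timestamps starting at 2024-01-01 UTC.

```python
import random
from bisect import bisect_right


def sample_boundaries(keys, num_partitions, sample_size=1000, seed=7):
    """Estimate num_partitions - 1 split points from a random sample.

    Sort the sample and take evenly spaced elements as boundaries.
    """
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]


def sizes_by_range(keys, boundaries):
    """Count keys per range partition (partition = number of boundaries <= key)."""
    counts = [0] * (len(boundaries) + 1)
    for k in keys:
        counts[bisect_right(boundaries, k)] += 1
    return counts


# Hourly timestamps (seconds since the epoch), about 11 years of data
timestamps = [1_704_067_200 + 3600 * i for i in range(100_000)]
bounds = sample_boundaries(timestamps, 4)
print(bounds)
print(sizes_by_range(timestamps, bounds))  # roughly 25,000 per partition
```

A larger sample tightens the boundaries at the cost of more work up front, which is the same trade-off a sampling-based range partitioner faces.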

5. Bucket Partitioning

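Bucket partitioning (bucketing) is a file-based scheme that divides data into a fixed number of buckets based on a hash of a key. Unlike runtime partitioning, bucketing is applied at write time and recorded in the table metadata, so later reads can reuse it. It requires saveAsTable(); bucketBy() cannot be combined with a plain path-based save().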

Characteristics:

  • Fixed number of buckets per table (each write task can emit one file per bucket)
  • Files are pre-partitioned by key
  • Enables bucket joins (no shuffle)
  • Bucket layout is recorded in the metastore and reused by later queries

When to use:

  • When you frequently join two large tables
  • When you want to avoid shuffle in joins
  • For repeated join operations

6. Partition Count Optimization

The number of partitions should be based on:

  • Data size: Target 128MB-200MB per partition
  • Executor cores: 2-4 partitions per core
  • Cluster size: Total partitions = executors × cores × 2-4
  • Operation type: More partitions for shuffle-heavy operations (joins, aggregations), so each task's data fits in memory

Formula:

optimal_partitions = max(total_data_size / target_partition_size, 
                        num_executors * cores_per_executor * 2)
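The formula translates directly into a small helper. This sketch uses hypothetical names. math.ceil rounds the size term up so that partitions do not exceed the target, and tasks_per_core defaults to the factor of 2 used in the formula.

```python
import math


def optimal_partitions(total_size_mb, target_partition_mb,
                       num_executors, cores_per_executor, tasks_per_core=2):
    """max(size-based count, parallelism floor)."""
    by_size = math.ceil(total_size_mb / target_partition_mb)
    by_cores = num_executors * cores_per_executor * tasks_per_core
    return max(by_size, by_cores)


# 10 GB of data, 128 MB target, 10 executors x 4 cores: both terms give 80
print(optimal_partitions(10 * 1024, 128, 10, 4))  # 80
# 50 GB on the same cluster: the size term dominates
print(optimal_partitions(50 * 1024, 128, 10, 4))  # 400
# 1 GB: the parallelism floor dominates
print(optimal_partitions(1024, 128, 10, 4))       # 80
```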

7. Data Skew and Mitigation

Data skew occurs when some partitions have significantly more data than others. This causes:

  • Longer task durations for skewed partitions
  • Underutilization of cluster resources
  • Memory pressure on executors processing skewed data

Detection:

  • Monitor task duration in Spark UI
  • Check partition sizes
  • Count rows per partition with df.groupBy(spark_partition_id()).count() (spark_partition_id is in pyspark.sql.functions)
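Once per-partition sizes are known, a simple rule flags skew. A partition counts as skewed when it is both several times larger than the median and larger than an absolute floor. Spark's Adaptive Query Execution applies a rule of this shape, controlled by spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. The defaults below (5 and 256 MB) follow the Spark 3.x documentation; check them for your version. The function itself is a hypothetical pure-Python sketch.

```python
from statistics import median


def skewed_partitions(sizes_mb, factor=5.0, threshold_mb=256.0):
    """Indices of partitions larger than factor x median AND larger than threshold_mb."""
    med = median(sizes_mb)
    return [i for i, s in enumerate(sizes_mb) if s > factor * med and s > threshold_mb]


sizes = [120, 130, 110, 125, 2400, 118]
print(skewed_partitions(sizes))           # [4]
print(skewed_partitions([10, 10, 60, 10]))  # [] (relatively large, but below 256 MB)
```

Requiring both conditions avoids flagging partitions that are large only relative to a tiny median.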

Mitigation:

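  • Salting: Add a random salt to skewed keys (and replicate the other side once per salt value) so a hot key is spread over several partitions
  • Broadcast join: Ship a small table to every executor so the large table is not shuffled
  • Adaptive Query Execution: With spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled (Spark 3.0+), Spark splits skewed partitions in sort-merge joins at runtime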
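Salting can be illustrated in pure Python under an identity-hash assumption (hash(key) = key for small non-negative ints). On the large side, key k of record i becomes k * num_salts + (i % num_salts). The small side is replicated once per salt value so that every salted key still finds its match. A deterministic round-robin salt keeps the output reproducible; production code typically uses a random salt. All names are hypothetical.

```python
from collections import Counter


def sizes(keys, num_partitions):
    """Records per partition under hash(key) mod P (identity hash for small ints)."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return [counts.get(i, 0) for i in range(num_partitions)]


def salt_keys(keys, num_salts):
    """Large side: map key k of record i to k * num_salts + (i % num_salts)."""
    return [k * num_salts + (i % num_salts) for i, k in enumerate(keys)]


def replicate_keys(keys, num_salts):
    """Small side: one copy of each key per salt value."""
    return [k * num_salts + s for k in keys for s in range(num_salts)]


big_side = [0] * 900 + list(range(1, 101))   # key 0 is hot
print(sizes(big_side, 4))                    # [925, 25, 25, 25]
print(sizes(salt_keys(big_side, 4), 4))      # [250, 250, 250, 250]

# Every salted key on the big side still has a partner on the small side
small_side = list(range(0, 101))
assert set(salt_keys(big_side, 4)) <= set(replicate_keys(small_side, 4))
```

The cost of salting is the replicated small side: it grows by a factor of num_salts, so the salt count should stay modest.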

8. Partition Pruning

Partition pruning eliminates unnecessary partition reads based on filter predicates. This is one of the most effective optimization techniques.

How it works:

  1. Filter predicate is analyzed
  2. Matching partitions are identified
  3. Only matching partitions are read
  4. Other partitions are skipped entirely

Example:

-- Table partitioned by year, month
SELECT * FROM events 
WHERE year = 2024 AND month = 1

-- Only reads partitions for year=2024, month=1
-- Skips all other partitions
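The four steps can be mimicked in a few lines of Python over Hive-style partition directory names such as year=2024/month=1. This is a toy model with a hypothetical function name. Spark performs the equivalent matching against the partition values it discovers and never opens files in the skipped directories.

```python
def prune_partitions(partition_dirs, **filters):
    """Keep only partition directories whose values satisfy every equality filter."""
    kept = []
    for path in partition_dirs:
        # "year=2024/month=1" -> {"year": "2024", "month": "1"}
        values = dict(part.split("=", 1) for part in path.split("/"))
        if all(values.get(name) == str(value) for name, value in filters.items()):
            kept.append(path)
    return kept


all_dirs = [f"year={y}/month={m}" for y in (2023, 2024) for m in range(1, 13)]
print(len(all_dirs))                                   # 24
print(prune_partitions(all_dirs, year=2024, month=1))  # ['year=2024/month=1']
print(len(prune_partitions(all_dirs, year=2023)))      # 12
```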

🔑 Key Concepts Table

| Strategy | Distribution | Pruning | Skew Risk | Use Case |
|---|---|---|---|---|
| Round-Robin | Equal sizes | None | Low | Load balancing |
| Hash | Key-based | Yes (point) | Medium | Key-based queries |
| Range | Ordered | Yes (range) | Low-Medium | Range queries |
| Bucket | File-based | Yes (join) | Medium | Repeated joins |
| Dynamic | Runtime | Yes (column) | Variable | Adaptive queries |

💻 Code Examples

Example 1: Basic Partitioning

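from pyspark.sql import SparkSession
# Import names explicitly: "from pyspark.sql.functions import *" would shadow
# Python built-ins such as sum, min and max with Spark column functions
from pyspark.sql.functions import col, concat, date_add, floor, lit, percentile_approx, unix_timestamp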

spark = SparkSession.builder.appName("PartitioningStrategies").getOrCreate()

# Create sample data
df = spark.range(1000000).withColumn(
    "key", col("id") % 100
).withColumn(
    "value", concat(lit("value_"), col("id"))
)

print(f"Initial partitions: {df.rdd.getNumPartitions()}")

# Round-robin repartition (equal sizes)
df_rr = df.repartition(20)
print(f"After repartition(20): {df_rr.rdd.getNumPartitions()}")

# Hash repartition by key
df_hash = df.repartition(20, "key")
print(f"After repartition(20, key): {df_hash.rdd.getNumPartitions()}")

# Coalesce to reduce partitions
df_coalesced = df_rr.coalesce(10)
print(f"After coalesce(10): {df_coalesced.rdd.getNumPartitions()}")

# Check partition sizes
def get_partition_sizes(rdd):
    return rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

print(f"Partition sizes (rr): {get_partition_sizes(df_rr.rdd)[:5]}...")

Example 2: Hash Partitioning for Joins

# Create two large DataFrames
users = spark.range(1000000).withColumn(
    "user_id", col("id")
).withColumn(
    "name", concat(lit("user_"), col("id"))
)

orders = spark.range(5000000).withColumn(
    "order_id", col("id")
).withColumn(
    "user_id", col("id") % 1000000
).withColumn(
    "amount", (col("id") * 10.0) % 1000
)

# Method 1: Repartition both by join key
users_repartitioned = users.repartition(100, "user_id")
orders_repartitioned = orders.repartition(100, "user_id")

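# The join adds no extra exchange: both sides are already hash-partitioned
# by user_id into 100 partitions (the two repartitions above are the shuffles)
result = users_repartitioned.join(orders_repartitioned, "user_id")
result.explain()

# Method 2: Use bucketing for persistent partitioning
# (bucketBy requires saveAsTable; overwrite lets the example be re-run)
users.write \
    .bucketBy(50, "user_id") \
    .sortBy("user_id") \
    .mode("overwrite") \
    .saveAsTable("users_bucketed")

orders.write \
    .bucketBy(50, "user_id") \
    .sortBy("user_id") \
    .mode("overwrite") \
    .saveAsTable("orders_bucketed")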

# Join bucketed tables: same bucket column and count, so no Exchange is expected
users_bucketed = spark.table("users_bucketed")
orders_bucketed = spark.table("orders_bucketed")
result_bucketed = users_bucketed.join(orders_bucketed, "user_id")
result_bucketed.explain()

Example 3: Range Partitioning

# Create time-series data
from datetime import datetime
from pyspark.sql.functions import col, lit, percentile_approx, spark_partition_id, timestamp_seconds

# Generate hourly readings starting at 2024-01-01 (driver's local time zone)
start_epoch = int(datetime(2024, 1, 1).timestamp())
time_series = spark.range(1000000).withColumn(
    "timestamp",
    timestamp_seconds(lit(start_epoch) + col("id") * 3600)
).withColumn(
    "value", col("id") * 1.0
)

# Approximate quartiles of the timestamp column. Illustration only:
# repartitionByRange samples the data and computes its own boundaries.
boundaries = time_series.select(
    percentile_approx("timestamp", [0.25, 0.5, 0.75]).alias("quartiles")
).collect()[0]["quartiles"]

print(f"Range boundaries: {boundaries}")

# Range-partition by timestamp into 4 partitions
df_range = time_series.repartitionByRange(4, "timestamp")

# Check distribution: rows per physical partition
df_range.groupBy(
    spark_partition_id().alias("partition")
).count().orderBy("partition").show()

Example 4: Partition Pruning

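# Create sample events (written as a partitioned table below)
from pyspark.sql.functions import col, expr

events = spark.range(1000000).withColumn(
    "event_id", col("id")
).withColumn(
    # Cycle through 365 consecutive dates starting 2024-01-01;
    # expr() keeps this portable across Spark 3.x versions
    "event_date", expr("date_add(DATE '2024-01-01', CAST(id % 365 AS INT))")
).withColumn(
    "user_id", col("id") % 10000
)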

# Write with partitioning
events.write \
    .partitionBy("event_date") \
    .mode("overwrite") \
    .parquet("events_partitioned/")

# Read and query with partition pruning
events_df = spark.read.parquet("events_partitioned/")

# This query only reads partitions for January 2024
result = events_df.filter(
    col("event_date").between("2024-01-01", "2024-01-31")
)

# Check physical plan for partition pruning
result.explain()

# Verify partition pruning: in the explain() output, the FileScan node
# lists the date predicate under "PartitionFilters"

📊 Performance Metrics

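| Strategy | 1 GB data | 10 GB data | Shuffle (MB) | Pruning speedup |
|---|---|---|---|---|
| Round-Robin | 5.0s | 45s | 1000 | None |
| Hash (100 parts) | 4.5s | 40s | 1000 | 10-50x |
| Range (100 parts) | 4.8s | 42s | 1000 | 5-20x |
| Bucket (50 buckets) | 3.0s | 25s | 0 (join) | 10-50x |
| Coalesce (↓) | 3.5s | 30s | 0 | None |
| Repartition (↑) | 6.0s | 55s | 1000 | None |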

✅ Best Practices

1. Choose Right Partition Count

# Target 128MB-200MB per partition
data_size_gb = 10  # 10GB
target_partition_mb = 128
optimal_partitions = int(data_size_gb * 1024 / target_partition_mb)
print(f"Optimal partitions: {optimal_partitions}")  # ~80

# Consider executor cores
num_executors = 10
cores_per_executor = 4
max_partitions = num_executors * cores_per_executor * 4
print(f"Max partitions: {max_partitions}")  # 160

2. Use Hash Partitioning for Joins

# Partition both DataFrames by join key
df1_partitioned = df1.repartition(100, "join_key")
df2_partitioned = df2.repartition(100, "join_key")

# Join: no extra exchange beyond the two repartitions above
result = df1_partitioned.join(df2_partitioned, "join_key")

3. Use Bucketing for Repeated Joins

# Write bucketed tables
df1.write.bucketBy(100, "key").sortBy("key").saveAsTable("t1_bucketed")
df2.write.bucketBy(100, "key").sortBy("key").saveAsTable("t2_bucketed")

# Join bucketed tables (no shuffle)
result = spark.table("t1_bucketed").join(spark.table("t2_bucketed"), "key")

4. Partition Large Tables for Query Optimization

# Partition by frequently filtered columns
df.write.partitionBy("year", "month").parquet("output/")

# Enables partition pruning
df_filtered = spark.read.parquet("output/").filter(
    (col("year") == 2024) & (col("month") == 1)
)

5. Monitor Partition Distribution

# Check partition sizes
partition_sizes = df.rdd.mapPartitions(
    lambda it: [sum(1 for _ in it)]
).collect()

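print(f"Partition sizes: {partition_sizes}")
print(f"Min: {min(partition_sizes)}, Max: {max(partition_sizes)}")
# Balance factor B = max / mean (max / min fails when a partition is empty)
mean_size = sum(partition_sizes) / len(partition_sizes)
print(f"Balance factor B: {max(partition_sizes) / mean_size:.2f}")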

6. Avoid Over-Partitioning

# BAD: Too many small partitions
df.repartition(10000)  # Creates 10K tiny partitions

# GOOD: Right-sized partitions
df.repartition(100)  # Creates 100 manageable partitions

See Also

  • Kafka Streams (kafka/03): Partitioning in Kafka stream processing
  • Data Engineering Streaming (data-engineering/022): Partitioning strategies for streaming workloads
