PySpark Partitioning Strategies
Partitioning
Partitioning is the process of distributing data across multiple partitions for parallel processing. The goal is to minimize shuffle during joins and aggregations while maintaining balanced partition sizes.
Hash Partitioning
Hash partitioning assigns each record to a partition based on hash(key) % num_partitions. It spreads records evenly when keys are well distributed, but it produces skew when a few keys account for a disproportionate share of the records, since every record with a given key lands in the same partition.
Range Partitioning
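Range partitioning assigns records to partitions based on key ranges, so each partition covers a contiguous range of keys. Spark chooses the range boundaries by sampling the data. Range partitioning underpins ORDER BY and range-based queries, but it is vulnerable to skew when many records share the same key value, because a single key cannot be split across partitions.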
Partition Balance Metric
$$B = \frac{\max_{i} S_i}{\frac{1}{P}\sum_{i=1}^{P} S_i}$$
Here,
- $B$ = Balance factor (1.0 = perfectly balanced, higher = more skewed)
- $S_i$ = Size (in rows or bytes) of partition $i$
- $P$ = Total number of partitions
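The balance factor (largest partition divided by the mean partition size) is straightforward to compute from per-partition sizes. The plain-Python helper below is a minimal sketch; `balance_factor` is a hypothetical name, not a Spark API, and the input is assumed to be a list of row or byte counts gathered elsewhere (for example, from the Spark UI).

```python
def balance_factor(partition_sizes):
    """Return B = max(S_i) / mean(S_i) for a list of partition sizes."""
    if not partition_sizes:
        raise ValueError("need at least one partition")
    total = sum(partition_sizes)
    if total == 0:
        raise ValueError("all partitions are empty")
    mean = total / len(partition_sizes)
    return max(partition_sizes) / mean


# One partition holding 5x the rows of the others: mean is 200, max is 500
print(balance_factor([100, 100, 100, 500]))  # 2.5
# Perfectly balanced partitions
print(balance_factor([200, 200, 200, 200]))  # 1.0
```

Dividing by the mean rather than the minimum keeps the metric defined when some partitions are empty.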
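Round-robin partitioning (repartition(n)) distributes records evenly across partitions without regard to key values. It is the simplest way to even out partition sizes, but because it ignores keys it does not co-locate matching records, so a later join or aggregation on a key still needs its own shuffle.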
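When joining two datasets, repartition both by the join key so that matching records land in the same partition index: df1.repartition("key").join(df2.repartition("key"), "key"). Each repartition is itself a shuffle, so this does not remove shuffling altogether; it removes the separate exchange the join would otherwise add, which pays off when the repartitioned data is reused (for example, cached) across several joins.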
Co-Partitioning Theorem
Theorem: If two datasets are partitioned by the same partitioner with the same number of partitions, joining them requires zero shuffle: a record in partition i of one dataset can only match records in partition i of the other, so each task joins its pair of local partitions independently. The total join work is then $P \times (C_{local} + I_{local})$, where $P$ is the partition count, $C_{local}$ is the compute cost and $I_{local}$ the I/O cost of joining one partition pair (assuming balanced partitions).
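The theorem can be checked with a small plain-Python model. The sketch assumes integer keys and uses `key % num_partitions` as a stand-in for Spark's real hash function; all function names are hypothetical. Because both inputs go through the same partitioner with the same partition count, joining partition pairs independently gives the same result as a global join.

```python
from collections import defaultdict


def hash_partition(records, num_partitions):
    """Split (key, value) records by key % num_partitions (stand-in partitioner)."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in records:
        parts[key % num_partitions].append((key, value))
    return parts


def local_join(left_part, right_part):
    """Inner-join two partitions using only their own rows."""
    index = defaultdict(list)
    for key, value in right_part:
        index[key].append(value)
    return [(key, lv, rv) for key, lv in left_part for rv in index[key]]


def co_partitioned_join(left, right, num_partitions):
    """Join partition i of left with partition i of right; no data moves between pairs."""
    left_parts = hash_partition(left, num_partitions)
    right_parts = hash_partition(right, num_partitions)
    result = []
    for lp, rp in zip(left_parts, right_parts):
        result.extend(local_join(lp, rp))
    return result


users = [(1, "ann"), (2, "bob"), (3, "cy")]
orders = [(1, 9.5), (3, 4.0), (3, 1.0), (4, 7.25)]
print(sorted(co_partitioned_join(users, orders, num_partitions=2)))
# [(1, 'ann', 9.5), (3, 'cy', 1.0), (3, 'cy', 4.0)]
```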
- Hash partitioning: uniform distribution for well-distributed keys
- Range partitioning: contiguous key ranges for ORDER BY and range queries
- Round-robin (repartition(n)): evens out partition sizes, but ignores keys
- Co-partition by join key to avoid a shuffle at join time
- Balance factor B = max_partition_size / avg_partition_size; target B ≈ 1.0
Architecture Diagram
==================================================================
 PARTITIONING STRATEGIES OVERVIEW
==================================================================

 ROUND-ROBIN PARTITIONING
 (Equal distribution, no key dependency)

   Input Data: [A, B, C, D, E, F, G, H]
        |
        v
   +--------+  +--------+  +--------+  +--------+
   | Part 0 |  | Part 1 |  | Part 2 |  | Part 3 |
   | [A, E] |  | [B, F] |  | [C, G] |  | [D, H] |
   +--------+  +--------+  +--------+  +--------+

   * Each record assigned round-robin
   * Near-equal partition sizes
   * No data locality by key
   * Used by repartition(n)

 HASH PARTITIONING
 (Key-based distribution)

   Input Data: [(1,A), (2,B), (3,C), (4,D),
                (5,E), (6,F), (7,G), (8,H)]
        |
        v  hash(key) % num_partitions
           (identity hash shown; Spark SQL uses Murmur3)
   +---------+  +---------+  +---------+  +---------+
   | Part 0  |  | Part 1  |  | Part 2  |  | Part 3  |
   |hash%4=0 |  |hash%4=1 |  |hash%4=2 |  |hash%4=3 |
   | (4,D)   |  | (1,A)   |  | (2,B)   |  | (3,C)   |
   | (8,H)   |  | (5,E)   |  | (6,F)   |  | (7,G)   |
   +---------+  +---------+  +---------+  +---------+

   * Same key always goes to the same partition
   * Enables bucket pruning when persisted with bucketBy()
   * May cause skew when a few keys dominate
   * Used by repartition(n, col) and RDD.partitionBy()

 RANGE PARTITIONING
 (Ordered distribution)

   Input Data: [1, 5, 10, 15, 20, 25, 30, 35]
   Boundaries: [12, 22, 32]  (chosen by sampling)
        |
        v
   +----------+  +----------+  +----------+  +----------+
   |  Part 0  |  |  Part 1  |  |  Part 2  |  |  Part 3  |
   |   < 12   |  | 12 to <22|  | 22 to <32|  |   >= 32  |
   | [1,5,10] |  | [15,20]  |  | [25,30]  |  |   [35]   |
   +----------+  +----------+  +----------+  +----------+

   * Data ordered across partitions
   * Enables range pruning
   * Requires sampling for boundaries
   * Used by repartitionByRange(n, col) and orderBy()

 BUCKET PARTITIONING
 (File-based partitioning for joins)

   Table A (bucketed by key, 4 buckets):  [ 0 ] [ 1 ] [ 2 ] [ 3 ]
                                            |     |     |     |
   Table B (bucketed by key, 4 buckets):  [ 0 ] [ 1 ] [ 2 ] [ 3 ]

   Join: bucket i joins with bucket i (no shuffle)
==================================================================
 PARTITION OPTIMIZATION PIPELINE
==================================================================

 STEP 1: ANALYZE DATA DISTRIBUTION
   * Sample data to understand key distribution
   * Identify skew (e.g. some keys have 10x more data)
   * Calculate partition size estimates
        |
        v
 STEP 2: CHOOSE PARTITIONING STRATEGY
   * Round-robin: equal sizes, no key dependency
   * Hash: key-based, co-locates equal keys
   * Range: ordered data, range queries
   * Bucket: pre-partitioned on disk for joins
        |
        v
 STEP 3: DETERMINE PARTITION COUNT
   * Target: 128MB-200MB per partition
   * Formula: partitions = total_size / target_size
   * Consider executor cores (2-4 partitions per core)
   * Leave headroom for data growth during shuffles
        |
        v
 STEP 4: APPLY PARTITIONING
   * repartition(n): round-robin, full shuffle
   * repartition(n, col): hash by column, full shuffle
   * repartitionByRange(n, col): range by column
   * write.partitionBy(col): one directory per column value
   * write.bucketBy(n, col): fixed number of hash buckets
        |
        v
 STEP 5: MONITOR AND TUNE
   * Check partition and task sizes in the Spark UI
   * Look for skewed partitions
   * Adjust partition count if needed
==================================================================
 PARTITION PRUNING AND DATA LOCALITY
==================================================================

 HASH-BUCKETED TABLE (4 partitions)

   +---------+  +---------+  +---------+  +---------+
   | Part 0  |  | Part 1  |  | Part 2  |  | Part 3  |
   | key%4=0 |  | key%4=1 |  | key%4=2 |  | key%4=3 |
   +---------+  +---------+  +---------+  +---------+

   Query: WHERE key = 5
   PARTITION PRUNING: 5 % 4 = 1 -> only scan Part 1
     Before pruning: scan 4 partitions (100%)
     After pruning:  scan 1 partition  (25%)
   SPEEDUP: up to 4x less data scanned (equal-sized partitions)

 RANGE-PARTITIONED TABLE (4 partitions)

   +-----------+  +-----------+  +-----------+  +-----------+
   |  Part 0   |  |  Part 1   |  |  Part 2   |  |  Part 3   |
   |   0-999   |  | 1000-1999 |  | 2000-2999 |  |   3000+   |
   +-----------+  +-----------+  +-----------+  +-----------+

   Query: WHERE key BETWEEN 1500 AND 2500
   PARTITION PRUNING: 1500-2500 spans Part 1 and Part 2
     Before pruning: scan 4 partitions (100%)
     After pruning:  scan 2 partitions (50%)
   SPEEDUP: up to 2x less data scanned (equal-sized partitions)

 DATA LOCALITY LEVELS (best to worst)

   PROCESS_LOCAL  Data in the same executor JVM
                  * Best performance (no serialization or transfer)
                  * e.g. cached partitions
   NODE_LOCAL     Data on the same node, different process
                  * Good performance (local disk read, no network)
                  * e.g. an HDFS block on local disk, or another
                    executor's cache on the same node
   NO_PREF        Data has no locality preference
   RACK_LOCAL     Data on another node in the same rack
                  * Moderate performance (network within rack)
   ANY            Data elsewhere on the network
                  * Worst performance (cross-rack network)
Detailed Explanation
1. Why Partitioning Matters
Partitioning determines how data is distributed across the cluster. Proper partitioning is critical for:
- Performance: Reduces data shuffling and network I/O
- Parallelism: Enables concurrent processing across executors
- Data locality: Keeps related data together for efficient joins
- Query optimization: Enables partition pruning
2. Round-Robin Partitioning
Round-robin partitioning distributes data evenly across partitions regardless of key values. Each record is assigned to partitions in a circular order.
Characteristics:
- Near-equal partition sizes
- No data locality
- Simple distribution
- Used by repartition(n)
When to use:
- When data is evenly distributed and you need more partitions
- When you don't have a natural partition key
- For load balancing across executors
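A minimal plain-Python model of the assignment rule reproduces the diagram's eight records across four partitions. `round_robin_partition` is a hypothetical helper, not a Spark function. The optional `start` offset imitates the fact that, in Spark, each input partition begins its round-robin at its own starting position; either way, sizes differ by at most one record per input.

```python
def round_robin_partition(records, num_partitions, start=0):
    """Assign records to partitions in circular order, ignoring their values."""
    parts = [[] for _ in range(num_partitions)]
    for i, record in enumerate(records):
        parts[(start + i) % num_partitions].append(record)
    return parts


print(round_robin_partition(list("ABCDEFGH"), 4))
# [['A', 'E'], ['B', 'F'], ['C', 'G'], ['D', 'H']]
```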
3. Hash Partitioning
Hash partitioning assigns records to partitions based on the hash of a key value. The same key always goes to the same partition.
Formula: partition = hash(key) % num_partitions
Characteristics:
- Same key → same partition
- Enables bucket pruning on equality filters when the data is persisted with bucketBy()
- May cause data skew
- Used by repartition(n, col) and RDD.partitionBy()
When to use:
- When you frequently filter or join on a specific key
- When you want to co-locate related data
- For partition-level operations
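The sketch below models partition = hash(key) % num_partitions in plain Python to show two of the properties above: a key always maps to the same partition, and one hot key inflates exactly one partition. It uses CRC32 as a deterministic stand-in hash (Python's built-in `hash()` is randomized for strings between runs); Spark SQL uses Murmur3, so real partition ids differ. The helper names are hypothetical.

```python
import zlib
from collections import Counter


def hash_partition_id(key, num_partitions):
    """partition = hash(key) % num_partitions, with CRC32 as a stand-in hash."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Number of records each partition receives."""
    counts = Counter(hash_partition_id(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Same key -> same partition, every time
assert hash_partition_id("user_42", 8) == hash_partition_id("user_42", 8)

# 1,000 distinct keys, then the same keys plus 5,000 extra rows for "user_0"
uniform = [f"user_{i}" for i in range(1000)]
skewed = uniform + ["user_0"] * 5000
print(partition_sizes(uniform, 8))
print(partition_sizes(skewed, 8))  # one partition gains all 5,000 extra rows
```

Adding partitions does not help here: the 5,000 rows for `user_0` still share one partition, which is why salting or AQE skew handling is needed for hot keys.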
4. Range Partitioning
Range partitioning divides data into ordered ranges. Each partition contains records within a specific value range.
Characteristics:
- Data ordered across partitions
- Enables range pruning
- Requires sampling to compute boundaries
- Used by repartitionByRange(n, col) and by sorts such as orderBy()
When to use:
- When you frequently query ranges
- For time-series data
- For ordered analytics
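A simplified plain-Python sketch of range partitioning: boundaries are estimated from a random sample, and each value is routed with a binary search. This mirrors the idea behind Spark's range partitioner but not its exact sampling scheme; the helper names are hypothetical. Each boundary is the inclusive lower bound of the next partition, matching the diagram's `< 12`, `12 to <22`, ... layout.

```python
import bisect
import random


def compute_boundaries(values, num_partitions, sample_size=1000, seed=0):
    """Estimate num_partitions - 1 split points from a random sample."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(values, min(sample_size, len(values))))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]


def range_partition_id(value, boundaries):
    """Partition i holds values in [boundaries[i-1], boundaries[i])."""
    return bisect.bisect_right(boundaries, value)


data = [1, 5, 10, 15, 20, 25, 30, 35]
bounds = [12, 22, 32]
parts = [[] for _ in range(len(bounds) + 1)]
for v in data:
    parts[range_partition_id(v, bounds)].append(v)
print(parts)  # [[1, 5, 10], [15, 20], [25, 30], [35]]
print(compute_boundaries(list(range(10_000)), 4))  # three increasing split points
```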
5. Bucket Partitioning
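Bucket partitioning (bucketing) is a file-based scheme that hashes a key into a fixed number of buckets and writes each bucket's rows to separate files. Unlike runtime partitioning, bucketing is applied at write time and recorded in the table's metadata.
Characteristics:
- Fixed number of buckets per table (each write task can emit one file per bucket)
- Files are pre-partitioned by key
- Enables bucket joins without a shuffle when both tables are bucketed on the join key with the same bucket count
- Persistent across sessions; requires saveAsTable(), because the bucketing spec is stored in the metastore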
When to use:
- When you frequently join two large tables
- When you want to avoid shuffle in joins
- For repeated join operations
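Why both tables need the same bucket count can be shown with a small plain-Python model. It assumes integer keys and uses `key % num_buckets` in place of Spark's hash; `bucket_id`, `write_buckets` and `buckets_aligned` are hypothetical helpers. With equal counts, bucket i of one table pairs with bucket i of the other; with different counts, matching keys end up under different bucket indexes.

```python
def bucket_id(key, num_buckets):
    """Bucket for an integer key; key % num_buckets stands in for Spark's hash."""
    return key % num_buckets


def write_buckets(rows, num_buckets):
    """Group (key, value) rows into a fixed number of buckets, as at write time."""
    buckets = {b: [] for b in range(num_buckets)}
    for key, value in rows:
        buckets[bucket_id(key, num_buckets)].append((key, value))
    return buckets


def buckets_aligned(keys, num_buckets_a, num_buckets_b):
    """True if every key has the same bucket index in both tables."""
    return all(bucket_id(k, num_buckets_a) == bucket_id(k, num_buckets_b) for k in keys)


table_a = write_buckets([(k, f"a{k}") for k in range(10)], 4)
print({b: [k for k, _ in rows] for b, rows in table_a.items()})
# {0: [0, 4, 8], 1: [1, 5, 9], 2: [2, 6], 3: [3, 7]}
print(buckets_aligned(range(100), 4, 4))  # True: bucket i pairs with bucket i
print(buckets_aligned(range(100), 4, 8))  # False: key 4 is in bucket 0 vs bucket 4
```

Spark 3.1 and later can optionally coalesce buckets when one table's bucket count is a multiple of the other's (`spark.sql.bucketing.coalesceBucketsInJoin.enabled`, off by default); otherwise mismatched counts fall back to a shuffle.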
6. Partition Count Optimization
The number of partitions should be based on:
- Data size: Target 128MB-200MB per partition
- Executor cores: 2-4 partitions per core
- Cluster size: Total partitions = executors × cores × (2-4)
- Operation type: More partitions for parallel operations
Formula:
optimal_partitions = max(total_data_size / target_partition_size,
num_executors * cores_per_executor * 2)
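The formula translates directly into a small helper. Rounding the size term up with `ceil` is an assumption (the formula does not specify rounding) that keeps partitions at or below the target size; `optimal_partitions` is a hypothetical name. The worked numbers show which term dominates: small inputs are bounded by core count, large inputs by data size.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def optimal_partitions(total_bytes, target_partition_bytes,
                       num_executors, cores_per_executor, tasks_per_core=2):
    """max(total_data_size / target_partition_size, executors * cores * tasks_per_core)."""
    by_size = math.ceil(total_bytes / target_partition_bytes)
    by_cores = num_executors * cores_per_executor * tasks_per_core
    return max(by_size, by_cores)


print(optimal_partitions(10 * GB, 128 * MB, num_executors=10, cores_per_executor=4))   # 80
print(optimal_partitions(100 * GB, 128 * MB, num_executors=10, cores_per_executor=4))  # 800
print(optimal_partitions(1 * GB, 128 * MB, num_executors=10, cores_per_executor=4))    # 80
```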
7. Data Skew and Mitigation
Data skew occurs when some partitions have significantly more data than others. This causes:
- Longer task durations for skewed partitions
- Underutilization of cluster resources
- Memory pressure on executors processing skewed data
Detection:
- Monitor task duration in Spark UI
- Check partition sizes
- Count rows per partition with df.groupBy(spark_partition_id()).count(), importing spark_partition_id from pyspark.sql.functions
Mitigation:
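- Salting: Add a random prefix to skewed keys so their rows spread over several partitions, then aggregate again without the salt
- Broadcast join: Broadcast the small table so the large side is not shuffled
- Adaptive Query Execution: Automatically splits skewed partitions in sort-merge joins (spark.sql.adaptive.skewJoin.enabled)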
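Salting can be modeled in plain Python as a two-stage count. In this sketch (the function name and data are hypothetical), stage 1 groups by (key, salt) so one hot key's rows are split across `num_salts` groups, each of which Spark could place in a different partition; stage 2 drops the salt and sums the partial counts. The salt is kept as a separate tuple field rather than a string prefix, which is equivalent.

```python
import random
from collections import Counter, defaultdict


def salted_count(keys, hot_keys, num_salts, seed=0):
    """Count rows per key in two stages, salting only the hot keys."""
    rng = random.Random(seed)
    # Stage 1: partial counts per (key, salt); non-hot keys always get salt 0
    partial = Counter()
    for key in keys:
        salt = rng.randrange(num_salts) if key in hot_keys else 0
        partial[(key, salt)] += 1
    # Largest stage-1 group: the most work any single task would do
    largest_group = max(partial.values())
    # Stage 2: strip the salt and combine the partial counts
    totals = defaultdict(int)
    for (key, _salt), count in partial.items():
        totals[key] += count
    return dict(totals), largest_group


keys = ["hot"] * 10_000 + [f"k{i}" for i in range(1_000)]
totals, largest = salted_count(keys, hot_keys={"hot"}, num_salts=10)
# The hot key's total stays 10000; its largest group shrinks to roughly 10000 / num_salts
print(totals["hot"], largest)
```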
8. Partition Pruning
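Partition pruning skips partitions that cannot match the query's filter predicates; for file-based tables, these are the directories written by partitionBy(). Because skipped directories are never opened, it is one of the most effective optimization techniques for large tables.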
How it works:
- Filter predicate is analyzed
- Matching partitions are identified
- Only matching partitions are read
- Other partitions are skipped entirely
Example:
-- Table partitioned by year, month
SELECT * FROM events
WHERE year = 2024 AND month = 1
-- Only reads partitions for year=2024, month=1
-- Skips all other partitions
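The steps above can be modeled in plain Python against a directory layout like the one partitionBy("year", "month") produces. Only file paths are inspected, never file contents, which is why pruning is so cheap. The helpers and paths are hypothetical; Spark performs this inside its file index, not with these functions.

```python
def parse_partition_values(path):
    """Turn 'year=2024/month=1/part-0.parquet' into {'year': '2024', 'month': '1'}."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            name, _, value = segment.partition("=")
            values[name] = value
    return values


def prune(paths, **equals):
    """Keep only files whose partition directories match every equality filter."""
    return [
        p for p in paths
        if all(parse_partition_values(p).get(name) == str(v) for name, v in equals.items())
    ]


files = [
    "events/year=2023/month=12/part-0.parquet",
    "events/year=2024/month=1/part-0.parquet",
    "events/year=2024/month=1/part-1.parquet",
    "events/year=2024/month=2/part-0.parquet",
]
print(prune(files, year=2024, month=1))
# ['events/year=2024/month=1/part-0.parquet', 'events/year=2024/month=1/part-1.parquet']
```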
Key Concepts Table
| Strategy | Distribution | Pruning | Skew Risk | Use Case |
|---|---|---|---|---|
| Round-Robin | Equal sizes | None | Low | Load balancing |
| Hash | Key-based | Yes (point, when bucketed) | Medium | Key-based queries |
| Range | Ordered | Yes (range) | Low-Medium | Range queries |
| Bucket | File-based | Yes (join) | Medium | Repeated joins |
| Dynamic | Runtime | Yes (column) | Variable | Adaptive queries |
Code Examples
Example 1: Basic Partitioning
from pyspark.sql import SparkSession
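# Import only what the examples use: `import *` would shadow Python's
# built-in sum, min and max with Spark column functions
from pyspark.sql.functions import (col, concat, date_add, floor, lit,
                                   percentile_approx, unix_timestamp)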
spark = SparkSession.builder.appName("PartitioningStrategies").getOrCreate()
# Create sample data
df = spark.range(1000000).withColumn(
"key", col("id") % 100
).withColumn(
"value", concat(lit("value_"), col("id"))
)
print(f"Initial partitions: {df.rdd.getNumPartitions()}")
# Round-robin repartition (equal sizes)
df_rr = df.repartition(20)
print(f"After repartition(20): {df_rr.rdd.getNumPartitions()}")
# Hash repartition by key
df_hash = df.repartition(20, "key")
print(f"After repartition(20, key): {df_hash.rdd.getNumPartitions()}")
# Coalesce to reduce partitions
df_coalesced = df_rr.coalesce(10)
print(f"After coalesce(10): {df_coalesced.rdd.getNumPartitions()}")
# Check partition sizes
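def get_partition_sizes(rdd):
    # Count rows per partition without materializing each partition as a list
    def count_rows(rows):
        n = 0
        for _ in rows:
            n += 1
        yield n
    return rdd.mapPartitions(count_rows).collect()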
print(f"Partition sizes (rr): {get_partition_sizes(df_rr.rdd)[:5]}...")
Example 2: Hash Partitioning for Joins
# Create two large DataFrames
users = spark.range(1000000).withColumn(
"user_id", col("id")
).withColumn(
"name", concat(lit("user_"), col("id"))
)
orders = spark.range(5000000).withColumn(
"order_id", col("id")
).withColumn(
"user_id", col("id") % 1000000
).withColumn(
"amount", (col("id") * 10.0) % 1000
)
# Method 1: Repartition both by join key
users_repartitioned = users.repartition(100, "user_id")
orders_repartitioned = orders.repartition(100, "user_id")
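# Join: both sides already share hash partitioning on user_id (100 partitions),
# so the join adds no Exchange; the two repartitions above are the shuffles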
result = users_repartitioned.join(orders_repartitioned, "user_id")
result.explain()
# Method 2: Use bucketing for persistent partitioning
users.write \
.bucketBy(50, "user_id") \
.sortBy("user_id") \
.saveAsTable("users_bucketed")
orders.write \
.bucketBy(50, "user_id") \
.sortBy("user_id") \
.saveAsTable("orders_bucketed")
# Join bucketed tables (no shuffle!)
users_bucketed = spark.table("users_bucketed")
orders_bucketed = spark.table("orders_bucketed")
result_bucketed = users_bucketed.join(orders_bucketed, "user_id")
result_bucketed.explain()
Example 3: Range Partitioning
# Create time-series data: one row per hour starting 2024-01-01 00:00 UTC
from datetime import datetime, timezone
import pyspark.sql.functions as F
base_epoch = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp())
time_series = spark.range(1000000).withColumn(
    "timestamp", F.timestamp_seconds(F.lit(base_epoch) + F.col("id") * 3600)
).withColumn(
    "value", F.col("id") * 1.0
)
# Approximate quartiles of the timestamps. repartitionByRange samples the
# data itself; these only show roughly where its 3 boundaries will fall
boundaries = time_series.select(
    F.percentile_approx("timestamp", [0.25, 0.5, 0.75]).alias("q")
).collect()[0]["q"]
print(f"Range boundaries: {boundaries}")
# Range-partition by timestamp into 4 partitions
df_range = time_series.repartitionByRange(4, "timestamp")
# Check distribution: row count and time span of each partition
(df_range.withColumn("partition", F.spark_partition_id())
    .groupBy("partition")
    .agg(F.count(F.lit(1)).alias("rows"),
         F.min("timestamp").alias("start"),
         F.max("timestamp").alias("end"))
    .orderBy("partition")
    .show(truncate=False))
Example 4: Partition Pruning
# Create partitioned table
events = spark.range(1000000).withColumn(
"event_id", col("id")
).withColumn(
"event_date", date_add(lit("2024-01-01"), (col("id") % 365).cast("int"))
).withColumn(
"user_id", col("id") % 10000
)
# Write with partitioning
events.write \
.partitionBy("event_date") \
.mode("overwrite") \
.parquet("events_partitioned/")
# Read and query with partition pruning
events_df = spark.read.parquet("events_partitioned/")
# This query only reads partitions for January 2024
result = events_df.filter(
col("event_date").between("2024-01-01", "2024-01-31")
)
# Check physical plan for partition pruning
result.explain()
# Verify partition pruning: in the explain() output, the FileScan node lists
# the date predicate under "PartitionFilters" (the Spark UI SQL tab shows the same plan)
Performance Metrics
| Strategy | Time (1 GB) | Time (10 GB) | Shuffle (MB) | Pruning Speedup |
|---|---|---|---|---|
| Round-Robin | 5.0s | 45s | 1000 | None |
| Hash (100 parts) | 4.5s | 40s | 1000 | 10-50x |
| Range (100 parts) | 4.8s | 42s | 1000 | 5-20x |
| Bucket (50 buckets) | 3.0s | 25s | 0 (join) | 10-50x |
| Coalesce (↓ partitions) | 3.5s | 30s | 0 | None |
| Repartition (↑ partitions) | 6.0s | 55s | 1000 | None |
Best Practices
1. Choose Right Partition Count
# Target 128MB-200MB per partition
data_size_gb = 10 # 10GB
target_partition_mb = 128
optimal_partitions = int(data_size_gb * 1024 / target_partition_mb)
print(f"Optimal partitions: {optimal_partitions}") # ~80
# Consider executor cores
num_executors = 10
cores_per_executor = 4
max_partitions = num_executors * cores_per_executor * 4
print(f"Max partitions: {max_partitions}") # 160
2. Use Hash Partitioning for Joins
# Partition both DataFrames by join key
df1_partitioned = df1.repartition(100, "join_key")
df2_partitioned = df2.repartition(100, "join_key")
# Join: no extra exchange at join time (the two repartitions are the shuffles)
result = df1_partitioned.join(df2_partitioned, "join_key")
3. Use Bucketing for Repeated Joins
# Write bucketed tables
df1.write.bucketBy(100, "key").sortBy("key").saveAsTable("t1_bucketed")
df2.write.bucketBy(100, "key").sortBy("key").saveAsTable("t2_bucketed")
# Join bucketed tables (no shuffle)
result = spark.table("t1_bucketed").join(spark.table("t2_bucketed"), "key")
4. Partition Large Tables for Query Optimization
# Partition by frequently filtered columns
df.write.partitionBy("year", "month").parquet("output/")
# Enables partition pruning
df_filtered = spark.read.parquet("output/").filter(
(col("year") == 2024) & (col("month") == 1)
)
5. Monitor Partition Distribution
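import statistics
def count_rows(rows):
    # Count one partition's rows without building a list
    n = 0
    for _ in rows:
        n += 1
    yield n
partition_sizes = df.rdd.mapPartitions(count_rows).collect()
# sorted()/statistics avoid min/max/sum, which a wildcard pyspark import shadows
ordered = sorted(partition_sizes)
print(f"Partition sizes: {partition_sizes}")
print(f"Min: {ordered[0]}, Max: {ordered[-1]}")
# Balance factor B = max / mean (stays defined when a partition is empty)
print(f"Balance factor: {ordered[-1] / statistics.fmean(ordered):.2f}")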
6. Avoid Over-Partitioning
# BAD: Too many small partitions
df.repartition(10000) # Creates 10K tiny partitions
# GOOD: Right-sized partitions
df.repartition(100) # Creates 100 manageable partitions
See Also
- Kafka Streams (kafka/03): Partitioning in Kafka stream processing
- Data Engineering Streaming (data-engineering/022): Partitioning strategies for streaming workloads