
Partitioning: Hash, Range, Coalesce, Repartition, Salting

Difficulty: Expert | Companies: LinkedIn, Uber, Airbnb, Stripe, Netflix

Interview Context

Partitioning is fundamental to Spark performance. Interviewers test understanding of when to use different partitioning strategies and how partitioning affects shuffle, memory, and task scheduling.

Question

Explain the difference between hash partitioning, range partitioning, coalesce, and repartition. When would you use each? How does partitioning affect performance in terms of data locality, shuffle, and task parallelism? Provide mathematical analysis of partition skew.


Detailed Answer

1. Partitioning Strategies Overview

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("PartitioningDeepDive") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

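df = spark.range(1000000)  # 1M rows

# spark.range() is not a shuffle, so its partition count follows
# spark.default.parallelism (typically the total core count), not
# spark.sql.shuffle.partitions. The 200 setting applies after a shuffle.
print(f"Initial partitions: {df.rdd.getNumPartitions()}")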

2. Hash Partitioning

# Hash Partitioning: Distributes data based on hash of key
# Deterministic: same key always goes to same partition

# Application 1: DataFrame.repartition
df_hash = df.repartition(100, "id")
print(f"After repartition: {df_hash.rdd.getNumPartitions()}")  # 100

# Application 2: SQL shuffle operations
# groupBy, join, DISTINCT all use hash partitioning
result = df.groupBy("id").count()  # Hash partitions by "id"

# Mathematical properties:
# Let P = number of partitions, N = number of rows
# Hash function: partition_id = hash(key) % P
#
# Ideal distribution:
# Each partition has N/P rows
# Each key maps to exactly one partition
# Shuffle write: O(N) total, O(N/P) per partition
#
# Actual distribution (with skew):
# Some partitions have N/P × (1 + skew_factor) rows
# Skew factor depends on key distribution

# Hash Partitioning with salt (for join optimization):
salt_range = 100
df_salted = df.withColumn(
    "salt", (F.rand() * salt_range).cast("int")
).withColumn(
    "salted_id",
    F.concat(F.col("id").cast("string"), F.lit("_"), F.col("salt").cast("string"))
).repartition(100, "salted_id")

# Each original key is now split into up to 100 salted sub-keys,
# which hash to different partitions
# Useful for: avoiding shuffle skew in joins (the other join side must be
# replicated once per salt value so that matching rows still meet)
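
The effect of `partition_id = hash(key) % P` on a hot key, and what salting changes, can be simulated without a cluster. The following is a minimal pure-Python sketch, not Spark's implementation: `zlib.crc32` is an assumed stand-in for Spark's Murmur3 hash, and the key names and row counts are made up for illustration.

```python
import random
import zlib
from collections import Counter

def partition_id(key: str, num_partitions: int) -> int:
    """Stable stand-in for a hash partitioner (Spark itself uses Murmur3)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition."""
    counts = Counter(partition_id(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

P = 10
rng = random.Random(42)

# Hypothetical data: 90,000 rows share one hot key, 10,000 rows have distinct keys.
keys = ["hot"] * 90_000 + [f"user_{i}" for i in range(10_000)]
plain = partition_sizes(keys, P)

# Salting: append a random bucket number so the hot key becomes many sub-keys.
salt_buckets = 100
salted_keys = [f"{k}_{rng.randrange(salt_buckets)}" for k in keys]
salted = partition_sizes(salted_keys, P)

mean = len(keys) / P
print(f"max/mean without salt: {max(plain) / mean:.2f}")
print(f"max/mean with salt:    {max(salted) / mean:.2f}")
```

Without salt, every "hot" row hashes to the same partition, so one partition holds at least 90% of the data. With salt, the hot rows are spread over many sub-keys and the largest partition shrinks accordingly.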

3. Range Partitioning

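# Range Partitioning: Distributes data based on value ranges
# Used for: ORDER BY / sort, repartitionByRange, time-series data

# Application 1: Global sort (orderBy range-partitions, then sorts each partition)
df_ordered = df.orderBy("id")

# Application 2: Range-partition and sort within partitions for sorted output
# (repartitionAndSortWithinPartitions is an RDD method, not a DataFrame one)
df_sorted = df.repartitionByRange(100, "id").sortWithinPartitions("id")

# Note: Window.partitionBy(...).orderBy(...) hash-partitions by the partition
# columns and sorts inside each partition; it does not range-partition.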

# Mathematical properties:
# Let K = sort key, P = partitions
# Partition boundaries: [K_0, K_1, ..., K_P] where K_0 = min(K), K_P = max(K)
# Partition i contains rows where K_{i-1} ≤ K < K_i
#
# Ideal partition sizes:
# If data is uniformly distributed: each partition has N/P rows
# If data is skewed: partition sizes vary significantly
#
# Range partitioner sampling:
# 1. Sample data to estimate key distribution
# 2. Compute quantiles: q_i = percentile(K, i/P × 100) for i = 1..P-1
# 3. Partition boundaries = [min(K), q_1, q_2, ..., q_{P-1}, max(K)]

# Example: time-series partitioning
from datetime import datetime, timedelta

dates = [datetime(2024, 1, 1) + timedelta(days=i) for i in range(365)]
df_time = spark.createDataFrame(
    [(d, f"user_{i%1000}") for d in dates for i in range(1000)],
    ["date", "user_id"]
)

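# Partition by date range (monthly)
# repartition(12, "month") would hash the month, so two months could share a
# partition; repartitionByRange keeps each partition to a contiguous range
df_time_monthly = df_time.withColumn(
    "month", F.date_format("date", "yyyy-MM")
).repartitionByRange(12, "month")  # Up to 12 partitions, roughly one per month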
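
The three sampling steps listed above (sample, compute quantiles, derive boundaries) can be sketched in plain Python. This is an illustrative approximation rather than Spark's RangePartitioner: the sample size, the exponential key distribution, and the helper names `range_boundaries` and `range_partition_id` are assumptions chosen for the example.

```python
import bisect
import random

def range_boundaries(sample, num_partitions):
    """Pick P-1 split points at evenly spaced quantiles of a sorted sample."""
    ordered = sorted(sample)
    n = len(ordered)
    return [ordered[(i * n) // num_partitions] for i in range(1, num_partitions)]

def range_partition_id(key, boundaries):
    """Partition i holds keys with boundaries[i-1] <= key < boundaries[i]."""
    return bisect.bisect_right(boundaries, key)

rng = random.Random(7)
# Skewed keys: exponentially distributed, so most values are small.
keys = [rng.expovariate(1.0) for _ in range(100_000)]

P = 8
sample = rng.sample(keys, 2_000)        # step 1: sample the data
bounds = range_boundaries(sample, P)    # steps 2-3: quantile boundaries

sizes = [0] * P
for k in keys:
    sizes[range_partition_id(k, bounds)] += 1

print("boundaries:", [round(b, 2) for b in bounds])
print("partition sizes:", sizes)
```

Because the boundaries follow the sampled quantiles instead of equal-width value ranges, partition sizes stay close to N/P even though the key values themselves are heavily skewed.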

4. Coalesce

# Coalesce: Reduce partition count without full shuffle
# Merges existing partitions through a narrow dependency (no shuffle stage)

# Application: Reduce partitions before write
df_small = (
    df.repartition(1000)  # Creates 1000 partitions (shuffle)
      .coalesce(10)       # Merges to 10 partitions (no shuffle)
)

# Coalesce mechanics:
# 1. Partitions are assigned to new partition IDs
# 2. No shuffle: Spark groups partitions that are already co-located where it can
# 3. New partition = concatenation of old partitions
# 4. Memory risk: new partition may be too large!

# Mathematical analysis:
# Let P_old = current partitions, P_new = target partitions (P_new < P_old)
# Each new partition = P_old / P_new old partitions
# New partition size = P_old/P_new × old_partition_size
#
# Example: 1000 partitions × 100MB = 100GB total
# After coalesce to 10: 10 partitions × 10GB each
# Risk: 10GB may exceed executor memory!

# Safe coalesce formula:
# new_partition_size = total_data_size / P_new
# Requirement: new_partition_size < executor_memory × memory_fraction
# P_new > total_data_size / (executor_memory × memory_fraction)
#
# For 100GB data, 8GB executor, 0.6 memory fraction:
# P_new > 100GB / (8GB × 0.6) = 20.8
# So P_new ≥ 21 partitions minimum

# Coalesce vs Repartition:
# Coalesce: O(1) overhead, merges adjacent partitions
# Repartition: O(N) overhead, full shuffle, even distribution

# Best practice:
df.coalesce(100).write.parquet("output/")  # Good: reduce partitions before write

# Avoid:
df.coalesce(1000)  # Bad: coalesce cannot increase the partition count
# If df has fewer than 1000 partitions this is a no-op; it adds no parallelism
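
The safe coalesce formula above is easy to turn into a small helper. The sketch below is plain arithmetic under the same simplifying assumption as the text (a whole merged partition must fit in `executor_memory × memory_fraction`); the function name `min_coalesce_partitions` is hypothetical.

```python
import math

def min_coalesce_partitions(total_gb: float, executor_mem_gb: float,
                            memory_fraction: float) -> int:
    """Smallest P_new with total_gb / P_new strictly below the memory budget."""
    budget_gb = executor_mem_gb * memory_fraction
    return math.floor(total_gb / budget_gb) + 1

# Worked example from the text: 100 GB of data, 8 GB executors, 0.6 fraction.
p_new = min_coalesce_partitions(100, 8, 0.6)
print(p_new)                 # 21
print(round(100 / p_new, 2)) # 4.76 GB per merged partition, under the 4.8 GB budget

# Size of each merged partition when going from 1000 x 100 MB down to 10:
p_old, old_partition_mb, target = 1000, 100, 10
print((p_old // target) * old_partition_mb)  # 10000 MB, i.e. about 10 GB each
```

In practice the real limit depends on how many tasks share an executor and on whether data can spill to disk, so this bound is best read as a rough sanity check.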

5. Repartition

# Repartition: Full shuffle to create exactly N partitions
# Round-robin repartition yields near-even sizes; key-based repartition is only as even as the keys

# Application 1: Even distribution for parallel processing
df_even = df.repartition(100)
# Each partition has approximately N/100 rows

# Application 2: Repartition by column
df_by_user = df.repartition(100, "user_id")
# All rows for same user in same partition
# Useful for: window functions, groupByKey operations

# Application 3: Repartition by multiple columns
df_multi = df.repartition(100, "user_id", "event_type")
# Composite partitioning

# Mathematical analysis:
# Repartition uses HashPartitioning (default) or RoundRobinPartitioning
#
# Hash Repartition:
#   partition_id = hash(key) % P
#   Distribution: depends on hash function quality and key distribution
#   Skew: if keys are not uniformly distributed
#
# Round Robin Repartition:
#   partition_id = row_number % P
#   Distribution: perfectly even (assuming no partition-level skew)
#   No key dependency
#
# Shuffle cost:
#   Write: O(N) total, O(N/P) per partition
#   Read: O(N) total, O(N/P) per partition
#   Network: O(N × row_size), since whole rows are moved, not just keys
#
# Comparison:
# Coalesce: no shuffle, low overhead, potentially uneven
# Repartition: O(N) overhead, full shuffle; even with round robin, key-dependent with hash

# Repartition strategies:
df.repartition(100)  # Round robin (default, no key)
df.repartition(100, "user_id")  # Hash by key
df.repartitionByRange(100, "user_id")  # Range by key (sampled boundaries)
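
The trade-off between the two assignment rules in the analysis above can be made concrete with a small simulation. This is a sketch under stated assumptions: `zlib.crc32` stands in for Spark's hash function, `row_number % P` is the simplified round-robin rule from the text, and the three user keys are invented.

```python
import zlib
from collections import defaultdict

def hash_partition(key: str, num_partitions: int) -> int:
    """Stand-in for hash(key) % P using a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

P = 4
# Hypothetical skewed input: one user owns 70% of the rows.
rows = ["user_a"] * 700 + ["user_b"] * 200 + ["user_c"] * 100

rr_sizes = [0] * P
hash_sizes = [0] * P
rr_partitions_per_key = defaultdict(set)
hash_partitions_per_key = defaultdict(set)

for row_number, key in enumerate(rows):
    rr = row_number % P            # round robin ignores the key
    hp = hash_partition(key, P)    # hash depends only on the key
    rr_sizes[rr] += 1
    hash_sizes[hp] += 1
    rr_partitions_per_key[key].add(rr)
    hash_partitions_per_key[key].add(hp)

print("round-robin sizes:", rr_sizes)
print("hash sizes:       ", hash_sizes)
print("partitions holding user_a (round robin):", len(rr_partitions_per_key["user_a"]))
print("partitions holding user_a (hash):       ", len(hash_partitions_per_key["user_a"]))
```

Round robin gives equal sizes but scatters each key over every partition, so a later groupBy or join on that key still needs a shuffle. Hash keeps each key in one place at the cost of inheriting the key skew.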

6. Partition Skew Analysis and Mitigation

# Partition skew detection:
def analyze_partition_skew(df, label=""):
    """Analyze partition sizes for skew detection."""
    partition_sizes = df.rdd.mapPartitionsWithIndex(
        lambda idx, it: [(idx, sum(1 for _ in it))]
    ).collect()
    
    sizes = [s for _, s in partition_sizes]
    mean_size = sum(sizes) / len(sizes)
    max_size = max(sizes)
    min_size = min(sizes)
    skew_ratio = max_size / mean_size if mean_size > 0 else float('inf')
    
    print(f"{label} Partition Analysis:")
    print(f"  Partitions: {len(sizes)}")
    print(f"  Mean size: {mean_size:.0f}")
    print(f"  Min size: {min_size}")
    print(f"  Max size: {max_size}")
    print(f"  Skew ratio: {skew_ratio:.2f}")
    
    return {"skewed": skew_ratio > 2, "skew_ratio": skew_ratio}

# Example: skewed data
skewed_df = spark.createDataFrame(
    [(0,) for _ in range(100000)] +   # 100K rows for key=0
    [(i,) for i in range(1, 101)],    # 1 row each for 100 other keys
    ["key"]
)

# Detect skew
analyze_partition_skew(skewed_df.repartition(10, "key"))
# Skew ratio: ~10, the worst case for 10 partitions
# (one partition holds nearly all 100,100 rows; the mean is 10,010)

# Mitigation Strategy 1: Salting for even distribution
def salt_dataframe(df, key_col, num_salt_buckets=100):
    """Add salt column for even distribution."""
    return df.withColumn(
        "salt", (F.rand() * num_salt_buckets).cast("int")
    ).withColumn(
        "salted_key", F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
    ).repartition(num_salt_buckets, "salted_key")

# Mitigation Strategy 2: Separate skewed keys
def isolate_skewed_keys(df, key_col, skew_threshold=1000):
    """Process skewed keys separately."""
    # Find skewed keys
    key_counts = df.groupBy(key_col).count()
    skewed_keys = key_counts.filter(
        F.col("count") > skew_threshold
    ).select(key_col).collect()
    skewed_key_list = [row[key_col] for row in skewed_keys]
    
    # Split data
    normal_df = df.filter(~F.col(key_col).isin(skewed_key_list))
    skewed_df = df.filter(F.col(key_col).isin(skewed_key_list))
    
    # Normal keys hash cleanly; hot keys need round robin (or salting),
    # because hashing a hot key always sends all its rows to one partition
    return normal_df.repartition(200, key_col), skewed_df.repartition(1000)

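# Mitigation Strategy 3: Range partitioning for time-series
# Range boundaries come from sampled quantiles, so light days are grouped and
# sizes stay roughly even unless a single day dominates
# (assumes df has a "timestamp" column)
df_time_partitioned = df.withColumn(
    "date_bucket", F.date_trunc("day", "timestamp")
).repartitionByRange(365, "date_bucket")  # Up to 365 range partitions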
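
Salting changes the grouping key, so an aggregation over salted data has to run in two stages: first per (key, salt), then per original key. The following pure-Python sketch shows that the two-stage result matches a direct count. It uses `collections.Counter` in place of Spark's groupBy, and the key names and bucket count are illustrative assumptions.

```python
import random
from collections import Counter

rng = random.Random(0)
num_salt_buckets = 8

# Hypothetical skewed input: one hot key and two small keys.
keys = ["hot"] * 10_000 + ["cold_a"] * 30 + ["cold_b"] * 20

# Stage 1: partial counts per (key, salt). The hot key is split into up to
# num_salt_buckets groups, so no single group carries all of its rows.
partial = Counter((k, rng.randrange(num_salt_buckets)) for k in keys)

# Stage 2: drop the salt and combine the partial counts per original key.
final = Counter()
for (k, _salt), count in partial.items():
    final[k] += count

largest_hot_group = max(c for (k, _), c in partial.items() if k == "hot")
print("largest stage-1 group for 'hot':", largest_hot_group)
print("final counts:", dict(final))
```

The second stage only sees at most `num_salt_buckets` rows per key, so it is cheap even for the hot key.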

7. Partitioning Best Practices

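# Best Practice 1: Pre-partition both sides by the join key
# repartition() is itself a shuffle; the benefit is that the join can reuse
# this partitioning instead of shuffling again (most useful when the
# partitioned DataFrames are reused). It does not remove key skew.
df_left = left_df.repartition(200, "join_key")
df_right = right_df.repartition(200, "join_key")
result = df_left.join(df_right, "join_key")  # Typically no additional shuffle for the join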

# Best Practice 2: Coalesce before write
# Reduce small files
df.write.parquet("output/")  # One file per partition (e.g. 200 after a default shuffle)
# Better:
df.coalesce(10).write.parquet("output/")  # 10 files

# Best Practice 3: Match partition count to data size
# Rule of thumb: 128-256 MB per partition
import math

data_size_mb = 10000  # 10 GB
target_partition_mb = 128
optimal_partitions = math.ceil(data_size_mb / target_partition_mb)  # 79 partitions

# Best Practice 4: Use bucketing for repeated operations
df.write.bucketBy(100, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("events_bucketed")

# Best Practice 5: Consider executor count
num_executors = 50
cores_per_executor = 4
total_cores = num_executors * cores_per_executor  # 200 cores
# Each task processes one partition
# So optimal partitions ≥ total_cores for full parallelism
# Optimal partitions = max(total_cores, data_size_mb / target_partition_mb)
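
Best Practices 3 and 5 combine into a single sizing rule, `max(total_cores, data_size_mb / target_partition_mb)`. A minimal helper, with the hypothetical name `suggest_partitions`, might look like the following; the inputs are the example figures used above.

```python
import math

def suggest_partitions(data_size_mb: float, target_partition_mb: float,
                       num_executors: int, cores_per_executor: int) -> int:
    """Take the larger of the size-based count and the total core count."""
    by_size = math.ceil(data_size_mb / target_partition_mb)
    total_cores = num_executors * cores_per_executor
    return max(total_cores, by_size)

# 10 GB on 50 executors x 4 cores: the size rule gives 79, the core count wins.
print(suggest_partitions(10_000, 128, 50, 4))     # 200

# 1 TB on the same cluster: the size rule dominates.
print(suggest_partitions(1_000_000, 128, 50, 4))  # 7813
```

For small inputs the core count sets the floor, which means partitions end up well below the 128 MB target; that is usually acceptable as long as per-task overhead stays small.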

Common Pitfall

Using coalesce() to increase the partition count doesn't work: it can only reduce it. Coalesce merges existing partitions without a shuffle, so it can't add parallelism. Use repartition() to increase partitions.

Interview Tip

When discussing partitioning, always mention the task scheduling overhead: each partition becomes one task. Too many partitions = excessive scheduling overhead. Too few = underutilized parallelism. Optimal is typically 2-4x total executor cores.
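
One way to reason about the partition-to-core ratio is to count scheduling "waves". The sketch below uses a deliberately simplified model in which every task takes the same time and each core runs one task at a time; the function name `task_waves` and the 200-core cluster are assumptions for illustration.

```python
import math

def task_waves(num_partitions: int, total_cores: int) -> int:
    """Rounds of tasks needed when every task takes equally long."""
    return math.ceil(num_partitions / total_cores)

total_cores = 200
for partitions in (100, 200, 201, 400, 800):
    waves = task_waves(partitions, total_cores)
    # Fraction of available core-slots that actually run a task.
    busy = partitions / (waves * total_cores)
    print(f"{partitions:>4} partitions -> {waves} wave(s), {busy:.0%} of core slots used")
```

With 201 partitions on 200 cores, a second wave runs a single task while 199 cores sit idle. Using a few times more partitions than cores makes each task smaller, so uneven task durations and stragglers cost less.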


Summary

| Method | Shuffle | Partition Change | Use Case |
|---|---|---|---|
| Coalesce | No | Decrease only | Reduce partitions before write |
| Repartition (round-robin) | Yes | Increase or decrease | Even distribution |
| Repartition (hash) | Yes | Increase or decrease | Partition by key |
| Range Partitioning | Yes | Set boundaries | Time-series, sorted data |
| Salting | Yes | Increase | Skew mitigation |
| Bucketing | Write-time | Fixed at write | Repeated join/groupBy |

The key to partitioning optimization is balancing data locality, parallelism, and shuffle overhead for your specific workload.
