🪣 Bucketing Strategies in PySpark
Architecture Diagram

BUCKETING ARCHITECTURE

Write path: DataFrame -> hash function -> bucket files

- bucket_0/: part-00000, part-00001
- ...
- bucket_N/: part-00000, part-00001

Read path (with bucket pruning), for the query SELECT * FROM table WHERE bucket_col = 'value'. Only the matching bucket is scanned:

- bucket_0/: SKIP (hash doesn't match)
- bucket_1/: SCAN (hash matches)
- bucket_2/: SKIP
- bucket_3/: SKIP

BUCKETED JOIN OPTIMIZATION

Without bucketing (sort-merge join):

- Table A (1000 files) -> shuffle ALL (200 GB)
- Table B (1000 files) -> shuffle ALL (150 GB)
- Both sides -> sort + merge (350 GB shuffle)

With bucketing (bucketed join), tables bucketed by join_key into 256 buckets:

- Table A (256 buckets, sorted) -> NO shuffle (pre-bucketed)
- Table B (256 buckets, sorted) -> NO shuffle (pre-bucketed)
- Both sides -> bucket-wise merge (256 independent merges)

Shuffle eliminated! Only local merges per bucket.

HASH DISTRIBUTION & BUCKET PRUNING

Hash function application:

- Input: join_key = "customer_123"
- hash("customer_123") % 256 = bucket_index
- Example layout: Bucket 0 holds cust_456 and cust_321; Bucket 1 holds cust_123 and cust_654; Bucket 2 holds cust_789 and cust_987; ...; Bucket 255 holds cust_012 and cust_345

Bucket pruning decision tree, for the query WHERE join_key = 'customer_123':

1. Compute hash('customer_123') % num_buckets
2. Identify target bucket (e.g., bucket 42)
3. Scan ONLY bucket_42/ files
4. Apply remaining predicates within bucket

Result: 1/256 of data scanned = 99.6% reduction

Sort order within buckets:

- Bucket 42 (sorted by join_key): customer_1000123 (byte-0, sorted), customer_1000456 (byte-1, sorted), ...
- Enables efficient range queries WITHIN each bucket
Detailed Explanation
Bucketing in PySpark is a data layout strategy that distributes data into a fixed number of files (buckets) based on the hash value of one or more columns. Unlike partitioning, which creates directory structures, bucketing organizes data within a directory into hash-based file groups. The primary benefits are eliminating shuffle during joins (when both tables are bucketed on the join key), enabling bucket pruning for point queries, and maintaining sorted order within each bucket for efficient range scans.
Spark assigns buckets with its Murmur3 hash, which produces a 32-bit signed integer. The bucket index is the non-negative remainder pmod(hash(column_value), num_buckets), so negative hash values still map to a valid bucket. All rows with the same value in the bucketing column therefore land in the same bucket. When both tables in a join are bucketed on the join key with the same number of buckets, Spark can perform a bucket-aware sort-merge join without shuffling data: each bucket in table A is joined with the corresponding bucket in table B.
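The bucket assignment and the bucket-by-bucket join described above can be sketched in plain Python. This is only an illustration under stated assumptions: `stand_in_hash` uses CRC32 rather than Spark's Murmur3, so the bucket ids will not match what Spark writes, and the per-bucket join uses a dictionary lookup instead of a merge. All function names here are hypothetical.

```python
import zlib
from collections import defaultdict


def pmod(a: int, n: int) -> int:
    """Non-negative modulo (Python's % already behaves this way for n > 0)."""
    return ((a % n) + n) % n


def stand_in_hash(key: str) -> int:
    """Stand-in 32-bit signed hash. NOT Spark's Murmur3, so bucket ids differ."""
    unsigned = zlib.crc32(key.encode("utf-8"))
    return unsigned - (1 << 32) if unsigned >= (1 << 31) else unsigned


def bucket_id(key: str, num_buckets: int) -> int:
    """Map a key to a bucket index in [0, num_buckets)."""
    return pmod(stand_in_hash(key), num_buckets)


def to_buckets(rows, num_buckets):
    """Group (key, value) rows by bucket id."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_id(key, num_buckets)].append((key, value))
    return buckets


def bucket_wise_join(left, right, num_buckets):
    """Join bucket i of `left` only with bucket i of `right`."""
    left_b = to_buckets(left, num_buckets)
    right_b = to_buckets(right, num_buckets)
    out = []
    for b in range(num_buckets):
        lookup = defaultdict(list)
        for key, value in right_b.get(b, []):
            lookup[key].append(value)
        for key, value in left_b.get(b, []):
            out.extend((key, value, rv) for rv in lookup[key])
    return out


# Toy data: 20 orders spread over 5 customers
orders = [(f"CUST-{i % 5}", f"ORD-{i}") for i in range(20)]
customers = [(f"CUST-{i}", f"Customer_{i}") for i in range(5)]
joined = bucket_wise_join(orders, customers, num_buckets=4)
print(len(joined))
```

Because equal keys always hash to the same bucket, joining bucket by bucket produces the same rows as joining the full tables, which is why no cross-bucket data movement is needed.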
Bucket pruning is an optimization that skips entire buckets during query execution. When a query includes an equality predicate on the bucketing column (e.g., WHERE customer_id = 'CUST-001'), Spark computes the hash and identifies the target bucket. Only that bucket's files are read; with 256 buckets, that skips 255/256 (about 99.6%) of the data. Pruning requires an equality predicate (or, in recent Spark versions, an IN list) on the bucketing column; range predicates and predicates on other columns do not trigger it.
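As a rough sketch of the pruning decision, the snippet below maps predicate values to the buckets that could contain them and reports the fraction of buckets skipped. It assumes a CRC32 stand-in hash (not Spark's Murmur3), and the helper names are hypothetical.

```python
import zlib


def bucket_of(value: str, num_buckets: int) -> int:
    # Stand-in hash (CRC32), NOT Spark's Murmur3: real bucket ids will differ.
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def buckets_to_scan(predicate_values, num_buckets):
    """Buckets that can hold rows whose bucketing column equals one of the values."""
    return {bucket_of(v, num_buckets) for v in predicate_values}


def pruned_fraction(predicate_values, num_buckets):
    """Fraction of buckets that can be skipped for the given equality values."""
    return 1 - len(buckets_to_scan(predicate_values, num_buckets)) / num_buckets


num_buckets = 256
point = buckets_to_scan(["CUST-001"], num_buckets)
skipped = pruned_fraction(["CUST-001"], num_buckets)
print(f"scan buckets {sorted(point)}, skip {skipped:.1%} of buckets")
```

A single equality value always resolves to exactly one bucket, so the skipped fraction is (num_buckets - 1) / num_buckets regardless of which hash is used.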
The number of buckets is a critical configuration decision. Too few buckets result in large files that are slow to scan; too many buckets result in many small files that add metadata overhead. A good rule of thumb is to target file sizes of 128-256 MB after bucketing. For a 1 TB table with a 128 MB target file size, you would need approximately 8,000 buckets (1 TB / 128 MB = 8,192). Note that each write task can emit its own file for every bucket it holds rows for, so the file count can exceed the bucket count unless the data is repartitioned by the bucketing column before writing. The number of buckets must also match between tables for bucketed joins to work, so consider future join patterns when choosing this value.
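The sizing rule of thumb is simple arithmetic. A minimal sketch, assuming binary units (1 TB = 1024^4 bytes) and a hypothetical helper name:

```python
import math

MB = 1024 ** 2
TB = 1024 ** 4


def suggest_num_buckets(total_bytes: int, target_file_bytes: int) -> int:
    """Smallest bucket count that keeps the average bucket at or under the target size."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


buckets_128 = suggest_num_buckets(1 * TB, 128 * MB)
buckets_256 = suggest_num_buckets(1 * TB, 256 * MB)
print(buckets_128, buckets_256)  # prints: 8192 4096
```

The result is a starting point only; it assumes one file per bucket and evenly distributed keys.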
Sorted bucketing adds a further optimization: within each bucket file, data is sorted by the sortBy columns. This enables efficient range scans within a file, since matching rows are stored contiguously. The sort order is maintained at the file level: each file is individually sorted, but when a bucket consists of several files, their concatenation is not guaranteed to be globally sorted, so Spark may still need to sort a bucket before a merge join.
Bucketing interacts with partitioning in important ways. When both are used, partitioning creates the directory structure (e.g., date=2024-01-15/) and bucketing creates files within each partition directory. This means bucket pruning only works within a partition; queries that filter on both the partition column and the bucketing column benefit from both pruning strategies. However, over-partitioning combined with bucketing can create too many small files, so balance is essential.
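To see how quickly partitioning and bucketing multiply, here is a back-of-the-envelope estimate. The inputs (a 100 GB table, 365 daily partitions, 256 buckets, one file per bucket per partition) are hypothetical values chosen for illustration.

```python
GB = 1024 ** 3
MB = 1024 ** 2


def layout_estimate(total_bytes, num_partitions, num_buckets, files_per_bucket=1):
    """Return (file count, average file size in bytes) for a partitioned + bucketed table."""
    files = num_partitions * num_buckets * files_per_bucket
    return files, total_bytes / files


files, avg_bytes = layout_estimate(100 * GB, num_partitions=365, num_buckets=256)
print(f"{files:,} files, about {avg_bytes / MB:.1f} MB each")
```

With these inputs the table ends up as 93,440 files of roughly 1.1 MB each, far below the 128-256 MB target, which is the small-file problem the paragraph warns about.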
Mathematical Foundations
Definition: Hash Bucketing
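A hash bucketing strategy partitions dataset $D$ into $B$ buckets via a hash function $h: K \to \{0, 1, \dots, B-1\}$. Bucket $i$ contains:

$$D_i = \{\, r \in D : h(\mathrm{key}(r)) = i \,\}$$

Bucket Pruning
For equi-join $R \bowtie S$ on bucketed columns with $B$ buckets, only matching buckets need scanning:

$$R \bowtie S = \bigcup_{i=0}^{B-1} \left( R_i \bowtie S_i \right)$$

Join scans reduce from $B^2$ bucket pairs to $B$.
Load Balance Theorem
For hash function $h$ mapping $n$ keys to $B$ buckets, the expected maximum bucket size is:

$$\mathbb{E}\left[\max_i |D_i|\right] = \frac{n}{B} + O\!\left(\sqrt{\frac{n \log B}{B}}\right)$$

Relative load imbalance approaches 0 as $n/B \to \infty$ for good hash functions.
Sort-Merge Join Cost
Without bucketing, sort-merge join cost is:

$$C_{\text{SMJ}} = O\big(|R| \log |R| + |S| \log |S|\big) + O\big(|R| + |S|\big)$$

With bucketing on pre-sorted buckets, sorting is eliminated: $C_{\text{bucketed}} = O(|R| + |S|)$.
Bucket Count Selection
Optimal bucket count minimizes total I/O:

$$B^{*} \approx \left\lceil \frac{\mathrm{size}(D)}{b} \right\rceil$$

where $\mathrm{size}(D)$ is the total data size and $b$ is the HDFS block size.
Key Insight
Bucketing trades write-time computation for read-time savings. The benefit is maximal when the same bucketed column is used for joins across multiple queries. Over-bucketing ($B \gg \mathrm{size}(D)/b$) creates too many small files, hurting I/O.
Summary
Hash bucketing partitions data by key, enabling bucket pruning that reduces join scan by factor $B$. Load balance follows a balls-into-bins model. Optimal bucket count balances block-aligned I/O against overhead. Bucketing is most beneficial for repeated equi-joins on the same key.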
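The balls-into-bins behaviour mentioned in the summary can be checked with a small simulation. This sketch assumes an idealized hash that places each key in a uniformly random bucket; the function name and the seed are arbitrary choices for illustration.

```python
import random


def max_load_ratio(num_keys: int, num_buckets: int, seed: int = 0) -> float:
    """Largest bucket size divided by the ideal size (keys / buckets)."""
    rng = random.Random(seed)
    counts = [0] * num_buckets
    for _ in range(num_keys):
        # Uniform random placement models a well-behaved hash function
        counts[rng.randrange(num_buckets)] += 1
    return max(counts) / (num_keys / num_buckets)


for n in (1_600, 16_000, 160_000):
    print(n, round(max_load_ratio(n, num_buckets=16), 3))
```

A ratio of 1.0 would mean perfectly even buckets. As the number of keys per bucket grows, the ratio moves toward 1.0, matching the claim that load imbalance vanishes for large inputs.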
Key Concepts Table
| Concept | Description | Performance Impact |
|---|---|---|
| Bucket Column | Column used for hash-based distribution | Determines join and pruning efficiency |
| Number of Buckets | Fixed count of hash-based file groups | Must match for bucketed joins |
| Hash Function | murmur3 hash for bucket assignment | Uniform distribution across buckets |
| Bucket Pruning | Skip non-matching buckets on query | Up to 99%+ reduction in data scanned |
| Bucketed Join | Join without shuffle on matching buckets | Eliminates shuffle stage |
| Sorted Bucketing | Data sorted within each bucket | Enables efficient range scans |
| Bucket Metadata | Stored in table properties | Required for bucket-aware optimizations |
| File Size per Bucket | Controlled by num_buckets and data size | Target 128-256 MB per file |
| Partition + Bucket | Hierarchical layout | Both pruning strategies apply |
| Bucket Evolution | Changing bucket count | Requires table rewrite |
Code Examples
Creating Bucketed Tables
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, concat, date_add, format_string, lit, rand, to_date, when,
)

spark = SparkSession.builder \
    .appName("BucketingStrategies") \
    .config("spark.sql.sources.bucketing.enabled", "true") \
    .config("spark.sql.sources.bucketing.maxBuckets", "100000") \
    .getOrCreate()

# Generate customer orders (10M rows).
# Customer IDs are zero-padded (e.g. CUST-000042) so they match the literals used in later queries.
orders_df = spark.range(0, 10_000_000) \
    .withColumn("order_id", concat(lit("ORD-"), col("id"))) \
    .withColumn("customer_id", format_string("CUST-%06d", col("id") % 1_000_000)) \
    .withColumn("product_id", concat(lit("PROD-"), (col("id") % 50_000))) \
    .withColumn("amount", (rand() * 1000 + 10).cast("decimal(10,2)")) \
    .withColumn("order_date", date_add(to_date(lit("2024-01-01")), (col("id") % 365).cast("int")))

# Create customers dataset (1M rows).
# One random draw per row (_r) gives a 30% / 30% / 40% segment split.
customers_df = spark.range(0, 1_000_000) \
    .withColumn("customer_id", format_string("CUST-%06d", col("id"))) \
    .withColumn("name", concat(lit("Customer_"), col("id"))) \
    .withColumn("_r", rand()) \
    .withColumn("segment",
        when(col("_r") < 0.3, "Premium")
        .when(col("_r") < 0.6, "Standard")
        .otherwise("Basic")) \
    .drop("_r")

# Write bucketed tables: bucket by customer_id with 256 buckets
orders_df.write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_orders")

customers_df.write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_customers")

# Verify bucketing (show up to 100 rows so the bucket metadata is not cut off)
spark.sql("DESCRIBE EXTENDED bucketed_orders").show(100, truncate=False)
spark.sql("DESCRIBE EXTENDED bucketed_customers").show(100, truncate=False)
Bucket Pruning Demonstration
import time

# Bucketing is enabled by default; set it explicitly so pruning is active
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")

# Point query: equality predicate on the bucket column,
# so only 1 of the 256 buckets should be scanned
start = time.time()
result1 = spark.sql("""
    SELECT o.order_id, o.amount, c.name
    FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
    WHERE o.customer_id = 'CUST-000042'
""")
result1.show(truncate=False)
pruned_time = time.time() - start
print(f"Bucket pruning query time: {pruned_time:.2f} seconds")

# Query without bucket pruning: the predicate is on a non-bucket column
start = time.time()
result2 = spark.sql("""
    SELECT o.order_id, o.amount, c.name
    FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 500
""")
result2.show(5, truncate=False)
full_scan_time = time.time() - start
print(f"Full scan query time: {full_scan_time:.2f} seconds")

# Verify bucket pruning in the query plan
spark.sql("""
    EXPLAIN SELECT * FROM bucketed_orders
    WHERE customer_id = 'CUST-000042'
""").show(truncate=False)
Bucket-Aware Joins
# Bucketed join: should NOT shuffle
start = time.time()
bucketed_join_df = spark.sql("""
    SELECT
        o.order_id,
        o.amount,
        o.order_date,
        c.name,
        c.segment
    FROM bucketed_orders o
    JOIN bucketed_customers c
        ON o.customer_id = c.customer_id
""")
bucketed_join_df.count()  # Force execution
bucketed_join_time = time.time() - start
print(f"Bucketed join time: {bucketed_join_time:.2f} seconds")

# Non-bucketed join for comparison
orders_df.write.mode("overwrite").saveAsTable("regular_orders")
customers_df.write.mode("overwrite").saveAsTable("regular_customers")

start = time.time()
regular_join_df = spark.sql("""
    SELECT
        o.order_id,
        o.amount,
        o.order_date,
        c.name,
        c.segment
    FROM regular_orders o
    JOIN regular_customers c
        ON o.customer_id = c.customer_id
""")
regular_join_df.count()  # Force execution
regular_join_time = time.time() - start
print(f"Regular join time: {regular_join_time:.2f} seconds")
print(f"Speedup: {regular_join_time / bucketed_join_time:.1f}x")

# Verify no shuffle in the bucketed join
spark.sql("""
    EXPLAIN SELECT * FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
""").show(truncate=False)
Multi-Column Bucketing
# Bucket by multiple columns for complex query patterns.
# One random draw per row (_r) gives each region a 25% share.
complex_orders_df = orders_df \
    .withColumn("_r", rand()) \
    .withColumn("region",
        when(col("_r") < 0.25, "North")
        .when(col("_r") < 0.5, "South")
        .when(col("_r") < 0.75, "East")
        .otherwise("West")) \
    .drop("_r")

# Write with multi-column bucketing
complex_orders_df.write \
    .bucketBy(16, "region", "customer_id") \
    .sortBy("region", "customer_id") \
    .mode("overwrite") \
    .saveAsTable("multi_bucket_orders")

# Query filters on both bucket columns
spark.sql("""
    SELECT region, customer_id, SUM(amount) as total
    FROM multi_bucket_orders
    WHERE region = 'North' AND customer_id = 'CUST-000042'
    GROUP BY region, customer_id
""").show(truncate=False)

# Verify multi-column bucketing
spark.sql("DESCRIBE EXTENDED multi_bucket_orders").show(100, truncate=False)
Performance Metrics
| Metric | Non-Bucketed | 64 Buckets | 256 Buckets | 1024 Buckets |
|---|---|---|---|---|
| File Count (100GB table) | ~800 files | 64 files | 256 files | 1024 files |
| Avg File Size | 128 MB (varies) | 1.5 GB | 400 MB | 100 MB |
| Write Time (10M rows) | 45 seconds | 60 seconds | 75 seconds | 90 seconds |
| Point Query (bucket pruning) | 8-12 seconds | 2-4 seconds | 1-2 seconds | 0.5-1 second |
| Range Query (no pruning) | 8-12 seconds | 10-15 seconds | 8-12 seconds | 12-18 seconds |
| Bucketed Join (2 tables) | 120-180 seconds | 8-15 seconds | 6-10 seconds | 10-20 seconds |
| Non-Bucketed Join | 120-180 seconds | 120-180 seconds | 120-180 seconds | 120-180 seconds |
| Shuffle Volume (join) | 100% of data | ~0% (bucketed) | ~0% (bucketed) | ~0% (bucketed) |
| Memory per Partition | High variance | Uniform | Uniform | Low per partition |
| Concurrent Query Performance | Degrades | Stable | Optimal | Degrades (too many files) |
Best Practices
- Match bucket counts between join tables to enable bucket-aware joins; mismatched counts force a shuffle even with bucketing
- Target 128-256 MB file size per bucket by calculating `num_buckets = total_data_size / target_file_size`
- Use `sortBy` within buckets for columns commonly used in range queries to enable efficient intra-bucket scans
- Avoid bucketing on high-cardinality columns (e.g., UUIDs) unless you need bucket pruning; hash distribution will be uniform regardless
- Combine partitioning and bucketing when queries commonly filter on both temporal and entity columns
- Monitor small file counts; if buckets contain files < 10 MB, reduce the number of buckets
- Use `INSERT INTO` instead of `write.saveAsTable` for subsequent loads to maintain bucket structure
- Enable `spark.sql.sources.bucketing.enabled=true` explicitly to ensure bucket pruning is active
- Avoid bucket evolution (changing bucket count) as it requires a full table rewrite
- Use `ANALYZE TABLE` after bucketing to update column statistics for the optimizer
- Test bucket pruning with `EXPLAIN` to verify the query plan shows bucket pruning (scan only matching buckets)
- Consider bucketing for CDC workloads where upserts on the bucket column benefit from targeted file updates
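Several of these practices come down to reading a query plan. As a minimal sketch, one such check can be automated: whether a plan's text still contains a shuffle. The helper name `has_shuffle` is hypothetical, it assumes shuffles appear as an `Exchange` operator in the plan text, and the plan fragments below are hand-written simplifications rather than real Spark output.

```python
import re

# "Exchange" as a standalone operator name; BroadcastExchange and
# ReusedExchange are excluded because a letter precedes the word.
_SHUFFLE = re.compile(r"(?<![A-Za-z])Exchange\b")


def has_shuffle(plan_text: str) -> bool:
    """True if any line of the plan text contains a shuffle Exchange operator."""
    return any(_SHUFFLE.search(line) for line in plan_text.splitlines())


# Hand-written, simplified plan fragments for illustration only
shuffled_plan = (
    "SortMergeJoin [customer_id], [customer_id], Inner\n"
    ":- Sort [customer_id ASC]\n"
    ":  +- Exchange hashpartitioning(customer_id, 200)\n"
    "+- Sort [customer_id ASC]\n"
    "   +- Exchange hashpartitioning(customer_id, 200)"
)
bucketed_plan = (
    "SortMergeJoin [customer_id], [customer_id], Inner\n"
    ":- FileScan parquet bucketed_orders\n"
    "+- FileScan parquet bucketed_customers"
)

print(has_shuffle(shuffled_plan), has_shuffle(bucketed_plan))  # prints: True False
```

In practice the plan text would come from the `EXPLAIN` output of the query under test; a check like this is only a coarse signal and does not replace reading the plan.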
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)