Bucketing Strategies in PySpark

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                         BUCKETING ARCHITECTURE                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            │
│   │  Write Path  │────▶│  Hash        │────▶│  Bucket      │            │
│   │  (DataFrame) │     │  Function    │     │  Files       │            │
│   └──────────────┘     └──────┬───────┘     └──────┬───────┘            │
│                               │                    │                    │
│                               ▼                    ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  bucket N        │   │  bucket 0        │          │
│                    │  ┌───────────┐   │   │  ┌───────────┐   │          │
│                    │  │ part-00000│   │   │  │ part-00000│   │          │
│                    │  │ part-00001│   │   │  │ part-00001│   │          │
│                    │  └───────────┘   │   │  └───────────┘   │          │
│                    └──────────────────┘   └──────────────────┘          │
│                                                                         │
│   Read Path (with bucket pruning):                                      │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Query: SELECT * FROM table WHERE bucket_col = 'value'        │     │
│   │                                                               │     │
│   │  ┌────────────────────────────────────────────────────────┐   │     │
│   │  │  Bucket Pruning: Only scan matching bucket             │   │     │
│   │  │                                                        │   │     │
│   │  │  bucket 0  ← SKIP (hash doesn't match)                 │   │     │
│   │  │  bucket 1  ← SCAN (hash matches)                       │   │     │
│   │  │  bucket 2  ← SKIP                                      │   │     │
│   │  │  bucket 3  ← SKIP                                      │   │     │
│   │  └────────────────────────────────────────────────────────┘   │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                       BUCKETED JOIN OPTIMIZATION                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   Without Bucketing (Sort-Merge Join):                                  │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Table A                   Table B                            │     │
│   │  ┌──────────────┐          ┌──────────────┐                   │     │
│   │  │ 1000 files   │          │ 1000 files   │                   │     │
│   │  └──────┬───────┘          └──────┬───────┘                   │     │
│   │         │                         │                           │     │
│   │         ▼                         ▼                           │     │
│   │  ┌──────────────┐          ┌──────────────┐                   │     │
│   │  │ Shuffle ALL  │          │ Shuffle ALL  │                   │     │
│   │  │ 200 GB       │          │ 150 GB       │                   │     │
│   │  └──────┬───────┘          └──────┬───────┘                   │     │
│   │         │                         │                           │     │
│   │         └─────────┬───────────────┘                           │     │
│   │                   ▼                                           │     │
│   │         ┌──────────────────┐                                  │     │
│   │         │  Sort + Merge    │                                  │     │
│   │         │  350 GB shuffle  │                                  │     │
│   │         └──────────────────┘                                  │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   With Bucketing (Bucketed Join):                                       │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Table A (bucketed by join_key, 256 buckets)                  │     │
│   │  ┌──────────────┐          ┌──────────────┐                   │     │
│   │  │ 256 buckets  │          │ 256 buckets  │                   │     │
│   │  │ (sorted)     │          │ (sorted)     │                   │     │
│   │  └──────┬───────┘          └──────┬───────┘                   │     │
│   │         │                         │                           │     │
│   │         ▼                         ▼                           │     │
│   │  ┌──────────────┐          ┌──────────────┐                   │     │
│   │  │ NO shuffle   │          │ NO shuffle   │                   │     │
│   │  │(pre-bucketed)│          │(pre-bucketed)│                   │     │
│   │  └──────┬───────┘          └──────┬───────┘                   │     │
│   │         │                         │                           │     │
│   │         └─────────┬───────────────┘                           │     │
│   │                   ▼                                           │     │
│   │         ┌──────────────────┐                                  │     │
│   │         │  Bucket-wise     │                                  │     │
│   │         │  merge (256      │                                  │     │
│   │         │  independent)    │                                  │     │
│   │         └──────────────────┘                                  │     │
│   │                                                               │     │
│   │  Shuffle eliminated! Only local merges per bucket.            │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                   HASH DISTRIBUTION & BUCKET PRUNING                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   Hash Function Application:                                            │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Input: join_key = "customer_123"                             │     │
│   │                                                               │     │
│   │  hash("customer_123") % 256 = bucket_index                    │     │
│   │                                                               │     │
│   │  ┌────────────────────────────────────────────────────────┐   │     │
│   │  │  Bucket 0   Bucket 1   Bucket 2   ...  Bucket 255      │   │     │
│   │  │  ┌───────┐  ┌───────┐  ┌───────┐       ┌───────┐       │   │     │
│   │  │  │ cust  │  │ cust  │  │ cust  │       │ cust  │       │   │     │
│   │  │  │ _456  │  │ _123  │  │ _789  │       │ _012  │       │   │     │
│   │  │  │ cust  │  │ cust  │  │ cust  │       │ cust  │       │   │     │
│   │  │  │ _321  │  │ _654  │  │ _987  │       │ _345  │       │   │     │
│   │  │  └───────┘  └───────┘  └───────┘       └───────┘       │   │     │
│   │  └────────────────────────────────────────────────────────┘   │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   Bucket Pruning Decision Tree:                                         │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Query: WHERE join_key = 'customer_123'                       │     │
│   │                                                               │     │
│   │  1. Compute hash('customer_123') % num_buckets                │     │
│   │  2. Identify target bucket (e.g., bucket 42)                  │     │
│   │  3. Scan ONLY the files of bucket 42                          │     │
│   │  4. Apply remaining predicates within bucket                  │     │
│   │                                                               │     │
│   │  Result: 1/256 of data scanned = 99.6% reduction              │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   Sort Order Within Buckets:                                            │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │  Bucket 42 (sorted by join_key):                              │     │
│   │  ┌────────────────────────────────────────────────────────┐   │     │
│   │  │  customer_1000123  │  customer_1000456  │  ...         │   │     │
│   │  │  ──────────────────┼────────────────────┼───────       │   │     │
│   │  │  byte-0 (sorted)   │  byte-1 (sorted)   │  ...         │   │     │
│   │  └────────────────────────────────────────────────────────┘   │     │
│   │                                                               │     │
│   │  Enables efficient range queries WITHIN each bucket           │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Detailed Explanation

Bucketing in PySpark is a data layout strategy that distributes rows into a fixed number of buckets based on the hash of one or more columns. Unlike partitioning, which creates one directory per partition value, bucketing keeps the data in the same directory and organizes it into hash-based groups of files. The primary benefits are eliminating the shuffle during joins (when both tables are bucketed on the join key), enabling bucket pruning for point queries, and keeping rows sorted within each bucket file for efficient range scans.

The hash function Spark uses for bucketing is Murmur3 (Spark's default hash), which produces a 32-bit integer. The bucket index is computed as pmod(hash(column_value), num_buckets), a non-negative modulus, so all rows with the same value in the bucketing column land in the same bucket. When both tables in a join are bucketed on the join key with the same number of buckets, Spark can perform a bucket-aware sort-merge join without shuffling data: each bucket in table A is joined with the corresponding bucket in table B.
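To make the bucket assignment concrete, here is a minimal pure-Python sketch. It uses CRC32 from the standard library as a stand-in hash, which is an assumption made purely for illustration: Spark hashes with Murmur3, so the bucket numbers printed here will not match the ones Spark writes. The helper names (`to_int32`, `pmod`, `bucket_index`) are hypothetical.

```python
import zlib


def to_int32(x: int) -> int:
    """Interpret the low 32 bits of x as a signed 32-bit integer."""
    x &= 0xFFFFFFFF
    return x - (1 << 32) if x >= (1 << 31) else x


def pmod(a: int, n: int) -> int:
    """Non-negative modulus, like Spark SQL's pmod().

    Python's % is already non-negative for positive n; the longer form is
    spelled out because % can return negative values in Java or Scala.
    """
    return ((a % n) + n) % n


def bucket_index(key: str, num_buckets: int) -> int:
    # ASSUMPTION: CRC32 stands in for Spark's Murmur3 hash, so these
    # bucket numbers will differ from the ones Spark assigns.
    signed_hash = to_int32(zlib.crc32(key.encode("utf-8")))
    return pmod(signed_hash, num_buckets)


# The same key always maps to the same bucket, whichever table it is in.
for key in ["customer_123", "customer_456", "customer_123"]:
    print(key, "->", bucket_index(key, 256))
```

Because the mapping depends only on the key and the bucket count, two tables written with the same bucket count place matching keys in buckets with the same index, which is what makes the shuffle-free join possible.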

Bucket pruning is an optimization that skips entire buckets during query execution. When a query includes an equality predicate on the bucketing column (e.g., WHERE customer_id = 'CUST-001'), Spark computes the hash and identifies the target bucket. Only that bucket's files are read; with 256 buckets this skips more than 99% of the data. Pruning requires a predicate that pins the bucketing column to specific values (an equality or an IN list); range predicates and predicates on other columns do not trigger it.
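The pruning decision can be sketched in a few lines of plain Python. This is an illustration of the idea, not Spark's planner code: CRC32 is again a stand-in for Murmur3, and `buckets_to_scan` and `fraction_scanned` are hypothetical helper names.

```python
import zlib


def bucket_index(key: str, num_buckets: int) -> int:
    # ASSUMPTION: stand-in hash (CRC32), not Spark's Murmur3.
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def buckets_to_scan(predicate_values, num_buckets):
    """Buckets that may hold rows matching `bucket_col IN predicate_values`.

    Pass None to model a predicate that does not pin the bucketing column
    to specific values (for example a range filter, or a filter on another
    column): every bucket must then be scanned.
    """
    if predicate_values is None:
        return set(range(num_buckets))
    return {bucket_index(v, num_buckets) for v in predicate_values}


def fraction_scanned(predicate_values, num_buckets):
    return len(buckets_to_scan(predicate_values, num_buckets)) / num_buckets


print(fraction_scanned(["CUST-001"], 256))  # one bucket out of 256
print(fraction_scanned(None, 256))          # no pruning: all buckets
```

An equality predicate selects exactly one bucket, an IN list selects at most one bucket per listed value, and anything else falls back to a full scan.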

The number of buckets is a critical configuration decision. Too few buckets result in large files that are slow to scan; too many buckets result in many small files that add metadata overhead. A good rule of thumb is to target file sizes of 128-256 MB after bucketing. For a 1 TB table with a 128 MB target file size, you would need about 8,192 buckets (1 TB / 128 MB), assuming one file per bucket. However, the number of buckets must match between tables for bucketed joins to work, so consider future join patterns when choosing this value.
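The sizing rule is simple arithmetic, sketched below. The function name `buckets_for_target_size` is hypothetical, and the calculation assumes binary units (1 TB = 1024^4 bytes) and one file per bucket.

```python
import math

MiB = 1024 ** 2
TiB = 1024 ** 4


def buckets_for_target_size(table_bytes: int, target_file_bytes: int) -> int:
    """Bucket count that yields roughly target_file_bytes per bucket.

    ASSUMPTION: one file per bucket and evenly distributed keys.
    """
    return max(1, math.ceil(table_bytes / target_file_bytes))


print(buckets_for_target_size(1 * TiB, 128 * MiB))  # 8192
print(buckets_for_target_size(1 * TiB, 256 * MiB))  # 4096
```

If two tables of very different sizes must be joined, the shared bucket count is a compromise: sizing it for the larger table leaves the smaller table with smaller files.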

Sorted bucketing adds a further optimization: within each bucket, data is sorted by the sortBy columns (typically the bucketing columns). This enables efficient range scans within a bucket, since the data is already in order. The sort order is maintained at the file level: each file within a bucket is individually sorted. When a bucket consists of a single file, the whole bucket is sorted; when it consists of several files, their concatenation is not guaranteed to be sorted, so Spark may still need to sort before a merge join.

Bucketing interacts with partitioning in important ways. When both are used, partitioning creates the directory structure (e.g., date=2024-01-15/) and bucketing creates files within each partition directory. This means bucket pruning only works within a partition: queries that filter on both the partition column and the bucketing column benefit from both pruning strategies. However, over-partitioning combined with bucketing can create too many small files, so balance is essential.
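A quick estimate shows how fast the file count grows when the two layouts are combined. The sketch below is back-of-the-envelope arithmetic under stated assumptions (one file per bucket per partition, evenly spread data); `layout_estimate` is a hypothetical helper, and real jobs can write more, smaller files than this lower bound.

```python
MiB = 1024 ** 2
TiB = 1024 ** 4


def layout_estimate(table_bytes: int, num_partitions: int, num_buckets: int):
    """Return (file_count, average_file_bytes) for a partitioned, bucketed table.

    ASSUMPTION: exactly one file per bucket in every partition directory.
    """
    files = num_partitions * num_buckets
    return files, table_bytes / files


# One year of daily partitions, 256 buckets, 1 TB of data in total.
files, avg_bytes = layout_estimate(1 * TiB, 365, 256)
print(files, "files, about", round(avg_bytes / MiB, 1), "MiB each")
```

With 365 daily partitions and 256 buckets, a 1 TB table ends up with 93,440 files of roughly 11 MiB each, far below the 128-256 MB target, which is the small-file problem the paragraph above warns about.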

Mathematical Foundations

Definition: Hash Bucketing

A hash bucketing strategy partitions dataset $D$ into $B$ buckets via a hash function $h: \text{key} \rightarrow \{0, 1, \ldots, B-1\}$. Bucket $b$ contains:

$$D_b = \{r \in D : h(r.\text{key}) = b\}$$

Bucket Pruning

For an equi-join on bucketed columns with $B$ buckets, each bucket needs to be matched only against its counterpart:

$$\text{Scanned}(D) = \frac{|D|}{B} \quad \text{(per table, per matching bucket)}$$

Assuming evenly filled buckets, the number of candidate row pairs reduces from $|D_1| \times |D_2|$ to $B \times \frac{|D_1|}{B} \times \frac{|D_2|}{B} = \frac{|D_1| \times |D_2|}{B}$.

Load Balance Theorem

For a hash function $h$ mapping $n$ keys to $B$ buckets, the expected maximum bucket size is:

$$\mathbb{E}\left[\max_b |D_b|\right] \approx \frac{n}{B} + \sqrt{\frac{2n \ln B}{B}}$$

The relative load imbalance approaches 0 as $n/B \rightarrow \infty$ for good hash functions.
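Plugging numbers into the approximation gives a feel for how small the imbalance is in practice. The sketch below simply evaluates the formula above; the function names are hypothetical, and the inputs (1,000,000 distinct keys, 256 buckets) are chosen to resemble the customers table used later in this lesson.

```python
import math


def expected_max_bucket(n: int, num_buckets: int) -> float:
    """n/B + sqrt(2 n ln(B) / B): approximate size of the fullest bucket."""
    return n / num_buckets + math.sqrt(2 * n * math.log(num_buckets) / num_buckets)


def relative_imbalance(n: int, num_buckets: int) -> float:
    """How far the fullest bucket exceeds the mean, as a fraction of the mean."""
    mean = n / num_buckets
    return expected_max_bucket(n, num_buckets) / mean - 1


for n in (1_000_000, 100_000_000):
    print(n, "keys:", f"{relative_imbalance(n, 256):.2%}", "above the mean")
```

For 1,000,000 keys the fullest of 256 buckets is expected to be only about 5% larger than average, and the gap shrinks as the number of keys per bucket grows. This holds for distinct keys of similar weight; a few very frequent keys can still skew individual buckets.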

Sort-Merge Join Cost

Without bucketing, sort-merge join cost is:

$$C = O\left(|D_1| \log |D_1| + |D_2| \log |D_2| + |D_1| + |D_2|\right)$$

With buckets that are already sorted on the join key, the sort terms drop out: $C_{\text{bucketed}} = O(|D_1| + |D_2|)$.
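The linear cost can be seen in a small pure-Python sketch of a bucket-wise merge join. It is a toy model, not Spark's implementation: CRC32 stands in for Murmur3, the right-hand table is assumed to have unique keys (like a customers dimension), and all function names are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # deliberately tiny, for illustration only


def bucket_of(key: str, num_buckets: int = NUM_BUCKETS) -> int:
    # ASSUMPTION: stand-in hash (CRC32), not Spark's Murmur3.
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def write_bucketed(rows, num_buckets: int = NUM_BUCKETS):
    """Group (key, value) rows by bucket and sort each bucket by key."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)].append((key, value))
    return {b: sorted(rs, key=lambda r: r[0]) for b, rs in buckets.items()}


def merge_join_bucket(left, right):
    """Single linear pass over two key-sorted lists (right keys unique)."""
    out, j = [], 0
    for key, left_value in left:
        while j < len(right) and right[j][0] < key:
            j += 1
        if j < len(right) and right[j][0] == key:
            out.append((key, left_value, right[j][1]))
    return out


def bucketed_join(left_buckets, right_buckets):
    """Join bucket i of the left table with bucket i of the right table only."""
    result = []
    for b, left in left_buckets.items():
        result.extend(merge_join_bucket(left, right_buckets.get(b, [])))
    return result


orders = [("c1", 10), ("c2", 20), ("c1", 30), ("c3", 5)]
customers = [("c1", "Ann"), ("c2", "Bob"), ("c4", "Dee")]
print(sorted(bucketed_join(write_bucketed(orders), write_bucketed(customers))))
```

All sorting happens once, at write time in `write_bucketed`; the join itself only walks each bucket pair once, and the bucket pairs are independent of each other, so they can be processed in parallel.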

Bucket Count Selection

The optimal bucket count $B^*$ minimizes total I/O:

$$B^* = \arg\min_B \left( B \times \left\lceil \frac{|D_1|/B}{S_{\text{block}}} \right\rceil \times \text{cost}_{\text{read}} + \text{overhead}(B) \right)$$

where $S_{\text{block}}$ is the HDFS block size.

Key Insight

Bucketing trades write-time computation for read-time savings. The benefit is maximal when the same bucketed column is used for joins across multiple queries. Over-bucketing ($B > |D|/S_{\text{block}}$) creates too many small files, hurting I/O.
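The over-bucketing threshold can be checked numerically by evaluating the block-count term of the objective, $B \times \lceil (|D|/B) / S_{\text{block}} \rceil$. The sketch below assumes a 1 TiB table and a 128 MiB block size; `blocks_read` is a hypothetical helper, and the read cost and overhead terms are left out.

```python
import math

MiB = 1024 ** 2
TiB = 1024 ** 4


def blocks_read(table_bytes: int, num_buckets: int, block_bytes: int) -> int:
    """B * ceil((|D| / B) / S_block): blocks touched when every bucket is read."""
    per_bucket_bytes = table_bytes / num_buckets
    return num_buckets * math.ceil(per_bucket_bytes / block_bytes)


for num_buckets in (256, 8192, 65536):
    print(num_buckets, "buckets ->", blocks_read(1 * TiB, num_buckets, 128 * MiB), "blocks")
```

Up to 8,192 buckets (the point where each bucket fills exactly one block) the total stays at 8,192 blocks. Beyond that, every extra bucket adds a partially filled block, so 65,536 buckets cost 65,536 block reads for the same data.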

Summary

Hash bucketing partitions data by key, enabling bucket pruning that reduces the join scan by a factor of $B$. Load balance follows a balls-into-bins model. The optimal bucket count balances block-aligned I/O against overhead. Bucketing is most beneficial for repeated equi-joins on the same key.

Key Concepts Table

| Concept | Description | Performance Impact |
|---|---|---|
| Bucket Column | Column used for hash-based distribution | Determines join and pruning efficiency |
| Number of Buckets | Fixed count of hash-based file groups | Must match for bucketed joins |
| Hash Function | murmur3 hash for bucket assignment | Uniform distribution across buckets |
| Bucket Pruning | Skip non-matching buckets on query | Up to 99%+ reduction in data scanned |
| Bucketed Join | Join without shuffle on matching buckets | Eliminates shuffle stage |
| Sorted Bucketing | Data sorted within each bucket | Enables efficient range scans |
| Bucket Metadata | Stored in table properties | Required for bucket-aware optimizations |
| File Size per Bucket | Controlled by num_buckets and data size | Target 128-256 MB per file |
| Partition + Bucket | Hierarchical layout | Both pruning strategies apply |
| Bucket Evolution | Changing bucket count | Requires table rewrite |

Code Examples

Creating Bucketed Tables

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, expr, format_string, lit, rand, when

# Bucketing is enabled by default; the settings are listed here for visibility.
spark = SparkSession.builder \
    .appName("BucketingStrategies") \
    .config("spark.sql.sources.bucketing.enabled", "true") \
    .config("spark.sql.sources.bucketing.maxBuckets", "100000") \
    .getOrCreate()

# Generate customer orders (10M rows).
# Customer IDs are zero-padded to six digits (e.g. CUST-000042) so that they
# match the literals used in the queries further down.
orders_df = spark.range(0, 10_000_000) \
    .withColumn("order_id", concat(lit("ORD-"), col("id").cast("string"))) \
    .withColumn("customer_id", format_string("CUST-%06d", col("id") % 1_000_000)) \
    .withColumn("product_id", concat(lit("PROD-"), (col("id") % 50_000).cast("string"))) \
    .withColumn("amount", (rand() * 1000 + 10).cast("decimal(10,2)")) \
    .withColumn("order_date", expr("date_add(DATE'2024-01-01', CAST(id % 365 AS INT))"))

# Create customers dataset (1M rows).
# A single random draw per row ("r") gives a 30% / 30% / 40% segment split;
# calling rand() separately in each when() would draw independent numbers.
customers_df = spark.range(0, 1_000_000) \
    .withColumn("customer_id", format_string("CUST-%06d", col("id"))) \
    .withColumn("name", concat(lit("Customer_"), col("id").cast("string"))) \
    .withColumn("r", rand(seed=42)) \
    .withColumn("segment",
        when(col("r") < 0.3, "Premium")
        .when(col("r") < 0.6, "Standard")
        .otherwise("Basic")) \
    .drop("r")

# Write bucketed tables
# Bucket by customer_id with 256 buckets
orders_df.write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_orders")

customers_df.write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_customers")

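# Verify bucketing: look for the "Num Buckets", "Bucket Columns" and "Sort Columns" rows.
# DESCRIBE EXTENDED returns more rows than the 20 that show() prints by default.
spark.sql("DESCRIBE EXTENDED bucketed_orders").show(100, truncate=False)
spark.sql("DESCRIBE EXTENDED bucketed_customers").show(100, truncate=False)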

Bucket Pruning Demonstration

# Enable bucket pruning for queries
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")

# Query with bucket pruning (equality predicate on bucket column)
# This should only scan 1/256 of the data
import time

# Point query - bucket pruning enabled
start = time.time()
result1 = spark.sql("""
    SELECT o.order_id, o.amount, c.name
    FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
    WHERE o.customer_id = 'CUST-000042'
""")
result1.show(truncate=False)
pruned_time = time.time() - start
print(f"Bucket pruning query time: {pruned_time:.2f} seconds")

# Query without bucket pruning (range predicate)
start = time.time()
result2 = spark.sql("""
    SELECT o.order_id, o.amount, c.name
    FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 500
""")
result2.show(5, truncate=False)
full_scan_time = time.time() - start
print(f"Full scan query time: {full_scan_time:.2f} seconds")

# Verify bucket pruning in the physical plan: the FileScan node should report
# something like "SelectedBucketsCount: 1 out of 256".
spark.sql("""
    SELECT * FROM bucketed_orders
    WHERE customer_id = 'CUST-000042'
""").explain()

Bucket-Aware Joins

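# Disable broadcast joins so the smaller customers table is not broadcast,
# which would hide the effect of bucketing in this comparison.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Bucketed join - should NOT shuffle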
start = time.time()
bucketed_join_df = spark.sql("""
    SELECT 
        o.order_id,
        o.amount,
        o.order_date,
        c.name,
        c.segment
    FROM bucketed_orders o
    JOIN bucketed_customers c 
    ON o.customer_id = c.customer_id
""")
bucketed_join_df.count()  # Force execution
bucketed_join_time = time.time() - start
print(f"Bucketed join time: {bucketed_join_time:.2f} seconds")

# Non-bucketed join for comparison
orders_df.write.mode("overwrite").saveAsTable("regular_orders")
customers_df.write.mode("overwrite").saveAsTable("regular_customers")

start = time.time()
regular_join_df = spark.sql("""
    SELECT 
        o.order_id,
        o.amount,
        o.order_date,
        c.name,
        c.segment
    FROM regular_orders o
    JOIN regular_customers c 
    ON o.customer_id = c.customer_id
""")
regular_join_df.count()  # Force execution
regular_join_time = time.time() - start
print(f"Regular join time: {regular_join_time:.2f} seconds")

print(f"Speedup: {regular_join_time / bucketed_join_time:.1f}x")

# Verify no shuffle in the bucketed join: the physical plan should contain
# no Exchange operator below the join.
spark.sql("""
    SELECT * FROM bucketed_orders o
    JOIN bucketed_customers c ON o.customer_id = c.customer_id
""").explain()

Multi-Column Bucketing

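# Bucket by multiple columns for complex query patterns.
# One random draw per row ("r") makes the four regions equally likely.
complex_orders_df = orders_df \
    .withColumn("r", rand(seed=7)) \
    .withColumn("region",
        when(col("r") < 0.25, "North")
        .when(col("r") < 0.5, "South")
        .when(col("r") < 0.75, "East")
        .otherwise("West")) \
    .drop("r")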

# Write with multi-column bucketing
complex_orders_df.write \
    .bucketBy(16, "region", "customer_id") \
    .sortBy("region", "customer_id") \
    .mode("overwrite") \
    .saveAsTable("multi_bucket_orders")

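# Grouping on both bucket columns lets Spark aggregate without a shuffle.
# Note: in current Spark releases bucket pruning applies only to tables bucketed
# on a single column, so the WHERE clause below does not prune buckets.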
spark.sql("""
    SELECT region, customer_id, SUM(amount) as total
    FROM multi_bucket_orders
    WHERE region = 'North' AND customer_id = 'CUST-000042'
    GROUP BY region, customer_id
""").show(truncate=False)

# Verify multi-column bucketing ("Bucket Columns" should list region and customer_id)
spark.sql("DESCRIBE EXTENDED multi_bucket_orders").show(100, truncate=False)

Performance Metrics

| Metric | Non-Bucketed | 64 Buckets | 256 Buckets | 1024 Buckets |
|---|---|---|---|---|
| File Count (100GB table) | ~800 files | 64 files | 256 files | 1024 files |
| Avg File Size | 128 MB (varies) | 1.5 GB | 400 MB | 100 MB |
| Write Time (10M rows) | 45 seconds | 60 seconds | 75 seconds | 90 seconds |
| Point Query (bucket pruning) | 8-12 seconds | 2-4 seconds | 1-2 seconds | 0.5-1 second |
| Range Query (no pruning) | 8-12 seconds | 10-15 seconds | 8-12 seconds | 12-18 seconds |
| Bucketed Join (2 tables) | 120-180 seconds | 8-15 seconds | 6-10 seconds | 10-20 seconds |
| Non-Bucketed Join | 120-180 seconds | 120-180 seconds | 120-180 seconds | 120-180 seconds |
| Shuffle Volume (join) | 100% of data | ~0% (bucketed) | ~0% (bucketed) | ~0% (bucketed) |
| Memory per Partition | High variance | Uniform | Uniform | Low per partition |
| Concurrent Query Performance | Degrades | Stable | Optimal | Degrades (too many files) |

Best Practices

  1. Match bucket counts between join tables to enable bucket-aware joins; mismatched counts typically force at least one side to be shuffled even with bucketing
  2. Target 128-256 MB file size per bucket by calculating num_buckets = total_data_size / target_file_size
  3. Use sortBy within buckets for columns commonly used in range queries to enable efficient intra-bucket scans
  4. Bucket only on columns that queries actually join or filter on; a high-cardinality column (e.g., a UUID) hashes uniformly across buckets, but bucketing on it pays off only if you need bucket pruning or joins on that column
  5. Combine partitioning and bucketing when queries commonly filter on both temporal and entity columns
  6. Monitor small file counts; if buckets contain files smaller than 10 MB, reduce the number of buckets
  7. Use INSERT INTO instead of write.saveAsTable for subsequent loads to maintain bucket structure
  8. Keep spark.sql.sources.bucketing.enabled=true (the default); when it is set to false, bucketed tables are read as ordinary tables and bucket-aware optimizations are skipped
  9. Avoid bucket evolution (changing bucket count) as it requires a full table rewrite
  10. Use ANALYZE TABLE after bucketing to update column statistics for the optimizer
  11. Test bucket pruning with EXPLAIN to verify that the scan reads only the matching buckets (look for SelectedBucketsCount in the file scan node)
  12. Consider bucketing for CDC workloads where upserts on the bucket column benefit from targeted file updates

See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)
