Adaptive Query Execution in PySpark

Architecture Diagram

┌───────────────────────────────────────────────────────────────────────────┐
│                         AQE ARCHITECTURE OVERVIEW                         │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│   ┌──────────────┐     ┌──────────────┐     ┌──────────────┐              │
│   │  Physical    │────▶│  Query       │────▶│  Stage       │              │
│   │  Plan        │     │  Execution   │     │  Boundary    │              │
│   └──────────────┘     └─────┬────────┘     └────────┬─────┘              │
│                              │                       │                    │
│                              ▼                       ▼                    │
│                     ┌──────────────────┐    ┌──────────────────┐          │
│                     │  Runtime Stats   │    │  Plan            │          │
│                     │  Collection      │    │  Re-Optimization │          │
│                     │  ─────────────   │    │  ─────────────   │          │
│                     │  Row counts      │    │  Join strategy   │          │
│                     │  Shuffle bytes   │    │  Shuffle part.   │          │
│                     │  Partition sizes │    │  Local shuffle   │          │
│                     └────────┬─────────┘    └────────┬─────────┘          │
│                              │                       │                    │
│                              ▼                       ▼                    │
│                     ┌──────────────────┐    ┌──────────────────┐          │
│                     │  Skew Detection  │    │  Dynamic         │          │
│                     │  & Resolution    │    │  Partitioning    │          │
│                     │  ─────────────   │    │  ─────────────   │          │
│                     │  Split skewed    │    │  Coalesce small  │          │
│                     │  partitions      │    │  partitions      │          │
│                     └────────┬─────────┘    └────────┬─────────┘          │
│                              │                       │                    │
│                              ▼                       ▼                    │
│                     ┌──────────────────┐    ┌──────────────────┐          │
│                     │  Post-Stage      │    │  Optimized       │          │
│                     │  Optimization    │    │  Execution       │          │
│                     │  ─────────────   │    │  ─────────────   │          │
│                     │  Re-plan the     │    │  Better plans    │          │
│                     │  remaining stages│    │  based on actual │          │
│                     └──────────────────┘    │  data sizes      │          │
│                                             └──────────────────┘          │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌───────────────────────────────────────────────────────────────────────────┐
│                    AQE RE-OPTIMIZATION DECISION POINTS                    │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│   Original Query Plan:                                                    │
│   ┌─────────────────────────────────────────────────────────────┐         │
│   │  SortMergeJoin(A, B)                                        │         │
│   │  ├── Sort(A.customer_id)                                    │         │
│   │  │   └── Exchange(A)                                        │         │
│   │  │       └── Scan(A) [estimated: 5M rows]                   │         │
│   │  └── Sort(B.customer_id)                                    │         │
│   │      └── Exchange(B)                                        │         │
│   │          └── Scan(B) [estimated: 10M rows]  ← WRONG ESTIMATE│         │
│   └─────────────────────────────────────────────────────────────┘         │
│                                                                           │
│   After B's shuffle stage completes:                                      │
│   ┌─────────────────────────────────────────────────────────────┐         │
│   │  Runtime stats: B has 10K rows (actual), not 10M            │         │
│   │                                                             │         │
│   │  AQE decision: switch to BroadcastHashJoin                  │         │
│   │  (B is now below the broadcast threshold)                   │         │
│   │                                                             │         │
│   │  Optimized plan:                                            │         │
│   │  BroadcastHashJoin(A, B)                                    │         │
│   │  ├── LocalShuffleRead(A) [actual: 5M rows]                  │         │
│   │  └── BroadcastExchange(B)                                   │         │
│   │      └── ShuffleRead(B) [actual: 10K rows]                  │         │
│   └─────────────────────────────────────────────────────────────┘         │
│                                                                           │
│   Decision Points in AQE (repeated at every stage boundary):              │
│   ┌─────────────────────────────────────────────────────────────┐         │
│   │                                                             │         │
│   │  Query stage completes (shuffle or broadcast)               │         │
│   │       │                                                     │         │
│   │       ▼                                                     │         │
│   │  Collect runtime stats (bytes and rows per partition)       │         │
│   │       │                                                     │         │
│   │       ▼                                                     │         │
│   │  Re-run AQE rules on the stages not yet started:            │         │
│   │    • switch join strategy (e.g. SortMerge → BroadcastHash)  │         │
│   │    • coalesce small shuffle partitions                      │         │
│   │    • split skewed join partitions                           │         │
│   │       │                                                     │         │
│   │       ▼                                                     │         │
│   │  Submit the next stages; repeat until the final stage       │         │
│   │                                                             │         │
│   └─────────────────────────────────────────────────────────────┘         │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌───────────────────────────────────────────────────────────────────────────┐
│                        SKEW DETECTION & RESOLUTION                        │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│   Before AQE (Skewed Data):                                               │
│   ┌─────────────────────────────────────────────────────────────┐         │
│   │  Partition 0:  100 MB  (normal)                             │         │
│   │  Partition 1:  100 MB  (normal)                             │         │
│   │  Partition 2:  50 GB   (SKEWED!)  ← bottleneck              │         │
│   │  Partition 3:  100 MB  (normal)                             │         │
│   │  ...                                                        │         │
│   │  Partition 99: 100 MB  (normal)                             │         │
│   │                                                             │         │
│   │  Stage time ≈ slowest task (50 GB) = HOURS                  │         │
│   └─────────────────────────────────────────────────────────────┘         │
│                                                                           │
│   After AQE (Skew Resolved):                                              │
│   ┌─────────────────────────────────────────────────────────────┐         │
│   │  Partition 0:     100 MB                                    │         │
│   │  Partition 1:     100 MB                                    │         │
│   │  Partition 2.0:   ~64 MB  ← split from 2                    │         │
│   │  Partition 2.1:   ~64 MB  ← split from 2                    │         │
│   │  ...  (~800 splits of 64 MB in total)                       │         │
│   │  Partition 2.799: ~64 MB  ← split from 2                    │         │
│   │  Partition 3:     100 MB                                    │         │
│   │  ...                                                        │         │
│   │  Partition 99:    100 MB                                    │         │
│   │                                                             │         │
│   │  Each split is joined with a copy of the other side's       │         │
│   │  partition 2, so the join result is unchanged.              │         │
│   │  Stage time ≈ slowest task (~100 MB) = MINUTES              │         │
│   └─────────────────────────────────────────────────────────────┘         │
│                                                                           │
│   AQE Skew Detection Algorithm (skew joins):                              │
│   ┌─────────────────────────────────────────────────────────────┐         │
│   │  1. Collect per-partition shuffle sizes for both join sides │         │
│   │  2. Compute the median partition size of each side          │         │
│   │  3. Flag partitions where:                                  │         │
│   │     size > median × skewedPartitionFactor (default 5)       │         │
│   │     AND size > skewedPartitionThresholdInBytes (256 MB)     │         │
│   │  4. For each skewed partition:                              │         │
│   │     a. Split it into ranges of map-task outputs of about    │         │
│   │        advisoryPartitionSizeInBytes (default 64 MB) each    │         │
│   │     b. Pair every split with a copy of the matching         │         │
│   │        partition from the other join side                   │         │
│   │  5. Rewrite the plan to read the new partition specs        │         │
│   └─────────────────────────────────────────────────────────────┘         │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘

Detailed Explanation

Adaptive Query Execution (AQE) is Spark's framework for dynamically optimizing query plans at runtime based on actual data statistics rather than estimated statistics. Traditional query optimization relies on table statistics collected before query execution (via ANALYZE TABLE), which can be significantly inaccurate due to data drift, skewed distributions, or stale metadata. AQE addresses this by re-evaluating and modifying the physical plan at stage boundaries, where actual data characteristics become available.

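The core innovation of AQE is its ability to make three types of dynamic optimizations: join strategy switching, shuffle partition coalescing, and skew partition splitting. Join strategy switching occurs when the actual size of one join side differs significantly from the estimate: if that side turns out to be broadcastable (at or below spark.sql.adaptive.autoBroadcastJoinThreshold), AQE replaces the planned sort-merge join with a broadcast hash join. Because the decision is made after the shuffle stages feeding the join have already started or finished, this is less efficient than planning a broadcast join up front, but it still saves the sort on both sides and lets the large side read its shuffle files locally instead of over the network. When statistics were badly stale, the join can finish in seconds instead of minutes.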
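
The runtime decision described above can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: it assumes an inner join (so either side may be broadcast), takes the runtime sizes as byte counts reported by completed shuffle stages, and uses hypothetical names (`JoinSide`, `choose_join_strategy`). A negative threshold disables broadcasting, mirroring Spark's `-1` convention.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

MB = 1024 * 1024


@dataclass
class JoinSide:
    name: str
    runtime_bytes: int  # size reported by the completed shuffle stage


def choose_join_strategy(
    left: JoinSide, right: JoinSide, broadcast_threshold: int = 10 * MB
) -> Tuple[str, Optional[str]]:
    """Return (strategy, side to broadcast) for an inner join (hypothetical model)."""
    if broadcast_threshold < 0:  # a negative threshold disables broadcasting
        return ("SortMergeJoin", None)
    smaller = min(left, right, key=lambda side: side.runtime_bytes)
    if smaller.runtime_bytes <= broadcast_threshold:
        return ("BroadcastHashJoin", smaller.name)
    return ("SortMergeJoin", None)
```

With a 2 GB orders side and a 1 MB customers side, the sketch broadcasts `customers`; once both sides are above the threshold it keeps the sort-merge join.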

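Shuffle partition coalescing addresses over-partitioning. When a query runs with spark.sql.shuffle.partitions=200 but the shuffled data would fit comfortably in 20 partitions, the static plan still launches 200 reduce tasks, most of them tiny, and writing the result can produce up to 200 small files. After each shuffle stage, AQE looks at the actual size of every shuffle partition and merges contiguous small partitions until each merged partition approaches the target spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB), while spark.sql.adaptive.coalescePartitions.minPartitionSize (default 1 MB) sets the lower bound. In Spark 3.2 and later, spark.sql.adaptive.coalescePartitions.parallelismFirst (default true) makes Spark favor parallelism over the advisory size, which usually yields partitions smaller than 64 MB; setting it to false makes coalescing honor the advisory size. This optimization is particularly valuable for queries with multiple shuffle stages where the data volume shrinks at each stage.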
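
Coalescing merges contiguous shuffle partitions, so each merged partition is still a range of reducer IDs. The sketch below shows the greedy idea under stated assumptions: `coalesce_partitions` is a hypothetical helper, partition sizes are given in bytes, and it ignores details Spark adds on top (the minimum partition size, the parallelism-first mode, and merging a too-small last group).

```python
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], target: int = 64 * MB) -> List[List[int]]:
    """Group contiguous partition indices so each group stays at or below target.

    A single partition larger than target forms a group on its own.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for idx, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot the target
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

For 200 partitions of 10 MB and a 64 MB target, the sketch produces 34 groups: 33 groups of six partitions (60 MB each) and a final group of two.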

Skew detection and resolution is the most sophisticated AQE feature, and it applies to joins (sort-merge joins, and shuffled hash joins in newer releases), not to aggregations. Data skew occurs when certain keys have disproportionately more records than others, so a few partitions process far more data than the rest and the whole stage waits for them. After the shuffle, AQE compares each partition's size to the median partition size on the same join side. A partition is treated as skewed when it is larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) times the median and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB). AQE does not sample keys: it splits the skewed partition along map-task output boundaries into sub-partitions of roughly spark.sql.adaptive.advisoryPartitionSizeInBytes, and joins each sub-partition with a copy of the matching partition from the other side.
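
A minimal sketch of the skew-join steps, assuming partition sizes in bytes and, for the split step, the list of per-map-task block sizes that make up one skewed reducer partition. Function names are hypothetical. The cutoff is the larger of factor × median and the byte threshold, and splitting never cuts inside a single map task's block, which is why a partition produced by few map tasks can only be split into few pieces. Each resulting range would then be joined with a full copy of the other side's matching partition.

```python
from statistics import median
from typing import List, Tuple

MB = 1024 * 1024


def find_skewed(sizes: List[int], factor: float = 5, threshold: int = 256 * MB) -> List[int]:
    """Indices of partitions larger than both factor * median and threshold."""
    cutoff = max(factor * median(sizes), threshold)
    return [i for i, size in enumerate(sizes) if size > cutoff]


def split_by_map_blocks(block_sizes: List[int], target: int) -> List[Tuple[int, int]]:
    """Split one reducer partition into [start, end) ranges of map-task outputs.

    Each range holds roughly `target` bytes; a single map block is never divided.
    """
    ranges: List[Tuple[int, int]] = []
    start, acc = 0, 0
    for m, size in enumerate(block_sizes):
        if acc > 0 and acc + size > target:
            ranges.append((start, m))
            start, acc = m, 0
        acc += size
    ranges.append((start, len(block_sizes)))
    return ranges
```

For 99 partitions of 100 MB and one of 50 GB, only the last partition is flagged; a 200 MB partition next to 10 MB ones is not, because it stays under the 256 MB threshold.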

AQE operates at stage boundaries, which are the points in the query plan where a shuffle or broadcast exchange occurs. At each boundary, Spark collects runtime statistics from the completed query stage and uses them to re-optimize the plan for subsequent stages. This means AQE cannot optimize within a single stage; it can only change the parts of the plan that have not started yet. The number of re-optimization opportunities depends on the query structure: queries with multiple joins and aggregations have more decision points, and a query with no exchange at all is not re-optimized.
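
The stage-boundary behavior can be illustrated with a toy driver loop. This is a model built on assumptions, not Spark's AdaptiveSparkPlanExec: stages run one at a time (Spark runs independent stages concurrently), each stage reports a single size, and `replan` is a hypothetical hook that may only change stages that have not started.

```python
from typing import Callable, Dict, List


def run_adaptively(
    stages: Dict[str, List[str]],
    execute: Callable[[str], int],
    replan: Callable[[str, Dict[str, int]], None],
) -> List[str]:
    """Run stages in dependency order, re-planning pending ones after each stage.

    stages maps a stage name to the stages it depends on. execute returns the
    runtime statistic (e.g. shuffle bytes) of a finished stage.
    """
    stats: Dict[str, int] = {}
    order: List[str] = []
    pending = dict(stages)
    while pending:
        ready = [s for s, deps in pending.items() if all(d in stats for d in deps)]
        if not ready:
            raise ValueError("cyclic or missing stage dependency")
        stage = ready[0]
        stats[stage] = execute(stage)  # stage boundary: new runtime stats available
        order.append(stage)
        del pending[stage]
        for later in pending:  # only stages that have not started can change
            replan(later, dict(stats))
    return order
```

A `replan` hook could, for instance, call the join-strategy check for the join stage once both scan stages have reported their sizes.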

The configuration options for AQE are extensive. The most important are spark.sql.adaptive.enabled (the master switch, on by default since Spark 3.2.0), spark.sql.adaptive.coalescePartitions.enabled (partition coalescing), spark.sql.adaptive.skewJoin.enabled (skew-join handling), and spark.sql.adaptive.localShuffleReader.enabled (lets a join that AQE converted to a broadcast join read shuffle files locally instead of over the network). For broadcast join switching, spark.sql.adaptive.autoBroadcastJoinThreshold (Spark 3.2+) sets the maximum size AQE will broadcast at runtime; when unset, it falls back to spark.sql.autoBroadcastJoinThreshold (10 MB by default).
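
Because the adaptive broadcast threshold falls back to the static one, the value AQE actually uses can be worked out from the session settings. The sketch below assumes the settings are available as a plain string mapping (for example, copied from the session configuration); `parse_bytes` and `effective_adaptive_broadcast_threshold` are simplified, hypothetical helpers for Spark-style size strings, which use 1024-based units.

```python
import re
from typing import Mapping

# Binary (1024-based) suffixes, as used by Spark size settings such as "10MB"
_UNITS = {
    "": 1, "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
    "t": 1024 ** 4, "tb": 1024 ** 4,
}
_SIZE = re.compile(r"^\s*(-?\d+)\s*([a-z]*)\s*$")


def parse_bytes(value: str) -> int:
    """Parse '10MB', '256m' or '-1' into bytes; a bare number means bytes."""
    match = _SIZE.match(value.lower())
    if match is None or match.group(2) not in _UNITS:
        raise ValueError(f"unrecognized size: {value!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def effective_adaptive_broadcast_threshold(conf: Mapping[str, str]) -> int:
    """Adaptive threshold if set, else the static threshold, else its 10MB default."""
    raw = conf.get(
        "spark.sql.adaptive.autoBroadcastJoinThreshold",
        conf.get("spark.sql.autoBroadcastJoinThreshold", "10MB"),
    )
    return parse_bytes(raw)
```

Setting only the static threshold to -1 therefore also disables runtime broadcasting, unless the adaptive threshold is set explicitly.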

Mathematical Foundations

Definition: Adaptive Query Execution

AQE dynamically re-optimizes query plans during execution based on runtime statistics. In this model, a plan $P$ is re-optimized at a stage boundary when the observed statistics $\hat{S}$ differ from the estimated statistics $S$ by more than a threshold $\tau$:

$$\text{Reoptimize}(P) \iff \|S - \hat{S}\| > \tau$$

Spark itself uses no explicit $\tau$: it re-runs the optimizer at every stage boundary and keeps the new plan only if its cost is not higher than that of the current plan.

Skew Detection

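A partition $i$ is skewed if its size exceeds the median by a factor $\alpha$ and also exceeds an absolute size threshold $\theta$:

$$\text{Skewed}(i) \iff |P_i| > \alpha \cdot \text{median}(|P_1|, \ldots, |P_k|) \;\wedge\; |P_i| > \theta$$

In Spark, $\alpha$ is spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) and $\theta$ is spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB).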

AQE splits skewed partitions into sub-partitions of target size $T$ (spark.sql.adaptive.advisoryPartitionSizeInBytes, default 64 MB).
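
As a worked example with the numbers from the skew diagram, take a median partition size of 100 MB, $\alpha = 5$, a byte threshold $\theta = 256$ MB (spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, which Spark checks alongside $\alpha$), and assume $T$ equals the 64 MB advisory partition size:

$$
\begin{aligned}
\text{cutoff} &= \max(\alpha \cdot \text{median},\ \theta) = \max(5 \cdot 100\ \text{MB},\ 256\ \text{MB}) = 500\ \text{MB} \\
|P_2| &= 50\ \text{GB} = 51200\ \text{MB} > 500\ \text{MB} \;\Rightarrow\; \text{Skewed}(2) \\
\text{splits} &= \left\lceil |P_2| / T \right\rceil = \lceil 51200 / 64 \rceil = 800
\end{aligned}
$$

Every 100 MB partition stays well under the 500 MB cutoff and is left alone.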

Cost-Based Plan Selection

Given plan alternatives $\{P_1, \ldots, P_m\}$ with estimated costs $\{C_1, \ldots, C_m\}$, AQE selects

$$P^* = \arg\min_{P_j} C_j(\hat{S})$$

at each re-optimization point, where $\hat{S}$ is the current runtime statistics estimate.
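
The selection rule is an argmin over candidate plans. In the sketch below, plans are lists of operator names and the cost function is an assumption: it counts shuffle exchanges, which is roughly what Spark's default AQE cost evaluator measures. Both helper names are hypothetical.

```python
from typing import Callable, List, Sequence, TypeVar

P = TypeVar("P")


def select_plan(candidates: Sequence[P], cost: Callable[[P], float]) -> P:
    """P* = argmin_j C_j(S_hat); ties go to the earliest candidate."""
    if not candidates:
        raise ValueError("no candidate plans")
    return min(candidates, key=cost)


def count_shuffles(plan: List[str]) -> int:
    """Toy cost: number of shuffle exchanges in a plan given as operator names."""
    return sum(1 for op in plan if op == "ShuffleExchange")
```

A sort-merge join plan with two shuffle exchanges loses to a broadcast plan with none.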

Dynamic Join Strategy

For a join of tables $A$ and $B$ with runtime sizes $\widehat{|A|}$ and $\widehat{|B|}$, where $B$ is the smaller side:

$$\text{Strategy} = \begin{cases} \text{BroadcastHash} & \text{if } \widehat{|B|} \le \text{threshold} \\ \text{SortMerge} & \text{otherwise} \end{cases}$$

AQE switches strategies mid-query when actual sizes differ from estimates.

AQE Overhead

The overhead of AQE is modeled as

$$O_{\text{AQE}} = \sum_{i=1}^{r} \left( c_{\text{stats}} \cdot |S_i| + c_{\text{plan}} \cdot |P_i| \right)$$

where $r$ is the number of re-optimization points, $|S_i|$ and $|P_i|$ are the sizes of the statistics and of the plan handled at point $i$, and $c_{\text{stats}}$ and $c_{\text{plan}}$ are the corresponding per-unit costs. Target: $O_{\text{AQE}} / C_{\text{total}} < 0.05$.
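
Plugging numbers into the overhead formula is simple arithmetic. The constants below are purely hypothetical (1 ms per statistic, 10 ms per plan node, a 120-second query) and serve only to show how the 5% target is checked; `aqe_overhead` is an illustrative helper name.

```python
from typing import Sequence, Tuple


def aqe_overhead(points: Sequence[Tuple[int, int]], c_stats: float, c_plan: float) -> float:
    """O_AQE = sum over points of c_stats * |S_i| + c_plan * |P_i|.

    points holds (|S_i|, |P_i|) pairs, e.g. statistics count and plan node count.
    """
    return sum(c_stats * s + c_plan * p for s, p in points)


# Hypothetical query with three re-optimization points
overhead = aqe_overhead([(200, 15), (200, 12), (50, 8)], c_stats=0.001, c_plan=0.01)
total_query_seconds = 120.0
print(f"overhead = {overhead:.2f} s, ratio = {overhead / total_query_seconds:.4f}")
```

Here the overhead is 0.8 s, well under 5% of a 120-second query.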

Key Insight

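AQE is most beneficial for multi-stage queries with intermediate shuffles where cardinality estimates are unreliable. For simple queries with accurate statistics the gains are small and may not outweigh the overhead, and queries without any shuffle or broadcast exchange are not re-optimized at all. Since AQE is on by default in Spark 3.2+, measure a workload with spark.sql.adaptive.enabled=false before deciding to switch it off.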

Summary

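AQE improves query performance through dynamic plan re-optimization; the size of the gain depends on how far the static estimates were from reality and how skewed the data is. Its key mechanisms are shuffle partition coalescing, dynamic join strategy selection (for example, sort-merge join to broadcast hash join), and skew-join splitting. The cost of re-optimization must be justified by the improvement in plan quality.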

Key Concepts Table

| Concept | Description | Configuration |
|---|---|---|
| AQE Enabled | Master switch for adaptive execution (on by default since Spark 3.2) | spark.sql.adaptive.enabled=true |
| Join Strategy Switching | Dynamically change join type based on runtime stats | spark.sql.adaptive.autoBroadcastJoinThreshold (falls back to spark.sql.autoBroadcastJoinThreshold) |
| Partition Coalescing | Merge small contiguous partitions after a shuffle | spark.sql.adaptive.coalescePartitions.enabled=true |
| Skew Detection | Identify partitions much larger than the median | spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 |
| Skew Splitting | Split skewed join partitions into sub-partitions | spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB |
| Local Shuffle Reader | Read shuffle files locally when no re-partitioning is needed (e.g. after a sort-merge join becomes a broadcast join) | spark.sql.adaptive.localShuffleReader.enabled=true |
| Stage Boundary | Point where AQE collects stats and re-optimizes | Automatic at shuffle and broadcast exchanges |
| Runtime Statistics | Actual partition sizes (bytes) and row counts of completed stages | Collected at each stage boundary |
| Min Partition Size | Minimum size of coalesced partitions | spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB |
| Advisory Partition Size | Target size for coalesced partitions and skew splits | spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB |

Code Examples

Enabling and Configuring AQE

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # col, concat, lit, rand, when, sum, ...

# minPartitionSize and the adaptive broadcast threshold require Spark 3.2+
spark = SparkSession.builder \
    .appName("AdaptiveQueryExecution") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB") \
    .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Orders with a skewed customer distribution. Each when() draws its own rand(),
# so ~50% of orders go to 10 VIP customers, ~0.5% to 100 PREMIUM customers and
# the remaining ~49.5% to 100,000 REGULAR customers.
orders_df = spark.range(0, 10_000_000) \
    .withColumn("order_id", concat(lit("ORD-"), col("id"))) \
    .withColumn("customer_id",
        when(rand() < 0.5, concat(lit("VIP-"), col("id") % 10))
        .when(rand() < 0.01, concat(lit("PREMIUM-"), col("id") % 100))
        .otherwise(concat(lit("REGULAR-"), col("id") % 100000))
    ) \
    .withColumn("amount", (rand() * 1000 + 10).cast("decimal(10,2)"))

# Small customers table (only REGULAR-* ids, so VIP and PREMIUM orders find no match)
customers_df = spark.range(0, 100_000) \
    .withColumn("customer_id", concat(lit("REGULAR-"), col("id"))) \
    .withColumn("name", concat(lit("Customer_"), col("id"))) \
    .withColumn("segment",
        when(rand() < 0.1, "VIP")
        .when(rand() < 0.3, "Premium")
        .otherwise("Regular"))

# Write tables
orders_df.write.mode("overwrite").saveAsTable("skewed_orders")
customers_df.write.mode("overwrite").saveAsTable("skewed_customers")
```
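
The generator above chains `when(rand() < p, ...)` calls, and each `rand()` is an independent draw, so a row reaches a later branch only if every earlier one failed. The helper below (a hypothetical name) computes the expected number of rows per key for such a chain, which is a quick way to check how skewed the generated data really is. It assumes the last tier uses p = 1.0 to stand for `otherwise()`.

```python
from typing import Dict, List, Tuple


def expected_rows_per_key(total_rows: int, tiers: List[Tuple[str, float, int]]) -> Dict[str, float]:
    """tiers: (label, p, n_keys) in when() order; returns expected rows per key.

    A row lands in tier k with probability p_k * prod(1 - p_j for earlier j).
    """
    result: Dict[str, float] = {}
    remaining = 1.0  # probability that no earlier branch matched
    for label, p, n_keys in tiers:
        result[label] = total_rows * remaining * p / n_keys
        remaining *= 1 - p
    return result
```

With the first branch at p = 0.001, a VIP key gets about 1,000 rows against about 99 for a regular key, roughly 10x; with p = 0.5, a VIP key gets about 500,000 rows against about 50, which is the heavy skew the example is after.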

AQE Join Strategy Switching

```python
# Query that can benefit from AQE join switching.
# For this demo only, static broadcasting is disabled so the initial plan is a
# SortMergeJoin; AQE can still switch to a BroadcastHashJoin at runtime because
# the adaptive threshold is set explicitly.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB")

# Execute join query
result = spark.sql("""
    SELECT
        o.order_id,
        o.amount,
        c.name,
        c.segment
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
    WHERE c.segment = 'VIP'
""")

# Force execution
result.write.mode("overwrite").saveAsTable("aqe_result")

# EXPLAIN plans the query afresh, so it shows the initial adaptive plan
# (AdaptiveSparkPlan isFinalPlan=false). The final plan of the write above,
# including any SortMergeJoin -> BroadcastHashJoin switch, appears in the
# SQL tab of the Spark UI.
spark.sql("""
    EXPLAIN FORMATTED
    SELECT o.order_id, o.amount, c.name
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
""").show(truncate=False)

# Verify AQE is active
print(spark.conf.get("spark.sql.adaptive.enabled"))

# Restore the default static broadcast threshold
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")
```

Skew Detection and Resolution

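```python
from pyspark.sql.functions import col, concat, floor, lit

# Create heavily skewed data on the executors (instead of building a
# million-element Python list on the driver).
# Normal keys: key_0 .. key_99 with 100 rows each
normal_df = spark.range(0, 10_000).select(
    concat(lit("key_"), (col("id") % 100).cast("string")).alias("key"),
    concat(lit("value_"), floor(col("id") / 100).cast("string")).alias("value"),
    floor(col("id") / 100).cast("double").alias("amount"),
)

# Skewed key (1M rows)
skewed_key_df = spark.range(0, 1_000_000).select(
    lit("skewed_key").alias("key"),
    concat(lit("value_"), col("id").cast("string")).alias("value"),
    col("id").cast("double").alias("amount"),
)

skewed_df = normal_df.unionByName(skewed_key_df)

# Write skewed data
skewed_df.write.mode("overwrite").saveAsTable("heavily_skewed")

# Aggregation over the skewed key. AQE's skew handling applies to joins, not
# aggregations; here map-side partial aggregation already reduces skewed_key
# to one row per map task before the shuffle.
spark.sql("""
    SELECT key, COUNT(*) AS cnt, SUM(amount) AS total
    FROM heavily_skewed
    GROUP BY key
    ORDER BY cnt DESC
""").show(10, truncate=False)

# Small lookup side of the join
lookup_df = spark.createDataFrame(
    [(f"key_{i}", f"lookup_{i}") for i in range(100)] + [("skewed_key", "skewed_lookup")],
    ["key", "lookup_value"],
)

# With default settings the tiny lookup table is broadcast and skew never
# matters. For this demo only: disable broadcasting and lower the thresholds so
# the skewed partition (well below the 256 MB default) qualifies for splitting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "1MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1MB")

result = skewed_df.join(lookup_df, "key")
result.write.mode("overwrite").saveAsTable("skew_resolved_result")

# Verify in the SQL tab of the Spark UI: when AQE split the skewed partition,
# the final plan of this write shows SortMergeJoin(skew=true).

# Restore defaults for the following examples
for key in ["spark.sql.autoBroadcastJoinThreshold",
            "spark.sql.adaptive.autoBroadcastJoinThreshold",
            "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes",
            "spark.sql.adaptive.advisoryPartitionSizeInBytes"]:
    spark.conf.unset(key)
```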

Partition Coalescing

```python
from pyspark.sql import functions as F

# Demonstrate partition coalescing: start with a deliberately high shuffle
# partition count for a dataset that needs only a handful of partitions.
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Small dataset that doesn't need 500 partitions
small_df = spark.range(0, 10_000) \
    .withColumn("key", F.col("id") % 100) \
    .withColumn("value", F.rand())


def write_grouped(table_name):
    """Aggregate by key, write the result and return the number of output files."""
    small_df.groupBy("key").agg(F.sum("value").alias("total")) \
        .write.mode("overwrite").saveAsTable(table_name)
    return len(spark.table(table_name).inputFiles())


# Without AQE: 500 reduce tasks, so up to one file per non-empty partition
# (at most 100 here, one per distinct key)
spark.conf.set("spark.sql.adaptive.enabled", "false")
files_static = write_grouped("coalesced_result")

# With AQE: the 500 shuffle partitions are coalesced based on their actual size
spark.conf.set("spark.sql.adaptive.enabled", "true")
files_adaptive = write_grouped("adaptive_coalesced")

print(f"output files without AQE: {files_static}, with AQE: {files_adaptive}")

# For comparison, the manual alternative fixes the partition count up front
manual_df = small_df.repartition(20, "key") \
    .groupBy("key").agg(F.sum("value").alias("total"))

# Restore the partition count used by the earlier examples
spark.conf.set("spark.sql.shuffle.partitions", "200")
```

Performance Metrics

| Metric | Without AQE | With AQE (Join Switch) | With AQE (Skew Resolve) | With AQE (Coalesce) |
|---|---|---|---|---|
| Join Execution Time | 120-180 sec | 5-15 sec | 30-60 sec | N/A |
| Skew Impact | 10-50x slower | N/A | 2-5x faster | N/A |
| Shuffle Partition Count | Fixed (200) | Fixed (200) | Fixed (200) | Dynamic (10-50) |
| Small File Count | High (200) | High (200) | High (200) | Low (10-50) |
| Memory Usage | High (skew) | Low (broadcast) | Moderate (split) | Low (coalesced) |
| Query Planning Time | 1-2 sec | 2-4 sec | 3-6 sec | 2-3 sec |
| Stage Re-optimization | None | 1-2 per query | 1-3 per query | 1-2 per query |
| Statistics Collection | Pre-execution only | Runtime per stage | Runtime per stage | Runtime per stage |
| Plan Adaptation | Static | Dynamic join type | Dynamic partition split | Dynamic partition count |
| Total Query Improvement | Baseline | 10-30x faster | 2-10x faster | 20-50% faster |

Best Practices

  1. Keep AQE enabled (spark.sql.adaptive.enabled=true) for Spark 3.x workloads; it has been on by default since Spark 3.2.0 and provides automatic optimization with minimal overhead
  2. Set spark.sql.adaptive.autoBroadcastJoinThreshold to 10-100MB based on driver and executor memory to enable automatic broadcast join switching; a broadcast table is collected on the driver and held in memory on every executor
  3. Configure skew detection with spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 for most workloads; increase it to 10 for naturally skewed data
  4. Use spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB (Spark 3.2+) to keep coalescing from producing partitions smaller than 1MB
  5. Monitor AQE decisions in the SQL tab of the Spark UI, where the final plan is marked AdaptiveSparkPlan isFinalPlan=true and shows converted joins and coalesced or skew-split shuffle reads
  6. Keep spark.sql.shuffle.partitions high initially (200-1000), or set spark.sql.adaptive.coalescePartitions.initialPartitionNum, and let AQE coalesce to the right count; starting high avoids under-partitioning
  7. Test with spark.sql.adaptive.enabled=false periodically to measure AQE's impact on your specific workloads
  8. Inspect the post-AQE plan in the Spark UI, or call explain() on a DataFrame after an action has run on that same DataFrame; EXPLAIN FORMATTED on a new query shows only the initial plan (isFinalPlan=false)
  9. Combine AQE with dynamic partition pruning for maximum benefit on partitioned tables
  10. Size driver memory for the broadcasts AQE may introduce, since a join converted to a broadcast hash join collects its small side on the driver; the runtime statistics themselves are small
  11. Avoid disabling AQE for production workloads unless you have specific compatibility issues; overriding the default should be rare
  12. Set spark.sql.adaptive.skewJoin.enabled=true explicitly even though it is the default, so that a cluster-level or job-level override cannot silently turn it off
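
Best practice 7 can be automated with a small timing harness. This is a sketch built on assumptions: `run` is a caller-supplied function that applies the AQE setting and executes the workload end to end, `compare_aqe` is a hypothetical name, and the median of a few runs is used to damp noise from caching and cluster load.

```python
import statistics
import time
from typing import Callable, Dict


def compare_aqe(run: Callable[[bool], None], repeats: int = 3) -> Dict[bool, float]:
    """Median wall-clock seconds of run(aqe_enabled) with AQE off and on.

    `run` should apply the setting, for example
    spark.conf.set("spark.sql.adaptive.enabled", str(aqe_enabled).lower()),
    and then execute the workload (e.g. write its result).
    """
    results: Dict[bool, float] = {}
    for aqe_enabled in (False, True):
        timings = []
        for _ in range(repeats):
            start = time.perf_counter()
            run(aqe_enabled)
            timings.append(time.perf_counter() - start)
        results[aqe_enabled] = statistics.median(timings)
    return results
```

Comparing `results[False]` with `results[True]` for each important job shows whether AQE pays off for that workload.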

