Adaptive Query Execution in PySpark
Architecture Diagram
```text
AQE ARCHITECTURE OVERVIEW

Physical Plan ──▶ Query Execution ──▶ Stage Boundary
                        │                       │
                        ▼                       ▼
            Runtime Stats Collection      Plan Re-Optimization
            - Row counts                  - Join strategy
            - Partition sizes (bytes)     - Shuffle partition count
                        │                       │
                        ▼                       ▼
            Skew Detection & Resolution   Dynamic Partitioning
            - Split skewed partitions     - Coalesce small partitions
                        │                       │
                        ▼                       ▼
            Post-Stage Optimization       Optimized Execution
            - Merge join partitions       - Better plans based on
                                            actual data sizes
```
```text
AQE RE-OPTIMIZATION DECISION POINTS

Original Query Plan:
  SortMergeJoin(A, B)
  ├── Sort(A.customer_id)
  │   └── Scan(A)  [estimated: 1M rows]
  └── Sort(B.customer_id)
      └── Scan(B)  [estimated: 10M rows]  ← WRONG ESTIMATE

After the shuffle stages for A and B complete:
  Runtime stats: A has 5M rows (actual), not 1M
                 B has only 10K rows (actual), not 10M
  AQE decision:  switch to BroadcastHashJoin
                 (B is now known to be under the broadcast threshold)

  Optimized Plan:
    BroadcastHashJoin(A, B)
    ├── Scan(A)  [actual: 5M rows, shuffle output read locally]
    └── BroadcastExchange(B)
        └── Scan(B)  [actual: 10K rows]

Decision Points in AQE:
  Stage 1 complete ──▶ re-optimize the remaining plan
        │              (join strategy, partition count, skew splits)
        ▼
  Stage 2 complete ──▶ re-optimize again with the new statistics
        │
        ▼
  ...                  one re-optimization point per completed query stage
        │
        ▼
  Final stage ──▶ runs with the coalesced / skew-split partitions
```
```text
SKEW DETECTION & RESOLUTION

Before AQE (Skewed Data):
  Partition 0:      100 MB   (normal)
  Partition 1:      100 MB   (normal)
  Partition 2:       50 GB   (SKEWED!)  ← bottleneck
  Partition 3:      100 MB   (normal)
  ...
  Partition 99:     100 MB   (normal)

  Stage time is dominated by the one task that reads 50 GB (hours)

After AQE (Skew Resolved):
  Partition 0:      100 MB
  Partition 1:      100 MB
  Partition 2.1:   ~100 MB   ← split from 2
  Partition 2.2:   ~100 MB   ← split from 2
  ...                        (about 512 splits in total)
  Partition 2.512: ~100 MB   ← split from 2
  Partition 3:      100 MB
  ...
  Partition 99:     100 MB

  The matching partition 2 of the other join side is read once per split.
  The largest task now reads ~100 MB, so stage time is set by total work
  divided by available cores (minutes rather than hours)

AQE Skew Detection Algorithm (skew joins):
  1. Collect per-partition sizes from the shuffle map output statistics
  2. Compute the median partition size
  3. Identify partitions where:
       size > median × skewedPartitionFactor (default: 5)
       AND size > skewedPartitionThresholdInBytes (default: 256 MB)
  4. For each skewed partition:
       a. Target size = max(advisoryPartitionSizeInBytes (default: 64 MB),
                            average size of the non-skewed partitions)
       b. Split it into sub-partitions by grouping contiguous map outputs
          up to the target size (no key sampling is involved)
       c. Replicate the matching partition of the other join side
          for every sub-partition
  5. Update the physical plan with the new partition specifications
```
Detailed Explanation
Adaptive Query Execution (AQE) is Spark's framework for dynamically optimizing query plans at runtime based on actual data statistics rather than estimated statistics. Traditional query optimization relies on table statistics collected before query execution (via ANALYZE TABLE), which can be significantly inaccurate due to data drift, skewed distributions, or stale metadata. AQE addresses this by re-evaluating and modifying the physical plan at stage boundaries, where actual data characteristics become available.
The core innovation of AQE is its ability to make three types of dynamic optimizations: join strategy switching, shuffle partition coalescing, and skew partition splitting. Join strategy switching occurs when the actual size of one join side differs significantly from the estimate: if the smaller side turns out to be broadcastable (no larger than spark.sql.adaptive.autoBroadcastJoinThreshold), AQE switches from a sort-merge join to a broadcast hash join. The shuffles feeding the join have usually already run by then, so the saving comes from skipping the sort on both sides and reading the shuffle output locally instead of over the network. This can reduce query time from minutes to seconds in cases where statistics were stale.
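The switching rule can be sketched as a plain function. This is a simplified model, not Spark's implementation: it assumes the only inputs are the runtime byte sizes of the two join sides and a Spark-style size string for the threshold, and it ignores join-type restrictions on which side may be broadcast. `parse_size` and `choose_join_strategy` are hypothetical helpers.

```python
import re

_UNITS = {"b": 1, "kb": 1024, "mb": 1024**2, "gb": 1024**3, "tb": 1024**4}


def parse_size(text: str) -> int:
    """Convert a size string such as '10MB' or '-1' into bytes (binary units)."""
    match = re.fullmatch(r"\s*(-?\d+)\s*([kmgt]?b)?\s*", text.lower())
    if match is None:
        raise ValueError(f"unrecognised size: {text!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit or "b"]


def choose_join_strategy(left_bytes: int, right_bytes: int, threshold: str = "10MB") -> str:
    """Simplified AQE-style choice based on the runtime sizes of both join sides."""
    limit = parse_size(threshold)
    # A negative threshold such as '-1' disables broadcasting entirely.
    if limit >= 0 and min(left_bytes, right_bytes) <= limit:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


# A large side joined with a side that turns out to be tiny at runtime
print(choose_join_strategy(5_000_000_000, 400_000))        # BroadcastHashJoin
# Both sides large: stay with the sort-merge join
print(choose_join_strategy(5_000_000_000, 2_000_000_000))  # SortMergeJoin
```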
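Shuffle partition coalescing addresses the problem of over-partitioning. When a query runs with spark.sql.shuffle.partitions=200 but the actual data volume only requires 20 partitions, Spark without AQE launches 200 small tasks and, if the result is written out, produces up to 200 small files, adding scheduling and I/O overhead. AQE inspects the actual size of each partition after the shuffle and merges adjacent small partitions toward a target size set by spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64MB), without producing coalesced partitions smaller than spark.sql.adaptive.coalescePartitions.minPartitionSize (default 1MB). With spark.sql.adaptive.coalescePartitions.parallelismFirst=true (the default), Spark favours parallelism, so coalesced partitions may end up well below the advisory size. This optimization is particularly valuable for queries with multiple shuffle stages where the data volume decreases at each stage.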
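A simplified sketch of the coalescing step. It assumes a target-size rule modelled on Spark's: start from the advisory size, cap it so that roughly `min_num_partitions` partitions survive (in Spark's parallelism-first mode this would be the cluster's default parallelism), and never go below the minimum partition size; adjacent partitions are then merged greedily. Spark's real implementation also coordinates several shuffles at once and folds leftover tiny partitions into neighbours, which this sketch omits. The function and its parameters are hypothetical.

```python
import math
from typing import List

MB = 1024**2


def coalesce_partitions(
    sizes: List[int],
    advisory_bytes: int = 64 * MB,
    min_partition_bytes: int = 1 * MB,
    min_num_partitions: int = 1,
) -> List[List[int]]:
    """Group adjacent shuffle partition indices into coalesced partitions (simplified)."""
    total = sum(sizes)
    # Cap the target so that about min_num_partitions partitions remain,
    # but never aim below the minimum partition size.
    max_target = math.ceil(total / min_num_partitions)
    target = max(min(max_target, advisory_bytes), min_partition_bytes)

    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot the target.
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# 200 shuffle partitions of 100 KB each (about 20 MB in total)
sizes = [100 * 1024] * 200
print(len(coalesce_partitions(sizes)))                        # 1
print(len(coalesce_partitions(sizes, min_num_partitions=8)))  # 8
```

Only adjacent partitions are merged, which keeps each coalesced read a contiguous range of shuffle blocks.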
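Skew detection and resolution is the most sophisticated AQE feature. Data skew occurs when certain keys have disproportionately more records than others, causing some partitions to process far more data than others. For sort-merge joins (and shuffled hash joins in recent releases), AQE compares each shuffle partition's size to the median partition size. A partition is treated as skewed when it is larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) times the median and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256MB). AQE then splits it into sub-partitions by grouping contiguous map-task outputs up to a target size, and replicates the matching partition from the other side of the join so that each sub-partition can be joined independently. No key sampling is involved: the split is based purely on shuffle block sizes, so the number of splits is limited by the number of map tasks that wrote data for that partition.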
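The sketch below models how such a split can be computed from shuffle statistics alone. It assumes the statistics are available as a matrix of block sizes, `block_sizes[m][r]` (bytes written by map task `m` for reduce partition `r`); it flags skewed partitions with the median rule plus the absolute threshold, picks a target size of max(advisory size, mean non-skewed partition size), and cuts each skewed partition into ranges of map tasks. It is a simplified illustration of splitting along map-output boundaries, not Spark's code, and all names are hypothetical.

```python
import statistics
from typing import Dict, List, Tuple

MB = 1024**2


def find_skewed(sizes: List[int], factor: float = 5.0, threshold: int = 256 * MB) -> List[int]:
    """Indices of partitions larger than both factor * median and the absolute threshold."""
    median = statistics.median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * median and s > threshold]


def split_skewed(
    block_sizes: List[List[int]],
    factor: float = 5.0,
    threshold: int = 256 * MB,
    advisory: int = 64 * MB,
) -> Dict[int, List[Tuple[int, int]]]:
    """Map each skewed reduce partition to [start, end) ranges of map task indices."""
    num_reducers = len(block_sizes[0])
    sizes = [sum(row[r] for row in block_sizes) for r in range(num_reducers)]
    skewed = find_skewed(sizes, factor, threshold)
    normal = [s for i, s in enumerate(sizes) if i not in skewed]
    # Target: the advisory size or the average non-skewed partition, whichever is larger.
    target = max(advisory, sum(normal) // len(normal)) if normal else advisory

    splits: Dict[int, List[Tuple[int, int]]] = {}
    for r in skewed:
        ranges: List[Tuple[int, int]] = []
        start, acc = 0, 0
        for m, row in enumerate(block_sizes):
            # Cut before map task m if it would push this sub-partition past the target.
            if acc > 0 and acc + row[r] > target:
                ranges.append((start, m))
                start, acc = m, 0
            acc += row[r]
        ranges.append((start, len(block_sizes)))
        splits[r] = ranges
    return splits


# 16 map tasks; reduce partition 2 receives 30 MB from each (480 MB in total),
# the other three partitions receive 5 MB from each (80 MB in total).
blocks = [[5 * MB, 5 * MB, 30 * MB, 5 * MB] for _ in range(16)]
print(split_skewed(blocks))
# {2: [(0, 2), (2, 4), (4, 6), (6, 8), (8, 10), (10, 12), (12, 14), (14, 16)]}
```

Each range becomes one task that reads only those map outputs for partition 2, while the other join side's partition 2 is read in full by every one of those tasks.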
AQE operates at stage boundaries, which are points in the query plan where a shuffle or broadcast exchange occurs. At each boundary, Spark collects runtime statistics from the completed stage and uses them to re-optimize the plan for subsequent stages. This means AQE cannot optimize within a single stage; it can only change the plan for stages that haven't started yet. The number of re-optimization opportunities depends on the query structure: queries with multiple joins and aggregations have more decision points.
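To make the stage-boundary idea concrete, here is a toy simulation (not Spark code; the `Stage` class, `run_adaptively`, and the `replan` rule are all hypothetical). Stages run one at a time; just before each stage is launched, a planning callback sees the statistics of every stage that has already finished, and nothing about a stage can change once it has started.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

MB = 1024**2


@dataclass
class Stage:
    name: str
    output_bytes: int                                  # size the stage turns out to produce
    depends_on: List[str] = field(default_factory=list)
    plan: str = "static"                               # physical choice fixed at launch


def run_adaptively(stages: List[Stage], replan: Callable[[Stage, Dict[str, int]], str]) -> List[str]:
    """Run stages in order; each is re-planned at its boundary from completed-stage stats."""
    observed: Dict[str, int] = {}
    log: List[str] = []
    for stage in stages:
        stage.plan = replan(stage, observed)           # decided before the stage starts
        log.append(f"{stage.name}: {stage.plan}")
        observed[stage.name] = stage.output_bytes      # stats exist only after it finishes
    return log


def replan(stage: Stage, observed: Dict[str, int]) -> str:
    """Hypothetical rule: broadcast a join input once it is known to be <= 10 MB."""
    if stage.depends_on and all(dep in observed for dep in stage.depends_on):
        smallest = min(observed[dep] for dep in stage.depends_on)
        return "BroadcastHashJoin" if smallest <= 10 * MB else "SortMergeJoin"
    return stage.plan


stages = [
    Stage("shuffle_A", 5_000 * MB),
    Stage("shuffle_B", 1 * MB),
    Stage("join", 0, depends_on=["shuffle_A", "shuffle_B"]),
]
for line in run_adaptively(stages, replan):
    print(line)
# shuffle_A: static
# shuffle_B: static
# join: BroadcastHashJoin
```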
The configuration options for AQE are extensive. The most important are spark.sql.adaptive.enabled (master switch, on by default since Spark 3.2.0), spark.sql.adaptive.coalescePartitions.enabled (enable partition coalescing), spark.sql.adaptive.skewJoin.enabled (enable skew-join handling), and spark.sql.adaptive.localShuffleReader.enabled (lets a join that AQE has converted to a broadcast hash join read the already-written shuffle files locally instead of over the network). For broadcast join switching, spark.sql.adaptive.autoBroadcastJoinThreshold controls the maximum size of a join side that AQE will broadcast at runtime; by default it takes the value of spark.sql.autoBroadcastJoinThreshold.
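The same settings can also be supplied when the job is launched instead of in code. A minimal sketch, assuming the job is started with `spark-submit`, which accepts repeated `--conf key=value` options; the helper `to_submit_args` and the file name `my_job.py` are hypothetical, and the values mirror the defaults or examples used in this section.

```python
from typing import Dict, List

AQE_CONF: Dict[str, str] = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "64MB",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
    "spark.sql.adaptive.localShuffleReader.enabled": "true",
    "spark.sql.adaptive.autoBroadcastJoinThreshold": "10MB",
}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Flatten a settings dict into repeated spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical application file name, for illustration only
print(" ".join(["spark-submit", *to_submit_args(AQE_CONF), "my_job.py"]))
```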
Mathematical Foundations
Definition: Adaptive Query Execution
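AQE dynamically re-optimizes query plans during execution based on runtime statistics. In a simplified model, the remaining plan is re-optimized at a stage boundary when an observed statistic $S_{\text{obs}}$ deviates from its estimate $S_{\text{est}}$ by more than a relative threshold $\tau$:

$$\frac{|S_{\text{obs}} - S_{\text{est}}|}{S_{\text{est}}} > \tau$$

In practice, Spark re-runs its runtime optimizer after every completed query stage and keeps the new plan only when its estimated cost is not higher than that of the current plan.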
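The simplified trigger, a relative deviation $|S_{\text{obs}} - S_{\text{est}}| / S_{\text{est}} > \tau$, can be written as a one-line check. The function name and the default $\tau = 0.5$ are hypothetical; the row counts are table A's from the decision-point diagram (estimated 1M, actual 5M).

```python
def should_reoptimize(observed: float, estimated: float, tau: float = 0.5) -> bool:
    """Simplified trigger: relative deviation of an observed vs estimated statistic."""
    if estimated == 0:
        # Any non-zero observation is an unbounded deviation from a zero estimate.
        return observed != 0
    return abs(observed - estimated) / estimated > tau


print(should_reoptimize(5_000_000, 1_000_000))  # True  (deviation 4.0)
print(should_reoptimize(1_050_000, 1_000_000))  # False (deviation 0.05)
```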
Skew Detection
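A partition $P_i$ of size $s_i$ (out of $n$ shuffle partitions) is skewed if its size exceeds the median by factor $\alpha$ (skewedPartitionFactor) and also exceeds an absolute threshold $\theta$ (skewedPartitionThresholdInBytes):

$$s_i > \alpha \cdot \operatorname{median}(s_1, \ldots, s_n) \quad \text{and} \quad s_i > \theta$$

AQE splits each skewed partition into roughly $k_i = \lceil s_i / t \rceil$ sub-partitions of target size $t$, where $t = \max(\text{advisory partition size}, \text{mean size of the non-skewed partitions})$.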
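A worked example using the numbers from the skew diagram (99 normal partitions of 100 MB, one of 50 GB) and Spark's defaults ($\alpha = 5$, $\theta = 256$ MB, advisory size 64 MB). It assumes the target rule $t = \max(\text{advisory}, \text{mean of non-skewed sizes})$; real splits follow map-output boundaries, so the actual count is approximate.

```python
import math
import statistics

MB = 1024**2

# Sizes from the skew diagram: 99 partitions of 100 MB and one of 50 GB
sizes = [100 * MB] * 99 + [50 * 1024 * MB]
alpha, theta, advisory = 5.0, 256 * MB, 64 * MB   # factor, threshold, advisory size

median = statistics.median(sizes)
skewed = [s for s in sizes if s > alpha * median and s > theta]
normal = [s for s in sizes if not (s > alpha * median and s > theta)]
target = max(advisory, sum(normal) // len(normal))
num_splits = [math.ceil(s / target) for s in skewed]

print(median / MB, target / MB, num_splits)   # 100.0 100.0 [512]
```

Because the mean non-skewed size (100 MB) exceeds the advisory size, the target is 100 MB and the 50 GB partition becomes about 512 sub-partitions.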
Cost-Based Plan Selection
Given plan alternatives $p_1, \ldots, p_m$ with estimated costs $C(p_j \mid \hat{S})$, AQE selects:

$$p^{*} = \arg\min_{p_j} C(p_j \mid \hat{S})$$

at each re-optimization point, where $\hat{S}$ is the current runtime statistics estimate.
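The argmin can be illustrated with a toy cost model. Both cost functions below are hypothetical (bytes moved over the network), chosen only to show the selection step; Spark's built-in runtime cost evaluator is different and much simpler, being based mainly on the number of shuffle exchanges in a candidate plan.

```python
import math
from typing import Callable, Dict

Stats = Dict[str, float]


def smj_cost(stats: Stats) -> float:
    """Hypothetical cost: both join sides are shuffled across the network."""
    return stats["left_bytes"] + stats["right_bytes"]


def bhj_cost(stats: Stats) -> float:
    """Hypothetical cost: the smaller side is copied to every executor."""
    small = min(stats["left_bytes"], stats["right_bytes"])
    if small > stats["broadcast_threshold"]:
        return math.inf  # not allowed above the broadcast threshold
    return small * stats["num_executors"]


def select_plan(costs: Dict[str, Callable[[Stats], float]], stats: Stats) -> str:
    """p* = argmin over candidate plans of C(p | S_hat)."""
    return min(costs, key=lambda name: costs[name](stats))


candidates = {"SortMergeJoin": smj_cost, "BroadcastHashJoin": bhj_cost}
runtime = {"left_bytes": 5e9, "right_bytes": 4e5,
           "broadcast_threshold": 10 * 1024**2, "num_executors": 50}
print(select_plan(candidates, runtime))  # BroadcastHashJoin
```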
Dynamic Join Strategy
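For a join of tables $R$ and $S$ with runtime sizes $|R|$ and $|S|$ and broadcast threshold $b$ (spark.sql.adaptive.autoBroadcastJoinThreshold):

$$\text{strategy}(R, S) = \begin{cases} \text{BroadcastHashJoin} & \text{if } \min(|R|, |S|) \le b \\ \text{SortMergeJoin} & \text{otherwise} \end{cases}$$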
AQE switches strategies mid-query when actual sizes differ from estimates.
AQE Overhead
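The overhead of AQE is:

$$O_{\text{AQE}} = \sum_{j=1}^{k} \left( t_{\text{stats}}^{(j)} + t_{\text{reopt}}^{(j)} \right)$$

where $k$ is the number of re-optimization points, $t_{\text{stats}}^{(j)}$ is the time spent gathering statistics and $t_{\text{reopt}}^{(j)}$ the time spent re-planning at point $j$. Target: $O_{\text{AQE}} \ll T_{\text{query}}$, the total query time.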
Key Insight
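AQE is most beneficial for multi-stage queries with intermediate shuffles where cardinality estimates are unreliable. It is not applied at all to queries without an exchange or subquery, and for simple queries with accurate statistics its overhead may outweigh the benefit. Since spark.sql.adaptive.enabled is on by default from Spark 3.2.0, the practical approach is to leave it on and disable it only where measurement shows a regression.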
Summary
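AQE can improve query performance through dynamic plan re-optimization, in some workloads by 20-50% or more; the actual gain depends on how stale the statistics are and how skewed the data is. Key mechanisms include skew-join splitting, dynamic join strategy selection, and post-shuffle partition coalescing. The cost of re-optimization must be justified by the improvement in plan quality.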
Key Concepts Table
| Concept | Description | Configuration |
|---|---|---|
| AQE Enabled | Master switch for adaptive execution | spark.sql.adaptive.enabled=true |
| Join Strategy Switching | Dynamically change join type based on runtime stats | spark.sql.adaptive.autoBroadcastJoinThreshold |
| Partition Coalescing | Merge small partitions after shuffle | spark.sql.adaptive.coalescePartitions.enabled=true |
| Skew Detection | Identify partitions with disproportionate size | spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 |
| Skew Splitting | Split skewed partitions into sub-partitions | spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB |
| Local Shuffle Reader | Read shuffle files locally after a join is converted to broadcast hash join | spark.sql.adaptive.localShuffleReader.enabled=true |
| Stage Boundary | Point where AQE collects stats and re-optimizes | Automatic at shuffle and broadcast exchanges |
| Runtime Statistics | Actual row counts and shuffle partition sizes of completed stages | Collected at each stage boundary |
| Min Partition Size | Minimum size for coalesced partitions | spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB |
| Advisory Partition Size | Target size for coalesced partitions and skew splits | spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB |
Code Examples
Enabling and Configuring AQE
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("AdaptiveQueryExecution") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \
    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB") \
    .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Create skewed dataset
from pyspark.sql.functions import rand, when, concat, lit

# Orders with a mildly skewed customer distribution:
#   ~0.1% of orders -> 10 VIP customers          (~1,000 orders each)
#   ~1% of orders   -> 100 PREMIUM customers     (~1,000 orders each)
#   the rest        -> 100,000 REGULAR customers (~99 orders each)
orders_df = spark.range(0, 10_000_000) \
    .withColumn("order_id", concat(lit("ORD-"), col("id"))) \
    .withColumn("customer_id",
        when(rand() < 0.001, concat(lit("VIP-"), (col("id") % 10)))
        .when(rand() < 0.01, concat(lit("PREMIUM-"), (col("id") % 100)))
        .otherwise(concat(lit("REGULAR-"), (col("id") % 100000)))
    ) \
    .withColumn("amount", (rand() * 1000 + 10).cast("decimal(10,2)"))

# Small customers table (REGULAR-* ids only, so VIP/PREMIUM orders find no
# match in an inner join; segment is assigned at random)
customers_df = spark.range(0, 100_000) \
    .withColumn("customer_id", concat(lit("REGULAR-"), col("id"))) \
    .withColumn("name", concat(lit("Customer_"), col("id"))) \
    .withColumn("segment",
        when(rand() < 0.1, "VIP")
        .when(rand() < 0.3, "Premium")
        .otherwise("Regular"))

# Write tables
orders_df.write.mode("overwrite").saveAsTable("skewed_orders")
customers_df.write.mode("overwrite").saveAsTable("skewed_customers")
```
AQE Join Strategy Switching
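```python
# Query that can benefit from AQE join switching
# Without AQE: SortMergeJoin (chosen from static, possibly stale statistics)
# With AQE: BroadcastHashJoin if the customer side turns out to be smaller
# than spark.sql.adaptive.autoBroadcastJoinThreshold at runtime.
# Note: if the static size estimate of skewed_customers is already below
# spark.sql.autoBroadcastJoinThreshold, Spark broadcasts it from the start
# and AQE has nothing left to switch.

# More verbose driver logging (noisy; the Spark UI SQL tab is often easier)
spark.sparkContext.setLogLevel("INFO")

# Execute join query
result = spark.sql("""
    SELECT
        o.order_id,
        o.amount,
        c.name,
        c.segment
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
    WHERE c.segment = 'VIP'
""")

# Force execution
result.write.mode("overwrite").saveAsTable("aqe_result")

# EXPLAIN on a new statement shows the initial plan
# (AdaptiveSparkPlan isFinalPlan=false). The final, re-optimized plan of the
# write above is shown in the Spark UI SQL tab once the query has run.
spark.sql("""
    EXPLAIN FORMATTED
    SELECT o.order_id, o.amount, c.name
    FROM skewed_orders o
    JOIN skewed_customers c ON o.customer_id = c.customer_id
    WHERE c.segment = 'VIP'
""").show(truncate=False)

# Verify AQE is active
print(spark.conf.get("spark.sql.adaptive.enabled"))
```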
Skew Detection and Resolution
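```python
# Create heavily skewed data for demonstration.
# Generating rows with spark.range keeps the work on the executors instead of
# building a 1M-element Python list on the driver.
from pyspark.sql.functions import col, concat, lit

# Normal keys: key_0 .. key_99, 100 rows each (value_j, amount = j)
normal_df = spark.range(0, 10_000).select(
    concat(lit("key_"), (col("id") / 100).cast("int")).alias("key"),
    concat(lit("value_"), col("id") % 100).alias("value"),
    (col("id") % 100).cast("double").alias("amount"),
)
# Skewed key: 1M rows
hot_df = spark.range(0, 1_000_000).select(
    lit("skewed_key").alias("key"),
    concat(lit("value_"), col("id")).alias("value"),
    col("id").cast("double").alias("amount"),
)
skewed_df = normal_df.unionByName(hot_df)

# Write skewed data
skewed_df.write.mode("overwrite").saveAsTable("heavily_skewed")

# Aggregation over the skewed key. AQE's skew-join handling does not split
# GROUP BY partitions; map-side partial aggregation is what keeps this cheap.
spark.sql("""
    SELECT key, COUNT(*) AS cnt, SUM(amount) AS total
    FROM heavily_skewed
    GROUP BY key
    ORDER BY cnt DESC
""").show(10, truncate=False)

# Join with skew
skewed_df2 = spark.createDataFrame(
    [(f"key_{i}", f"lookup_{i}") for i in range(100)] + [("skewed_key", "skewed_lookup")],
    ["key", "lookup_value"]
)

# skewed_df2 is tiny, so Spark would normally broadcast it and there would be
# no shuffle (hence no skew handling). For this demo, disable broadcasting and
# lower the thresholds so the ~1M-row hot partition qualifies as skewed.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "1MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1MB")

# Sort-merge join; AQE can split the skewed_key partition into sub-partitions
# (whether it does depends on partition sizes and the number of map tasks)
result = skewed_df.join(skewed_df2, "key")
result.write.mode("overwrite").saveAsTable("skew_resolved_result")

# Verify skew was handled: in the Spark UI SQL tab, the final plan for this
# write should show the join marked skew=true if AQE split the partition.
```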
Partition Coalescing
```python
# Demonstrate partition coalescing
# Start with a high shuffle partition count
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Small dataset that doesn't need 500 partitions
small_df = spark.range(0, 10_000) \
    .withColumn("key", col("id") % 100) \
    .withColumn("value", rand())

# Without AQE: 500 shuffle partitions (most empty or tiny)
# With AQE: coalesced to a much smaller count based on actual data size
result = small_df.groupBy("key").agg(sum("value").alias("total"))

# Write the result; roughly one file is written per post-shuffle partition
result.write.mode("overwrite").saveAsTable("coalesced_result")

# Verify partition count was reduced: list output files and rows per file
spark.sql("""
    SELECT
        input_file_name() AS file,
        count(*) AS row_count
    FROM coalesced_result
    GROUP BY input_file_name()
""").show(truncate=False)

# Compare with manual tuning: repartition(20, "key") fixes the count at 20
# (AQE does not coalesce a shuffle whose partition count was set explicitly)
spark.conf.set("spark.sql.shuffle.partitions", "200")
result_explicit = small_df.repartition(20, "key") \
    .groupBy("key").agg(sum("value").alias("total"))

# AQE automatically determines the partition count
spark.conf.set("spark.sql.shuffle.partitions", "500")  # Over-partition
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE will coalesce to an appropriate count
adaptive_result = small_df.groupBy("key").agg(sum("value").alias("total"))
adaptive_result.write.mode("overwrite").saveAsTable("adaptive_coalesced")
```
Performance Metrics
| Metric | Without AQE | With AQE (Join Switch) | With AQE (Skew Resolve) | With AQE (Coalesce) |
|---|---|---|---|---|
| Join Execution Time | 120-180 sec | 5-15 sec | 30-60 sec | N/A |
| Skew Impact | 10-50x slower | N/A | 2-5x faster | N/A |
| Shuffle Partition Count | Fixed (200) | Fixed (200) | Fixed (200) | Dynamic (10-50) |
| Small File Count | High (200) | High (200) | High (200) | Low (10-50) |
| Memory Usage | High (skew) | Low (broadcast) | Moderate (split) | Low (coalesced) |
| Query Planning Time | 1-2 sec | 2-4 sec | 3-6 sec | 2-3 sec |
| Stage Re-optimization | None | 1-2 per query | 1-3 per query | 1-2 per query |
| Statistics Collection | Pre-execution only | Runtime per stage | Runtime per stage | Runtime per stage |
| Plan Adaptation | Static | Dynamic join type | Dynamic partition split | Dynamic partition count |
| Total Query Improvement | Baseline | 10-30x faster | 2-10x faster | 20-50% faster |
Best Practices
- Keep AQE enabled (`spark.sql.adaptive.enabled=true`, the default since Spark 3.2.0) for Spark 3.x+ workloads; it provides automatic optimization with minimal overhead
- Set `spark.sql.adaptive.autoBroadcastJoinThreshold` to 10-100MB based on driver and executor memory to enable automatic broadcast join switching
- Configure skew detection with `spark.sql.adaptive.skewJoin.skewedPartitionFactor=5` for most workloads; increase to 10 for naturally skewed data
- Use `spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB` (the default) to prevent creating coalesced partitions smaller than 1MB
- Monitor AQE decisions in the SQL tab of the Spark UI: the query's final plan (`AdaptiveSparkPlan isFinalPlan=true`) shows which joins were switched and which shuffle reads were coalesced or split
- Keep `spark.sql.shuffle.partitions` high initially (200-1000), or set `spark.sql.adaptive.coalescePartitions.initialPartitionNum`, and let AQE coalesce to the optimal count; this avoids under-partitioning
- Test with `spark.sql.adaptive.enabled=false` periodically to measure AQE's impact on your specific workloads
- To see the post-AQE plan, call `df.explain()` after running `df.collect()` on the same DataFrame, or use the Spark UI; a fresh `EXPLAIN FORMATTED` statement only shows the initial plan (`isFinalPlan=false`)
- Combine AQE with dynamic partition pruning for maximum benefit on partitioned tables
- Size driver memory with broadcasts in mind: relations that AQE decides to broadcast at runtime are still collected on the driver first
- Avoid disabling AQE for production workloads unless you have specific compatibility issues; overriding defaults should be rare
- Set `spark.sql.adaptive.skewJoin.enabled=true` explicitly even though it's the default, to ensure it's not overridden by other configurations
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)