Spark SQL: Predicate Pushdown, Column Pruning, Join Reorder
Difficulty: Expert | Companies: Databricks, Snowflake, Meta, Google, Amazon
ℹ️ Interview Context
SQL optimization questions test understanding of Catalyst optimizer rules. Interviewers expect knowledge of how each optimization rule works and when it doesn't apply.
Question
Explain how Spark SQL's Catalyst optimizer applies predicate pushdown, column pruning, and join reordering. What are the limitations of each optimization? How does the cost-based optimizer (CBO) decide between different physical plans?
Detailed Answer
1. Catalyst Optimization Rules Overview
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("SQLOptimization") \
.config("spark.sql.cbo.enabled", "true") \
.config("spark.sql.statistics.histogram.enabled", "true") \
.getOrCreate()
# Example query with multiple optimization opportunities:
result = spark.sql("""
SELECT o.order_id, c.name, p.product_name, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
WHERE c.country = 'USA'
AND o.amount > 100
AND p.category = 'Electronics'
ORDER BY o.amount DESC
LIMIT 100
""")
2. Predicate Pushdown
Predicate pushdown moves filter conditions as close to the data source as possible, reducing I/O.
# Without predicate pushdown:
# Scan ALL rows → Filter → Join → Filter → Sort → Limit
# With predicate pushdown:
# Scan (with filter) → Join → Sort → Limit
# Only row groups that may contain matching rows are read from storage
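# Catalyst rule: PushPredicateThroughJoin (part of PushDownPredicates)
# Pushes each conjunct of a filter into the join side whose output contains all
# the columns it references; conjuncts that need both sides stay at the join.
# The filters that reach a scan are then handed to the data source (PushedFilters).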
# Example plan (excerpt of the physical plan section; expression IDs, codegen
# stage numbers and exact operators vary by Spark version and data source):
result.explain(True)
# == Physical Plan ==
# TakeOrderedAndProject(limit=100, orderBy=[amount#45 DESC NULLS LAST], output=[order_id#42,name#51,product_name#62,amount#45])
# +- *(3) Project [order_id#42, name#51, product_name#62, amount#45]
#    +- *(3) BroadcastHashJoin [product_id#44], [id#60], Inner, BuildRight
#       :- *(3) BroadcastHashJoin [customer_id#43], [id#49], Inner, BuildRight
#       :  :- *(3) Filter ((amount#45 > 100) AND isnotnull(customer_id#43))
#       :  :  +- *(3) FileScan parquet [order_id, customer_id, product_id, amount]
#       :  :       PushedFilters: [IsNotNull(customer_id), GreaterThan(amount,100)]
#       :  :       ReadSchema: <struct with only needed columns>
#       :  +- BroadcastExchange ...
#       :     +- *(1) Filter (isnotnull(id#49) AND (country#53 = USA))
#       :        +- *(1) FileScan parquet [id, name, country]
#       :             PushedFilters: [IsNotNull(id), EqualTo(country,USA)]
#       +- BroadcastExchange ...
#          +- *(2) Filter (isnotnull(id#60) AND (category#64 = Electronics))
#             +- *(2) FileScan parquet [id, product_name, category]
#                  PushedFilters: [IsNotNull(id), EqualTo(category,Electronics)]
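The core of pushing predicates through an inner join can be sketched as follows. Split the WHERE clause into conjuncts, send each conjunct to the side whose output contains every column it references, and keep the rest at the join. This is a minimal sketch for inner joins only, and it assumes each predicate is represented as a hypothetical `(text, referenced_columns)` pair. Catalyst's PushPredicateThroughJoin also handles outer joins, where only some sides are safe to push to.

```python
from typing import FrozenSet, List, Set, Tuple

# A predicate is (sql_text, columns_it_references): a hypothetical representation.
Predicate = Tuple[str, FrozenSet[str]]


def push_through_inner_join(
    conjuncts: List[Predicate],
    left_columns: Set[str],
    right_columns: Set[str],
) -> Tuple[List[str], List[str], List[str]]:
    """Return (left_filters, right_filters, remaining) for an INNER join."""
    left, right, remaining = [], [], []
    for text, refs in conjuncts:
        if refs <= left_columns:
            left.append(text)  # evaluable using only left-side columns
        elif refs <= right_columns:
            right.append(text)  # evaluable using only right-side columns
        else:
            remaining.append(text)  # needs both sides: stays at the join
    return left, right, remaining


orders_cols = {"order_id", "customer_id", "product_id", "amount"}
customers_cols = {"id", "name", "country"}
where = [
    ("amount > 100", frozenset({"amount"})),
    ("country = 'USA'", frozenset({"country"})),
    ("amount > id", frozenset({"amount", "id"})),  # references both sides
]
print(push_through_inner_join(where, orders_cols, customers_cols))
# (['amount > 100'], ["country = 'USA'"], ['amount > id'])
```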
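# Predicate Pushdown Mathematical Analysis (idealized model):
#
# Let:
#   N = total rows in table
#   F = filter selectivity (fraction of rows passing the filter, 0 < F ≤ 1)
#   B = I/O block size (e.g. a Parquet row group)
#   C = I/O cost per row read
#   filter_cost = CPU cost of evaluating the predicate on one row
#
# Without pushdown (read everything, filter in Spark):
#   I/O cost        = N × C
#   Processing cost = N × filter_cost
#   Total           = N × C + N × filter_cost = N × (C + filter_cost)
#
# With ideal pushdown (non-matching data is skipped for free):
#   I/O cost        = N × F × C
#   Processing cost = N × F × filter_cost   (Spark still checks the rows it receives)
#   Total           = N × F × (C + filter_cost)
#
# Savings = N × (1 - F) × (C + filter_cost)
# For F = 0.01 (1% selectivity): savings = 99% of the total cost
#
# In practice Parquet/ORC skip data per block of size B using min/max statistics,
# so the fraction actually read is the fraction of blocks that might contain a
# match, which is ≥ F. Savings approach the ideal only when data is sorted or
# clustered on the filtered column.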
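The idealized model can be checked numerically. In this model, pushdown reads and filters only the fraction F of the N rows, so the total becomes N × F × (C + filter_cost). The sketch below uses hypothetical per-row costs (`io_cost=1.0`, `filter_cost=0.1`, in arbitrary units), and the function name is illustrative only.

```python
def pushdown_costs(n_rows: int, selectivity: float, io_cost: float, filter_cost: float):
    """Idealized model: returns (cost_without_pushdown, cost_with_pushdown, savings)."""
    if not 0 < selectivity <= 1:
        raise ValueError("selectivity must be in (0, 1]")
    per_row = io_cost + filter_cost
    without = n_rows * per_row                      # N × (C + filter_cost)
    with_pushdown = n_rows * selectivity * per_row  # N × F × (C + filter_cost)
    savings = n_rows * (1 - selectivity) * per_row  # N × (1 - F) × (C + filter_cost)
    return without, with_pushdown, savings


without, with_pd, saved = pushdown_costs(1_000_000, 0.01, io_cost=1.0, filter_cost=0.1)
print(f"{saved / without:.0%}")  # 99%
```

The savings ratio is simply 1 - F and does not depend on the per-row costs. That is why selectivity dominates the benefit in this model.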
# Limitations of predicate pushdown:
# 1. UDFs: Python UDFs are opaque to Catalyst, so their predicates cannot be pushed into the scan
udf_filter = F.udf(lambda x: x is not None and x > 100, "boolean")  # guard against NULL input
df = spark.table("orders").filter(udf_filter(F.col("amount")))
df.explain(True)  # the UDF predicate does NOT appear in PushedFilters
# 2. Non-deterministic functions
df = spark.table("orders").filter(F.rand() > 0.5) # NOT pushed down
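# 3. Predicates on expressions rather than bare columns: the scan only accepts simple
#    column-vs-literal comparisons, so this stays a Spark-side Filter
df = spark.table("orders").filter(F.year(F.col("deadline")) == 2024)
# (current_timestamp(), by contrast, is replaced with a literal during optimization,
#  so F.col("deadline") < F.current_timestamp() can still be pushed down)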
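# 4. Complex types: predicates on array or map elements are not pushed into the scan;
#    predicates on struct fields are pushed only by some sources and Spark versions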
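Why UDF and expression predicates stay in Spark can be sketched with a simplified translation step. The data source receives only filters of the form column-operator-literal, and anything else remains a Spark-side Filter. This mirrors the idea behind Spark's translation into data source filters such as `EqualTo` and `GreaterThan`. The tuple encoding, the three supported operators, and the `translate`/`split_for_scan` helpers are hypothetical.

```python
from typing import List, Optional, Tuple

# Hypothetical expression encoding:
#   ("col", name) | ("lit", value) | ("call", function_name, arg) | ("cmp", op, left, right)
SOURCE_FILTERS = {">": "GreaterThan", "<": "LessThan", "=": "EqualTo"}
FLIP = {">": "<", "<": ">", "=": "="}


def translate(expr: Tuple) -> Optional[str]:
    """Translate `column op literal` into a source filter string, else None."""
    if expr[0] != "cmp":
        return None
    _, op, left, right = expr
    if left[0] == "col" and right[0] == "lit":
        return f"{SOURCE_FILTERS[op]}({left[1]},{right[1]})"
    if left[0] == "lit" and right[0] == "col":
        # `5 < qty` is rewritten as `qty > 5` so the column comes first
        return f"{SOURCE_FILTERS[FLIP[op]]}({right[1]},{left[1]})"
    return None  # function calls, UDFs, column-vs-column comparisons


def split_for_scan(conjuncts: List[Tuple]) -> Tuple[List[str], List[Tuple]]:
    """Return (filters handed to the scan, predicates only Spark can evaluate)."""
    pushed, spark_only = [], []
    for expr in conjuncts:
        source_filter = translate(expr)
        if source_filter is None:
            spark_only.append(expr)
        else:
            pushed.append(source_filter)
    return pushed, spark_only


where = [
    ("cmp", ">", ("col", "amount"), ("lit", 100)),
    ("cmp", "<", ("lit", 5), ("col", "qty")),
    ("cmp", "=", ("call", "lower", ("col", "status")), ("lit", "shipped")),
    ("cmp", "=", ("call", "python_udf", ("col", "amount")), ("lit", True)),
]
pushed, spark_only = split_for_scan(where)
print(pushed)           # ['GreaterThan(amount,100)', 'GreaterThan(qty,5)']
print(len(spark_only))  # 2
```

In the Parquet plans shown earlier, the pushed predicates also reappear in the Filter above the scan. File-level skipping works on whole row groups, so Spark still re-checks individual rows.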
3. Column Pruning
Column pruning eliminates reading columns that are not needed for the query.
# Without column pruning:
# SELECT name FROM users → read ALL columns, then project name
# With column pruning:
# SELECT name FROM users → read only the name column
# Catalyst rule: ColumnPruning
# Applied after analysis, before physical planning
# Example:
df = spark.table("wide_table") # 100 columns
result = df.select("col1", "col2") # Only need 2 columns
result.explain(True)
# == Physical Plan == (excerpt)
# *(1) Project [col1#1, col2#2]
# +- *(1) FileScan parquet [col1#1, col2#2]   ← only 2 columns read!
#      ReadSchema: <struct with 2 fields>
# Column pruning benefits:
# 1. Reduced I/O: read fewer bytes from storage
# 2. Reduced memory: less data in executor memory
# 3. Reduced network: less data transferred in shuffle
# Quantitative impact (assuming equal-width columns in a columnar format):
# Wide table: 100 columns × ~10 bytes = ~1 KB per row
# Without pruning: ~1 KB read per row
# With pruning (2 columns): ~20 bytes read per row
# I/O reduction: ~98%
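# Limitations of column pruning (every column referenced anywhere in the plan is kept):
# 1. Join keys must be read even if they never appear in SELECT
#    (e.g. fact-table foreign keys in a star-schema join)
# 2. Columns used in derived expressions
df = spark.table("events").withColumn(
    "score", F.col("col_a") * F.col("col_b") + F.col("col_c")
).select("score")
# col_a, col_b and col_c must be read; every other column is still pruned
# 3. Columns in WHERE clause
df = spark.table("events").filter(F.col("status") == "active").select("name")
# status must be read to evaluate the filter, even though only name is returned
# 4. Shuffles: columns still needed after an exchange must be materialized and
#    shuffled; pruning drops unneeded ones before the exchange, but needed wide
#    columns still cost network I/O
# 5. Row-oriented formats (CSV, JSON): whole rows are still read from storage,
#    so pruning saves parsing and memory rather than I/O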
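All of these cases follow one rule: a column survives pruning if any operator above the scan references it. The minimal sketch below walks a toy plan top-down and collects referenced columns. It assumes each operator is a hypothetical `(kind, referenced_columns, child)` tuple and that the plan's root is a Project. The `payload` column is invented for the example. This is a simplification of the effect of Catalyst's ColumnPruning on the scan's ReadSchema, not its code.

```python
from typing import AbstractSet, Set, Tuple

# Hypothetical toy plan (root assumed to be a Project):
#   ("Project", input_columns, child) | ("Filter", referenced_columns, child)
#   ("Scan", table_columns)


def required_scan_columns(plan: Tuple, needed: AbstractSet[str] = frozenset()) -> Set[str]:
    """Columns the scan must read, collected top-down from every referencing operator."""
    kind = plan[0]
    if kind == "Scan":
        return set(needed) & set(plan[1])
    _, referenced, child = plan
    if kind == "Project":
        # Below a projection only its input columns are needed.
        return required_scan_columns(child, set(referenced))
    # A Filter needs its own columns plus everything the operators above it need.
    return required_scan_columns(child, set(needed) | set(referenced))


events = ("Scan", ["col_a", "col_b", "col_c", "status", "name", "payload"])
# select(score) where score = col_a * col_b + col_c
print(sorted(required_scan_columns(("Project", {"col_a", "col_b", "col_c"}, events))))
# filter(status == 'active').select(name)
print(sorted(required_scan_columns(("Project", {"name"}, ("Filter", {"status"}, events)))))
```

The first call yields `['col_a', 'col_b', 'col_c']`, and the second yields `['name', 'status']`. In both cases `payload` is never read.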
4. Join Reordering
Catalyst reorders joins to minimize intermediate result size.
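# Join reordering changes the order of joins, not the result:
# Original join order: (A ⋈ B) ⋈ C
# Reordered:           (A ⋈ C) ⋈ B   (if that produces a smaller intermediate result)
# Catalyst rules:
#   ReorderJoin (rule-based): reorders inner joins so every step has a join
#     condition, avoiding Cartesian products where possible
#   CostBasedJoinReorder (CBO): dynamic programming over join orders using
#     table and column statistics (requires spark.sql.cbo.joinReorder.enabled)
# Example (hypothetical sizes and selectivities):
#   orders 1M rows, customers 100K rows, products 10K rows
#   country = 'USA' keeps 10% of customers; category = 'Electronics' keeps 5% of products
#   every order matches exactly one customer and one product; keys are uniformly distributed
#   customers and products share no join key, so joining them directly is a cross join
# Order 1:
#   Step 1: orders ⋈ customers(USA)        → 1M × 0.10 = 100K intermediate rows
#   Step 2: (100K) ⋈ products(Electronics) → 100K × 0.05 = 5K result rows
# Order 2:
#   Step 1: orders ⋈ products(Electronics) → 1M × 0.05 = 50K intermediate rows (smaller!)
#   Step 2: (50K) ⋈ customers(USA)         → 50K × 0.10 = 5K result rows
# Both orders return the same 5K rows; order 2 materializes half as many intermediate rows.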
# Cost-based join reordering (available since Spark 2.2; off by default):
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
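# CBO estimates each equi-join's output cardinality from statistics:
#   |A ⋈ B| ≈ |A| × |B| / max(ndv(A.key), ndv(B.key))
#   (ndv = number of distinct values of the join key)
# It then searches, via dynamic programming, for the order with the cheapest
# intermediate results, weighing row count against size in bytes
# (spark.sql.cbo.joinReorder.card.weight). The search covers at most
# spark.sql.cbo.joinReorder.dp.threshold relations (12 by default).
#
# Textbook costs of the physical join algorithms:
# For sort-merge join:
#   Cost_SMJ = O(|A| × log(|A|) + |B| × log(|B|))
#
# For broadcast hash join (B must be small enough to ship to every executor):
#   Cost_BHJ = O(|A| + |B|)
#
# Spark picks the physical strategy by rules (join hints, estimated size vs
# spark.sql.autoBroadcastJoinThreshold), not by evaluating these formulas.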
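The cost-based search can be sketched as a small Selinger-style dynamic program. For every subset of relations, it keeps the cheapest join tree that needs no Cartesian product, where cost is the sum of estimated intermediate row counts. This is a simplified sketch under several assumptions: costs are row counts only (Spark also weighs bytes), key distributions are uniform, NDVs are not updated after joins, and the join graph is connected. Cardinalities use |A ⋈ B| ≈ |A| × |B| / max(ndv_a, ndv_b). The inputs are hypothetical post-filter sizes: 1M orders, 10K US customers, and 500 Electronics products.

```python
from itertools import combinations
from typing import Dict, FrozenSet, List, Optional, Tuple

# (relation_a, ndv of its join key, relation_b, ndv of its join key)
Edge = Tuple[str, int, str, int]


def estimate_join_rows(rows_l: float, rows_r: float, left: FrozenSet[str],
                       right: FrozenSet[str], edges: List[Edge]) -> Optional[float]:
    """|L ⋈ R| ≈ |L| × |R| / max(ndv_a, ndv_b) via the first connecting edge; None if none."""
    for a, ndv_a, b, ndv_b in edges:
        if (a in left and b in right) or (b in left and a in right):
            return rows_l * rows_r / max(ndv_a, ndv_b)
    return None


def best_join_order(rows: Dict[str, int], edges: List[Edge]) -> Tuple[float, float, str]:
    """Return (cost, estimated_rows, plan) for the cheapest Cartesian-free join tree."""
    best = {frozenset([r]): (0.0, float(n), r) for r, n in rows.items()}
    names = list(rows)
    for size in range(2, len(names) + 1):
        for combo in combinations(names, size):
            subset = frozenset(combo)
            for k in range(1, size):
                for left_combo in combinations(combo, k):
                    left = frozenset(left_combo)
                    right = subset - left
                    if left not in best or right not in best:
                        continue  # that side cannot be built without a cross join
                    cost_l, rows_l, plan_l = best[left]
                    cost_r, rows_r, plan_r = best[right]
                    out = estimate_join_rows(rows_l, rows_r, left, right, edges)
                    if out is None:
                        continue  # no shared join key: skip Cartesian products
                    # Cost = sum of intermediate join outputs; base scans count as 0 here
                    cost = cost_l + cost_r
                    cost += rows_l if len(left) > 1 else 0.0
                    cost += rows_r if len(right) > 1 else 0.0
                    if subset not in best or cost < best[subset][0]:
                        best[subset] = (cost, out, f"({plan_l} ⋈ {plan_r})")
    return best[frozenset(names)]


rows = {"orders": 1_000_000, "customers": 10_000, "products": 500}
edges = [("orders", 100_000, "customers", 10_000),  # orders.customer_id = customers.id
         ("orders", 10_000, "products", 500)]       # orders.product_id = products.id
print(best_join_order(rows, edges))
```

The search picks the tree that joins orders with products first: 50K intermediate rows against 100K for the customers-first order, with the same 5K-row estimate either way. It never considers customers ⋈ products directly because no edge connects them.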
5. Cost-Based Optimizer (CBO)
# CBO requires table statistics:
# 1. Row count
# 2. Column statistics (min, max, distinct count, null count)
# 3. Histograms (for skewed data distributions)
# Compute statistics:
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, amount")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, amount histogram")
# Verify statistics:
spark.sql("DESCRIBE FORMATTED orders").show()
# Look for: Statistics, Size in Bytes, # Rows
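Column statistics turn into row estimates most simply for two predicate shapes. The sketch below assumes values are uniformly distributed between `min` and `max`, which is what filter estimation falls back to when no histogram is available. The `ColumnStats` class and the example numbers are hypothetical, and null rows are ignored for brevity.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    # Hypothetical container mirroring the min / max / distinct_count column statistics
    min: float
    max: float
    distinct_count: int


def equality_selectivity(stats: ColumnStats, value: float) -> float:
    """col = value: 1 / distinct_count if value lies in [min, max], else 0."""
    if stats.distinct_count == 0 or not stats.min <= value <= stats.max:
        return 0.0
    return 1.0 / stats.distinct_count


def greater_than_selectivity(stats: ColumnStats, value: float) -> float:
    """col > value: share of the [min, max] range above value (uniform assumption)."""
    if value >= stats.max:
        return 0.0
    if value < stats.min:
        return 1.0
    return (stats.max - value) / (stats.max - stats.min)


rows = 1_000_000
amount = ColumnStats(min=0, max=1000, distinct_count=5000)
customer_id = ColumnStats(min=1, max=100_000, distinct_count=100_000)
print(rows * greater_than_selectivity(amount, 100))     # about 900,000 rows
print(rows * equality_selectivity(customer_id, 12345))  # about 10 rows
```

Histograms refine the range estimate when data is skewed, because the uniform assumption can be far off when most values cluster in a narrow band.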
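# CBO decision example:
# Query: SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id
#        WHERE o.customer_id = 12345
#
# Spark has no B-tree indexes, so CBO never chooses between a scan and an index
# lookup. It uses statistics to estimate sizes, and those estimates drive plan choices:
#
# Without column statistics (or with CBO off):
#   the Filter's size estimate equals its input size (no reduction),
#   so a large orders table stays too big to broadcast
#
# With column statistics and CBO on:
#   estimated rows after filter = N / distinct(customer_id)
#   estimated size = estimated rows × average row size
#
# If that estimate falls below spark.sql.autoBroadcastJoinThreshold (10 MB by
# default), the filtered side can be broadcast instead of shuffled for a sort-merge join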
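The size-estimate logic can be sketched in a few lines. Without filter estimation, the filtered side inherits its input size. With column statistics, it shrinks to rows / distinct(key), and the result is compared with the broadcast threshold (10 MB is the default of `spark.sql.autoBroadcastJoinThreshold`). The table size, row count and NDV below are hypothetical, and the helper names are illustrative.

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB)
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def estimated_filter_bytes(table_bytes: int, table_rows: int, key_ndv: int,
                           use_column_stats: bool) -> float:
    """Estimated size of `table WHERE key = <constant>`."""
    if not use_column_stats:
        return float(table_bytes)  # size-only estimation: the Filter keeps its input size
    avg_row_bytes = table_bytes / table_rows
    estimated_rows = table_rows / key_ndv  # rows / distinct(key)
    return estimated_rows * avg_row_bytes


def can_broadcast(estimated_bytes: float) -> bool:
    return estimated_bytes <= BROADCAST_THRESHOLD_BYTES


# Hypothetical orders table: 20 GB, 200M rows, 100K distinct customer_id values
orders_bytes, orders_rows, ndv = 20 * 1024 ** 3, 200_000_000, 100_000
print(can_broadcast(estimated_filter_bytes(orders_bytes, orders_rows, ndv, False)))  # False
print(can_broadcast(estimated_filter_bytes(orders_bytes, orders_rows, ndv, True)))   # True
```

With statistics, the filtered estimate is about 2,000 rows (roughly 210 KB), well under the threshold. Without them, the planner sees the full 20 GB.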
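# Enable CBO features:
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
# Adaptive Query Execution (on by default since Spark 3.2) is separate from CBO:
# it re-optimizes at runtime using actual shuffle statistics, which helps when
# table statistics are stale or missing
spark.conf.set("spark.sql.adaptive.enabled", "true")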
6. Advanced Optimization Rules
# Rule 1: Constant Folding
# Evaluates constant expressions once, during optimization (rule ConstantFolding)
df = spark.table("events").filter(F.lit(1) + F.lit(1) == F.lit(2))
# Becomes: filter(true) → PruneFilters removes the filter entirely
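Constant folding plus filter pruning can be sketched on a toy expression tree. The sketch folds any operator whose inputs are all literals. It then drops a Filter whose condition folded to true and replaces one that folded to false with an empty relation. The tuple encoding and node names are hypothetical. Catalyst does this with its ConstantFolding and PruneFilters rules over Scala expression trees.

```python
import operator
from typing import Tuple

OPS = {"+": operator.add, "*": operator.mul, "==": operator.eq, ">": operator.gt}
# Hypothetical expressions: ("lit", value) | ("col", name) | (op, left, right)


def fold(expr: Tuple) -> Tuple:
    """Evaluate every sub-expression whose inputs are all literals."""
    if expr[0] in ("lit", "col"):
        return expr
    op, left, right = expr
    left, right = fold(left), fold(right)
    if left[0] == "lit" and right[0] == "lit":
        return ("lit", OPS[op](left[1], right[1]))  # both inputs known: evaluate now
    return (op, left, right)


def optimize_filter(condition: Tuple, child: Tuple) -> Tuple:
    folded = fold(condition)
    if folded[0] == "lit" and folded[1] is True:
        return child  # filter(true): drop the Filter node
    if folded[0] == "lit" and folded[1] is False:
        return ("EmptyRelation",)  # filter(false): no row can pass
    return ("Filter", folded, child)


scan = ("Scan", "events")
print(optimize_filter(("==", ("+", ("lit", 1), ("lit", 1)), ("lit", 2)), scan))
# ('Scan', 'events')
print(optimize_filter((">", ("col", "amount"), ("*", ("lit", 10), ("lit", 10))), scan))
# ('Filter', ('>', ('col', 'amount'), ('lit', 100)), ('Scan', 'events'))
```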
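# Rule 2: Null Propagation
# Simplifies expressions using null literals and column nullability,
# e.g. null + 1 → null, and IsNotNull(c) → true when c is non-nullable
df = spark.table("events").filter(F.col("id").isNotNull())
# If id is declared non-nullable in the schema (Spark does not enforce primary keys),
# the predicate folds to true and the filter is removed; columns read from files
# are usually nullable, so this filter typically stays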
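# Rule 3: Limit Pushdown
# LimitPushDown pushes a local limit below UNION ALL and outer joins
df = spark.table("events").limit(100)
# When the limit is the last operator and the result is collected, Spark does not
# read the whole table: it scans a few partitions first and scans more only if
# fewer than 100 rows came back. DataSource V2 sources that implement limit
# pushdown (Spark 3.3+) can receive the limit directly.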
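The incremental scan behind a collected LIMIT can be sketched as follows. Scan one partition, and if it did not yield enough rows, scan a geometrically growing batch of further partitions. This simplified sketch assumes a fixed growth factor of 4, the default of `spark.sql.limit.scaleUpFactor`. Spark's actual loop also extrapolates the next batch size from the rows already found. Here `partitions` is just a hypothetical list of Python lists.

```python
from typing import List, Tuple


def take(partitions: List[List[int]], limit: int,
         scale_up_factor: int = 4) -> Tuple[List[int], int]:
    """Collect up to `limit` rows, scanning partitions in growing batches.

    Returns (rows, number_of_partitions_scanned).
    """
    rows: List[int] = []
    scanned = 0
    batch = 1  # start with a single partition
    while len(rows) < limit and scanned < len(partitions):
        for part in partitions[scanned:scanned + batch]:
            rows.extend(part)
        scanned = min(scanned + batch, len(partitions))
        batch = scanned * scale_up_factor  # next batch size, used only if more rows are needed
    return rows[:limit], scanned


# Hypothetical table: 1,000 partitions of 50 rows each
partitions = [list(range(i * 50, (i + 1) * 50)) for i in range(1000)]
rows, scanned = take(partitions, limit=100)
print(len(rows), scanned)  # 100 5
```

Only 5 of the 1,000 partitions are touched. A selective filter before the limit would need more rounds, because each partition yields fewer rows.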
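# Rule 4: Subquery Rewrite
# RewritePredicateSubquery turns IN / EXISTS subqueries (correlated or not) into semi/anti joins
df = spark.sql("""
SELECT * FROM orders
WHERE customer_id IN (SELECT id FROM customers WHERE country = 'USA')
""")
# Becomes: orders LEFT SEMI JOIN (customers WHERE country = 'USA') ON customer_id = id
# A semi join returns each order at most once and only the columns of orders,
# unlike an inner join, which would duplicate orders if id were not unique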
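Plain Python lists of dicts show why the rewrite targets a semi join rather than an inner join. `IN` keeps each outer row at most once and returns only its columns, while an inner join repeats the row for every match. The data and both helper functions are hypothetical and model only the semantics, not Spark's execution.

```python
from typing import Dict, List


def semi_join(left: List[Dict], right: List[Dict], left_key: str, right_key: str) -> List[Dict]:
    """LEFT SEMI JOIN: keep each left row once if any right row matches."""
    right_keys = {r[right_key] for r in right}
    return [row for row in left if row[left_key] in right_keys]


def inner_join(left: List[Dict], right: List[Dict], left_key: str, right_key: str) -> List[Dict]:
    """INNER JOIN: one output row per matching (left, right) pair."""
    return [{**l, **r} for l in left for r in right if l[left_key] == r[right_key]]


orders = [{"order_id": 1, "customer_id": 7}, {"order_id": 2, "customer_id": 8}]
# Hypothetical customers data where id 7 appears twice (id not unique)
usa_customers = [{"id": 7, "name": "a"}, {"id": 7, "name": "b"}]
print(semi_join(orders, usa_customers, "customer_id", "id"))
# [{'order_id': 1, 'customer_id': 7}]
print(len(inner_join(orders, usa_customers, "customer_id", "id")))
# 2
```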
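# Rule 5: Aggregate Pushdown
# Pushes simple aggregates into DataSource V2 sources that support it
df = spark.table("parquet_table").groupBy("category").count()
# Parquet/ORC aggregate pushdown (Spark 3.3+, spark.sql.parquet.aggregatePushdown /
# spark.sql.orc.aggregatePushdown, off by default) answers MIN/MAX/COUNT from file
# footer statistics, but only without GROUP BY or when grouping by partition columns,
# and only through the V2 file source. Unless category is a partition column, this
# count is computed by scanning the data as usual.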
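Answering aggregates from footer statistics can be sketched as combining per-row-group counts and min/max values without touching any row data. `RowGroupStats` is a hypothetical stand-in for Parquet footer metadata. The sketch assumes every row group carries min/max statistics for the column.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RowGroupStats:
    # Hypothetical stand-in for per-row-group metadata in a Parquet footer
    num_rows: int
    min_value: int
    max_value: int


def aggregate_from_footers(row_groups: List[RowGroupStats]) -> Dict[str, int]:
    """COUNT(*), MIN(col), MAX(col) computed from metadata alone, without reading rows."""
    return {
        "count": sum(rg.num_rows for rg in row_groups),
        "min": min(rg.min_value for rg in row_groups),
        "max": max(rg.max_value for rg in row_groups),
    }


footers = [RowGroupStats(1000, 5, 90), RowGroupStats(800, 1, 40), RowGroupStats(1200, 12, 300)]
print(aggregate_from_footers(footers))  # {'count': 3000, 'min': 1, 'max': 300}
```

Footers hold no per-category counts, so a `groupBy("category")` cannot be answered this way. The exception is when `category` is a partition column, since each file's value is then known from its directory path.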
7. Query Plan Analysis
# Analyzing query plans for optimization opportunities:
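def analyze_query_plan(df, label=""):
    print(f"\n{'=' * 60}")
    print(f"Query Plan: {label}")
    print("=" * 60)
    # Prints the parsed (unresolved), analyzed and optimized logical plans,
    # followed by the physical plan
    df.explain(extended=True)

# Key markers to look for in the physical plan:
# 1. PushedFilters: predicates pushed to the scan
# 2. ReadSchema: columns being read
# 3. PartitionFilters: partition pruning applied
# 4. PushedAggregation (DataSource V2 scans): aggregation pushdown
# 5. BroadcastExchange: broadcast joins used
# 6. SortMergeJoin vs BroadcastHashJoin: join strategy

# Filter written after the join vs before it:
orders = spark.table("orders").alias("o")
customers = spark.table("customers").alias("c")
df_filter_after_join = orders.join(
    customers, F.col("o.customer_id") == F.col("c.id")
).filter(F.col("o.amount") > 100)
df_filter_before_join = orders.filter(F.col("o.amount") > 100).join(
    customers, F.col("o.customer_id") == F.col("c.id")
)
# Catalyst pushes the filter below the join in both cases, so the optimized and
# physical plans should match: rewriting the query by hand is unnecessary
analyze_query_plan(df_filter_after_join, "Filter written after the join")
analyze_query_plan(df_filter_before_join, "Filter written before the join")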
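Comparing plans is easier if specific markers are pulled out of the explain text. The small sketch below assumes the plan has already been captured as a string, for example copied from the console. `extract_pushed_filters` and `join_strategies` are hypothetical helpers. The first looks only for `PushedFilters: [...]` entries and assumes no nested brackets inside them. The sample plan text is made up.

```python
import re
from typing import Dict, List

PUSHED_FILTERS = re.compile(r"PushedFilters: \[(.*?)\]")


def extract_pushed_filters(plan_text: str) -> List[List[str]]:
    """One list per scan that reports PushedFilters (assumes no nested brackets)."""
    scans = []
    for match in PUSHED_FILTERS.finditer(plan_text):
        inner = match.group(1).strip()
        scans.append(inner.split(", ") if inner else [])
    return scans


def join_strategies(plan_text: str) -> Dict[str, int]:
    """Count the physical join operators that appear in the plan text."""
    return {name: plan_text.count(name)
            for name in ("BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin")}


plan_text = """
+- FileScan parquet [order_id,customer_id,amount] PushedFilters: [IsNotNull(customer_id), GreaterThan(amount,100)]
+- FileScan parquet [id,name,country] PushedFilters: [IsNotNull(id), EqualTo(country,USA)]
BroadcastHashJoin [customer_id#43], [id#49], Inner, BuildRight
"""
print(extract_pushed_filters(plan_text))
print(join_strategies(plan_text))
```

Comparing the two outputs for the filter-after-join and filter-before-join variants is a quick way to confirm that they produced the same pushed filters and join strategy.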
⚠️ Common Pitfall
Predicate pushdown doesn't work with Python UDFs because Catalyst treats them as opaque functions. Prefer built-in functions: Pandas UDFs reduce serialization overhead, but they are just as opaque to Catalyst and are not pushed down either.
💡 Interview Tip
When discussing CBO, mention that it depends on accurate statistics. If statistics are stale or missing, CBO can make worse decisions than the rule-based optimizer, so rerun ANALYZE TABLE after large data changes.
Summary
| Optimization | What It Does | When It Works | Limitations |
|---|---|---|---|
| Predicate Pushdown | Reduces I/O by filtering early | Simple column-vs-literal filters on the source | UDFs, non-deterministic functions, expressions over columns |
| Column Pruning | Reads only needed columns | Column projections, best with columnar formats (Parquet, ORC) | Every referenced column is still read; row formats save little I/O |
| Join Reorder | Minimizes intermediate results | Multi-way inner joins | Cost-based reorder is off by default and depends on statistics |
| CBO | Cost-based plan selection | With accurate statistics | Stale statistics can hurt |
These optimizations work together in Catalyst to produce highly efficient execution plans from declarative SQL queries.