Catalyst Optimizer

Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb

Understanding Catalyst's Pipeline

Catalyst translates SQL/DataFrame operations through four phases: Analysis, Logical Optimization, Physical Planning, and Code Generation.

The Four Phases

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
    .appName("CatalystOptimizer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Create a query that exercises Catalyst
df = spark.read.parquet("hdfs://data/orders") \
    .filter(F.col("amount") > 100) \
    .join(spark.read.parquet("hdfs://data/customers"), "customer_id") \
    .groupBy("region") \
    .agg(F.sum("amount").alias("total"))

# View the plan at different levels of detail
print("=== PHYSICAL PLAN ONLY ===")
df.explain(mode="simple")

print("\n=== PARSED, ANALYZED, OPTIMIZED LOGICAL PLANS + PHYSICAL PLAN ===")
# "extended" prints every plan in pipeline order
df.explain(mode="extended")

print("\n=== FORMATTED PHYSICAL PLAN ===")
# Physical plan outline followed by per-node details
df.explain(mode="formatted")

print("\n=== LOGICAL PLAN WITH STATISTICS ===")
# Logical plan annotated with statistics, when they are available
df.explain(mode="cost")

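Interview Insight: Catalyst is primarily rule-based. Cost-based optimization (CBO) arrived in Spark 2.2 and is still disabled by default (spark.sql.cbo.enabled), while Spark 3.x adds Adaptive Query Execution, which re-optimizes plans at runtime. Understanding each phase helps you write queries that Catalyst can optimize effectively.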

Analysis Phase

The analyzer resolves references and validates types using the Catalog.

# Catalyst's analysis phase:
# 1. Resolves UnresolvedRelations to concrete tables
# 2. Looks up column references in the schema
# 3. Type checking and coercion
# 4. Resolves functions to their implementations

# Example: Column resolution
df = spark.read.parquet("hdfs://data/users")
result = df.select("name", "age")  # Columns resolved here

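# Implicit type coercion (assuming age is numeric): the analyzer inserts a cast
adults = df.filter(F.col("age") > "21")

# Explicit casts and conversions are resolved during analysis too
with_age_str = df.withColumn("age_str", F.col("age").cast("string"))
with_date = df.withColumn("date_col", F.to_date(F.lit("2024-01-01")))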

# Check if Catalyst resolved references correctly
result.explain(mode="extended")  # Shows analyzed plan

Logical Optimization Rules

Catalyst applies dozens of optimization rules to the logical plan.
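
As a rough mental model of how a batch of rules runs, the sketch below applies two small rewrite rules to a toy expression tree until nothing changes. Everything in it (Lit, Col, Mul, Add, transform_up, optimize) is a hypothetical stand-in written for illustration. It is not Spark's API; Catalyst's real rules are written in Scala against its own tree classes.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


# Toy expression nodes (hypothetical; not Spark classes)
@dataclass(frozen=True)
class Lit:
    value: float


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Mul:
    left: "Expr"
    right: "Expr"


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, Mul, Add]
Rule = Callable[[Expr], Expr]


def transform_up(expr: Expr, rule: Rule) -> Expr:
    """Apply `rule` to every node, children first."""
    if isinstance(expr, (Mul, Add)):
        expr = type(expr)(transform_up(expr.left, rule),
                          transform_up(expr.right, rule))
    return rule(expr)


def fold_constants(expr: Expr) -> Expr:
    """Replace an operator over two literals with the computed literal."""
    if isinstance(expr, (Mul, Add)) and isinstance(expr.left, Lit) \
            and isinstance(expr.right, Lit):
        if isinstance(expr, Mul):
            return Lit(expr.left.value * expr.right.value)
        return Lit(expr.left.value + expr.right.value)
    return expr


def drop_multiply_by_one(expr: Expr) -> Expr:
    """Rewrite x * 1 and 1 * x to x."""
    if isinstance(expr, Mul):
        if expr.right == Lit(1.0):
            return expr.left
        if expr.left == Lit(1.0):
            return expr.right
    return expr


def optimize(expr: Expr, rules: List[Rule], max_iterations: int = 100) -> Expr:
    """Run the rule batch repeatedly until the tree stops changing."""
    for _ in range(max_iterations):
        new_expr = expr
        for rule in rules:
            new_expr = transform_up(new_expr, rule)
        if new_expr == expr:
            break
        expr = new_expr
    return expr


# amount * (0.5 + 0.5): folding yields amount * 1.0, then the multiply is dropped
plan = Mul(Col("amount"), Add(Lit(0.5), Lit(0.5)))
optimized = optimize(plan, [fold_constants, drop_multiply_by_one])
print(optimized)
```

The fixed-point loop matters because one rule can expose work for another: here the multiply-by-one rewrite only becomes possible after the literals have been folded.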

Predicate Pushdown

# Catalyst pushes filters as close to data source as possible
df = spark.read.parquet("hdfs://data/events")

# This filter is pushed down to Parquet reader
result = df \
    .filter(F.col("event_type") == "click") \
    .filter(F.col("amount") > 100)

# Check that filters are pushed to the data source
result.explain(mode="formatted")
# Look for "PushedFilters" in the Parquet scan

# Catalyst also pushes filters below joins when they reference only one side
orders = spark.read.parquet("hdfs://data/orders")
customers = spark.read.parquet("hdfs://data/customers")

result = orders \
    .join(customers, "customer_id") \
    .filter(F.col("order_date") > "2024-01-01")

# Filter on order_date is pushed before the join
result.explain(mode="formatted")
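
To make the join case concrete, here is a minimal pure-Python sketch of the idea behind pushing a filter below an inner join. The Scan, Join, and Filter classes and the column sets are assumptions invented for this example; Catalyst's actual rule handles many more cases (outer joins, multi-column predicates, non-deterministic expressions).

```python
from dataclasses import dataclass
from typing import FrozenSet, Union


# Toy logical plan nodes (hypothetical; not Spark classes)
@dataclass(frozen=True)
class Scan:
    table: str
    columns: FrozenSet[str]


@dataclass(frozen=True)
class Join:
    left: "Plan"
    right: "Plan"
    key: str


@dataclass(frozen=True)
class Filter:
    column: str     # the single column the predicate reads
    predicate: str  # predicate text, kept opaque in this toy
    child: "Plan"


Plan = Union[Scan, Join, Filter]


def output_columns(plan: Plan) -> FrozenSet[str]:
    """Columns produced by a plan node."""
    if isinstance(plan, Scan):
        return plan.columns
    if isinstance(plan, Filter):
        return output_columns(plan.child)
    return output_columns(plan.left) | output_columns(plan.right)


def push_filter_below_join(plan: Plan) -> Plan:
    """Move a Filter under an inner Join when exactly one side has its column."""
    if isinstance(plan, Filter) and isinstance(plan.child, Join):
        join = plan.child
        in_left = plan.column in output_columns(join.left)
        in_right = plan.column in output_columns(join.right)
        if in_left and not in_right:
            pushed = Filter(plan.column, plan.predicate, join.left)
            return Join(pushed, join.right, join.key)
        if in_right and not in_left:
            pushed = Filter(plan.column, plan.predicate, join.right)
            return Join(join.left, pushed, join.key)
    return plan  # ambiguous or not applicable: leave the plan unchanged


# Assumed schemas, for illustration only
orders = Scan("orders", frozenset({"customer_id", "order_date", "amount"}))
customers = Scan("customers", frozenset({"customer_id", "region"}))

before = Filter("order_date", "order_date > '2024-01-01'",
                Join(orders, customers, "customer_id"))
after = push_filter_below_join(before)
print(after)
```

After the rewrite the filter sits directly on the orders scan, so fewer rows reach the join. In a real plan you would confirm this by finding the Filter node beneath the join in the explain output.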

Column Pruning

# Catalyst eliminates unnecessary columns early
df = spark.read.parquet("hdfs://data/wide-table")  # 100+ columns

# Only read the columns you need
result = df.select("col1", "col2", "col3") \
    .filter(F.col("col1") > 100)

# Check that only needed columns are read
result.explain(mode="formatted")
# Look for column list in the scan node

# Column pruning also applies to nested structures
df = spark.read.parquet("hdfs://data/nested")
result = df.select(
    F.col("nested_struct.field1"),
    F.col("array_col")[0]
)

Constant Folding and Strength Reduction

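# Catalyst evaluates constant expressions at planning time
df = spark.read.parquet("hdfs://data/sales")

result = df.withColumn("tax", F.col("amount") * 0.08) \
    .withColumn("total", F.col("amount") + F.col("tax")) \
    .filter(F.col("total") > 100)

# Catalyst collapses the projections and inlines the aliases, so the filter
# becomes roughly: (amount + (amount * 0.08)) > 100
# It does not rearrange the algebra into amount > 92.59; only subexpressions
# built entirely from literals (for example 60 * 60 * 24) are folded
result.explain(mode="extended")

# Expression rewrites are limited: a column wrapped in a function is generally
# not converted to a range filter, so it cannot be pushed down as one
result = df.filter(F.year(F.col("date")) == 2024)
# Prefer an explicit range that the data source can use
result = df.filter((F.col("date") >= "2024-01-01") & (F.col("date") < "2025-01-01"))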

Key Rule: Write filter conditions in a way that Catalyst can optimize. Use direct column comparisons rather than UDFs when possible.

Physical Planning

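Catalyst converts the optimized logical plan into physical operators using planning strategies. For joins, the choice is driven mainly by size estimates, join type, and hints.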

# Catalyst considers multiple join strategies
orders = spark.read.parquet("hdfs://data/orders")
customers = spark.read.parquet("hdfs://data/customers")

# Physical plan options:
# 1. SortMergeJoin (default for large tables)
# 2. BroadcastHashJoin (if small table fits in memory)
# 3. ShuffleHashJoin (medium tables)
# 4. BroadcastNestedLoopJoin (no join key)

# Check which strategy was chosen
result = orders.join(customers, "customer_id")
result.explain(mode="cost")  # Shows the logical plan with statistics such as sizeInBytes

# Request a specific join strategy with a hint
from pyspark.sql.functions import broadcast
result = orders.join(broadcast(customers), "customer_id")
result.explain(mode="formatted")
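
The size-based part of that decision can be approximated in a few lines of plain Python. This is a deliberately simplified sketch, not Spark's planner: choose_join_strategy and its parameters are hypothetical names, it only covers equi-joins, and it ignores ShuffleHashJoin, join type, and runtime re-planning. The 10 MB constant mirrors the default value of spark.sql.autoBroadcastJoinThreshold.

```python
# Default of spark.sql.autoBroadcastJoinThreshold is 10 MB; -1 disables it
AUTO_BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def choose_join_strategy(left_bytes: int,
                         right_bytes: int,
                         broadcast_hint: bool = False,
                         threshold: int = AUTO_BROADCAST_THRESHOLD_BYTES) -> str:
    """Pick a strategy for an equi-join from estimated input sizes (toy model)."""
    smaller = min(left_bytes, right_bytes)
    fits_threshold = threshold >= 0 and smaller <= threshold
    if broadcast_hint or fits_threshold:
        # The small side is copied to every executor, so no shuffle is needed
        return "BroadcastHashJoin"
    # Both sides are shuffled and sorted on the join key
    return "SortMergeJoin"


GB = 1024 ** 3
MB = 1024 ** 2

print(choose_join_strategy(5 * GB, 2 * MB))                       # small dimension table
print(choose_join_strategy(5 * GB, 3 * GB))                       # two large tables
print(choose_join_strategy(5 * GB, 3 * GB, broadcast_hint=True))  # hint overrides size
```

The sketch also shows why stale or missing size estimates matter: if the smaller side is overestimated, the planner falls back to the shuffle-heavy strategy even though a broadcast would have been possible.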

Cost-Based Optimization (Spark 2.2+)

# Enable CBO (disabled by default) for statistics-driven plan selection
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")

# Collect table statistics (orders must be a table registered in the catalog)
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, amount")

# CBO uses these statistics when the query reads the catalog table
orders = spark.table("orders")
result = orders.join(customers, "customer_id")
result.explain(mode="cost")  # Shows the logical plan with the collected statistics

Whole-Stage Code Generation

Since Spark 2.0, Tungsten's whole-stage code generation fuses the operators of a stage into a single generated Java function, which is compiled to bytecode at runtime.

# Whole-stage code generation merges multiple operators
df = spark.read.parquet("hdfs://data/events")

# This query benefits from code generation
result = df \
    .filter(F.col("amount") > 100) \
    .withColumn("tax", F.col("amount") * 0.08) \
    .groupBy("category") \
    .agg(F.sum("amount"), F.count("*"))

# Check code generation in physical plan
result.explain(mode="formatted")
# Look for "[codegen id : N]" in the node details; in the default explain()
# output, fused operators are prefixed with "*(N)"

# Control code generation
spark.conf.set("spark.sql.codegen.wholeStage", "true")
# Plans with more fields than this (nested fields included) skip whole-stage codegen
spark.conf.set("spark.sql.codegen.maxFields", "100")
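
To see why merging operators helps, the following pure-Python analogy contrasts an operator-at-a-time pipeline with a single fused loop over the same filter and aggregation. It is only an analogy under assumed names (scan, filter_amount, sum_by_category, run_fused) and a made-up two-column schema; Spark generates Java source for the fused form rather than running anything like this.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[str, float]  # (category, amount): an assumed schema

rows: List[Row] = [("a", 50.0), ("a", 150.0), ("b", 200.0), ("b", 300.0)]


# Operator-at-a-time: every operator is its own iterator, and each row
# crosses a call boundary between operators.
def scan(data: Iterable[Row]) -> Iterator[Row]:
    yield from data


def filter_amount(child: Iterable[Row], minimum: float) -> Iterator[Row]:
    for row in child:
        if row[1] > minimum:
            yield row


def sum_by_category(child: Iterable[Row]) -> Dict[str, float]:
    totals: Dict[str, float] = {}
    for category, amount in child:
        totals[category] = totals.get(category, 0.0) + amount
    return totals


def run_iterator_model(data: Iterable[Row]) -> Dict[str, float]:
    return sum_by_category(filter_amount(scan(data), 100.0))


# Fused: the same logic written as one loop, which is the shape that
# whole-stage code generation aims to produce for a stage.
def run_fused(data: Iterable[Row]) -> Dict[str, float]:
    totals: Dict[str, float] = {}
    for category, amount in data:
        if amount > 100.0:
            totals[category] = totals.get(category, 0.0) + amount
    return totals


print(run_iterator_model(rows))
print(run_fused(rows))
```

Both functions return the same totals; the fused version simply avoids handing each row from one operator object to the next.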

Warning: Very complex queries with many joins or UDFs may not benefit from code generation. UDFs break the code generation pipeline.

Custom Catalyst Rules (Advanced)

# You can extend Catalyst with custom rules (Scala/Java only)
# This is typically done in library development

# Example: Custom optimization rule in Scala
# class MyCustomRule extends Rule[LogicalPlan] {
#   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
#     case Filter(condition, child) =>
#       // Custom filter optimization logic
#       optimizeFilter(condition, child)
#   }
# }

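# For PySpark, register a compiled extensions class from a JAR on the classpath.
# spark.sql.extensions expects a class that implements
# SparkSessionExtensions => Unit and injects the rule (for example via
# injectOptimizerRule), not the Rule class itself.
from pyspark.sql import SparkSession

# com.example.MyExtensions is a placeholder name; this setting only takes
# effect when the session is first created
spark = SparkSession.builder \
    .appName("CustomCatalyst") \
    .config("spark.sql.extensions",
            "com.example.MyExtensions") \
    .getOrCreate()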

Debugging Catalyst Optimizations

# Detailed plan analysis
df = spark.read.parquet("hdfs://data/complex-query")

result = df \
    .join(spark.read.parquet("hdfs://data/dim1"), "key1") \
    .join(spark.read.parquet("hdfs://data/dim2"), "key2") \
    .filter(F.col("status") == "active") \
    .groupBy("category") \
    .agg(F.sum("amount"))

# Simple plan
result.explain(mode="simple")

# Extended plan with analysis
result.explain(mode="extended")

# Formatted plan with node details
result.explain(mode="formatted")

# Cost-based plan with estimates
result.explain(mode="cost")

# Physical plan plus the generated Java code, when available
result.explain(mode="codegen")

# Inspect the optimized logical plan directly (_jdf is an internal handle and may change between versions)
plan = result._jdf.queryExecution().optimizedPlan()
print(plan.toString())  # Shows optimized logical plan

Key Takeaway: Catalyst's optimization pipeline transforms your query through analysis, logical optimization, physical planning, and code generation. Understanding this pipeline helps you write queries that Catalyst can optimize effectively.

Follow-Up Questions

  • How does Catalyst handle UDFs in the optimization pipeline?
  • Explain the difference between logical plan and physical plan. When is each created?
  • What are the limitations of Catalyst's cost-based optimization in Spark 3.x?
  • How does predicate pushdown interact with partitioned tables?
  • Describe how Catalyst optimizes window functions.
