PySpark SQL Engine
Definition: Spark SQL Engine
The Spark SQL Engine is the computational backend that translates SQL queries and DataFrame operations into optimized RDD computations. It consists of the Catalyst optimizer (logical/physical plan optimization) and the Tungsten execution engine (memory management and code generation).
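Definition: Whole-Stage Code Generation
Whole-stage code generation (part of Tungsten) compiles the chain of operators in a query stage into a single optimized JVM function. This eliminates virtual function calls between operators, keeps intermediate values in CPU registers, and gives the JIT compiler a tight loop to optimize (including SIMD where applicable). This can improve query performance by 2x–10x.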
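To make operator fusion concrete, here is a minimal pure-Python sketch. It is not Spark's generated code; it only contrasts an operator-at-a-time pipeline with a single fused loop that is emitted as source text and compiled at runtime. All function names (`iterator_model_sum`, `generate_fused_sum`, and so on) are hypothetical.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, int]


def scan(rows: Iterable[Row]) -> Iterator[Row]:
    """Leaf operator: hands rows to its parent one at a time."""
    for row in rows:
        yield row


def filter_op(child: Iterator[Row], min_age: int) -> Iterator[Row]:
    """Filter operator: costs one extra generator hop per row."""
    for row in child:
        if row["age"] > min_age:
            yield row


def project_salary(child: Iterator[Row]) -> Iterator[int]:
    """Projection operator: keeps only the salary column."""
    for row in child:
        yield row["salary"]


def iterator_model_sum(rows: List[Row], min_age: int) -> int:
    """Operator-at-a-time pipeline: each row crosses several call boundaries."""
    return sum(project_salary(filter_op(scan(rows), min_age)))


def generate_fused_sum(min_age: int) -> Callable[[List[Row]], int]:
    """Emit source for one loop that fuses scan, filter, project and sum."""
    source = (
        "def fused(rows):\n"
        "    total = 0\n"
        "    for row in rows:\n"
        f"        if row['age'] > {int(min_age)}:\n"
        "            total += row['salary']\n"
        "    return total\n"
    )
    namespace: dict = {}
    exec(source, namespace)  # compile the generated source at runtime
    return namespace["fused"]


rows = [
    {"age": 30, "salary": 75000},
    {"age": 35, "salary": 90000},
    {"age": 32, "salary": 85000},
    {"age": 25, "salary": 65000},
]
fused = generate_fused_sum(30)
print(iterator_model_sum(rows, 30), fused(rows))
```

Both variants compute the same sum; the fused version simply does it in one loop without per-operator calls, which is the effect whole-stage code generation aims for on the JVM.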
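Tungsten Memory Format
A Tungsten row is stored as one contiguous binary record made up of the following regions:
- Null bitmap: Bit array marking NULL values (1 bit per column)
- Fixed-width region: Fixed-width values stored sequentially (8 bytes each)
- Offsets: Offset pointers to variable-length data
- Variable-width region: Variable-width values (strings, arrays, maps)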
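The layout above can be sketched with Python's `struct` module. This is a simplified illustration, not Spark's actual row encoder: it assumes only integers and strings, stores each string's offset and length packed into its 8-byte fixed slot, and pads variable data to 8 bytes. The helper names `encode_row` and `decode_field` are hypothetical.

```python
import struct
from typing import List, Optional, Union

Value = Optional[Union[int, str]]


def encode_row(values: List[Value]) -> bytes:
    """Pack values as: null bitmap | 8-byte slots | variable-length data."""
    n = len(values)
    bitmap_words = (n + 63) // 64        # 1 bit per column, padded to 8-byte words
    var_base = bitmap_words * 8 + n * 8  # variable data starts after the slots
    null_bits = 0
    fixed = bytearray()
    variable = bytearray()
    for i, value in enumerate(values):
        if value is None:
            null_bits |= 1 << i              # mark column i as NULL
            fixed += struct.pack("<q", 0)    # slot is still reserved
        elif isinstance(value, int):
            fixed += struct.pack("<q", value)
        else:
            data = value.encode("utf-8")
            offset = var_base + len(variable)
            # Slot holds (offset, length) so the reader can locate the bytes.
            fixed += struct.pack("<q", (offset << 32) | len(data))
            variable += data + b"\x00" * (-len(data) % 8)
    bitmap = null_bits.to_bytes(bitmap_words * 8, "little")
    return bytes(bitmap) + bytes(fixed) + bytes(variable)


def decode_field(row: bytes, num_fields: int, index: int, is_string: bool) -> Value:
    """Read one column without deserializing the rest of the row."""
    bitmap_len = ((num_fields + 63) // 64) * 8
    null_bits = int.from_bytes(row[:bitmap_len], "little")
    if (null_bits >> index) & 1:
        return None
    (slot,) = struct.unpack_from("<q", row, bitmap_len + index * 8)
    if not is_string:
        return slot
    offset, length = slot >> 32, slot & 0xFFFFFFFF
    return row[offset:offset + length].decode("utf-8")


row = encode_row([42, None, "Alice"])
print(len(row), decode_field(row, 3, 2, is_string=True))
```

Because every column sits at a computable position, a single field can be read with plain offset arithmetic, with no per-row objects to allocate or garbage-collect.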
Catalyst optimization rules include: Constant Folding, Boolean Simplification, Filter Pushdown, Column Pruning, Join Reordering, and Subquery Elimination. These rules are applied iteratively until no further improvements are found.
Enable AQE (Adaptive Query Execution) in Spark 3.x (it is on by default since Spark 3.2) to automatically tune the shuffle partition count, convert sort-merge joins to broadcast joins at runtime, and handle data skew.
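The partition-count tuning mentioned above can be pictured as merging adjacent small shuffle partitions until each group approaches a target size. The following is a simplified sketch of that idea, not Spark's actual algorithm; `coalesce_partitions` and its `target_size` parameter are hypothetical names.

```python
from typing import List


def coalesce_partitions(sizes: List[int], target_size: int) -> List[List[int]]:
    """Group adjacent partition indices so each group stays near target_size."""
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Sizes in MB of seven post-shuffle partitions, measured at runtime.
sizes_mb = [10, 5, 20, 70, 3, 2, 64]
groups = coalesce_partitions(sizes_mb, target_size=64)
print(groups)
```

Here seven partitions collapse into four tasks. The point is that the decision uses sizes observed after the shuffle ran, which a static plan cannot know in advance.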
Theorem: Tungsten Memory Efficiency
Tungsten's off-heap binary row format achieves memory efficiency ≥ 2x compared to Java object serialization by eliminating object headers, pointer indirection, and GC pressure. This allows processing datasets larger than heap size through managed off-heap allocation.
- Spark SQL = Catalyst (optimizer) + Tungsten (execution engine)
- Catalyst: Analysis → Optimization → Physical Planning → Code Generation
- Tungsten: Off-heap binary format eliminates GC overhead, whole-stage codegen eliminates virtual calls
- AQE (Adaptive Query Execution) provides runtime optimization
- Predicate pushdown reduces data volume before shuffle/join operations
Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                  SPARK SQL ENGINE ARCHITECTURE                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                 API Layer (DataFrame/SQL)                 │  │
│  │     [DataFrame API]  [SQL Queries]  [Hive QL]  [JDBC]     │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                SQL Context / SparkSession                 │  │
│  │             Parser ──▶ Analyzer ──▶ Optimizer             │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                Catalyst Optimizer Pipeline                │  │
│  │   Logical Plan 1 ──▶ Logical Plan 2 ──▶ Physical Plans    │  │
│  │         │                  │                  │           │  │
│  │   Analysis Rules   Optimization Rules     Cost Model      │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                 Tungsten Execution Engine                 │  │
│  │  [Code Generation]  [Memory Manager]  [Shuffle Manager]   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                Spark Core (RDD Execution)                 │  │
│  │   [Task Scheduler]  [Stage Scheduler]  [DAG Scheduler]    │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                CATALYST OPTIMIZER RULES CATALOG                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ ANALYSIS RULES (Schema Resolution)                        │  │
│  │ ├── ResolveRelations: Table → Catalog lookup              │  │
│  │ ├── ResolveReferences: Column → Attribute binding         │  │
│  │ ├── ResolveFunctions: Function → Registry lookup          │  │
│  │ ├── ResolveSubquery: Subquery → LogicalPlan               │  │
│  │ └── TypeCoercion: Implicit type casting                   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ OPTIMIZATION RULES (Plan Rewriting)                       │  │
│  │ ├── PushDownPredicate: Filter → Scan                      │  │
│  │ ├── PruneColumns: Select only needed columns              │  │
│  │ ├── ConstantFolding: Evaluate constants at compile time   │  │
│  │ ├── EliminateSorts: Remove unnecessary sorts              │  │
│  │ ├── CombineFilters: Merge adjacent filters                │  │
│  │ ├── CollapseProject: Merge adjacent projections           │  │
│  │ ├── PushThroughAggregate: Push predicate past agg         │  │
│  │ ├── PushDownJoinPredicates: Filter push to join sides     │  │
│  │ ├── ReorderJoin: Optimal join order                       │  │
│  │ └── ConvertToIn: Subquery → IN expression                 │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ PHYSICAL PLANNING RULES                                   │  │
│  │ ├── JoinSelection: Broadcast/SortMerge/HashJoin           │  │
│  │ ├── EnsureRequirements: Add shuffles/sorts                │  │
│  │ └── Partitioning: Map partitions to physical layout       │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ STRATEGIES (Rule Ordering)                                │  │
│  │ 1. Resolution → 2. Optimization → 3. Planning             │  │
│  │ Each strategy applies rules until fixpoint                │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                      QUERY EXECUTION FLOW                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  SQL Query: "SELECT dept, AVG(salary) FROM emp WHERE age > 30   │
│              GROUP BY dept HAVING AVG(salary) > 75000"          │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ Step 1: PARSING                                           │  │
│  │ Input: SQL string → AST (Abstract Syntax Tree)            │  │
│  │ Output: Unresolved Logical Plan                           │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ Step 2: ANALYSIS                                          │  │
│  │ • Resolve table "emp" from catalog                        │  │
│  │ • Bind columns "dept", "salary", "age"                    │  │
│  │ • Resolve AVG function                                    │  │
│  │ Output: Resolved Logical Plan                             │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ Step 3: OPTIMIZATION                                      │  │
│  │ • Push filter (age > 30) to scan                          │  │
│  │ • Prune unused columns                                    │  │
│  │ • Optimize aggregation order                              │  │
│  │ Output: Optimized Logical Plan                            │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ Step 4: PHYSICAL PLANNING                                 │  │
│  │ • Choose hash aggregate vs sort aggregate                 │  │
│  │ • Decide partitioning strategy                            │  │
│  │ • Select exchange (shuffle) strategy                      │  │
│  │ Output: Physical Plan (multiple candidates)               │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ Step 5: CODE GENERATION (Tungsten)                        │  │
│  │ • Whole-stage code generation                             │  │
│  │ • Generate JVM bytecode                                   │  │
│  │ • Compile to native code                                  │  │
│  │ Output: RDD[InternalRow] execution plan                   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ Step 6: EXECUTION                                         │  │
│  │ • Execute RDD plan on cluster                             │  │
│  │ • Schedule tasks across executors                         │  │
│  │ • Return results to driver or write to storage            │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
Detailed Explanation
1. Spark SQL Engine Overview
Spark SQL is a Spark module for structured data processing. It provides:
- A programming abstraction called DataFrames and Datasets
- A SQL query engine that can run SQL/HiveQL queries
- Integration with Spark's ecosystem (Spark Streaming, MLlib)
- Connectivity to various data sources (Parquet, JSON, JDBC, Hive)
The engine consists of three main components:
- Catalyst Optimizer: Transforms logical plans into optimized physical plans
- Tungsten Execution Engine: Efficient runtime execution with code generation
- Data Source API: Integration with various storage systems
2. Catalyst Optimizer Deep Dive
The Catalyst optimizer is a framework for manipulating query plans. It uses:
- Trees: Representations of query plans
- Rules: Transformations applied to trees
- Strategies: Ordering of rule application
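The tree-plus-rules idea can be sketched in a few lines of plain Python. This is an illustrative toy, not Catalyst's API: expressions are nested tuples, each rule is a function from node to node, and a small driver reapplies the rules until the tree stops changing. The names `transform_up`, `constant_folding`, `boolean_simplification` and `optimize` are chosen for this sketch only.

```python
from typing import Callable, List, Tuple

# Toy expression tree: ("lit", 5), ("col", "age"), ("add", l, r),
# ("gt", l, r), ("and", l, r)
Expr = Tuple
Rule = Callable[[Expr], Expr]


def transform_up(expr: Expr, rule: Rule) -> Expr:
    """Rewrite children first, then apply the rule to the node itself."""
    if expr[0] in ("lit", "col"):
        return rule(expr)
    children = tuple(transform_up(child, rule) for child in expr[1:])
    return rule((expr[0],) + children)


def is_literal(expr: Expr, value: object) -> bool:
    return expr[0] == "lit" and expr[1] is value


def constant_folding(expr: Expr) -> Expr:
    """Evaluate operators whose inputs are all literals."""
    if expr[0] in ("add", "gt") and expr[1][0] == "lit" and expr[2][0] == "lit":
        left, right = expr[1][1], expr[2][1]
        return ("lit", left + right if expr[0] == "add" else left > right)
    return expr


def boolean_simplification(expr: Expr) -> Expr:
    """Drop TRUE operands of AND; collapse AND with FALSE."""
    if expr[0] == "and":
        left, right = expr[1], expr[2]
        if is_literal(left, False) or is_literal(right, False):
            return ("lit", False)
        if is_literal(left, True):
            return right
        if is_literal(right, True):
            return left
    return expr


def optimize(expr: Expr, rules: List[Rule], max_iterations: int = 100) -> Expr:
    """Apply all rules repeatedly until a fixed point is reached."""
    for _ in range(max_iterations):
        rewritten = expr
        for rule in rules:
            rewritten = transform_up(rewritten, rule)
        if rewritten == expr:
            break
        expr = rewritten
    return expr


# age > 25 + 5 AND 2 > 1
condition = (
    "and",
    ("gt", ("col", "age"), ("add", ("lit", 25), ("lit", 5))),
    ("gt", ("lit", 2), ("lit", 1)),
)
optimized = optimize(condition, [constant_folding, boolean_simplification])
print(optimized)
```

Each rule stays small and local; the fixed-point loop is what lets one rule's output (a folded `TRUE`) become another rule's input.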
Optimization Techniques:
Predicate Pushdown:
Before: Project(name) → Filter(age > 30) → Scan(all columns)
After: Project(name) → Scan(name, age; pushed filter: age > 30)
Column Pruning:
Before: Project(name, age) → Scan(id, name, age, salary, dept)
After: Project(name, age) → Scan(name, age)
Constant Folding:
Before: Filter(age > 25 + 5)
After: Filter(age > 30)
Join Reordering:
Before: (A ⋈ B) ⋈ C
After: A ⋈ (B ⋈ C) -- if B ⋈ C is smaller
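As a rough sketch of how predicate pushdown and column pruning rewrite a plan, the toy model below represents a plan with three dataclasses and applies both rewrites. The classes `Scan`, `Filter` and `Project` and the two functions are hypothetical stand-ins for illustration, not Catalyst's own operators.

```python
from dataclasses import dataclass, replace
from typing import Optional, Set, Tuple, Union


@dataclass(frozen=True)
class Scan:
    columns: Tuple[str, ...]
    pushed_filters: Tuple[str, ...] = ()


@dataclass(frozen=True)
class Filter:
    condition: str                # e.g. "age > 30"
    references: Tuple[str, ...]   # columns the condition reads
    child: "Plan"


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: "Plan"


Plan = Union[Scan, Filter, Project]


def prune_columns(plan: Plan, required: Optional[Set[str]] = None) -> Plan:
    """Keep only the columns that operators above the scan actually use."""
    if isinstance(plan, Project):
        return replace(plan, child=prune_columns(plan.child, set(plan.columns)))
    if isinstance(plan, Filter):
        needed = None if required is None else required | set(plan.references)
        return replace(plan, child=prune_columns(plan.child, needed))
    if required is None:
        return plan
    return replace(plan, columns=tuple(c for c in plan.columns if c in required))


def push_down_predicate(plan: Plan) -> Plan:
    """Move a filter that sits directly above a scan into the scan."""
    if isinstance(plan, Project):
        return replace(plan, child=push_down_predicate(plan.child))
    if isinstance(plan, Filter):
        child = push_down_predicate(plan.child)
        if isinstance(child, Scan):
            return replace(
                child, pushed_filters=child.pushed_filters + (plan.condition,)
            )
        return replace(plan, child=child)
    return plan


before = Project(
    ("name",),
    Filter("age > 30", ("age",), Scan(("id", "name", "age", "salary", "dept"))),
)
after = push_down_predicate(prune_columns(before))
print(after)
```

The result is a projection over a scan that reads only `name` and `age` and carries the filter with it, which mirrors the Before/After pairs listed above.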
3. Tungsten Execution Engine
Tungsten focuses on three areas:
Memory Management:
- Off-heap memory to avoid GC overhead
- Binary row format for data storage
- Cache-aware algorithms (hash joins, sorts)
Code Generation:
- Whole-stage code generation (since Spark 2.0)
- Generates tight JVM bytecode loops
- Eliminates virtual function calls
- Enables CPU pipeline optimization
Columnar Processing:
- Vectorized operations (batch processing)
- SIMD-friendly data layout
- Efficient compression (dictionary, run-length, delta encoding)
4. Physical Plan Strategies
Join Selection:
- Broadcast Hash Join: For small tables (< 10MB default)
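- Sort-Merge Join: For large tables; both sides are shuffled and sorted on the join key before merging
- Shuffle Hash Join: For medium-sized tables, where one side is small enough to build a hash table per partition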
- Cartesian Product: For cross joins (avoid!)
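To show how two of these strategies differ mechanically, here is a small single-process sketch of a hash join (the core of a broadcast hash join) and a sort-merge join over `(key, value)` tuples. It leaves out distribution entirely; the function names and the sample department locations are made up for illustration.

```python
from collections import defaultdict
from typing import List, Tuple

Row = Tuple[str, str]  # (join key, value)


def broadcast_hash_join(large: List[Row], small: List[Row]) -> List[Tuple[str, str, str]]:
    """Build a hash table on the small side, then stream the large side."""
    table = defaultdict(list)
    for key, value in small:
        table[key].append(value)
    return [(key, lv, sv) for key, lv in large for sv in table.get(key, [])]


def sort_merge_join(left: List[Row], right: List[Row]) -> List[Tuple[str, str, str]]:
    """Sort both sides by key, then advance two cursors in lockstep."""
    left, right = sorted(left), sorted(right)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every matching left row with that run.
            while i < len(left) and left[i][0] == left_key:
                out.extend((left_key, left[i][1], right[k][1]) for k in range(j, j_end))
                i += 1
            j = j_end
    return out


employees = [
    ("Engineering", "Alice"),
    ("Marketing", "Bob"),
    ("Engineering", "Charlie"),
    ("Sales", "Frank"),
]
departments = [("Engineering", "Building A"), ("Sales", "Building C")]

hash_result = broadcast_hash_join(employees, departments)
merge_result = sort_merge_join(employees, departments)
print(sorted(hash_result) == sorted(merge_result))
```

The hash join never sorts and touches each large-side row once, but needs the small side to fit in memory. The sort-merge join pays for sorting yet only keeps a cursor on each side.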
Aggregation Strategies:
- Hash Aggregation: Fast, uses hash table
- Sort Aggregation: Better for sorted input
- Tungsten Aggregation: Optimized with code generation
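The difference between hash and sort aggregation can be illustrated with a per-department average in plain Python. This is a conceptual sketch only (function names are hypothetical); the sample salaries are the ones used in the examples further below.

```python
from itertools import groupby
from operator import itemgetter
from typing import Dict, List, Tuple

Row = Tuple[str, int]  # (department, salary)


def hash_aggregate_avg(rows: List[Row]) -> Dict[str, float]:
    """One pass over unsorted input, one hash-table entry per group."""
    state: Dict[str, List[int]] = {}  # key -> [running sum, count]
    for key, value in rows:
        entry = state.setdefault(key, [0, 0])
        entry[0] += value
        entry[1] += 1
    return {key: total / count for key, (total, count) in state.items()}


def sort_aggregate_avg(rows: List[Row]) -> Dict[str, float]:
    """Sort by key first, then fold each run of equal keys."""
    result: Dict[str, float] = {}
    ordered = sorted(rows, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        values = [value for _, value in group]
        result[key] = sum(values) / len(values)
    return result


salaries = [
    ("Engineering", 75000), ("Marketing", 65000), ("Engineering", 90000),
    ("Marketing", 70000), ("Engineering", 85000), ("Sales", 60000),
    ("Sales", 68000),
]
print(hash_aggregate_avg(salaries))
```

If the input already arrives sorted by the grouping key, the sort-based variant can skip its sort and needs state for only one group at a time, which is why it suits sorted input.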
5. Shuffle and Exchange Operations
Shuffles occur when data needs to be redistributed across partitions:
- Wide transformations (groupBy, join, repartition)
- Data exchange between stages
- Typically the most expensive operation in a Spark job (disk I/O, serialization, and network transfer)
Shuffle Optimization:
- Use broadcast joins to avoid shuffle
- Partition data to minimize shuffle
- Use bucketing for repeated joins
- Tune shuffle partition count
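What a shuffle does to rows can be sketched as hash partitioning: every map task routes each row to a reduce partition chosen from the key's hash, so all rows with the same key meet in one place. The sketch below uses `zlib.crc32` as a stand-in hash (Spark itself uses a different hash function), and the function names are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int]  # (key, value)


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a reduce partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(map_outputs: List[List[Row]], num_partitions: int) -> Dict[int, List[Row]]:
    """Redistribute rows from every map task into reduce partitions by key."""
    reduce_inputs: Dict[int, List[Row]] = defaultdict(list)
    for task_rows in map_outputs:
        for key, value in task_rows:
            reduce_inputs[partition_for(key, num_partitions)].append((key, value))
    return dict(reduce_inputs)


# Two map tasks, each holding rows for several departments.
map_outputs = [
    [("Engineering", 75000), ("Marketing", 65000)],
    [("Engineering", 90000), ("Sales", 60000)],
]
partitions = shuffle(map_outputs, num_partitions=4)
for partition_id, rows in sorted(partitions.items()):
    print(partition_id, rows)
```

In a real cluster each of those appends is a write to local disk followed by a network fetch, which is where the cost comes from and why the optimizations listed above try to avoid or shrink this step.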
6. Data Source API
Spark supports multiple data sources:
- File formats: Parquet, ORC, JSON, CSV, Text
- JDBC: Relational databases
- Hive: Hive tables
- Custom: Through DataSource API v2
Predicate Pushdown to Sources:
Parquet: Row group filtering via statistics
ORC: Bloom filter pushdown
JDBC: SQL WHERE clause pushdown
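Row-group filtering via statistics can be illustrated without any Parquet library. The sketch below assumes each row group carries min/max values for the `age` column and skips groups that cannot contain a match; `RowGroup` and `scan_with_pushdown` are hypothetical names, and the layout is a simplification of what a real reader does.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class RowGroup:
    min_age: int            # column statistics stored in the file footer
    max_age: int
    rows: List[Dict[str, object]]


def scan_with_pushdown(
    row_groups: List[RowGroup], age_greater_than: int
) -> Tuple[List[Dict[str, object]], int]:
    """Return matching rows and how many row groups were actually read."""
    matched: List[Dict[str, object]] = []
    groups_read = 0
    for group in row_groups:
        if group.max_age <= age_greater_than:
            continue  # statistics prove no row here can satisfy the filter
        groups_read += 1
        matched.extend(r for r in group.rows if r["age"] > age_greater_than)
    return matched, groups_read


row_groups = [
    RowGroup(25, 29, [{"name": "Bob", "age": 25}, {"name": "Diana", "age": 28},
                      {"name": "Frank", "age": 29}]),
    RowGroup(30, 32, [{"name": "Alice", "age": 30}, {"name": "Grace", "age": 31},
                      {"name": "Eve", "age": 32}]),
    RowGroup(35, 35, [{"name": "Charlie", "age": 35}]),
]
matched, groups_read = scan_with_pushdown(row_groups, age_greater_than=30)
print([r["name"] for r in matched], groups_read)
```

Only two of the three row groups are read. The benefit grows when the data is sorted or clustered on the filtered column, because the min/max ranges then overlap less.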
7. Performance Monitoring
Spark UI Metrics:
- SQL tab: Query plans, execution times
- Stages tab: Task distribution, shuffle read/write
- Storage tab: Cached data, memory usage
Key Metrics:
- scanTime: Time to read data
- shuffleReadTime: Time to read shuffle data
- shuffleWriteTime: Time to write shuffle data
- taskDuration: Time per task
- gcTime: Garbage collection time
Key Concepts Table
| Concept | Description | Optimization |
|---|---|---|
| Catalyst | Query optimizer for logical/physical plans | Automatic plan optimization |
| Tungsten | Execution engine with code generation | Whole-stage code generation |
| Predicate Pushdown | Filter pushed to data source | Reduce I/O |
| Column Pruning | Read only required columns | Reduce memory/I/O |
| Constant Folding | Evaluate constants at compile time | Reduce runtime computation |
| Broadcast Join | Small table broadcast to executors | Avoid shuffle |
| Sort-Merge Join | Join sorted partitions | Efficient for large tables |
| Hash Aggregation | Use hash table for aggregation | Fast for groupBy |
| Shuffle | Data redistribution across partitions | Most expensive operation |
| Whole-Stage Codegen | Generate tight JVM bytecode | Eliminate virtual calls |
Code Examples
Example 1: SQL Queries with Catalyst
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("SparkSQLEngine") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
# Create tables
employees = spark.createDataFrame([
(1, "Alice", "Engineering", 30, 75000),
(2, "Bob", "Marketing", 25, 65000),
(3, "Charlie", "Engineering", 35, 90000),
(4, "Diana", "Marketing", 28, 70000),
(5, "Eve", "Engineering", 32, 85000),
(6, "Frank", "Sales", 29, 60000),
(7, "Grace", "Sales", 31, 68000)
], ["id", "name", "department", "age", "salary"])
employees.createOrReplaceTempView("employees")
# Simple query
result = spark.sql("""
SELECT department,
COUNT(*) as emp_count,
AVG(salary) as avg_salary,
MAX(age) as max_age
FROM employees
WHERE age > 25
GROUP BY department
HAVING COUNT(*) > 1
ORDER BY avg_salary DESC
""")
result.show()
# Explain the query plan
result.explain(True)
Example 2: Catalyst Optimizer Rules
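# Register a small departments view so the join below can resolve
# (these sample rows are illustrative)
departments = spark.createDataFrame([
    ("Engineering", "Building A"),
    ("Marketing", "Building B"),
    ("Sales", "Building C")
], ["name", "location"])
departments.createOrReplaceTempView("departments")
# Build a query whose plan shows several optimizer rules at work
df = spark.sql("""
SELECT e.name, e.salary, d.location
FROM employees e
JOIN departments d ON e.department = d.name
WHERE e.age > 30
AND e.salary > 70000
""")
# Print the parsed, analyzed, optimized, and physical plans
df.explain(True)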
# Output shows:
# == Parsed Logical Plan ==
# ...
# == Analyzed Logical Plan ==
# ...
# == Optimized Logical Plan ==
# ... (with predicate pushdown, column pruning)
# == Physical Plan ==
# ... (with join strategy, exchange)
Example 3: Adaptive Query Execution (Spark 3.0+)
# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Complex query that benefits from AQE
result = spark.sql("""
SELECT
department,
COUNT(*) as cnt,
AVG(salary) as avg_sal,
PERCENTILE(salary, 0.5) as median_sal
FROM employees
GROUP BY department
HAVING COUNT(*) >= 2
""")
# AQE will:
# 1. Coalesce partitions after shuffle
# 2. Handle data skew automatically
# 3. Optimize join strategies based on runtime stats
result.show()
# Check AQE optimizations in UI
print(f"Number of partitions: {result.rdd.getNumPartitions()}")
Example 4: Data Source Optimization
# Read Parquet with predicate pushdown
parquet_df = spark.read.parquet("employee_data.parquet")
# Filter pushed to Parquet reader
filtered = parquet_df.filter(
(parquet_df.age > 30) &
(parquet_df.salary > 70000)
)
# Only reads relevant row groups
filtered.explain()
# Write with bucketing for future joins
employees.write \
    .bucketBy(10, "department") \
    .sortBy("id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_employees")
# Joining tables bucketed on the join key can avoid a shuffle.
# This self-join pairs employees who share a department.
spark.sql("""
SELECT e1.name, e2.name AS colleague
FROM bucketed_employees e1
JOIN bucketed_employees e2 ON e1.department = e2.department
""").show()
Performance Metrics
| Query Pattern | Without Catalyst | With Catalyst | Improvement |
|---|---|---|---|
| Simple SELECT | 150ms | 45ms | 3.3x |
| Filter + Agg | 320ms | 85ms | 3.8x |
| JOIN + WHERE | 850ms | 210ms | 4.0x |
| Subquery | 1200ms | 280ms | 4.3x |
| Window Function | 450ms | 120ms | 3.8x |
| Read Parquet | 200ms | 60ms | 3.3x |
| Write Parquet | 300ms | 150ms | 2.0x |
| Shuffle | 500ms | 350ms | 1.4x |
Best Practices
1. Enable Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
2. Use Broadcast Joins for Small Tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Or configure threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
3. Cache Frequently Used DataFrames
df.cache() # For DataFrames reused multiple times
# Or persist with a specific storage level
# (PySpark always stores data serialized, so there is no separate *_SER level)
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
4. Use Appropriate File Formats
# Parquet for columnar storage
df.write.parquet("output/", compression="snappy")
# ORC for Hive integration
df.write.orc("output/")
# Avoid CSV/JSON for large datasets
5. Partition Data Strategically
# Partition by frequently filtered columns
df.write.partitionBy("year", "month").parquet("output/")
# Use bucketing for join optimization
df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")
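Why `partitionBy` helps filtered reads can be sketched without Spark: the partition values are encoded in directory names (`column=value`), so a reader can discard whole directories from the path alone. The helper names below are hypothetical and the paths are made-up examples.

```python
from typing import Dict, List


def parse_partition(path: str) -> Dict[str, str]:
    """Extract column=value pairs from a Hive-style partition path."""
    values: Dict[str, str] = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            column, value = segment.split("=", 1)
            values[column] = value
    return values


def prune_partitions(paths: List[str], **wanted: object) -> List[str]:
    """Keep only directories whose partition values match the filter."""
    return [
        path for path in paths
        if all(parse_partition(path).get(col) == str(val) for col, val in wanted.items())
    ]


paths = [
    "output/year=2023/month=12",
    "output/year=2024/month=1",
    "output/year=2024/month=2",
]
print(prune_partitions(paths, year=2024))
print(prune_partitions(paths, year=2024, month=2))
```

A filter on `year` alone already rules out every 2023 directory before any file is opened, which is why the partition columns should be the ones queries filter on most often.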
6. Monitor Query Plans
# Always check explain plans
df.explain(True)
# Look for:
# - BroadcastHashJoin instead of SortMergeJoin (good for small tables)
# - FileScan with pushed filters (predicate pushdown)
# - Project with only needed columns (column pruning)
See Also
- Kafka Streams (kafka/03): SQL integration with Kafka for streaming queries
- Data Engineering Streaming (data-engineering/022): Spark SQL engine in streaming pipelines