PySpark SQL Engine: Execution, Catalyst Optimizer, and Tungsten

Definition: Spark SQL Engine

The Spark SQL Engine is the computational backend that translates SQL queries and DataFrame operations into optimized RDD computations. It consists of the Catalyst optimizer (logical/physical plan optimization) and the Tungsten execution engine (memory management and code generation).

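Definition: Whole-Stage Code Generation

Whole-stage code generation (part of Tungsten) fuses the operators of a query stage into a single generated Java function that is compiled to JVM bytecode. This removes per-row virtual function calls between operators, keeps intermediate values in CPU registers, and gives the JIT compiler tight loops it can optimize further (including SIMD where it applies). Depending on the workload, this can improve query performance by roughly 2x–10x.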

$$Cost_{plan} = Cost_{scan} + Cost_{filter} + Cost_{project} + Cost_{join} + Cost_{agg}$$

Tungsten Memory Format

$$Row_{binary} = [null\_bitmap \mid fixed\_length\_columns \mid variable\_length\_offsets \mid variable\_length\_columns]$$

Here,

  • null_bitmap = Bit array marking NULL values (1 bit per column)
  • fixed_length_columns = Fixed-width values stored sequentially (8 bytes each)
  • variable_length_offsets = Offset pointers to variable-length data
  • variable_length_columns = Variable-width values (strings, arrays, maps)
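The layout above can be sketched in plain Python with the `struct` module. This is a toy model, not Spark's actual implementation: it assumes a row holding only integers and strings, a null bitmap padded to 8-byte words, and an (offset, length) pair packed into the 8-byte slot of each string. The names `encode_row` and `decode_field` are hypothetical.

```python
import struct

def encode_row(values):
    """Pack a row as [null bitmap | 8-byte fixed slots | variable-length bytes].
    Ints live directly in their slot; strings store (offset << 32 | length)
    in the slot and their UTF-8 bytes in the variable-length region."""
    n = len(values)
    bitmap_bytes = ((n + 63) // 64) * 8      # assumption: bitmap padded to 8-byte words
    fixed_bytes = 8 * n
    bitmap = 0
    slots = []
    var_region = b""
    for i, v in enumerate(values):
        if v is None:
            bitmap |= 1 << i                 # mark column i as NULL
            slots.append(0)
        elif isinstance(v, int):
            slots.append(v)
        else:
            data = v.encode("utf-8")
            offset = bitmap_bytes + fixed_bytes + len(var_region)
            slots.append((offset << 32) | len(data))
            var_region += data + b"\x00" * (-len(data) % 8)  # pad to 8 bytes
    header = bitmap.to_bytes(bitmap_bytes, "little")
    return header + struct.pack(f"<{n}q", *slots) + var_region

def decode_field(buf, n, i, is_string):
    """Read column i of an n-column row without deserializing the other columns."""
    bitmap_bytes = ((n + 63) // 64) * 8
    if (int.from_bytes(buf[:bitmap_bytes], "little") >> i) & 1:
        return None
    (slot,) = struct.unpack_from("<q", buf, bitmap_bytes + 8 * i)
    if not is_string:
        return slot
    offset, length = slot >> 32, slot & 0xFFFFFFFF
    return buf[offset:offset + length].decode("utf-8")

row = encode_row([7, None, "Alice"])
print(len(row), decode_field(row, 3, 2, True))
```

Because every fixed-width slot sits at a computable position, a single column can be read with one offset calculation and no per-field objects, which is the property the row format relies on.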

Catalyst optimization rules include Constant Folding, Boolean Simplification, Filter (Predicate) Pushdown, Column Pruning, Join Reordering, and Subquery Elimination. Rules are grouped into batches, and each batch is applied repeatedly until the plan stops changing (a fixed point) or an iteration limit is reached.

Enable AQE (Adaptive Query Execution) in Spark 3.x (it is on by default since Spark 3.2) to let Spark re-optimize at runtime: it coalesces shuffle partitions, converts sort-merge joins to broadcast joins when runtime statistics allow, and splits skewed partitions.
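To make the partition-coalescing part concrete, here is a minimal sketch of the idea in plain Python. It is a simplification under stated assumptions, not Spark's algorithm: partition sizes are known after the shuffle, only adjacent partitions are merged, and `target_size` plays the role of an advisory size such as `spark.sql.adaptive.advisoryPartitionSizeInBytes`. The function name `coalesce_partitions` is hypothetical.

```python
def coalesce_partitions(sizes, target_size):
    """Greedily merge adjacent shuffle partitions while the merged group
    stays within target_size. Returns (start, end) index ranges, end exclusive."""
    groups = []
    start, current = 0, 0
    for i, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot.
        if i > start and current + size > target_size:
            groups.append((start, i))
            start, current = i, 0
        current += size
    if sizes:
        groups.append((start, len(sizes)))
    return groups

sizes_mb = [10, 5, 40, 70, 3, 2, 1]          # made-up post-shuffle sizes in MB
groups = coalesce_partitions(sizes_mb, target_size=64)
print(groups)
```

Seven small and uneven partitions collapse into three tasks here; a partition that is already larger than the target stays on its own.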

Theorem: Tungsten Memory Efficiency

Tungsten's binary row format is typically at least 2x (≥ 2x) more memory-efficient than holding the same rows as Java objects, because it eliminates object headers and pointer indirection and reduces GC pressure. When off-heap allocation is enabled (spark.memory.offHeap.enabled), Spark manages this memory outside the JVM heap, which allows processing datasets larger than the heap size.

  • Spark SQL = Catalyst (optimizer) + Tungsten (execution engine)
  • Catalyst: Analysis → Optimization → Physical Planning → Code Generation
  • Tungsten: Binary row format (optionally off-heap) reduces GC overhead, whole-stage codegen eliminates virtual calls
  • AQE (Adaptive Query Execution) provides runtime optimization
  • Predicate pushdown reduces data volume before shuffle/join operations

🏗️ Architecture Diagram

┌───────────────────────────────────────────────────────────────┐
│                 SPARK SQL ENGINE ARCHITECTURE                 │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌────────────────────────────────────────────────────────┐   │
│  │               API Layer (DataFrame/SQL)                │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐    │   │
│  │  │DataFrame│  │  SQL    │  │  Hive   │  │  JDBC   │    │   │
│  │  │  API    │  │ Queries │  │  QL     │  │         │    │   │
│  │  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘    │   │
│  └───────┼────────────┼────────────┼────────────┼─────────┘   │
│          │            │            │            │             │
│          ▼            ▼            ▼            ▼             │
│  ┌────────────────────────────────────────────────────────┐   │
│  │               SQL Context / SparkSession               │   │
│  │  ┌──────────┐  ┌──────────┐  ┌───────────┐             │   │
│  │  │  Parser  │→ │ Analyzer │→ │ Optimizer │             │   │
│  │  └──────────┘  └──────────┘  └───────────┘             │   │
│  └────────────────────────────────────────────────────────┘   │
│                          │                                    │
│                          ▼                                    │
│  ┌────────────────────────────────────────────────────────┐   │
│  │              Catalyst Optimizer Pipeline               │   │
│  │                                                        │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐              │   │
│  │  │ Logical  │→ │ Logical  │→ │ Physical │              │   │
│  │  │  Plan 1  │  │  Plan 2  │  │  Plans   │              │   │
│  │  └──────────┘  └──────────┘  └──────────┘              │   │
│  │       ↑             ↑             ↑                    │   │
│  │       │             │             │                    │   │
│  │  ┌────┴────┐ ┌──────┴──────┐  ┌───┴────┐               │   │
│  │  │Analysis │ │ Optimization│  │  Cost  │               │   │
│  │  │  Rules  │ │    Rules    │  │  Model │               │   │
│  │  └─────────┘ └─────────────┘  └────────┘               │   │
│  └────────────────────────────────────────────────────────┘   │
│                          │                                    │
│                          ▼                                    │
│  ┌────────────────────────────────────────────────────────┐   │
│  │               Tungsten Execution Engine                │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐              │   │
│  │  │  Code    │  │  Memory  │  │  Shuffle │              │   │
│  │  │Generation│  │ Manager  │  │  Manager │              │   │
│  │  └──────────┘  └──────────┘  └──────────┘              │   │
│  └────────────────────────────────────────────────────────┘   │
│                          │                                    │
│                          ▼                                    │
│  ┌────────────────────────────────────────────────────────┐   │
│  │               Spark Core (RDD Execution)               │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐              │   │
│  │  │  Task    │  │  Stage   │  │  DAG     │              │   │
│  │  │Scheduler │  │ Scheduler│  │ Scheduler│              │   │
│  │  └──────────┘  └──────────┘  └──────────┘              │   │
│  └────────────────────────────────────────────────────────┘   │
└───────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────┐
│               CATALYST OPTIMIZER RULES CATALOG                │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌────────────────────────────────────────────────────────┐   │
│  │  ANALYSIS RULES (Schema Resolution)                    │   │
│  │  ├── ResolveRelations: Table → Catalog lookup          │   │
│  │  ├── ResolveReferences: Column → Attribute binding     │   │
│  │  ├── ResolveFunctions: Function → Registry lookup      │   │
│  │  ├── ResolveSubquery: Subquery → LogicalPlan           │   │
│  │  └── TypeCoercion: Implicit type casting               │   │
│  └────────────────────────────────────────────────────────┘   │
│                                                               │
│  ┌────────────────────────────────────────────────────────┐   │
│  │  OPTIMIZATION RULES (Plan Rewriting)                   │   │
│  │  ├── PushDownPredicate: Filter → Scan                  │   │
│  │  ├── PruneColumns: Select only needed columns          │   │
│  │  ├── ConstantFolding: Evaluate constants at compile    │   │
│  │  ├── EliminateSorts: Remove unnecessary sorts          │   │
│  │  ├── CombineFilters: Merge adjacent filters            │   │
│  │  ├── CollapseProject: Merge adjacent projections       │   │
│  │  ├── PushThroughAggregate: Push predicate past agg     │   │
│  │  ├── PushDownJoinPredicates: Filter push to join sides │   │
│  │  ├── ReorderJoin: Optimal join order                   │   │
│  │  └── ConvertToIn: Subquery → IN expression             │   │
│  └────────────────────────────────────────────────────────┘   │
│                                                               │
│  ┌────────────────────────────────────────────────────────┐   │
│  │  PHYSICAL PLANNING RULES                               │   │
│  │  ├── JoinSelection: Broadcast/SortMerge/HashJoin       │   │
│  │  ├── EnsureRequirements: Add shuffles/sorts            │   │
│  │  └── Partitioning: Map partitions to physical layout   │   │
│  └────────────────────────────────────────────────────────┘   │
│                                                               │
│  ┌────────────────────────────────────────────────────────┐   │
│  │  STRATEGIES (Rule Ordering)                            │   │
│  │  1. Resolution → 2. Optimization → 3. Planning         │   │
│  │  Each strategy applies rules until fixpoint            │   │
│  └────────────────────────────────────────────────────────┘   │
└───────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────┐
│                     QUERY EXECUTION FLOW                      │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  SQL Query: "SELECT dept, AVG(salary) FROM emp WHERE age>30   │
│              GROUP BY dept HAVING AVG(salary) > 75000"        │
│                                                               │
│  ┌────────────────────────────────────────────────────────┐   │
│  │ Step 1: PARSING                                        │   │
│  │ Input: SQL string → AST (Abstract Syntax Tree)         │   │
│  │ Output: Unresolved Logical Plan                        │   │
│  └────────────────────────────────────────────────────────┘   │
│       │                                                       │
│       ▼                                                       │
│  ┌────────────────────────────────────────────────────────┐   │
│  │ Step 2: ANALYSIS                                       │   │
│  │ • Resolve table "emp" from catalog                     │   │
│  │ • Bind columns "dept", "salary", "age"                 │   │
│  │ • Resolve AVG function                                 │   │
│  │ Output: Resolved Logical Plan                          │   │
│  └────────────────────────────────────────────────────────┘   │
│       │                                                       │
│       ▼                                                       │
│  ┌────────────────────────────────────────────────────────┐   │
│  │ Step 3: OPTIMIZATION                                   │   │
│  │ • Push filter (age > 30) to scan                       │   │
│  │ • Prune unused columns                                 │   │
│  │ • Optimize aggregation order                           │   │
│  │ Output: Optimized Logical Plan                         │   │
│  └────────────────────────────────────────────────────────┘   │
│       │                                                       │
│       ▼                                                       │
│  ┌────────────────────────────────────────────────────────┐   │
│  │ Step 4: PHYSICAL PLANNING                              │   │
│  │ • Choose hash aggregate vs sort aggregate              │   │
│  │ • Decide partitioning strategy                         │   │
│  │ • Select exchange (shuffle) strategy                   │   │
│  │ Output: Physical Plan (multiple candidates)            │   │
│  └────────────────────────────────────────────────────────┘   │
│       │                                                       │
│       ▼                                                       │
│  ┌────────────────────────────────────────────────────────┐   │
│  │ Step 5: CODE GENERATION (Tungsten)                     │   │
│  │ • Whole-stage code generation                          │   │
│  │ • Generate JVM bytecode                                │   │
│  │ • Compile to native code                               │   │
│  │ Output: RDD[InternalRow] execution plan                │   │
│  └────────────────────────────────────────────────────────┘   │
│       │                                                       │
│       ▼                                                       │
│  ┌────────────────────────────────────────────────────────┐   │
│  │ Step 6: EXECUTION                                      │   │
│  │ • Execute RDD plan on cluster                          │   │
│  │ • Schedule tasks across executors                      │   │
│  │ • Return results to driver or write to storage         │   │
│  └────────────────────────────────────────────────────────┘   │
└───────────────────────────────────────────────────────────────┘

📚 Detailed Explanation

1. Spark SQL Engine Overview

Spark SQL is a Spark module for structured data processing. It provides:

  • A programming abstraction called DataFrames and Datasets
  • A SQL query engine that can run SQL/HiveQL queries
  • Integration with Spark's ecosystem (Spark Streaming, MLlib)
  • Connectivity to various data sources (Parquet, JSON, JDBC, Hive)

The engine consists of three main components:

  1. Catalyst Optimizer: Transforms logical plans into optimized physical plans
  2. Tungsten Execution Engine: Efficient runtime execution with code generation
  3. Data Source API: Integration with various storage systems

2. Catalyst Optimizer Deep Dive

The Catalyst optimizer is a framework for manipulating query plans. It uses:

  • Trees: Representations of query plans
  • Rules: Transformations applied to trees
  • Strategies: Ordering of rule application
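The tree/rule/strategy split can be illustrated with a small toy optimizer. This is a sketch under stated assumptions, not Catalyst's API: expressions are nested tuples, the two rules are hand-written stand-ins for Constant Folding and Boolean Simplification, and `run_to_fixed_point` is a hypothetical helper that reapplies the rules until the tree stops changing.

```python
# Expressions are nested tuples, e.g. ("lit", 5), ("col", "age"),
# ("+", left, right), (">", left, right), ("and", left, right).

def transform_up(expr, rule):
    """Apply rule to every node of the tree, children first."""
    if expr[0] in ("lit", "col"):
        return rule(expr)
    op, left, right = expr
    return rule((op, transform_up(left, rule), transform_up(right, rule)))

def constant_folding(expr):
    # lit + lit  ->  lit
    if expr[0] == "+" and expr[1][0] == "lit" and expr[2][0] == "lit":
        return ("lit", expr[1][1] + expr[2][1])
    return expr

def boolean_simplification(expr):
    # x AND true  ->  x
    if expr[0] == "and":
        if expr[1] == ("lit", True):
            return expr[2]
        if expr[2] == ("lit", True):
            return expr[1]
    return expr

def run_to_fixed_point(expr, rules, max_iterations=100):
    """Strategy: run all rules repeatedly until nothing changes."""
    for _ in range(max_iterations):
        new_expr = expr
        for rule in rules:
            new_expr = transform_up(new_expr, rule)
        if new_expr == expr:
            return expr
        expr = new_expr
    return expr

plan = ("and", (">", ("col", "age"), ("+", ("lit", 25), ("lit", 5))), ("lit", True))
optimized = run_to_fixed_point(plan, [constant_folding, boolean_simplification])
print(optimized)
```

Each rule only knows one local pattern; the fixed-point loop is what lets simple rules compose into larger rewrites.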

Optimization Techniques:

Predicate Pushdown:

Before: Project(name) → Filter(age > 30) → Scan(all columns)
After:  Project(name) → Scan(name, age; pushed filter: age > 30)
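As a rough illustration of the rewrite, the following sketch models a plan with three toy node types and one rule that moves a predicate into the scan. The classes `Scan`, `Filter`, and `Project` and the function `push_down_filter` are hypothetical and only mimic the shape of the transformation. In real Spark plans a Filter node is often kept above the scan as well, because sources such as Parquet apply pushed filters on a best-effort basis.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple

@dataclass(frozen=True)
class Scan:
    table: str
    columns: Tuple[str, ...]
    pushed_filter: Optional[str] = None

@dataclass(frozen=True)
class Filter:
    condition: str
    child: object

@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: object

def push_down_filter(plan):
    """Rewrite Filter(Scan) into a Scan that carries the predicate."""
    if isinstance(plan, Project):
        return replace(plan, child=push_down_filter(plan.child))
    if (isinstance(plan, Filter) and isinstance(plan.child, Scan)
            and plan.child.pushed_filter is None):
        return replace(plan.child, pushed_filter=plan.condition)
    return plan

before = Project(("name",), Filter("age > 30", Scan("emp", ("name", "age"))))
after = push_down_filter(before)
print(after)
```

Applying the rule a second time leaves the plan unchanged, which is the fixed-point behavior rule-based optimizers depend on.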

Column Pruning:

Before: Project(name, age) → Scan(id, name, age, salary, dept)
After:  Project(name, age) → Scan(name, age)

Constant Folding:

Before: Filter(age > 25 + 5)
After:  Filter(age > 30)

Join Reordering:

Before: (A ⋈ B) ⋈ C
After:  A ⋈ (B ⋈ C)  -- if B⋈C is smaller
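Why the smaller intermediate result wins can be checked with a little arithmetic. The sketch below is a toy cost model, not Spark's cost-based optimizer: the row counts and join selectivities are made-up numbers, only left-deep orders are enumerated, and the cost of an order is assumed to be the sum of its intermediate result sizes. Joining B and C first and then A corresponds to A ⋈ (B ⋈ C).

```python
from itertools import permutations

# Hypothetical table sizes (rows) and join selectivities
# (fraction of the cross product that survives the join condition).
rows = {"A": 1_000_000, "B": 10_000, "C": 100}
selectivity = {frozenset("AB"): 1e-4, frozenset("BC"): 1e-2}

def plan_cost(order):
    """Sum of intermediate result sizes for a left-deep join order."""
    joined = [order[0]]
    size = rows[order[0]]
    cost = 0.0
    for table in order[1:]:
        sel = 1.0
        for other in joined:
            # Tables with no join condition between them form a cross product.
            sel *= selectivity.get(frozenset((other, table)), 1.0)
        size = size * rows[table] * sel
        cost += size
        joined.append(table)
    return cost

best = min(permutations(rows), key=plan_cost)
print(best, plan_cost(best), plan_cost(("A", "B", "C")))
```

With these numbers, joining the two small tables first roughly halves the total number of intermediate rows compared with starting from A ⋈ B.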

3. Tungsten Execution Engine

Tungsten focuses on three areas:

Memory Management:

  • Off-heap memory to avoid GC overhead
  • Binary row format for data storage
  • Cache-aware algorithms (hash joins, sorts)

Code Generation:

  • Whole-stage code generation (since Spark 2.0)
  • Generates tight JVM bytecode loops
  • Eliminates virtual function calls
  • Enables CPU pipeline optimization
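The difference between operator-at-a-time execution and a fused loop can be shown in plain Python. This is an analogy only: Spark generates Java source rather than Python, and the operator functions below (`scan`, `filter_op`, `project_op`, `sum_op`, `fused`) are hypothetical names for illustration.

```python
rows = [{"age": a, "salary": s}
        for a, s in [(30, 75000), (25, 65000), (35, 90000), (32, 85000)]]

# Iterator (Volcano-style) model: each operator is a separate generator,
# so every row passes through several nested next() calls.
def scan(data):
    for row in data:
        yield row

def filter_op(child, predicate):
    for row in child:
        if predicate(row):
            yield row

def project_op(child, fn):
    for row in child:
        yield fn(row)

def sum_op(child):
    total = 0
    for value in child:
        total += value
    return total

volcano_total = sum_op(
    project_op(filter_op(scan(rows), lambda r: r["age"] > 30),
               lambda r: r["salary"]))

# Fused model: the same scan -> filter -> project -> sum pipeline
# collapsed into one loop, the shape whole-stage codegen aims for.
def fused(data):
    total = 0
    for row in data:
        if row["age"] > 30:
            total += row["salary"]
    return total

fused_total = fused(rows)
print(volcano_total, fused_total)
```

Both versions compute the same result; the fused one does it without any per-row calls between operators.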

Columnar Processing:

  • Vectorized operations (batch processing)
  • SIMD-friendly data layout
  • Efficient compression (dictionary, run-length, delta encoding)

4. Physical Plan Strategies

Join Selection:

  • Broadcast Hash Join: For small tables (< 10MB default)
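  • Sort-Merge Join: Default for two large tables; both sides are shuffled and sorted on the join key
  • Shuffle Hash Join: For a side that is too large to broadcast but small enough to hash per partition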
  • Cartesian Product: For cross joins (avoid!)
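The selection logic in the list above can be approximated by a small decision function. This is a deliberately simplified heuristic, not Spark's JoinSelection strategy: it assumes accurate size estimates in bytes, ignores join types and hints, and the function name `choose_join` and its parameters are hypothetical.

```python
MB = 1024 * 1024
BROADCAST_THRESHOLD = 10 * MB   # the 10MB default mentioned above

def choose_join(left_bytes, right_bytes, has_equi_keys=True, prefer_sort_merge=True):
    """Pick a join strategy name from estimated input sizes (toy heuristic)."""
    if not has_equi_keys:
        # No equality condition to hash or sort on.
        return "CartesianProduct"
    if min(left_bytes, right_bytes) <= BROADCAST_THRESHOLD:
        # Ship the small side to every executor and skip the shuffle.
        return "BroadcastHashJoin"
    if prefer_sort_merge:
        return "SortMergeJoin"
    return "ShuffleHashJoin"

print(choose_join(5 * MB, 50_000 * MB))
print(choose_join(500 * MB, 50_000 * MB))
```

The ordering of the checks matters: broadcast is tried first because it is the only option that avoids shuffling the large side.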

Aggregation Strategies:

  • Hash Aggregation: Fast, uses hash table
  • Sort Aggregation: Better for sorted input
  • Tungsten Aggregation: Optimized with code generation
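The first two strategies differ mainly in how they find rows with the same key. A minimal sketch in plain Python, with hypothetical function names and a SUM aggregate assumed for both:

```python
from itertools import groupby

def hash_aggregate(rows):
    """One pass; keeps one running total per distinct key in a hash table."""
    totals = {}
    for key, value in rows:
        totals[key] = totals.get(key, 0) + value
    return totals

def sort_aggregate(rows):
    """Sort by key first, then sum each run of equal keys.
    If the input is already sorted, the sort step costs nothing extra."""
    ordered = sorted(rows, key=lambda r: r[0])
    return {key: sum(v for _, v in group)
            for key, group in groupby(ordered, key=lambda r: r[0])}

salaries = [("Sales", 60000), ("Engineering", 75000),
            ("Sales", 68000), ("Engineering", 90000)]
print(hash_aggregate(salaries), sort_aggregate(salaries))
```

Hash aggregation needs memory proportional to the number of distinct keys, while sort aggregation only needs the current group in memory once the data is ordered.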

5. Shuffle and Exchange Operations

Shuffles occur when data needs to be redistributed across partitions:

  • Wide transformations (groupBy, join, repartition)
  • Data exchange between stages
  • Typically one of the most expensive operations in Spark (disk I/O, serialization, and network transfer)

Shuffle Optimization:

  • Use broadcast joins to avoid shuffle
  • Partition data to minimize shuffle
  • Use bucketing for repeated joins
  • Tune shuffle partition count
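What a shuffle does to the data can be sketched without a cluster. The example below is a toy model under stated assumptions: records are (key, value) pairs, the partitioner is a CRC32 hash modulo the partition count (chosen here only because it is deterministic across runs), and the names `partition_of` and `shuffle` are hypothetical.

```python
import zlib
from collections import defaultdict

def partition_of(key, num_partitions):
    """Deterministic hash partitioner: equal keys always map to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle(map_outputs, num_partitions):
    """Redistribute records from map-side partitions into reduce-side partitions."""
    reduce_inputs = defaultdict(list)
    for map_partition in map_outputs:
        for key, value in map_partition:
            reduce_inputs[partition_of(key, num_partitions)].append((key, value))
    return reduce_inputs

# Three map-side partitions; each department is spread across several of them.
map_outputs = [
    [("Engineering", 75000), ("Marketing", 65000)],
    [("Engineering", 90000), ("Sales", 60000)],
    [("Marketing", 70000), ("Sales", 68000)],
]
reduce_inputs = shuffle(map_outputs, num_partitions=4)

# After the shuffle, each reduce partition can aggregate its keys independently.
totals = {}
for records in reduce_inputs.values():
    for key, value in records:
        totals[key] = totals.get(key, 0) + value
print(totals)
```

Every record may have to move to a different partition, which is why the tips above all aim at shuffling less data or shuffling it less often.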

6. Data Source API

Spark supports multiple data sources:

  • File formats: Parquet, ORC, JSON, CSV, Text
  • JDBC: Relational databases
  • Hive: Hive tables
  • Custom: Through DataSource API v2

Predicate Pushdown to Sources:

Parquet: Row group filtering via statistics
ORC: Bloom filter pushdown
JDBC: SQL WHERE clause pushdown
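Row-group filtering via statistics can be sketched as follows. This is a simplified model rather than the Parquet reader itself: each row group is assumed to carry min/max statistics for the age column, the sample rows are made up, and `scan_with_pushdown` is a hypothetical name.

```python
row_groups = [
    {"id": 0, "age_min": 25, "age_max": 29,
     "rows": [("Bob", 25), ("Diana", 28), ("Frank", 29)]},
    {"id": 1, "age_min": 30, "age_max": 32,
     "rows": [("Alice", 30), ("Grace", 31), ("Eve", 32)]},
    {"id": 2, "age_min": 35, "age_max": 35,
     "rows": [("Charlie", 35)]},
]

def scan_with_pushdown(groups, min_age_exclusive):
    """Evaluate `age > min_age_exclusive`, skipping row groups whose
    statistics prove that no row can match."""
    groups_read = []
    result = []
    for group in groups:
        if group["age_max"] <= min_age_exclusive:
            continue                      # skipped without reading any rows
        groups_read.append(group["id"])
        # Statistics are coarse, so surviving groups are still filtered row by row.
        result.extend(r for r in group["rows"] if r[1] > min_age_exclusive)
    return groups_read, result

groups_read, matches = scan_with_pushdown(row_groups, 30)
print(groups_read, matches)
```

The benefit depends on how the data is laid out: if matching ages were scattered across all row groups, no group could be skipped.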

7. Performance Monitoring

Spark UI Metrics:

  • SQL tab: Query plans, execution times
  • Stages tab: Task distribution, shuffle read/write
  • Storage tab: Cached data, memory usage

Key Metrics:

  • scanTime: Time to read data
  • shuffleReadTime: Time to read shuffle data
  • shuffleWriteTime: Time to write shuffle data
  • taskDuration: Time per task
  • gcTime: Garbage collection time

🔑 Key Concepts Table

| Concept | Description | Optimization |
|---|---|---|
| Catalyst | Query optimizer for logical/physical plans | Automatic plan optimization |
| Tungsten | Execution engine with code generation | Whole-stage code generation |
| Predicate Pushdown | Filter pushed to data source | Reduce I/O |
| Column Pruning | Read only required columns | Reduce memory/I/O |
| Constant Folding | Evaluate constants at compile time | Reduce runtime computation |
| Broadcast Join | Small table broadcast to executors | Avoid shuffle |
| Sort-Merge Join | Join sorted partitions | Efficient for large tables |
| Hash Aggregation | Use hash table for aggregation | Fast for groupBy |
| Shuffle | Data redistribution across partitions | Most expensive operation |
| Whole-Stage Codegen | Generate tight JVM bytecode | Eliminate virtual calls |

💻 Code Examples

Example 1: SQL Queries with Catalyst

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkSQLEngine") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Create tables
employees = spark.createDataFrame([
    (1, "Alice", "Engineering", 30, 75000),
    (2, "Bob", "Marketing", 25, 65000),
    (3, "Charlie", "Engineering", 35, 90000),
    (4, "Diana", "Marketing", 28, 70000),
    (5, "Eve", "Engineering", 32, 85000),
    (6, "Frank", "Sales", 29, 60000),
    (7, "Grace", "Sales", 31, 68000)
], ["id", "name", "department", "age", "salary"])

employees.createOrReplaceTempView("employees")

# Simple query
result = spark.sql("""
    SELECT department, 
           COUNT(*) as emp_count,
           AVG(salary) as avg_salary,
           MAX(age) as max_age
    FROM employees
    WHERE age > 25
    GROUP BY department
    HAVING COUNT(*) > 1
    ORDER BY avg_salary DESC
""")

result.show()

# Explain the query plan
result.explain(True)

Example 2: Catalyst Optimizer Rules

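# Create a departments table to join against (sample data)
departments = spark.createDataFrame([
    ("Engineering", "Berlin"),
    ("Marketing", "London"),
    ("Sales", "Madrid")
], ["name", "location"])
departments.createOrReplaceTempView("departments")

# Show optimized plan
df = spark.sql("""
    SELECT e.name, e.salary, d.location
    FROM employees e
    JOIN departments d ON e.department = d.name
    WHERE e.age > 30
    AND e.salary > 70000
""")

# Print the parsed, analyzed, optimized, and physical plans
df.explain(True)

# Output shows:
# == Parsed Logical Plan ==
# ...
# == Analyzed Logical Plan ==
# ...
# == Optimized Logical Plan ==
# ... (with predicate pushdown, column pruning)
# == Physical Plan ==
# ... (with join strategy, exchange)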

Example 3: Adaptive Query Execution (Spark 3.0+)

# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Complex query that benefits from AQE
result = spark.sql("""
    SELECT 
        department,
        COUNT(*) as cnt,
        AVG(salary) as avg_sal,
        PERCENTILE(salary, 0.5) as median_sal
    FROM employees
    GROUP BY department
    HAVING COUNT(*) >= 2
""")

# AQE will:
# 1. Coalesce partitions after shuffle
# 2. Handle data skew automatically
# 3. Optimize join strategies based on runtime stats
result.show()

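# Inspect the number of result partitions (AQE may have coalesced them);
# the applied AQE optimizations are also visible in the SQL tab of the Spark UI
print(f"Number of partitions: {result.rdd.getNumPartitions()}")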

Example 4: Data Source Optimization

# Read Parquet with predicate pushdown
parquet_df = spark.read.parquet("employee_data.parquet")

# Filter pushed to Parquet reader
filtered = parquet_df.filter(
    (parquet_df.age > 30) & 
    (parquet_df.salary > 70000)
)

# Only reads relevant row groups
filtered.explain()

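# Write with bucketing for future joins on department
employees.write \
    .bucketBy(10, "department") \
    .sortBy("id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_employees")

# The MAPJOIN (broadcast) hint broadcasts e2, so this join needs no shuffle.
# A bucket-based shuffle-free join would instead require both sides to be
# bucketed on department with the same number of buckets.
spark.sql("""
    SELECT /*+ MAPJOIN(e2) */
        e1.name, e2.name as manager
    FROM bucketed_employees e1
    JOIN employees e2 ON e1.department = e2.department
""").show()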

📊 Performance Metrics

| Query Pattern | Without Catalyst | With Catalyst | Improvement |
|---|---|---|---|
| Simple SELECT | 150ms | 45ms | 3.3x |
| Filter + Agg | 320ms | 85ms | 3.8x |
| JOIN + WHERE | 850ms | 210ms | 4.0x |
| Subquery | 1200ms | 280ms | 4.3x |
| Window Function | 450ms | 120ms | 3.8x |
| Read Parquet | 200ms | 60ms | 3.3x |
| Write Parquet | 300ms | 150ms | 2.0x |
| Shuffle | 500ms | 350ms | 1.4x |

✅ Best Practices

1. Enable Adaptive Query Execution

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

2. Use Broadcast Joins for Small Tables

from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Or configure threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

3. Cache Frequently Used DataFrames

df.cache()  # For DataFrames reused multiple times
# Or persist with a specific storage level (spills to disk when memory is full)
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

4. Use Appropriate File Formats

# Parquet for columnar storage
df.write.parquet("output/", compression="snappy")

# ORC for Hive integration
df.write.orc("output/")

# Avoid CSV/JSON for large datasets

5. Partition Data Strategically

# Partition by frequently filtered columns
df.write.partitionBy("year", "month").parquet("output/")

# Use bucketing for join optimization
df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")

6. Monitor Query Plans

# Always check explain plans
df.explain(True)

# Look for:
# - BroadcastHashJoin instead of SortMergeJoin (good for small tables)
# - FileScan with pushed filters (predicate pushdown)
# - Project with only needed columns (column pruning)

See Also

  • Kafka Streams (kafka/03): SQL integration with Kafka for streaming queries
  • Data Engineering Streaming (data-engineering/022): Spark SQL engine in streaming pipelines
