
Tungsten Execution Engine

Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb

Tungsten's Three Pillars

Tungsten optimizes Spark execution through three pillars: explicit memory management (a compact binary format, optionally off-heap), cache-aware algorithms, and whole-stage code generation.

Off-Heap Memory Management

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
    .appName("TungstenEngine") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
    .config("spark.sql.inMemoryColumnarStorage.batchSize", "10000") \
    .getOrCreate()

# Tungsten keeps rows in a compact binary format (UnsafeRow) instead of JVM objects
# This reduces memory usage and GC pressure during shuffles, sorts, and aggregations

df = spark.read.parquet("hdfs://data/events")
df.cache()  # Cached in Spark's in-memory columnar format (compressed column batches)
df.count()  # Materialize cache

# Check the cache in the Spark UI
# The Storage tab lists the cached plan with its storage level, size in memory, and size on disk

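Interview Insight: Tungsten's binary row format (UnsafeRow) typically needs much less memory than the equivalent graph of Java objects. Savings of 30-50% are often quoted, but the real figure depends on the schema. Each row is a null bitmap, followed by one 8-byte slot per field, followed by any variable-length data. There are no per-field object headers or pointers to chase, and the garbage collector sees one byte buffer instead of many small objects.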

Binary Row Format

# Tungsten stores rows in a compact binary format (UnsafeRow)
# Each row is: [null bitmap | fixed-length region | variable-length data]

# Example: Schema with mixed types
schema = "id INT, name STRING, amount DOUBLE, is_active BOOLEAN"

# Internal representation (conceptual), fields stay in schema order:
# Row bytes: [null bitmap (8B)] [id (8B slot)] [name offset+length (8B slot)] [amount (8B slot)] [is_active (8B slot)] [name bytes, padded to 8B]

# This format enables:
# 1. Cache-friendly sequential access
# 2. Field access by fixed offset, without deserializing the row
# 3. Reduced memory bandwidth usage

# Verify Tungsten is active
df = spark.read.parquet("hdfs://data/large")
result = df.groupBy("category").agg(F.sum("amount"))

# Check physical plan for Tungsten operators
result.explain(mode="formatted")
# Look for "HashAggregate" here, and "Sort" in plans that sort (Spark 1.x named these "TungstenAggregate" and "TungstenSort")
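
To make the binary row idea concrete, here is a minimal pure-Python sketch of one row encoded in this style. It is an illustration under assumptions, not Spark's actual UnsafeRow code. The helper names (encode_row, read_amount, read_name) are hypothetical. The layout mirrors the general shape of UnsafeRow: one 8-byte null-bitmap word, one 8-byte slot per field in schema order, and variable-length bytes padded to 8 bytes, with the string's offset and length packed into its slot.

```python
import struct

WORD = 8  # every region is aligned to 8-byte words


def encode_row(id_, name, amount, is_active):
    """Pack (id INT, name STRING, amount DOUBLE, is_active BOOLEAN) into one buffer."""
    values = [id_, name, amount, is_active]

    # Bit i of the bitmap is set when field i is NULL.
    null_bits = 0
    for i, value in enumerate(values):
        if value is None:
            null_bits |= 1 << i

    fixed_size = WORD + WORD * len(values)  # bitmap word + one slot per field
    name_bytes = b"" if name is None else name.encode("utf-8")
    padded_len = (len(name_bytes) + WORD - 1) // WORD * WORD

    # The string slot stores (offset from row start << 32) | length in bytes.
    name_slot = 0 if name is None else (fixed_size << 32) | len(name_bytes)

    buf = struct.pack("<q", null_bits)
    buf += struct.pack("<q", 0 if id_ is None else id_)
    buf += struct.pack("<q", name_slot)
    buf += struct.pack("<d", 0.0 if amount is None else amount)
    buf += struct.pack("<q", int(bool(is_active)))
    buf += name_bytes.ljust(padded_len, b"\x00")
    return buf


def read_amount(buf):
    # Field 2 always lives at the same offset, so no parsing of earlier fields is needed.
    return struct.unpack_from("<d", buf, WORD + 2 * WORD)[0]


def read_name(buf):
    null_bits = struct.unpack_from("<q", buf, 0)[0]
    if null_bits & (1 << 1):
        return None
    slot = struct.unpack_from("<q", buf, WORD + 1 * WORD)[0]
    offset, length = slot >> 32, slot & 0xFFFFFFFF
    return buf[offset:offset + length].decode("utf-8")


row = encode_row(1, "test", 100.0, True)
print(len(row))          # 48 bytes: 8 (bitmap) + 32 (slots) + 8 (padded string)
print(read_amount(row))  # 100.0
print(read_name(row))    # test
```

The point of the sketch is that reading amount is a single fixed-offset lookup, and the whole row is one contiguous buffer rather than four separate objects.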

Whole-Stage Code Generation

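Tungsten generates Java source code at runtime for each pipeline of operators and compiles it to bytecode with the Janino compiler. The fused code eliminates virtual function calls between operators and keeps intermediate values in local variables, which the JIT can hold in CPU registers.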

# Whole-stage code generation merges multiple operators
df = spark.read.parquet("hdfs://data/transactions")

# This query benefits from code generation
result = df \
    .filter(F.col("amount") > 100) \
    .withColumn("tax", F.col("amount") * 0.08) \
    .withColumn("total", F.col("amount") + F.col("tax")) \
    .groupBy("category", "region") \
    .agg(
        F.sum("total").alias("grand_total"),
        F.count("*").alias("count")
    )

# Check code generation boundaries
result.explain(mode="formatted")
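# Operators prefixed with "*" and tagged "[codegen id : N]" run inside whole-stage codegen
# (the default explain() output marks the same stages as "*(N)")
# Operators that share an id are fused into a single generated function

# Control code generation
spark.conf.set("spark.sql.codegen.wholeStage", "true")  # Enable operator fusion (the default)
spark.conf.set("spark.sql.codegen.maxFields", "100")    # Skip codegen for schemas wider than this
spark.conf.set("spark.sql.codegen.fallback", "true")    # Fall back to interpreted mode if compilation fails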

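Warning: UDFs limit code generation. A Python UDF runs in a separate Python worker process, so it splits the plan into separate codegen stages and adds serialization overhead. A Scala or Java UDF stays inside the generated code but remains opaque to the optimizer. Prefer built-in functions over UDFs.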
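
The difference between interpreted execution and whole-stage code generation can be sketched in plain Python. This is a conceptual analogy, not Spark's generated code: the operator classes (Scan, Filter, Project) and both helper functions are hypothetical. The first version chains one iterator per operator, so every row crosses several call boundaries. The second is the kind of single fused loop that code generation aims to produce for the same filter and projection.

```python
class Scan:
    """Leaf operator: yields rows one at a time."""
    def __init__(self, rows):
        self.rows = rows

    def execute(self):
        yield from self.rows


class Filter:
    """Passes through only the rows that satisfy the predicate."""
    def __init__(self, child, predicate):
        self.child = child
        self.predicate = predicate

    def execute(self):
        for row in self.child.execute():
            if self.predicate(row):
                yield row


class Project:
    """Applies an expression to every row."""
    def __init__(self, child, expression):
        self.child = child
        self.expression = expression

    def execute(self):
        for row in self.child.execute():
            yield self.expression(row)


def interpreted_total(rows):
    # Iterator model: each row travels through three operators and two callbacks.
    plan = Project(
        Filter(Scan(rows), lambda r: r["amount"] > 100),
        lambda r: r["amount"] + r["amount"] * 0.08,
    )
    return sum(plan.execute())


def fused_total(rows):
    # Fused model: one loop, no per-operator calls, intermediates in local variables.
    total = 0.0
    for row in rows:
        amount = row["amount"]
        if amount > 100:
            total += amount + amount * 0.08
    return total


rows = [{"amount": 50.0}, {"amount": 150.0}, {"amount": 200.0}]
print(interpreted_total(rows) == fused_total(rows))  # True
```

Both functions compute the same result. The fused version simply removes the per-row dispatch between operators, which is where most of the codegen speedup comes from.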

Cache-Aware Algorithms

Tungsten optimizes for modern CPU architectures with cache-aware algorithms.

Sort Algorithms

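# Tungsten can use radix sort on fixed-width sort keys
df = spark.read.parquet("hdfs://data/events")

# Sort operations use Tungsten's optimized sort
sorted_df = df.orderBy("timestamp", "user_id")

# Check sort strategy in physical plan
sorted_df.explain(mode="formatted")
# Look for "Sort" ("TungstenSort" in Spark 1.x); "SortMergeJoin" only appears for joins

# Radix sort is O(n) for fixed-length keys
# Compared to Arrays.sort() which is O(n log n)
# Spark falls back to a comparison sort (TimSort) when the ordering cannot be decided
# from the 8-byte sort prefix alone, e.g. multi-column keys like the one above or long strings

# Radix sort is controlled by this setting (enabled by default)
spark.conf.set("spark.sql.sort.enableRadixSort", "true")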
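
Why radix sort is linear for fixed-length keys is easiest to see in code. The following is a minimal sketch of a least-significant-digit radix sort over non-negative integers, written for illustration. It is not Spark's implementation, and the function name radix_sort and its key_bytes parameter are hypothetical. Each pass is a stable counting sort on one byte of the key, so the total work is key_bytes passes over the data regardless of how the keys compare to each other.

```python
def radix_sort(keys, key_bytes=8):
    """LSD radix sort for non-negative integers that fit in key_bytes bytes."""
    keys = list(keys)
    for byte_index in range(key_bytes):
        shift = 8 * byte_index

        # Pass 1: count how many keys fall into each of the 256 buckets.
        counts = [0] * 256
        for key in keys:
            counts[(key >> shift) & 0xFF] += 1

        # Turn counts into the starting offset of each bucket.
        offsets = [0] * 256
        running = 0
        for bucket in range(256):
            offsets[bucket] = running
            running += counts[bucket]

        # Pass 2: scatter keys into their buckets, preserving input order (stable).
        output = [0] * len(keys)
        for key in keys:
            bucket = (key >> shift) & 0xFF
            output[offsets[bucket]] = key
            offsets[bucket] += 1
        keys = output
    return keys


print(radix_sort([170, 45, 75, 90, 802, 24, 2, 66]))
# [2, 24, 45, 66, 75, 90, 170, 802]
```

The number of passes depends only on the key width, which is why the technique suits fixed-width values such as integers and timestamps but not arbitrary-length strings.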

Hash Aggregation

# Tungsten uses hash aggregation with optimized hash tables
df = spark.read.parquet("hdfs://data/sales")

# Hash aggregation avoids sorting the input, so it is usually faster than sort-based aggregation
result = df \
    .groupBy("product_id", "region") \
    .agg(
        F.sum("amount").alias("total"),
        F.avg("amount").alias("average"),
        F.count("*").alias("count")
    )

# Check aggregation strategy
result.explain(mode="formatted")
# Look for "HashAggregate" vs "SortAggregate"

# Tungsten's hash table (BytesToBytesMap) uses open addressing with quadratic probing
# Better cache locality than chaining
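
A toy version of an open-addressing hash table used for a SUM ... GROUP BY may help here. This sketch is an assumption-laden illustration, not Spark's BytesToBytesMap. The class name OpenAddressingSum and its methods are hypothetical. It stores keys and running sums in flat arrays, and resolves collisions by probing other slots of the same array (with steps of 1, 2, 3, ... on a power-of-two table) instead of following linked lists.

```python
class OpenAddressingSum:
    """Toy hash aggregation: key -> running sum, stored in flat arrays."""

    def __init__(self, capacity=8):
        # Capacity must be a power of two so that `& mask` replaces modulo.
        self._keys = [None] * capacity
        self._sums = [0.0] * capacity
        self._size = 0

    @staticmethod
    def _find_slot(keys, key):
        mask = len(keys) - 1
        pos = hash(key) & mask
        step = 1
        # Probe with growing steps until we hit the key or an empty slot.
        while keys[pos] is not None and keys[pos] != key:
            pos = (pos + step) & mask
            step += 1
        return pos

    def _grow(self):
        old_keys, old_sums = self._keys, self._sums
        self._keys = [None] * (2 * len(old_keys))
        self._sums = [0.0] * len(self._keys)
        for key, total in zip(old_keys, old_sums):
            if key is not None:
                pos = self._find_slot(self._keys, key)
                self._keys[pos] = key
                self._sums[pos] = total

    def add(self, key, amount):
        # Keep the table at most half full so probing always finds a free slot.
        if (self._size + 1) * 2 > len(self._keys):
            self._grow()
        pos = self._find_slot(self._keys, key)
        if self._keys[pos] is None:
            self._keys[pos] = key
            self._size += 1
        self._sums[pos] += amount

    def result(self):
        return {k: s for k, s in zip(self._keys, self._sums) if k is not None}


agg = OpenAddressingSum()
for product_id, amount in [("a", 10.0), ("b", 5.0), ("a", 2.5), ("c", 1.0), ("b", 5.0)]:
    agg.add(product_id, amount)
print(agg.result() == {"a": 12.5, "b": 10.0, "c": 1.0})  # True
```

Because colliding entries stay in the same arrays, a probe usually touches nearby memory, whereas a chained table follows a pointer to a separately allocated node on every collision.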

Memory Layout Optimization

# Tungsten arranges data for optimal cache performance
df = spark.read.parquet("hdfs://data/wide-table")

# Columnar storage batches data for cache efficiency
df.cache()

# Spark's cache stores data in column batches
# Each batch holds a run of rows, laid out one column at a time
# This enables:
# 1. Better compression (similar values together)
# 2. SIMD vectorization (process multiple values at once)
# 3. Cache-friendly access patterns

# Configure batch size
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true")
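
The benefit of grouping values by column can be shown with a small pure-Python sketch. It is a simplified illustration under assumptions, not Spark's cache format: the function names are hypothetical, and run-length encoding stands in for the several encodings (run-length and dictionary among them) that the in-memory columnar cache can choose from.

```python
from array import array
from itertools import groupby

# Five rows of (region, amount), as a row-oriented store would hold them.
rows = [("US", 10.0), ("US", 12.5), ("US", 7.5), ("DE", 3.0), ("DE", 4.0)]


def to_column_batch(rows):
    """Split rows into one sequence per column."""
    regions = [row[0] for row in rows]
    amounts = array("d", (row[1] for row in rows))  # contiguous 8-byte doubles
    return regions, amounts


def run_length_encode(values):
    """Collapse runs of equal neighbours into (value, run_length) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]


def run_length_decode(runs):
    return [value for value, count in runs for _ in range(count)]


regions, amounts = to_column_batch(rows)
encoded = run_length_encode(regions)

print(encoded)       # [('US', 3), ('DE', 2)]
print(sum(amounts))  # 37.0
```

Similar values sit next to each other in a column, so the five region strings collapse into two runs, and an aggregate over amount scans one contiguous array without touching the region data at all.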

Vectorized Execution

# Vectorized readers for Parquet (since Spark 2.0) and ORC (since Spark 2.3) are enabled by default
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

# The vectorized reader decodes a batch of column values at a time instead of one row at a time
df = spark.read.parquet("hdfs://data/columnar")

# Check if vectorized reader is used
df.explain(mode="formatted")
# Look for "Batched: true" in the Scan parquet node

# Compare performance (rough only: repeat each run, since the first read also warms file caches)
import time

def timed_count():
    start = time.perf_counter()
    df.filter(F.col("amount") > 100).count()
    return time.perf_counter() - start

vectorized_time = timed_count()

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
non_vectorized_time = timed_count()
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")  # Restore the default

print(f"Vectorized: {vectorized_time:.2f}s, Non-vectorized: {non_vectorized_time:.2f}s")

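Performance Note: Vectorized reads are often several times faster for columnar formats (2-10x is commonly cited, depending on schema and query). They are enabled by default for Parquet and ORC, so leave them on unless you hit a reader-specific problem.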

Tungsten vs Traditional Execution

# Traditional Spark (pre-Tungsten):
# - JVM objects with headers (typically 12-16 bytes of overhead per object, plus references)
# - Java sorting (O(n log n))
# - Interpreted execution (virtual function calls)

# Tungsten:
# - Compact binary rows (no per-field object headers)
# - Radix sort (O(n) for fixed-length keys)
# - Compiled code (direct function calls)

# Example: Comparing memory usage
# Python objects stand in for JVM objects here; the numbers are an analogy, not JVM measurements
import sys

# Traditional object-based
class TraditionalRow:
    def __init__(self, id, name, amount):
        self.id = id
        self.name = name
        self.amount = amount

row = TraditionalRow(1, "test", 100.0)
# sys.getsizeof is shallow, so add the attribute dict and the field objects
traditional_size = (sys.getsizeof(row) + sys.getsizeof(vars(row))
                    + sum(sys.getsizeof(v) for v in vars(row).values()))

# Tungsten binary format (conceptual)
# Row: [null_bitmap:8B][id:8B][name offset+length:8B][amount:8B][name_bytes padded to 8B]
tungsten_size = 8 + 3 * 8 + 8

print(f"Traditional: ~{traditional_size} bytes")
print(f"Tungsten: ~{tungsten_size} bytes")
print(f"Savings: ~{traditional_size - tungsten_size} bytes per row")

Debugging Tungsten Performance

# Monitor Tungsten-specific metrics
# Event log settings only take effect if they are set before the SparkContext starts
spark = SparkSession.builder \
    .appName("TungstenDebug") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs://logs/spark-events") \
    .getOrCreate()

# Check if Tungsten is being used
df = spark.read.parquet("hdfs://data/events")

# Check physical plan for Tungsten operators
result = df.groupBy("key").agg(F.sum("value"))
result.explain(mode="formatted")

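# Monitor memory usage
import json
from urllib.request import urlopen

def check_tungsten_memory(spark):
    # Read per-executor storage memory from Spark's monitoring REST API
    # (requires the Spark UI to be enabled; in production, use Spark's metrics system)
    sc = spark.sparkContext
    url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/executors"
    with urlopen(url) as response:
        executors = json.load(response)
    for executor in executors:
        memory = executor.get("memoryMetrics", {})
        print(executor["id"],
              "on-heap:", memory.get("usedOnHeapStorageMemory"),
              "off-heap:", memory.get("usedOffHeapStorageMemory"))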

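# Check code generation boundaries
spark.conf.set("spark.sql.codegen.fallback", "true")
result = df.withColumn("processed", F.upper(F.col("text")))
result.explain(mode="formatted")
# Operators marked with "*" are inside a whole-stage codegen stage
result.explain(mode="codegen")  # Print the generated Java source for each stage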

Key Takeaway: Tungsten optimizes Spark execution through compact binary storage, cache-aware algorithms, and whole-stage code generation. Understanding these internals helps you write performance-optimized Spark code.

Follow-Up Questions

  • How does Tungsten's binary format handle schema evolution?
  • Explain the difference between Tungsten sort and Java's Arrays.sort().
  • What are the limitations of whole-stage code generation?
  • How does vectorized execution interact with complex nested types?
  • Describe how Tungsten manages memory when data exceeds off-heap capacity.
