⚡ PySpark UDF Optimization
User Defined Function (UDF)
A UDF is a custom function defined by the user that extends Spark's built-in functions. Python UDFs are pickled on the driver and executed in separate Python worker processes launched by each executor. As a result, every row pays for conversion between JVM and Python objects plus one Python function call.
Pandas UDF (Vectorized UDF)
A Pandas UDF uses Apache Arrow to transfer data between the JVM and Python in batches, so the function operates on whole pandas.Series or pandas.DataFrame objects with vectorized operations. This avoids per-row conversion and call overhead, typically yielding 3x–100x speedups over regular Python UDFs depending on the workload.
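To make the batching difference concrete, here is a minimal pure-pandas simulation. No Spark is involved, and `run_row_udf`, `run_batched_udf` and the batch size are hypothetical names chosen for illustration. The sketch counts how many times the user function is invoked when values are processed one at a time versus in batches. That reduction in calls is the core reason a Pandas UDF cuts overhead.

```python
import pandas as pd

def run_row_udf(values: pd.Series, func):
    """Call func once per element, like a regular Python UDF."""
    calls = 0
    out = []
    for v in values:
        calls += 1
        out.append(func(v))
    return pd.Series(out, dtype="float64"), calls

def run_batched_udf(values: pd.Series, func, batch_size: int):
    """Call func once per batch (a pd.Series slice), like a Pandas UDF."""
    calls = 0
    parts = []
    for start in range(0, len(values), batch_size):
        calls += 1
        parts.append(func(values.iloc[start:start + batch_size]))
    return pd.concat(parts, ignore_index=True), calls

values = pd.Series([float(i) for i in range(25_000)])
row_result, row_calls = run_row_udf(values, lambda x: x * 2)
batch_result, batch_calls = run_batched_udf(values, lambda s: s * 2, batch_size=10_000)

print(row_calls, batch_calls)            # 25000 3
print(row_result.equals(batch_result))   # True
```

Both paths produce identical output. The batched path makes 3 calls instead of 25,000, each over a whole Series.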
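Pandas UDF Speedup Factor
$$S = \frac{T_{python}}{T_{pandas}} \approx \frac{T_{python}}{T_{arrow} + T_{vec}}$$
Here, with all times expressed per row,
- $T_{python}$ = Execution time with regular Python UDF
- $T_{pandas}$ = Execution time with Pandas UDF
- $T_{arrow}$ = Arrow batch transfer time (amortized per row)
- $T_{vec}$ = Vectorized computation time via pandas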
Python UDFs can be 10x–100x slower than built-in functions because every row must be converted between JVM and Python objects and passed through a separate Python function call. Pandas UDFs reduce this overhead by transferring data in batches via Apache Arrow.
Use built-in Spark SQL functions whenever possible: they run inside the JVM and are optimized by Catalyst. Only use UDFs when built-in functions cannot express the required logic.
Arrow Batch Transfer Optimization
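A Pandas UDF invokes the Python function once per Arrow batch instead of once per row. Processing $N_{rows}$ rows therefore takes $\lceil N_{rows} / B_{batch} \rceil$ Python calls instead of $N_{rows}$. Here $B_{batch}$ is the Arrow batch size, set by spark.sql.execution.arrow.maxRecordsPerBatch (default 10,000 rows). Larger batches amortize the fixed per-batch overhead further, but each batch also needs more memory. Returns diminish once that overhead is small next to the vectorized work itself.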
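The call-count argument can be worked through numerically. This sketch assumes a fixed cost per Python invocation (`per_call_overhead`, a hypothetical unit constant, not a measured Spark figure). It shows how that cost, amortized per row, shrinks as the batch size grows.

```python
import math

def python_calls(n_rows: int, batch_size: int) -> int:
    """Number of Python function invocations when rows are sent in batches."""
    return math.ceil(n_rows / batch_size)

def amortized_overhead_per_row(per_call_overhead: float, batch_size: int) -> float:
    """Fixed per-call cost spread over the rows of one full batch."""
    return per_call_overhead / batch_size

n_rows = 1_000_000
for batch_size in (1, 1_000, 10_000, 100_000):
    print(batch_size,
          python_calls(n_rows, batch_size),
          amortized_overhead_per_row(1.0, batch_size))
```

Going from 1 to 1,000 rows per batch removes almost all of the per-row overhead. Going from 10,000 to 100,000 saves very little more, while each batch holds 10x more data in memory. That trade-off is where the diminishing returns come from.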
- Python UDFs: one Python call and object conversion per row, often 10x–100x slower than built-in functions
- Pandas UDFs: batch processing via Arrow, 3x–100x faster than Python UDFs
- Always prefer built-in functions; use UDFs only when necessary
- Tune spark.sql.execution.arrow.maxRecordsPerBatch to adjust the batch size (default 10,000)
🏗️ Architecture Diagram
UDF EXECUTION ARCHITECTURE

PYTHON UDF FLOW
- Driver (Python): sends the UDF function (pickled) to the executors
- Executor (JVM + Python):
  - Spark JVM process, row-by-row processing:
    Row 1 → Python → Result 1
    Row 2 → Python → Result 2
    Row 3 → Python → Result 3
    ...
  - ↕ Pickle serialization over a local socket (Py4J only carries driver-side control calls, not row data)
  - Python worker process: def my_udf(x): return x * 2
- OVERHEAD: serialization per row + switching between the JVM and the Python process
- PERFORMANCE: up to ~100x slower than native Spark operations

PANDAS UDF FLOW
- Driver (Python): sends the UDF function (pickled) to the executors
- Executor (JVM + Python):
  - Spark JVM process, batch processing:
    Batch 1 (up to maxRecordsPerBatch rows, default 10,000) → Pandas
    Batch 2 → Pandas
    Batch 3 → Pandas
  - ↕ Arrow serialization
  - Python worker process: def my_pandas_udf(s): return s * 2
- ADVANTAGE: vectorized operations, Arrow serialization
- PERFORMANCE: ~10x faster than Python UDFs

ARROW SERIALIZATION
- JVM (Spark internal rows, converted to Arrow columns) → Arrow record batches → Python (pandas, columnar)
- Columnar format (cache-friendly)
- Vectorized operations
- No per-row serialization
- Not zero-copy end to end: batches still cross the JVM/Python process boundary over a socket, although the Arrow-to-pandas conversion can avoid copies in some cases
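The sketch below illustrates what "columnar" means in the Arrow serialization panel. It transposes row tuples into one contiguous typed buffer per column using the standard-library `array` module. This is only an analogy for Arrow's layout, since real Arrow buffers also carry validity bitmaps and schema metadata. `rows_to_columns` is a hypothetical helper.

```python
from array import array

def rows_to_columns(rows):
    """Transpose (id, value) row tuples into one typed buffer per column."""
    ids = array("q")      # signed 64-bit integers
    values = array("d")   # 64-bit floats
    for row_id, value in rows:
        ids.append(row_id)
        values.append(value)
    return {"id": ids, "value": values}

rows = [(i, i * 1.5) for i in range(1_000)]
columns = rows_to_columns(rows)

# Each column is a single contiguous buffer of fixed-width values,
# so a vectorized kernel can scan it without touching per-row objects.
print(memoryview(columns["value"]).nbytes)  # 8000
print(columns["value"][:3].tolist())        # [0.0, 1.5, 3.0]
```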
UDF TYPE COMPARISON MATRIX

PERFORMANCE COMPARISON (operation: process 1M rows)
| Method | Time | Memory | GC |
|---|---|---|---|
| Native Spark | 100ms | 50MB | Low |
| Pandas UDF | 500ms | 100MB | Medium |
| Python UDF | 5000ms | 200MB | High |
| Python UDF + map | 4500ms | 180MB | High |
Relative speed in these figures: native Spark is ~5x faster than a Pandas UDF and ~50x faster than a Python UDF. A Pandas UDF is ~10x faster than a Python UDF.

MEMORY USAGE COMPARISON
- Native Spark: InternalRow (compact binary rows); memory ≈ 1x data size
- Pandas UDF: Arrow → pandas DataFrame → Arrow; memory ≈ 2x data size (Arrow + pandas)
- Python UDF: InternalRow → Python objects → InternalRow; memory ≈ 3–5x data size (Python objects)

SERIALIZATION OVERHEAD
- Python UDF (pickle): Row 1: serialize → process → deserialize; Row 2: serialize → process → deserialize; Row 3: ...; the overhead is paid for every row
- Pandas UDF (Arrow): Batch: serialize once → process → deserialize; the overhead is paid once per batch of up to maxRecordsPerBatch rows (default 10,000)
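The 3–5x memory figure for Python UDFs comes from boxing each value as a Python object. The quick check below assumes a 64-bit CPython build, where a `float` object is 24 bytes plus an 8-byte list pointer. It compares a list of floats with the equivalent packed buffer, and `array` stands in here for a columnar Arrow/pandas buffer.

```python
import sys
from array import array

n = 100_000
boxed = [float(i) for i in range(n)]   # one Python float object per value
packed = array("d", boxed)             # one contiguous buffer of 8-byte doubles

# List object (pointer array) plus every float object it references
boxed_bytes = sys.getsizeof(boxed) + sum(sys.getsizeof(x) for x in boxed)
packed_bytes = packed.itemsize * len(packed)

print(f"boxed:  {boxed_bytes:,} bytes")
print(f"packed: {packed_bytes:,} bytes")
print(f"ratio:  {boxed_bytes / packed_bytes:.1f}x")
```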
UDF OPTIMIZATION DECISION TREE

1. Can you implement it with built-in Spark functions?
   - YES → use Spark functions (fastest): col(), when(), lit(), etc.; typically 10–100x faster than UDFs
   - NO → go to 2
2. Can you use pandas/NumPy for vectorized operations?
   - YES → use a Pandas UDF (~10x faster than a Python UDF):
     @pandas_udf(returnType)
     def my_udf(s: pd.Series) -> pd.Series: ...
   - NO → go to 3
3. Use a Python UDF (slowest, but most flexible):
   @udf(returnType)
   def my_udf(x): return ...

OPTIMIZATION CHECKLIST
□ Try built-in functions first
□ Use a Pandas UDF instead of a Python UDF
□ Minimize data serialization
□ Use @pandas_udf("double") for simple transforms
□ Use df.groupby(...).applyInPandas(func, schema) for group operations (replaces the deprecated PandasUDFType.GROUPED_MAP)
□ Pandas UDFs always use Arrow; set spark.sql.execution.arrow.pyspark.enabled=true (formerly spark.sql.execution.arrow.enabled) to also speed up toPandas() and createDataFrame(pandas_df)
□ Tune batch size: spark.sql.execution.arrow.maxRecordsPerBatch (default 10,000)
□ Avoid capturing large global objects in UDF closures
□ Use broadcast variables for large lookup tables
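Branch 2 of the decision tree often comes down to whether "row-level" logic can be rewritten vectorially. The sketch below uses pandas and NumPy only, with made-up sample data and a hypothetical bucketing rule. It writes the same null-aware rule once as a per-value function (the shape of a Python UDF body) and once with `np.where` over the whole Series (the shape of a Pandas UDF body).

```python
import numpy as np
import pandas as pd

s = pd.Series([-2.0, 0.5, None, 4.0])   # None becomes NaN in a float Series

# Row-level style: one call per value, as in a Python UDF body
def bucket_row(x):
    if x is None or np.isnan(x):
        return None
    return "high" if x > 1.0 else "low"

row_wise = [bucket_row(x) for x in s]

# Vectorized style: one call per batch, as in a Pandas UDF body
vectorized = pd.Series(np.where(s.isna(), None, np.where(s > 1.0, "high", "low")))

print(row_wise)                          # ['low', 'low', None, 'high']
print(vectorized.tolist() == row_wise)   # True
```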
Detailed Explanation
1. What is a UDF?
A User Defined Function (UDF) is a function defined by the user that can be used in Spark SQL and DataFrame operations. UDFs extend Spark's built-in function library to implement custom logic that isn't available out of the box.
Types of UDFs in PySpark:
- Python UDF: Standard Python function processed row-by-row
- Pandas UDF: Vectorized function using Pandas/NumPy operations
- Grouped Map UDF: Processes groups of rows as Pandas DataFrames
2. Python UDF Performance Characteristics
Python UDFs have significant performance overhead due to:
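Serialization Overhead:
- Rows are pickled (in small groups) and streamed from the JVM to a Python worker process over a local socket; Py4J is not involved in moving row data
- The worker unpickles the rows into Python objects and calls the function once per row
- Results are pickled and sent back to the JVM
- This conversion and function call happens for EVERY row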
Process Switch Overhead:
- Context switching between JVM and Python
- Memory copying between processes
- Garbage collection in both JVM and Python
Python Execution Overhead:
- CPython is interpreted (no JIT compilation)
- The GIL keeps each Python worker single-threaded; Spark gets parallelism by running one worker per concurrently running task
- Object creation/destruction overhead
Performance Impact:
- ~10-100x slower than native Spark operations
- Significant memory overhead (Python object duplication)
- High GC pressure in both JVM and Python
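Here is a toy model of why per-item work adds up, using the standard-library `pickle`. Encoding each value as its own message repeats the header, framing and call overhead for every value, while encoding a batch pays for them once. Spark's real wire format differs, since it groups rows before pickling, but the function itself is still called per row. Treat this as an illustration only.

```python
import pickle

values = [float(i) for i in range(10_000)]

# Per-row: one pickle message (with its own header/framing) per value.
per_row_messages = [pickle.dumps(v) for v in values]
per_row_bytes = sum(len(m) for m in per_row_messages)

# Per-batch: a single message carrying all values.
batch_message = pickle.dumps(values)
batch_bytes = len(batch_message)

print(len(per_row_messages), per_row_bytes)
print(1, batch_bytes)

# Round-trip check: both encodings carry the same data.
assert [pickle.loads(m) for m in per_row_messages] == pickle.loads(batch_message)
```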
3. Pandas UDF Performance Characteristics
Pandas UDFs (introduced in Spark 2.3) dramatically improve performance by:
Vectorized Processing:
- Process batches of rows at once (default: up to 10,000 rows, set by spark.sql.execution.arrow.maxRecordsPerBatch)
- Use Pandas/NumPy for vectorized operations
- Avoid per-row function call overhead
Arrow Serialization:
- Efficient columnar transfer between JVM and Python (data still crosses a process boundary, so it is not zero-copy end to end)
- Columnar format (cache-friendly)
- No per-row serialization/deserialization
Performance Improvement:
- ~10x faster than Python UDFs
- Lower memory usage (Arrow format)
- Better GC behavior
4. When to Use UDFs
Use Built-in Functions When Possible:
```python
from pyspark.sql.functions import col, udf, upper
from pyspark.sql.types import StringType

# GOOD: built-in function (runs in the JVM, null-safe, optimized by Catalyst)
df.withColumn("name_upper", upper(col("name")))

# BAD: UDF for the same thing
@udf(StringType())
def my_upper(s):
    return s.upper() if s is not None else None

df.withColumn("name_upper", my_upper(col("name")))
```
Use Pandas UDF When:
- Complex logic not supported by built-in functions
- Need to use Pandas/NumPy libraries
- Processing batches of rows
- Group-by operations with complex logic
Use Python UDF When:
- Simple row-level logic
- Cannot be vectorized
- Need to call external services
- Legacy code compatibility
5. Pandas UDF Types
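Scalar Pandas UDF:
```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def multiply_by_two(s: pd.Series) -> pd.Series:
    return s * 2
```
Grouped Map (applyInPandas, which replaces the deprecated PandasUDFType.GROUPED_MAP):
```python
# A plain function: Spark calls it once per group with that group's rows
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["value"] = (pdf["value"] - pdf["value"].mean()) / pdf["value"].std()
    return pdf

result = df.groupby("group").applyInPandas(normalize, schema=df.schema)
```
Grouped Aggregate Pandas UDF (inferred from the Series → scalar type hints):
```python
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.groupby("group").agg(mean_udf("value"))
```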
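Each Pandas UDF body is ordinary pandas code, so its logic can be exercised with pandas alone before wiring it into Spark. The sketch below calls undecorated copies of the three bodies above the way Spark would: a whole Series, one DataFrame per group, and one Series per group for aggregation. pandas `groupby` is used here only to emulate Spark's per-group invocation, and the sample data is made up.

```python
import pandas as pd

# Undecorated copies of the UDF bodies (no Spark needed to run them).
def multiply_by_two(s: pd.Series) -> pd.Series:
    return s * 2

def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.copy()  # avoid mutating the caller's group slice in this local test
    pdf["value"] = (pdf["value"] - pdf["value"].mean()) / pdf["value"].std()
    return pdf

def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = pd.DataFrame({"group": [1, 1, 2, 2], "value": [1.0, 3.0, 10.0, 20.0]})

# Scalar: Spark passes column batches as Series.
print(multiply_by_two(df["value"]).tolist())   # [2.0, 6.0, 20.0, 40.0]

# Grouped map: Spark passes one DataFrame per group.
normalized = pd.concat([normalize(g) for _, g in df.groupby("group")])
print(normalized["value"].round(4).tolist())   # [-0.7071, 0.7071, -0.7071, 0.7071]

# Grouped aggregate: Spark passes one Series per group and expects a scalar.
print({int(k): float(mean_udf(g["value"])) for k, g in df.groupby("group")})  # {1: 2.0, 2: 15.0}
```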
6. Arrow Configuration
Apache Arrow enables efficient data transfer between JVM and Python:
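Configuration:
```python
# Pandas UDFs always exchange data with Python via Arrow; no flag is needed for them.
# This flag enables Arrow for toPandas() and createDataFrame(pandas_df)
# (Spark 3.0+ name; spark.sql.execution.arrow.enabled is the deprecated older name).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Maximum rows per Arrow record batch sent to Python workers (default 10000).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
```
Batch Size Tuning:
- Default: 10,000 rows per batch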
- Larger batches: Better throughput, more memory
- Smaller batches: Lower latency, less memory
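One rough way to reason about the memory side of this trade-off uses a lower bound. For fixed-width columns, one Arrow batch needs at least rows × columns × bytes-per-value for its data buffers, before pandas makes its own copy. The helper below is a back-of-the-envelope sketch. It ignores validity bitmaps, variable-width types such as strings, and Arrow metadata, and `min_batch_bytes` is a hypothetical name.

```python
def min_batch_bytes(rows_per_batch: int, n_columns: int, bytes_per_value: int = 8) -> int:
    """Lower bound on data-buffer size for one batch of fixed-width columns."""
    return rows_per_batch * n_columns * bytes_per_value

for rows in (1_000, 10_000, 100_000):
    mb = min_batch_bytes(rows, n_columns=20) / 1_000_000
    print(f"{rows:>7} rows x 20 double columns ~ {mb:.1f} MB per batch (Arrow buffers only)")
```

The comparison matrix's "2x data size (Arrow + pandas)" figure suggests roughly doubling these numbers once the batch is converted to pandas.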
7. UDF Testing and Debugging
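Testing UDFs:
```python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Keep the logic in a plain function: calling the UDF wrapper directly builds a Column
def to_upper(s):
    return s.upper() if s is not None else None

my_udf = udf(to_upper, StringType())

def test_to_upper():
    assert to_upper("hello") == "HELLO"
    assert to_upper(None) is None

# Test with Spark
df = spark.createDataFrame([("hello",)], ["text"])
result = df.withColumn("upper", my_udf(col("text"))).collect()
assert result[0]["upper"] == "HELLO"
```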
Debugging UDFs:
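```python
# Add logging
import logging
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def debug_udf(x):
    # This body runs in an executor's Python worker, so logging must be configured there
    # (basicConfig is a no-op once a handler exists). Output lands in executor stderr logs.
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("Input: %s", x)
    result = x.upper() if x is not None else None
    logging.debug("Output: %s", result)
    return result
```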
8. Common UDF Pitfalls
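Pitfall 1: Capturing large global objects
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# WORKS, but inefficient for large objects: cloudpickle captures LOOKUP inside the
# pickled UDF, so it is serialized with the function and deserialized for each task
LOOKUP = {"a": 1, "b": 2}

@udf(IntegerType())
def lookup_value(key):
    return LOOKUP.get(key)

# BETTER for large lookups: a broadcast variable is shipped once per executor and cached
broadcast_lookup = spark.sparkContext.broadcast({"a": 1, "b": 2})

@udf(IntegerType())
def lookup_value(key):
    return broadcast_lookup.value.get(key)
```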
Pitfall 2: Not handling nulls
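```python
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# BAD: No null handling
@udf(DoubleType())
def process_value(x):
    return x * 2  # TypeError on the executor if x is None

# GOOD: Handle nulls
@udf(DoubleType())
def process_value(x):
    return x * 2 if x is not None else None
```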
Pitfall 3: Using Python UDF when Pandas UDF would work
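```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import DoubleType

# BAD: Python UDF for a simple transformation
@udf(DoubleType())
def double_value(x):
    return x * 2 if x is not None else None

# GOOD: Pandas UDF for a vectorized operation
@pandas_udf(DoubleType())
def double_value(s: pd.Series) -> pd.Series:
    return s * 2
```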
Key Concepts Table
| UDF Type | Processing | Serialization | Speed | Memory Efficiency | Use Case |
|---|---|---|---|---|---|
| Built-in | JVM native | None | Fastest | Best | Always prefer |
| Pandas UDF | Batch (vectorized) | Arrow | Fast | Good | Complex logic |
| Python UDF | Row-by-row | Pickle | Slow | Poor | Simple logic |
| Grouped Map | Per group (whole group as a DataFrame) | Arrow | Fast | Moderate (each group must fit in memory) | Group operations |
💻 Code Examples
Example 1: Python UDF vs Pandas UDF
```python
import time

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("UDFOptimization").getOrCreate()

# Create test data
df = spark.range(1000000).withColumn("value", col("id") * 1.0)

# Python UDF (slow)
@udf(DoubleType())
def python_double(x):
    return x * 2 if x is not None else None

# Pandas UDF (fast)
@pandas_udf(DoubleType())
def pandas_double(s: pd.Series) -> pd.Series:
    return s * 2

def time_udf(udf_fn):
    """Time a full evaluation of udf_fn over df.

    count() alone can let Catalyst prune the unused "doubled" column and skip
    the UDF entirely, so aggregate over the UDF's output to force it to run.
    """
    start = time.time()
    df.withColumn("doubled", udf_fn(col("value"))).agg(F.sum("doubled")).collect()
    return time.time() - start

python_time = time_udf(python_double)
pandas_time = time_udf(pandas_double)

print(f"Python UDF: {python_time:.2f}s")
print(f"Pandas UDF: {pandas_time:.2f}s")
print(f"Speedup: {python_time / pandas_time:.1f}x")
```
Example 2: Grouped Map Pandas UDF
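```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, LongType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# Output schema: every column the function returns, with matching types
# (id % 10 on a bigint column is also a bigint, hence LongType for "group")
schema = StructType([
    StructField("id", LongType()),
    StructField("group", LongType()),
    StructField("value", DoubleType()),
    StructField("normalized", DoubleType()),
])

# Grouped map function: called once per group with that group's rows
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["normalized"] = (pdf["value"] - pdf["value"].mean()) / pdf["value"].std()
    return pdf

# Create data
df = spark.range(100000).withColumn(
    "group", col("id") % 10
).withColumn(
    "value", col("id") * 1.0
)

# applyInPandas replaces the deprecated PandasUDFType.GROUPED_MAP + groupby().apply()
result = df.groupby("group").applyInPandas(normalize, schema=schema)
result.show()
```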
Example 3: Arrow Configuration
```python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType

# Arrow for toPandas()/createDataFrame(pandas_df); Pandas UDFs use Arrow regardless
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Rows per Arrow batch sent to the Python worker (default 10000)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")

# Pandas UDF: receives each Arrow batch as a pandas Series
@pandas_udf(DoubleType())
def efficient_udf(s: pd.Series) -> pd.Series:
    return s * 2

# Data moves between the JVM and Python as Arrow batches
result = df.withColumn("doubled", efficient_udf(col("value")))
```
Example 4: UDF with Broadcast Variable
```python
# functions.broadcast() is a DataFrame join hint and is not needed here
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Create lookup table
lookup_data = {i: i * 10 for i in range(100)}
broadcast_lookup = spark.sparkContext.broadcast(lookup_data)

# UDF reading the executor-local broadcast copy
@udf(IntegerType())
def lookup_value(key):
    return broadcast_lookup.value.get(key)

# Apply UDF
df = spark.range(1000).withColumn("key", col("id") % 100)
result = df.withColumn("lookup_result", lookup_value(col("key")))
result.show()
```
Performance Metrics
| Method | 1M Rows | 10M Rows | Memory | GC | Complexity |
|---|---|---|---|---|---|
| Built-in | 100ms | 800ms | 50MB | Low | N/A |
| Pandas UDF | 500ms | 4s | 100MB | Medium | Medium |
| Python UDF | 5s | 50s | 200MB | High | High |
| Arrow Batch | 400ms | 3.5s | 80MB | Low | Medium |
| Grouped Map | 800ms | 7s | 150MB | Medium | High |
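The relative speeds implied by the table can be computed directly. The figures below are just the table's own numbers, so the ratios inherit whatever hardware and data assumptions produced them.

```python
# Timings in seconds, copied from the Performance Metrics table.
timings = {
    "Built-in":   {"1M": 0.1, "10M": 0.8},
    "Pandas UDF": {"1M": 0.5, "10M": 4.0},
    "Python UDF": {"1M": 5.0, "10M": 50.0},
}

def slowdown(method: str, baseline: str, size: str) -> float:
    """How many times slower `method` is than `baseline` at a given size."""
    return timings[method][size] / timings[baseline][size]

for size in ("1M", "10M"):
    print(size,
          f"Python vs Pandas UDF: {slowdown('Python UDF', 'Pandas UDF', size):.1f}x,",
          f"Python UDF vs built-in: {slowdown('Python UDF', 'Built-in', size):.1f}x")
```

At 1M rows this gives 10.0x and 50.0x. At 10M rows it gives 12.5x and 62.5x.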
Best Practices
1. Prefer Built-in Functions
```python
from pyspark.sql.functions import col, udf, upper
from pyspark.sql.types import StringType

# GOOD: Use built-in functions
df.withColumn("name_upper", upper(col("name")))

# BAD: UDF for the same thing
@udf(StringType())
def my_upper(s):
    return s.upper() if s is not None else None
```
2. Use Pandas UDF When Possible
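```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import DoubleType

# BAD: Python UDF
@udf(DoubleType())
def process(x):
    return x * 2 if x is not None else None

# GOOD: Pandas UDF
@pandas_udf(DoubleType())
def process(s: pd.Series) -> pd.Series:
    return s * 2
```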
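3. Configure Arrow
```python
# Pandas UDFs always use Arrow; this flag adds Arrow to toPandas()/createDataFrame(pandas_df)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")  # the default
```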
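4. Handle Nulls
```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def safe_process(s: pd.Series) -> pd.Series:
    return s * 2  # nulls arrive as NaN in numeric columns; arithmetic propagates them
```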
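This sketch shows how nulls look inside a Pandas UDF. For numeric columns they typically arrive as `NaN`, and vectorized arithmetic propagates them instead of raising, unlike the row-level `None * 2` case. It uses pandas alone to show this behavior, and the input Series are made up.

```python
import pandas as pd

s = pd.Series([1.0, None, 3.0])      # None becomes NaN in a float64 Series
doubled = s * 2                      # no TypeError: NaN propagates

print(s.dtype)                       # float64
print(doubled.isna().tolist())       # [False, True, False]

ints_with_null = pd.Series([1, None, 3])
print(ints_with_null.dtype)          # float64: a missing value forces a float dtype
```

The last line is why an integer column containing nulls may show up as float64 inside the function.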
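5. Use Broadcast for Large Lookups
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

lookup = spark.sparkContext.broadcast(large_dict)  # large_dict: your lookup dictionary

@udf(IntegerType())
def lookup_value(key):
    return lookup.value.get(key)
```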
6. Tune Batch Size
```python
# Larger batches (default is 10000) for better throughput, at the cost of memory per batch
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "20000")
# Smaller batches for lower latency and less memory per batch
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100")
```
See Also
- Kafka Streams (kafka/03): UDF patterns in stream processing
- Data Engineering Streaming (data-engineering/022): UDF optimization in streaming pipelines