PySpark UDF Optimization: UDFs, Pandas UDFs, and Performance

Free Lesson

Advertisement

⚑ PySpark UDF Optimization

DfUser Defined Function (UDF)

A UDF is a custom function defined by the user that extends Spark's built-in functions. Python UDFs are serialized via pickle and executed in a separate Python process per executor, incurring serialization overhead per row.

DfPandas UDF (Vectorized UDF)

A Pandas UDF uses Apache Arrow to transfer data between JVM and Python in batches, enabling vectorized operations via pandas/Series. This avoids row-by-row serialization overhead, achieving 3x–100x speedup over regular Python UDFs.

Tudf=Nrowstimes(Tserialize+Tprocess+Tdeserialize)+TarrowT_{udf} = N_{rows} \\times (T_{serialize} + T_{process} + T_{deserialize}) + T_{arrow}

Pandas UDF Speedup Factor

Speedup=fracTpythonudfTpandasudf=fracNrowstimes(Tser+Tdeser)Tarrow+TvectorizedSpeedup = \\frac{T_{python\\_udf}}{T_{pandas\\_udf}} = \\frac{N_{rows} \\times (T_{ser} + T_{deser})}{T_{arrow} + T_{vectorized}}

Here,

  • Tpython_udfT_{python\_udf}=Execution time with regular Python UDF
  • Tpandas_udfT_{pandas\_udf}=Execution time with Pandas UDF
  • TarrowT_{arrow}=Arrow batch transfer time (amortized per row)
  • TvectorizedT_{vectorized}=Vectorized computation time via pandas

Python UDFs are 10x–100x slower than built-in functions due to per-row serialization between JVM and Python. Pandas UDFs reduce this overhead by transferring data in batches via Apache Arrow.

Use built-in Spark SQL functions whenever possible β€” they are implemented in JVM and optimized by Catalyst. Only use UDFs when built-in functions cannot express the required logic.

ThArrow Batch Transfer Optimization

Theorem: Pandas UDFs achieve a minimum speedup of N_{rows} / B_{batch} over Python UDFs, where B_{batch} is the Arrow batch size (default 1000 rows). Larger batches amortize Arrow overhead further, with diminishing returns beyond B_{batch} = 10,000.

  • Python UDFs: row-by-row serialization, 10x–100x slower than built-in functions
  • Pandas UDFs: batch processing via Arrow, 3x–100x faster than Python UDFs
  • Always prefer built-in functions; use UDFs only when necessary
  • Tune arrow.maxRecordsPerBatch for optimal batch size (default 1000)

πŸ—οΈ Architecture Diagram

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    UDF EXECUTION ARCHITECTURE                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                  PYTHON UDF FLOW                         β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                        β”‚   β”‚
β”‚  β”‚  β”‚  Driver     β”‚  Send UDF function (pickle)            β”‚   β”‚
β”‚  β”‚  β”‚  (Python)   β”‚ ──────────────────────────────────►   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                        β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Executor (JVM + Python)                         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Spark JVM Process                      β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Row-by-Row Processing           β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Row 1 β†’ Python β†’ Result 1      β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Row 2 β†’ Python β†’ Result 2      β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Row 3 β†’ Python β†’ Result 3      β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  ...                            β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚         β–²                               β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚         β”‚ Serialization (Py4J)         β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚         β–Ό                               β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Python Worker Process          β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  def my_udf(x):                 β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚      return x * 2               β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  OVERHEAD: Serialization per row + Python process switch β”‚   β”‚
β”‚  β”‚  PERFORMANCE: ~100x slower than native Spark operations  β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                  PANDAS UDF FLOW                         β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                        β”‚   β”‚
β”‚  β”‚  β”‚  Driver     β”‚  Send UDF function (pickle)            β”‚   β”‚
β”‚  β”‚  β”‚  (Python)   β”‚ ──────────────────────────────────►   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                        β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Executor (JVM + Python)                         β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                  β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Spark JVM Process                      β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Batch Processing                β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Batch 1 (1000 rows) β†’ Pandas   β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Batch 2 (1000 rows) β†’ Pandas   β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Batch 3 (1000 rows) β†’ Pandas   β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚         β–²                               β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚         β”‚ Arrow Serialization          β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚         β–Ό                               β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  Python Worker Process          β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚  def my_pandas_udf(pdf):        β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β”‚      return pdf * 2             β”‚   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  ADVANTAGE: Vectorized operations, Arrow serialization   β”‚   β”‚
β”‚  β”‚  PERFORMANCE: ~10x faster than Python UDFs              β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                  ARROW SERIALIZATION                      β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  JVM (Spark)                    Python (Pandas)          β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”‚   β”‚
β”‚  β”‚  β”‚  Columnar    β”‚    Arrow     β”‚  Columnar    β”‚         β”‚   β”‚
β”‚  β”‚  β”‚  Format      β”‚ ──────────► β”‚  Format      β”‚         β”‚   β”‚
β”‚  β”‚  β”‚  (Internal)  β”‚  Zero-copy   β”‚  (Pandas)    β”‚         β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  β€’ Zero-copy data transfer                               β”‚   β”‚
β”‚  β”‚  β€’ Columnar format (cache-friendly)                      β”‚   β”‚
β”‚  β”‚  β€’ Vectorized operations                                 β”‚   β”‚
β”‚  β”‚  β€’ No per-row serialization                              β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              UDF TYPE COMPARISON MATRIX                          β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                  PERFORMANCE COMPARISON                   β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  Operation: Process 1M rows                              β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚
β”‚  β”‚  β”‚    Method       β”‚   Time   β”‚  Memory  β”‚   GC     β”‚  β”‚   β”‚
β”‚  β”‚  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€  β”‚   β”‚
β”‚  β”‚  β”‚ Native Spark    β”‚  100ms   β”‚   50MB   β”‚   Low    β”‚  β”‚   β”‚
β”‚  β”‚  β”‚ Pandas UDF      β”‚  500ms   β”‚  100MB   β”‚  Medium  β”‚  β”‚   β”‚
β”‚  β”‚  β”‚ Python UDF      β”‚ 5000ms   β”‚  200MB   β”‚   High   β”‚  β”‚   β”‚
β”‚  β”‚  β”‚ Python UDF+map  β”‚ 4500ms   β”‚  180MB   β”‚   High   β”‚  β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  Speedup: Native > Pandas (10x) > Python (100x)        β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                  MEMORY USAGE COMPARISON                  β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  Native Spark:                                          β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  InternalRow[] (compact, columnar)               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Memory: 1x data size                           β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  Pandas UDF:                                            β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Arrow β†’ Pandas DataFrame β†’ Arrow               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Memory: 2x data size (Arrow + Pandas)          β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  Python UDF:                                            β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  InternalRow β†’ Python object β†’ InternalRow      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Memory: 3-5x data size (Python objects)        β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                  SERIALIZATION OVERHEAD                    β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  Python UDF (Py4J):                                     β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Row 1: Serialize β†’ Process β†’ Deserialize       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Row 2: Serialize β†’ Process β†’ Deserialize       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Row 3: Serialize β†’ Process β†’ Deserialize       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  ...                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Overhead: ~100ns per row                       β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  Pandas UDF (Arrow):                                    β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Batch: Serialize once β†’ Process β†’ Deserialize  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Overhead: ~1ms per batch (1000 rows)           β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              UDF OPTIMIZATION DECISION TREE                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  Can you implement with built-in Spark functions?        β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  YES ──────────────────────────────────────────────►    β”‚   β”‚
β”‚  β”‚  β”‚       Use Spark functions (fastest)                  β”‚   β”‚
β”‚  β”‚  β”‚       - col(), when(), lit(), etc.                   β”‚   β”‚
β”‚  β”‚  β”‚       - 10-100x faster than UDFs                     β”‚   β”‚
β”‚  β”‚  β”‚                                                      β”‚   β”‚
β”‚  β”‚  NO                                                     β”‚   β”‚
β”‚  β”‚  β”‚                                                      β”‚   β”‚
β”‚  β”‚  β–Ό                                                      β”‚   β”‚
β”‚  β”‚  Can you use Pandas/NumPy for vectorized ops?          β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  YES ──────────────────────────────────────────────►    β”‚   β”‚
β”‚  β”‚  β”‚       Use Pandas UDF (10x faster)                    β”‚   β”‚
β”‚  β”‚  β”‚       @pandas_udf(returnType)                        β”‚   β”‚
β”‚  β”‚  β”‚       def my_udf(pdf: pd.DataFrame) -> pd.DataFrame β”‚   β”‚
β”‚  β”‚  β”‚                                                      β”‚   β”‚
β”‚  β”‚  NO                                                     β”‚   β”‚
β”‚  β”‚  β”‚                                                      β”‚   β”‚
β”‚  β”‚  β–Ό                                                      β”‚   β”‚
β”‚  β”‚  Use Python UDF (slowest, but flexible)                β”‚   β”‚
β”‚  β”‚  @udf(returnType)                                       β”‚   β”‚
β”‚  β”‚  def my_udf(x):                                         β”‚   β”‚
β”‚  β”‚      return ...                                         β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  OPTIMIZATION CHECKLIST                                  β”‚   β”‚
β”‚  β”‚                                                          β”‚   β”‚
β”‚  β”‚  β–‘ Try built-in functions first                         β”‚   β”‚
β”‚  β”‚  β–‘ Use Pandas UDF instead of Python UDF                 β”‚   β”‚
β”‚  β”‚  β–‘ Minimize data serialization                          β”‚   β”‚
β”‚  β”‚  β–‘ Use @pandas_udf("double") for simple transforms     β”‚   β”‚
β”‚  β”‚  β–‘ Use @pandas_udf(returnType, PandasUDFType.GROUPED_  β”‚   β”‚
β”‚  β”‚    MAP) for group operations                            β”‚   β”‚
β”‚  β”‚  β–‘ Enable Arrow: spark.sql.execution.arrow.enabled=trueβ”‚   β”‚
β”‚  β”‚  β–‘ Tune batch size: spark.sql.execution.arrow.maxRecordsβ”‚   β”‚
β”‚  β”‚    PerBatch (default 1000)                              β”‚   β”‚
β”‚  β”‚  β–‘ Avoid global variables in UDFs                       β”‚   β”‚
β”‚  β”‚  β–‘ Use broadcast for large lookup tables                β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ“š Detailed Explanation

1. What is a UDF?

A User Defined Function (UDF) is a function defined by the user that can be used in Spark SQL and DataFrame operations. UDFs extend Spark's built-in function library to implement custom logic that isn't available out of the box.

Types of UDFs in PySpark:

  1. Python UDF: Standard Python function processed row-by-row
  2. Pandas UDF: Vectorized function using Pandas/NumPy operations
  3. Grouped Map UDF: Processes groups of rows as Pandas DataFrames

2. Python UDF Performance Characteristics

Python UDFs have significant performance overhead due to:

Serialization Overhead:

  • Each row is serialized from JVM to Python (Py4J)
  • Python processes the row
  • Result is serialized back to JVM
  • This happens for EVERY row

Process Switch Overhead:

  • Context switching between JVM and Python
  • Memory copying between processes
  • Garbage collection in both JVM and Python

Python Execution Overhead:

  • Python is interpreted (no JIT compilation)
  • GIL limits true parallelism
  • Object creation/destruction overhead

Performance Impact:

  • ~10-100x slower than native Spark operations
  • Significant memory overhead (Python object duplication)
  • High GC pressure in both JVM and Python

3. Pandas UDF Performance Characteristics

Pandas UDFs (introduced in Spark 2.3) dramatically improve performance by:

Vectorized Processing:

  • Process batches of rows at once (default: 1000 rows)
  • Use Pandas/NumPy for vectorized operations
  • Avoid per-row function call overhead

Arrow Serialization:

  • Zero-copy data transfer between JVM and Python
  • Columnar format (cache-friendly)
  • No per-row serialization/deserialization

Performance Improvement:

  • ~10x faster than Python UDFs
  • Lower memory usage (Arrow format)
  • Better GC behavior

4. When to Use UDFs

Use Built-in Functions When Possible:

# GOOD: Built-in function
from pyspark.sql.functions import col, upper, concat
df.withColumn("name_upper", upper(col("name")))

# BAD: UDF for same thing
@udf(StringType())
def my_upper(s):
    return s.upper() if s else None
df.withColumn("name_upper", my_upper(col("name")))

Use Pandas UDF When:

  • Complex logic not supported by built-in functions
  • Need to use Pandas/NumPy libraries
  • Processing batches of rows
  • Group-by operations with complex logic

Use Python UDF When:

  • Simple row-level logic
  • Cannot be vectorized
  • Need to call external services
  • Legacy code compatibility

5. Pandas UDF Types

Scalar Pandas UDF:

@pandas_udf(DoubleType())
def multiply_by_two(pdf: pd.Series) -> pd.Series:
    return pdf * 2

Grouped Map Pandas UDF:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf['value'] = (pdf['value'] - pdf['value'].mean()) / pdf['value'].std()
    return pdf

Grouped Aggregate Pandas UDF:

@pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
def mean_udf(v: pd.Series) -> float:
    return v.mean()

6. Arrow Configuration

Apache Arrow enables efficient data transfer between JVM and Python:

Configuration:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Batch Size Tuning:

  • Default: 1000 rows per batch
  • Larger batches: Better throughput, more memory
  • Smaller batches: Lower latency, less memory

7. UDF Testing and Debugging

Testing UDFs:

# Unit test Python UDF
def test_my_udf():
    assert my_udf("hello") == "HELLO"
    assert my_udf(None) is None

# Test with Spark
df = spark.createDataFrame([("hello",)], ["text"])
result = df.withColumn("upper", my_udf(col("text"))).collect()
assert result[0]["upper"] == "HELLO"

Debugging UDFs:

# Add logging
import logging
logging.basicConfig(level=logging.DEBUG)

@udf(StringType())
def debug_udf(x):
    logging.debug(f"Input: {x}")
    result = x.upper() if x else None
    logging.debug(f"Output: {result}")
    return result

8. Common UDF Pitfalls

Pitfall 1: Using global variables

# BAD: Global variable not broadcast
LOOKUP = {"a": 1, "b": 2}  # Not serialized to executors

@udf(IntegerType())
def lookup_value(key):
    return LOOKUP.get(key)  # Error on executors!

# GOOD: Use broadcast variable
broadcast_lookup = spark.sparkContext.broadcast({"a": 1, "b": 2})

@udf(IntegerType())
def lookup_value(key):
    return broadcast_lookup.value.get(key)

Pitfall 2: Not handling nulls

# BAD: No null handling
@udf(DoubleType())
def process_value(x):
    return x * 2  # Error if x is None

# GOOD: Handle nulls
@udf(DoubleType())
def process_value(x):
    return x * 2 if x is not None else None

Pitfall 3: Using Python UDF when Pandas UDF would work

# BAD: Python UDF for simple transformation
@udf(DoubleType())
def double_value(x):
    return x * 2

# GOOD: Pandas UDF for vectorized operation
@pandas_udf(DoubleType())
def double_value(pdf: pd.Series) -> pd.Series:
    return pdf * 2

πŸ”‘ Key Concepts Table

UDF TypeProcessingSerializationSpeedMemoryUse Case
Built-inJVM nativeNoneβ˜…β˜…β˜…β˜…β˜…β˜…β˜…β˜…β˜…β˜…Always prefer
Pandas UDFBatch (Vectorized)Arrowβ˜…β˜…β˜…β˜…β˜†β˜…β˜…β˜…β˜…β˜†Complex logic
Python UDFRow-by-rowPy4Jβ˜…β˜…β˜†β˜†β˜†β˜…β˜…β˜†β˜†β˜†Simple logic
Grouped MapGroup batchArrowβ˜…β˜…β˜…β˜…β˜†β˜…β˜…β˜…β˜…β˜†Group operations

πŸ’» Code Examples

Example 1: Python UDF vs Pandas UDF

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf, col
from pyspark.sql.types import DoubleType
import pandas as pd

spark = SparkSession.builder.appName("UDFOptimization").getOrCreate()

# Create test data
df = spark.range(1000000).withColumn("value", col("id") * 1.0)

# Python UDF (slow)
@udf(DoubleType())
def python_double(x):
    return x * 2 if x is not None else None

# Pandas UDF (fast)
@pandas_udf(DoubleType())
def pandas_double(pdf: pd.Series) -> pd.Series:
    return pdf * 2

# Benchmark
import time

# Python UDF
start = time.time()
result_python = df.withColumn("doubled", python_double(col("value")))
result_python.count()
python_time = time.time() - start

# Pandas UDF
start = time.time()
result_pandas = df.withColumn("doubled", pandas_double(col("value")))
result_pandas.count()
pandas_time = time.time() - start

print(f"Python UDF: {python_time:.2f}s")
print(f"Pandas UDF: {pandas_time:.2f}s")
print(f"Speedup: {python_time / pandas_time:.1f}x")

Example 2: Grouped Map Pandas UDF

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd

# Define schema
schema = StructType([
    StructField("group", IntegerType()),
    StructField("value", DoubleType()),
    StructField("normalized", DoubleType())
])

# Grouped Map UDF
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf['normalized'] = (pdf['value'] - pdf['value'].mean()) / pdf['value'].std()
    return pdf

# Create data
df = spark.range(100000).withColumn(
    "group", col("id") % 10
).withColumn(
    "value", col("id") * 1.0
)

# Apply UDF
result = df.groupby("group").apply(normalize)
result.show()

Example 3: Arrow Configuration

# Enable Arrow for better performance
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Pandas UDF with Arrow
@pandas_udf(DoubleType())
def efficient_udf(pdf: pd.Series) -> pd.Series:
    return pdf * 2

# This will use Arrow for efficient data transfer
result = df.withColumn("doubled", efficient_udf(col("value")))

Example 4: UDF with Broadcast Variable

from pyspark.sql.functions import udf, broadcast
from pyspark.sql.types import IntegerType

# Create lookup table
lookup_data = {i: i * 10 for i in range(100)}
broadcast_lookup = spark.sparkContext.broadcast(lookup_data)

# UDF using broadcast
@udf(IntegerType())
def lookup_value(key):
    return broadcast_lookup.value.get(key)

# Apply UDF
df = spark.range(1000).withColumn("key", col("id") % 100)
result = df.withColumn("lookup_result", lookup_value(col("key")))
result.show()

πŸ“Š Performance Metrics

Method1M Rows10M RowsMemoryGCComplexity
Built-in100ms800ms50MBLowN/A
Pandas UDF500ms4s100MBMediumMedium
Python UDF5s50s200MBHighHigh
Arrow Batch400ms3.5s80MBLowMedium
Grouped Map800ms7s150MBMediumHigh

βœ… Best Practices

1. Prefer Built-in Functions

# GOOD: Use built-in functions
from pyspark.sql.functions import upper, concat, when
df.withColumn("name_upper", upper(col("name")))

# BAD: UDF for same thing
@udf(StringType())
def my_upper(s):
    return s.upper() if s else None

2. Use Pandas UDF When Possible

# BAD: Python UDF
@udf(DoubleType())
def process(x):
    return x * 2

# GOOD: Pandas UDF
@pandas_udf(DoubleType())
def process(pdf: pd.Series) -> pd.Series:
    return pdf * 2

3. Enable Arrow

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")

4. Handle Nulls

@pandas_udf(DoubleType())
def safe_process(pdf: pd.Series) -> pd.Series:
    return pdf * 2  # Pandas handles NaN automatically

5. Use Broadcast for Large Lookups

lookup = spark.sparkContext.broadcast(large_dict)

@udf(IntegerType())
def lookup_value(key):
    return lookup.value.get(key)

6. Tune Batch Size

# Larger batches for better throughput
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")

# Smaller batches for lower latency
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100")

See Also

  • Kafka Streams (kafka/03): UDF patterns in stream processing
  • Data Engineering Streaming (data-engineering/022): UDF optimization in streaming pipelines

Advertisement

Need Expert PySpark Help?

Get personalized Spark optimization, cluster tuning, or production data pipeline consulting.

Advertisement