PySpark Joins Optimization: Types, Broadcast, and Shuffle Joins


🔗 PySpark Joins Optimization

Definition: Join (Relational Algebra)

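A join combines two datasets based on a join condition, most often equality on a common key. In Spark, joins are wide transformations that require a shuffle unless one side is broadcast or both sides are already partitioned (for example, bucketed) on the join key. The Catalyst optimizer picks the physical strategy (broadcast hash, sort-merge, or shuffle hash) from size estimates, table statistics, and any join hints.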

Definition: Broadcast Join

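A broadcast join collects the smaller dataset on the driver and ships a full copy to every executor, so the larger side is joined in place and neither side is shuffled. It applies when the small side fits comfortably in driver and executor memory; Spark broadcasts automatically when a side's estimated size is below spark.sql.autoBroadcastJoinThreshold (default 10 MB).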

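Join selectivity:

$$S_{join} = \frac{|A \bowtie B|}{|A| \times |B|}$$

where |A ⋈ B| is the number of rows the join produces and |A| × |B| is the size of the Cartesian product.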
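A quick way to sanity-check the selectivity formula is to compute it by hand on tiny key lists. The pure-Python sketch below assumes an inner equi-join and counts output rows from key multiplicities; the function name `join_selectivity` and the sample keys are illustrative, not part of any Spark API.

```python
from collections import Counter


def join_selectivity(keys_a, keys_b):
    """Return |A join B| / (|A| * |B|) for an inner equi-join on the given key lists."""
    counts_b = Counter(keys_b)
    # Each row of A matches every row of B that carries the same key.
    output_rows = sum(counts_b[k] for k in keys_a)
    return output_rows / (len(keys_a) * len(keys_b))


# A has key 1 twice (2 output rows), key 2 once (1 row), key 3 never matches.
a = [1, 1, 2, 3]
b = [1, 2, 4, 5, 6]
print(join_selectivity(a, b))  # 0.15  (3 output rows / 20 possible pairs)
```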

Broadcast Join Threshold

$$T_{broadcast} = \min(M_{executor} \times F_{safety},\ T_{config})$$

This is a sizing rule of thumb, not a formula Spark evaluates: Spark only compares each side's estimated size with $T_{config}$. Here,

  • $T_{broadcast}$ = effective broadcast threshold in bytes
  • $M_{executor}$ = executor memory available for the broadcast table
  • $F_{safety}$ = safety factor, e.g. 0.25 (budget at most 25% of executor memory)
  • $T_{config}$ = configured spark.sql.autoBroadcastJoinThreshold
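The rule of thumb above can be written as a tiny helper. This is a hypothetical planning aid (`effective_broadcast_threshold` is not a Spark function), assuming you know how much executor memory you are willing to budget; Spark itself only checks the configured threshold.

```python
MB = 1024 ** 2
GB = 1024 ** 3


def effective_broadcast_threshold(executor_memory_bytes, safety_factor, configured_threshold_bytes):
    """T_broadcast = min(M_executor * F_safety, T_config), as a planning heuristic."""
    return min(int(executor_memory_bytes * safety_factor), configured_threshold_bytes)


# 8 GB executors with a 25% budget allow 2 GB, so the configured 10 MB is the limit.
print(effective_broadcast_threshold(8 * GB, 0.25, 10 * MB) // MB)   # 10
# Raising the configured threshold to 100 MB: the configuration still wins.
print(effective_broadcast_threshold(8 * GB, 0.25, 100 * MB) // MB)  # 100
# With 256 MB executors the memory budget (64 MB) becomes the limit.
print(effective_broadcast_threshold(256 * MB, 0.25, 100 * MB) // MB)  # 64
```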

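For equi-joins, Catalyst chooses a strategy from estimated table sizes: if one side is below autoBroadcastJoinThreshold (or carries a broadcast hint), it uses BroadcastHashJoin; otherwise it uses SortMergeJoin, the most general strategy. ShuffleHashJoin is selected only when spark.sql.join.preferSortMergeJoin is set to false and one side is small enough to build a per-partition hash table, or when a SHUFFLE_HASH hint is given. Joins without an equality condition fall back to BroadcastNestedLoopJoin or CartesianProduct.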

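For skewed joins, rely on AQE's skew join handling (spark.sql.adaptive.skewJoin.enabled, on by default when AQE is enabled) or salt the key manually. At runtime AQE detects skewed shuffle partitions and splits them into smaller sub-partitions, replicating the matching partition on the other side, to balance the workload.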
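The skew test AQE applies can be sketched in plain Python. Per the Spark documentation, a partition counts as skewed when it is larger than `skewedPartitionFactor` times the median partition size and also larger than `skewedPartitionThresholdInBytes`; the defaults below (5 and 256 MB) are the documented Spark 3.x values, while the helper name `skewed_partitions` is hypothetical.

```python
import statistics

MB = 1024 ** 2

# Documented defaults for spark.sql.adaptive.skewJoin.skewedPartitionFactor
# and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes.
SKEW_FACTOR = 5
SKEW_THRESHOLD_BYTES = 256 * MB


def skewed_partitions(partition_sizes, factor=SKEW_FACTOR, threshold=SKEW_THRESHOLD_BYTES):
    """Return indices of partitions that satisfy both skew conditions."""
    median = statistics.median(partition_sizes)
    return [
        i
        for i, size in enumerate(partition_sizes)
        if size > factor * median and size > threshold
    ]


# One 2 GB partition among 100 MB partitions: 20x the median and above 256 MB.
print(skewed_partitions([100 * MB] * 7 + [2048 * MB]))  # [7]
# 200 MB is 20x a 10 MB median but below 256 MB, so it is not treated as skewed.
print(skewed_partitions([10 * MB] * 7 + [200 * MB]))    # []
```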

Broadcast Join Optimization

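Cost sketch: a sort-merge join shuffles both inputs, whereas a broadcast join leaves the large side in place and ships one copy of the small table to each executor. Ignoring sorting, serialization, and the driver-side collect, the bytes sent over the network are approximately

$$C_{shuffle} \approx (N_{large} + N_{small}) \times W, \qquad C_{broadcast} \approx E \times N_{small} \times W$$

where $N_{large}$ and $N_{small}$ are the row counts of the two tables, $W$ is the average row width in bytes, and $E$ is the number of executors. Broadcasting moves fewer bytes whenever $E \times N_{small} < N_{large} + N_{small}$, and the reduction factor is $(N_{large} + N_{small}) / (E \times N_{small})$.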
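The cost model above can be evaluated numerically. This sketch assumes the simplified byte counts from the formula (no sort, serialization, or compression effects); the table sizes, row width, and executor count are made-up inputs.

```python
def shuffle_bytes(n_large, n_small, row_width):
    """Sort-merge join: both inputs cross the network once."""
    return (n_large + n_small) * row_width


def broadcast_bytes(n_small, row_width, executors):
    """Broadcast hash join: the small table is shipped once to each executor."""
    return executors * n_small * row_width


def broadcast_advantage(n_large, n_small, row_width, executors):
    """Ratio > 1 means broadcasting moves fewer bytes than shuffling."""
    return shuffle_bytes(n_large, n_small, row_width) / broadcast_bytes(n_small, row_width, executors)


# 100M-row fact table, 10k-row dimension, 100-byte rows, 50 executors:
# broadcasting moves roughly 200x fewer bytes.
print(broadcast_advantage(100_000_000, 10_000, 100, 50))
# A 10M-row "small" side on 50 executors: broadcasting now moves more bytes (ratio < 1).
print(broadcast_advantage(100_000_000, 10_000_000, 100, 50))
```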

  • Broadcast joins eliminate the shuffle when one side fits in memory
  • The default broadcast threshold is 10 MB; raising it (for example to 50-100 MB) can pay off when the driver and executors have memory to spare
  • Sort-merge join is the most general equi-join strategy; it shuffles both sides by key and sorts each partition
  • Bucketing both tables on the join key (with the same number of buckets) avoids the shuffle for repeated joins
  • AQE (Spark 3.0+) detects and splits skewed join partitions at runtime

🏗️ Architecture Diagram

┌──────────────────────────────────────────────────────────────────┐
│                    JOIN TYPES OVERVIEW                           │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                    INNER JOIN                             │   │
│  │                                                           │   │
│  │    Table A              Table B              Result       │   │
│  │   ┌───────┐            ┌───────┐            ┌───────┐     │   │
│  │   │ ████  │            │ █████ │            │ ████  │     │   │
│  │   │ ████░ │      ⋈     │ ░████ │      =     │ ████  │     │   │
│  │   │ ████░ │            │ ░░░░░ │            └───────┘     │   │
│  │   └───────┘            └───────┘                          │   │
│  │  Only matching rows from both tables                      │   │
│  └───────────────────────────────────────────────────────────┘   │
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                    LEFT JOIN                              │   │
│  │                                                           │   │
│  │    Table A              Table B              Result       │   │
│  │   ┌───────┐            ┌───────┐            ┌───────┐     │   │
│  │   │ ████  │            │ █████ │            │ ████  │     │   │
│  │   │ ████░ │      ⋈     │ ░████ │      =     │ ████░ │     │   │
│  │   │ ████░ │            │ ░░░░░ │            │ ████░ │     │   │
│  │   └───────┘            └───────┘            └───────┘     │   │
│  │  All rows from left + matching from right                 │   │
│  └───────────────────────────────────────────────────────────┘   │
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                    RIGHT JOIN                             │   │
│  │                                                           │   │
│  │    Table A              Table B              Result       │   │
│  │   ┌───────┐            ┌───────┐            ┌───────┐     │   │
│  │   │ ████  │            │ █████ │            │ █████ │     │   │
│  │   │ ████░ │      ⋈     │ ░████ │      =     │ ░████ │     │   │
│  │   │ ████░ │            │ ░░░░░ │            │ ░░░░░ │     │   │
│  │   └───────┘            └───────┘            └───────┘     │   │
│  │  All rows from right + matching from left                 │   │
│  └───────────────────────────────────────────────────────────┘   │
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                    FULL OUTER JOIN                        │   │
│  │                                                           │   │
│  │    Table A              Table B              Result       │   │
│  │   ┌───────┐            ┌───────┐            ┌───────┐     │   │
│  │   │ ████  │            │ █████ │            │ █████ │     │   │
│  │   │ ████░ │      ⋈     │ ░████ │      =     │ ████░ │     │   │
│  │   │ ████░ │            │ ░░░░░ │            │ ████░ │     │   │
│  │   └───────┘            └───────┘            └───────┘     │   │
│  │  All rows from both tables (nulls for mismatches)         │   │
│  └───────────────────────────────────────────────────────────┘   │
└──────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌──────────────────────────────────────────────────────────────────┐
│              BROADCAST HASH JOIN ARCHITECTURE                    │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                    DRIVER NODE                            │   │
│  │  ┌────────────────────────────────────────────────────┐   │   │
│  │  │  Small Table (fits in memory)                      │   │   │
│  │  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                   │   │   │
│  │  │  │  A  │ │  B  │ │  C  │ │  D  │                   │   │   │
│  │  │  └─────┘ └─────┘ └─────┘ └─────┘                   │   │   │
│  │  └────────────────────────────────────────────────────┘   │   │
│  │            │           │           │                      │   │
│  │            ▼           ▼           ▼                      │   │
│  │      Broadcast to all executors                           │   │
│  └────────────┬───────────┬───────────┬──────────────────────┘   │
│               │           │           │                          │
│               ▼           ▼           ▼                          │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                  EXECUTOR NODES                           │   │
│  │                                                           │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐          │   │
│  │  │  Executor 0 │ │  Executor 1 │ │  Executor 2 │          │   │
│  │  │  ┌────────┐ │ │  ┌────────┐ │ │  ┌────────┐ │          │   │
│  │  │  │ Small  │ │ │  │ Small  │ │ │  │ Small  │ │          │   │
│  │  │  │ Table  │ │ │  │ Table  │ │ │  │ Table  │ │          │   │
│  │  │  └────────┘ │ │  └────────┘ │ │  └────────┘ │          │   │
│  │  │  ┌────────┐ │ │  ┌────────┐ │ │  ┌────────┐ │          │   │
│  │  │  │ Large  │ │ │  │ Large  │ │ │  │ Large  │ │          │   │
│  │  │  │ Part 0 │ │ │  │ Part 1 │ │ │  │ Part 2 │ │          │   │
│  │  │  └────────┘ │ │  └────────┘ │ │  └────────┘ │          │   │
│  │  │      ⋈      │ │      ⋈      │ │      ⋈      │          │   │
│  │  └─────────────┘ └─────────────┘ └─────────────┘          │   │
│  │                                                           │   │
│  │  Each executor joins local large partition with          │   │
│  │  broadcast copy of small table (NO SHUFFLE)               │   │
│  └───────────────────────────────────────────────────────────┘   │
│                                                                  │
│  ADVANTAGE: Zero shuffle, parallel execution                     │
│  LIMITATION: Small table must fit in executor memory             │
└──────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌──────────────────────────────────────────────────────────────────┐
│                SORT-MERGE JOIN ARCHITECTURE                      │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  STAGE 1: SHUFFLE WRITE                                          │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                                                           │   │
│  │  Table A Partitions        Table B Partitions             │   │
│  │  ┌─────┐ ┌─────┐           ┌─────┐ ┌─────┐                │   │
│  │  │  0  │ │  1  │           │  0  │ │  1  │                │   │
│  │  └──┬──┘ └──┬──┘           └──┬──┘ └──┬──┘                │   │
│  │     │       │                 │       │                   │   │
│  │     ▼       ▼                 ▼       ▼                   │   │
│  │  ┌────────────────────────────────────────────────────┐   │   │
│  │  │               HASH PARTITION BY KEY                │   │   │
│  │  │  ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐           │   │   │
│  │  │  │reduce │ │reduce │ │reduce │ │reduce │           │   │   │
│  │  │  │   0   │ │   1   │ │   2   │ │   3   │           │   │   │
│  │  │  └───────┘ └───────┘ └───────┘ └───────┘           │   │   │
│  │  └────────────────────────────────────────────────────┘   │   │
│  └───────────────────────────────────────────────────────────┘   │
│                          │                                       │
│                          ▼                                       │
│  STAGE 2: SORT + MERGE                                           │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                                                           │   │
│  │  Reduce Partition 0:                                      │   │
│  │  ┌────────────────────────────────────────────────────┐   │   │
│  │  │  Sort A: [a1, a2, a3, ...]                         │   │   │
│  │  │  Sort B: [b1, b2, b3, ...]                         │   │   │
│  │  │           ↓                                        │   │   │
│  │  │  MERGE:                                            │   │   │
│  │  │  ┌─────┐ ┌─────┐ ┌──────┐                          │   │   │
│  │  │  │  A  │ │  B  │ │Result│                          │   │   │
│  │  │  │ ptr │ │ ptr │ │      │                          │   │   │
│  │  │  └─────┘ └─────┘ └──────┘                          │   │   │
│  │  │                                                    │   │   │
│  │  │  if a.key < b.key: advance a                       │   │   │
│  │  │  if a.key > b.key: advance b                       │   │   │
│  │  │  if equal: emit all pairs from both key runs       │   │   │
│  │  │            then advance past both runs             │   │   │
│  │  └────────────────────────────────────────────────────┘   │   │
│  └───────────────────────────────────────────────────────────┘   │
│                                                                  │
│  COST/PARTITION: O(N log N + M log M) sort + O(N + M) merge      │
│  ADVANTAGE: No side needs to fit in memory (sorting can spill)   │
│  SHUFFLE: Required (data redistribution by key)                  │
└──────────────────────────────────────────────────────────────────┘

📚 Detailed Explanation

1. Join Types in PySpark

PySpark supports several join types, each with different semantics:

Inner Join: Returns only rows that have matching keys in both DataFrames. This is the default join type and the most common.

Left Outer Join: Returns all rows from the left DataFrame and matching rows from the right DataFrame. Non-matching rows have null values for right columns.

Right Outer Join: Returns all rows from the right DataFrame and matching rows from the left DataFrame. Non-matching rows have null values for left columns.

Full Outer Join: Returns all rows from both DataFrames. Non-matching rows have null values for missing columns.

Left Semi Join: Returns the rows of the left DataFrame that have at least one matching row in the right DataFrame. Only left columns are returned, and each left row appears at most once. Equivalent to EXISTS in SQL.

Left Anti Join: Returns all rows from the left DataFrame where there is NO matching row in the right DataFrame. Equivalent to NOT EXISTS in SQL.

Cross Join: Returns the Cartesian product of both DataFrames (all combinations). Use with extreme caution!
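The difference between inner, left semi, and left anti joins is easiest to see when the right side has duplicate keys. The pure-Python model below uses hypothetical lists of `(key, value)` tuples with non-null keys; it mirrors the semantics described above, not Spark's implementation.

```python
def inner_join(left, right):
    """Pair every left row with every right row that has the same key."""
    return [(lrow, rrow) for lrow in left for rrow in right if lrow[0] == rrow[0]]


def left_semi_join(left, right):
    """Keep each left row at most once if any right row shares its key (EXISTS)."""
    right_keys = {rrow[0] for rrow in right}
    return [lrow for lrow in left if lrow[0] in right_keys]


def left_anti_join(left, right):
    """Keep left rows whose key has no match on the right (NOT EXISTS)."""
    right_keys = {rrow[0] for rrow in right}
    return [lrow for lrow in left if lrow[0] not in right_keys]


employees = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
# Key 1 appears twice on the right, so an inner join duplicates Alice.
projects = [(1, "Search"), (1, "Ads"), (2, "Billing")]

print(len(inner_join(employees, projects)))   # 3
print(left_semi_join(employees, projects))    # [(1, 'Alice'), (2, 'Bob')]
print(left_anti_join(employees, projects))    # [(3, 'Charlie')]
```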

2. Broadcast Hash Join

Broadcast hash join is the most efficient join strategy when one table is small enough to fit in memory. The small table is broadcast to all executors, and each executor joins its local partition with the broadcast table.

Configuration:

# Default threshold: 10MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

# Manual broadcast hint
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

Advantages:

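  • No shuffle required
  • Parallel execution on all executors
  • Modest memory footprint: each executor holds only the small table, as a hash table
  • Very fast for small-large joins

Limitations:

  • The small table must fit in driver and executor memory (it is collected on the driver before being broadcast)
  • Network overhead for broadcasting grows with table size and executor count
  • Not suitable for large-large joins; the join type also limits which side can be broadcast (for left outer, left semi, and left anti joins, only the right side)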
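Conceptually, a broadcast hash join is a hash table built once from the small side and probed independently by every partition of the large side. The following is a single-process Python sketch of that idea for an inner join; the helper names are hypothetical and the "partitions" are just Python lists.

```python
from collections import defaultdict


def build_hash_table(small_rows):
    """Driver side: index the small table by key (this is what gets broadcast)."""
    table = defaultdict(list)
    for key, value in small_rows:
        table[key].append(value)
    return dict(table)


def probe_partition(partition, broadcast_table):
    """Executor side: stream one local partition of the large table through the hash table."""
    out = []
    for key, value in partition:
        for small_value in broadcast_table.get(key, []):
            out.append((key, value, small_value))
    return out


small = [(0, "cat_0"), (1, "cat_1")]
# Large table split into partitions that never move between "executors".
large_partitions = [[(0, "a"), (1, "b")], [(1, "c"), (2, "d")]]

table = build_hash_table(small)
result = [row for part in large_partitions for row in probe_partition(part, table)]
print(result)  # [(0, 'a', 'cat_0'), (1, 'b', 'cat_1'), (1, 'c', 'cat_1')]
```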

3. Sort-Merge Join

Sort-merge join is the default strategy for joining large tables. It consists of three phases:

  1. Shuffle: Both tables are partitioned by key
  2. Sort: Each partition is sorted by key
  3. Merge: Sorted partitions are merged
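The three phases can be sketched in plain Python. This is a simplified model, not Spark's code: Python's `hash` stands in for Spark's Murmur3 partitioner, and the merge step emits the cross product of equal-key runs so duplicate keys are handled correctly.

```python
def hash_partition(rows, num_partitions):
    """Shuffle phase: route each (key, value) row to partition hash(key) % n."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[0]) % num_partitions].append(row)
    return parts


def merge_join(left, right):
    """Sort phase, then a linear merge over one pair of co-partitioned inputs."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of equal keys on each side, then emit their cross product.
            i_end = i
            while i_end < len(left) and left[i_end][0] == left_key:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][0] == right_key:
                j_end += 1
            for _, left_value in left[i:i_end]:
                for _, right_value in right[j:j_end]:
                    out.append((left_key, left_value, right_value))
            i, j = i_end, j_end
    return out


def sort_merge_join(left_rows, right_rows, num_partitions=4):
    """Inner equi-join: shuffle both sides by key, then sort-merge each partition pair."""
    left_parts = hash_partition(left_rows, num_partitions)
    right_parts = hash_partition(right_rows, num_partitions)
    out = []
    for left_part, right_part in zip(left_parts, right_parts):
        out.extend(merge_join(left_part, right_part))
    return out


orders = [(2, "o1"), (1, "o2"), (2, "o3")]
users = [(1, "Alice"), (2, "Bob"), (3, "Carol")]
print(sorted(sort_merge_join(orders, users)))
# [(1, 'o2', 'Alice'), (2, 'o1', 'Bob'), (2, 'o3', 'Bob')]
```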

Optimization Techniques:

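  • Dynamic partition pruning (Spark 3.0+): skip reading partitions of a partitioned table that cannot match the other side
  • Bucketing: pre-partition (and optionally pre-sort) data by the join key
  • Runtime Bloom filters (Spark 3.3+): pre-filter one side with a Bloom filter built from the other side's join keys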

4. Shuffle Hash Join

Shuffle hash join shuffles both tables by key, then builds an in-memory hash table from each partition of the smaller side and probes it with the matching partition of the larger side. It skips the sort step of a sort-merge join, but each build partition must fit in memory.

When to use:

  • The smaller side is too large to broadcast, but each of its shuffle partitions fits comfortably in memory
  • High selectivity joins
  • When broadcast join is not possible
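A shuffle hash join replaces the sort with a per-partition hash table. This is a minimal single-process Python sketch of an inner join, assuming the first argument is the smaller (build) side; the function names are hypothetical and Python's `hash` stands in for Spark's partitioner.

```python
from collections import defaultdict


def partition_by_key(rows, num_partitions):
    """Shuffle step: route each (key, value) row to partition hash(key) % n."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[0]) % num_partitions].append(row)
    return parts


def shuffle_hash_join(build_rows, probe_rows, num_partitions=4):
    """Inner equi-join: shuffle both sides, then build and probe a hash table per partition."""
    out = []
    build_parts = partition_by_key(build_rows, num_partitions)
    probe_parts = partition_by_key(probe_rows, num_partitions)
    for build_part, probe_part in zip(build_parts, probe_parts):
        # Only this partition of the smaller (build) side must fit in memory.
        table = defaultdict(list)
        for key, value in build_part:
            table[key].append(value)
        # No sort: each probe row does a hash lookup in the local table.
        for key, value in probe_part:
            for build_value in table.get(key, []):
                out.append((key, value, build_value))
    return out


departments = [(10, "Eng"), (20, "Sales")]
employees = [(10, "Alice"), (20, "Bob"), (10, "Carol"), (30, "Dan")]
print(sorted(shuffle_hash_join(departments, employees)))
# [(10, 'Alice', 'Eng'), (10, 'Carol', 'Eng'), (20, 'Bob', 'Sales')]
```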

5. Cartesian Product Join

Cartesian product joins produce all combinations of rows from both tables. This is extremely expensive and should be avoided unless absolutely necessary.

Complexity: O(N × M)

Use cases: Only when business logic requires all combinations
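A back-of-the-envelope calculation shows why. Assuming two tables of 100,000 rows each and output rows of about 100 bytes (both numbers are illustrative):

```latex
|A \times B| = |A| \cdot |B| = 10^{5} \cdot 10^{5} = 10^{10} \text{ rows}
\qquad
10^{10} \text{ rows} \times 100 \text{ B/row} = 10^{12} \text{ B} \approx 1 \text{ TB}
```

Two modest inputs of a few megabytes each can therefore produce a terabyte-scale result.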

6. Join Optimization Strategies

Broadcast Hints:

# Force broadcast for known small tables
result = large_df.join(broadcast(small_df), "key")

Bucketing:

# Pre-bucket both tables by the join key (same column, same bucket count)
users_df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")
orders_df.write.bucketBy(100, "user_id").saveAsTable("orders_bucketed")
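Bucketing avoids the shuffle because both tables are written with the same hash function and bucket count, so every pair of matching keys lands in buckets with the same id. The sketch below models that with Python's `hash` as a stand-in for Spark's Murmur3-based bucket function; the table contents and helper names are hypothetical.

```python
def bucket_id(key, num_buckets):
    """Stand-in bucket function; Spark uses a Murmur3 hash here, not Python's hash()."""
    return hash(key) % num_buckets


def bucketize(rows, num_buckets):
    """Group (key, value) rows by bucket id, as bucketBy does when writing a table."""
    buckets = {b: [] for b in range(num_buckets)}
    for row in rows:
        buckets[bucket_id(row[0], num_buckets)].append(row)
    return buckets


NUM_BUCKETS = 4
users = [(101, "Alice"), (102, "Bob"), (205, "Carol")]
orders = [(101, 9.5), (205, 3.0), (101, 12.0), (300, 1.0)]

user_buckets = bucketize(users, NUM_BUCKETS)
order_buckets = bucketize(orders, NUM_BUCKETS)

# Same function and bucket count on both sides: bucket b of users only ever
# needs bucket b of orders, so no rows have to move between buckets.
joined = [
    (user_id, name, amount)
    for b in range(NUM_BUCKETS)
    for user_id, name in user_buckets[b]
    for order_user_id, amount in order_buckets[b]
    if user_id == order_user_id
]
print(sorted(joined))  # [(101, 'Alice', 9.5), (101, 'Alice', 12.0), (205, 'Carol', 3.0)]
```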

Partitioning:

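# Repartition both sides by the join key (same partition count) so the join can reuse it
users_df = users_df.repartition(100, "user_id")
orders_df = orders_df.repartition(100, "user_id")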

7. Data Skew in Joins

Data skew occurs when some keys have significantly more data than others, causing some tasks to take much longer.

Detection:

  • Monitor task duration in Spark UI
  • Look for tasks with much longer duration
  • Check shuffle read/write metrics

Mitigation:

  • Salting: append a random salt to skewed keys and replicate the other side once per salt value
  • Broadcast join: broadcast the smaller side so the skewed key is never shuffled
  • Adaptive Query Execution (AQE) in Spark 3.0+
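Salting can be checked in miniature: spread a hot key over several salt values on the large side, replicate the small side once per salt, and confirm the join result is unchanged. This pure-Python sketch salts every row for simplicity and uses hypothetical names and a fixed random seed; it illustrates the idea and is not Spark code.

```python
import random
from collections import Counter


def salted_join(large_rows, small_rows, num_salts, seed=0):
    """Inner equi-join of (key, value) rows via salting.

    Returns the joined rows and a Counter of large-side rows per (key, salt) group.
    """
    rng = random.Random(seed)
    # Large side: give every row a random salt in [0, num_salts).
    salted_large = [(key, rng.randrange(num_salts), value) for key, value in large_rows]
    # Small side: replicate each row once per salt so every (key, salt) can match.
    replicated_small = {}
    for key, value in small_rows:
        for salt in range(num_salts):
            replicated_small.setdefault((key, salt), []).append(value)
    # Join on the composite (key, salt) instead of the key alone.
    out = []
    for key, salt, value in salted_large:
        for small_value in replicated_small.get((key, salt), []):
            out.append((key, value, small_value))
    groups = Counter((key, salt) for key, salt, _ in salted_large)
    return out, groups


large = [("hot", i) for i in range(1000)] + [("cold", 0)]
small = [("hot", "H"), ("cold", "C")]

result, groups = salted_join(large, small, num_salts=10)
plain = [(k, v, sv) for k, v in large for sk, sv in small if k == sk]

print(len(result))                      # 1001
print(sorted(result) == sorted(plain))  # True
# The 1,000 "hot" rows are now spread over several (key, salt) groups.
print(max(count for (key, _), count in groups.items() if key == "hot"))
```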

8. Join Order Optimization

The order of joins can significantly impact performance:

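  • Join smaller or more selective tables first to keep intermediate results small (Spark does not reorder joins by cost unless CBO join reordering is enabled via spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled and table statistics have been collected)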
  • Use broadcast joins for small-large joins
  • Consider join cardinality (1:1, 1:N, N:M)
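Join order can be compared using the selectivity formula from the start of this lesson, $|A \bowtie B| = S_{join} \times |A| \times |B|$. The row counts and selectivities below are hypothetical; the point is that joining the filtering promos table first shrinks the intermediate result by two orders of magnitude.

```python
from fractions import Fraction


def join_size(rows_left, rows_right, selectivity):
    """Estimated join output rows: S_join * |L| * |R|."""
    return int(selectivity * rows_left * rows_right)


ORDERS, CUSTOMERS, PROMOS = 10_000_000, 1_000_000, 100
S_ORDERS_CUSTOMERS = Fraction(1, CUSTOMERS)  # each order has exactly one customer
S_ORDERS_PROMOS = Fraction(1, 10_000)        # assumed: 1% of orders use one of the promos

# Plan 1: (orders JOIN customers) JOIN promos
plan1_intermediate = join_size(ORDERS, CUSTOMERS, S_ORDERS_CUSTOMERS)
# Plan 2: (orders JOIN promos) JOIN customers
plan2_intermediate = join_size(ORDERS, PROMOS, S_ORDERS_PROMOS)

print(plan1_intermediate)  # 10000000
print(plan2_intermediate)  # 100000
```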

🔑 Key Concepts Table

| Join Type | Description | Shuffle? | Use Case |
|---|---|---|---|
| Inner Join | Only matching rows from both | Unless broadcast | Default, most common |
| Left Outer | All left + matching right | Unless broadcast | Keep all left records |
| Right Outer | All right + matching left | Unless broadcast | Keep all right records |
| Full Outer | All rows from both tables | Yes | Complete data merge |
| Left Semi | Left rows with a match in right | Unless broadcast | EXISTS check |
| Left Anti | Left rows without a match in right | Unless broadcast | NOT EXISTS check |
| Cross Join | Cartesian product | Unless broadcast | All combinations |
| Broadcast Join | Small table broadcast to all executors | No | Small + large table |
| Sort-Merge Join | Sort both sides, then merge | Yes | Large + large table |
| Shuffle Hash Join | Per-partition hash table in memory | Yes | Large + medium table |

💻 Code Examples

Example 1: Basic Join Types

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col, concat, lit, rand, when

spark = SparkSession.builder.appName("JoinOptimization").getOrCreate()

# Create sample DataFrames
employees = spark.createDataFrame([
    (1, "Alice", "Engineering"),
    (2, "Bob", "Marketing"),
    (3, "Charlie", "Engineering"),
    (4, "Diana", "Sales"),
    (5, "Eve", None)
], ["id", "name", "department"])

departments = spark.createDataFrame([
    ("Engineering", "San Francisco", 50),
    ("Marketing", "New York", 30),
    ("Sales", "Chicago", 40),
    ("HR", "Boston", 20)
], ["dept_name", "location", "headcount"])

# Inner Join
inner = employees.join(departments, employees.department == departments.dept_name, "inner")
print("Inner Join:")
inner.show()

# Left Outer Join
left = employees.join(departments, employees.department == departments.dept_name, "left")
print("Left Outer Join:")
left.show()

# Right Outer Join
right = employees.join(departments, employees.department == departments.dept_name, "right")
print("Right Outer Join:")
right.show()

# Full Outer Join
full = employees.join(departments, employees.department == departments.dept_name, "full")
print("Full Outer Join:")
full.show()

# Left Semi Join
semi = employees.join(departments, employees.department == departments.dept_name, "left_semi")
print("Left Semi Join:")
semi.show()

# Left Anti Join
anti = employees.join(departments, employees.department == departments.dept_name, "left_anti")
print("Left Anti Join:")
anti.show()

Example 2: Broadcast Join

# Create large and small DataFrames
large_df = spark.range(1000000).withColumn("key", col("id") % 1000)
small_df = spark.createDataFrame([
    (i, f"category_{i}") for i in range(100)
], ["key", "category"])

# Method 1: Broadcast hint
result = large_df.join(broadcast(small_df), "key")

# Method 2: Configure auto broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
result = large_df.join(small_df, "key")

# Check execution plan
result.explain()

# Expect a BroadcastHashJoin with small_df as the build side (BuildRight),
# fed by a BroadcastExchange over small_df and with no Exchange (shuffle)
# over large_df. Operator IDs and layout vary by Spark version.

Example 3: Sort-Merge Join with Bucketing

# Disable auto-broadcast: otherwise the small users table would simply be
# broadcast and the bucketed sort-merge join would not appear in the plan
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Create sample data
users = spark.range(100000).select(
    col("id").alias("user_id"),
    concat(lit("user_"), col("id").cast("string")).alias("name"),
    (col("id") % 10).alias("dept_id"),
)

orders = spark.range(1000000).select(
    col("id").alias("order_id"),
    (col("id") % 100000).alias("user_id"),
    (col("id") * 10.0).alias("amount"),
)

# Write bucketed tables (same key and bucket count on both sides)
users.write \
    .mode("overwrite") \
    .bucketBy(20, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("users_bucketed")

orders.write \
    .mode("overwrite") \
    .bucketBy(20, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("orders_bucketed")

# Read bucketed tables
users_bucketed = spark.table("users_bucketed")
orders_bucketed = spark.table("orders_bucketed")

# Join bucketed tables (no shuffle needed)
result = users_bucketed.join(orders_bucketed, "user_id")
result.explain()

# Check that no shuffle occurs:
# the plan should show SortMergeJoin with no Exchange below it

Example 4: Handling Data Skew

# Create skewed data: 1,000 regular users with 1,000 rows each,
# plus one hot key with 500,000 rows
regular = spark.range(1000000).select(
    col("id"),
    concat(lit("user_"), (col("id") % 1000).cast("string")).alias("user_id"),
)
hot = spark.range(1000000, 1500000).select(
    col("id"),
    lit("skewed_user").alias("user_id"),
)
skewed_data = regular.unionByName(hot)

user_data = spark.createDataFrame(
    [(f"user_{i}", f"Name {i}") for i in range(1000)] + [("skewed_user", "Skewed User")],
    ["user_id", "name"],
)

# Method 1: Broadcast if possible (the skewed key is never shuffled)
result = skewed_data.join(broadcast(user_data), "user_id")

# Method 2: Salting (append a random salt to the skewed key)
NUM_SALTS = 10

# Salt only the skewed key; every other key gets salt 0
salted = skewed_data.withColumn(
    "salt",
    when(col("user_id") == "skewed_user", (rand() * NUM_SALTS).cast("int"))
    .otherwise(0)
).withColumn(
    "salted_key",
    concat(col("user_id"), lit("_"), col("salt").cast("string"))
)

# Replicate user data once per salt value so every salted key can match
user_with_salt = user_data.crossJoin(
    spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
).withColumn(
    "salted_key",
    concat(col("user_id"), lit("_"), col("salt").cast("string"))
).select("salted_key", "name")  # drop user_id/salt to avoid duplicate columns

# Join on salted keys, then drop the helper columns
result = salted.join(user_with_salt, "salted_key").drop("salt", "salted_key")
result.explain()

📊 Performance Metrics

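| Join Type | 1GB + 10MB | 1GB + 1GB | 10GB + 10GB | Shuffle Size |
|---|---|---|---|---|
| Broadcast Join | 2.5s | N/A | N/A | 0 MB |
| Sort-Merge Join | 8.5s | 12.0s | 45.0s | 2x input |
| Shuffle Hash Join | 6.0s | 9.0s | N/A | 2x input |
| Broadcast (SQL) | 2.0s | N/A | N/A | 0 MB |
| Bucket Join | 4.0s | 6.0s | 25.0s | 0 MB |
| Left Semi Join | 5.0s | 8.0s | 30.0s | 1x input |
| Left Anti Join | 4.5s | 7.0s | 28.0s | 1x input |
| Cross Join | 120.0s | 1200.0s | N/A | N/A |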

✅ Best Practices

1. Use Broadcast Joins for Small Tables

from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Or configure threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

2. Bucket Tables for Repeated Joins

# Write bucketed tables
df1.write.bucketBy(100, "key").saveAsTable("t1_bucketed")
df2.write.bucketBy(100, "key").saveAsTable("t2_bucketed")

# Join bucketed tables without shuffle
result = spark.table("t1_bucketed").join(spark.table("t2_bucketed"), "key")

3. Handle Data Skew

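# Broadcast if possible
result = skewed_df.join(broadcast(small_df), "key")

# Or use salting for large skewed joins: salt one side, replicate the other
# (other_df is the hypothetical non-skewed side) once per salt value
salted_df = skewed_df.withColumn("salt", (rand() * 10).cast("int"))
salts = spark.range(10).withColumnRenamed("id", "salt")
result = salted_df.join(other_df.crossJoin(salts), ["key", "salt"])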

4. Choose Correct Join Type

# Use left_semi instead of inner when you only need the left table's columns
result = df1.join(df2, "key", "left_semi")  # each df1 row at most once, only df1 columns

# Use left_anti for NOT EXISTS
result = df1.join(df2, "key", "left_anti")

5. Filter Before Join

# Filter early to reduce data size
result = df1.filter(col("age") > 30).join(df2, "key")

6. Monitor Join Performance

# Check execution plan
result.explain(True)

# Look for:
# - BroadcastHashJoin (good for small tables)
# - SortMergeJoin (default for large tables)
# - Exchange (shuffle operations)

See Also

  • Kafka Streams (kafka/03): Join operations in stream processing
  • Data Engineering Streaming (data-engineering/022): Join optimization in streaming pipelines
