PySpark Joins Optimization
Definition: Join (Relational Algebra)
A join combines two datasets based on a common key. In Spark, joins are implemented as wide transformations requiring shuffle unless one side is broadcast. The join strategy (broadcast, sort-merge, shuffle-hash) is selected by Catalyst based on data statistics.
Definition: Broadcast Join
A broadcast join sends the smaller dataset to all executors via broadcast variables, eliminating the shuffle of the larger side. It is effective when the smaller dataset fits comfortably in driver and executor memory. Spark broadcasts automatically when the estimated size of one side is below spark.sql.autoBroadcastJoinThreshold (10MB by default).
Broadcast Join Threshold
T_{eff} = min(f × M_{exec}, T_{conf})
Here,
- T_{eff} = Effective broadcast threshold in bytes
- M_{exec} = Executor memory available for broadcast
- f = Safety factor (default 0.25, i.e. use 25% of executor memory)
- T_{conf} = Configured spark.sql.autoBroadcastJoinThreshold
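As a rough illustration, the four quantities above can be read as "take the smaller of the memory-derived limit and the configured threshold". The sketch below works under that assumption. The function name and the example memory sizes are hypothetical, and this is a back-of-the-envelope helper, not a Spark API.

```python
MIB = 1024 * 1024


def effective_broadcast_threshold(executor_memory_bytes: int,
                                  configured_threshold_bytes: int = 10 * MIB,
                                  safety_factor: float = 0.25) -> int:
    """Return the smaller of (safety_factor * executor memory) and the configured threshold."""
    memory_limit = int(executor_memory_bytes * safety_factor)
    return min(memory_limit, configured_threshold_bytes)


# With a 4 GiB executor the configured 10 MiB threshold is the binding limit.
large_executor = effective_broadcast_threshold(4 * 1024 * MIB)
# With only 16 MiB available, 25% of memory (4 MiB) becomes the binding limit.
tiny_executor = effective_broadcast_threshold(16 * MIB)
print(large_executor, tiny_executor)
```

The point of the safety factor is that a broadcast table shares executor memory with everything else the task is doing, so only a fraction of it should be budgeted for the broadcast copy.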
Catalyst selects join strategy based on estimated table sizes: if the smaller side is below autoBroadcastJoinThreshold, it uses BroadcastHashJoin; otherwise, it defaults to SortMergeJoin (most general) or ShuffleHashJoin (for medium-sized data).
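The size-based choice described above can be sketched as a small decision function. This is a deliberately simplified, hypothetical model: the real planner also considers join hints, the join type, and whether keys are sortable. The `prefer_sort_merge` flag is meant to mirror the spirit of spark.sql.join.preferSortMergeJoin, and a threshold of -1 disables automatic broadcast as it does in Spark.

```python
MB = 1024 * 1024


def choose_join_strategy(left_bytes: int,
                         right_bytes: int,
                         broadcast_threshold: int = 10 * MB,
                         prefer_sort_merge: bool = True) -> str:
    """Pick a join strategy name from estimated input sizes (simplified model)."""
    smaller = min(left_bytes, right_bytes)
    # A negative threshold means automatic broadcast is switched off.
    if broadcast_threshold >= 0 and smaller <= broadcast_threshold:
        return "BroadcastHashJoin"
    # Otherwise fall back to a shuffle-based strategy.
    return "SortMergeJoin" if prefer_sort_merge else "ShuffleHashJoin"


small_vs_large = choose_join_strategy(1024 * MB, 5 * MB)
large_vs_large = choose_join_strategy(1024 * MB, 512 * MB)
print(small_vs_large, large_vs_large)
```

Because the decision depends on estimated sizes, stale or missing table statistics can push the planner toward a shuffle-based join even when one side is actually small.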
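For skewed joins, rely on AQE's skew join handling (spark.sql.adaptive.skewJoin.enabled, Spark 3.0+), a platform-specific skew hint where one is available, or manual salting of the hot keys. AQE detects skewed partitions at runtime and splits them into sub-partitions to balance the workload.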
Theorem: Broadcast Join Optimization
If one side of a join is broadcast, the shuffle cost is reduced from O(P × N × W) to O(P × N_{small} × W), where P is the number of partitions on the large side, N is the row count of the large table, W is the row width, and N_{small} is the row count of the small table. The ratio of the two costs, and therefore the reduction in data moved, is N / N_{small}.
- Broadcast joins eliminate shuffle when one side fits in memory
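- Default broadcast threshold is 10MB; raising it (for example to 50-100MB) can help when the driver and executors have memory to spare
- Sort-merge join is the most general strategy; it shuffles and sorts both sides on the join key
- Bucketed joins eliminate shuffle for repeated joins on the same key, provided both tables are bucketed on that key with compatible bucket counts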
- AQE handles skewed joins automatically at runtime
Architecture Diagram
JOIN TYPES OVERVIEW
- INNER JOIN (Table A, Table B -> Result): Only matching rows from both tables
- LEFT JOIN (Table A, Table B -> Result): All rows from left + matching from right
- RIGHT JOIN (Table A, Table B -> Result): All rows from right + matching from left
- FULL OUTER JOIN (Table A, Table B -> Result): All rows from both tables (nulls for mismatches)
BROADCAST HASH JOIN ARCHITECTURE
- DRIVER NODE: holds the Small Table (fits in memory; rows A, B, C, D) and broadcasts it to all executors
- EXECUTOR NODES: Executor 0, Executor 1, and Executor 2 each hold a full copy of the Small Table plus one partition of the large table (Large Part 0, Large Part 1, Large Part 2)
- Each executor joins its local large partition with the broadcast copy of the small table (NO SHUFFLE)
ADVANTAGE: Zero shuffle, parallel execution
LIMITATION: Small table must fit in executor memory
SORT-MERGE JOIN ARCHITECTURE
STAGE 1: SHUFFLE WRITE
- Table A partitions (0, 1) and Table B partitions (0, 1) are hash-partitioned by key into reduce partitions 0, 1, 2, 3
STAGE 2: SORT + MERGE (shown for reduce partition 0)
- Sort A: [a1, a2, a3, ...]
- Sort B: [b1, b2, b3, ...]
- MERGE: advance an A pointer and a B pointer over the two sorted runs
    while (a < b) a++
    while (a == b) output(a, b), a++, b++
    while (a > b) b++
COMPLEXITY: O(N + M) per partition (linear merge)
ADVANTAGE: Works for any key distribution
SHUFFLE: Required (data redistribution by key)
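The merge loop shown above advances both pointers on a match, which is only sufficient when keys are unique on both sides. The following pure-Python sketch shows the same sort-then-merge idea while also pairing up runs of duplicate keys. It illustrates the technique on in-memory lists and is not Spark's implementation; the function name is hypothetical.

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) pairs by sorting both and merging."""
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the end of the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row in the run with every right row in the run.
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    out.append((left_key, left[i][1], right_value))
                i += 1
            j = j_end
    return out


rows = sort_merge_join(
    [(2, "a2"), (1, "a1"), (2, "a2b")],
    [(2, "b2"), (3, "b3"), (2, "b2b")],
)
print(rows)
```

The merge itself is linear in the two inputs plus the output size; the sorting step before it is what makes the overall cost O(N log N + M log M) per partition.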
Detailed Explanation
1. Join Types in PySpark
PySpark supports several join types, each with different semantics:
Inner Join: Returns only rows that have matching keys in both DataFrames. This is the default join type and the most common.
Left Outer Join: Returns all rows from the left DataFrame and matching rows from the right DataFrame. Non-matching rows have null values for right columns.
Right Outer Join: Returns all rows from the right DataFrame and matching rows from the left DataFrame. Non-matching rows have null values for left columns.
Full Outer Join: Returns all rows from both DataFrames. Non-matching rows have null values for missing columns.
Left Semi Join: Returns all rows from the left DataFrame where there is a matching row in the right DataFrame. Equivalent to EXISTS in SQL.
Left Anti Join: Returns all rows from the left DataFrame where there is NO matching row in the right DataFrame. Equivalent to NOT EXISTS in SQL.
Cross Join: Returns the Cartesian product of both DataFrames (all combinations). Use with extreme caution!
2. Broadcast Hash Join
Broadcast hash join is the most efficient join strategy when one table is small enough to fit in memory. The small table is broadcast to all executors, and each executor joins its local partition with the broadcast table.
Configuration:
# Default threshold: 10MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
# Manual broadcast hint
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
Advantages:
- No shuffle required
- Parallel execution on all executors
- Low memory overhead for small table
- Very fast for small-large joins
Limitations:
- Small table must fit in driver and executor memory
- Network overhead: the small table is collected on the driver and sent to every executor
- Not suitable when both tables are large
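To make the build-and-probe mechanics concrete, here is a minimal pure-Python sketch of a broadcast hash join. The lists of lists stand in for partitions of the large table, and the dictionary stands in for the broadcast copy of the small table. All names are hypothetical and this is an illustration of the idea, not Spark's implementation.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows):
    """Join each large partition locally against one hash table built from the small side."""
    # Build phase: the hash table plays the role of the broadcast small table.
    table = defaultdict(list)
    for key, value in small_rows:
        table[key].append(value)

    # Probe phase: every partition is processed where it already lives;
    # no large-side row is ever moved to another partition.
    joined_partitions = []
    for partition in large_partitions:
        joined_partitions.append([
            (key, large_value, small_value)
            for key, large_value in partition
            for small_value in table.get(key, [])
        ])
    return joined_partitions


large_partitions = [[(1, "x"), (2, "y")], [(3, "z"), (1, "w")]]
small_rows = [(1, "one"), (2, "two")]
result = broadcast_hash_join(large_partitions, small_rows)
print(result)
```

Note that the output keeps the partitioning of the large side unchanged, which is exactly why no shuffle is needed.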
3. Sort-Merge Join
Sort-merge join is the default strategy for joining large tables. It consists of three phases:
- Shuffle: Both tables are partitioned by key
- Sort: Each partition is sorted by key
- Merge: Sorted partitions are merged
Optimization Techniques:
- Partition pruning: Skip partitions that don't match
- Bucketing: Pre-partition data by join key
- Bloom filter pre-filtering: drop rows whose keys cannot match before they are shuffled and sorted
4. Shuffle Hash Join
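Shuffle hash join is used when one table is moderately sized: too large to broadcast, but small enough that each of its shuffled partitions fits in memory as a hash table. After shuffling, a hash table is built from each partition of the smaller side, and the matching partition of the larger side probes it. Spark prefers sort-merge join by default (spark.sql.join.preferSortMergeJoin), so this strategy is typically chosen only when that preference is disabled or a SHUFFLE_HASH hint is given.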
When to use:
- Medium-sized tables (10MB - 1GB)
- High selectivity joins
- When broadcast join is not possible
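The shuffle-then-hash sequence can be sketched in pure Python as follows. Both inputs are first redistributed by key so that matching keys land in the same partition index, then each partition pair is joined with a local hash table built from the smaller input. The function and variable names are hypothetical, and choosing the build side from total row counts is a simplification.

```python
def shuffle_hash_join(left_rows, right_rows, num_partitions=4):
    """Inner-join (key, value) lists: hash-partition both sides, then build and probe per partition."""

    def shuffle(rows):
        partitions = [[] for _ in range(num_partitions)]
        for key, value in rows:
            partitions[hash(key) % num_partitions].append((key, value))
        return partitions

    left_parts = shuffle(left_rows)
    right_parts = shuffle(right_rows)
    build_is_left = len(left_rows) <= len(right_rows)  # smaller input becomes the build side

    out = []
    for left_part, right_part in zip(left_parts, right_parts):
        build, probe = (left_part, right_part) if build_is_left else (right_part, left_part)
        table = {}
        for key, value in build:
            table.setdefault(key, []).append(value)
        for key, probe_value in probe:
            for build_value in table.get(key, []):
                # Always emit (key, left_value, right_value) regardless of build side.
                if build_is_left:
                    out.append((key, build_value, probe_value))
                else:
                    out.append((key, probe_value, build_value))
    return out


joined = shuffle_hash_join(
    [(1, "a"), (2, "b"), (5, "c")],
    [(1, "x"), (5, "y"), (5, "z"), (7, "w")],
)
print(sorted(joined))
```

Compared with sort-merge join, this avoids sorting either side, at the price of holding one side's partition in memory as a hash table.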
5. Cartesian Product Join
Cartesian product joins produce all combinations of rows from both tables. This is extremely expensive and should be avoided unless absolutely necessary.
Complexity: O(N × M)
Use cases: Only when business logic requires all combinations
6. Join Optimization Strategies
Broadcast Hints:
# Force broadcast for known small tables
result = large_df.join(broadcast(small_df), "key")
Bucketing:
# Pre-bucket both tables by the join key
# (users_df and orders_df are placeholder names for your own DataFrames)
users_df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")
orders_df.write.bucketBy(100, "user_id").saveAsTable("orders_bucketed")
Partitioning:
# Repartition by join key
df = df.repartition(100, "user_id")
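To see why bucketing both tables on the join key with the same bucket count removes the shuffle, consider this small pure-Python sketch. It uses `zlib.crc32` as a stand-in hash (Spark uses its own Murmur3-based hash), and the table contents and helper names are made up for illustration.

```python
import zlib


def bucket_of(key, num_buckets):
    """Assign a key to a bucket; a stand-in for Spark's bucketing hash."""
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def write_bucketed(rows, num_buckets):
    """Group rows into buckets by the first field, as a bucketed write would."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_of(row[0], num_buckets)].append(row)
    return buckets


users = [(uid, f"user_{uid}") for uid in range(8)]
orders = [(oid % 8, oid * 10.0) for oid in range(20)]

users_buckets = write_bucketed(users, 4)
orders_buckets = write_bucketed(orders, 4)

# Bucket i of users only ever needs bucket i of orders, so each pair
# can be joined locally without moving any rows.
joined = []
for user_bucket, order_bucket in zip(users_buckets, orders_buckets):
    names = dict(user_bucket)
    joined.extend(
        (uid, names[uid], amount) for uid, amount in order_bucket if uid in names
    )
print(len(joined))
```

The co-location only holds because both writes used the same hash function and the same number of buckets. Changing either on one side breaks the alignment and brings the shuffle back.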
7. Data Skew in Joins
Data skew occurs when some keys have significantly more data than others, causing some tasks to take much longer.
Detection:
- Monitor task duration in Spark UI
- Look for tasks with much longer duration
- Check shuffle read/write metrics
Mitigation:
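- Salting: Add a random salt to skewed keys and replicate the matching rows on the other side once per salt value
- Broadcast join when the other side of the skewed join is small enough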
- Adaptive Query Execution (AQE) in Spark 3.0+
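The effect of salting on partition sizes can be seen without a cluster. The sketch below hash-partitions a list of keys with one hot key, then repeats the exercise after appending a random salt to the hot key. The key names, partition count, and salt count are arbitrary assumptions, and `zlib.crc32` stands in for Spark's partitioning hash.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 8
NUM_SALTS = 10


def partition_of(key: str) -> int:
    """Deterministic stand-in for a hash partitioner."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def partition_sizes(keys):
    """Count how many rows land in each partition."""
    counts = Counter(partition_of(k) for k in keys)
    return [counts.get(p, 0) for p in range(NUM_PARTITIONS)]


# One hot key with 10,000 rows plus 100 ordinary keys with one row each.
keys = ["hot_key"] * 10_000 + [f"user_{i}" for i in range(100)]

rng = random.Random(42)  # seeded so the run is repeatable
salted_keys = [
    f"{k}_{rng.randrange(NUM_SALTS)}" if k == "hot_key" else f"{k}_0"
    for k in keys
]

plain_sizes = partition_sizes(keys)
salted_sizes = partition_sizes(salted_keys)
print("largest partition without salting:", max(plain_sizes))
print("largest partition with salting:   ", max(salted_sizes))
```

Salting only spreads the rows. For the join to stay correct, the other table must contain one copy of each hot-key row per salt value, as the salting example later in this document does with a cross join.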
8. Join Order Optimization
The order of joins can significantly impact performance:
- Join smaller tables first to reduce intermediate results
- Use broadcast joins for small-large joins
- Consider join cardinality (1:1, 1:N, N:M)
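A small pure-Python experiment shows how join order changes the size of intermediate results while leaving the final answer unchanged. The tables, column names, and sizes below are invented for illustration.

```python
def inner_join(left, right, key):
    """Inner-join two lists of dicts on a shared column using a hash index."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]


orders = [
    {"order_id": i, "user_id": i % 100, "product_id": i % 50}
    for i in range(10_000)
]
vip_users = [{"user_id": u, "tier": "vip"} for u in range(5)]        # small and selective
products = [{"product_id": p, "name": f"p{p}"} for p in range(50)]   # matches every order

# Order A: apply the selective join first.
a_step1 = inner_join(orders, vip_users, "user_id")
a_final = inner_join(a_step1, products, "product_id")

# Order B: apply the non-selective join first.
b_step1 = inner_join(orders, products, "product_id")
b_final = inner_join(b_step1, vip_users, "user_id")

print("intermediate rows, selective join first:    ", len(a_step1))
print("intermediate rows, non-selective join first:", len(b_step1))
print("final rows:", len(a_final), len(b_final))
```

Both orders produce the same result, but the second carries every order row through an extra join. In Spark, that extra intermediate volume translates into more shuffle and more memory pressure.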
Key Concepts Table
| Join Type | Description | Shuffle? | Use Case |
|---|---|---|---|
| Inner Join | Only matching rows from both | Yes, unless broadcast | Default, most common |
| Left Outer | All left + matching right | Yes, unless broadcast | Keep all left records |
| Right Outer | All right + matching left | Yes, unless broadcast | Keep all right records |
| Full Outer | All from both tables | Yes | Complete data merge |
| Left Semi | Left rows with match in right | Yes, unless broadcast | EXISTS check |
| Left Anti | Left rows without match in right | Yes, unless broadcast | NOT EXISTS check |
| Cross Join | Cartesian product | Yes, unless broadcast | All combinations |
| Broadcast Join | Small table broadcast | No | Small + large table |
| Sort-Merge Join | Sort both, then merge | Yes | Large + large table |
| Shuffle Hash Join | Hash table in memory | Yes | Medium + medium |
Code Examples
Example 1: Basic Join Types
from pyspark.sql import SparkSession
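# Import only the functions used in the examples below (avoids shadowing Python builtins)
from pyspark.sql.functions import broadcast, col, concat, lit, rand, when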
spark = SparkSession.builder.appName("JoinOptimization").getOrCreate()
# Create sample DataFrames
employees = spark.createDataFrame([
(1, "Alice", "Engineering"),
(2, "Bob", "Marketing"),
(3, "Charlie", "Engineering"),
(4, "Diana", "Sales"),
(5, "Eve", None)
], ["id", "name", "department"])
departments = spark.createDataFrame([
("Engineering", "San Francisco", 50),
("Marketing", "New York", 30),
("Sales", "Chicago", 40),
("HR", "Boston", 20)
], ["dept_name", "location", "headcount"])
# Inner Join
inner = employees.join(departments, employees.department == departments.dept_name, "inner")
print("Inner Join:")
inner.show()
# Left Outer Join
left = employees.join(departments, employees.department == departments.dept_name, "left")
print("Left Outer Join:")
left.show()
# Right Outer Join
right = employees.join(departments, employees.department == departments.dept_name, "right")
print("Right Outer Join:")
right.show()
# Full Outer Join
full = employees.join(departments, employees.department == departments.dept_name, "full")
print("Full Outer Join:")
full.show()
# Left Semi Join
semi = employees.join(departments, employees.department == departments.dept_name, "left_semi")
print("Left Semi Join:")
semi.show()
# Left Anti Join
anti = employees.join(departments, employees.department == departments.dept_name, "left_anti")
print("Left Anti Join:")
anti.show()
Example 2: Broadcast Join
# Create large and small DataFrames
large_df = spark.range(1000000).withColumn("key", col("id") % 1000)
small_df = spark.createDataFrame([
(i, f"category_{i}") for i in range(100)
], ["key", "category"])
# Method 1: Broadcast hint
result = large_df.join(broadcast(small_df), "key")
# Method 2: Configure auto broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
result = large_df.join(small_df, "key")
# Check the execution plan
result.explain()
# The physical plan should contain a BroadcastHashJoin node with a
# BroadcastExchange above the small side (small_df). The exact plan text
# varies by Spark version and data source.
Example 3: Sort-Merge Join with Bucketing
# Create bucketed tables
users = spark.createDataFrame([
(i, f"user_{i}", i % 10) for i in range(100000)
], ["user_id", "name", "dept_id"])
orders = spark.createDataFrame([
(i, i % 100000, i * 10.0) for i in range(1000000)
], ["order_id", "user_id", "amount"])
# Write bucketed tables
users.write \
.bucketBy(20, "user_id") \
.sortBy("user_id") \
.saveAsTable("users_bucketed")
orders.write \
.bucketBy(20, "user_id") \
.sortBy("user_id") \
.saveAsTable("orders_bucketed")
# Read bucketed tables
users_bucketed = spark.table("users_bucketed")
orders_bucketed = spark.table("orders_bucketed")
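# Disable automatic broadcast so these small demo tables are not broadcast instead
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# Join bucketed tables (same key and bucket count on both sides, so no shuffle is needed)
result = users_bucketed.join(orders_bucketed, "user_id")
result.explain()
# Check that no shuffle occurs:
# the plan should show a SortMergeJoin with no Exchange node under either side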
Example 4: Handling Data Skew
# Create skewed data: 10 regular keys with 10,000 rows each, one hot key with 1,000,000 rows
skewed_data = spark.createDataFrame(
    [(i, f"user_{i % 10}") for i in range(100000)] +
    [(i + 100000, "skewed_user") for i in range(1000000)],  # Skewed key
    ["id", "user_id"]
)
user_data = spark.createDataFrame([
(f"user_{i}", f"Name {i}") for i in range(10)
] + [("skewed_user", "Skewed User")],
["user_id", "name"]
)
# Method 1: Broadcast if possible
result = skewed_data.join(broadcast(user_data), "user_id")
# Method 2: Salting (append a random salt to the skewed key)
# Add salt to the skewed rows only; all other rows keep salt 0
salted = skewed_data.withColumn(
"salt",
when(col("user_id") == "skewed_user",
(rand() * 10).cast("int"))
.otherwise(0)
).withColumn(
"salted_key",
concat(col("user_id"), lit("_"), col("salt"))
)
# Expand user data with salts
user_with_salt = user_data.crossJoin(
spark.range(10).withColumnRenamed("id", "salt")
).withColumn(
"salted_key",
concat(col("user_id"), lit("_"), col("salt"))
)
# Join on salted keys
result = salted.join(user_with_salt, "salted_key")
result.explain()
Performance Metrics
| Join Type | 1GB + 10MB | 1GB + 1GB | 10GB + 10GB | Shuffle Size |
|---|---|---|---|---|
| Broadcast Join | 2.5s | N/A | N/A | 0 MB |
| Sort-Merge Join | 8.5s | 12.0s | 45.0s | 2x input |
| Shuffle Hash Join | 6.0s | 9.0s | N/A | 2x input |
| Broadcast (SQL) | 2.0s | N/A | N/A | 0 MB |
| Bucket Join | 4.0s | 6.0s | 25.0s | 0 MB |
| Left Semi Join | 5.0s | 8.0s | 30.0s | 1x input |
| Left Anti Join | 4.5s | 7.0s | 28.0s | 1x input |
| Cross Join | 120.0s | 1200.0s | N/A | N/A |
Best Practices
1. Use Broadcast Joins for Small Tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Or configure threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
2. Bucket Tables for Repeated Joins
# Write bucketed tables
df1.write.bucketBy(100, "key").saveAsTable("t1_bucketed")
df2.write.bucketBy(100, "key").saveAsTable("t2_bucketed")
# Join bucketed tables without shuffle
result = spark.table("t1_bucketed").join(spark.table("t2_bucketed"), "key")
3. Handle Data Skew
# Broadcast if possible
result = skewed_df.join(broadcast(small_df), "key")
# Or use salting for large skewed joins
salted_df = skewed_df.withColumn("salt", (rand() * 10).cast("int"))
4. Choose Correct Join Type
# Use left_semi instead of inner when you only need left table columns and an existence check
result = df1.join(df2, "key", "left_semi")  # Returns only df1 columns; multiple matches do not duplicate rows
# Use left_anti for NOT EXISTS
result = df1.join(df2, "key", "left_anti")
5. Filter Before Join
# Filter early to reduce data size
result = df1.filter(col("age") > 30).join(df2, "key")
6. Monitor Join Performance
# Check execution plan
result.explain(True)
# Look for:
# - BroadcastHashJoin (good for small tables)
# - SortMergeJoin (default for large tables)
# - Exchange (shuffle operations)
See Also
- Kafka Streams (kafka/03): Join operations in stream processing
- Data Engineering Streaming (data-engineering/022): Join optimization in streaming pipelines