π Geospatial Data Processing in PySpark
Architecture Diagram
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β GEOSPATIAL PROCESSING ARCHITECTURE β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β
β β Raw Spatial ββββββΆβ Coordinate ββββββΆβ Spatial β β
β β Data β β Transform β β Index β β
β β (Lat/Lng) β β (CRS) β β (H3/S2) β β
β ββββββββββββββββ ββββββββ¬ββββββββ ββββββββ¬ββββββββ β
β β β β
β βΌ βΌ β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β Geometric β β Spatial β β
β β Operations β β Partitioning β β
β β βββββββββββββ β β βββββββββββββ β β
β β Buffer β β Kd-tree β β
β β Intersection β β R-tree β β
β β Containment β β Quad-tree β β
β ββββββββββ¬ββββββββββ ββββββββββ¬ββββββββββ β
β β β β
β βΌ βΌ β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β Spatial Join β β Distance β β
β β (Point-in-Poly) β β Calculations β β
β β βββββββββββββ β β βββββββββββββ β β
β β Broadcast join β β Haversine β β
β β KD-tree join β β Euclidean β β
β β Grid-based join β β Manhattan β β
β ββββββββββ¬ββββββββββ ββββββββββ¬ββββββββββ β
β β β β
β βΌ βΌ β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β Trajectory β β Geohash β β
β β Analysis β β Encoding β β
β β βββββββββββββ β β βββββββββββββ β β
β β Movement β β Clustering β β
β β patterns β β Bounding boxes β β
β β Stay points β β Prefix matching β β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SPATIAL INDEXING SCHEMES β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β H3 (Uber's Hexagonal Hierarchical Spatial Index) β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β±β² β±β² β±β² β β
β β β± β² β± β² β± β² β β
β β β± β² β± β² β± β² β β
β β β± 123 β²β± 456 β²β± 789 β² β β
β β β² β±β² β±β² β± β β
β β β² β± β² β± β² β± β β
β β β² β± β² β± β² β± β β
β β β²β± β²β± β²β± β β
β β β β
β β Resolution levels: β β
β β ββββββββββ¬βββββββββββ¬βββββββββββ¬βββββββββββ¬βββββββββββ β β
β β β Level β Edge Len β Area β Use Case β Precisionβ β β
β β ββββββββββΌβββββββββββΌβββββββββββΌβββββββββββΌβββββββββββ€ β β
β β β 0 β 1,107 km β 4.2M kmΒ² β Country β ~1000km β β β
β β β 4 β 17.7 km β 1,770 kmΒ²β City β ~10km β β β
β β β 8 β 463 m β 0.74 kmΒ² β Street β ~500m β β β
β β β 12 β 12 m β 0.002 kmΒ²β Building β ~10m β β β
β β β 15 β 0.9 m β 1e-5 kmΒ² β Object β ~1m β β β
β β ββββββββββ΄βββββββββββ΄βββββββββββ΄βββββββββββ΄βββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β S2 (Google's Spherical Geometry Library) β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β Face 0 Face 1 Face 2 β β β
β β β βββββββββββββ βββββββββββββ βββββββββββββ β β β
β β β β Cell 0-0 β β Cell 1-0 β β Cell 2-0 β β β β
β β β β Cell 0-1 β β Cell 1-1 β β Cell 2-1 β β β β
β β β β Cell 0-2 β β Cell 1-2 β β Cell 2-2 β β β β
β β β β Cell 0-3 β β Cell 1-3 β β Cell 2-3 β β β β
β β β βββββββββββββ βββββββββββββ βββββββββββββ β β β
β β β β β β
β β β Projection: Cube β Sphere via quadratic transform β β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Geohash (Prefix-based) β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Level 1: 9 subdivisions β β
β β βββββ¬ββββ¬ββββ β β
β β β 0 β 1 β 2 β β β
β β βββββΌββββΌββββ€ β β
β β β 3 β 4 β 5 β β "s" prefix β β
β β βββββΌββββΌββββ€ β β
β β β 6 β 7 β 8 β β β
β β βββββ΄ββββ΄ββββ β β
β β β β
β β Level 2: 9Β² = 81 subdivisions β β
β β βββββ¬ββββ¬ββββ¬ββββ¬ββββ¬ββββ¬ββββ¬ββββ¬ββββ β β
β β β00 β01 β02 β10 β11 β12 β20 β21 β22 β β β
β β βββββΌββββΌββββΌββββΌββββΌββββΌββββΌββββΌββββ€ β β
β β β...β...β...β...β...β...β...β...β...β β "s9" prefix β β
β β βββββ΄ββββ΄ββββ΄ββββ΄ββββ΄ββββ΄ββββ΄ββββ΄ββββ β β
β β β β
β β Longer prefix = smaller area = higher precision β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SPATIAL JOIN STRATEGIES β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Strategy 1: Broadcast Join (Small + Large) β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Small table (< 10MB) Large table (TB) β β
β β ββββββββββββ ββββββββββββββββββββββββββ β β
β β β Polygons β βββββββββββΆβ Points (partitioned) β β β
β β β (1000) β broadcast β β β β
β β ββββββββββββ β ββββββ ββββββ βββββββ β β
β β β β P1 β β P2 β β P3 ββ β β
β β Each partition checks β ββββββ ββββββ βββββββ β β
β β against all polygons ββββββββββββββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Strategy 2: KD-Tree Join (Both Large) β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Table A Table B β β
β β ββββββββββββ ββββββββββββ β β
β β β Points β β Points β β β
β β ββββββ¬ββββββ ββββββ¬ββββββ β β
β β β β β β
β β βΌ βΌ β β
β β ββββββββββββ ββββββββββββ β β
β β β Build β β Build β β β
β β β KD-Tree β β KD-Tree β β β
β β ββββββ¬ββββββ ββββββ¬ββββββ β β
β β β β β β
β β ββββββββββββ¬ββββββββββββββ β β
β β βΌ β β
β β ββββββββββββββββββββββββββββββββββββββββ β β
β β β For each partition pair: β β β
β β β Query KD-tree with bounding box β β β
β β β Filter by exact distance predicate β β β
β β ββββββββββββββββββββββββββββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Strategy 3: Grid-based Join (Uniform Distribution) β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β ββββββ¬βββββ¬βββββ¬βββββ¬βββββ β β
β β β A1 β A2 β A3 β A4 β A5 β Points assigned to cells β β
β β ββββββΌβββββΌβββββΌβββββΌβββββ€ β β
β β β B1 β B2 β B3 β B4 β B5 β Polygons assigned to cells β β
β β ββββββΌβββββΌβββββΌβββββΌβββββ€ β β
β β β C1 β C2 β C3 β C4 β C5 β Only same-cell pairs join β β
β β ββββββΌβββββΌβββββΌβββββΌβββββ€ β β
β β β D1 β D2 β D3 β D4 β D5 β β β
β β ββββββ΄βββββ΄βββββ΄βββββ΄βββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Detailed Explanation
Geospatial data processing in PySpark involves handling spatial coordinates, performing geometric operations, and executing spatial joins at scale. The fundamental challenge is that spatial data has geometric properties (points, lines, polygons) that cannot be directly compared using standard relational operators. Spatial operations require specialized indexing structures and join strategies that account for the two-dimensional nature of geographic data.
Coordinate Reference Systems (CRS) define how spatial coordinates map to locations on Earth. The most common systems are WGS84 (EPSG:4326) for latitude/longitude coordinates and UTM (Universal Transverse Mercator) for projected coordinates in meters. Converting between coordinate systems is essential for accurate distance calculationsβusing latitude/longitude directly for distance introduces significant errors due to the Earth's curvature. The Haversine formula accounts for this by computing great-circle distances on a sphere.
Spatial indexing structures enable efficient querying of spatial data. Without indexing, a spatial join requires checking every pair of geometriesβa quadratic operation that is infeasible for large datasets. H3 (Uber's hexagonal index) partitions the world into hierarchical hexagonal cells, where each cell has a unique identifier. S2 (Google's library) projects the sphere onto a cube and uses quadtree cells. Geohashes use a space-filling curve (Morton order) to encode 2D coordinates as 1D strings, enabling efficient prefix-based proximity queries.
Spatial joins in PySpark follow one of three strategies depending on the data sizes and distribution. Broadcast joins are optimal when one dataset is small enough to fit in memoryβall partitions of the large dataset can check against the broadcast dataset. KD-tree joins build a spatial index on each partition and exchange only bounding-box candidates across partitions, reducing the number of exact geometry checks. Grid-based joins assign geometries to grid cells and only join geometries that share a cell, which works well for uniformly distributed data but fails with skewed distributions.
Point-in-polygon operations are the most common spatial query, determining whether a geographic point lies within a polygon boundary. This operation requires the ray-casting algorithm or winding number algorithm, both of which have O(n) complexity where n is the number of polygon vertices. For large polygon datasets, spatial indexing dramatically reduces the search space by first filtering polygons whose bounding boxes contain the point, then performing the exact containment test only on candidates.
Trajectory analysis processes sequences of location points to extract movement patterns. This requires temporal ordering, stay-point detection (identifying locations where an entity remained stationary), and trajectory segmentation (dividing continuous movement into meaningful segments). Stay-point detection uses both distance and time thresholdsβa stay point exists when an entity remains within a distance threshold for at least a minimum duration. Trajectory segmentation uses stay points as boundaries to create individual trip records.
Mathematical Foundations
Definition: Haversine Distance
The great-circle distance between two points and on a sphere of radius is:
Spatial Index Resolution
For H3 index resolution (0-15), the average hexagon area is:
Resolution 8 gives 464m edge length; resolution 9 gives 174m.
Spatial Join Complexity
Without indexing, a spatial join between datasets of sizes and has complexity . With grid-based spatial indexing dividing the space into cells:
when data is uniformly distributed across cells.
Convex Hull Area
For polygon with vertices , the shoelace formula gives area:
Nearest Neighbor Search
For -NN with spatial index, the search prunes cells whose minimum bounding rectangle distance exceeds the current -th nearest distance :
Key Insight
Spark's ST_* functions from Sedona compute exact geometry operations, while H3/S2 provide approximate but indexable representations. Use exact geometry for correctness-critical joins, approximate indexing for exploratory analysis at scale.
Summary
Geospatial processing on Spark combines exact geometric operations (Haversine, polygon intersection) with hierarchical spatial indices (H3, S2). Spatial indexing reduces join complexity from to near-linear. Coordinate reference system transformations must precede any distance calculations.
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| H3 Index | Hexagonal hierarchical spatial index | Location analytics, ride-sharing |
| S2 Cell | Quadtree cell on sphere | Maps, street view, global coverage |
| Geohash | Prefix-based 1D encoding | Proximity search, caching |
| Bounding Box | Minimum enclosing rectangle | Index filtering, quick rejection |
| Point-in-Polygon | Containment test for points | Store finder, zone assignment |
| KNN Query | K nearest neighbors | Nearest facility, POI search |
| Buffer | Points within distance | Service area, proximity analysis |
| Convex Hull | Smallest convex polygon | Area of coverage, boundary |
| Spatial Join | Join based on geometric predicates | Region-based aggregation |
| Trajectory | Ordered sequence of locations | Movement analysis, route optimization |
Code Examples
Coordinate Operations and Distance Calculations
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
spark = SparkSession.builder \
.appName("GeospatialData") \
.getOrCreate()
# Define UDFs for spatial calculations
@udf(returnType=DoubleType())
def haversine_distance(lat1, lon1, lat2, lon2):
"""Calculate great-circle distance between two points using Haversine formula."""
R = 6371000 # Earth's radius in meters
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
delta_lat = math.radians(lat2 - lat1)
delta_lon = math.radians(lon2 - lon1)
a = (math.sin(delta_lat / 2) ** 2 +
math.cos(lat1_rad) * math.cos(lat2_rad) *
math.sin(delta_lon / 2) ** 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
@udf(returnType=StringType())
def geohash_encode(lat, lon, precision=12):
"""Encode coordinates to geohash."""
BASE32 = '0123456789bcdefghjkmnpqrstuvwxyz'
lat_range = [-90.0, 90.0]
lon_range = [-180.0, 180.0]
geohash = []
bit = 0
ch = 0
is_lon = True
while len(geohash) < precision:
if is_lon:
mid = (lon_range[0] + lon_range[1]) / 2
if lon >= mid:
ch |= (1 << (4 - bit))
lon_range[0] = mid
else:
lon_range[1] = mid
else:
mid = (lat_range[0] + lat_range[1]) / 2
if lat >= mid:
ch |= (1 << (4 - bit))
lat_range[0] = mid
else:
lat_range[1] = mid
is_lon = not is_lon
bit += 1
if bit == 5:
geohash.append(BASE32[ch])
bit = 0
ch = 0
return ''.join(geohash)
# Create point data (restaurants)
restaurants = [
("R001", "Pizza Palace", 40.7128, -74.0060, "Italian"),
("R002", "Burger Barn", 40.7580, -73.9855, "American"),
("R003", "Sushi Spot", 40.7484, -73.9857, "Japanese"),
("R004", "Taco Town", 40.7614, -73.9776, "Mexican"),
("R005", "Noodle Nest", 40.7831, -73.9712, "Chinese"),
]
restaurants_df = spark.createDataFrame(restaurants, [
"id", "name", "latitude", "longitude", "cuisine"
])
# Create polygon data (neighborhoods)
neighborhoods = [
("N001", "Manhattan", "POLYGON((-74.01 40.70, -73.97 40.70, -73.97 40.78, -74.01 40.78, -74.01 40.70))"),
("N002", "Brooklyn", "POLYGON((-73.99 40.57, -73.94 40.57, -73.94 40.70, -73.99 40.70, -73.99 40.57))"),
]
neighborhoods_df = spark.createDataFrame(neighborhoods, [
"id", "name", "geometry"
])
# Add geohash to restaurants
restaurants_with_geohash = restaurants_df \
.withColumn("geohash", geohash_encode(col("latitude"), col("longitude"), lit(8)))
# Find restaurants within 1km of a reference point
reference_lat = 40.7580
reference_lon = -73.9855
nearby_restaurants = restaurants_df \
.withColumn("distance_meters",
haversine_distance(
lit(reference_lat), lit(reference_lon),
col("latitude"), col("longitude")
)
) \
.filter(col("distance_meters") <= 1000) \
.orderBy("distance_meters")
print("Restaurants within 1km:")
nearby_restaurants.show(truncate=False)
Spatial Joins and Indexing
# Point-in-polygon using ray-casting algorithm
@udf(returnType=BooleanType())
def point_in_polygon(lat, lon, polygon_wkt):
"""Check if point is inside polygon using ray-casting."""
# Parse WKT polygon (simplified)
coords_str = polygon_wkt.replace("POLYGON((", "").replace("))", "")
coords = []
for pair in coords_str.split(","):
lng, lt = pair.strip().split()
coords.append((float(lng), float(lt)))
# Ray-casting algorithm
n = len(coords)
inside = False
j = n - 1
for i in range(n):
xi, yi = coords[i]
xj, yj = coords[j]
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
# Spatial join: assign restaurants to neighborhoods
spatial_join_df = restaurants_df.crossJoin(neighborhoods_df) \
.filter(point_in_polygon(col("latitude"), col("longitude"), col("geometry"))) \
.select(
restaurants_df.id,
restaurants_df.name,
neighborhoods_df.name.alias("neighborhood"),
restaurants_df.cuisine
)
print("Restaurants by neighborhood:")
spatial_join_df.show(truncate=False)
# KNN query: find 3 nearest restaurants to a point
from pyspark.sql.window import Window
knn_df = restaurants_df \
.withColumn("distance",
haversine_distance(
lit(40.7580), lit(-73.9855),
col("latitude"), col("longitude")
)
) \
.withColumn("rank", rank().over(Window.orderBy("distance"))) \
.filter(col("rank") <= 3)
print("3 nearest restaurants:")
knn_df.show(truncate=False)
H3 Index Operations
# H3 index functions (simplified implementation)
@udf(returnType=LongType())
def h3_latlng_to_cell(lat, lng, resolution):
"""Convert lat/lng to H3 cell index."""
# This is a simplified version - use h3 library in production
# H3 uses icosahedron projection and hexagonal tessellation
return int(f"8{resolution}{abs(int(lat*1000))}{abs(int(lng*1000))}")
@udf(returnType=ArrayType(DoubleType()))
def h3_cell_to_boundary(cell_index):
"""Get polygon boundary of H3 cell."""
# Simplified - returns approximate boundary
lat = (cell_index % 10000) / 1000.0
lng = ((cell_index // 10000) % 10000) / 1000.0
return [lat - 0.001, lng - 0.001, lat + 0.001, lng + 0.001]
# Add H3 index at different resolutions
h3_restaurants = restaurants_df \
.withColumn("h3_res8", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(8))) \
.withColumn("h3_res10", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(10))) \
.withColumn("h3_res12", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(12)))
# Aggregate by H3 cell (spatial clustering)
h3_clusters = h3_restaurants \
.groupBy("h3_res8") \
.agg(
count("*").alias("restaurant_count"),
avg("latitude").alias("center_lat"),
avg("longitude").alias("center_lng"),
collect_list("cuisine").alias("cuisines")
)
print("H3 clusters (resolution 8):")
h3_clusters.show(truncate=False)
# Find neighboring cells
@udf(returnType=ArrayType(LongType()))
def h3_grid_disk(cell_index, k):
"""Get cells within k steps of given cell."""
# Simplified - returns grid disk neighbors
return [cell_index + i for i in range(-k, k+1) if i != 0]
# Find all restaurants within 2 H3 cells of a reference
reference_cell = h3_latlng_to_cell(lit(40.7580), lit(-73.9855), lit(8))
neighbor_cells = h3_grid_disk(reference_cell, lit(2))
nearby_h3 = h3_restaurants.filter(
col("h3_res8").isin(neighbor_cells)
)
Performance Metrics
| Operation | 1K Points | 100K Points | 10M Points | 100M Points |
|---|---|---|---|---|
| Haversine Distance (all pairs) | < 1 sec | 5-10 sec | 5-10 min | 5-10 hours |
| Point-in-Polygon (100 polys) | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min |
| Spatial Join (broadcast) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| KNN Query (k=10) | < 1 sec | 1-2 sec | 5-10 sec | 1-2 min |
| H3 Index Build | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min |
| Geohash Encode | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min |
| Buffer Operation | < 1 sec | 3-8 sec | 60-120 sec | 10-20 min |
| Convex Hull | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Trajectory Segmentation | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Spatial Clustering (DBSCAN) | < 1 sec | 5-15 sec | 2-5 min | 30-60 min |
Best Practices
- Use spatial indexing (H3, S2, Geohash) before spatial joins to reduce the number of exact geometry checks from O(nΓm) to O(nΓk) where k << m
- Broadcast small datasets when performing spatial joinsβthe broadcast threshold should be based on the polygon complexity, not just file size
- Convert to projected CRS (UTM) for accurate distance calculationsβdo not use latitude/longitude directly for Euclidean distance
- Use bounding box pre-filtering before expensive geometric operations to quickly reject non-candidate pairs
- Partition spatial data by geohash prefix to ensure nearby points land in the same partition, reducing shuffle during joins
- Cache H3 cell assignments when performing multiple spatial queries on the same dataset to avoid recomputing index values
- Implement spatial partitioning for large polygon datasets using R-tree or quadtree to enable efficient point-in-polygon queries
- Use
st_pointandst_containsfrom Sedona/Mosaic libraries for production spatial operations instead of custom UDFs - Monitor spatial skew when using grid-based partitioningβurban areas have much higher point density than rural areas
- Implement trajectory compression (Douglas-Peucker algorithm) to reduce the number of points in trajectories while preserving shape
- Use geohash prefix matching for proximity searchβtwo points within 1km will share a geohash prefix of length 5-6
- Avoid cross-join for spatial operations when possibleβuse broadcast or indexed joins to prevent quadratic explosion
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)