Graph Processing in PySpark with GraphX
Architecture Diagram
```
GRAPHX ARCHITECTURE IN SPARK

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Vertex RDD  │────▶│   Edge RDD   │────▶│ Graph Object │
└──────────────┘     └──────────────┘     └──────┬───────┘
                                                 │
                        ┌────────────────────────┤
                        │                        │
                        ▼                        ▼
              ┌────────────────────┐   ┌────────────────────┐
              │     Graph Ops      │   │     Pregel API     │
              │ connectedComp      │   │    (iterative)     │
              │ triangleCount      │   │ PageRank           │
              │ shortestPaths      │   │ Label Propagation  │
              └─────────┬──────────┘   └─────────┬──────────┘
                        │                        │
                        ▼                        ▼
              ┌────────────────────┐   ┌────────────────────┐
              │ aggregateMessages  │   │   Vertex Program   │
              │ (map-reduce over   │   │ (apply function to │
              │  edges)            │   │  each vertex)      │
              └────────────────────┘   └────────────────────┘
```
```
PREGEL ITERATIVE COMPUTATION MODEL
(example: shortest hop distance from A on the chain A → B → C → D)

Superstep 0       Superstep 1       Superstep 2       Superstep 3
┌─────────┐       ┌─────────┐       ┌─────────┐       ┌─────────┐
│ (A,0)   │──────▶│ (A,0)   │──────▶│ (A,0)   │──────▶│ (A,0)   │
│ (B,∞)   │       │ (B,1)   │       │ (B,1)   │       │ (B,1)   │
│ (C,∞)   │       │ (C,∞)   │       │ (C,2)   │       │ (C,2)   │
│ (D,∞)   │       │ (D,∞)   │       │ (D,∞)   │       │ (D,3)   │
└─────────┘       └─────────┘       └─────────┘       └─────────┘

Each superstep:
  1. sendMsg runs on edges next to vertices that received a message last superstep
  2. Messages travel along those edges to their target vertices
  3. mergeMsg combines all messages arriving at the same vertex
  4. vprog updates each vertex's value from its combined message
  5. Repeat until convergence or max iterations

Convergence detection:
  if no messages sent OR maxIterations reached:
      return final graph
```
Graph Algorithms Comparison

| Algorithm | Complexity | Use Case | Parallelism | Notes |
|---|---|---|---|---|
| PageRank | O(k × (V + E)) | Web ranking | High | V = vertices, E = edges, k = iterations; each iteration is O(E) message passing plus O(V) rank updates |
| Connected Components | O(d × (V + E)) | Social networks | Very High | Min-label propagation; the number of supersteps is bounded by the graph diameter d, so it is near-linear only when d is small |
| Shortest Paths | O(V × E) worst case | Routing | Medium | Bellman-Ford-style parallel relaxation; messages crossing partition boundaries (edge cuts) dominate the cost |
| Triangle Count | O(E^1.5) | Network analysis | Low-Medium | Counts triangles incident on each vertex; edges are oriented canonically (srcId < dstId) so each triangle is found once |
| Label Propagation | O(k × (V + E)) | Community detection | High | Unsupervised: each vertex adopts the most frequent label among its neighbors; runs a fixed number of steps and is not guaranteed to converge |
Detailed Explanation
GraphX is Spark's API for graph-parallel computation. It extends the RDD abstraction with a Graph abstraction. A Graph consists of a VertexRDD (vertices with attributes) and an EdgeRDD (directed edges with attributes). GraphX partitions vertices and edges across the cluster, so it can process graphs too large to fit in memory on a single machine. GraphX itself is exposed only through Scala and Java. From PySpark, the usual route is the separately installed GraphFrames package, which provides many of the same algorithms over DataFrames.
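The sketch below makes the vertex/edge split concrete. It is plain single-machine Python with no Spark. It models a property graph as two collections and builds the "triplet" view, which GraphX derives by joining each edge with the attributes of both endpoints. The `Edge` and `Triplet` classes and the `triplets` helper are hypothetical stand-ins for GraphX's Scala `Edge` and `EdgeTriplet` types, not a PySpark API.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Edge:
    src: int
    dst: int
    attr: Any


@dataclass(frozen=True)
class Triplet:
    src_id: int
    src_attr: Any
    dst_id: int
    dst_attr: Any
    attr: Any


def triplets(vertices: Dict[int, Any], edges: List[Edge]) -> List[Triplet]:
    """Join each edge with both endpoint attributes (the triplet view)."""
    return [
        Triplet(e.src, vertices[e.src], e.dst, vertices[e.dst], e.attr)
        for e in edges
    ]


# Vertex collection: id -> attribute; edge collection: directed, attributed edges
vertices = {1: "Alice", 2: "Bob", 3: "Charlie"}
edges = [Edge(1, 2, "friend"), Edge(2, 3, "colleague")]

for t in triplets(vertices, edges):
    print(f"{t.src_attr} --{t.attr}--> {t.dst_attr}")
```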
The Pregel API is the core abstraction for iterative graph algorithms. Pregel implements the "think like a vertex" paradigm. In each superstep, every vertex that received messages runs the same user-defined vertex program, updates its state, and sends messages to its neighbors. In GraphX's variant, three functions split this work:

- sendMsg produces messages per edge.
- mergeMsg combines the messages at each destination.
- The vertex program (vprog) applies the combined message.

This model naturally expresses algorithms such as PageRank, shortest paths, and label propagation. The computation terminates when no messages are sent or when a maximum iteration count is reached.
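The following is a hypothetical single-machine sketch of a GraphX-style Pregel driver in plain Python. It runs single-source shortest path on the chain A -> B -> C -> D, which is the same example as the superstep diagram. The `pregel` function and its parameter names mirror the vprog / sendMsg / mergeMsg roles but are assumptions of this sketch. It also simplifies GraphX's activity rule: it only sends along out-edges of vertices that received a message in the previous superstep.

```python
import math
from typing import Callable, Dict, List, Optional, Tuple

VertexId = str
WeightedEdge = Tuple[VertexId, VertexId, float]  # (src, dst, weight)


def pregel(
    values: Dict[VertexId, float],
    edges: List[WeightedEdge],
    initial_msg: float,
    vprog: Callable[[VertexId, float, float], float],
    send_msg: Callable[[float, float, float], Optional[float]],
    merge_msg: Callable[[float, float], float],
    max_iterations: int = 100,
) -> Tuple[Dict[VertexId, float], int]:
    """Single-machine Pregel loop; returns final values and supersteps run."""
    # Superstep 0: every vertex receives the initial message
    values = {v: vprog(v, x, initial_msg) for v, x in values.items()}
    active = set(values)  # vertices that received a message last superstep
    superstep = 0
    while superstep < max_iterations:
        inbox: Dict[VertexId, float] = {}
        for src, dst, weight in edges:
            if src not in active:
                continue  # only out-edges of active vertices send in this sketch
            msg = send_msg(values[src], values[dst], weight)
            if msg is not None:
                inbox[dst] = merge_msg(inbox[dst], msg) if dst in inbox else msg
        if not inbox:
            break  # no messages sent: converged
        for v, msg in inbox.items():
            values[v] = vprog(v, values[v], msg)
        active = set(inbox)
        superstep += 1
    return values, superstep


# Single-source shortest path from A on the chain A -> B -> C -> D
dist, steps = pregel(
    values={"A": 0, "B": math.inf, "C": math.inf, "D": math.inf},
    edges=[("A", "B", 1), ("B", "C", 1), ("C", "D", 1)],
    initial_msg=math.inf,
    vprog=lambda vid, d, msg: min(d, msg),
    send_msg=lambda src_d, dst_d, w: src_d + w if src_d + w < dst_d else None,
    merge_msg=min,
)
print(dist, steps)
```

The messages only ever lower a distance, so each vertex's value settles once the frontier has passed it. The loop then stops because no edge produces a message.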
PageRank measures the importance of each vertex from the number and quality of its incoming links. Vertices with more incoming edges from high-scoring vertices receive higher scores. In GraphX, PageRank is an iterative message-passing algorithm. Each vertex distributes its rank equally among its outgoing neighbors and then sums the contributions received from its incoming neighbors. The damping factor d (typically 0.85) is the probability of following an outgoing link. With probability 1 - d, the random surfer instead jumps to a random vertex. GraphFrames exposes this jump probability as the resetProbability parameter.
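Here is a single-machine power-iteration sketch of that update in plain Python. The `pagerank` function and its defaults are illustrative. It uses the normalized teleport term (1 - d) / N, so the ranks sum to 1. It assumes every vertex has at least one outgoing edge, because dangling-vertex handling is omitted.

```python
from typing import Dict, List, Tuple


def pagerank(
    edges: List[Tuple[int, int]],
    damping: float = 0.85,
    max_iter: int = 100,
    tol: float = 1e-9,
) -> Dict[int, float]:
    """Power-iteration PageRank; assumes no dangling vertices."""
    vertices = sorted({v for e in edges for v in e})
    n = len(vertices)
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1
    rank = {v: 1.0 / n for v in vertices}
    for _ in range(max_iter):
        # Each vertex splits its rank evenly across its outgoing edges...
        contrib = {v: 0.0 for v in vertices}
        for src, dst in edges:
            contrib[dst] += rank[src] / out_degree[src]
        # ...then each vertex sums what it received, damped, plus the teleport term
        new_rank = {v: (1 - damping) / n + damping * contrib[v] for v in vertices}
        delta = sum(abs(new_rank[v] - rank[v]) for v in vertices)
        rank = new_rank
        if delta < tol:
            break
    return rank


ranks = pagerank([(1, 2), (2, 3), (3, 1), (3, 2)])
for v, r in sorted(ranks.items(), key=lambda kv: -kv[1]):
    print(v, round(r, 4))
```

Vertex 2 ends up highest because it collects all of vertex 1's rank plus half of vertex 3's.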
Connected components identify groups of vertices that can reach each other when edge direction is ignored. GraphX implements this with label propagation. Each vertex starts with its own ID as its component label. In each superstep, it adopts the minimum label among itself and its neighbors. The algorithm converges when every vertex in a component carries that component's smallest ID. Each superstep costs O(E), and the number of supersteps is roughly bounded by the diameter of the largest component. The algorithm therefore parallelizes well, but it needs many supersteps on graphs with long chains.
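The min-label scheme fits in a few lines of plain Python. Each pass of the `while` loop plays the role of one synchronous superstep: all updates read the previous pass's labels. This is an illustration of the technique, not GraphX's implementation, and the function name is hypothetical.

```python
from typing import Dict, List, Tuple


def connected_components(
    vertices: List[int], edges: List[Tuple[int, int]]
) -> Dict[int, int]:
    """Min-label propagation; edge direction is ignored."""
    label = {v: v for v in vertices}  # each vertex starts with its own ID
    changed = True
    while changed:  # one pass = one synchronous superstep
        changed = False
        new_label = dict(label)
        for a, b in edges:
            # Both endpoints learn the smaller of their two current labels
            smallest = min(label[a], label[b])
            for v in (a, b):
                if smallest < new_label[v]:
                    new_label[v] = smallest
                    changed = True
        label = new_label
    return label


labels = connected_components([1, 2, 3, 4, 5, 6], [(1, 2), (2, 3), (5, 4)])
print(labels)                     # component label per vertex
print(len(set(labels.values())))  # number of components
```

Label 1 needs two supersteps to reach vertex 3, which is the diameter of the path 1-2-3. A third pass confirms that nothing changes.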
Graph partitioning is critical for performance in distributed graph processing. GraphX uses vertex-cut partitioning: each edge is assigned to exactly one partition, and a vertex is replicated to every partition that holds one of its edges. The strategy is chosen with Graph.partitionBy and a PartitionStrategy. The options are:

- RandomVertexCut hashes the source and destination IDs together, so same-direction edges between two vertices stay together.
- EdgePartition1D hashes only the source ID, which co-locates each vertex's out-edges.
- EdgePartition2D arranges partitions in a 2D grid.
- CanonicalRandomVertexCut hashes the endpoints in canonical order, so both directions of an undirected edge land together.

Partition quality directly determines how many vertex attributes and messages must be shuffled between partitions during computation.
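The strategies differ only in how they map an edge (src, dst) to a partition ID. The plain-Python sketch below is loosely modeled on them. The hashing constant, the use of Python's `hash`, and the perfect-square requirement for the 2D grid are assumptions of this sketch, so the partition IDs will not match GraphX's. It counts how many partitions a high-degree hub is replicated to under 1D versus 2D partitioning.

```python
import math
from collections import defaultdict
from typing import Callable, Dict, List, Set, Tuple

Strategy = Callable[[int, int, int], int]
MIXING_CONSTANT = 1125899906842597  # large odd constant that scatters consecutive IDs


def edge_partition_1d(src: int, dst: int, num_parts: int) -> int:
    # Source ID only: all out-edges of a vertex share one partition
    return (src * MIXING_CONSTANT) % num_parts


def edge_partition_2d(src: int, dst: int, num_parts: int) -> int:
    # sqrt(P) x sqrt(P) grid: the source picks the column, the destination the row
    side = math.isqrt(num_parts)
    if side * side != num_parts:
        raise ValueError("this sketch assumes a perfect-square partition count")
    col = (src * MIXING_CONSTANT) % side
    row = (dst * MIXING_CONSTANT) % side
    return col * side + row


def random_vertex_cut(src: int, dst: int, num_parts: int) -> int:
    # Hash the ordered pair: same-direction edges between two vertices stay together
    return hash((src, dst)) % num_parts


def canonical_random_vertex_cut(src: int, dst: int, num_parts: int) -> int:
    # Hash the pair in canonical order: (a, b) and (b, a) share a partition
    return hash((min(src, dst), max(src, dst))) % num_parts


def vertex_spread(
    edges: List[Tuple[int, int]], strategy: Strategy, num_parts: int
) -> Dict[int, int]:
    """Number of distinct partitions each vertex is replicated to."""
    parts: Dict[int, Set[int]] = defaultdict(set)
    for src, dst in edges:
        p = strategy(src, dst, num_parts)
        parts[src].add(p)
        parts[dst].add(p)
    return {v: len(ps) for v, ps in parts.items()}


# A hub (vertex 0) that follows, and is followed by, 999 other vertices
hub_edges = [(0, i) for i in range(1, 1000)] + [(i, 0) for i in range(1, 1000)]
spread_1d = vertex_spread(hub_edges, edge_partition_1d, 16)
spread_2d = vertex_spread(hub_edges, edge_partition_2d, 16)
print("hub replicas, 1D:", spread_1d[0], " 2D:", spread_2d[0])
print("replication factor, 2D:", sum(spread_2d.values()) / len(spread_2d))
```

With 16 partitions, the hub lands in all 16 partitions under the 1D scheme but in only 7 under the 2D grid (2 * sqrt(16) - 1). Under the 2D grid, all of its out-edges share one grid column and all of its in-edges share one row.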
The aggregateMessages operator is the low-level building block for graph algorithms in GraphX, and GraphX's Pregel implementation is built on top of it. The user supplies two functions. The sendMsg function runs on each edge triplet. The mergeMsg function combines the messages arriving at the same vertex. sendMsg receives an EdgeContext that exposes the source and destination vertex attributes and the edge attribute. Rather than returning a value, it sends messages by calling sendToSrc or sendToDst zero or more times. An optional TripletFields argument tells GraphX which attributes sendMsg reads, so GraphX can avoid shipping the rest. This API gives maximum flexibility for custom algorithms that do not fit the Pregel pattern.
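The sketch below emulates that contract in plain Python to show the shape of the API. `EdgeContext` and `aggregate_messages` are hypothetical stand-ins whose snake_case methods mirror GraphX's `sendToSrc` / `sendToDst`; they are not a PySpark or GraphFrames API. The example computes the average age of each vertex's followers, where an edge src -> dst means src follows dst.

```python
from typing import Any, Callable, Dict, List, Tuple

Msg = Any


class EdgeContext:
    """Hypothetical stand-in for GraphX's EdgeContext."""

    def __init__(self, src_id: int, src_attr: Any, dst_id: int, dst_attr: Any,
                 attr: Any, inbox: Dict[int, Msg],
                 merge_msg: Callable[[Msg, Msg], Msg]) -> None:
        self.src_id, self.src_attr = src_id, src_attr
        self.dst_id, self.dst_attr = dst_id, dst_attr
        self.attr = attr
        self._inbox, self._merge = inbox, merge_msg

    def _send(self, vid: int, msg: Msg) -> None:
        # Combine with any message already waiting at this vertex
        if vid in self._inbox:
            self._inbox[vid] = self._merge(self._inbox[vid], msg)
        else:
            self._inbox[vid] = msg

    def send_to_src(self, msg: Msg) -> None:
        self._send(self.src_id, msg)

    def send_to_dst(self, msg: Msg) -> None:
        self._send(self.dst_id, msg)


def aggregate_messages(
    vertices: Dict[int, Any],
    edges: List[Tuple[int, int, Any]],
    send_msg: Callable[[EdgeContext], None],
    merge_msg: Callable[[Msg, Msg], Msg],
) -> Dict[int, Msg]:
    """Run send_msg on every edge; only vertices that received messages appear."""
    inbox: Dict[int, Msg] = {}
    for src, dst, attr in edges:
        send_msg(EdgeContext(src, vertices[src], dst, vertices[dst], attr, inbox, merge_msg))
    return inbox


ages = {1: 28, 2: 35, 3: 42, 4: 31}
follows = [(1, 2, "friend"), (3, 2, "friend"), (4, 3, "colleague")]
totals = aggregate_messages(
    ages,
    follows,
    send_msg=lambda ctx: ctx.send_to_dst((1, ctx.src_attr)),  # (count, age sum)
    merge_msg=lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
avg_follower_age = {v: s / n for v, (n, s) in totals.items()}
print(avg_follower_age)
```

Sending a (count, sum) pair keeps mergeMsg associative and commutative. That property is what allows partial messages to be combined in any order across partitions.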
Mathematical Foundations
Definition: Property Graph
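A property graph $G = (V, E, P_V, P_E)$ consists of a vertex set $V$, an edge set $E \subseteq V \times V$, vertex properties $P_V : V \to \mathcal{D}$, and edge properties $P_E : E \to \mathcal{D}$, where $\mathcal{D}$ is the property domain.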
PageRank Iteration
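PageRank at iteration $t + 1$ is computed as:

$$PR^{(t+1)}(v) = \frac{1 - d}{N} + d \sum_{u \in \mathrm{In}(v)} \frac{PR^{(t)}(u)}{\deg^{+}(u)}$$

In this formula:

- $d$ is the damping factor (typically 0.85).
- $N = |V|$ is the number of vertices.
- $\deg^{+}(u)$ is the out-degree of $u$.
- $\mathrm{In}(v)$ is the set of vertices with edges into $v$.

Some implementations omit the $1/N$ factor, in which case the ranks sum to $N$ instead of 1.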
Connected Components Theorem
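In an undirected graph $G = (V, E)$, two vertices $u$ and $v$ are in the same connected component if and only if there exists a path $u = v_0, v_1, \ldots, v_k = v$ where $(v_{i-1}, v_i) \in E$ for all $1 \le i \le k$. Reachability is an equivalence relation (reflexive, symmetric, and transitive), so the number of components equals the number of its equivalence classes.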
Betweenness Centrality
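For vertex $v$, betweenness centrality is:

$$C_B(v) = \sum_{s, t \in V \setminus \{v\},\; s \neq t} \frac{\sigma_{st}(v)}{\sigma_{st}}$$

where $\sigma_{st}$ is the total number of shortest paths from $s$ to $t$, and $\sigma_{st}(v)$ is the number of those paths passing through $v$.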
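Brandes' algorithm evaluates this sum without enumerating paths. It runs one BFS per source to count shortest paths, then does a backward pass that accumulates each vertex's dependency on that source. Below is a single-machine sketch for unweighted graphs given as an adjacency dict in which every vertex appears as a key. The function name is illustrative. Pairs are ordered, so on an undirected graph each unordered pair contributes twice.

```python
from collections import deque
from typing import Dict, List


def betweenness(adj: Dict[int, List[int]]) -> Dict[int, float]:
    """Brandes' algorithm for unweighted graphs; sums over ordered (s, t) pairs."""
    cb = {v: 0.0 for v in adj}
    for s in adj:
        # Forward pass: BFS from s, counting shortest paths (sigma) and predecessors
        order: List[int] = []
        preds: Dict[int, List[int]] = {v: [] for v in adj}
        sigma = {v: 0 for v in adj}
        dist = {v: -1 for v in adj}
        sigma[s], dist[s] = 1, 0
        queue = deque([s])
        while queue:
            v = queue.popleft()
            order.append(v)
            for w in adj[v]:
                if dist[w] < 0:
                    dist[w] = dist[v] + 1
                    queue.append(w)
                if dist[w] == dist[v] + 1:
                    sigma[w] += sigma[v]
                    preds[w].append(v)
        # Backward pass: accumulate dependencies, farthest vertices first
        delta = {v: 0.0 for v in adj}
        for w in reversed(order):
            for v in preds[w]:
                delta[v] += sigma[v] / sigma[w] * (1 + delta[w])
            if w != s:
                cb[w] += delta[w]
    return cb


path = {1: [2], 2: [1, 3], 3: [2]}            # undirected path 1 - 2 - 3
diamond = {1: [2, 3], 2: [4], 3: [4], 4: []}  # directed: two shortest 1 -> 4 paths
print(betweenness(path))
print(betweenness(diamond))
```

In the diamond, both shortest paths from 1 to 4 are equally likely, so vertices 2 and 3 each receive half of that pair's credit.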
Graph Partitioning Load Balance
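For $k$ partitions, where partition $i$ holds $|E_i|$ of the $|E|$ edges, load imbalance is:

$$\lambda = \frac{\max_{1 \le i \le k} |E_i|}{|E| / k}$$

Target: keep $\lambda$ close to 1 (a perfectly even split) for efficient distributed computation.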
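The imbalance ratio is straightforward to compute once each edge's partition ID is known. This small sketch takes one partition ID per edge, for example the output of a partition strategy. It measures load as edges per partition, which is an assumption that matches GraphX's edge-based (vertex-cut) layout, and it returns 1.0 for a perfectly even split. The function name is hypothetical.

```python
from collections import Counter
from typing import Iterable


def load_imbalance(edge_partitions: Iterable[int], num_parts: int) -> float:
    """Largest partition's edge count divided by the ideal |E| / k."""
    counts = Counter(edge_partitions)  # partition id -> number of edges
    total = sum(counts.values())
    return max(counts.values()) / (total / num_parts)


print(load_imbalance([0, 1, 2, 3], 4))        # perfectly balanced
print(load_imbalance([0, 0, 0, 1, 2, 3], 4))  # partition 0 holds twice its share
```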
Key Insight
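GraphX uses vertex-cut partitioning. Each edge lives in exactly one partition: EdgePartition1D, for example, hashes only the source vertex ID, while EdgePartition2D hashes both endpoints onto a grid. Every vertex is then replicated to each partition that holds one of its edges. Communication is therefore dominated by shipping vertex attributes and messages to those replicas across partition boundaries. This makes the replication factor (the average number of partitions per vertex) the key quantity a partition strategy controls.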
Summary
Graph processing on Spark leverages BSP (Bulk Synchronous Parallel) computation. PageRank converges via power iteration, connected components use label propagation, and graph partitioning quality directly impacts communication overhead in distributed settings.
Key Concepts Table
| Concept | Description | Performance Impact |
|---|---|---|
| Graph Object | Immutable collection of vertices and edges | Partitioned across cluster |
| Vertex RDD | Distributed set of (vertexId, attribute) pairs | Keyed by vertex ID |
| Edge RDD | Distributed set of (srcId, dstId, attribute) triples | Partitioned by strategy |
| Pregel API | Iterative vertex-centric computation model | Convergence-based termination |
| aggregateMessages | Low-level message passing on edges | Maximum flexibility |
| PageRank | Vertex importance based on link structure | O(k × (V + E)) for k iterations |
| Connected Components | Groups of reachable vertices | O(d × (V + E)) for graph diameter d |
| Partition Strategy | Edge distribution across partitions | Determines shuffle cost |
| Join Operations | Combine graphs with RDDs/DataFrames | Enable hybrid analytics |
| Graph Loading | Load from edge list or adjacency format | Initial partitioning matters |
Code Examples
Creating and Analyzing Graphs
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GraphProcessing") \
    .getOrCreate()

# GraphFrames' connectedComponents() needs a checkpoint directory (illustrative path)
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")

# Create edge DataFrame (social network)
edges_data = [
    (1, 2, "friend"),
    (1, 3, "friend"),
    (2, 3, "colleague"),
    (3, 4, "friend"),
    (4, 5, "colleague"),
    (5, 6, "friend"),
    (6, 1, "friend"),
    (2, 5, "friend"),
    (3, 6, "colleague"),
    (4, 7, "friend"),
    (7, 8, "friend"),
    (8, 9, "colleague"),
    (9, 10, "friend"),
    (10, 7, "friend"),
]
# GraphFrames requires the edge endpoint columns to be named "src" and "dst"
edges_df = spark.createDataFrame(edges_data, ["src", "dst", "relationship"])

# Create vertex DataFrame (GraphFrames requires an "id" column)
vertices_data = [
    (1, "Alice", 28, "Engineering"),
    (2, "Bob", 35, "Marketing"),
    (3, "Charlie", 42, "Engineering"),
    (4, "Diana", 31, "Sales"),
    (5, "Eve", 38, "Marketing"),
    (6, "Frank", 45, "Engineering"),
    (7, "Grace", 29, "Sales"),
    (8, "Hank", 33, "Engineering"),
    (9, "Ivy", 27, "Marketing"),
    (10, "Jack", 40, "Sales"),
]
vertices_df = spark.createDataFrame(vertices_data, ["id", "name", "age", "department"])

# GraphX has no Python API, so from PySpark we use the GraphFrames package instead.
# It ships separately from Spark (e.g. spark-submit --packages with a matching build).
from graphframes import GraphFrame

graph = GraphFrame(vertices_df, edges_df)

# Basic graph statistics
print("Vertices:", graph.vertices.count())
print("Edges:", graph.edges.count())

# Find (weakly) connected components; edge direction is ignored
components = graph.connectedComponents()
components.groupBy("component").count().orderBy("count", ascending=False).show()

# Triangle count: number of triangles each vertex participates in
triangle_count = graph.triangleCount()
triangle_count.select("id", "name", "count").orderBy("count", ascending=False).show()

# PageRank: resetProbability corresponds to 1 - damping factor
pagerank = graph.pageRank(resetProbability=0.15, maxIter=20)
pagerank.vertices.select("id", "name", "pagerank") \
    .orderBy("pagerank", ascending=False).show()

# Shortest paths (hop counts) from every vertex to landmarks 1 and 5;
# landmark values must match the type of the "id" column
shortest_paths = graph.shortestPaths(landmarks=[1, 5])
shortest_paths.select("id", "name", "distances").show(truncate=False)
```
Custom Graph Algorithm with Pregel
```python
# Personalized PageRank written as an explicit Pregel-style loop over RDDs.
# The per-record logic runs in Python on the executors, so this is much slower
# than the JVM-based built-ins; it is meant to make each superstep visible.
from typing import Dict, List, Tuple

from pyspark import RDD


def create_graph(vertices: List[Tuple], edges: List[Tuple]) -> Tuple[RDD, RDD]:
    """Create vertex and edge RDDs.

    Vertices become (vertex_id, {attr: value}) pairs; edges stay (src, dst, relationship).
    """
    sc = spark.sparkContext
    vertex_rdd = sc.parallelize(
        [(vid, {"name": name, "age": age, "department": dept})
         for vid, name, age, dept in vertices]
    )
    edge_rdd = sc.parallelize(edges)
    return vertex_rdd, edge_rdd


def personalized_pagerank(
    vertex_rdd: RDD,
    edge_rdd: RDD,
    source_vertex: int,
    damping: float = 0.85,
    max_iterations: int = 20,
    tolerance: float = 0.001,
) -> RDD:
    """
    Compute personalized PageRank from source_vertex.

    Each iteration, every vertex splits its rank evenly across its out-edges.
    The teleport term (1 - damping) goes only to source_vertex, which is what
    makes the ranking personalized. Rank reaching a vertex with no out-edges
    is dropped (dangling vertices are not handled).
    """
    # Initialize: source vertex gets rank 1.0, others get 0.0
    def init_vertex(vid: int, attrs: Dict) -> Dict:
        rank = 1.0 if vid == source_vertex else 0.0
        return {**attrs, "rank": rank}

    vertices = vertex_rdd.map(lambda v: (v[0], init_vertex(v[0], v[1]))).cache()

    # Adjacency list of outgoing edges, (src, [dst, ...]), reused every iteration
    adjacency = edge_rdd.map(lambda e: (e[0], e[1])) \
        .groupByKey() \
        .mapValues(list) \
        .cache()

    # Apply the rank update: teleport to the source only, plus damped contributions
    def update_rank(vid: int, attrs: Dict, received: float) -> Dict:
        teleport = (1 - damping) if vid == source_vertex else 0.0
        return {**attrs, "rank": teleport + damping * received}

    for iteration in range(max_iterations):
        # Send messages: each vertex sends rank / out_degree to its out-neighbors
        messages = vertices.join(adjacency) \
            .flatMap(lambda x: [
                (dst, x[1][0]["rank"] / len(x[1][1]))
                for dst in x[1][1]
            ])

        # Aggregate messages at destination vertices
        aggregated = messages.reduceByKey(lambda a, b: a + b)

        # Update vertex ranks; vertices that received nothing get 0.0
        updated_vertices = vertices.leftOuterJoin(aggregated) \
            .map(lambda x: (
                x[0],
                update_rank(x[0], x[1][0], x[1][1] if x[1][1] is not None else 0.0),
            )) \
            .cache()

        # Check convergence: largest per-vertex rank change
        rank_diff = updated_vertices.join(vertices) \
            .map(lambda x: abs(x[1][0]["rank"] - x[1][1]["rank"])) \
            .max()

        vertices.unpersist()
        vertices = updated_vertices
        if rank_diff < tolerance:
            print(f"Converged after {iteration + 1} iterations")
            break

    return vertices


# Run personalized PageRank from vertex 1
vertex_rdd, edge_rdd = create_graph(vertices_data, edges_data)
result = personalized_pagerank(vertex_rdd, edge_rdd, source_vertex=1)

# Display results, highest rank first
top_ranks = result.map(lambda x: (x[0], x[1]["rank"])) \
    .sortBy(lambda x: -x[1]) \
    .collect()
print(top_ranks)
```
Graph Analytics Pipeline
```python
# Complete graph analytics pipeline
from pyspark.sql import functions as F
from graphframes import GraphFrame


def graph_analytics_pipeline(edges_df, vertices_df):
    """End-to-end graph analytics pipeline.

    Expects "src"/"dst" edge columns and "id"/"name" vertex columns.
    """
    graph = GraphFrame(vertices_df, edges_df)

    # 1. Basic statistics (highest in-degree and out-degree vertices)
    stats = {
        "vertices": graph.vertices.count(),
        "edges": graph.edges.count(),
        "in_degree": graph.inDegrees.orderBy("inDegree", ascending=False).first(),
        "out_degree": graph.outDegrees.orderBy("outDegree", ascending=False).first(),
    }

    # 2. Connected components (requires the Spark checkpoint directory set earlier)
    components = graph.connectedComponents()
    num_components = components.select("component").distinct().count()
    largest_component = components.groupBy("component") \
        .count() \
        .orderBy("count", ascending=False) \
        .first()

    # 3. PageRank
    pagerank = graph.pageRank(resetProbability=0.15, maxIter=20)
    top_pagerank = pagerank.vertices \
        .select("id", "name", "pagerank") \
        .orderBy("pagerank", ascending=False) \
        .first()

    # 4. Triangle count: each triangle is counted once at each of its 3 vertices
    triangles = graph.triangleCount()
    total_triangles = (triangles.agg(F.sum("count")).first()[0] or 0) // 3

    # 5. Label propagation (community detection)
    communities = graph.labelPropagation(maxIter=5)
    num_communities = communities.select("label").distinct().count()

    return {
        "basic_stats": stats,
        "num_components": num_components,
        "largest_component_size": largest_component["count"],
        "top_pagerank_vertex": top_pagerank,
        "total_triangles": total_triangles,
        "num_communities": num_communities,
    }


results = graph_analytics_pipeline(edges_df, vertices_df)
for key, value in results.items():
    print(f"{key}: {value}")
```
Performance Metrics
| Algorithm | 1K Vertices | 100K Vertices | 10M Vertices | 1B Vertices |
|---|---|---|---|---|
| PageRank (20 iter) | < 1 second | 5-10 seconds | 2-5 minutes | 30-60 minutes |
| Connected Components | < 1 second | 3-8 seconds | 1-3 minutes | 15-30 minutes |
| Shortest Paths | < 1 second | 10-20 seconds | 5-10 minutes | 60-120 minutes |
| Triangle Count | < 1 second | 15-30 seconds | 10-20 minutes | 2-4 hours |
| Label Propagation | < 1 second | 2-5 seconds | 30-60 seconds | 10-20 minutes |
| Graph Loading | < 1 second | 1-3 seconds | 10-30 seconds | 5-15 minutes |
| Partitioning (2D) | < 1 second | 2-5 seconds | 20-40 seconds | 10-20 minutes |
| Memory per Partition | ~1 MB | ~100 MB | ~10 GB | ~1 TB |
| Shuffle Volume | ~1 MB | ~50 MB | ~5 GB | ~500 GB |
| Fault Recovery | < 1 second | 1-2 seconds | 5-10 seconds | 1-5 minutes |
Best Practices
- Use EdgePartition2D for graphs with very high-degree vertices. It bounds each vertex's replication to roughly 2√P partitions (P = number of partitions), which limits cross-partition communication
- Cache vertex and edge RDDs when performing multiple graph operations to avoid recomputation
- Tune partition count to approximately 2-3x the number of cores in your cluster for optimal parallelism
- Use `persist(StorageLevel.MEMORY_AND_DISK)` for iterative algorithms to handle memory pressure gracefully
- Implement early stopping in iterative algorithms based on convergence tolerance to avoid unnecessary iterations
- Pre-compute adjacency lists when performing multiple traversals from the same graph structure
- Use canonical vertex ordering for undirected graphs to avoid double-counting edges
- Monitor shuffle spill during graph operations. If spill is high, increase executor memory or increase the partition count so each partition holds less data
- Prefer the built-in GraphFrames and GraphX algorithms over hand-written Python RDD loops such as the personalized PageRank example. The built-ins run on the JVM even when called from Python
- Implement checkpointing for long-running iterative algorithms to enable fault recovery without full restart
- Use EdgePartition1D when an algorithm mostly traverses outgoing edges, since it keeps all out-edges of a vertex in one partition
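- Use `aggregateMessages` rather than the deprecated `mapReduceTriplets`. When messages flow in one direction, call only `sendToDst` (or `sendToSrc`) and pass a restrictive `TripletFields`, so GraphX ships only the vertex attributes that `sendMsg` reads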