Spark Architecture Deep Dive
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
Understanding the Spark Runtime Architecture
Spark follows a master-worker architecture: a single Driver program coordinates Executors running on worker nodes, and a cluster manager allocates the resources they run on. Understanding this architecture is critical for debugging performance issues and optimizing resource utilization.
The Role of the Driver
The driver is the process running the main() function of your application. It creates the SparkContext, plans execution through the DAG scheduler, and coordinates task scheduling.
from pyspark.sql import SparkSession
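# The SparkSession is your entry point; it creates the SparkContext.
# In cluster mode this code runs in a driver process inside the cluster.
# Note: spark.driver.memory and spark.driver.cores must be set before the driver
# JVM starts (spark-submit --driver-memory / --driver-cores, or spark-defaults.conf);
# setting them here has no effect on an already-running driver.
spark = SparkSession.builder \
    .appName("ArchitectureDeepDive") \
    .master("yarn") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.cores", 4) \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()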
# Access the underlying SparkContext for lower-level operations
sc = spark.sparkContext
print(f"Application ID: {sc.applicationId}")
print(f"Default Parallelism: {sc.defaultParallelism}")
print(f"Master: {sc.master}")
Executor Configuration and Resource Allocation
Executors are JVM processes that execute tasks and store data. Proper configuration of executor resources directly impacts application performance.
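# With dynamic allocation enabled, spark.executor.instances is used as the initial
# executor count (when larger than spark.dynamicAllocation.initialExecutors); the
# pool then scales between minExecutors and maxExecutors.
# Dynamic allocation also requires an external shuffle service or
# spark.dynamicAllocation.shuffleTracking.enabled=true (Spark 3.0+).
spark = SparkSession.builder \
    .appName("ExecutorConfig") \
    .config("spark.executor.instances", 10) \
    .config("spark.executor.cores", 4) \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.executor.pyspark.memory", "2g") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", 2) \
    .config("spark.dynamicAllocation.maxExecutors", 50) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .getOrCreate()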
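As a rough sizing aid, the executor settings above translate into a number of concurrent task slots, and the slot count determines how many "waves" of tasks a stage needs. The sketch below is plain Python arithmetic, not a Spark API; the helper names `task_slots` and `waves` are made up for illustration, and it assumes one CPU per task (the `spark.task.cpus` default).

```python
import math


def task_slots(executors: int, cores_per_executor: int, cpus_per_task: int = 1) -> int:
    """Number of tasks that can run at the same time across all executors."""
    return executors * (cores_per_executor // cpus_per_task)


def waves(num_tasks: int, slots: int) -> int:
    """How many rounds of tasks a stage needs if every slot stays busy."""
    return math.ceil(num_tasks / slots)


# Values taken from the configuration above: 4 cores per executor,
# 10 initial executors, dynamic allocation between 2 and 50 executors.
initial_slots = task_slots(10, 4)  # 40
min_slots = task_slots(2, 4)       # 8
max_slots = task_slots(50, 4)      # 200

# A stage with 200 tasks (the default spark.sql.shuffle.partitions)
print(waves(200, initial_slots))  # 5
print(waves(200, min_slots))      # 25
print(waves(200, max_slots))      # 1
```

Real stages rarely finish in clean waves because task durations vary, so treat the result as a lower bound rather than a prediction.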
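Interview Insight: When asked about executor configuration, always mention that spark.executor.memoryOverhead is non-heap memory added to each executor's container request. It covers JVM overheads and native buffers and, unless spark.executor.pyspark.memory is set separately, PySpark's Python worker processes. It defaults to max(384MB, 0.1 * executorMemory).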
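To make the overhead formula concrete, here is a small worked calculation in plain Python. It is a sketch, not Spark code: the function names are hypothetical, sizes are in MiB, and it assumes the container request is the sum of executor memory, overhead, PySpark memory, and off-heap size, as the Spark configuration documentation describes for `spark.executor.memoryOverhead`.

```python
MIN_OVERHEAD_MB = 384  # floor applied to the default overhead


def default_overhead_mb(executor_memory_mb: int, factor: float = 0.10) -> int:
    """Default memoryOverhead: max(384 MiB, factor * executor memory)."""
    return max(MIN_OVERHEAD_MB, int(executor_memory_mb * factor))


def container_request_mb(executor_memory_mb: int,
                         overhead_mb: int = None,
                         pyspark_memory_mb: int = 0,
                         offheap_mb: int = 0) -> int:
    """Approximate memory requested per executor container."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb + pyspark_memory_mb + offheap_mb


# Default overhead for a 16g executor and for a small 2g executor
print(default_overhead_mb(16 * 1024))  # 1638
print(default_overhead_mb(2 * 1024))   # 384

# The configuration above: 16g heap + 4g explicit overhead + 2g PySpark memory
print(container_request_mb(16 * 1024, overhead_mb=4 * 1024,
                           pyspark_memory_mb=2 * 1024))  # 22528 (22 GiB)
```

The cluster manager may round this request up to its own allocation increment, so the container actually granted can be somewhat larger.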
DAG Scheduler and Stage Boundaries
The DAG Scheduler splits the job into stages based on shuffle boundaries. Understanding what triggers a new stage is essential for optimization.
Narrow vs Wide Transformations
from pyspark.sql import functions as F
df = spark.read.parquet("hdfs://data/sales")
# Narrow transformations - NO shuffle, pipelined together in one stage
stage_1 = df \
    .filter(F.col("amount") > 100) \
    .withColumn("tax", F.col("amount") * 0.08) \
    .select("order_id", "region", "amount", "tax")  # keep region for the groupBy below
# Wide transformation - TRIGGERS a shuffle, which starts a new stage
stage_2 = stage_1 \
    .groupBy("region") \
    .agg(
        F.sum("amount").alias("total_amount"),
        F.sum("tax").alias("total_tax"),
        F.count("*").alias("order_count")
    )
# Narrow again: this runs in the same post-shuffle stage as the aggregation,
# so it does NOT create a third stage despite the variable name
stage_3 = stage_2 \
    .withColumn("avg_amount", F.col("total_amount") / F.col("order_count"))
stage_3.explain(mode="extended")
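The stage-splitting rule can be illustrated with a toy model in plain Python. This is not how Spark's DAG scheduler is implemented; it only mimics the rule that a wide transformation introduces a shuffle boundary. The `WIDE` set and the `split_into_stages` helper are assumptions made for this sketch, and it ignores details such as map-side partial aggregation and broadcast joins that avoid a shuffle.

```python
from typing import List

# Operations treated as wide (shuffle-producing) in this toy model
WIDE = {"groupBy", "join", "repartition", "distinct", "orderBy"}


def split_into_stages(ops: List[str]) -> List[List[str]]:
    """Group a linear chain of operations into stages at shuffle boundaries."""
    stages: List[List[str]] = [[]]
    for op in ops:
        if op in WIDE:
            # A wide operation reads shuffled data, so it begins a new stage
            stages.append([])
        stages[-1].append(op)
    return stages


# The pipeline from the example above
pipeline = ["read", "filter", "withColumn", "select", "groupBy", "withColumn"]
for i, stage in enumerate(split_into_stages(pipeline), start=1):
    print(f"Stage {i}: {stage}")
# Stage 1: ['read', 'filter', 'withColumn', 'select']
# Stage 2: ['groupBy', 'withColumn']
```

The trailing `withColumn` lands in the second stage, which matches the point that a narrow transformation after a shuffle does not create a stage of its own.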
Understanding the Physical Plan
# Always inspect the physical plan to understand execution strategy
df = spark.read.parquet("hdfs://data/transactions") \
    .join(spark.read.parquet("hdfs://data/customers"), "customer_id") \
    .filter(F.col("amount") > 500) \
    .groupBy("product_category") \
    .agg(F.sum("amount"))
# "cost" prints the Catalyst-optimized logical plan with statistics, where available
df.explain(mode="cost")
# "simple" prints only the physical plan;
# "formatted" splits it into a plan outline plus per-node details
df.explain(mode="simple")
df.explain(mode="formatted")
Cluster Manager Integration
Spark supports multiple cluster managers: Standalone, YARN, Kubernetes, and Mesos (deprecated since Spark 3.2). YARN is the most common in enterprise environments.
# YARN configuration for production
# Note: the deploy mode is fixed when the application is launched, so pass it with
# spark-submit --deploy-mode cluster; setting it in the builder cannot change it.
spark = SparkSession.builder \
    .appName("YARNCluster") \
    .master("yarn") \
    .config("spark.submit.deployMode", "cluster") \
    .config("spark.yarn.maxAppAttempts", 2) \
    .config("spark.yarn.am.memory", "4g") \
    .config("spark.yarn.am.cores", 2) \
    .config("spark.yarn.queue", "production") \
    .config("spark.yarn.historyServer.address", "history-server:18080") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs://logs/spark-events") \
    .getOrCreate()
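Critical: In YARN cluster mode, the driver runs inside the ApplicationMaster, so that container is sized by spark.driver.memory and spark.driver.cores. The spark.yarn.am.memory and spark.yarn.am.cores settings apply only in client mode, where the ApplicationMaster is a separate, lightweight process that requests resources on the driver's behalf.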
Task Scheduling and Locality
Spark optimizes data locality to minimize network transfer. The scheduler waits for better locality before launching tasks remotely.
# Control locality settings
spark = SparkSession.builder \
    .appName("LocalityTuning") \
    .config("spark.locality.wait", "3s") \
    .config("spark.locality.wait.node", "3s") \
    .config("spark.locality.wait.rack", "3s") \
    .config("spark.scheduler.mode", "FAIR") \
    .getOrCreate()
# Read data; scan tasks prefer executors on the nodes that hold the HDFS blocks
df = spark.read.parquet("hdfs://data/events")
# Repartition both sides by the join key so matching keys are co-partitioned.
# This is itself a shuffle, so it pays off mainly when the partitioned data is
# reused; it does not change locality in the scheduler's sense.
df_repartitioned = df.repartition(200, "user_id")
user_profiles = spark.read.parquet("hdfs://data/profiles") \
    .repartition(200, "user_id")
# The join can typically reuse this partitioning instead of shuffling again
joined = df_repartitioned.join(user_profiles, "user_id")
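The effect of the `spark.locality.wait` settings above can be pictured with a simplified model of delay scheduling: the longer a task set goes without launching a task at its current locality level, the less strict the scheduler becomes. The function below is a plain Python illustration with hypothetical names, not Spark's scheduler code. The real scheduler also resets its timer whenever it launches a task and skips levels that have no pending tasks, and details differ across versions.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def allowed_level(seconds_waiting: float, waits=(3.0, 3.0, 3.0)) -> str:
    """Least strict locality level permitted after waiting this long.

    waits holds the per-level timeouts for PROCESS_LOCAL, NODE_LOCAL and
    RACK_LOCAL, mirroring spark.locality.wait.process/.node/.rack.
    """
    remaining = seconds_waiting
    for level, wait in zip(LEVELS, waits):
        if remaining < wait:
            return level
        remaining -= wait  # timeout expired: fall back to the next level
    return "ANY"


for t in (0, 2.9, 3, 7, 9):
    print(t, allowed_level(t))
# 0 PROCESS_LOCAL
# 2.9 PROCESS_LOCAL
# 3 NODE_LOCAL
# 7 RACK_LOCAL
# 9 ANY
```

With all three waits at 3s, a task may sit for up to 9 seconds before it is allowed to run on any executor, which is why lowering the wait can help short tasks on busy clusters.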
Shared Variables: Broadcast and Accumulators
from pyspark.sql import functions as F
# Broadcast variables - read-only, shipped once to each executor
lookup_table = {"US": "United States", "UK": "United Kingdom", "CA": "Canada"}
broadcast_lookup = sc.broadcast(lookup_table)
# Use in transformations
def map_country(code):
    return broadcast_lookup.value.get(code, "Unknown")
udf_map_country = F.udf(map_country)  # returns StringType by default
# Assumes df has a country_code column
result = df.withColumn("country_name", udf_map_country(F.col("country_code")))
# Accumulators - tasks can only add to them; only the driver can read the value
error_count = sc.accumulator(0)
def process_record(record):
    try:
        # processing logic
        return True
    except Exception:
        error_count.add(1)
        return False
rdd = sc.parallelize(range(10000))
# Updates made inside an action such as foreach are applied once per task
rdd.foreach(process_record)
print(f"Errors encountered: {error_count.value}")
Serialization and Network Efficiency
# Kryo serialization reduces memory usage and network transfer for serialized JVM objects
spark = SparkSession.builder \
    .appName("SerializationConfig") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()
# Register classes for better Kryo performance.
# PySpark's SparkConf has no registerKryoClasses method (that is the Scala/Java API),
# so list fully qualified JVM class names in spark.kryo.classesToRegister instead.
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.classesToRegister", "com.example.MyCustomClass")  # placeholder class name
Memory Layout and Tungsten
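Spark 3.x uses Tungsten, which keeps data in a compact binary format and manages memory explicitly (on-heap or off-heap) rather than as ordinary JVM objects, reducing garbage-collection overhead.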
# Enable off-heap memory for reduced GC pressure
spark = SparkSession.builder \
    .appName("TungstenMemory") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
# Whole-stage code generation improves CPU efficiency
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.codegen.maxFields", "100")
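For the on-heap side of the memory layout, a back-of-the-envelope calculation shows how an executor heap is divided under the unified memory manager. The sketch below is plain Python with a hypothetical helper name; it assumes the documented defaults of 300 MiB reserved memory, `spark.memory.fraction` of 0.6, and `spark.memory.storageFraction` of 0.5, and it treats the configured executor memory as the usable heap, which slightly overstates what the JVM reports.

```python
RESERVED_MB = 300  # fixed amount Spark sets aside before applying fractions


def unified_memory_mb(heap_mb: float,
                      memory_fraction: float = 0.6,
                      storage_fraction: float = 0.5) -> dict:
    """Approximate on-heap regions managed by Spark's unified memory manager."""
    unified = (heap_mb - RESERVED_MB) * memory_fraction
    storage = unified * storage_fraction
    return {
        "unified": unified,              # shared by execution and storage
        "storage": storage,              # cached blocks immune to eviction
        "execution": unified - storage,  # shuffles, joins, sorts, aggregations
        "user": (heap_mb - RESERVED_MB) - unified,  # user data structures
    }


layout = unified_memory_mb(16 * 1024)  # a 16g executor heap
for region, size in layout.items():
    print(f"{region}: {size:.1f} MiB")
# unified: 9650.4 MiB
# storage: 4825.2 MiB
# execution: 4825.2 MiB
# user: 6433.6 MiB
```

The storage and execution regions can borrow from each other at runtime, and off-heap memory configured through `spark.memory.offHeap.size` is allocated in addition to these on-heap regions.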
Key Takeaway: Spark's architecture optimizes for data locality and minimal serialization. Understanding the DAG scheduler, stage boundaries, and memory management helps you architect efficient solutions for large-scale data processing.
Follow-Up Questions
- How does Spark determine task locality preferences, and what happens when local executors are busy?
- Explain the difference between Spark's physical plan and logical plan. When does each get created?
- What are the implications of running PySpark vs Scala Spark on executor memory and performance?
- How does dynamic allocation interact with shuffle data when executors are removed?
- Describe the lifecycle of an RDD from creation to action execution.