Spark Configuration
# Create cluster with optimized Spark configuration
gcloud dataproc clusters create spark-optimized \
--region=us-central1 \
--master-machine-type=n1-standard-8 \
--num-workers=4 \
--worker-machine-type=n1-standard-8 \
--num-secondary-workers=4 \
--image-version=2.2-debian12 \
--properties=\
spark:spark.dynamicAllocation.enabled=true,\
spark:spark.sql.adaptive.enabled=true,\
spark:spark.sql.adaptive.coalescePartitions.enabled=true,\
spark:spark.sql.shuffle.partitions=200,\
spark:spark.executor.instances=4,\
spark:spark.executor.memory=8g,\
spark:spark.executor.cores=4
Initialization Actions
#!/bin/bash
# install-custom-packages.sh
# Install Python packages
pip install pandas==2.0.0 pyspark==3.4.1
# Configure Spark (append so the defaults generated by Dataproc are preserved)
cat >> /etc/spark/conf/spark-defaults.conf << 'EOF'
spark.dynamicAllocation.enabled=true
spark.sql.adaptive.enabled=true
spark.eventLog.enabled=true
spark.eventLog.dir=gs://my-bucket/spark-logs/
spark.history.fs.logDirectory=gs://my-bucket/spark-logs/
EOF
# Download JARs
gsutil cp gs://my-bucket/jars/* /usr/lib/spark/jars/
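# Configure YARN
# Caution: this overwrites the entire yarn-site.xml, including settings generated by Dataproc.
# Setting yarn:yarn.nodemanager.resource.memory-mb via --properties at cluster creation avoids that.
cat > /etc/hadoop/conf/yarn-site.xml << 'YARN'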
<configuration>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>24576</value>
</property>
</configuration>
YARN
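To sanity-check how the executor settings from the cluster command fit into the 24576 MB NodeManager allocation, the arithmetic can be sketched in Python. This is a rough estimate only: it assumes Spark's default memory overhead of max(384 MB, 10% of executor memory), the helper name `executors_per_node` is hypothetical, and it ignores YARN's allocation rounding and the application master container.

```python
def executors_per_node(node_memory_mb, node_vcores, executor_memory_mb, executor_cores,
                       overhead_factor=0.10, min_overhead_mb=384):
    """Estimate how many executors fit on one YARN NodeManager."""
    # Each executor container needs heap plus off-heap overhead.
    overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    container_mb = executor_memory_mb + overhead_mb
    by_memory = node_memory_mb // container_mb
    by_cores = node_vcores // executor_cores
    # The tighter of the two limits wins.
    return min(by_memory, by_cores)

# Values from this section: 24576 MB per NodeManager, n1-standard-8 (8 vCPUs),
# spark.executor.memory=8g, spark.executor.cores=4.
print(executors_per_node(24576, 8, 8 * 1024, 4))  # prints 2
```

Under these assumptions each worker holds two executors, so 8g executors leave roughly a quarter of the NodeManager memory unused; a slightly smaller executor size may pack better.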
Spark Performance Tuning
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Optimized Spark Job") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "2") \
.config("spark.dynamicAllocation.maxExecutors", "20") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# Read the Parquet input, then repartition
df = spark.read \
.parquet("gs://my-data-lake/sales/") \
.repartition(200) # Triggers a full shuffle; do this only when the input layout is unbalanced
# Use broadcast joins for small tables
# (small_df is assumed to be a small lookup DataFrame loaded elsewhere)
from pyspark.sql.functions import broadcast
result = df.join(broadcast(small_df), "key")
# Write optimized
df.write \
.mode("overwrite") \
.partitionBy("date") \
.parquet("gs://my-data-lake/output/")
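Best Practice: Use preemptible secondary workers for fault-tolerant batch processing (up to 91% savings over on-demand pricing). Enable adaptive query execution so Spark can re-optimize plans at runtime. Use dynamic allocation so the executor count scales with the workload. Size spark.sql.shuffle.partitions to the amount of data being shuffled rather than leaving it at the default of 200.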
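As a rough illustration of sizing shuffle partitions from data volume, the sketch below divides the shuffle size by a target partition size and rounds up to a multiple of the available cores. The 128 MiB target is a common rule of thumb and an assumption here, not a value from this guide, and `suggest_shuffle_partitions` is a hypothetical helper name.

```python
import math

def suggest_shuffle_partitions(shuffle_bytes, target_partition_bytes=128 * 1024**2,
                               total_cores=1):
    """Suggest a spark.sql.shuffle.partitions value for a given shuffle size."""
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # Round up to a multiple of the core count so each wave of tasks keeps every core busy.
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores

# Example: a 100 GiB shuffle on a cluster with 32 executor cores.
print(suggest_shuffle_partitions(100 * 1024**3, total_cores=32))  # prints 800
```

The result could then be applied with `spark.conf.set("spark.sql.shuffle.partitions", ...)`. With adaptive coalescing enabled, erring on the high side is usually the safer choice, because small partitions can be merged at runtime.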
Common Interview Questions
Q1: How do you optimize Spark shuffle?
Answer: 1) Tune spark.sql.shuffle.partitions (default 200) to the data size, 2) Use broadcast joins for small tables so the large side is not shuffled, 3) Enable adaptive query execution, 4) Use salting for skewed keys, 5) Cache DataFrames that are reused so their shuffles are not recomputed.
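Q2: What is the benefit of preemptible workers?
Answer: Preemptible workers provide up to 91% cost savings. They're ideal for fault-tolerant batch workloads because Spark reruns the tasks that were lost when a worker is reclaimed. The master node and primary workers should always be on-demand; secondary workers only process data and do not store HDFS blocks, so losing one does not lose data.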
Q3: How do you handle data skew in Spark?
Answer: 1) Use salting to distribute skewed keys, 2) Enable adaptive skew join (spark.sql.adaptive.skewJoin.enabled), 3) Repartition by a different key, 4) Use broadcast joins for small tables, 5) Use custom partitioning.
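The salting idea can be illustrated without a cluster. The sketch below uses plain Python lists and dicts as stand-ins for the two sides of a join; the data, the salt count, and all names are made up for illustration. In PySpark the same pattern is typically written with a random salt column on the large side and an exploded array of salts on the small side.

```python
from collections import Counter
from itertools import cycle

NUM_SALTS = 8  # hypothetical salt count; tune to the observed skew

# Toy data: one hot key dominates the large side of the join.
large = [("hot", i) for i in range(9000)] + [(f"k{i}", i) for i in range(1000)]
small = {"hot": "H", **{f"k{i}": f"v{i}" for i in range(1000)}}

# 1) Large side: append a salt to every key (round-robin here; random also works).
salts = cycle(range(NUM_SALTS))
salted_large = [((key, next(salts)), value) for key, value in large]

# 2) Small side: replicate each row once per salt so every salted key finds a match.
salted_small = {(key, s): v for key, v in small.items() for s in range(NUM_SALTS)}

# 3) Join on the salted key, then drop the salt from the output.
joined = [(key, value, salted_small[(key, salt)]) for (key, salt), value in salted_large]
plain = [(key, value, small[key]) for key, value in large]

largest_before = max(Counter(key for key, _ in large).values())
largest_after = max(Counter(key for key, _ in salted_large).values())
print(largest_before, largest_after, sorted(joined) == sorted(plain))
# prints: 9000 1125 True
```

The largest group of rows sharing one join key drops from 9000 to 1125 while the join result stays identical. The cost is that the small side grows by a factor of NUM_SALTS.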
Q4: What is the difference between persist() and cache()?
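Answer: cache() is shorthand for persist() with the default storage level. For DataFrames that default is MEMORY_AND_DISK (for RDDs it is MEMORY_ONLY), so cached partitions that do not fit in memory spill to disk. persist() lets you choose the storage level explicitly: memory, disk, or both. Use persist(StorageLevel.DISK_ONLY) for large DataFrames that don't fit in memory.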
Q5: How do you monitor Spark job performance?
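Answer: 1) Spark UI (port 4040 on the driver while the application is running), 2) YARN ResourceManager UI, 3) Cloud Monitoring metrics, 4) Spark event logs, viewed through the Spark History Server after the job finishes, 5) Ganglia/Prometheus for cluster metrics.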
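Beyond the web UIs, the Spark UI and History Server also expose a monitoring REST API under /api/v1. The sketch below ranks stages by shuffle write volume. It assumes the stage objects carry the `stageId`, `name`, and `shuffleWriteBytes` fields; the base URL and application ID must be supplied by the caller, both helper names are hypothetical, and the sample records are invented rather than real measurements.

```python
import json
from urllib.request import urlopen

def fetch_stages(base_url, app_id):
    """Fetch stage metrics from the Spark monitoring REST API."""
    with urlopen(f"{base_url}/api/v1/applications/{app_id}/stages") as resp:
        return json.load(resp)

def heaviest_shuffle_stages(stages, top_n=3):
    """Return (stageId, name, shuffleWriteBytes) for the stages that wrote the most shuffle data."""
    ranked = sorted(stages, key=lambda s: s.get("shuffleWriteBytes", 0), reverse=True)
    return [(s["stageId"], s.get("name", ""), s.get("shuffleWriteBytes", 0))
            for s in ranked[:top_n]]

# Made-up sample in the shape the endpoint is assumed to return.
sample = [
    {"stageId": 0, "name": "parquet at job.py:12", "shuffleWriteBytes": 0},
    {"stageId": 1, "name": "join at job.py:20", "shuffleWriteBytes": 5_000_000_000},
    {"stageId": 2, "name": "save at job.py:31", "shuffleWriteBytes": 120_000_000},
]
print(heaviest_shuffle_stages(sample, top_n=2))
```

Against a live application, `heaviest_shuffle_stages(fetch_stages(base_url, app_id))` would give the same ranking from real data, which is a quick way to find the stage worth tuning first.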