Cloud Dataproc for Spark/Hadoop Workloads
Dataproc Architecture
Cloud Dataproc is a managed Spark and Hadoop service for running Apache Spark, Apache Hadoop, and other data processing frameworks on GCP.
Core Components
Master Node:
- YARN ResourceManager
- HDFS NameNode
- Spark History Server
- Jupyter notebook server (optional component)
Worker Nodes:
- YARN NodeManager
- HDFS DataNode (primary workers only)
- Spark executors, which run in YARN containers and handle data processing tasks
Additional Services:
- Hive Metastore (runs on the master node by default)
- HBase (optional)
- Presto (optional)
- Kafka (optional)
Cluster Creation
Basic Cluster Creation
# Create a basic cluster
gcloud dataproc clusters create my-cluster \
--region=us-central1 \
--zone=us-central1-a \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=3 \
--image-version=2.1-debian11
# Create a cluster with autoscaling; the policy sets the minimum and maximum worker counts
gcloud dataproc clusters create my-autoscaling-cluster \
--region=us-central1 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--autoscaling-policy=my-autoscaling-policy
Advanced Cluster Configuration
# Create cluster with initialization actions and cluster properties
# (pass all properties in one comma-separated --properties flag)
gcloud dataproc clusters create my-advanced-cluster \
--region=us-central1 \
--master-machine-type=n1-standard-8 \
--worker-machine-type=n1-standard-8 \
--num-workers=5 \
--image-version=2.1-debian11 \
--initialization-actions=gs://my-bucket/init-script.sh \
--properties=spark:spark.dynamicAllocation.enabled=true,yarn:yarn.nodemanager.resource.memory-mb=12288,spark:spark.executor.memory=8g,spark:spark.executor.cores=4
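With 12288 MB of YARN memory per worker and 8g executors, each worker fits only one executor once Spark's off-heap memory overhead is added. A minimal sketch of that arithmetic, assuming Spark's default overhead rule (the larger of 384 MiB and 10% of executor memory) and ignoring YARN's rounding of requests up to yarn.scheduler.minimum-allocation-mb; the helper names are hypothetical:

```python
# Hypothetical helpers: how many Spark executors fit in one YARN NodeManager.
# Assumes Spark's default overhead rule, max(384 MiB, 10% of executor memory),
# and ignores YARN rounding requests up to yarn.scheduler.minimum-allocation-mb.


def executor_container_mb(executor_memory_mb: int,
                          overhead_factor: float = 0.10,
                          min_overhead_mb: int = 384) -> int:
    """Memory YARN must grant for one executor: heap plus off-heap overhead."""
    overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    return executor_memory_mb + overhead_mb


def executors_per_node(nodemanager_memory_mb: int, executor_memory_mb: int) -> int:
    """Number of executor containers that fit in a NodeManager's memory."""
    return nodemanager_memory_mb // executor_container_mb(executor_memory_mb)


if __name__ == "__main__":
    # yarn.nodemanager.resource.memory-mb=12288 and spark.executor.memory=8g
    print(executor_container_mb(8 * 1024))
    print(executors_per_node(12288, 8 * 1024))
```

Under the same assumptions, lowering spark.executor.memory to 5g (5120 + 512 MiB per container) would let two executors share each worker.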
Python Client Library
from google.cloud import dataproc_v1 as dataproc
def create_cluster(project_id, region, cluster_name):
    """Create a Dataproc cluster and wait for the operation to finish."""
    # Regional clusters must be managed through the matching regional endpoint
    client = dataproc.ClusterControllerClient(
        client_options={'api_endpoint': f'{region}-dataproc.googleapis.com:443'}
    )
    cluster = {
        'project_id': project_id,
        'cluster_name': cluster_name,
        'config': {
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-4',
                'disk_config': {
                    'boot_disk_type': 'pd-ssd',
                    'boot_disk_size_gb': 100
                }
            },
            'worker_config': {
                'num_instances': 3,
                'machine_type_uri': 'n1-standard-4',
                'disk_config': {
                    'boot_disk_type': 'pd-standard',
                    'boot_disk_size_gb': 100
                }
            },
            'software_config': {
                'image_version': '2.1-debian11',
                'properties': {
                    'spark:spark.dynamicAllocation.enabled': 'true',
                    'spark:spark.shuffle.service.enabled': 'true'
                }
            },
            'initialization_actions': [
                {
                    'executable_file': 'gs://my-bucket/init-script.sh'
                }
            ]
        }
    }
    # create_cluster returns a long-running operation; result() blocks until it completes
    operation = client.create_cluster(
        request={'project_id': project_id, 'region': region, 'cluster': cluster}
    )
    return operation.result()
# Create cluster
create_cluster('my-project', 'us-central1', 'my-cluster')
Image Versions
Dataproc uses image versions that bundle specific versions of Spark, Hadoop, and other components. The table lists major.minor component versions; patch versions change between sub-minor image releases, so check the Dataproc release notes for an exact image.
Image Versions and Bundled Components
| Image Version | Spark | Hadoop | Hive | Python |
|---|---|---|---|---|
| 2.1-debian11 | 3.3 | 3.3 | 3.1 | 3.10 |
| 2.0-debian10 | 3.1 | 3.2 | 3.1 | 3.8 |
| 1.5-debian10 | 2.4 | 2.10 | 2.3 | 3.7 |
| 1.4-debian10 | 2.4 | 2.9 | 2.3 | 3.6 |
Using Image Versions
# Specify image version
gcloud dataproc clusters create my-cluster \
--image-version=2.1-debian11 \
--region=us-central1
# Use custom image
gcloud dataproc clusters create my-cluster \
--image=projects/my-project/global/images/my-custom-image \
--region=us-central1
Preemptible VMs
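Preemptible VMs are short-lived, lower-cost Compute Engine instances that Compute Engine can stop (preempt) at any time to reclaim capacity, and that always stop after 24 hours of runtime. In Dataproc they can only be used as secondary workers, which do not run HDFS DataNodes, so losing one does not lose HDFS data.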
Configuring Preemptible Workers
# Create cluster with preemptible secondary workers
gcloud dataproc clusters create my-cluster \
--num-workers=10 \
--num-secondary-workers=5 \
--secondary-worker-type=preemptible \
--worker-machine-type=n1-standard-4 \
--secondary-worker-boot-disk-size=100GB \
--region=us-central1
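The command above mixes 10 standard primary workers with 5 preemptible ones. A small cost sketch shows how the mix affects the hourly worker bill; the price and discount are made-up placeholders (not GCP rates), the Dataproc per-vCPU service fee is ignored, and the helper names are hypothetical:

```python
# Sketch: hourly VM cost of a cluster's workers, mixing standard primary
# workers with preemptible secondary workers. Prices are hypothetical
# placeholders, not GCP rates, and the Dataproc per-vCPU service fee is ignored.


def hourly_worker_cost(primary_workers: int,
                       secondary_workers: int,
                       on_demand_price: float,
                       preemptible_discount: float) -> float:
    """Return the combined hourly cost of primary and preemptible workers."""
    preemptible_price = on_demand_price * (1.0 - preemptible_discount)
    return primary_workers * on_demand_price + secondary_workers * preemptible_price


def savings_fraction(primary_workers: int, secondary_workers: int,
                     on_demand_price: float, preemptible_discount: float) -> float:
    """Fraction saved versus running every worker as a standard VM."""
    all_standard = (primary_workers + secondary_workers) * on_demand_price
    mixed = hourly_worker_cost(primary_workers, secondary_workers,
                               on_demand_price, preemptible_discount)
    return 1.0 - mixed / all_standard


if __name__ == "__main__":
    # 10 primary + 5 preemptible workers, with a made-up $0.20/hour standard
    # price and an assumed 80% preemptible discount.
    cost = hourly_worker_cost(10, 5, 0.20, 0.80)
    print(f"Mixed cluster: ${cost:.2f}/hour")
    print(f"Savings vs. all standard: {savings_fraction(10, 5, 0.20, 0.80):.1%}")
```

Savings grow with the share of secondary workers, but so does the amount of work that must be retried when preemptions occur.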
Preemptible VM Configuration
# Configure preemptible secondary workers (primary workers cannot be preemptible)
cluster = {
    'config': {
        'worker_config': {
            'num_instances': 10,
            'machine_type_uri': 'n1-standard-4'
        },
        'secondary_worker_config': {
            'num_instances': 5,
            'preemptibility': 'PREEMPTIBLE',
            'disk_config': {
                'boot_disk_size_gb': 100,
                'num_local_ssds': 0
            }
        },
        'autoscaling_config': {
            'policy_uri': 'projects/my-project/regions/us-central1/autoscalingPolicies/my-policy'
        }
    }
}
Also track overall spending: set up Cloud Billing budget alerts at 50%, 80%, and 100% thresholds.
Cost Optimization
# Monitor preemptions of the cluster's preemptible (secondary) workers
from datetime import datetime, timedelta, timezone
from google.cloud import logging as cloud_logging
def get_preemption_events(project_id, cluster_name, hours=1):
    """Count Compute Engine preemption events for a cluster's secondary workers.
    Compute Engine records each preemption as a system event audit log entry with
    method name compute.instances.preempted; Dataproc names secondary worker VMs
    <cluster_name>-sw-<suffix>.
    """
    client = cloud_logging.Client(project=project_id)
    since = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    log_filter = (
        'resource.type="gce_instance" '
        'AND protoPayload.methodName="compute.instances.preempted" '
        f'AND protoPayload.resourceName:"instances/{cluster_name}-sw-" '
        f'AND timestamp>="{since}"'
    )
    count = 0
    for entry in client.list_entries(filter_=log_filter):
        count += 1
        print(f'Preempted at {entry.timestamp}: instance {entry.resource.labels.get("instance_id")}')
    print(f'Preempted instances in the last {hours}h: {count}')
    return count
Initialization Actions
Initialization actions are executables or scripts that Dataproc runs as root on every cluster node (master and workers) immediately after the cluster is set up.
Creating Initialization Actions
#!/bin/bash
# init-script.sh - Install additional packages on cluster nodes
# Dataproc runs this as root on every node; stop at the first failing command
set -euo pipefail
# Update package list
sudo apt-get update -y
# Install Python packages
pip install pandas numpy scikit-learn
# Make spark-avro available to Spark jobs (launching spark-shell here would block the script)
echo 'spark.jars.packages=org.apache.spark:spark-avro_2.12:3.3.2' >> /etc/spark/conf/spark-defaults.conf
# Configure Spark
echo 'spark.executor.memory=8g' >> /etc/spark/conf/spark-defaults.conf
echo 'spark.executor.cores=4' >> /etc/spark/conf/spark-defaults.conf
echo 'spark.dynamicAllocation.enabled=true' >> /etc/spark/conf/spark-defaults.conf
# Mount additional storage (assumes a formatted extra disk attached as /dev/sdb)
sudo mkdir -p /mnt/scratch
sudo mount -o discard,defaults /dev/sdb /mnt/scratch
Using Initialization Actions
# Upload initialization action to GCS
gsutil cp init-script.sh gs://my-bucket/init-script.sh
# Use initialization action when creating cluster
gcloud dataproc clusters create my-cluster \
--initialization-actions=gs://my-bucket/init-script.sh \
--region=us-central1
# Multiple initialization actions
gcloud dataproc clusters create my-cluster \
--initialization-actions=gs://my-bucket/script1.sh,gs://my-bucket/script2.sh \
--region=us-central1
Initialization Action Best Practices
# Best practices for initialization actions
best_practices = {
'idempotency': 'Scripts should be idempotent and handle reruns',
'error_handling': 'Implement proper error handling and logging',
'timeout': 'Finish within the initialization action timeout (10 minutes by default; adjust with --initialization-action-timeout)',
'testing': 'Test initialization actions before production use',
'logging': 'Use consistent logging for debugging',
'dependencies': 'Check for dependencies before installing'
}
# Example initialization action with error handling
script = '''#!/bin/bash
set -e  # Exit on error
echo "Starting initialization..."
# Install the package only if it is not already installed (safe to rerun)
if ! dpkg -s package-name &> /dev/null; then
  sudo apt-get update
  sudo apt-get install -y package-name
fi
# Verify installation (assumes the package provides a command of the same name)
if command -v package-name &> /dev/null; then
  echo "Package installed successfully"
else
  echo "Failed to install package"
  exit 1
fi
'''
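The idempotency practice can also be expressed as a marker-file guard: each step records that it finished, and a rerun skips completed steps. A minimal Python sketch of the pattern, where `run_once` and the marker path are hypothetical choices rather than anything Dataproc provides:

```python
# Sketch of the idempotency practice: record completed steps in marker files
# so a rerun skips work that already succeeded. The helper and the marker
# directory are hypothetical; any path that survives reruns on the node works.
import tempfile
from pathlib import Path
from typing import Callable


def run_once(marker: Path, step: Callable[[], None]) -> bool:
    """Run `step` unless `marker` exists; return True if the step ran."""
    if marker.exists():
        return False
    step()  # if this raises, no marker is written and a rerun retries the step
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.touch()
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        marker = Path(tmp) / "init-markers" / "install-packages.done"
        print(run_once(marker, lambda: print("installing packages...")))
        print(run_once(marker, lambda: print("installing packages...")))
```

Writing the marker only after the step succeeds is the key design choice: a failed step leaves no marker, so the next run retries it instead of silently skipping it.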
Spark on Dataproc
Running Spark Jobs
# Submit Spark job (the Spark examples jar ships with the Dataproc image)
gcloud dataproc jobs submit spark \
--cluster=my-cluster \
--region=us-central1 \
--class=org.apache.spark.examples.SparkPi \
--jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
-- 1000
# Submit PySpark job
gcloud dataproc jobs submit pyspark \
--cluster=my-cluster \
--region=us-central1 \
gs://my-bucket/my-spark-job.py
# Submit with dependencies (Maven packages are passed as the spark.jars.packages property)
gcloud dataproc jobs submit pyspark \
--cluster=my-cluster \
--region=us-central1 \
--py-files=gs://my-bucket/dependencies.zip \
--properties=spark.jars.packages=org.apache.spark:spark-avro_2.12:3.3.2 \
gs://my-bucket/my-job.py
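When job submission is scripted, building the command as an argument list avoids shell-quoting problems. A sketch under the assumption that jobs are launched from Python with `subprocess`; the helper name and parameters are hypothetical, and Spark properties (including Maven packages via spark.jars.packages) go through `--properties`:

```python
# Sketch: build the `gcloud dataproc jobs submit pyspark` command as an argument
# list so it can be run with subprocess without shell quoting. The helper name
# and parameters are hypothetical.
import shlex
from typing import Dict, List, Optional, Sequence


def pyspark_submit_args(cluster: str, region: str, main_py: str,
                        py_files: Sequence[str] = (),
                        properties: Optional[Dict[str, str]] = None,
                        job_args: Sequence[str] = ()) -> List[str]:
    """Return the argv list for submitting a PySpark job to a cluster."""
    args = ["gcloud", "dataproc", "jobs", "submit", "pyspark", main_py,
            f"--cluster={cluster}", f"--region={region}"]
    if py_files:
        args.append("--py-files=" + ",".join(py_files))
    if properties:
        # gcloud splits --properties on commas, so values that contain commas
        # need gcloud's alternate-delimiter syntax (see `gcloud topic escaping`).
        args.append("--properties=" + ",".join(f"{k}={v}" for k, v in properties.items()))
    if job_args:
        # Everything after "--" is passed to the job's main program.
        args.append("--")
        args.extend(job_args)
    return args


if __name__ == "__main__":
    argv = pyspark_submit_args(
        "my-cluster", "us-central1", "gs://my-bucket/my-job.py",
        py_files=["gs://my-bucket/dependencies.zip"],
        properties={"spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.3.2"},
    )
    print(shlex.join(argv))  # to run it: subprocess.run(argv, check=True)
```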
Spark Configuration
# Spark job configuration
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
    .appName("My Dataproc Job") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "200") \
    .getOrCreate()
# Read data from GCS
df = spark.read.parquet("gs://my-bucket/data/*.parquet")
# Process data; alias the aggregate so the output column has a BigQuery-compatible name
result = df.groupBy("category").agg(F.sum("amount").alias("total_amount"))
# Write to BigQuery with the spark-bigquery-connector (it must be available on the cluster);
# the default indirect write method stages data in a GCS bucket before loading it
result.write \
    .format("bigquery") \
    .option("table", "my-project:analytics.summary") \
    .option("temporaryGcsBucket", "my-bucket") \
    .mode("append") \
    .save()
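The value 200 for spark.sql.shuffle.partitions is a starting point. One common rule of thumb (not a Spark or Dataproc default) sizes it so each shuffle partition holds roughly 128 MiB, rounded up to a whole number of waves across all executor cores. A sketch of that heuristic; the target size and the helper name are assumptions:

```python
# Sketch of a rule of thumb (not a Spark or Dataproc default): size
# spark.sql.shuffle.partitions so each shuffle partition holds roughly a target
# number of bytes, rounded up to whole "waves" across all executor cores.
# The 128 MiB target and the helper name are assumptions.
import math


def shuffle_partitions(shuffle_bytes: int, total_executor_cores: int,
                       target_partition_bytes: int = 128 * 1024 ** 2) -> int:
    """Suggest a shuffle partition count for a given shuffle data volume."""
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # At least one full wave, and always a multiple of the core count
    waves = max(1, math.ceil(by_size / total_executor_cores))
    return waves * total_executor_cores


if __name__ == "__main__":
    # e.g. 25 GiB of shuffle data on 5 workers x 4 executor cores
    print(shuffle_partitions(25 * 1024 ** 3, total_executor_cores=5 * 4))
```

On Spark 3 images, enabling adaptive query execution (spark.sql.adaptive.enabled with spark.sql.adaptive.coalescePartitions.enabled) lets Spark merge small shuffle partitions at runtime, which makes a generous starting value less costly.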
Spark SQL
-- Spark SQL on Dataproc
CREATE TEMPORARY VIEW sales
USING parquet
OPTIONS (
path "gs://my-bucket/sales/*.parquet"
);
-- Analyze data
SELECT
category,
SUM(amount) as total_sales,
COUNT(*) as transaction_count
FROM sales
WHERE sale_date >= '2024-01-01'
GROUP BY category
ORDER BY total_sales DESC;
Auto-scaling
Auto-scaling adds or removes worker nodes according to an autoscaling policy, based on YARN memory metrics (pending and available memory) measured over each cooldown period.
Creating Autoscaling Policies
# Create autoscaling policy
gcloud dataproc autoscaling-policies import my-policy \
--source=policy.yaml \
--region=us-central1
Autoscaling Policy Configuration
# policy.yaml
workerConfig:
  minInstances: 2
  maxInstances: 10
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 5
basicAlgorithm:
  cooldownPeriod: "300s"
  yarnConfig:
    gracefulDecommissionTimeout: "3600s"
    scaleUpFactor: 1.0
    scaleDownFactor: 1.0
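To see how scaleUpFactor and scaleDownFactor shape each decision, here is a simplified sketch of the basic algorithm's sizing formula as we understand it from the Dataproc autoscaling documentation: the exact change is (pending memory minus available memory) divided by memory per NodeManager, multiplied by the factor, rounded up when scaling up and toward zero when scaling down. It ignores the min-worker-fraction thresholds, graceful decommissioning, and the split between primary and secondary workers; the helper names and the sample memory figures are hypothetical:

```python
# Simplified sketch of the basic autoscaling sizing formula (our reading of the
# Dataproc docs, not the service's code):
#   exact delta = (pending memory - available memory) / memory per NodeManager
#   scale up:   round up (exact delta * scaleUpFactor)
#   scale down: round toward zero (exact delta * scaleDownFactor)
# Memory values are assumed to be averages over the cooldown period. Primary and
# secondary workers are treated as one pool bounded by their combined limits.
import math


def worker_delta(pending_mb: float, available_mb: float, nodemanager_mb: float,
                 scale_up_factor: float, scale_down_factor: float) -> int:
    """Recommended change in worker count for one autoscaling evaluation."""
    exact = (pending_mb - available_mb) / nodemanager_mb
    if exact > 0:
        return math.ceil(exact * scale_up_factor)
    return int(exact * scale_down_factor)  # int() truncates toward zero


def target_workers(current: int, delta: int, min_workers: int, max_workers: int) -> int:
    """Apply the delta and clamp to the policy's instance bounds."""
    return max(min_workers, min(max_workers, current + delta))


if __name__ == "__main__":
    # policy.yaml bounds: 2-10 primary plus 0-5 secondary workers -> 2-15 total.
    # Assumed: 12288 MB of YARN memory per worker and 40 GiB of pending memory.
    delta = worker_delta(40960, 0, 12288, scale_up_factor=1.0, scale_down_factor=1.0)
    print(delta, target_workers(2, delta, min_workers=2, max_workers=15))
```

A scaleUpFactor of 1.0, as in the policy above, requests all the capacity the pending work needs in one step; smaller factors scale more gradually.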
Auto-scaling Monitoring
# Monitor autoscaling
from google.cloud import dataproc_v1 as dataproc
def get_cluster_status(project_id, region, cluster_name):
"""Get cluster status including autoscaling."""
client = dataproc.ClusterControllerClient()
cluster = client.get_cluster(
request={
'project_id': project_id,
'region': region,
'cluster_name': cluster_name
}
)
print(f'Cluster status: {cluster.status.state}')
print(f'Master nodes: {cluster.config.master_config.num_instances}')
print(f'Worker nodes: {cluster.config.worker_config.num_instances}')
print(f'Secondary workers: {cluster.config.secondary_worker_config.num_instances}')
if cluster.config.autoscaling_config:
print(f'Autoscaling policy: {cluster.config.autoscaling_config.policy_uri}')
GCS Integration
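Dataproc clusters come with the Cloud Storage connector preinstalled, so Spark and Hadoop jobs can read and write gs:// paths directly. Keeping data in GCS instead of HDFS lets it outlive the cluster.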
Using GCS Instead of HDFS
# Read from GCS using Spark
df = spark.read.parquet("gs://my-bucket/data/*.parquet")
# Write to GCS
df.write.parquet("gs://my-bucket/output/")
# Use GCS as primary storage: no fs.gs.* setup is needed on Dataproc, because the
# connector is preinstalled and authenticates as the cluster's VM service account
GCS Connector Configuration
# The GCS connector is preinstalled; Hadoop-level connector settings take the core: prefix
gcloud dataproc clusters create my-cluster \
--region=us-central1 \
--properties=core:fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
Monitoring and Debugging
Cluster Monitoring
# Monitor cluster health
import time
from google.cloud import monitoring_v3
def monitor_cluster(project_id, cluster_name):
    """Print the latest CPU utilization of each VM in a Dataproc cluster."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"
    # Query the last hour; TimeInterval fields are timestamps, not floats
    now = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': now},
            'start_time': {'seconds': now - 3600},
        }
    )
    # gce_instance metrics have no cluster_name label; filter on the
    # goog-dataproc-cluster-name label that Dataproc applies to cluster VMs
    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': (
                'metric.type = "compute.googleapis.com/instance/cpu/utilization" AND '
                f'metadata.user_labels."goog-dataproc-cluster-name" = "{cluster_name}"'
            ),
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )
    for result in results:
        # Points are returned newest first
        instance_id = result.resource.labels['instance_id']
        print(f'{instance_id} CPU utilization: {result.points[0].value.double_value:.2%}')
Debugging Common Issues
# Check cluster status and configuration
gcloud dataproc clusters describe my-cluster --region=us-central1
# Tunnel to the Spark History Server UI (port 18080)
gcloud compute ssh my-cluster-m --zone=us-central1-a -- -L 18080:localhost:18080
# Tunnel to the YARN ResourceManager UI, which links to application logs (port 8088)
gcloud compute ssh my-cluster-m --zone=us-central1-a -- -L 8088:localhost:8088
# Tunnel to the HDFS NameNode UI (port 9870 on Hadoop 3, i.e. 2.x images)
gcloud compute ssh my-cluster-m --zone=us-central1-a -- -L 9870:localhost:9870
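The three tunnels above can be combined into one SSH session with several -L forwards. A sketch that builds that `gcloud compute ssh` call; the helper is hypothetical, it adds ssh's -N flag (forward ports only, run no remote command), and it assumes a standard cluster whose master VM is named `<cluster>-m`:

```python
# Sketch: one SSH tunnel forwarding all three web UIs at once. The helper is
# hypothetical; it builds the same `gcloud compute ssh` call as above with
# several -L forwards plus ssh's -N flag. Assumes a standard (non-HA) cluster
# whose master VM is named <cluster>-m.
import shlex
from typing import Iterable, List

UI_PORTS = {
    "spark-history-server": 18080,
    "yarn-resourcemanager": 8088,
    "hdfs-namenode": 9870,
}


def ssh_tunnel_args(cluster: str, zone: str,
                    ports: Iterable[int] = tuple(UI_PORTS.values())) -> List[str]:
    """Return argv forwarding each remote port to the same local port."""
    args = ["gcloud", "compute", "ssh", f"{cluster}-m", f"--zone={zone}", "--", "-N"]
    for port in ports:
        args += ["-L", f"{port}:localhost:{port}"]
    return args


if __name__ == "__main__":
    print(shlex.join(ssh_tunnel_args("my-cluster", "us-central1-a")))
```

With the tunnel open, the UIs are reachable locally, for example http://localhost:8088 for YARN.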
Best Practices
- Use appropriate image versions - Match versions to your workload requirements
- Implement auto-scaling - Optimize costs with dynamic scaling
- Use preemptible VMs - Reduce costs for fault-tolerant workloads
- Leverage GCS integration - Use GCS as primary storage so data outlives clusters you resize or delete
- Monitor cluster health - Track metrics and set up alerts
- Optimize Spark configuration - Tune memory, cores, and parallelism
- Use initialization actions - Customize clusters for your specific needs