
Dataproc: Managed Spark, Hadoop & Data Processing

Google Cloud Dataproc Deep Dive

Master Google Cloud Dataproc, including managed Spark and Hadoop, preemptible workers, autoscaling, and migration patterns.

20 min read | Advanced

Dataproc Architecture

Cloud Dataproc is a fully managed Spark and Hadoop service. It runs on Compute Engine and provides a fast, easy, and cost-efficient way to run Apache Spark, Apache Hadoop, and other data processing frameworks.

Architecture Overview

🐝 Dataproc Architecture for Data Engineering
Dataproc: Managed Spark/Hadoop on GCP

Cluster topology:
  Master Node (1): YARN ResourceManager, HDFS NameNode; hosts Jupyter, Zeppelin, and Spark History
  Workers 1 to N: YARN NodeManager, HDFS DataNode (some workers shown as preemptible)
  Edge Node: GCS connector, external access

Optional components:
  Apache Spark (3.x / 4.x)
  Apache Hadoop (HDFS, YARN)
  Apache Hive (SQL on Hadoop)
  Apache Pig (Pig Latin scripting)
  JupyterLab (notebooks)
  Presto (interactive SQL)

Image versions:
  2.0: Spark 3.x, Hadoop 3.x (recommended)
  1.5: Spark 2.x, Hadoop 2.x (legacy)

Preemptible / Spot VMs: up to 91% cost savings; max 24h lifetime, auto-restart

Lifecycle: Create -> Configure -> Run Jobs -> Auto-delete (optional) | Pub/Sub trigger for auto-start
Interview Tip: Dataproc is ideal for migrating existing Spark/Hadoop workloads to GCP. Use preemptible VMs for workers (not master) to save up to 91%. Cluster auto-delete and auto-scaling help control costs. Use image versions to control Spark/Hadoop versions.

Cluster Creation and Configuration

Standard Cluster

# Create a Dataproc cluster with standard configuration
gcloud dataproc clusters create my-spark-cluster \
  --region=us-central1 \
  --zone=us-central1-a \
  --master-machine-type=n1-standard-4 \
  --master-boot-disk-size=100GB \
  --num-workers=4 \
  --worker-machine-type=n1-standard-4 \
  --worker-boot-disk-size=200GB \
  --image-version=2.1-debian11 \
  --initialization-actions=gs://my-scripts/init.sh \
  --properties=yarn:yarn.nodemanager.resource.memory-mb=12288,\
spark:spark.dynamicAllocation.enabled=true

# Check cluster status
gcloud dataproc clusters describe my-spark-cluster --region=us-central1

Cluster with Preemptible Workers

# Create cluster with preemptible secondary workers for cost savings.
# --max-idle deletes the cluster after 10 minutes without running jobs.
gcloud dataproc clusters create cost-optimized-cluster \
  --region=us-central1 \
  --zone=us-central1-a \
  --master-machine-type=n1-standard-4 \
  --num-workers=4 \
  --worker-machine-type=n1-standard-4 \
  --num-secondary-workers=4 \
  --secondary-worker-type=preemptible \
  --secondary-worker-boot-disk-size=100GB \
  --image-version=2.1-debian11 \
  --max-idle=10m \
  --properties=yarn:yarn.nodemanager.resource.memory-mb=12288

GCP Pricing Models for Data Engineering

On-Demand: no discount (0%). Pay per use, no commitment. Best for dev/test.
Committed (1yr): up to 37%. 1-year commitment. Best for steady production.
Committed (3yr): up to 55%. 3-year commitment. Best for long-term infrastructure.
Preemptible/Spot: up to 91%. Short-lived VMs. Best for batch processing.
Sustained Use: up to 30%. Automatic discounts for long use. Best for always-on workloads.
Serverless: N/A. Pay per query/invocation. Best for event-driven workloads.

Spark on Dataproc

Submitting Spark Jobs

# Submit a Spark job to Dataproc
gcloud dataproc jobs submit spark \
  --cluster=my-spark-cluster \
  --region=us-central1 \
  --jars=gs://my-bucket/libs/my-app.jar \
  --class=com.example.SparkETL \
  --properties=spark.dynamicAllocation.enabled=true,\
spark.sql.shuffle.partitions=200 \
  -- gs://my-data-lake/input/ gs://my-data-lake/output/

# Submit a PySpark job
gcloud dataproc jobs submit pyspark \
  --cluster=my-spark-cluster \
  --region=us-central1 \
  gs://my-bucket/scripts/etl_job.py \
  -- --input gs://my-data-lake/input/ --output gs://my-data-lake/output/
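
Everything after the bare `--` is forwarded to the script as command-line arguments. A minimal sketch of how etl_job.py might read them, assuming the --input and --output names used above; the parse_args helper is a hypothetical name, not part of any Dataproc API.

```python
import argparse


def parse_args(argv=None):
    """Parse the arguments that gcloud forwards after the bare `--`."""
    parser = argparse.ArgumentParser(description="ETL job arguments")
    parser.add_argument("--input", required=True, help="Source path, e.g. a gs:// prefix")
    parser.add_argument("--output", required=True, help="Destination path")
    return parser.parse_args(argv)


# Passing an explicit list makes the parser easy to try without a cluster.
args = parse_args(["--input", "gs://my-data-lake/input/",
                   "--output", "gs://my-data-lake/output/"])
print(args.input, args.output)
```

Inside the real job you would call parse_args() with no arguments so that it reads sys.argv.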

Spark Configuration

from pyspark.sql import SparkSession
# Import only the functions used below (note: this `sum` shadows Python's built-in)
from pyspark.sql.functions import avg, col, count, sum, to_date

# Create Spark session with optimized configuration
spark = SparkSession.builder \
    .appName("Data Engineering Pipeline") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "20") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

# Read from GCS
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("gs://my-data-lake/raw/sales/")

# Transform data
transformed_df = df \
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd")) \
    .withColumn("revenue", col("quantity") * col("price")) \
    .filter(col("revenue") > 0) \
    .groupBy("date", "product_category") \
    .agg(
        sum("revenue").alias("total_revenue"),
        count("*").alias("transaction_count"),
        avg("revenue").alias("avg_revenue")
    )

# Write to BigQuery
transformed_df.write \
    .format("bigquery") \
    .option("table", "project.dataset.sales_summary") \
    .option("temporaryGcsBucket", "my-temp-bucket") \
    .mode("append") \
    .save()

spark.stop()

✨

Best Practice: Use Spark dynamic allocation together with adaptive query execution (AQE). Dynamic allocation adds and removes executors as the workload changes, and AQE re-optimizes query plans at runtime. AQE is enabled by default from Spark 3.2 onward, so setting spark.sql.adaptive.enabled=true explicitly matters mainly on older images.
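
Executor settings also have to fit inside the memory YARN offers on each worker. The sketch below checks the 8g executor from the session above against the 12288 MB NodeManager setting used in the cluster examples. It assumes Spark's usual default memory overhead (10% of executor memory, at least 384 MB) and ignores YARN's container rounding; executors_per_node is a hypothetical helper name.

```python
def executors_per_node(yarn_memory_mb, executor_memory_mb,
                       overhead_factor=0.10, min_overhead_mb=384):
    """Estimate how many executors of a given size fit on one YARN NodeManager."""
    # Each YARN container holds the executor heap plus off-heap overhead.
    overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    container_mb = executor_memory_mb + overhead_mb
    return yarn_memory_mb // container_mb, container_mb


count, container_mb = executors_per_node(yarn_memory_mb=12288,
                                         executor_memory_mb=8 * 1024)
print(count, container_mb)  # 1 9011
```

Under these assumptions only one 8g executor fits per worker and roughly 3 GB stays unused, whereas 5g executors would fit two per worker (with executor cores reduced to match).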

Autoscaling

Dataproc provides autoscaling that automatically adjusts cluster size based on workload demands.

# Register the autoscaling policy, then create a cluster that uses it
gcloud dataproc autoscaling-policies import autoscale-policy \
  --source=autoscale-policy.yaml \
  --region=us-central1

gcloud dataproc clusters create autoscale-cluster \
  --region=us-central1 \
  --zone=us-central1-a \
  --master-machine-type=n1-standard-4 \
  --num-workers=2 \
  --worker-machine-type=n1-standard-4 \
  --image-version=2.1-debian11 \
  --max-idle=10m \
  --autoscaling-policy=autoscale-policy

Autoscaling Policy

# autoscale-policy.yaml
# Dataproc's basic algorithm scales on YARN pending and available memory.
# Both scale factors must be between 0.0 and 1.0.
workerConfig:
  minInstances: 2
  maxInstances: 20
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 1.0
    scaleDownFactor: 0.5
    # Required field; 1h is an assumed value, tune it to your longest tasks
    gracefulDecommissionTimeout: 1h
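
A rough sketch of how scale factors turn YARN memory metrics into a change in worker count. This is a simplified model for intuition, not Dataproc's actual implementation: the function names are hypothetical, and the 12288 MB per worker is borrowed from the yarn.nodemanager.resource.memory-mb setting used earlier.

```python
import math


def recommended_worker_delta(pending_mb, available_mb, worker_memory_mb,
                             scale_up_factor, scale_down_factor):
    """Simplified estimate of the worker change suggested by YARN memory metrics."""
    if pending_mb > 0:
        # Add enough workers to absorb a fraction of the pending memory.
        return math.ceil(pending_mb * scale_up_factor / worker_memory_mb)
    # Otherwise remove workers covering a fraction of the unused memory.
    return -math.floor(available_mb * scale_down_factor / worker_memory_mb)


def clamp_workers(current, delta, min_workers=2, max_workers=20):
    """Keep the resulting cluster size inside the policy limits."""
    return max(min_workers, min(max_workers, current + delta))


# 48 GB pending on 12 GB workers with a scale-up factor of 1.0 suggests 4 more workers.
delta = recommended_worker_delta(49152, 0, 12288,
                                 scale_up_factor=1.0, scale_down_factor=0.5)
print(delta, clamp_workers(current=2, delta=delta))  # 4 6
```

A scale-up factor of 1.0 tries to clear all pending memory in one step, while a scale-down factor of 0.5 releases only half of the idle capacity, which makes shrinking more conservative than growing.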

Initialization Actions

Initialization actions are scripts that run on every node right after it is set up, which makes them the place to customize the environment.

# Create initialization action
cat > init.sh << 'EOF'
#!/bin/bash
set -e

# Install Python packages
pip install pandas==2.0.0 sqlalchemy psycopg2-binary

# Install Jupyter extensions
jupyter contrib nbextension install --user

# Append Spark settings (appending preserves the defaults Dataproc already wrote)
cat >> /etc/spark/conf/spark-defaults.conf << 'SPARK_CONF'
spark.dynamicAllocation.enabled=true
spark.sql.adaptive.enabled=true
spark.eventLog.enabled=true
spark.eventLog.dir=gs://my-bucket/spark-logs/
SPARK_CONF

# Download JAR dependencies
gsutil cp gs://my-bucket/libs/postgres-42.6.0.jar /usr/lib/spark/jars/
gsutil cp gs://my-bucket/libs/mysql-connector-j-8.0.33.jar /usr/lib/spark/jars/
EOF

# Upload init action
gsutil cp init.sh gs://my-scripts/init.sh

# Create cluster with init action
gcloud dataproc clusters create custom-cluster \
  --region=us-central1 \
  --initialization-actions=gs://my-scripts/init.sh \
  --initialization-action-timeout=10m
⚠️ Cost Alert

Always monitor your BigQuery costs using INFORMATION_SCHEMA. Set up budget alerts at 50%, 80%, and 100% thresholds.

Cost Optimization

# Cost analysis for Dataproc
cost_analysis = {
    "standard_4_workers": {
        "master": "n1-standard-4 ($0.19/hr)",
        "workers": "4x n1-standard-4 ($0.76/hr)",
        "total_hourly": "$0.95/hr",
        "daily_24h": "$22.80/day",
        "monthly_730h": "$693.50/month"
    },
    "preemptible_4_workers": {
        "master": "n1-standard-4 ($0.19/hr)",
        "workers": "4x preemptible ($0.152/hr)",
        "total_hourly": "$0.342/hr",
        "savings": "64% vs on-demand"
    },
    "autoscale_2_to_10": {
        # One master plus N workers, all at the $0.19/hr rate used above
        "min": "2 workers ($0.57/hr)",
        "max": "10 workers ($2.09/hr)",
        "avg": "~4 workers ($0.95/hr)",
        "savings": "~55% vs a fixed 10-worker cluster"
    }
}

# Cost optimization strategies
strategies = {
    "preemptible_workers": "Up to 91% discount for fault-tolerant batch",
    "autoscaling": "Scale down when idle, up during peak",
    "max_idle": "Auto-delete clusters after idle period",
    "startup_scaling": "Start with minimal workers, scale as needed",
    "hdfs_to_gcs": "Use GCS instead of HDFS for data persistence",
    "right_sizing": "Use n1-standard instead of high-mem for most jobs"
}

Cost Tip: Preemptible workers are discounted by up to 91% compared with on-demand VMs. Use --max-idle=10m to delete a cluster automatically after 10 minutes of idle time. For persistent clusters, attach an autoscaling policy with --autoscaling-policy and set minimum and maximum worker counts that match your demand.
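
The hourly figures above can be reproduced with a few lines of arithmetic. This is a sketch that reuses the illustrative rates from this section ($0.19/hr on-demand and $0.038/hr preemptible per n1-standard-4); the rates appear to cover VM charges only, so treat the result as an estimate rather than a bill. The helper names are hypothetical.

```python
HOURS_PER_MONTH = 730  # same monthly-hours convention as the figures above


def cluster_hourly_cost(master_rate, worker_rate, num_workers):
    """Hourly VM cost of one master plus num_workers identical workers."""
    return master_rate + worker_rate * num_workers


def savings_fraction(baseline, discounted):
    """Fraction saved relative to the baseline hourly cost."""
    return 1 - discounted / baseline


on_demand = cluster_hourly_cost(0.19, 0.19, 4)
preemptible = cluster_hourly_cost(0.19, 0.038, 4)

print(f"${on_demand:.2f}/hr, ${on_demand * HOURS_PER_MONTH:.2f}/month")
# $0.95/hr, $693.50/month
print(f"${preemptible:.3f}/hr, {savings_fraction(on_demand, preemptible):.0%} saved")
# $0.342/hr, 64% saved
```

Note that the cluster-level saving (64%) is lower than the per-VM discount because the master stays on-demand.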

Common Interview Questions

Q1: When would you use Dataproc vs. Dataflow?

Answer: Dataproc is for existing Spark/Hadoop workloads, complex ML pipelines, or when you need cluster-level control. Dataflow is for new development, unified batch/streaming, or when you want serverless operation. Dataproc provides more control over cluster configuration; Dataflow eliminates cluster management.

Q2: How do you optimize Spark jobs on Dataproc?

Answer: 1) Enable dynamic allocation to auto-scale executors, 2) Use adaptive query execution for runtime optimization, 3) Tune shuffle partitions based on data size, 4) Use broadcast joins for small tables, 5) Partition data by frequently filtered columns, 6) Use columnar formats (Parquet/ORC) for I/O efficiency.
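
For point 3, one common rule of thumb is to aim for shuffle partitions of roughly 128 MB each. The sketch below applies that rule; the 128 MB target is an assumption rather than a Dataproc or Spark default, and suggest_shuffle_partitions is a hypothetical helper.

```python
import math


def suggest_shuffle_partitions(shuffle_bytes, target_partition_mb=128, min_partitions=1):
    """Suggest a spark.sql.shuffle.partitions value from the expected shuffle size."""
    target_bytes = target_partition_mb * 1024 * 1024
    return max(min_partitions, math.ceil(shuffle_bytes / target_bytes))


# A 50 GiB shuffle at ~128 MiB per partition
print(suggest_shuffle_partitions(50 * 1024 ** 3))  # 400
```

With adaptive query execution enabled, Spark can coalesce partitions that turn out too small, so erring slightly high is usually the safer direction.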

Q3: What is the difference between preemptible and on-demand workers?

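Answer: On-demand workers run until you delete them and are billed at standard pricing. Preemptible workers (and the newer Spot VMs) can be reclaimed by Google at any time in exchange for a discount of up to 91%; preemptible VMs also have a 24-hour maximum lifetime. Dataproc runs them as secondary workers, which process data but do not store HDFS blocks, so use them for fault-tolerant batch workloads. The master node should always be on-demand.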

Q4: How do you handle cluster failures in Dataproc?

Answer: 1) Use HDFS replication (factor 3) for data durability, 2) Enable YARN node labeling for application placement, 3) Implement retry logic in Spark jobs, 4) Use a persistent history server so job history survives cluster deletion, 5) Submit critical jobs as restartable jobs (for example with --max-failures-per-hour) so Dataproc restarts the driver after a failure.
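
The retry logic in point 3 can be as small as a wrapper with exponential backoff. This is a generic sketch, not a Dataproc or Spark API; run_with_retries and its parameters are hypothetical names.

```python
import time


def run_with_retries(task, max_attempts=3, base_delay_sec=1.0, sleep=time.sleep):
    """Call task() until it succeeds or max_attempts is reached, backing off exponentially."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Wait 1x, 2x, 4x, ... the base delay before the next attempt.
            sleep(base_delay_sec * 2 ** (attempt - 1))


# Example: a step that fails twice before succeeding.
calls = {"count": 0}


def flaky_step():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(run_with_retries(flaky_step, sleep=lambda seconds: None))  # ok
```

Only wrap steps that are safe to repeat: retrying an append-mode write, for instance, can duplicate rows unless the write is idempotent.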

Q5: How does Dataproc integrate with GCS?

Answer: Dataproc provides a GCS connector that allows Hadoop-compatible applications to read and write gs:// paths as if they were HDFS. The connector is pre-installed on Dataproc clusters, where fs.gs.impl and fs.AbstractFileSystem.gs.impl are already set; you only need to configure those properties yourself when installing the connector on a non-Dataproc Hadoop cluster. Use GCS for data persistence, since HDFS data is lost when the cluster is deleted.
