
Topic: Monitoring, Metrics, and Debugging



PySpark Advanced Interview Series

Module 18: Monitoring & Debugging - Observability at Scale

Companies: Amazon, Uber | Difficulty: Hard

Interview Question

"At Amazon, we monitor thousands of Spark jobs daily. Walk us through the Spark UI, key metrics to watch, and how you would debug a job that's running slowly due to data skew." (Amazon Senior Data Engineer Interview)

"At Uber, we need real-time visibility into Spark job health. Explain how to collect custom metrics, set up alerting for performance degradation, and use the Event Log to diagnose historical issues." (Uber Data Engineer Interview)


Spark UI Overview

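The Spark UI is served by the driver while the application is running, by default at http://driver-node:4040. If port 4040 is already taken, Spark tries 4041, 4042, and so on. After the application ends, the same views are available only through the History Server, which requires event logging to be enabled.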

Main Tabs

Tab | Purpose | Key Metrics
Jobs | Job progress and status | Duration, stages, tasks
Stages | Stage-level details | Shuffle read/write, input/output
Storage | Cached DataFrames | Size in memory/disk, partitions
Executors | Executor resources | Memory, cores, GC time
SQL | Query plans and execution | Physical plan, duration
Streaming | Streaming query metrics | Batch duration, records processed

Job Monitoring

import time

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced so sum/min/max do not shadow Python builtins

spark = SparkSession.builder \
    .appName("MonitoringInterview") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "s3a://spark-logs/event-logs/") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()

# Track job execution
start = time.time()

# Your Spark job (transformations are lazy; nothing runs yet)
df = spark.read.parquet("s3a://bucket/data/")
result = df.filter(F.col("status") == "active") \
    .groupBy("category") \
    .agg(F.sum("revenue").alias("total_revenue"))

# Force execution and track time
result.count()
job_duration = time.time() - start

print(f"Job completed in {job_duration:.2f}s")

# Access job metrics through Spark UI
# Navigate to http://localhost:4040/jobs/
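
The numbers shown in the UI are also exposed as JSON by Spark's monitoring REST API under /api/v1 on the same port, which is handy when a browser is not an option. The sketch below builds the stage-list URL and ranks completed stages by executor run time. The field names (`stageId`, `status`, `executorRunTime`, `shuffleReadBytes`, `shuffleWriteBytes`) follow the REST API's stage representation as I understand it; the base URL, the application id, and the sample payload are placeholders, so verify the shape against your Spark version.

```python
import json
from urllib.request import urlopen


def stages_url(base_url, app_id):
    """Build the REST endpoint that lists all stages of one application."""
    return f"{base_url.rstrip('/')}/api/v1/applications/{app_id}/stages"


def fetch_stages(base_url, app_id, timeout=10):
    """Download the stage list as parsed JSON (needs a live driver or History Server)."""
    with urlopen(stages_url(base_url, app_id), timeout=timeout) as resp:
        return json.load(resp)


def slowest_stages(stages, top_n=3):
    """Return the top_n completed stages by executor run time (ms), slowest first."""
    done = [s for s in stages if s.get("status") == "COMPLETE"]
    done.sort(key=lambda s: s.get("executorRunTime", 0), reverse=True)
    return [
        {
            "stageId": s["stageId"],
            "executorRunTimeMs": s.get("executorRunTime", 0),
            "shuffleReadBytes": s.get("shuffleReadBytes", 0),
            "shuffleWriteBytes": s.get("shuffleWriteBytes", 0),
        }
        for s in done[:top_n]
    ]


# Hand-written sample shaped like a /stages response (numbers are made up)
sample_stages = [
    {"stageId": 0, "status": "COMPLETE", "executorRunTime": 42000,
     "shuffleReadBytes": 0, "shuffleWriteBytes": 1500000},
    {"stageId": 1, "status": "ACTIVE", "executorRunTime": 900000,
     "shuffleReadBytes": 0, "shuffleWriteBytes": 0},
    {"stageId": 2, "status": "COMPLETE", "executorRunTime": 310000,
     "shuffleReadBytes": 1500000, "shuffleWriteBytes": 0},
]

for stage in slowest_stages(sample_stages, top_n=2):
    print(stage)
```

Against a live application you would pass the result of `fetch_stages("http://localhost:4040", app_id)` to `slowest_stages` instead of the sample list.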

Key Metrics to Monitor

1. Task Metrics

# Through Spark UI - Stages tab
# - Task Duration: Time per task
# - Shuffle Read/Write: Data movement
# - Input/Output: Data processed
# - GC Time: Garbage collection overhead

# Look for:
# - Stragglers (tasks much slower than others)
# - High shuffle (indicates need for optimization)
# - High GC time (indicates memory pressure)
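
"Much slower than others" can be made concrete by comparing each task against the median task duration of its stage. The following is a minimal sketch of that idea in plain Python; the 3x factor and the sample durations are assumptions chosen for illustration, not a Spark default.

```python
from statistics import median


def find_stragglers(durations_ms, factor=3.0):
    """Return (task_index, duration) pairs that exceed factor x the median duration."""
    if not durations_ms:
        return []
    threshold = median(durations_ms) * factor
    return [(i, d) for i, d in enumerate(durations_ms) if d > threshold]


# Made-up task durations for one stage, in milliseconds
durations = [1200, 1100, 1300, 1250, 9800, 1150]
print(find_stragglers(durations))
```

The median is used instead of the mean because a single very slow task pulls the mean up and can hide itself.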

2. Executor Metrics

# Through Spark UI - Executors tab
# - Memory Usage: Heap and off-heap
# - Disk Usage: Spilled data
# - Active Tasks: Concurrent tasks
# - GC Time: Garbage collection overhead
# - Total Tasks: Tasks completed

# Look for:
# - Memory near capacity (risk of OOM)
# - High disk usage (data spilling)
# - Uneven task distribution (skew)
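
One way to turn "high GC time" into a check is to look at GC time as a fraction of total task time per executor. The sketch below assumes executor records shaped like the REST API's executor summaries (`id`, `totalDuration`, `totalGCTime`, both in milliseconds); the 10% cutoff is a commonly used rule of thumb and is treated here as an assumption, as are the sample values.

```python
def gc_pressure(executors, max_gc_fraction=0.10):
    """Return (executor_id, gc_fraction) for executors spending too long in GC."""
    flagged = []
    for e in executors:
        task_ms = e["totalDuration"]
        gc_ms = e["totalGCTime"]
        # Skip executors that have not run any tasks yet
        if task_ms > 0 and gc_ms / task_ms > max_gc_fraction:
            flagged.append((e["id"], round(gc_ms / task_ms, 3)))
    return flagged


# Made-up executor summaries
sample_executors = [
    {"id": "1", "totalDuration": 100000, "totalGCTime": 4000},
    {"id": "2", "totalDuration": 100000, "totalGCTime": 25000},
    {"id": "3", "totalDuration": 0, "totalGCTime": 0},
]
print(gc_pressure(sample_executors))
```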

3. SQL Metrics

# Through Spark UI - SQL tab
# - Query Plan: Physical execution plan
# - Duration: Time per operation
# - Rows Processed: Data volume
# - Spill: Memory spilling to disk

# Look for:
# - Missing predicate pushdown
# - Unexpected shuffles
# - Large data movement
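
The same checks can be run on the text of a physical plan. The sketch below counts shuffle (`Exchange`) and broadcast (`BroadcastExchange`) operators and flags file scans whose `PushedFilters` list is empty. The plan string is hand-written to resemble `explain()` output and is only illustrative; operator names and formatting vary between Spark versions.

```python
import re


def summarize_plan(plan_text):
    """Count shuffles, broadcasts, and scans without pushed filters in a plan string."""
    pushed = re.findall(r"PushedFilters: \[([^\]]*)\]", plan_text)
    return {
        # \b keeps "BroadcastExchange" and "ReusedExchange" out of the shuffle count
        "shuffles": len(re.findall(r"\bExchange\b", plan_text)),
        "broadcasts": len(re.findall(r"\bBroadcastExchange\b", plan_text)),
        "scans_without_pushdown": sum(1 for p in pushed if not p.strip()),
    }


# Hand-written example resembling a physical plan (not real output)
sample_plan = """
== Physical Plan ==
*(2) HashAggregate(keys=[category#1], functions=[sum(revenue#2)])
+- Exchange hashpartitioning(category#1, 200), ENSURE_REQUIREMENTS, [id=#30]
   +- *(1) HashAggregate(keys=[category#1], functions=[partial_sum(revenue#2)])
      +- *(1) Filter (isnotnull(status#0) AND (status#0 = active))
         +- FileScan parquet [status#0,category#1,revenue#2] PushedFilters: [IsNotNull(status), EqualTo(status,active)]
"""
print(summarize_plan(sample_plan))
```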

Custom Metrics Collection

import json
import logging
import time

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("CustomMetrics") \
    .getOrCreate()

# Method 1: Using Spark's built-in metrics
# Access through Spark UI

# Method 2: Custom logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("SparkMetrics")

# Track custom metrics
job_start = time.time()  # kept separately so total_time covers the whole job

# spark.read is lazy: this times schema and metadata discovery, not a full scan
start = time.time()
df = spark.read.parquet("s3a://bucket/data/")
read_time = time.time() - start
logger.info(f"Data read time: {read_time:.2f}s")

# count() is an action, so this timing includes the actual scan and filter
start = time.time()
filtered_df = df.filter(F.col("status") == "active")
filter_count = filtered_df.count()
filter_time = time.time() - start
logger.info(f"Filter time: {filter_time:.2f}s, Rows: {filter_count}")

# Method 3: Write metrics to external system
metrics = {
    "job_name": "daily_aggregation",
    "read_time": read_time,
    "filter_time": filter_time,
    "filter_count": filter_count,
    "total_time": time.time() - job_start
}

# Send to monitoring system (e.g., CloudWatch, Datadog)
# Printing JSON here is a stand-in for that system's client call
print(json.dumps(metrics))
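
Repeating the start/stop pattern for every step gets noisy, so the timing can be wrapped in a small context manager that records each step into a metrics dictionary and emits one JSON line at the end. This is a sketch in plain Python; the helper names `timed` and `emit` are hypothetical, and the `time.sleep` call stands in for a Spark action.

```python
import json
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("SparkMetrics")


@contextmanager
def timed(metrics, name):
    """Record the wall-clock seconds spent inside the with-block under metrics[name]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the block raises, so failed steps still show up
        metrics[name] = round(time.perf_counter() - start, 3)


def emit(metrics, job_name):
    """Log all metrics as a single JSON line and return that line."""
    payload = json.dumps({"job_name": job_name, **metrics}, sort_keys=True)
    logger.info(payload)
    return payload


metrics = {}
with timed(metrics, "step_seconds"):
    time.sleep(0.01)  # stand-in for something like filtered_df.count()
line = emit(metrics, "daily_aggregation")
print(line)
```

One JSON object per line is easy for log shippers to parse, which is the point of the structured logging item in the checklist.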

Real-World Scenario: Uber Debugging Pipeline

Problem Statement

Debug a slow Spark job that processes ride data. The job takes 3 hours instead of the expected 30 minutes.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced: a wildcard import would shadow builtin sum/min/max used below

spark = SparkSession.builder \
    .appName("UberDebugging") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "s3a://spark-logs/") \
    .getOrCreate()

# === STEP 1: Identify the slow stage ===
# Check Spark UI -> Jobs tab -> Find slow job -> Check stages

# === STEP 2: Analyze task distribution ===
# Check Spark UI -> Stages tab -> Find slow stage -> Check task metrics

# Example: Detect stragglers
rides_df = spark.read.parquet("s3a://uber-data/rides/")
# Assumption: the drivers table is not defined in the original; this path is hypothetical
drivers_df = spark.read.parquet("s3a://uber-data/drivers/")

# Check partition distribution (this scans the data once to count rows per partition)
partition_sizes = rides_df.rdd \
    .mapPartitionsWithIndex(lambda idx, it: [(idx, sum(1 for _ in it))]) \
    .collect()

sizes = [size for _, size in partition_sizes]
mean_size = sum(sizes) / len(sizes)
print("Partition stats:")
print(f"  Min: {min(sizes)}")
print(f"  Max: {max(sizes)}")
print(f"  Mean: {mean_size:.0f}")
print(f"  Skew ratio: {max(sizes) / mean_size:.2f}")

# === STEP 3: Check shuffle metrics ===
# High shuffle = data movement = slow

# Example: Identify expensive operations
# The join shuffles both sides by driver_id; the groupBy shuffles again by city
result = rides_df \
    .join(drivers_df, "driver_id") \
    .groupBy("city") \
    .agg(F.sum("fare_amount").alias("total_fare"))

# Check explain plan
result.explain(True)
# Look for Exchange nodes (shuffles)

# === STEP 4: Check for data skew ===
# Check key distribution
key_distribution = rides_df \
    .groupBy("driver_id") \
    .count() \
    .orderBy(F.col("count").desc())

key_distribution.show(20)
# If top keys have 100x more data = skew

# === STEP 5: Optimize ===
# Enable AQE for automatic optimization (set before running the query)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Cache rides_df only if several actions reuse it; cache() is lazy and
# takes effect on the next action
rides_df.cache()

# Broadcast small tables to avoid shuffling the large side of the join.
# No explicit repartition before groupBy: the aggregation already shuffles
# by city, and partial aggregation shrinks the data first.
optimized_result = rides_df \
    .join(F.broadcast(drivers_df), "driver_id") \
    .groupBy("city") \
    .agg(F.sum("fare_amount").alias("total_fare"))

optimized_result.show()

spark.stop()
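
The "100x more data" rule in Step 4 is easier to apply when the top keys are expressed as a share of all rows. Here is a small sketch of that calculation on an in-memory list of keys; in practice the list would come from a sample of the key column rather than the full table, and the sample values below are invented.

```python
from collections import Counter


def top_key_share(keys, top_n=3):
    """Return (key, count, share_of_total) for the top_n most frequent keys."""
    counts = Counter(keys)
    total = sum(counts.values())
    return [(k, c, round(c / total, 3)) for k, c in counts.most_common(top_n)]


# Invented sample: one driver accounts for most rides
sample_keys = ["d1"] * 90 + ["d2"] * 6 + ["d3"] * 4
for key, count, share in top_key_share(sample_keys):
    print(f"{key}: {count} rows ({share:.0%})")
```

A single key holding a large share of rows means one task receives that share of the shuffle, regardless of how many partitions are configured.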

Spark Event Log Analysis

# Event Log contains detailed metrics for every job, stage, task
# Location: s3a://spark-logs/event-logs/

# Analyze Event Log using Spark's History Server
# Start: $SPARK_HOME/sbin/start-history-server.sh
# Access: http://history-server:18080/

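# Or analyze programmatically
# Event logs are JSON lines. Field names contain spaces ("Task Info", "Task ID"),
# so nested fields are quoted with backticks. Names follow the event log layout
# of recent Spark releases; run events.printSchema() to confirm on your version.
from pyspark.sql import functions as F

event_log_path = "s3a://spark-logs/event-logs/app-20240101-000000"

# Read event log
events = spark.read.json(event_log_path)

# Find slow tasks (duration is derived from launch and finish timestamps, in ms)
slow_tasks = events \
    .filter(F.col("Event") == "SparkListenerTaskEnd") \
    .select(
        F.col("`Task Info`.`Task ID`").alias("task_id"),
        F.col("`Task Info`.`Host`").alias("host"),
        (F.col("`Task Info`.`Finish Time`") - F.col("`Task Info`.`Launch Time`")).alias("duration_ms"),
    ) \
    .orderBy(F.col("duration_ms").desc())

slow_tasks.show(20)

# Find shuffle data: sum the shuffle bytes written by the tasks of each stage
shuffle_data = events \
    .filter(F.col("Event") == "SparkListenerTaskEnd") \
    .groupBy(F.col("`Stage ID`").alias("stage_id")) \
    .agg(F.sum(F.col("`Task Metrics`.`Shuffle Write Metrics`.`Shuffle Bytes Written`")).alias("shuffle_bytes_written"))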
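
For a quick look at a single log file without starting Spark, the same analysis can be sketched with the standard library, since each line of an event log is one JSON object. The field names below ("Task Info", "Launch Time", "Finish Time") reflect the event log layout as I understand it, and the sample lines are fabricated for illustration only.

```python
import json


def task_durations(lines):
    """Parse SparkListenerTaskEnd events and return tasks sorted by duration, slowest first."""
    tasks = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        info = event["Task Info"]
        tasks.append({
            "stage_id": event["Stage ID"],
            "task_id": info["Task ID"],
            "host": info["Host"],
            # Timestamps are epoch milliseconds
            "duration_ms": info["Finish Time"] - info["Launch Time"],
        })
    return sorted(tasks, key=lambda t: t["duration_ms"], reverse=True)


# Fabricated sample lines shaped like event log entries
sample_lines = [
    json.dumps({"Event": "SparkListenerJobStart", "Job ID": 0}),
    json.dumps({"Event": "SparkListenerTaskEnd", "Stage ID": 1,
                "Task Info": {"Task ID": 5, "Host": "node-a",
                              "Launch Time": 1000, "Finish Time": 2500}}),
    json.dumps({"Event": "SparkListenerTaskEnd", "Stage ID": 1,
                "Task Info": {"Task ID": 7, "Host": "node-b",
                              "Launch Time": 1000, "Finish Time": 10000}}),
]

for task in task_durations(sample_lines):
    print(task)
```

With a real file you would pass an open file handle, for example `task_durations(open(path))`, after decompressing the log if compression is enabled.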

Debugging Common Issues

1. OutOfMemoryError

# Symptoms:
# - java.lang.OutOfMemoryError: Java heap space
# - java.lang.OutOfMemoryError: GC overhead limit exceeded
# - Container killed by YARN

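# Debug steps:
# 1. Check Spark UI -> Executors -> Memory Usage
# 2. Look for high memory usage or data spilling
# 3. Check if caching is consuming too much memory

# Solutions:
# Increase executor memory. This is a launch-time setting: pass it to
# spark-submit (--conf spark.executor.memory=32g) or to
# SparkSession.builder.config(...) before the session starts.
# spark.conf.set() cannot change it on a running application.

# Reduce partition size
spark.conf.set("spark.sql.shuffle.partitions", "4000")

# Let cached data spill to disk instead of holding it all in memory.
# PySpark always stores cached data serialized, so there is no
# MEMORY_ONLY_SER level on the Python side.
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)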
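
"Container killed by YARN" usually concerns the whole container, not just the JVM heap: the container request is executor memory plus a memory overhead. The sketch below estimates that total using the documented defaults for `spark.executor.memoryOverhead` (10% of executor memory, with a 384 MiB minimum). It ignores off-heap and PySpark-specific memory settings, so treat it as a rough estimate.

```python
def container_memory_mib(executor_memory_mib, overhead_factor=0.10, min_overhead_mib=384):
    """Estimate the memory a cluster manager must grant for one executor, in MiB."""
    overhead = max(int(executor_memory_mib * overhead_factor), min_overhead_mib)
    return executor_memory_mib + overhead


# A 32g executor needs noticeably more than 32 GiB from the cluster manager
print(container_memory_mib(32 * 1024))
# For small executors the 384 MiB floor dominates
print(container_memory_mib(2048))
```

If the estimate exceeds what a node or queue allows per container, raising `spark.executor.memory` alone will not help and may make the kills more frequent.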

2. Data Skew

# Symptoms:
# - Some tasks take much longer than others
# - One executor is busier than others
# - Job seems stuck on certain stages

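# Debug steps:
# 1. Check Spark UI -> Stages -> Task Duration Distribution
# 2. Look for tasks with much higher duration
# 3. Check key distribution

# Solutions:
# Salt skewed keys. For a join, the other side must be expanded with
# every salt value (0..99) so that each salted key still finds its match.
from pyspark.sql import functions as F

salted_df = df.withColumn(
    "salt",
    (F.rand() * 100).cast("int")
).withColumn(
    "salted_key",
    F.concat(F.col("skewed_key"), F.lit("_"), F.col("salt").cast("string"))
)

# Enable AQE skew handling (skewJoin only takes effect when AQE itself is on)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")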
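
To see why salting helps, it can be simulated without Spark: hash every key to a partition and compare the largest partition before and after salting the hot key. This is a toy model under stated assumptions: CRC32 stands in for Spark's own hash partitioner, the salt is assigned round-robin instead of randomly so the run is repeatable, and the key counts are invented.

```python
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    """Map a key to a partition with a deterministic hash (stand-in for Spark's partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def max_partition_load(keys, num_partitions):
    """Return the row count of the most loaded partition."""
    loads = Counter(partition_of(k, num_partitions) for k in keys)
    return max(loads.values())


def salt_keys(keys, hot_key, num_salts):
    """Append a salt suffix to the hot key only, cycling through num_salts values."""
    return [f"{k}_{i % num_salts}" if k == hot_key else k for i, k in enumerate(keys)]


# Invented workload: one hot key with 9000 rows plus 1000 distinct keys
keys = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]
before = max_partition_load(keys, 8)
after = max_partition_load(salt_keys(keys, "hot", 16), 8)
print(f"largest partition before salting: {before}")
print(f"largest partition after salting:  {after}")
```

Before salting, all rows for the hot key land in one partition no matter how many partitions exist; after salting, they are split across the partitions that the salted variants hash to.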

3. Slow Shuffle

# Symptoms:
# - High shuffle read/write in Spark UI
# - Long duration for exchange stages

# Debug steps:
# 1. Check Spark UI -> Stages -> Shuffle Read/Write
# 2. Look for large shuffle data
# 3. Check partition count

# Solutions:
# Increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "4000")

# Use broadcast joins for small tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

# Cache a DataFrame that several joins or actions reuse, so it is not
# recomputed each time (caching does not remove the shuffle itself)
large_df.cache()
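
A partition count such as 4000 can be derived from the shuffle size seen in the Stages tab instead of guessed. The sketch below divides the total shuffle bytes by a target partition size; the 128 MiB target is a common rule of thumb and an assumption here, and the floor of 200 mirrors the default value of `spark.sql.shuffle.partitions`.

```python
import math


def suggest_shuffle_partitions(shuffle_bytes, target_partition_bytes=128 * 1024**2,
                               min_partitions=200):
    """Suggest a shuffle partition count so each partition is near the target size."""
    needed = math.ceil(shuffle_bytes / target_partition_bytes)
    return max(needed, min_partitions)


# A 500 GiB shuffle at about 128 MiB per partition
print(suggest_shuffle_partitions(500 * 1024**3))
# Small shuffles fall back to the floor
print(suggest_shuffle_partitions(1 * 1024**3))
```

With AQE enabled, Spark can also coalesce small shuffle partitions at runtime, so a generous count is less costly than it used to be.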

Alerting and Production Monitoring

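# Set up monitoring for production jobs
import time


def send_alert(message):
    # Placeholder: replace with your paging or chat integration
    print(f"ALERT: {message}")


app_name = spark.sparkContext.appName

# 1. Job duration alerting
start = time.time()
# Run job
job_duration = time.time() - start

if job_duration > 3600:  # 1 hour threshold
    send_alert(f"Job {app_name} took {job_duration:.0f}s")

# 2. Data quality alerting
# source_df and output_df are the job's input and output DataFrames.
# The 90% threshold only suits jobs that are expected to keep most rows.
source_count = source_df.count()
output_count = output_df.count()

if output_count < source_count * 0.9:
    send_alert(f"Data loss: {source_count} -> {output_count}")

# 3. Resource utilization alerting
# Monitor through Spark UI or external tools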
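
A fixed one-hour threshold misses a job that normally takes ten minutes and now takes fifty. To alert on performance degradation, the current run can be compared with a baseline from recent runs. The sketch below uses the median of past durations; the 1.5x factor, the minimum of five runs, and the sample history are assumptions for illustration.

```python
from statistics import median


def is_degraded(history_s, current_s, factor=1.5, min_runs=5):
    """Return True when the current duration exceeds factor x the median of past runs."""
    if len(history_s) < min_runs:
        return False  # not enough history to form a baseline
    return current_s > median(history_s) * factor


# Invented history of roughly 30-minute runs, in seconds
history = [1800, 1750, 1900, 1820, 1780]
print(is_degraded(history, 10800))  # a 3-hour run
print(is_degraded(history, 2000))   # a slightly slow run
```

The history would typically come from the metrics store that the job already writes its durations to.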

Best Practices

Monitoring Checklist

  • Enable event logging for all production jobs
  • Monitor Spark UI during development
  • Set up alerting for job duration and failures
  • Track custom metrics for business KPIs
  • Use structured logging for machine parsing
  • Monitor executor memory and GC time
  • Check for data skew in key distributions
  • Review shuffle metrics for optimization opportunities

Summary

Monitoring and debugging are critical skills for production Spark workloads. The Spark UI provides deep visibility into job execution while an application is running, and event logs enable historical analysis through the History Server. At the scale described in the Amazon and Uber questions, proactive monitoring helps catch failures early and surfaces optimization opportunities before they impact users.
