Spark UI & Monitoring

Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb

Navigating the Spark UI

The Spark UI provides detailed execution information across the Jobs, Stages, Storage, Environment, and Executors tabs. Per-task metrics appear on each stage's detail page.

Key UI Sections

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
    .appName("MonitoringExample") \
    .config("spark.ui.enabled", "true") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs://logs/spark-events") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()

# Access Spark UI at http://driver-host:4040

# Jobs tab: Shows all jobs and their status
# - Job ID, Description, Stages, Tasks, Duration
# - Click to see stage details

# Stages tab: Shows stage-level metrics
# - Input/Output sizes
# - Shuffle Read/Write
# - Task duration distribution
# - GC time

# Stage detail page (click a stage): Shows per-task metrics
# - Task duration (identify stragglers)
# - Shuffle Read/Write per task
# - GC time per task

# Executors tab: Shows per-executor metrics
# - Storage memory, task counts, GC time, shuffle read/write

# Storage tab: Shows cached DataFrames
# - Cached RDDs and DataFrames
# - Memory usage per partition
# - Storage level

# Environment tab: Shows configuration
# - All Spark configs
# - System properties
# - Classpath entries

Interview Insight: The Spark UI is your first debugging tool. Learn to quickly identify slow tasks, high shuffle, and memory issues from the UI.
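
To make "identify slow tasks" concrete, here is a minimal sketch of the comparison you do by eye on a stage's task list: flag any task whose duration is far above the median. The function name `find_stragglers`, the 2x threshold, and the sample durations are illustrative assumptions, not values taken from Spark.

```python
from statistics import median

def find_stragglers(task_durations_ms, threshold=2.0):
    """Return (task_index, duration) pairs slower than threshold x the median."""
    if not task_durations_ms:
        return []
    typical = median(task_durations_ms)
    if typical <= 0:
        return []
    return [
        (index, duration)
        for index, duration in enumerate(task_durations_ms)
        if duration > threshold * typical
    ]

# Made-up task durations for one stage, in milliseconds
durations = [410, 395, 430, 402, 2950, 415]
print(find_stragglers(durations))  # [(4, 2950)]
```

A single task several times slower than the median usually points to skewed input for that task rather than a slow cluster.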

Programmatic Monitoring

# Access Spark metrics programmatically
def get_spark_metrics(spark_context):
    """Extract key metrics from Spark"""
    # Get executor information: a JVM map of "host:port" -> (max memory, remaining memory)
    executor_info = spark_context._jsc.sc().getExecutorMemoryStatus()
    
    # Get running jobs and stages from the public status tracker
    tracker = spark_context.statusTracker()
    
    return {
        "executors": executor_info,
        "active_jobs": tracker.getActiveJobsIds(),
        "active_stages": tracker.getActiveStageIds()
    }

# Monitor job execution
df = spark.read.parquet("hdfs://data/large")

# Track wall-clock time around the action that triggers the job
import time

start_time = time.time()
result = df.groupBy("key").agg(F.sum("value"))
result.count()
print(f"Elapsed: {time.time() - start_time:.1f}s")

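# Access job metrics via SparkListener
# Note: SparkListener is a JVM-side API. A plain Python class like this one
# cannot be passed to addSparkListener as-is; it illustrates which fields to read.
class MetricsListener:
    def __init__(self):
        self.job_end_times = []
        self.stage_durations = []
    
    def onJobEnd(self, jobEnd):
        # SparkListenerJobEnd carries jobId, time (end timestamp in ms), and jobResult
        self.job_end_times.append({
            "job_id": jobEnd.jobId(),
            "end_time_ms": jobEnd.time()
        })
    
    def onStageCompleted(self, stageCompleted):
        info = stageCompleted.stageInfo()
        # submissionTime and completionTime are optional timestamps in ms
        self.stage_durations.append({
            "stage_id": info.stageId(),
            "duration_ms": info.completionTime().get() - info.submissionTime().get()
        })

# Register listener
# In practice, implement the listener in Scala or Java and register it with
# the spark.extraListeners config, or bridge Python through a Py4J callback server.
listener = MetricsListener()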
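
From Python, a simpler route to the same numbers is Spark's monitoring REST API, which serves the data behind the UI as JSON under `/api/v1` on the UI port. The sketch below assumes the stage list endpoint and its `stageId`, `status`, `shuffleReadBytes`, and `shuffleWriteBytes` fields. The helper names and the sample records are hypothetical.

```python
import json
from urllib.request import urlopen

def fetch_stages(base_url, app_id):
    """Fetch stage summaries from the REST API (requires a running application)."""
    with urlopen(f"{base_url}/api/v1/applications/{app_id}/stages") as response:
        return json.load(response)

def heaviest_shuffle_stages(stages, top_n=3):
    """Rank completed stages by total shuffle bytes (read + write), largest first."""
    def shuffle_bytes(stage):
        return stage.get("shuffleReadBytes", 0) + stage.get("shuffleWriteBytes", 0)

    completed = [s for s in stages if s.get("status") == "COMPLETE"]
    ranked = sorted(completed, key=shuffle_bytes, reverse=True)
    return [(s["stageId"], shuffle_bytes(s)) for s in ranked[:top_n]]

# Made-up records shaped like the endpoint's response
sample_stages = [
    {"stageId": 0, "status": "COMPLETE", "shuffleReadBytes": 0, "shuffleWriteBytes": 5000},
    {"stageId": 1, "status": "COMPLETE", "shuffleReadBytes": 5000, "shuffleWriteBytes": 200},
    {"stageId": 2, "status": "ACTIVE", "shuffleReadBytes": 99999, "shuffleWriteBytes": 0},
]
print(heaviest_shuffle_stages(sample_stages, top_n=2))  # [(1, 5200), (0, 5000)]
```

Against a live application you would call something like `fetch_stages("http://driver-host:4040", app_id)` and pass the result to `heaviest_shuffle_stages`. Polling this way needs no JVM-side code, at the cost of seeing events only after they happen.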

Event Logging and History Server

# Enable event logging
spark = SparkSession.builder \
    .appName("EventLogging") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs://logs/spark-events") \
    .config("spark.eventLog.logStageExecutorMetrics", "true") \
    .getOrCreate()

# Events are logged to HDFS/S3
# Access via Spark History Server (port 18080)

# Query event logs programmatically
def analyze_event_log(log_path):
    """Parse Spark event log for analysis"""
    # Read JSON event log
    events_df = spark.read.json(log_path)
    
    # Analyze job durations
    job_events = events_df.filter(F.col("Event") == "SparkListenerJobEnd")
    job_events.select(
        "Job ID",
        "Job Result",
        "Completion Time"
    ).show()
    
    # Analyze task metrics
    task_events = events_df.filter(F.col("Event") == "SparkListenerTaskEnd")
    task_metrics = task_events.select(
        "Task Info.Task ID",
        "Task Metrics.Executor Run Time",
        "Task Metrics.Shuffle Read Metrics.Remote Bytes Read",
        "Task Metrics.Shuffle Read Metrics.Local Bytes Read",
        "Task Metrics.Shuffle Write Metrics.Shuffle Bytes Written"
    )
    
    return task_metrics
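
For small logs or quick checks, the same analysis works without a Spark session, because an uncompressed event log is a text file with one JSON object per line. The following is a minimal sketch under that assumption. The function name and the sample lines are made up, and only the `Event`, `Task Info`, and `Task Metrics` keys are relied on.

```python
import json

def summarize_task_events(lines):
    """Aggregate SparkListenerTaskEnd events from event log lines."""
    summary = {
        "tasks": 0,
        "total_run_time_ms": 0,
        "slowest_task_id": None,
        "slowest_run_time_ms": 0,
    }
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        # Failed tasks may lack metrics, so fall back to an empty dict
        run_time = (event.get("Task Metrics") or {}).get("Executor Run Time", 0)
        summary["tasks"] += 1
        summary["total_run_time_ms"] += run_time
        if run_time > summary["slowest_run_time_ms"]:
            summary["slowest_run_time_ms"] = run_time
            summary["slowest_task_id"] = event["Task Info"]["Task ID"]
    return summary

# Made-up sample lines in the event log's JSON-lines layout
sample_log = [
    '{"Event": "SparkListenerJobStart", "Job ID": 0}',
    '{"Event": "SparkListenerTaskEnd", "Task Info": {"Task ID": 0}, "Task Metrics": {"Executor Run Time": 120}}',
    '{"Event": "SparkListenerTaskEnd", "Task Info": {"Task ID": 1}, "Task Metrics": {"Executor Run Time": 950}}',
    '{"Event": "SparkListenerJobEnd", "Job ID": 0}',
]
print(summarize_task_events(sample_log))
```

With a real log you could pass an open file handle instead of the list, since the function only iterates over lines.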

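Warning: Event logs can grow large. Enable rolling event logs (spark.eventLog.rolling.enabled, spark.eventLog.rolling.maxFileSize) and configure History Server cleanup (spark.history.fs.cleaner.enabled, spark.history.fs.cleaner.maxAge) to manage storage.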
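
One way to bound event log growth is to combine rolling event logs on the application side with the cleaner on the History Server side. The sketch below keeps those settings in plain dictionaries and renders them as `spark-defaults.conf` lines. The helper name `to_properties` is hypothetical, and the sizes and ages are illustrative values to tune for your cluster.

```python
# Application-side settings: split the event log into bounded files
EVENT_LOG_SETTINGS = {
    "spark.eventLog.enabled": "true",
    "spark.eventLog.rolling.enabled": "true",
    "spark.eventLog.rolling.maxFileSize": "128m",
}

# History Server settings: delete logs older than the retention window
HISTORY_SERVER_SETTINGS = {
    "spark.history.fs.cleaner.enabled": "true",
    "spark.history.fs.cleaner.maxAge": "7d",
}

def to_properties(settings):
    """Render settings as 'key value' lines in spark-defaults.conf style."""
    return [f"{key} {value}" for key, value in sorted(settings.items())]

for line in to_properties(EVENT_LOG_SETTINGS) + to_properties(HISTORY_SERVER_SETTINGS):
    print(line)
```

The application settings can also be passed through `SparkSession.builder.config(...)`. The cleaner settings only take effect in the History Server's own configuration.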

Custom Metrics with Dropwizard

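# Spark's metrics system is built on Dropwizard Metrics
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
    .appName("DropwizardMetrics") \
    .config("spark.metrics.conf", "metrics.properties") \
    .config("spark.metrics.conf.*.sink.jmx.class", "org.apache.spark.metrics.sink.JmxSink") \
    .getOrCreate()

# Custom metrics in your code
# The Dropwizard classes live in the JVM, not in Python, so reach them
# through the Py4J gateway instead of a Python import
jvm = spark.sparkContext._jvm

# Create a metrics registry on the driver
# Note: Spark's sinks only report this registry if it is wired in
# through a custom JVM-side metrics Source
registry = jvm.com.codahale.metrics.MetricRegistry()

# Create custom counters and timers once, then reuse them per batch
record_count = registry.counter("records.processed")
processing_time = registry.timer("processing.time")

def process_batch(batch_df, batch_id):
    """Process streaming batch with metrics"""
    record_count.inc(batch_df.count())
    
    # Timer.time() returns a JVM Timer.Context, which is not a Python context manager
    timer_context = processing_time.time()
    try:
        result = batch_df.filter(F.col("status") == "active")
        result.write.mode("append").parquet("hdfs://output/processed")
    finally:
        timer_context.stop()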

Performance Profiling

# Profile Spark applications
import cProfile
import pstats
from io import StringIO

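def profile_spark_job():
    """Profile the driver-side Python code of a Spark job.

    cProfile only sees the driver's Python process. Work done on executors
    shows up as time spent waiting on Py4J calls.
    """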
    profiler = cProfile.Profile()
    profiler.enable()
    
    # Your Spark job
    df = spark.read.parquet("hdfs://data/events")
    result = df \
        .filter(F.col("amount") > 100) \
        .groupBy("category") \
        .agg(F.sum("amount"))
    result.count()
    
    profiler.disable()
    
    # Print profiling results
    stream = StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats('cumulative')
    stats.print_stats(20)
    print(stream.getvalue())

# Memory profiling
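def profile_memory():
    """Monitor memory usage of the driver's Python process during execution.

    The RSS reported here excludes the driver JVM and all executors;
    use the Executors and Storage tabs for those.
    """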
    import psutil
    import os
    
    process = psutil.Process(os.getpid())
    
    def get_memory():
        return process.memory_info().rss / 1024 / 1024
    
    print(f"Before: {get_memory():.1f} MB")
    
    df = spark.read.parquet("hdfs://data/large")
    df.cache()
    df.count()
    
    print(f"After cache: {get_memory():.1f} MB")
    
    result = df.groupBy("key").agg(F.sum("value"))
    result.count()
    
    print(f"After aggregation: {get_memory():.1f} MB")

Common Performance Issues and Solutions

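# Issue 1: Straggler tasks
# Symptom: Some tasks take much longer than others
# Diagnosis: Compare max vs. median task duration in the stage's summary metrics
# Solution: Fix the underlying skew (see Issue 4) or enable speculation (spark.speculation)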

# Issue 2: High shuffle
# Symptom: Large shuffle read/write
# Solution: Optimize partitioning, use broadcast joins

# Issue 3: OOM errors
# Symptom: Executor OOM, GC overhead
# Solution: Increase memory, reduce data per partition

# Issue 4: Data skew
# Symptom: Uneven partition sizes
# Solution: Repartition, use salting techniques

# Issue 5: Small files
# Symptom: Many small partitions
# Solution: Coalesce, optimize file sizes

# Example: Diagnosing a slow job
def diagnose_slow_job(spark, df):
    """Common diagnostic checks"""
    
    # Check partition count
    print(f"Partitions: {df.rdd.getNumPartitions()}")
    
    # Count rows per partition once and reuse the result
    stats = df.withColumn("partition_id", F.spark_partition_id()) \
      .groupBy("partition_id") \
      .agg(F.count("*").alias("count"))
    
    # Check data distribution
    stats.describe().show()
    
    # Check for skew
    max_count, avg_count = stats.agg(F.max("count"), F.avg("count")).collect()[0]
    print(f"Max/Avg ratio: {max_count/avg_count:.2f}")
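
The salting technique listed under Issue 4 can be sketched without Spark. The idea is to split each hot key into several sub-keys so that no single group dominates, then combine the partial results in a second aggregation. In the example below the helper names, the hot-key set, and the salt count of 8 are illustrative assumptions. It uses the row index as the salt to keep the output deterministic, whereas a Spark job would more commonly derive the salt from a random column.

```python
from collections import Counter

def salted_key(key, row_index, hot_keys, num_salts):
    """Append a salt suffix to hot keys only; other keys pass through unchanged."""
    if key in hot_keys:
        return f"{key}_{row_index % num_salts}"
    return key

def skew_ratio(keys):
    """Largest group size divided by average group size (1.0 means perfectly even)."""
    counts = Counter(keys)
    return max(counts.values()) / (sum(counts.values()) / len(counts))

# Made-up data: one key holds 90% of the rows
keys = ["hot"] * 900 + ["a"] * 50 + ["b"] * 50
salted = [salted_key(key, index, {"hot"}, 8) for index, key in enumerate(keys)]

print(f"before: {skew_ratio(keys):.2f}")   # before: 2.70
print(f"after: {skew_ratio(salted):.2f}")  # after: 1.13
```

The ratio here is the same max/avg measure that `diagnose_slow_job` prints, applied to key groups instead of partitions.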

Key Takeaway: Master the Spark UI for quick diagnostics. Use programmatic monitoring for automated alerts. Event logs provide historical analysis. Custom metrics give application-specific insights.

Follow-Up Questions

  • How would you set up monitoring for a production Spark cluster?
  • Explain how to use the Spark History Server for post-mortem analysis.
  • How do you monitor Spark on Kubernetes vs YARN?
  • Describe strategies for alerting on Spark job failures.
  • How would you track data lineage across Spark applications?
