20. Monitoring & Metrics in PySpark
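Definition: Spark UI
The Spark UI is a web interface served by the driver while an application runs (port 4040 by default; if that port is taken, Spark tries 4041, 4042, and so on). It provides real-time visibility into job progress, stage details, task metrics, storage usage, and SQL query plans. Once the application terminates the live UI goes away, and the same views are available only through the History Server.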
Definition: Metrics System
The metrics system collects and exports Spark internal metrics (CPU, memory, shuffle I/O, GC time) to external monitoring systems like Graphite, Prometheus, or JMX via pluggable sinks.
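Shuffle Read/Write Formula
$$S_{\text{shuffle}} = N_{p} \times \bar{s}_{p} \times r_{c}$$
Here,
- $S_{\text{shuffle}}$ = Total shuffle data transferred
- $N_{p}$ = Number of shuffle partitions
- $\bar{s}_{p}$ = Average size per partition
- $r_{c}$ = Compression ratio (typically 0.3–0.7)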
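As a quick worked example, the sketch below reads the four quantities above as total = partitions × average partition size × compression ratio. That reading, and the helper name `estimate_shuffle_bytes`, are assumptions made for illustration; the authoritative numbers are the Shuffle Read/Write columns in the Spark UI.

```python
def estimate_shuffle_bytes(num_partitions, avg_partition_bytes, compression_ratio):
    """Rough shuffle volume estimate: partitions x average size x compression ratio."""
    if num_partitions < 0 or avg_partition_bytes < 0:
        raise ValueError("partition count and size must be non-negative")
    if not 0 < compression_ratio <= 1:
        raise ValueError("compression_ratio must be in (0, 1]")
    return num_partitions * avg_partition_bytes * compression_ratio


# 200 partitions of 64 MiB each, compressed to half their size
estimate = estimate_shuffle_bytes(200, 64 * 1024 * 1024, 0.5)
print(f"{estimate / 1024**3:.2f} GiB")
```

An estimate like this is only useful for sizing: if it lands far above what the cluster's network and local disks handle comfortably, that is a hint to revisit the partition count or the join strategy before running the job.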
Key Spark UI metrics to monitor: Task Duration (identify stragglers), Shuffle Read/Write (bottlenecks), GC Time (memory pressure), and Peak Execution Memory (data skew).
Enable event logging for post-mortem analysis: spark.eventLog.enabled=true. Use Spark History Server (port 18080) to replay completed applications and identify performance regressions.
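Event logs are written as one JSON object per line, so they can also be inspected offline without the History Server. The sketch below groups task durations by stage. The field names (`Event`, `Stage ID`, `Task Info`, `Launch Time`, `Finish Time`) follow the event-log JSON layout as commonly seen in Spark 2.x/3.x and should be checked against your version; the sample lines are hypothetical and heavily trimmed.

```python
import json
from collections import defaultdict


def task_durations_by_stage(lines):
    """Map stage id -> list of task durations (ms) found in event-log lines."""
    durations = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        info = event.get("Task Info", {})
        launch, finish = info.get("Launch Time"), info.get("Finish Time")
        if launch is None or finish is None:
            continue
        durations[event.get("Stage ID")].append(finish - launch)
    return dict(durations)


# Hypothetical sample lines (not taken from a real log)
sample_log = [
    '{"Event": "SparkListenerApplicationStart", "App Name": "demo"}',
    '{"Event": "SparkListenerTaskEnd", "Stage ID": 0, '
    '"Task Info": {"Launch Time": 1000, "Finish Time": 1400}}',
    '{"Event": "SparkListenerTaskEnd", "Stage ID": 0, '
    '"Task Info": {"Launch Time": 1000, "Finish Time": 4000}}',
    '{"Event": "SparkListenerTaskEnd", "Stage ID": 1, '
    '"Task Info": {"Launch Time": 4100, "Finish Time": 4300}}',
]
summary = task_durations_by_stage(sample_log)
for stage_id, values in sorted(summary.items()):
    print(f"stage {stage_id}: {len(values)} tasks, slowest {max(values)} ms")
```

With a real (uncompressed) log, the same function can be fed an open file handle from the directory configured in spark.eventLog.dir, since it only iterates over lines.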
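Straggler Detection
Theorem: With speculation enabled (spark.speculation=true; it is off by default), a running task is treated as a straggler once its duration exceeds median_task_duration × spark.speculation.multiplier (1.5 by default in Spark 3.x), where the median is taken over the stage's successfully completed tasks and the check starts only after the fraction of finished tasks set by spark.speculation.quantile. Speculative execution then launches a backup copy, and whichever attempt finishes first is used while the other is killed. The task's latency is therefore approximately min(straggler_duration, detection_time + backup_duration), which limits how far a single slow task can stretch the stage.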
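The threshold rule is easy to apply offline to task durations copied from the Stages tab or an event log. This is a simplified sketch, not Spark's scheduler logic: it takes the median over all supplied durations and ignores the quantile and minimum-runtime conditions. The function name `find_stragglers` is made up for this example.

```python
from statistics import median


def find_stragglers(durations_ms, multiplier=1.5):
    """Return (threshold, indices of tasks whose duration exceeds it)."""
    if not durations_ms:
        return 0.0, []
    threshold = median(durations_ms) * multiplier
    slow = [i for i, d in enumerate(durations_ms) if d > threshold]
    return threshold, slow


durations = [100, 110, 95, 105, 400]  # made-up task durations in ms
threshold, stragglers = find_stragglers(durations)
print(f"threshold={threshold} ms, straggler indices={stragglers}")
```

Here the median is 105 ms, so the threshold is 157.5 ms and only the 400 ms task is flagged.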
- Spark UI: real-time visibility into jobs, stages, tasks, storage, SQL
- Metrics system: export to Prometheus, Graphite, JMX via pluggable sinks
- Monitor: task duration, shuffle I/O, GC time, peak execution memory
- Enable event logging + History Server for post-mortem analysis
- Speculative execution handles stragglers by launching backup copies
Monitoring Architecture
The PySpark monitoring architecture has two parts: the Spark UI layers and the metrics system.

Spark UI layers:
- Layer 1, application level: Spark UI (port 4040)
  - Jobs tab: job progress and status
  - Stages tab: stage details and tasks
  - Storage tab: cached data and storage usage
  - Environment tab: configuration and properties
  - Executors tab: executor status and metrics
  - SQL tab: query execution plans
- Layer 2, history level: History Server (port 18080)
  - Completed application history
  - Job and stage statistics
  - Executor metrics over time
  - Configuration history
- Layer 3, cluster level: cluster manager UI
  - YARN ResourceManager UI (port 8088)
  - Kubernetes Dashboard
  - Standalone Master UI (port 8080)

Spark metrics system:
- Sources (a selection; the exact set depends on the Spark version and on which components are running):
  - ApplicationSource (standalone master)
  - JvmSource
  - CodegenMetrics
  - DAGSchedulerSource
  - ExecutorAllocationManagerSource
  - BlockManagerSource
  - ExecutorSource
- Sinks:
  - ConsoleSink
  - CsvSink
  - JmxSink
  - GraphiteSink
  - StatsdSink
  - Slf4jSink
  - MetricsServlet
  - PrometheusServlet
Metrics Collection Flow
Metrics flow through four stages: metric sources → metric registry → metric sinks → monitoring dashboard.

Metric sources:
- Application metrics: job count, stage count, task count, shuffle size
- JVM metrics: heap usage, GC stats, thread count, buffer pool
- System metrics: CPU usage, memory usage, disk I/O, network I/O

Metric registry (MetricRegistry):
- Registers metrics from sources
- Manages the metric lifecycle
- Handles metric updates
- Provides the metric access API
- Metric types:
  - Counter: incrementing values
  - Gauge: current values
  - Histogram: value distributions
  - Meter: event rates
  - Timer: duration measurements

Metric sinks:
- Console sink: debug output, development, testing
- JMX sink: JConsole, monitoring tools
- External sinks: Graphite, Prometheus, StatsD

Monitoring dashboard:
- Real-time dashboard: live metrics, current status
- Historical trends: long-term trends, pattern analysis
- Alerting system: threshold alerts, anomaly detection
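To make the source → registry → sink flow concrete, here is a toy model in plain Python. It is only an illustration of the pattern: Spark's real metrics system is built on the Dropwizard Metrics library inside the JVM, and the class names below (`Counter`, `Gauge`, `ToyRegistry`) are invented for this sketch.

```python
class Counter:
    """Monotonically increasing value."""

    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n

    def value(self):
        return self.count


class Gauge:
    """Reads the current value from a callable each time it is sampled."""

    def __init__(self, read_fn):
        self._read_fn = read_fn

    def value(self):
        return self._read_fn()


class ToyRegistry:
    """Holds named metrics and pushes snapshots to every registered sink."""

    def __init__(self):
        self._metrics = {}
        self._sinks = []

    def register(self, name, metric):
        self._metrics[name] = metric
        return metric

    def add_sink(self, sink):
        self._sinks.append(sink)

    def report(self):
        snapshot = {name: m.value() for name, m in self._metrics.items()}
        for sink in self._sinks:
            sink(snapshot)
        return snapshot


registry = ToyRegistry()
tasks_done = registry.register("app.tasks_completed", Counter())
pending = []
registry.register("app.pending_jobs", Gauge(lambda: len(pending)))

collected = []
registry.add_sink(collected.append)  # stands in for an external sink
registry.add_sink(print)             # stands in for a console sink

tasks_done.inc(3)
pending.append("job-1")
registry.report()
```

The point of the separation is that sources never know where their values end up: adding a new destination means registering another sink, not touching the code that updates the counter.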
Performance Monitoring Dashboard
Example dashboard layout:

Job execution metrics
- Job progress: Job 1 at 75% (running), Job 2 at 100% (done), Job 3 at 0% (pending)
- Stage progress: Stage 1 map at 80%, Stage 1 reduce at 25%, Stage 2 map at 0%

Executor metrics
| Executor | Memory | Cores |
|---|---|---|
| Executor 1 | 75% | 4/4 |
| Executor 2 | 68% | 3/4 |
| Executor 3 | 82% | 4/4 |
| Executor 4 | 45% | 2/4 |

Aggregate metrics
- Total tasks: 1,250; completed: 1,000 (80%)
- Shuffle read: 2.5 GB; shuffle write: 1.8 GB
- GC time: 15s (2.5% of total time)

Storage metrics
| Cached table | Size | Partitions |
|---|---|---|
| users | 2.5 GB | 20 |
| orders | 1.8 GB | 15 |
| products | 0.5 GB | 5 |

Storage usage
- Total cached: 4.8 GB; max storage: 10 GB
- Memory fraction: 0.6; storage fraction: 0.4
Detailed Explanation
Monitoring and metrics are essential for maintaining the health, performance, and reliability of PySpark applications. A comprehensive monitoring strategy provides visibility into application behavior, enables proactive issue detection, and supports capacity planning and optimization.
The Spark UI is the primary interface for monitoring application execution. It provides real-time information about jobs, stages, tasks, executors, and storage. The UI is available during application execution (port 4040) and can be accessed after completion through the History Server (port 18080). Understanding the UI's tabs and metrics is essential for debugging performance issues and optimizing applications.
The Spark Metrics System provides a flexible framework for collecting and reporting application metrics. It supports multiple sources (JVM, application, system metrics) and sinks (console, JMX, external systems). Metrics are configured either through a metrics.properties file (located via spark.metrics.conf) or through Spark configuration properties that carry the spark.metrics.conf. prefix, allowing organizations to integrate Spark metrics with their existing monitoring infrastructure.
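Entries in metrics.properties follow the pattern `<instance>.sink.<name>.<option>=<value>` and `<instance>.source.<name>.class=<class>`, where `*` applies to all instances. The helper below renders such a file from Python dictionaries. The helper name `render_metrics_properties` and the Graphite host are hypothetical; the sink and source class names are the ones shipped with Spark.

```python
def render_metrics_properties(sinks, sources=None, instance="*"):
    """Render metrics.properties text for the given sinks and sources.

    sinks:   {sink_name: {"class": "fully.qualified.Class", option: value, ...}}
    sources: {source_name: "fully.qualified.Class"}
    """
    lines = []
    for name, options in sinks.items():
        for key, value in options.items():
            lines.append(f"{instance}.sink.{name}.{key}={value}")
    for name, cls in (sources or {}).items():
        lines.append(f"{instance}.source.{name}.class={cls}")
    return "\n".join(lines) + "\n"


config_text = render_metrics_properties(
    sinks={
        "graphite": {
            "class": "org.apache.spark.metrics.sink.GraphiteSink",
            "host": "graphite.example.internal",  # hypothetical host
            "port": 2003,
            "period": 10,
            "unit": "seconds",
        },
    },
    sources={"jvm": "org.apache.spark.metrics.source.JvmSource"},
)
print(config_text)
```

The resulting text can be written to a file and referenced through spark.metrics.conf. Generating it from code is mainly useful when the same sink settings have to be stamped out for several environments.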
Key performance metrics include job and stage completion times, task execution statistics, shuffle read/write volumes, memory usage, and garbage collection overhead. These metrics help identify bottlenecks, detect resource constraints, and optimize application configurations.
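Several of these metrics are exposed as JSON by the REST API that the Spark UI and History Server serve under /api/v1. The sketch below computes GC overhead per executor from the executors endpoint. The fetch helper needs a live UI and is not called here; the sample payload is made up, and the field names (`totalDuration`, `totalGCTime`) should be verified against the REST API documentation for your Spark version.

```python
import json
from urllib.request import urlopen


def fetch_executors(base_url, app_id):
    """Fetch executor summaries from the Spark REST API (requires a live UI)."""
    with urlopen(f"{base_url}/api/v1/applications/{app_id}/executors") as resp:
        return json.load(resp)


def gc_overhead_by_executor(executors):
    """Return {executor id: GC time / total task time} for each executor."""
    overhead = {}
    for ex in executors:
        total = ex.get("totalDuration", 0)
        gc = ex.get("totalGCTime", 0)
        overhead[ex["id"]] = gc / total if total > 0 else 0.0
    return overhead


# Hypothetical payload shaped like the REST response (values are made up)
sample = [
    {"id": "driver", "totalDuration": 0, "totalGCTime": 0},
    {"id": "1", "totalDuration": 600000, "totalGCTime": 15000},
    {"id": "2", "totalDuration": 400000, "totalGCTime": 60000},
]
overhead = gc_overhead_by_executor(sample)
for executor_id, fraction in overhead.items():
    print(f"executor {executor_id}: GC overhead {fraction:.1%}")
```

In this sample, executor 2 spends 15% of its task time in garbage collection, which would point to memory pressure on that executor rather than a cluster-wide problem.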
The History Server provides persistent storage and visualization of completed application metrics. It enables historical analysis, trend identification, and comparison of different application runs. The History Server is essential for production environments where application performance needs to be tracked over time.
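Comparing runs can be as simple as checking the latest duration against a baseline built from earlier runs of the same application. The sketch below uses the median of past durations and a 20% tolerance; both choices, the function name `is_regression`, and the sample numbers are assumptions for illustration. In practice the durations could come from the History Server's application list.

```python
from statistics import median


def is_regression(previous_durations_ms, latest_duration_ms, tolerance=1.2):
    """True when the latest run is slower than tolerance x the median of past runs."""
    if not previous_durations_ms:
        return False  # nothing to compare against yet
    baseline = median(previous_durations_ms)
    return latest_duration_ms > baseline * tolerance


history = [610_000, 595_000, 602_000, 640_000]  # made-up durations in ms
print(is_regression(history, 615_000))  # within tolerance of the baseline
print(is_regression(history, 900_000))  # well above the baseline
```

The median is used instead of the mean so that one unusually slow historical run does not inflate the baseline.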
External monitoring systems like Prometheus, Graphite, and StatsD can integrate with Spark's metrics system to provide enterprise-grade monitoring capabilities. These systems offer advanced features like alerting, dashboards, and long-term data retention, making them suitable for production deployments.
Best practices for monitoring include: enabling event logging for all applications, configuring appropriate metrics sinks, setting up alerting for critical metrics, regularly reviewing performance trends, and maintaining monitoring documentation.
Advanced monitoring techniques include custom metrics sources for application-specific metrics, distributed tracing for complex workflows, and machine learning-based anomaly detection for predictive monitoring. These techniques help organizations move from reactive to proactive monitoring.
Key Concepts Table
| Metric Category | Key Metrics | Threshold | Action |
|---|---|---|---|
| Job Performance | Job duration, stage count | > 30min | Optimize code, increase resources |
| Task Execution | Task duration, failure rate | > 10min, > 5% | Check data skew, increase parallelism |
| Memory Usage | Heap usage, GC time | > 80%, > 5% | Increase memory, tune GC |
| Shuffle I/O | Shuffle read/write | > 10GB | Optimize partitions, use broadcast |
| Storage | Cache hit ratio, storage usage | < 90%, > 80% | Tune caching, increase storage |
| Cluster Resources | Executor utilization, CPU usage | < 70%, > 90% | Adjust allocation, optimize workload |
Code Examples
Basic Monitoring Configuration
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Configure Spark with monitoring.
# spark.history.* settings belong to the History Server process (for example in
# its spark-defaults.conf), so they are not set on the application session.
spark = (
    SparkSession.builder
    .appName("MonitoredApplication")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///spark-history")  # directory must already exist
    .config("spark.ui.enabled", "true")
    .config("spark.ui.port", "4040")
    .config("spark.metrics.conf", "/path/to/metrics.properties")
    .getOrCreate()
)

# Access Spark UI (uiWebUrl reports the address that was actually bound)
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")
# Assumes a History Server on this host reading the same event log directory
print("History Server available at: http://localhost:18080")

# Run application with monitoring
df = spark.range(1000000).repartition(100)
result = df.groupBy((col("id") % 10).alias("bucket")).count().collect()
print(f"Application completed with {len(result)} groups")
spark.stop()
Custom Metrics Collection
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
import time

spark = SparkSession.builder.appName("CustomMetrics").getOrCreate()
sc = spark.sparkContext

# PySpark has no public API for JVM metrics. This example reads the driver JVM's
# standard java.lang.management MXBeans through the py4j gateway (sc._jvm), which
# is a private attribute and is not available under Spark Connect.
management = sc._jvm.java.lang.management.ManagementFactory


def collect_custom_metrics():
    """Collect driver-side JVM metrics (memory pools, heap, GC, threads)."""
    custom_metrics = {}

    # Memory pool metrics
    for pool in management.getMemoryPoolMXBeans():
        usage = pool.getUsage()
        if usage is None:  # pool is no longer valid
            continue
        pool_name = pool.getName()
        used = usage.getUsed()
        max_size = usage.getMax()  # -1 when the pool has no defined maximum
        custom_metrics[f"memory.{pool_name}.used"] = used
        custom_metrics[f"memory.{pool_name}.max"] = max_size
        if max_size > 0:
            custom_metrics[f"memory.{pool_name}.usage"] = used / max_size

    # Overall heap usage
    heap = management.getMemoryMXBean().getHeapMemoryUsage()
    custom_metrics["memory.heap.used"] = heap.getUsed()
    custom_metrics["memory.heap.max"] = heap.getMax()

    # GC metrics
    for gc_bean in management.getGarbageCollectorMXBeans():
        gc_name = gc_bean.getName()
        custom_metrics[f"gc.{gc_name}.count"] = gc_bean.getCollectionCount()
        custom_metrics[f"gc.{gc_name}.time"] = gc_bean.getCollectionTime()

    # Thread metrics
    threads = management.getThreadMXBean()
    custom_metrics["jvm.thread_count"] = threads.getThreadCount()
    custom_metrics["jvm.peak_thread_count"] = threads.getPeakThreadCount()
    return custom_metrics


def monitor_performance(df, operation_name):
    """Time a DataFrame action and capture driver JVM metrics afterwards."""
    start_time = time.time()
    result = df.collect()  # execute the operation
    duration = time.time() - start_time
    custom_metrics = collect_custom_metrics()

    print(f"\nOperation: {operation_name}")
    print(f"Duration: {duration:.2f} seconds")
    print(f"Rows returned: {len(result)}")
    print(f"Heap used: {custom_metrics.get('memory.heap.used', 0) / 1024 / 1024:.2f} MB")
    return result, duration, custom_metrics


# Test with different operations. Each one aggregates on the executors so that
# only a small result is collected to the driver.
df = spark.range(1000000)

# Monitor count operation
result, duration, op_metrics = monitor_performance(df.agg(count("*").alias("n")), "count")

# Monitor groupBy operation (groupBy needs an aggregation before it can be collected)
grouped_df = df.groupBy((col("id") % 100).alias("bucket")).count()
result, duration, op_metrics = monitor_performance(grouped_df, "groupBy")

# Monitor join operation
df2 = spark.range(1000000).withColumn("value", col("id") * 2)
joined_df = df.join(df2, "id").agg(count("*").alias("n"))
result, duration, op_metrics = monitor_performance(joined_df, "join")

spark.stop()
Metrics Export to External System
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import json
import time

spark = SparkSession.builder.appName("MetricsExport").getOrCreate()


class MetricsExporter:
    """Export Spark metrics to external systems"""

    def __init__(self, app_name):
        self.app_name = app_name
        self.metrics_history = []

    def export_to_console(self, metrics):
        """Export metrics to console"""
        print(f"\n{'=' * 60}")
        print(f"Application: {self.app_name}")
        print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'=' * 60}")
        for key, value in metrics.items():
            print(f"{key}: {value}")
        print(f"{'=' * 60}\n")

    def export_to_json(self, metrics, filename):
        """Export metrics to JSON file"""
        metrics_data = {
            "app_name": self.app_name,
            "timestamp": time.time(),
            "metrics": metrics,
        }
        self.metrics_history.append(metrics_data)
        with open(filename, "w") as f:
            json.dump(self.metrics_history, f, indent=2)
        print(f"Metrics exported to {filename}")

    def export_to_prometheus(self, metrics, pushgateway_url):
        """Export metrics to Prometheus Pushgateway"""
        try:
            import requests

            # Prometheus metric names may not contain dots
            lines = [f"{key.replace('.', '_')} {value}" for key, value in metrics.items()]
            # The text exposition format requires a trailing newline
            response = requests.post(
                f"{pushgateway_url}/metrics/job/{self.app_name}",
                data="\n".join(lines) + "\n",
                timeout=10,
            )
            if response.ok:
                print("Metrics exported to Prometheus Pushgateway")
            else:
                print(f"Failed to export metrics: {response.status_code}")
        except ImportError:
            print("requests library not installed")
        except Exception as e:
            print(f"Error exporting to Prometheus: {e}")


# Create metrics exporter
exporter = MetricsExporter("MySparkApplication")


def collect_and_export_metrics():
    """Collect metrics and export to various systems"""
    # Driver JVM metrics via the standard java.lang.management MXBeans.
    # sc._jvm is a private py4j handle and is not available under Spark Connect.
    management = spark.sparkContext._jvm.java.lang.management.ManagementFactory
    heap = management.getMemoryMXBean().getHeapMemoryUsage()

    metrics = {
        "spark.driver.heap.used": heap.getUsed(),
        "spark.driver.heap.max": heap.getMax(),
        "spark.driver.heap.free": heap.getMax() - heap.getUsed(),
        "spark.jvm.thread.count": management.getThreadMXBean().getThreadCount(),
        "spark.jvm.uptime": management.getRuntimeMXBean().getUptime(),
    }

    # Add GC metrics
    for i, gc_bean in enumerate(management.getGarbageCollectorMXBeans()):
        metrics[f"spark.gc.{i}.count"] = gc_bean.getCollectionCount()
        metrics[f"spark.gc.{i}.time"] = gc_bean.getCollectionTime()

    # Export to different systems
    exporter.export_to_console(metrics)
    exporter.export_to_json(metrics, "metrics.json")
    return metrics


# Monitor application
df = spark.range(1000000)
result = df.groupBy((col("id") % 10).alias("bucket")).count().collect()

# Export final metrics
final_metrics = collect_and_export_metrics()
spark.stop()
Alerting System
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
import smtplib
from email.mime.text import MIMEText

spark = SparkSession.builder.appName("AlertingSystem").getOrCreate()


class AlertingSystem:
    """Alerting system for Spark applications"""

    def __init__(self, email_config=None):
        self.email_config = email_config
        self.alert_history = []

    def check_threshold(self, metric_name, value, threshold, alert_type="warning"):
        """Check if metric exceeds threshold"""
        if alert_type in ("warning", "critical") and value > threshold:
            self.send_alert(metric_name, value, threshold, alert_type.upper())
            return True
        return False

    def send_alert(self, metric_name, value, threshold, level):
        """Send alert notification"""
        alert_message = (
            "Spark Application Alert\n"
            f"Level: {level}\n"
            f"Metric: {metric_name}\n"
            f"Current Value: {value}\n"
            f"Threshold: {threshold}\n"
            f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
        )
        print(f"\n{'!' * 60}")
        print(f"ALERT [{level}]: {metric_name}")
        print(f"Value: {value} exceeds threshold: {threshold}")
        print(f"{'!' * 60}\n")

        # Store alert
        self.alert_history.append({
            "level": level,
            "metric": metric_name,
            "value": value,
            "threshold": threshold,
            "timestamp": time.time(),
        })

        # Send email if configured
        if self.email_config and level == "CRITICAL":
            self.send_email_alert(alert_message)

    def send_email_alert(self, message):
        """Send email alert for critical issues"""
        try:
            msg = MIMEText(message)
            msg["Subject"] = "Spark Application Critical Alert"
            msg["From"] = self.email_config["from"]
            msg["To"] = self.email_config["to"]
            with smtplib.SMTP(self.email_config["smtp_server"]) as server:
                server.send_message(msg)
            print("Critical alert email sent")
        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def get_alert_history(self):
        """Get alert history"""
        return self.alert_history


# Create alerting system
alerting = AlertingSystem()


def monitor_with_alerting():
    """Monitor metrics with alerting"""
    # Driver JVM metrics via the standard java.lang.management MXBeans.
    # sc._jvm is a private py4j handle and is not available under Spark Connect.
    management = spark.sparkContext._jvm.java.lang.management.ManagementFactory

    # Get metrics
    heap = management.getMemoryMXBean().getHeapMemoryUsage()
    heap_used = heap.getUsed()
    heap_max = heap.getMax()
    heap_usage = heap_used / heap_max if heap_max > 0 else 0

    # Check thresholds (percent)
    alerting.check_threshold("heap_usage", heap_usage * 100, 80, "warning")
    alerting.check_threshold("heap_usage", heap_usage * 100, 95, "critical")

    # Get GC metrics (cumulative collection time in milliseconds)
    for gc_bean in management.getGarbageCollectorMXBeans():
        gc_time = gc_bean.getCollectionTime()
        alerting.check_threshold(f"gc_time_{gc_bean.getName()}", gc_time, 5000, "warning")

    return {
        "heap_used": heap_used,
        "heap_max": heap_max,
        "heap_usage_percent": heap_usage * 100,
    }


# Monitor application
df = spark.range(1000000)
result = df.groupBy((col("id") % 10).alias("bucket")).count().collect()

# Check metrics with alerting
metrics = monitor_with_alerting()

# Get alert history
alert_history = alerting.get_alert_history()
print(f"\nTotal alerts: {len(alert_history)}")
spark.stop()
Performance Metrics
| Metric | Target | Warning | Critical | Action |
|---|---|---|---|---|
| Job Duration | < 10min | 10-30min | > 30min | Optimize code, increase resources |
| Stage Failure Rate | < 1% | 1-5% | > 5% | Check data skew, increase parallelism |
| Task Duration | < 5min | 5-15min | > 15min | Repartition data, check skew |
| Heap Usage | < 70% | 70-85% | > 85% | Increase memory, optimize code |
| GC Overhead | < 5% | 5-10% | > 10% | Tune GC, increase memory |
| Shuffle Size | < 5GB | 5-20GB | > 20GB | Optimize partitions, use broadcast |
| Cache Hit Ratio | > 95% | 90-95% | < 90% | Tune caching strategy |
| Executor Utilization | > 80% | 60-80% | < 60% | Increase parallelism, check skew |
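The table mixes metrics where higher values are worse (heap usage, GC overhead) with metrics where lower values are worse (cache hit ratio, executor utilization), so a single "value > threshold" check is not enough. The sketch below infers the direction from the order of the two bounds. The function name `classify`, the metric keys, and the observed values are made up for this example; the bounds come from the table.

```python
def classify(value, warning, critical):
    """Classify a metric value as 'ok', 'warning', or 'critical'.

    If critical >= warning, larger values are worse (e.g. heap usage).
    If critical < warning, smaller values are worse (e.g. cache hit ratio).
    """
    higher_is_worse = critical >= warning
    if higher_is_worse:
        if value > critical:
            return "critical"
        if value >= warning:
            return "warning"
    else:
        if value < critical:
            return "critical"
        if value <= warning:
            return "warning"
    return "ok"


# (warning bound, critical bound) per metric, in percent
THRESHOLDS = {
    "heap_usage_pct": (70, 85),
    "gc_overhead_pct": (5, 10),
    "cache_hit_ratio_pct": (95, 90),
    "executor_utilization_pct": (80, 60),
}

observed = {
    "heap_usage_pct": 78,
    "gc_overhead_pct": 3,
    "cache_hit_ratio_pct": 88,
    "executor_utilization_pct": 85,
}
report = {name: classify(observed[name], *THRESHOLDS[name]) for name in THRESHOLDS}
print(report)
```

Encoding the direction in the order of the bounds keeps the threshold table compact, at the cost of being implicit; an explicit `higher_is_worse` flag per metric would be a reasonable alternative.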
Best Practices
- Enable event logging - Record all application events for historical analysis
- Configure appropriate metrics sinks - Use console for development, external systems for production
- Set up alerting - Define thresholds for critical metrics and implement notifications
- Monitor regularly - Review performance metrics daily for production applications
- Analyze trends - Track metrics over time to identify patterns and anomalies
- Document baselines - Establish performance baselines for comparison
- Use History Server - Analyze completed applications for optimization opportunities
- Integrate with monitoring systems - Connect Spark metrics to enterprise monitoring platforms
- Create dashboards - Build visual dashboards for easy performance monitoring
- Review and optimize - Continuously improve monitoring based on feedback
Related Topics
- 17-cluster-management.mdx: Cluster resource monitoring
- 18-gc-tuning.mdx: Garbage collection monitoring and optimization
- 19-spark-submit.mdx: Deployment monitoring configuration
- 15-data-quality.mdx: Data quality monitoring and validation
See Also
- Kafka Streams (kafka/03): Monitoring Kafka consumer lag and throughput
- Data Engineering Streaming (data-engineering/022): Monitoring streaming pipeline health