Cost Optimization for PySpark Workloads
Architecture Diagram: Cost Optimization Framework
PySpark Cost Optimization Framework

Cost optimization pillars:

| Compute | Storage | Network | Operations |
|---|---|---|---|
| Spot/preemptible instances | Tiered storage | Data locality | Scheduling |
| Auto-scaling | Compaction | Compression | Monitoring |
| Right-sizing | Lifecycle policies | Shuffle optimization | Right-sizing |
| Scheduling | Format selection | VPC endpoints | Automation |

Cost breakdown (typical PySpark workload):

| Component | Share | Includes |
|---|---|---|
| Compute | 60% | EC2/EMR instances, Databricks DBUs, Spark executors |
| Storage | 25% | S3/ADLS/GCS, Delta Lake files, checkpoints, temp data |
| Network | 10% | Data transfer, cross-AZ traffic, internet egress |
| Operations | 5% | Monitoring, logging, CI/CD, orchestration |

Savings opportunity matrix:

| Strategy | Effort | Savings | Risk |
|---|---|---|---|
| Spot Instances | Low | 60-80% | Interruption (low risk) |
| Right-Sizing | Medium | 30-50% | Performance (low risk) |
| Auto-Scaling | Medium | 20-40% | Over-provisioning |
| Storage Tiering | Low | 40-60% | Access latency (low) |
| File Compaction | Medium | 30-50% | Compute cost (low) |
| Format Optimization | Low | 20-40% | Compatibility (low) |
| Scheduling | Low | 15-30% | SLA compliance (medium) |
| Data Locality | Medium | 10-25% | Complexity (low) |
Architecture Diagram: Spot Instance Strategy
Spot Instance Strategy for PySpark

Hybrid instance strategy:

| Layer | Instances | Hourly price | Monthly cost (24 h × 30 d) |
|---|---|---|---|
| Driver node (always on, On-Demand): job scheduling, result collection, DAG management | 1 × r5.xlarge (4 vCPU, 32 GB RAM) | $0.252 | $181 |
| Executor base layer (On-Demand, always available) | 3 × r5.2xlarge | $0.504 each | $1,089 ($363 per instance) |
| Executor burst layer (Spot, up to 80% savings) | 4 × r5.2xlarge | $0.151 each | $435 (vs $1,451 On-Demand, 70% savings) |

Executor totals (auto-scaled, mixed Spot/On-Demand):

- Hybrid: $1,089 (On-Demand) + $435 (Spot) = $1,524/month
- All On-Demand: $1,089 + $1,451 = $2,540/month
- Savings: $1,016/month (40%)

Spot interruption handling:

Spot price spike → 2-minute warning received → checkpoint state to S3 → resume on new Spot instance

Key points:

- Enable checkpointing every 10-30 seconds to minimize data loss
- Use multiple instance types (r5, r4, m5) for diversification
- Configure Spot Fleet with allocation strategy = capacity-optimized
Architecture Diagram: TCO Analysis Dashboard
Total Cost of Ownership (TCO) Analysis

Scenario: processing 1 TB/day, 30-day retention, 10 concurrent users.

Option A: EMR Cluster (Self-Managed)

| Item | Detail | Monthly cost |
|---|---|---|
| Compute (m5.2xlarge × 10 nodes) | On-Demand | $1,814 |
| Compute (m5.2xlarge × 10 nodes) | With Spot (60% savings) | $726 |
| Storage (S3) | 30 TB × $0.023/GB | $690 |
| Network | Cross-AZ | $200 |
| Operations | 0.25 FTE DevOps | $3,125 |
| TOTAL | | $4,741 (Spot) vs $5,829 (On-Demand) |

Option B: Databricks (Managed)

| Item | Detail | Cost |
|---|---|---|
| DBUs (Premium Tier), driver | 1 × DB10.4: $0.55/hr × 8,760 h | $4,818/year |
| DBUs (Premium Tier), executors | 10 × DB10.4: $5.50/hr × 4,380 h | $24,090/year |
| DBUs (Premium Tier), total | $28,908/year | $2,409/month |
| Storage (DBFS + S3) | 30 TB × $0.023/GB | $690/month |
| Network | Included in DBU pricing | |
| Operations | 0.1 FTE Data Engineer | $1,250/month |
| TOTAL | | $4,349/month |

Option C: EMR Serverless (AWS)

| Item | Detail | Monthly cost |
|---|---|---|
| Compute (vCPU) | 100 vCPU × 8 h/day × 30 d = 24,000 vCPU-hours × $0.05265 | $1,264 |
| Compute (memory) | 200 GB × 8 h/day × 30 d = 48,000 GB-hours × $0.00575 | $276 |
| Storage | Same as the other options | $690 |
| Operations | 0.1 FTE | $1,250 |
| TOTAL | | $3,480 |

Cost comparison (annual):

| Option | Annual cost |
|---|---|
| EMR (Spot) | $56,892 |
| EMR (On-Demand) | $69,948 |
| Databricks | $52,188 |
| EMR Serverless | $41,760 |

Winner: EMR Serverless (lowest total cost)
Detailed Explanation
Cost optimization for PySpark workloads requires a systematic approach that addresses compute, storage, network, and operational costs simultaneously. The largest cost component is typically compute (60-70% of total spend), followed by storage (20-25%), network (5-10%), and operations (5%). Each component offers distinct optimization opportunities with varying effort-to-savings ratios.
Compute optimization focuses on right-sizing instances, leveraging spot/preemptible instances, implementing auto-scaling, and optimizing Spark configurations. Spot instances offer 60-80% savings over on-demand pricing with minimal risk when properly configured. The key is to use a hybrid strategy: maintain a small base of on-demand instances for guaranteed capacity, and use spot instances for burst capacity. Enable checkpointing every 10-30 seconds to minimize data loss on spot interruptions, and configure multiple instance types (r5, r4, m5) for diversification across spot pools.
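The hybrid strategy can be checked with a few lines of arithmetic. The sketch below uses the r5.2xlarge prices quoted in the spot diagram ($0.504/hour on-demand, $0.151/hour spot) and a 24 h × 30 d month; the function names are illustrative, not part of any library.

```python
HOURS_PER_MONTH = 24 * 30  # 24 h x 30 d, the convention used in the diagram


def monthly_cost(instances: int, hourly_price: float) -> float:
    """Monthly cost of running `instances` nodes around the clock."""
    return instances * hourly_price * HOURS_PER_MONTH


def hybrid_executor_costs(base_count, burst_count, on_demand_price, spot_price):
    """Compare a hybrid (on-demand base + spot burst) fleet with all on-demand."""
    base = monthly_cost(base_count, on_demand_price)
    hybrid = base + monthly_cost(burst_count, spot_price)
    all_on_demand = base + monthly_cost(burst_count, on_demand_price)
    savings = all_on_demand - hybrid
    return {
        "hybrid": hybrid,
        "all_on_demand": all_on_demand,
        "savings": savings,
        "savings_pct": 100 * savings / all_on_demand,
    }


# r5.2xlarge prices quoted in the diagram: $0.504/h on-demand, $0.151/h spot
costs = hybrid_executor_costs(base_count=3, burst_count=4,
                              on_demand_price=0.504, spot_price=0.151)
for name, value in costs.items():
    print(f"{name}: {value:,.2f}")
```

Under these prices the burst layer alone is about 70% cheaper on spot, but because the on-demand base is unchanged, the fleet-wide saving works out to about 40%.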
Right-sizing involves matching instance types to workload characteristics. CPU-bound transformations (joins, aggregations) benefit from compute-optimized instances (c5, c6g). Memory-intensive operations (large shuffles, caching) benefit from memory-optimized instances (r5, r6g). Mixed workloads benefit from general-purpose instances (m5, m6g). Profiling your Spark applications with the Spark UI and cloud monitoring tools reveals which resource type is the bottleneck.
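As a rough sketch of that mapping, the helper below turns average CPU and memory utilization into an instance-family suggestion. The function name and the 75% threshold are assumptions chosen for illustration; real thresholds should come from profiling your own jobs.

```python
def recommend_instance_family(cpu_util: float, mem_util: float,
                              high: float = 0.75) -> str:
    """Suggest an instance family from utilization fractions in [0, 1].

    `high` is a hypothetical cut-off for calling a resource the bottleneck.
    """
    cpu_bound = cpu_util >= high
    mem_bound = mem_util >= high
    if cpu_bound and not mem_bound:
        return "compute-optimized (c5, c6g)"
    if mem_bound and not cpu_bound:
        return "memory-optimized (r5, r6g)"
    # Both or neither resource saturated: treat as a mixed workload
    return "general-purpose (m5, m6g)"


print(recommend_instance_family(cpu_util=0.90, mem_util=0.40))
print(recommend_instance_family(cpu_util=0.35, mem_util=0.85))
print(recommend_instance_family(cpu_util=0.50, mem_util=0.50))
```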
Auto-scaling dynamically adjusts cluster size based on workload demands. Configure minimum and maximum executor counts based on historical patterns, and use scale-down cooldown periods to prevent thrashing. For streaming workloads, use fixed-size clusters or schedule-based scaling. For batch workloads, use load-based auto-scaling with aggressive scale-down.
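The interaction between thresholds, limits, and cooldown can be sketched as a small decision function. This is a simplified model, not how EMR or Databricks implement scaling: the class and function names are hypothetical, the defaults mirror the 3-20 node range and 70%/30% CPU thresholds used in Example 1, and a single cooldown is assumed for both directions.

```python
from dataclasses import dataclass


@dataclass
class ScalingPolicy:
    min_nodes: int = 3
    max_nodes: int = 20
    scale_out_above: float = 70.0   # CPU % that triggers scale-out
    scale_in_below: float = 30.0    # CPU % that triggers scale-in
    scale_out_step: int = 2
    scale_in_step: int = 1
    cooldown_seconds: int = 300     # assumed single cooldown for both directions


def next_node_count(policy: ScalingPolicy, current_nodes: int,
                    cpu_percent: float, seconds_since_last_change: float) -> int:
    """Return the node count the cluster should move to."""
    # The cooldown prevents thrashing: ignore signals right after a change
    if seconds_since_last_change < policy.cooldown_seconds:
        return current_nodes
    if cpu_percent > policy.scale_out_above:
        return min(policy.max_nodes, current_nodes + policy.scale_out_step)
    if cpu_percent < policy.scale_in_below:
        return max(policy.min_nodes, current_nodes - policy.scale_in_step)
    return current_nodes


policy = ScalingPolicy()
print(next_node_count(policy, 5, 85.0, 400))   # busy, cooldown elapsed
print(next_node_count(policy, 5, 85.0, 100))   # busy, still cooling down
print(next_node_count(policy, 5, 20.0, 1000))  # idle, cooldown elapsed
```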
Storage optimization addresses both the cost of storing data and the performance impact of storage choices. Tiered storage moves older data to cheaper storage classes (S3 Standard → S3 Infrequent Access → S3 Glacier). File compaction (OPTIMIZE in Delta Lake) reduces the number of small files, which reduces storage costs (fewer file metadata objects) and improves query performance (fewer files to scan). Columnar formats (Parquet, ORC) with compression (Zstd, Snappy) reduce storage footprint by 60-80% compared to raw text.
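To get a feel for what compaction changes, the sketch below estimates the file count before and after merging small files into a target size. The helper name and the sample table are hypothetical; the 128 MB default is the lower end of the 128-512 MB target range recommended under Best Practices.

```python
import math


def compaction_estimate(file_sizes_mb, target_file_mb=128):
    """Estimate file counts before and after compacting to a target size."""
    files_before = len(file_sizes_mb)
    total_mb = sum(file_sizes_mb)
    files_after = math.ceil(total_mb / target_file_mb) if files_before else 0
    return {
        "files_before": files_before,
        "files_after": files_after,
        "avg_file_mb_before": total_mb / files_before if files_before else 0.0,
    }


# Hypothetical table: 10,000 files of 4 MB each (40,000 MB in total)
estimate = compaction_estimate([4] * 10_000)
print(estimate)
```

The total bytes stored do not change in this estimate; what shrinks is the number of objects that every scan has to list and open.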
Network optimization focuses on minimizing data movement. Data locality keeps processing close to data (same AZ, same region). Columnar formats with predicate pushdown reduce the amount of data transferred over the network. Compression reduces network transfer volume. VPC endpoints for S3 (and private endpoints for ADLS) keep storage traffic off NAT gateways and the public internet, which avoids the associated data processing and egress charges. Cross-region replication should be minimized and scheduled during off-peak hours.
Key Concepts Table
Mathematical Foundations
Definition: Total Cost of Ownership
TCO for a Spark cluster over a period $T$ is the sum of the compute, storage, network, and labor costs incurred during that period.
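One way to write this definition down, reconstructed from the cost components named in the Summary (the symbol names are assumptions, not the original notation), together with the EMR Serverless option from the TCO diagram as a worked monthly instance:

```latex
\[
\mathrm{TCO}(T) = C_{\text{compute}}(T) + C_{\text{storage}}(T)
                + C_{\text{network}}(T) + C_{\text{labor}}(T)
\]

% EMR Serverless, T = 1 month, no separate network line item listed:
\[
\mathrm{TCO} = \underbrace{(1{,}264 + 276)}_{\text{compute}}
             + \underbrace{690}_{\text{storage}}
             + \underbrace{1{,}250}_{\text{labor}}
             = 3{,}480 \ \text{USD/month}
\]
```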
Spot Instance Savings
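Cost reduction with spot instances for fault-tolerant workloads is the relative gap between the on-demand and spot hourly prices.
The expected interruption rate per hour varies by instance type and region.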
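A minimal way to express the price-based saving, using assumed symbols $p_{\text{od}}$ and $p_{\text{spot}}$ for the on-demand and spot hourly prices, with the r5.2xlarge prices from the spot diagram as a check. It ignores the cost of work recomputed after an interruption.

```latex
\[
S_{\text{spot}} = 1 - \frac{p_{\text{spot}}}{p_{\text{od}}}
\]

% r5.2xlarge prices from the spot diagram:
\[
S_{\text{spot}} = 1 - \frac{0.151}{0.504} \approx 0.70
\]
```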
Right-Sizing Theorem
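The optimal instance count is the one that minimizes total cost while still meeting the SLO.
Over-provisioning beyond that count by some factor raises the instance bill by about the same factor whenever the extra capacity sits idle.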
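The right-sizing statement can be formalized as a constrained minimization. This is a sketch under assumed notation: $n$ is the instance count, $p$ the hourly price per instance, $T(n)$ the job runtime with $n$ instances, and $T_{\text{SLO}}$ the runtime the SLO allows.

```latex
\[
n^{*} = \arg\min_{n} \; n \, p \, T(n)
\quad \text{subject to} \quad T(n) \le T_{\text{SLO}}
\]

% If a cluster of k n^* instances is kept running for the same duration
% (k > 1) without shortening the job, the cost scales linearly:
\[
C(k\,n^{*}) = k \, C(n^{*})
\]
```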
Storage Tiering Benefit
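Cost savings from tiered storage depend on how access frequency distributes data across tiers: the blended price is the sum of each tier's price weighted by $f_i$, the fraction of data at tier $i$.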
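Written out with assumed symbols ($V$ for the stored volume in GB, $c_i$ for the monthly price per GB of tier $i$, $f_i$ for the fraction of data in that tier), the blended cost and the saving relative to keeping everything in the standard tier are shown below. The example fractions are hypothetical; the prices are the per-GB figures used in Example 2, and retrieval and request fees are ignored.

```latex
\[
C_{\text{tiered}} = V \sum_{i} f_i \, c_i ,
\qquad
S_{\text{tier}} = 1 - \frac{\sum_{i} f_i \, c_i}{c_{\text{standard}}}
\]

% Hypothetical split: 20% Standard, 30% Infrequent Access, 50% Glacier
\[
\sum_i f_i c_i = 0.2(0.023) + 0.3(0.0125) + 0.5(0.004) = 0.01035,
\qquad
S_{\text{tier}} = 1 - \frac{0.01035}{0.023} = 0.55
\]
```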
Autoscaling Efficiency
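Autoscaling reduces cost by an amount that grows with utilization variance: a fixed-size cluster must be provisioned for peak load, so the more demand fluctuates, the more idle capacity autoscaling removes.
Higher variance means more savings from autoscaling.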
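The effect of variance can be illustrated by comparing a cluster sized for peak demand with one that follows demand exactly. This is an idealized sketch: it assumes instant, free scaling, and the two hourly demand profiles are made up so that both do the same total work (240 node-hours per day).

```python
def fixed_vs_autoscaled(hourly_demand_nodes, price_per_node_hour):
    """Return (fixed_cost, autoscaled_cost, savings_pct) for one day of demand."""
    peak = max(hourly_demand_nodes)
    # A fixed cluster pays for peak capacity in every hour
    fixed_cost = peak * len(hourly_demand_nodes) * price_per_node_hour
    # An ideal autoscaled cluster pays only for the nodes actually needed
    autoscaled_cost = sum(hourly_demand_nodes) * price_per_node_hour
    savings_pct = 100 * (fixed_cost - autoscaled_cost) / fixed_cost
    return fixed_cost, autoscaled_cost, savings_pct


PRICE = 0.504  # r5.2xlarge on-demand price quoted earlier, $/node-hour

steady = [10] * 24               # zero variance
bursty = [4] * 18 + [28] * 6     # same total node-hours, high variance

for name, profile in [("steady", steady), ("bursty", bursty)]:
    fixed, scaled, pct = fixed_vs_autoscaled(profile, PRICE)
    print(f"{name}: fixed=${fixed:.2f} autoscaled=${scaled:.2f} savings={pct:.1f}%")
```

With zero variance autoscaling saves nothing; in the bursty profile the fixed cluster pays for 28 nodes around the clock, so following demand cuts the bill by roughly two thirds.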
Key Insight
The biggest cost lever is usually instance type selection, not count. Memory-optimized instances cost 2-3x more but may process data 5-10x faster, reducing total cost. Always benchmark with your specific workload.
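The reasoning behind this insight is that job cost is price multiplied by runtime, so a pricier instance wins whenever its speedup exceeds its price multiplier. A minimal sketch, with a hypothetical helper name and multipliers taken from the ranges quoted above:

```python
def relative_job_cost(price_multiplier: float, speedup: float) -> float:
    """Job cost on the alternative instance relative to the baseline (1.0)."""
    # cost = hourly price x runtime; runtime shrinks by the speedup factor
    return price_multiplier / speedup


# Instance costs 2.5x more per hour but finishes the job 5x faster
print(relative_job_cost(price_multiplier=2.5, speedup=5.0))

# Instance costs 3x more per hour but is only 2x faster
print(relative_job_cost(price_multiplier=3.0, speedup=2.0))
```

A result below 1.0 means the more expensive instance is cheaper for the job overall; above 1.0 means the extra hourly price is not paid back by the speedup.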
Summary
Cost optimization involves right-sizing instances, using spot/reserved capacity, tiered storage, and autoscaling. TCO includes compute, storage, network, and labor costs. Spot instances provide 60-80% savings with acceptable interruption risk. Autoscaling benefits scale with workload variance.
Key Concepts Table (cont.)
| Optimization Area | Strategy | Savings | Effort | Risk Level |
|---|---|---|---|---|
| Spot Instances | Use spot for executors | 60-80% | Low | Low (with checkpointing) |
| Right-Sizing | Match instance to workload | 30-50% | Medium | Low |
| Auto-Scaling | Dynamic cluster sizing | 20-40% | Medium | Low |
| Storage Tiering | Move old data to IA/Glacier | 40-60% | Low | Low |
| File Compaction | Merge small files | 30-50% | Medium | Low |
| Format Optimization | Use Parquet + Zstd | 20-40% | Low | Low |
| Shuffle Optimization | Reduce data movement | 10-25% | Medium | Low |
| Data Locality | Process data near storage | 10-25% | Medium | Low |
| Caching | Cache hot data in memory | 15-30% | Low | Low |
| Scheduling | Off-peak execution | 15-30% | Low | Medium |
Code Examples
Example 1: Spot Instance Configuration with Auto-Scaling
from pyspark.sql import SparkSession

# --- EMR Cluster Configuration (Cost-Optimized) ---
# Illustrative layout. In the EMR RunJobFlow API, the market type and the
# auto-scaling policy are set per instance group (InstanceGroups[].Market and
# InstanceGroups[].AutoScalingPolicy), so adapt the keys before submitting.
emr_config = {
    "Name": "pyspark-cost-optimized",
    "ReleaseLabel": "emr-6.15.0",
    "Applications": [
        {"Name": "Spark"},
        {"Name": "JupyterEnterpriseGateway"},
    ],
    "Instances": {
        "MasterInstanceType": "m5.xlarge",   # On-Demand (always available)
        "MasterInstanceMarket": "ON_DEMAND",
        "SlaveInstanceType": "r5.2xlarge",   # Spot (cost-optimized)
        "SlaveInstanceMarket": "SPOT",
        "InstanceCount": 3,                  # Base instances
        "Ec2KeyName": "my-key-pair",
        "Ec2SubnetId": "subnet-xxx",
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": True,
        # Auto-scaling configuration
        "AutoScaling": {
            "Constraints": {
                "MinCapacity": 3,
                "MaxCapacity": 20,
            },
            "Rules": [
                {
                    "Name": "ScaleOutCPU",
                    "Description": "Scale out when CPU > 70%",
                    "Action": {
                        "SimpleScalingPolicyConfiguration": {
                            "AdjustmentType": "ChangeInCapacity",
                            "ScalingAdjustment": 2,
                            "CoolDown": 300,
                        }
                    },
                    "Trigger": {
                        "CloudWatchAlarmDefinition": {
                            "MetricName": "CPUUtilization",
                            "Statistic": "Average",
                            "Period": 300,
                            "EvaluationPeriods": 2,
                            "Threshold": 70,
                            "ComparisonOperator": "GreaterThanThreshold",
                        }
                    },
                },
                {
                    "Name": "ScaleInCPU",
                    "Description": "Scale in when CPU < 30%",
                    "Action": {
                        "SimpleScalingPolicyConfiguration": {
                            "AdjustmentType": "ChangeInCapacity",
                            "ScalingAdjustment": -1,
                            "CoolDown": 600,
                        }
                    },
                    "Trigger": {
                        "CloudWatchAlarmDefinition": {
                            "MetricName": "CPUUtilization",
                            "Statistic": "Average",
                            "Period": 300,
                            "EvaluationPeriods": 3,
                            "Threshold": 30,
                            "ComparisonOperator": "LessThanThreshold",
                        }
                    },
                },
            ],
        },
    },
    "Configurations": [
        {
            "Classification": "spark-defaults",
            "Properties": {
                # Cost-optimized Spark settings
                "spark.dynamicAllocation.enabled": "true",
                "spark.dynamicAllocation.minExecutors": "3",
                "spark.dynamicAllocation.maxExecutors": "20",
                "spark.dynamicAllocation.executorIdleTimeout": "120s",
                "spark.dynamicAllocation.schedulerBacklogTimeout": "60s",
                "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout": "60s",
                # Executor and driver sizing
                # (executor.instances is the initial count under dynamic allocation)
                "spark.executor.instances": "3",
                "spark.executor.memory": "8g",
                "spark.executor.cores": "4",
                "spark.driver.memory": "4g",
                "spark.driver.cores": "2",
                # Default checkpoint location for streaming queries, so they
                # can resume after a spot interruption
                "spark.sql.streaming.checkpointLocation": "s3://my-bucket/checkpoints",
                # Adaptive query execution
                "spark.sql.adaptive.enabled": "true",
                "spark.sql.adaptive.coalescePartitions.enabled": "true",
                "spark.sql.adaptive.skewJoin.enabled": "true",
                # Serialization
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.kryoserializer.buffer.max": "512m",
                # Memory management
                "spark.memory.fraction": "0.8",
                "spark.memory.storageFraction": "0.3",
            },
        }
    ],
}

# --- Create Cost-Optimized Spark Session ---
spark = (
    SparkSession.builder
    .appName("Cost-Optimized-PySpark")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "3")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .config("spark.executor.instances", "3")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
Example 2: Storage Optimization with Delta Lake
import boto3
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Storage-Optimization")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)


# --- Optimize Table Storage ---
def optimize_table_storage(spark, table_path, z_order_columns=None):
    """Comprehensive storage optimization for Delta tables."""
    delta_table = DeltaTable.forPath(spark, table_path)

    # 1. Compaction / Z-ORDER: Z-ORDER rewrites and compacts files itself,
    #    so run plain compaction only when no clustering columns are given.
    if z_order_columns:
        print(f"Running Z-ORDER on {z_order_columns}...")
        delta_table.optimize().executeZOrderBy(*z_order_columns)
    else:
        print("Running compaction...")
        delta_table.optimize().executeCompaction()

    # 2. VACUUM: remove unreferenced files older than the default 7 days.
    #    168 hours equals the default retention, so the retention safety
    #    check does not need to be disabled.
    print("Running VACUUM...")
    delta_table.vacuum(retentionHours=168)  # 7 days

    # 3. Get table statistics
    detail = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()
    history = spark.sql(f"DESCRIBE HISTORY delta.`{table_path}` LIMIT 1").collect()
    print("Table Statistics:")
    print(f"  Num Files: {detail.numFiles}")
    print(f"  Size: {detail.sizeInBytes / (1024**3):.2f} GB")
    # DESCRIBE DETAIL has no version count; report the latest version instead
    print(f"  Latest Version: {history[0].version}")
    return detail


# Optimize a high-traffic table
optimize_table_storage(
    spark,
    "/mnt/lakehouse/silver/transactions",
    z_order_columns=["customer_id", "transaction_date"],
)


# --- Implement Storage Lifecycle Policies ---
def setup_storage_lifecycle(bucket_name):
    """Configure S3 lifecycle policies for tiered storage."""
    s3 = boto3.client("s3")
    lifecycle_config = {
        "Rules": [
            {
                "ID": "TieredStorage",
                "Status": "Enabled",
                # Archive classes must be restored before Spark can read them,
                # so scope this prefix to data active tables no longer use.
                "Filter": {"Prefix": "lakehouse/"},
                "Transitions": [
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER"},
                    {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                ],
                "Expiration": {"Days": 730},  # Delete after 2 years
            },
            {
                "ID": "CleanupTempFiles",
                "Status": "Enabled",
                "Filter": {"Prefix": "lakehouse/temp/"},
                "Expiration": {"Days": 7},  # Delete temp files after 7 days
            },
        ]
    }
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=lifecycle_config,
    )


# --- Monitor Storage Costs ---
def monitor_storage_costs(spark, table_path):
    """Monitor storage metrics for cost optimization."""
    detail = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()

    # Calculate storage metrics
    total_size_gb = detail.sizeInBytes / (1024**3)
    num_files = detail.numFiles
    avg_file_size_mb = (detail.sizeInBytes / num_files) / (1024**2) if num_files > 0 else 0

    # Estimate costs (approximate list prices in $/GB/month)
    s3_standard_cost = total_size_gb * 0.023
    s3_ia_cost = total_size_gb * 0.0125
    s3_glacier_cost = total_size_gb * 0.004

    print(f"\nStorage Analysis for {table_path}:")
    print(f"  Total Size: {total_size_gb:.2f} GB")
    print(f"  Number of Files: {num_files}")
    print(f"  Average File Size: {avg_file_size_mb:.2f} MB")
    print("\nEstimated Monthly Costs:")
    print(f"  S3 Standard: ${s3_standard_cost:.2f}")
    print(f"  S3 Infrequent Access: ${s3_ia_cost:.2f}")
    print(f"  S3 Glacier: ${s3_glacier_cost:.2f}")

    if avg_file_size_mb < 64:
        print(f"\n  WARNING: Average file size is small ({avg_file_size_mb:.2f} MB)")
        print("  Consider running OPTIMIZE to merge small files")

    return {
        "total_size_gb": total_size_gb,
        "num_files": num_files,
        "avg_file_size_mb": avg_file_size_mb,
        "estimated_cost_standard": s3_standard_cost,
    }


monitor_storage_costs(spark, "/mnt/lakehouse/silver/transactions")
Example 3: Spark Configuration Tuning for Cost Efficiency
from pyspark.sql import SparkSession

# --- Cost-Optimized Spark Configuration Templates ---

# Template 1: Batch Processing (Cost-Optimized)
BATCH_CONFIG = {
    "spark.sql.shuffle.partitions": "200",  # Spark default; AQE coalesces small partitions at runtime
    "spark.sql.adaptive.enabled": "true",   # AQE for dynamic optimization
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.files.maxPartitionBytes": "134217728",  # 128 MB per partition
    "spark.sql.files.openCostInBytes": "8388608",      # 8 MB
    "spark.memory.fraction": "0.8",         # 80% for execution/storage
    "spark.memory.storageFraction": "0.3",  # 30% of that for storage
    "spark.shuffle.compress": "true",
    "spark.shuffle.spill.compress": "true",
    "spark.rdd.compress": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "512m",
    "spark.sql.autoBroadcastJoinThreshold": "10485760",  # 10 MB
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    # Databricks-only settings; ignored by open-source Spark
    "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true",
}

# Template 2: Streaming Processing (Low Latency)
STREAMING_CONFIG = {
    "spark.sql.streaming.stateStore.providerClass":
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    # Compact the RocksDB state store on each commit
    "spark.sql.streaming.stateStore.rocksdb.compactOnCommit": "true",
    "spark.sql.streaming.noDataMicroBatches.enabled": "true",
    "spark.sql.streaming.schemaInference": "true",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.shuffle.partitions": "200",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.2",  # Less storage for streaming
    "spark.streaming.stopGracefullyOnShutdown": "true",
}

# Template 3: Machine Learning
ML_CONFIG = {
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.adaptive.enabled": "true",
    "spark.executor.instances": "4",
    "spark.executor.memory": "16g",
    "spark.executor.cores": "4",
    "spark.driver.memory": "8g",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.2",  # Less storage for ML
    "spark.sql.autoBroadcastJoinThreshold": "52428800",  # 50 MB
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
}


def create_cost_optimized_session(app_name, config_template="batch"):
    """Create a cost-optimized Spark session."""
    config = {
        "batch": BATCH_CONFIG,
        "streaming": STREAMING_CONFIG,
        "ml": ML_CONFIG,
    }.get(config_template, BATCH_CONFIG)

    builder = SparkSession.builder.appName(app_name)
    for key, value in config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Create a session for the workload type this application runs.
# getOrCreate() reuses an already-running session, so use one template per
# application rather than building several sessions in the same process.
batch_spark = create_cost_optimized_session("Batch-Processing", "batch")
# streaming_spark = create_cost_optimized_session("Streaming-Processing", "streaming")
# ml_spark = create_cost_optimized_session("ML-Training", "ml")
Performance Metrics
| Optimization | Before | After | Savings | Implementation Time |
|---|---|---|---|---|
| Spot Instances (executors) | $1,814/month | $726/month | 60% ($1,088) | 1 day |
| Right-Sizing (r5.2xl → r5.xl) | $1,814/month | $1,270/month | 30% ($544) | 2 days |
| Auto-Scaling (3-20 nodes) | $1,814/month | $1,180/month | 35% ($634) | 3 days |
| File Compaction (OPTIMIZE) | $690/month (30TB) | $414/month (18TB) | 40% ($276) | 1 day |
| Storage Tiering (IA after 30d) | $690/month | $345/month | 50% ($345) | 0.5 days |
| Format (CSV → Parquet+Zstd) | $690/month | $276/month | 60% ($414) | 2 days |
| Shuffle Optimization | $200/month | $120/month | 40% ($80) | 2 days |
| Caching (hot data) | $1,814/month | $1,542/month | 15% ($272) | 1 day |
| Scheduling (off-peak) | $1,814/month | $1,542/month | 15% ($272) | 1 day |
| TOTAL | $9,506/month | $5,995/month | 37% ($3,511) | 13.5 days |
Best Practices
- Always use Spot instances for executors: Spot instances offer 60-80% savings with minimal risk when checkpointing is enabled. Configure multiple instance types (r5, r4, m5) for diversification and use the capacity-optimized allocation strategy.
- Enable dynamic allocation: Set `spark.dynamicAllocation.enabled=true` with appropriate min/max executor counts. This allows the cluster to scale down during quiet periods and scale up during peaks.
- Right-size your executors: Profile your workload to determine the optimal executor size. For most workloads, 4-8 cores per executor with 8-16 GB memory provides the best cost-performance ratio.
- Implement storage tiering: Configure S3/ADLS lifecycle policies to move data to cheaper storage classes after 30 days (IA) and 90 days (Glacier). Objects in archive classes such as Glacier must be restored before Spark or Delta Lake can read them, so archive only files that active queries and time travel no longer need.
- Compact files regularly: Run `OPTIMIZE` on all active tables to merge small files. Target 128-512 MB file sizes for optimal read performance and storage efficiency. Use auto-compaction for streaming workloads.
- Use columnar formats with compression: Store data in Parquet or ORC format with Zstd or Snappy compression. This reduces storage by 60-80% and improves query performance through predicate pushdown.
- Monitor and alert on costs: Set up cloud cost monitoring dashboards and alerts for unexpected cost spikes. Track cost per query, cost per GB processed, and cost per data product.
- Schedule batch jobs during off-peak hours: Run heavy batch processing during night/weekend hours, when spot prices tend to be lower and capacity is more readily available.
- Use broadcast joins for small tables: Set `spark.sql.autoBroadcastJoinThreshold` to 10-50 MB to automatically broadcast small tables, avoiding expensive shuffle joins.
- Profile before optimizing: Use the Spark UI, Databricks query profiles, or AWS/GCP cost explorer to identify the actual bottlenecks before applying optimizations. Focus on the highest-impact optimizations first.
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)