Cost Management at Scale: Optimizing Cloud Data Infrastructure
Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Spotify, Stripe
1. Cloud Cost Categories
Architecture Diagram
Data Platform Costs:
├── Compute (60-70%)
│   ├── Spark/EMR clusters
│   ├── Snowflake/BigQuery credits
│   └── Kubernetes pods
├── Storage (15-25%)
│   ├── S3/GCS storage
│   ├── Data warehouse storage
│   └── Database storage
├── Network (5-10%)
│   ├── Data transfer
│   ├── API calls
│   └── Load balancers
└── Other (5-10%)
    ├── Monitoring tools
    ├── CI/CD pipelines
    └── Third-party services
2. Compute Cost Optimization
Spot Instances for Spark
class CostOptimizedCluster:
    """Configure cost-optimized EMR/Spark clusters."""

    @staticmethod
    def get_spot_cluster_config():
        """Return an EMR instance-group layout mixing on-demand and spot capacity.

        The master group is on-demand; the core and task groups request spot.
        """
        return {
            "Master": {
                "InstanceType": "m5.xlarge",
                "Market": "ON_DEMAND",  # Master should be stable
            },
            "Core": {
                "InstanceType": "r5.2xlarge",
                # Core nodes run HDFS DataNodes: a spot interruption can lose
                # HDFS blocks or fail the cluster. Spot is only reasonable here
                # for short-lived clusters that keep all data in S3; otherwise
                # run the core group ON_DEMAND.
                "Market": "SPOT",
                # Max price in $/hr: 40% below the $0.50/hr on-demand rate
                # assumed by estimate_savings (not 70%).
                "BidPrice": "0.30",
            },
            "Task": {
                "InstanceType": "m5.4xlarge",
                "Market": "SPOT",  # Compute-only nodes: the safest group for spot
                "InstanceCount": 10,
            },
            "Configurations": [
                {
                    "Classification": "spark-defaults",
                    "Properties": {
                        "spark.dynamicAllocation.enabled": "true",
                        # Keeps shuffle output available when executors go away.
                        "spark.shuffle.service.enabled": "true",
                        # NOTE(review): not a property documented in the Spark
                        # configuration reference — confirm EMR honours it.
                        "spark.emr.enableSpot": "true",
                    }
                }
            ]
        }

    @staticmethod
    def estimate_savings(
        on_demand_hours: int,
        spot_hours: int,
        *,
        on_demand_rate: float = 0.50,
        spot_rate: float = 0.15,
    ) -> dict:
        """Compare the cost of on-demand hours against spot hours.

        Args:
            on_demand_hours: Instance-hours priced at ``on_demand_rate``.
            spot_hours: Instance-hours priced at ``spot_rate`` (usually the
                same workload, so the same count as ``on_demand_hours``).
            on_demand_rate: $/hr for on-demand capacity (default $0.50).
            spot_rate: $/hr for spot capacity (default $0.15, i.e. 70% below
                the default on-demand rate).

        Returns:
            Dict with ``on_demand_cost``, ``spot_cost``, ``savings`` and
            ``savings_percent``. ``savings_percent`` is 0.0 when the
            on-demand cost is zero, since no baseline exists to compare to.
        """
        on_demand_cost = on_demand_hours * on_demand_rate
        spot_cost = spot_hours * spot_rate
        savings = on_demand_cost - spot_cost
        savings_percent = (savings / on_demand_cost) * 100 if on_demand_cost else 0.0
        return {
            "on_demand_cost": on_demand_cost,
            "spot_cost": spot_cost,
            "savings": savings,
            "savings_percent": savings_percent,
        }
Auto-Scaling Policies
class AutoScalingPolicy:
    """Right-size clusters based on workload (queue depth)."""

    def __init__(self):
        self.policies = {
            "scale_up": {
                "metric": "queue_depth",
                "threshold": 100,
                "action": "add_instances",
                "count": 5,
            },
            "scale_down": {
                "metric": "queue_depth",
                "threshold": 10,
                "duration_minutes": 30,
                "action": "remove_instances",
                "count": 3,
            }
        }
        # Timestamp (seconds) when queue depth first dropped below the
        # scale-down threshold; None while it is not below it.
        self._below_since = None

    def evaluate(self, metrics: dict, *, now=None) -> str:
        """Return "SCALE_UP", "SCALE_DOWN" or "HOLD" for the given metrics.

        Scale-up fires immediately when queue depth exceeds its threshold.
        Scale-down fires only after depth has stayed below its threshold for
        ``duration_minutes``. This avoids flapping on a single quiet sample.
        After a scale-down the window restarts, so consecutive scale-downs
        are at least ``duration_minutes`` apart.

        Args:
            metrics: Must contain ``"queue_depth"``.
            now: Current time in seconds. Defaults to ``time.monotonic()``.
                Pass explicit values to drive the policy deterministically.
        """
        import time

        if now is None:
            now = time.monotonic()
        depth = metrics["queue_depth"]
        up = self.policies["scale_up"]
        down = self.policies["scale_down"]

        if depth > up["threshold"]:
            self._below_since = None
            return "SCALE_UP"
        if depth >= down["threshold"]:
            # Back in the normal band: any pending scale-down window is void.
            self._below_since = None
            return "HOLD"

        if self._below_since is None:
            self._below_since = now
        required_seconds = down.get("duration_minutes", 0) * 60
        if now - self._below_since >= required_seconds:
            self._below_since = None  # restart the window after acting
            return "SCALE_DOWN"
        return "HOLD"
3. Storage Cost Optimization
Storage Tiering
class StorageTierManager:
    """Automatically move data between storage tiers.

    Objects are rewritten in place with a colder storage class once their age
    exceeds a tier's ``days`` threshold. Objects are never moved to a warmer
    (more expensive) class.

    Caveats of the client-side copy approach:
    - Every transition is a new object write, so LastModified resets and
      later tier ages count from the copy, not from the original upload.
    - S3 Lifecycle rules perform the same transitions server-side without
      per-object copy requests and are usually the better tool.
    """

    TIER_RULES = {
        "hot": {"days": 0, "cost_per_gb": 0.023},  # S3 Standard
        "warm": {"days": 30, "cost_per_gb": 0.0125},  # S3 Standard-IA
        "cold": {"days": 90, "cost_per_gb": 0.004},  # S3 Glacier Instant Retrieval
        # NOTE: $0.00099/GB-month is the Glacier *Deep Archive* list price;
        # this tier transitions to GLACIER (Flexible Retrieval), which costs more.
        "archive": {"days": 365, "cost_per_gb": 0.00099},
    }

    # S3 storage class that each tier in TIER_RULES maps to.
    _TIER_STORAGE_CLASS = {
        "hot": "STANDARD",
        "warm": "STANDARD_IA",
        "cold": "GLACIER_IR",
        "archive": "GLACIER",
    }

    # Relative coldness of the storage classes this manager understands.
    # Objects in any other class (e.g. INTELLIGENT_TIERING) are left alone.
    _COLDNESS = {
        "STANDARD": 0,
        "STANDARD_IA": 1,
        "GLACIER_IR": 2,
        "GLACIER": 3,
        "DEEP_ARCHIVE": 4,
    }

    # CopyObject accepts sources up to 5 GiB; larger ones need a multipart copy.
    _MAX_SINGLE_COPY_BYTES = 5 * 1024 ** 3

    def optimize_storage(self, s3_client, bucket: str, now=None):
        """Move objects in ``bucket`` to cheaper tiers based on age.

        Args:
            s3_client: A boto3 S3 client.
            bucket: Bucket to scan (all keys, via ``list_objects_v2`` pagination).
            now: Timezone-aware "current time". Defaults to the current UTC time.
        """
        from datetime import datetime, timezone

        if now is None:
            now = datetime.now(timezone.utc)
        # Coldest threshold first so the first match is the deepest eligible tier.
        tiers = sorted(self.TIER_RULES.items(), key=lambda item: item[1]["days"], reverse=True)

        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket):
            for obj in page.get('Contents', []):
                last_modified = obj['LastModified']
                if last_modified.tzinfo is None:
                    # S3 timestamps are UTC; treat a naive value as UTC.
                    last_modified = last_modified.replace(tzinfo=timezone.utc)
                age_days = (now - last_modified).days

                target = self._target_class(age_days, tiers)
                current = obj.get('StorageClass', 'STANDARD')
                if target is None or current not in self._COLDNESS:
                    continue
                # Only ever move colder. This also skips GLACIER objects,
                # which cannot be copied without a restore first.
                if self._COLDNESS[target] <= self._COLDNESS[current]:
                    continue
                self._transition(s3_client, bucket, obj['Key'], target, obj.get('Size', 0))

    def _target_class(self, age_days, tiers):
        """Return the storage class for the deepest tier whose age threshold is exceeded."""
        for name, rule in tiers:
            if age_days > rule["days"]:
                return self._TIER_STORAGE_CLASS.get(name)
        return None

    def _transition(self, client, bucket, key, tier, size=0):
        """Rewrite ``key`` in place with storage class ``tier``.

        Sources above 5 GiB go through boto3's managed (multipart) ``copy``.
        NOTE(review): confirm that path preserves the object metadata you need;
        pass it via ExtraArgs otherwise.
        """
        source = {'Bucket': bucket, 'Key': key}
        if size > self._MAX_SINGLE_COPY_BYTES:
            client.copy(source, bucket, key, ExtraArgs={'StorageClass': tier})
        else:
            client.copy_object(
                Bucket=bucket,
                Key=key,
                CopySource=source,
                StorageClass=tier,
            )
File Optimization
class FileOptimizer:
    """Optimize file sizes for cost and performance."""

    # Delta Lake session setting (in bytes) that caps file size written by OPTIMIZE.
    _MAX_FILE_SIZE_CONF = "spark.databricks.delta.optimize.maxFileSize"

    def __init__(self, spark):
        self.spark = spark

    def compact_files(self, path: str, target_size_mb: int = 256, zorder_by=("date",)):
        """Compact small files in the Delta table at ``path`` into larger ones.

        Args:
            path: Location of the Delta table (backticks in it are escaped).
            target_size_mb: Target maximum size of compacted files. It is
                applied through the session's ``maxFileSize`` setting for the
                duration of the OPTIMIZE, then the previous value is restored.
            zorder_by: Column name or sequence of column names to Z-order by,
                inserted verbatim into the SQL. An empty sequence skips
                Z-ordering.
        """
        if isinstance(zorder_by, str):
            zorder_by = (zorder_by,)
        statement = f"OPTIMIZE delta.{self._quote_identifier(path)}"
        if zorder_by:
            statement += " ZORDER BY (" + ", ".join(zorder_by) + ")"

        conf = self.spark.conf
        previous = conf.get(self._MAX_FILE_SIZE_CONF, None)
        conf.set(self._MAX_FILE_SIZE_CONF, str(target_size_mb * 1024 * 1024))
        try:
            self.spark.sql(statement)
        finally:
            # The session conf is shared; put it back the way we found it.
            if previous is None:
                conf.unset(self._MAX_FILE_SIZE_CONF)
            else:
                conf.set(self._MAX_FILE_SIZE_CONF, previous)

    def analyze_file_sizes(self, path: str, small_file_threshold_mb: int = 64) -> dict:
        """Analyze the size distribution of the Parquet data files under ``path``.

        For a Delta table this includes files removed from the current version
        but not yet vacuumed, as a plain Parquet directory read would.

        Returns:
            Dict with ``total_files``, ``avg_size_mb`` and ``small_files_count``
            (files smaller than ``small_file_threshold_mb``).
        """
        sizes = self._list_data_file_sizes(path)
        return self._summarize_sizes(sizes, small_file_threshold_mb)

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Wrap ``name`` in backticks for Spark SQL, doubling embedded backticks."""
        return "`" + name.replace("`", "``") + "`"

    @staticmethod
    def _summarize_sizes(sizes, threshold_mb) -> dict:
        """Aggregate a list of file sizes in bytes into count / average / small-file stats."""
        mb = 1024 * 1024
        total = len(sizes)
        return {
            "total_files": total,
            "avg_size_mb": (sum(sizes) / total / mb) if total else 0.0,
            "small_files_count": sum(1 for size in sizes if size < threshold_mb * mb),
        }

    def _list_data_file_sizes(self, path: str) -> list:
        """Return byte sizes of visible ``.parquet`` files under ``path`` (recursive).

        Skips anything below ``path`` with a path component starting with
        ``_`` or ``.`` (e.g. ``_delta_log/``), matching what Spark's Parquet
        reader treats as hidden.
        """
        jvm = self.spark._jvm
        hadoop_conf = self.spark.sparkContext._jsc.hadoopConfiguration()
        root = jvm.org.apache.hadoop.fs.Path(path)
        fs = root.getFileSystem(hadoop_conf)
        root_prefix = fs.makeQualified(root).toUri().getPath().rstrip("/")

        sizes = []
        files = fs.listFiles(root, True)
        while files.hasNext():
            status = files.next()
            file_path = status.getPath()
            if not file_path.getName().endswith(".parquet"):
                continue
            relative = file_path.toUri().getPath()[len(root_prefix):]
            if any(part.startswith(("_", ".")) for part in relative.split("/") if part):
                continue
            sizes.append(status.getLen())
        return sizes
4. Snowflake/BigQuery Cost Optimization
Warehouse Sizing
class WarehouseOptimizer:
    """Right-size data warehouse compute."""

    def analyze_query_patterns(self, warehouse_metrics: list) -> dict:
        """Analyze query patterns to right-size a warehouse.

        Args:
            warehouse_metrics: Non-empty list of dicts, each with
                ``queue_time_ms``, ``execution_time_ms`` and ``size``. The
                first entry's ``size`` is reported as the current size.

        Returns:
            Averages, a "SCALE_UP" / "SCALE_DOWN" / "HOLD" recommendation and
            its estimated effect on hourly credit burn.

        Raises:
            ValueError: If ``warehouse_metrics`` is empty.
        """
        if not warehouse_metrics:
            raise ValueError("warehouse_metrics must contain at least one sample")
        count = len(warehouse_metrics)
        avg_queue_time = sum(m['queue_time_ms'] for m in warehouse_metrics) / count
        avg_execution_time = sum(m['execution_time_ms'] for m in warehouse_metrics) / count

        if avg_queue_time > 5000:  # >5s queue time
            recommendation = "SCALE_UP"
        elif avg_execution_time < 1000 and avg_queue_time < 1000:
            recommendation = "SCALE_DOWN"
        else:
            recommendation = "HOLD"

        return {
            "current_size": warehouse_metrics[0]['size'],
            "avg_queue_time_ms": avg_queue_time,
            "avg_execution_time_ms": avg_execution_time,
            "recommendation": recommendation,
            "estimated_savings": self._estimate_savings(recommendation)
        }

    @staticmethod
    def _estimate_savings(recommendation: str) -> float:
        """Estimated change in hourly credit burn, as a percent of the current rate.

        Each Snowflake warehouse size step doubles credits per hour. One step
        down therefore halves the hourly rate (+50% savings), and one step up
        doubles it (-100%). Total savings also depend on how query runtime
        changes with size, so treat this as a per-hour figure, not a forecast.
        """
        return {"SCALE_DOWN": 50.0, "SCALE_UP": -100.0}.get(recommendation, 0.0)

    def auto_suspend_config(self, idle_minutes: int = 5):
        """Configure auto-suspend to save costs.

        Args:
            idle_minutes: Idle time before the warehouse suspends.

        Returns:
            Warehouse settings. ``auto_suspend`` is in seconds, which is the
            unit of Snowflake's AUTO_SUSPEND parameter.
        """
        return {
            "auto_suspend": idle_minutes * 60,
            "auto_resume": True,
            "min_cluster_count": 1,
            "max_cluster_count": 4,
            "scaling_policy": "ECONOMY",
        }
5. Cost Monitoring & Alerts
class CostMonitor:
    """Monitor and alert on cost anomalies."""

    # Usage thresholds, in percent of budget, above which an alert is raised.
    CRITICAL_PERCENT = 90
    WARNING_PERCENT = 75

    def __init__(self):
        self.budgets = {}
        self.alerts = []
        # Most recent spend reported per service through check_spend().
        self.spend = {}

    def set_budget(self, service: str, monthly_limit: float):
        """Set the monthly budget for ``service``."""
        self.budgets[service] = monthly_limit

    def check_spend(self, service: str, current_spend: float) -> dict:
        """Record ``current_spend`` for ``service`` and alert if it nears the budget.

        A service with no budget is treated as unlimited (0% usage). A
        non-positive budget yields ``inf`` usage for any positive spend.
        """
        budget = self.budgets.get(service, float('inf'))
        self.spend[service] = current_spend

        if budget > 0:
            usage_percent = (current_spend / budget) * 100
        else:
            usage_percent = float('inf') if current_spend > 0 else 0.0

        if usage_percent > self.CRITICAL_PERCENT:
            severity = "CRITICAL"
        elif usage_percent > self.WARNING_PERCENT:
            severity = "WARNING"
        else:
            severity = None
        if severity is not None:
            self.alerts.append({
                "service": service,
                "severity": severity,
                "message": f"Cost at {usage_percent:.1f}% of budget"
            })

        return {
            "service": service,
            "budget": budget,
            "current_spend": current_spend,
            "remaining": budget - current_spend,
            "usage_percent": usage_percent,
        }

    def generate_cost_report(self) -> dict:
        """Summarize recorded spend, alerts, top spenders and follow-up actions."""
        return {
            # Sum of the latest recorded spend per service (not of budgets).
            "total_spend": sum(self.spend.values()),
            "alerts": self.alerts,
            "top_services": self._get_top_services(),
            "optimization_opportunities": self._find_optimizations(),
        }

    def _get_top_services(self, limit: int = 5) -> list:
        """Return up to ``limit`` services ordered by recorded spend, highest first."""
        ranked = sorted(self.spend.items(), key=lambda item: item[1], reverse=True)
        return [{"service": service, "spend": amount} for service, amount in ranked[:limit]]

    def _find_optimizations(self) -> list:
        """Flag services that have spend but no budget, or that exceed their budget."""
        opportunities = []
        for service, amount in self.spend.items():
            budget = self.budgets.get(service)
            if budget is None:
                opportunities.append({"service": service, "reason": "no budget set"})
            elif amount > budget:
                opportunities.append({
                    "service": service,
                    "reason": f"over budget by {amount - budget:.2f}",
                })
        return opportunities
ℹ️
Best Practice: Implement cost tagging from day one. Tag all resources with team, project, and environment. This enables chargeback and helps identify waste.
Follow-Up Questions
- How would you reduce Spark job costs by 50% without changing business logic?
- Design a cost allocation system for a multi-tenant data platform.
- How do you choose between spot and on-demand instances?
- Design a system that automatically right-sizes resources based on workload.
- How would you present cost optimization recommendations to leadership?