ML Cost Optimization
Difficulty: Senior Level | Companies: Google, Meta, Netflix, Uber, Stripe
Cost Optimization Strategies
ML infrastructure costs can be reduced by 50-70% with proper optimization.
ℹ️
Netflix saves $10M+ annually through intelligent spot instance usage and auto-scaling.
Cost Monitoring
# cost_monitoring.py
import time
import json
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
class ResourceType(Enum):
    """Kinds of billable infrastructure resources a pipeline run can consume."""

    GPU = "gpu"
    CPU = "cpu"
    MEMORY = "memory"
    STORAGE = "storage"
    NETWORK = "network"


@dataclass
class ResourceUsage:
    """Usage and cost of one resource type within a single pipeline run.

    The fields are stored as given; nothing here validates that
    ``total_cost`` equals ``quantity * duration_hours * cost_per_hour``.
    """

    resource_type: ResourceType
    quantity: float  # number of units consumed (e.g. GPUs, cores)
    duration_hours: float  # how long the units were held
    cost_per_hour: float  # price of ONE unit for one hour
    total_cost: float


@dataclass
class MLPipelineCost:
    """Cost record for one run of an ML pipeline."""

    pipeline_id: str  # text before the first "-" is used as the model name
    run_id: str
    start_time: str  # ISO 8601 string, parsed with datetime.fromisoformat
    end_time: str
    resources: List[ResourceUsage]
    total_cost: float


class CostTracker:
    """Collect pipeline cost records and derive summaries from them."""

    # Price of one GPU-hour by GPU type; unknown types use the default below.
    _GPU_PRICES_PER_HOUR = {"A100": 3.0, "V100": 2.0, "T4": 0.5, "K80": 0.3}
    _DEFAULT_GPU_PRICE_PER_HOUR = 1.0

    def __init__(self):
        self.pipeline_costs: List[MLPipelineCost] = []
        # All-time running totals, keyed by model name.
        self.cost_by_model: Dict[str, float] = {}
        # Declared for per-team totals; no method of this class updates it.
        self.cost_by_team: Dict[str, float] = {}

    @staticmethod
    def _model_name(pipeline_id: str) -> str:
        """Return the model name, i.e. the text before the first ``-``."""
        return pipeline_id.split("-")[0]

    def record_pipeline_cost(self, cost: MLPipelineCost):
        """Store a run's cost record and add it to the all-time model total.

        Args:
            cost: The cost record of a finished pipeline run.
        """
        self.pipeline_costs.append(cost)
        model_name = self._model_name(cost.pipeline_id)
        self.cost_by_model[model_name] = (
            self.cost_by_model.get(model_name, 0) + cost.total_cost
        )

    def calculate_gpu_cost(self, gpu_hours: float, gpu_type: str = "A100") -> float:
        """Return the cost of ``gpu_hours`` on the given GPU type.

        Args:
            gpu_hours: Number of GPU-hours consumed.
            gpu_type: One of the known GPU types; any other value is priced
                at the default rate rather than rejected.
        """
        price = self._GPU_PRICES_PER_HOUR.get(
            gpu_type, self._DEFAULT_GPU_PRICE_PER_HOUR
        )
        return gpu_hours * price

    def get_cost_summary(self, days: int = 30) -> Dict:
        """Summarize the runs that started within the last ``days`` days.

        Every figure in the result, including ``cost_by_model``, covers only
        the runs inside the window, so the per-model costs add up to
        ``total_cost``. The all-time totals stay available on
        ``self.cost_by_model``.

        Args:
            days: Length of the look-back window in days; must be positive.

        Returns:
            A dict with the keys ``total_cost``, ``num_runs``,
            ``avg_cost_per_run``, ``cost_by_model`` and ``daily_average``.

        Raises:
            ValueError: If ``days`` is zero or negative.
        """
        if days <= 0:
            raise ValueError(f"days must be positive, got {days!r}")

        window = timedelta(days=days)
        naive_now = datetime.now()

        def started_in_window(run: MLPipelineCost) -> bool:
            started = datetime.fromisoformat(run.start_time)
            # Naive and aware datetimes cannot be compared, so take "now"
            # in the same offset as the recorded timestamp when it has one.
            if started.tzinfo is None:
                now = naive_now
            else:
                now = datetime.now(started.tzinfo)
            return started > now - window

        recent_costs = [run for run in self.pipeline_costs if started_in_window(run)]
        total_cost = sum(run.total_cost for run in recent_costs)

        # Built fresh per call so callers cannot mutate the tracker's state.
        cost_by_model: Dict[str, float] = {}
        for run in recent_costs:
            model_name = self._model_name(run.pipeline_id)
            cost_by_model[model_name] = (
                cost_by_model.get(model_name, 0) + run.total_cost
            )

        return {
            "total_cost": total_cost,
            "num_runs": len(recent_costs),
            # max(1, ...) keeps the average at 0 when no run is in the window.
            "avg_cost_per_run": total_cost / max(1, len(recent_costs)),
            "cost_by_model": cost_by_model,
            "daily_average": total_cost / days,
        }

    def identify_waste(self) -> List[Dict]:
        """List GPU usages shorter than one hour as potential waste.

        Each GPU resource held for less than an hour is reported with the
        unused remainder of that hour and what that remainder costs across
        all ``quantity`` units.

        Returns:
            One dict per flagged resource with the keys ``pipeline_id``,
            ``resource``, ``wasted_hours`` (per unit) and
            ``potential_savings`` (for all units).
        """
        waste_items = []
        for cost in self.pipeline_costs:
            for resource in cost.resources:
                if resource.resource_type != ResourceType.GPU:
                    continue
                if resource.duration_hours >= 1:
                    continue
                wasted_hours = 1 - resource.duration_hours
                waste_items.append({
                    "pipeline_id": cost.pipeline_id,
                    "resource": resource.resource_type.value,
                    "wasted_hours": wasted_hours,
                    # cost_per_hour is per unit, so scale by the unit count.
                    "potential_savings": (
                        wasted_hours * resource.cost_per_hour * resource.quantity
                    ),
                })
        return waste_items
# Usage
tracker = CostTracker()

# Timestamp the example run relative to "now": get_cost_summary() only counts
# runs that started within its look-back window (30 days by default), so a
# fixed past date would make the printed summary report zero runs.
run_end = datetime.now()
run_start = run_end - timedelta(hours=2.5)

cost = MLPipelineCost(
    pipeline_id="churn-training-run1",
    run_id="run-001",
    start_time=run_start.isoformat(timespec="seconds"),
    end_time=run_end.isoformat(timespec="seconds"),
    resources=[
        # (type, quantity, duration_hours, cost_per_hour, total_cost)
        ResourceUsage(ResourceType.GPU, 1, 2.5, 3.0, 7.5),
        ResourceUsage(ResourceType.CPU, 4, 2.5, 0.1, 1.0),
        ResourceUsage(ResourceType.MEMORY, 16, 2.5, 0.02, 0.8),
    ],
    total_cost=9.3,
)
tracker.record_pipeline_cost(cost)
print(tracker.get_cost_summary())
Spot Instance Strategy
# spot_strategy.py
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import random
class InstanceStatus(Enum):
    """Lifecycle states of a spot instance."""

    RUNNING = "running"
    INTERRUPTED = "interrupted"
    TERMINATED = "terminated"
@dataclass
class SpotConfig:
    """Pricing and reliability profile of one spot instance type."""

    instance_type: str
    max_price: float  # hourly price; also used as the assumed spot price
    availability_zone: str
    interruption_rate: float  # used as a fraction: score weights by (1 - rate)


class SpotInstanceManager:
    """Recommend spot instance types and estimate savings versus on-demand."""

    def __init__(self):
        self.instances: List[Dict] = []
        self.spot_configs: Dict[str, SpotConfig] = {
            "p3.2xlarge": SpotConfig("p3.2xlarge", 1.50, "us-east-1a", 0.1),
            "p3.8xlarge": SpotConfig("p3.8xlarge", 6.0, "us-east-1a", 0.15),
            "g4dn.xlarge": SpotConfig("g4dn.xlarge", 0.50, "us-east-1a", 0.05),
        }

    @staticmethod
    def _score(config: SpotConfig) -> float:
        """Rank a config: higher for cheaper and less interruption-prone."""
        return (1 - config.interruption_rate) * (1 / config.max_price)

    def get_spot_recommendation(self, workload_type: str, budget: float) -> Optional[SpotConfig]:
        """Return the best-scoring instance type priced within ``budget``.

        Args:
            workload_type: Accepted for interface compatibility; the current
                scoring does not take it into account.
            budget: Highest acceptable hourly price.

        Returns:
            The config with the highest score among those whose
            ``max_price`` does not exceed ``budget``, or ``None`` when no
            config is affordable. On equal scores the config registered
            first wins.
        """
        affordable = [
            config
            for config in self.spot_configs.values()
            if config.max_price <= budget
        ]
        if not affordable:
            return None
        return max(affordable, key=self._score)

    def estimate_cost_savings(self, on_demand_hours: float, instance_type: str) -> Dict:
        """Estimate spot savings for a workload of ``on_demand_hours`` hours.

        The on-demand price is assumed to be twice the configured
        ``max_price``. The expected cost of interruptions is modelled as the
        interrupted share of the hours re-billed at the on-demand price.

        Args:
            on_demand_hours: Workload length in hours.
            instance_type: Key into ``self.spot_configs``.

        Returns:
            ``{"error": "Unknown instance type"}`` for an unknown type.
            Otherwise a dict with ``on_demand_cost``, ``spot_cost`` (which
            includes the interruption cost), ``savings`` and
            ``savings_percentage``. The percentage is ``0.0`` when the
            on-demand cost is zero, e.g. for a zero-hour workload.
        """
        config = self.spot_configs.get(instance_type)
        if config is None:
            return {"error": "Unknown instance type"}

        on_demand_price = config.max_price * 2
        spot_price = config.max_price

        on_demand_cost = on_demand_hours * on_demand_price
        spot_cost = on_demand_hours * spot_price
        interruption_cost = on_demand_hours * config.interruption_rate * on_demand_price

        effective_spot_cost = spot_cost + interruption_cost
        savings = on_demand_cost - effective_spot_cost
        # A zero on-demand cost would otherwise divide by zero.
        if on_demand_cost:
            savings_percentage = (savings / on_demand_cost) * 100
        else:
            savings_percentage = 0.0

        return {
            "on_demand_cost": on_demand_cost,
            "spot_cost": effective_spot_cost,
            "savings": savings,
            "savings_percentage": savings_percentage,
        }
# Demo: ask for the best-scoring spot instance within a 2.0/hr budget.
manager = SpotInstanceManager()
recommendation = manager.get_spot_recommendation("training", budget=2.0)
if recommendation:
    print(f"Recommended: {recommendation.instance_type} at ${recommendation.max_price}/hr")

# Demo: estimate the savings of running 100 hours on p3.2xlarge spot capacity.
savings = manager.estimate_cost_savings(100, "p3.2xlarge")
print(f"Potential savings: ${savings['savings']:.2f} ({savings['savings_percentage']:.1f}%)")
Auto-Scaling Configuration
# autoscaling-config.yaml
# HorizontalPodAutoscaler for the ml-server Deployment: keeps between 2 and 20
# replicas, scaling on CPU utilization and on a per-pod queue-length metric.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-server-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-server
  minReplicas: 2
  maxReplicas: 20
  behavior:
    # Scale up quickly: at most 4 pods added per 60-second period.
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    # Scale down cautiously: at most 10% of pods removed per 60-second period,
    # using a 300-second stabilization window.
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
  metrics:
    # Target 60% average CPU utilization across pods.
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
    # Target an average of 10 queued inference requests per pod.
    # NOTE(review): a Pods metric must be served by a metrics adapter in the
    # cluster - confirm inference_queue_length is exported.
    - type: Pods
      pods:
        metric:
          name: inference_queue_length
        target:
          type: AverageValue
          averageValue: "10"
Follow-Up Questions
- How do you implement cost allocation tags for ML workloads?
- What are the trade-offs between spot and on-demand instances for training?
- How would you optimize costs for batch inference workloads?
- What metrics should drive auto-scaling decisions for ML serving?