🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services | ℹ️ About | ✉️ Contact | View Pricing Plans (from $10)

ML Cost Optimization

MLOps · Cost Management · ⭐ Premium

Advertisement

ML Cost Optimization

Difficulty: Senior Level | Companies: Google, Meta, Netflix, Uber, Stripe

Cost Optimization Strategies

ML infrastructure costs can be reduced by 50-70% with proper optimization.

ℹ️

Netflix saves $10M+ annually through intelligent spot instance usage and auto-scaling.

Cost Monitoring

# cost_monitoring.py
import time
import json
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum

class ResourceType(Enum):
    """Kinds of resources whose usage can be recorded against a pipeline run."""
    # The string values are what gets emitted when a member's `.value` is
    # reported (e.g. in waste reports), so they are part of the output format.
    GPU = "gpu"
    CPU = "cpu"
    MEMORY = "memory"
    STORAGE = "storage"
    NETWORK = "network"

@dataclass
class ResourceUsage:
    """Usage and cost of one resource type within a single pipeline run."""
    # Which kind of resource this entry describes.
    resource_type: ResourceType
    # Amount of the resource used; the unit is not defined by this class
    # (presumably device/core count or GB depending on the type — confirm).
    quantity: float
    # How long the resource was held, in hours.
    duration_hours: float
    # Hourly rate. NOTE(review): in the usage example this looks like a rate
    # per unit of `quantity` — confirm with the producer of these records.
    cost_per_hour: float
    # Cost of this entry as supplied by the caller; it is stored as given and
    # is neither computed nor validated against the other fields.
    total_cost: float

@dataclass
class MLPipelineCost:
    """Cost record for one execution (run) of an ML pipeline."""
    # Pipeline identifier. CostTracker treats the text before the first "-"
    # as the model name when aggregating per-model cost.
    pipeline_id: str
    # Identifier of this particular run.
    run_id: str
    # Run start as an ISO 8601 string; CostTracker parses it with
    # datetime.fromisoformat() to decide whether the run is recent.
    start_time: str
    # Run end as a string (presumably ISO 8601 like start_time — confirm).
    end_time: str
    # Per-resource usage entries that make up this run.
    resources: List[ResourceUsage]
    # Total cost of the run as supplied by the caller; not derived from
    # `resources` by this class.
    total_cost: float

class CostTracker:
    """Accumulate per-run pipeline cost records and report simple aggregates.

    Attributes:
        pipeline_costs: Every record passed to ``record_pipeline_cost``, in
            insertion order.
        cost_by_model: All-time total cost keyed by model name (the part of
            ``pipeline_id`` before the first ``"-"``).
        cost_by_team: Reserved for per-team totals; nothing in this class
            writes to it.
    """

    def __init__(self):
        self.pipeline_costs: List[MLPipelineCost] = []
        self.cost_by_model: Dict[str, float] = {}
        self.cost_by_team: Dict[str, float] = {}

    @staticmethod
    def _model_name(pipeline_id: str) -> str:
        """Return the model name encoded as the prefix of a pipeline id."""
        return pipeline_id.split("-")[0]

    def record_pipeline_cost(self, cost: "MLPipelineCost"):
        """Store a run's cost record and add its total to the model's tally.

        Args:
            cost: The record to store. Its ``pipeline_id`` prefix (before the
                first ``"-"``) selects the ``cost_by_model`` bucket.
        """
        self.pipeline_costs.append(cost)
        model_name = self._model_name(cost.pipeline_id)
        self.cost_by_model[model_name] = self.cost_by_model.get(model_name, 0) + cost.total_cost

    def calculate_gpu_cost(self, gpu_hours: float, gpu_type: str = "A100") -> float:
        """Return the cost of ``gpu_hours`` on the given GPU type.

        Args:
            gpu_hours: Number of GPU hours consumed.
            gpu_type: One of the keys of the built-in price table. Unknown
                types are priced at a fallback rate of 1.0 per hour.
        """
        gpu_prices = {"A100": 3.0, "V100": 2.0, "T4": 0.5, "K80": 0.3}
        return gpu_hours * gpu_prices.get(gpu_type, 1.0)

    def get_cost_summary(self, days: int = 30) -> Dict:
        """Summarise the cost of runs that started within the last ``days`` days.

        Every figure in the result, including ``cost_by_model``, covers the
        same window, so the per-model values add up to ``total_cost``. The
        all-time per-model totals remain available on ``self.cost_by_model``.

        Args:
            days: Size of the look-back window in days. Must be positive.

        Returns:
            A dict with keys ``total_cost``, ``num_runs``,
            ``avg_cost_per_run`` (0 when there are no runs), ``cost_by_model``
            and ``daily_average``.

        Raises:
            ValueError: If ``days`` is zero or negative.
        """
        if days <= 0:
            # Previously days=0 surfaced as a bare ZeroDivisionError and a
            # negative window silently produced a negative daily average.
            raise ValueError("days must be a positive number")

        cutoff = datetime.now() - timedelta(days=days)
        # Timezone-aware twin of the cutoff: comparing an aware start time
        # against a naive cutoff would raise TypeError. astimezone() on a
        # naive datetime interprets it as local time.
        aware_cutoff = cutoff.astimezone()

        recent_costs = []
        for record in self.pipeline_costs:
            started = datetime.fromisoformat(record.start_time)
            threshold = cutoff if started.utcoffset() is None else aware_cutoff
            if started > threshold:
                recent_costs.append(record)

        total_cost = sum(c.total_cost for c in recent_costs)
        avg_cost_per_run = total_cost / max(1, len(recent_costs))

        # Built fresh per call so callers cannot mutate tracker state through
        # the returned summary.
        windowed_by_model: Dict[str, float] = {}
        for record in recent_costs:
            model_name = self._model_name(record.pipeline_id)
            windowed_by_model[model_name] = windowed_by_model.get(model_name, 0) + record.total_cost

        return {
            "total_cost": total_cost,
            "num_runs": len(recent_costs),
            "avg_cost_per_run": avg_cost_per_run,
            "cost_by_model": windowed_by_model,
            "daily_average": total_cost / days
        }

    def identify_waste(self) -> List[Dict]:
        """List GPU usage entries that ran for less than one hour.

        Each entry is reported with the shortfall to a full hour and that
        shortfall priced at the entry's hourly rate.

        Returns:
            One dict per short GPU entry with keys ``pipeline_id``,
            ``resource``, ``wasted_hours`` and ``potential_savings``.
        """
        waste_items = []
        for cost in self.pipeline_costs:
            for resource in cost.resources:
                if resource.resource_type != ResourceType.GPU or resource.duration_hours >= 1:
                    continue
                unused_hours = 1 - resource.duration_hours
                waste_items.append({
                    "pipeline_id": cost.pipeline_id,
                    "resource": resource.resource_type.value,
                    "wasted_hours": unused_hours,
                    "potential_savings": unused_hours * resource.cost_per_hour
                })
        return waste_items


# Usage
tracker = CostTracker()
# Anchor the example run to the current time: get_cost_summary() only counts
# runs that started within the last 30 days, so a hard-coded calendar date
# eventually ages out of the window and the demo would print an empty summary.
run_end = datetime.now().replace(microsecond=0)
run_start = run_end - timedelta(hours=2.5)
cost = MLPipelineCost(
    pipeline_id="churn-training-run1",
    run_id="run-001",
    start_time=run_start.isoformat(),
    end_time=run_end.isoformat(),
    resources=[
        # (type, quantity, duration_hours, cost_per_hour, total_cost)
        ResourceUsage(ResourceType.GPU, 1, 2.5, 3.0, 7.5),
        ResourceUsage(ResourceType.CPU, 4, 2.5, 0.1, 1.0),
        ResourceUsage(ResourceType.MEMORY, 16, 2.5, 0.02, 0.8),
    ],
    # Sum of the three resource totals above: 7.5 + 1.0 + 0.8.
    total_cost=9.3
)
tracker.record_pipeline_cost(cost)
print(tracker.get_cost_summary())

Spot Instance Strategy

# spot_strategy.py
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import random

class InstanceStatus(Enum):
    """Possible states of an instance (presumably a spot instance's lifecycle — confirm)."""
    RUNNING = "running"
    INTERRUPTED = "interrupted"
    TERMINATED = "terminated"

@dataclass
class SpotConfig:
    """Pricing and reliability parameters for one spot instance type."""
    # Instance type name, e.g. "p3.2xlarge".
    instance_type: str
    # Hourly price ceiling. SpotInstanceManager compares it against the budget
    # and also uses it as the spot price when estimating savings.
    max_price: float
    # Availability zone the configuration applies to, e.g. "us-east-1a".
    availability_zone: str
    # Interruption likelihood; used as `1 - interruption_rate` when scoring,
    # so presumably a fraction between 0 and 1 — confirm.
    interruption_rate: float

class SpotInstanceManager:
    """Recommend spot instance types and estimate savings versus on-demand.

    Attributes:
        instances: List of instance records; not populated by this class.
        spot_configs: Built-in table of known spot configurations keyed by
            instance type name.
    """

    def __init__(self):
        self.instances: List[Dict] = []
        self.spot_configs: Dict[str, SpotConfig] = {
            "p3.2xlarge": SpotConfig("p3.2xlarge", 1.50, "us-east-1a", 0.1),
            "p3.8xlarge": SpotConfig("p3.8xlarge", 6.0, "us-east-1a", 0.15),
            "g4dn.xlarge": SpotConfig("g4dn.xlarge", 0.50, "us-east-1a", 0.05),
        }

    def get_spot_recommendation(self, workload_type: str, budget: float) -> Optional["SpotConfig"]:
        """Pick the best-scoring configuration whose price fits the budget.

        The score is ``(1 - interruption_rate) / max_price``, so cheaper and
        less frequently interrupted instance types rank higher. On equal
        scores the configuration listed first in ``spot_configs`` wins.

        Args:
            workload_type: Accepted for interface compatibility; the current
                scoring does not take it into account.
            budget: Maximum acceptable hourly price.

        Returns:
            The highest-scoring affordable configuration, or ``None`` when no
            configuration fits the budget.
        """
        scored = [
            ((1 - config.interruption_rate) / config.max_price, config)
            for config in self.spot_configs.values()
            # A non-positive price cannot be scored (division by zero or a
            # negative score), so such entries are skipped.
            if 0 < config.max_price <= budget
        ]
        if not scored:
            return None
        # max() returns the first maximal element, matching the pick a stable
        # descending sort would make.
        return max(scored, key=lambda pair: pair[0])[1]

    def estimate_cost_savings(self, on_demand_hours: float, instance_type: str) -> Dict:
        """Estimate the savings of running a workload on spot capacity.

        The model assumes the on-demand price is twice ``max_price``, the
        spot price equals ``max_price``, and that the interrupted share of
        the hours (``interruption_rate``) is re-paid at the on-demand price.

        Args:
            on_demand_hours: Workload duration in hours.
            instance_type: Key into ``spot_configs``.

        Returns:
            ``{"error": "Unknown instance type"}`` for an unknown type;
            otherwise a dict with ``on_demand_cost``, ``spot_cost``
            (including the interruption overhead), ``savings`` and
            ``savings_percentage``. The percentage is ``0.0`` when the
            on-demand cost is zero, e.g. for a zero-hour workload.
        """
        config = self.spot_configs.get(instance_type)
        if config is None:
            return {"error": "Unknown instance type"}

        on_demand_price = config.max_price * 2
        spot_price = config.max_price

        on_demand_cost = on_demand_hours * on_demand_price
        spot_cost = on_demand_hours * spot_price
        interruption_cost = on_demand_hours * config.interruption_rate * on_demand_price
        effective_spot_cost = spot_cost + interruption_cost
        savings = on_demand_cost - effective_spot_cost

        # Guard the ratio: with zero hours the baseline is zero and the
        # percentage would otherwise raise ZeroDivisionError.
        if on_demand_cost:
            savings_percentage = (savings / on_demand_cost) * 100
        else:
            savings_percentage = 0.0

        return {
            "on_demand_cost": on_demand_cost,
            "spot_cost": effective_spot_cost,
            "savings": savings,
            "savings_percentage": savings_percentage
        }


# Usage: ask for the best spot option under a $2.00/hr budget, then estimate
# what 100 hours on p3.2xlarge would save compared with on-demand.
manager = SpotInstanceManager()
recommendation = manager.get_spot_recommendation("training", budget=2.0)
if recommendation is not None:
    print(
        f"Recommended: {recommendation.instance_type} "
        f"at ${recommendation.max_price}/hr"
    )

savings = manager.estimate_cost_savings(100, "p3.2xlarge")
print(
    f"Potential savings: ${savings['savings']:.2f} "
    f"({savings['savings_percentage']:.1f}%)"
)

Auto-Scaling Configuration

# autoscaling-config.yaml
# HorizontalPodAutoscaler for the "ml-server" Deployment in the "production"
# namespace. Keeps the replica count between 2 and 20, driven by CPU
# utilization and a per-pod inference queue length metric.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-server-hpa
  namespace: production
spec:
  # The workload whose replica count this autoscaler manages.
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-server
  # Lower and upper bounds on the replica count.
  minReplicas: 2
  maxReplicas: 20
  behavior:
    # Scale-up: 60 s stabilization window, at most 4 pods added per 60 s period.
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    # Scale-down: longer 300 s stabilization window and at most 10% of the
    # current replicas removed per 60 s period, so capacity is shed more
    # slowly than it is added.
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
  metrics:
    # Target an average CPU utilization of 60% across pods.
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
    # Target an average of 10 queued inference requests per pod.
    # NOTE(review): a Pods-type metric is not built in; presumably
    # inference_queue_length is served by a custom metrics adapter — confirm.
    - type: Pods
      pods:
        metric:
          name: inference_queue_length
        target:
          type: AverageValue
          averageValue: "10"

Follow-Up Questions

  1. How do you implement cost allocation tags for ML workloads?
  2. What are the trade-offs between spot and on-demand instances for training?
  3. How would you optimize costs for batch inference workloads?
  4. What metrics should drive auto-scaling decisions for ML serving?

Advertisement