🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

AI Operations (MLOps)

🟢 Free Lesson

Advertisement

AI Operations (MLOps)

MLOps Pipeline for GenAI: Data Pipeline → Model Training → Evaluation → Deployment → Monitoring → Feedback

LLMOps Components: • Prompt Version Control • Chain Orchestration • Token Management • Cost Tracking • Guardrails Integration

Model Registry: • Version Management • Artifact Storage • Lineage Tracking • A/B Testing • Rollback Support

Production Monitoring: • Latency Tracking • Quality Metrics • Drift Detection • Cost Analytics • Alert Systems

LLMOps Pipeline

LLMOps extends MLOps with specific concerns for large language models: prompt management, token efficiency, cost optimization, and safety guardrails.

Model Versioning

import json
import hashlib
from datetime import datetime
from typing import Dict, Optional
from dataclasses import dataclass
import os

@dataclass
class ModelVersion:
    """Metadata record for one registered version of a model.

    ``ModelRegistry`` writes these records to, and reads them back from, its
    per-version JSON files, so every field value must be JSON-serializable.
    """

    version_id: str      # short hex id assigned at registration time
    name: str            # model name; also the prefix of the JSON file name
    artifact_path: str   # location of the model artifact (never opened by the registry)
    metrics: Dict        # evaluation metrics supplied by the caller
    metadata: Dict       # free-form extra information supplied by the caller
    created_at: str      # ISO-8601 timestamp taken at registration time
    status: str = "registered"


class ModelRegistry:
    """File-backed model registry: one JSON document per registered version.

    Every version is stored as ``<registry_path>/<name>_<version_id>.json``.
    """

    def __init__(self, registry_path: str):
        """Remember *registry_path* and create the directory if it is missing."""
        self.registry_path = registry_path
        os.makedirs(registry_path, exist_ok=True)

    def _version_path(self, name: str, version_id: str) -> str:
        """Return the JSON file path used for version *version_id* of *name*."""
        return os.path.join(self.registry_path, f"{name}_{version_id}.json")

    def register_model(
        self,
        name: str,
        artifact_path: str,
        metrics: Dict,
        metadata: Optional[Dict] = None,
    ) -> ModelVersion:
        """Record a new version of model *name* and persist it as JSON.

        Args:
            name: Model name.
            artifact_path: Path of the stored model artifact.
            metrics: Evaluation metrics to keep with the version.
            metadata: Optional extra information; defaults to an empty dict.

        Returns:
            The newly created ``ModelVersion``.
        """
        # One timestamp feeds both the id and ``created_at`` so the two always
        # describe the same instant.
        created_at = datetime.now().isoformat()
        version_id = hashlib.sha256(
            f"{name}-{created_at}".encode()
        ).hexdigest()[:12]

        version = ModelVersion(
            version_id=version_id,
            name=name,
            artifact_path=artifact_path,
            metrics=metrics,
            metadata=metadata or {},
            created_at=created_at,
        )

        with open(self._version_path(name, version_id), "w", encoding="utf-8") as f:
            json.dump(vars(version), f, indent=2)

        return version

    def get_version(self, name: str, version_id: str) -> Optional[ModelVersion]:
        """Load one stored version, or return ``None`` if it was never registered."""
        try:
            with open(self._version_path(name, version_id), encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return None
        return ModelVersion(**data)

    def list_versions(self, name: str) -> list:
        """Return every stored version of model *name*, newest first.

        Ordering uses the ``created_at`` ISO-8601 strings.
        """
        prefix = f"{name}_"
        versions = []
        for filename in os.listdir(self.registry_path):
            if not (filename.startswith(prefix) and filename.endswith(".json")):
                continue
            with open(os.path.join(self.registry_path, filename), encoding="utf-8") as f:
                data = json.load(f)
            # The file-name prefix alone is ambiguous: "my_" also matches the
            # files of a model called "my_llm". Trust the name stored inside.
            if data.get("name") != name:
                continue
            versions.append(ModelVersion(**data))
        return sorted(versions, key=lambda v: v.created_at, reverse=True)

# Example: open (or create) a registry directory and record one trained model.
registry = ModelRegistry("./model_registry")
version = registry.register_model(
    "my-llm",
    "./models/my-llm-v1.pt",
    metrics=dict(accuracy=0.92, latency_ms=45),
    metadata=dict(base_model="llama-2-7b", epochs=3),
)

Prompt Version Control

import hashlib
import json
from typing import Dict, List
from dataclasses import dataclass, asdict
import os

@dataclass
class PromptVersion:
    """One numbered revision of a named prompt template."""

    prompt_id: str        # short hex id derived from the name and version number
    name: str             # prompt name shared by all of its revisions
    template: str         # text containing ``{placeholder}`` markers
    variables: List[str]  # placeholder names declared by the author (not validated)
    version: int          # 1-based revision number within ``name``
    metadata: Dict        # extra information; the manager stores ``created_at`` here


class PromptManager:
    """Keep versioned prompt templates in memory and mirror each one to disk.

    Every created version is written to
    ``<storage_path>/<name>_v<version>.json``. The in-memory history starts
    empty for each new instance; files already present in *storage_path* are
    not read back.
    """

    def __init__(self, storage_path: str):
        """Remember *storage_path* and create the directory if it is missing."""
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)
        self.prompts: Dict[str, List[PromptVersion]] = {}

    def create_prompt(
        self,
        name: str,
        template: str,
        variables: List[str] = None
    ) -> PromptVersion:
        """Add a new revision of prompt *name* and save it as JSON.

        Args:
            name: Prompt name; revisions of the same name are numbered 1, 2, ...
            template: Template text with ``{placeholder}`` markers.
            variables: Declared placeholder names; ``None`` means no declaration.

        Returns:
            The newly created ``PromptVersion``.
        """
        # Local import: this listing's own import block does not bring in datetime.
        from datetime import datetime

        version_num = len(self.prompts.get(name, [])) + 1
        prompt_id = hashlib.sha256(f"{name}-v{version_num}".encode()).hexdigest()[:8]

        prompt = PromptVersion(
            prompt_id=prompt_id,
            name=name,
            template=template,
            # Copy so later changes to the caller's list cannot alter the record.
            variables=list(variables) if variables else [],
            version=version_num,
            metadata={"created_at": datetime.now().isoformat()},
        )

        # Persist first: if the write fails, the in-memory history must not
        # contain a revision that never reached disk.
        self._save_prompt(prompt)
        self.prompts.setdefault(name, []).append(prompt)

        return prompt

    def get_prompt(self, name: str, version: int = None) -> PromptVersion:
        """Return a revision of prompt *name* (the latest when *version* is None).

        Raises:
            ValueError: If the prompt name or the requested version is unknown.
        """
        history = self.prompts.get(name)
        if not history:
            raise ValueError(f"Prompt {name} not found")

        if version is None:
            return history[-1]

        for candidate in history:
            if candidate.version == version:
                return candidate

        raise ValueError(f"Version {version} not found")

    def render_prompt(self, name: str, **kwargs) -> str:
        """Fill the latest revision of prompt *name* with the given values.

        Each ``{key}`` marker whose key is in *kwargs* is replaced by
        ``str(value)``; markers without a matching keyword are left untouched.
        Substitution happens in a single pass, so a value that itself contains
        ``{other_key}`` is inserted verbatim rather than being expanded again.

        Raises:
            ValueError: If the prompt name is unknown.
        """
        import re

        template = self.get_prompt(name).template
        if not kwargs:
            return template

        replacements = {f"{{{key}}}": str(value) for key, value in kwargs.items()}
        # Longest marker first so no marker can shadow a longer one.
        markers = sorted(replacements, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(marker) for marker in markers))
        return pattern.sub(lambda match: replacements[match.group(0)], template)

    def _save_prompt(self, prompt: PromptVersion):
        """Write *prompt* to ``<storage_path>/<name>_v<version>.json``."""
        path = os.path.join(
            self.storage_path,
            f"{prompt.name}_v{prompt.version}.json"
        )
        with open(path, "w", encoding="utf-8") as f:
            json.dump(asdict(prompt), f, indent=2)

# Example: store a summarization template, then fill in its placeholder.
prompt_mgr = PromptManager("./prompts")
prompt_mgr.create_prompt(
    "summarize",
    "Summarize the following text concisely:\n\n{text}",
    variables=["text"],
)
rendered = prompt_mgr.render_prompt("summarize", text="Long document...")

Token Usage Tracking

from dataclasses import dataclass
from typing import Dict, List
import tiktoken

@dataclass
class TokenUsage:
    """Token counts and estimated cost for a single prompt/response pair."""

    prompt_tokens: int       # tokens counted in the prompt text
    completion_tokens: int   # tokens counted in the response text
    total_tokens: int        # prompt_tokens + completion_tokens
    cost: float              # estimated cost from the tracker's pricing table


class TokenTracker:
    """Count tokens per request and accumulate an in-memory usage log.

    The pricing table maps a model name to ``{"input": price, "output": price}``
    where each price is quoted per 1000 tokens.
    """

    # Prices in the pricing table are quoted per this many tokens.
    _TOKENS_PER_PRICE_UNIT = 1000
    # Encoding used when tiktoken does not recognise the model name.
    _FALLBACK_ENCODING = "cl100k_base"

    def __init__(self, pricing: Dict[str, Dict[str, float]] = None):
        """Create a tracker; *pricing* defaults to a built-in table when omitted."""
        self.pricing = pricing or {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.001, "output": 0.002}
        }
        self.usage_log: List[Dict] = []

    def count_tokens(self, text: str, model: str = "gpt-4") -> int:
        """Return the number of tokens in *text* for *model*.

        Model names unknown to tiktoken fall back to the ``cl100k_base``
        encoding, so counts for such models are an approximation.
        """
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            # Custom pricing tables may name models tiktoken has never heard of.
            encoding = tiktoken.get_encoding(self._FALLBACK_ENCODING)
        return len(encoding.encode(text))

    def log_usage(
        self,
        model: str,
        prompt: str,
        response: str,
        request_id: str = None
    ) -> TokenUsage:
        """Measure one prompt/response pair, append it to the log and return it.

        Args:
            model: Model name; must be a key of the pricing table.
            prompt: Prompt text sent to the model.
            response: Completion text returned by the model.
            request_id: Optional caller-supplied identifier stored with the entry.

        Returns:
            The ``TokenUsage`` computed for this request.

        Raises:
            KeyError: If *model* has no entry in the pricing table.
        """
        # Local import: this listing's own import block does not bring in datetime.
        from datetime import datetime

        # Look the rates up first so an unpriced model fails before any
        # tokenization work and with a message that names the model.
        try:
            rates = self.pricing[model]
        except KeyError:
            raise KeyError(f"No pricing configured for model {model!r}") from None

        prompt_tokens = self.count_tokens(prompt, model)
        completion_tokens = self.count_tokens(response, model)
        total_tokens = prompt_tokens + completion_tokens

        unit = self._TOKENS_PER_PRICE_UNIT
        cost = (
            prompt_tokens * rates["input"] / unit +
            completion_tokens * rates["output"] / unit
        )

        usage = TokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            cost=cost
        )

        self.usage_log.append({
            "request_id": request_id,
            "model": model,
            "usage": usage,
            "timestamp": datetime.now().isoformat()
        })

        return usage

    def get_summary(self) -> Dict:
        """Return request count, total tokens, total cost and mean tokens per request."""
        request_count = len(self.usage_log)
        total_cost = sum(entry["usage"].cost for entry in self.usage_log)
        total_tokens = sum(entry["usage"].total_tokens for entry in self.usage_log)

        return {
            "total_requests": request_count,
            "total_tokens": total_tokens,
            "total_cost": total_cost,
            # Guard the division so an empty log reports 0 instead of raising.
            "avg_tokens_per_request": total_tokens / request_count if request_count else 0
        }

# Example: price a single prompt/response pair.
tracker = TokenTracker()
# Sample texts so the example runs as written; in a real service these are
# the prompt sent to the model and the completion it returned.
prompt_text = "Summarize the following text concisely:\n\nLong document..."
response_text = "A short summary of the document."
usage = tracker.log_usage("gpt-4", prompt_text, response_text)
print(f"Cost: ${usage.cost:.4f}")

Model Monitoring

from collections import deque
import statistics

class LLMMonitor:
    """Rolling-window monitor for request latency, token counts and quality scores.

    Only the most recent ``window_size`` observations of each series are kept.
    """

    def __init__(
        self,
        window_size: int = 1000,
        latency_p95_threshold_ms: float = 5000,
        quality_threshold: float = 0.7,
    ):
        """Create an empty monitor.

        Args:
            window_size: Maximum number of observations kept per series.
            latency_p95_threshold_ms: p95 latency above which an alert is raised.
            quality_threshold: Mean quality score below which an alert is raised.
        """
        self.window_size = window_size
        self.latency_p95_threshold_ms = latency_p95_threshold_ms
        self.quality_threshold = quality_threshold
        self.latencies = deque(maxlen=window_size)
        self.token_counts = deque(maxlen=window_size)
        self.quality_scores = deque(maxlen=window_size)

    def log_request(self, latency_ms: float, tokens: int, quality_score: float = None):
        """Record one request; *quality_score* is stored only when provided."""
        self.latencies.append(latency_ms)
        self.token_counts.append(tokens)
        if quality_score is not None:
            self.quality_scores.append(quality_score)

    @staticmethod
    def _nearest_rank(sorted_values: list, percent: int):
        """Return the nearest-rank *percent*-th percentile of a non-empty sorted list."""
        # rank = ceil(percent / 100 * n), computed in integers to avoid float
        # rounding; ranks are 1-based, hence the ``- 1`` when indexing.
        rank = -(-percent * len(sorted_values) // 100)
        return sorted_values[max(rank, 1) - 1]

    def get_metrics(self) -> dict:
        """Return aggregate latency, token and (if any were logged) quality metrics.

        Empty series report 0 for their aggregates; the ``"quality"`` key is
        present only when at least one quality score has been logged.
        """
        ordered_latencies = sorted(self.latencies)
        metrics = {
            "latency": {
                "mean": statistics.mean(self.latencies) if self.latencies else 0,
                "p95": self._nearest_rank(ordered_latencies, 95) if ordered_latencies else 0,
            },
            "tokens": {
                "mean": statistics.mean(self.token_counts) if self.token_counts else 0,
                "total": sum(self.token_counts)
            }
        }

        if self.quality_scores:
            metrics["quality"] = {
                "mean": statistics.mean(self.quality_scores),
                "min": min(self.quality_scores)
            }

        return metrics

    def check_alerts(self) -> list:
        """Return human-readable alert messages for the current window."""
        alerts = []
        metrics = self.get_metrics()

        if metrics["latency"]["p95"] > self.latency_p95_threshold_ms:
            limit_seconds = self.latency_p95_threshold_ms / 1000
            alerts.append(f"High latency detected (p95 > {limit_seconds:g}s)")

        # No quality data means no quality alert.
        quality = metrics.get("quality")
        if quality is not None and quality["mean"] < self.quality_threshold:
            alerts.append("Quality score below threshold")

        return alerts

# Example: record one request, then read back the aggregates and any alerts.
monitor = LLMMonitor()
monitor.log_request(120, 500, quality_score=0.85)
metrics = monitor.get_metrics()
alerts = monitor.check_alerts()

Best Practices

  • Implement comprehensive logging for all LLM calls
  • Use prompt templates with version control
  • Track token usage and costs per endpoint
  • Set up automated quality evaluation
  • Implement rate limiting and retry logic
  • Use guardrails for safety and compliance
⭐

Premium Content

AI Operations (MLOps)

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯 End-to-end Projects
💼 Interview Prep
📜 Certificates
🤝 Community Access

Already a member? Log in

Need Expert Generative AI Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement