AI Operations (MLOps)
LLMOps Pipeline
LLMOps extends MLOps with specific concerns for large language models: prompt management, token efficiency, cost optimization, and safety guardrails.
Model Versioning
import json
import hashlib
from datetime import datetime
from typing import Dict, Optional
from dataclasses import dataclass
import os
@dataclass
class ModelVersion:
    """Metadata record for one registered version of a model."""

    version_id: str  # 12 hex characters derived from the name and registration time
    name: str
    artifact_path: str  # artifact location, stored exactly as supplied by the caller
    metrics: Dict
    metadata: Dict
    created_at: str  # ISO-8601 timestamp string
    status: str = "registered"


class ModelRegistry:
    """File-backed model registry that stores one JSON document per version.

    Each version is written to ``<registry_path>/<name>_<version_id>.json``.
    """

    def __init__(self, registry_path: str):
        """Remember ``registry_path`` and create the directory if needed."""
        self.registry_path = registry_path
        os.makedirs(registry_path, exist_ok=True)

    def _version_path(self, name: str, version_id: str) -> str:
        """Return the JSON file path used for ``name`` / ``version_id``."""
        return os.path.join(self.registry_path, f"{name}_{version_id}.json")

    def register_model(
        self,
        name: str,
        artifact_path: str,
        metrics: Dict,
        metadata: Optional[Dict] = None,
    ) -> ModelVersion:
        """Create a new version record, persist it as JSON and return it.

        Args:
            name: Model name; also used as the file-name prefix.
            artifact_path: Where the model artifact lives.
            metrics: Evaluation metrics; must be JSON-serializable.
            metadata: Optional extra information; defaults to an empty dict.

        Returns:
            The newly created ``ModelVersion``.
        """
        # Take the timestamp once so the hashed value and the stored
        # ``created_at`` always describe the same instant.
        created_at = datetime.now().isoformat()
        version_id = hashlib.sha256(
            f"{name}-{created_at}".encode()
        ).hexdigest()[:12]
        version = ModelVersion(
            version_id=version_id,
            name=name,
            artifact_path=artifact_path,
            metrics=metrics,
            metadata=metadata or {},
            created_at=created_at,
        )
        with open(self._version_path(name, version_id), "w", encoding="utf-8") as f:
            json.dump(vars(version), f, indent=2)
        return version

    def get_version(self, name: str, version_id: str) -> Optional[ModelVersion]:
        """Load one version from disk, or return ``None`` if its file is missing."""
        # Open directly instead of checking existence first: this avoids a
        # second filesystem lookup and the window between check and open.
        try:
            with open(self._version_path(name, version_id), encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return None
        return ModelVersion(**data)

    def list_versions(self, name: str) -> list:
        """Return every stored version of ``name``, newest first.

        Ordering is by the ``created_at`` string, descending.
        """
        prefix = f"{name}_"
        versions = []
        for filename in os.listdir(self.registry_path):
            if not (filename.startswith(prefix) and filename.endswith(".json")):
                continue
            with open(os.path.join(self.registry_path, filename), encoding="utf-8") as f:
                data = json.load(f)
            # The file-name prefix alone is ambiguous: "my_" also matches files
            # of a model called "my_llm". The stored name is authoritative.
            if data.get("name") != name:
                continue
            versions.append(ModelVersion(**data))
        versions.sort(key=lambda v: v.created_at, reverse=True)
        return versions
# Example: register one model version together with its metrics and metadata.
registry = ModelRegistry("./model_registry")
eval_metrics = {"accuracy": 0.92, "latency_ms": 45}
training_metadata = {"base_model": "llama-2-7b", "epochs": 3}
version = registry.register_model(
    name="my-llm",
    artifact_path="./models/my-llm-v1.pt",
    metrics=eval_metrics,
    metadata=training_metadata,
)
Prompt Version Control
import hashlib
import json
from typing import Dict, List
from dataclasses import dataclass, asdict
import os
@dataclass
class PromptVersion:
    """One immutable-by-convention version of a named prompt template."""

    prompt_id: str  # 8 hex characters derived from the name and version number
    name: str
    template: str  # text containing ``{variable}`` placeholders
    variables: List[str]
    version: int  # 1-based, incremented per name
    metadata: Dict


class PromptManager:
    """Keep versioned prompt templates in memory and mirror each one to JSON.

    Every created version is written to
    ``<storage_path>/<name>_v<version>.json``. This class only writes those
    files; lookups are served from the in-memory ``prompts`` mapping.
    """

    def __init__(self, storage_path: str):
        """Remember ``storage_path`` and create the directory if needed."""
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)
        self.prompts: Dict[str, List[PromptVersion]] = {}

    def create_prompt(
        self,
        name: str,
        template: str,
        variables: List[str] = None
    ) -> PromptVersion:
        """Append a new version of prompt ``name``, save it and return it.

        Args:
            name: Prompt name; versions are numbered per name starting at 1.
            template: Template text with ``{variable}`` placeholders.
            variables: Names of the placeholders; defaults to an empty list.
        """
        history = self.prompts.setdefault(name, [])
        version_num = len(history) + 1
        prompt_id = hashlib.sha256(f"{name}-v{version_num}".encode()).hexdigest()[:8]
        prompt = PromptVersion(
            prompt_id=prompt_id,
            name=name,
            template=template,
            variables=variables or [],
            version=version_num,
            metadata={"created_at": datetime.now().isoformat()},
        )
        history.append(prompt)
        self._save_prompt(prompt)
        return prompt

    def get_prompt(self, name: str, version: int = None) -> PromptVersion:
        """Return a specific version of ``name``, or the latest when ``version`` is None.

        Raises:
            ValueError: If the prompt name or the requested version is unknown.
        """
        if name not in self.prompts:
            raise ValueError(f"Prompt {name} not found")
        history = self.prompts[name]
        if version is None:
            return history[-1]
        for candidate in history:
            if candidate.version == version:
                return candidate
        raise ValueError(f"Version {version} not found")

    def render_prompt(self, name: str, **kwargs) -> str:
        """Fill the latest version of ``name`` with the given keyword values.

        Each ``{key}`` occurrence is replaced by ``str(value)``. Placeholders
        without a matching keyword are left untouched.
        """
        import re  # local import keeps this block self-contained

        template = self.get_prompt(name).template
        if not kwargs:
            return template
        replacements = {f"{{{key}}}": str(value) for key, value in kwargs.items()}
        # Substitute in a single pass over the template. Chained
        # ``str.replace`` calls would re-scan already inserted values, so a
        # value containing "{other_key}" would itself get substituted.
        pattern = re.compile("|".join(re.escape(token) for token in replacements))
        return pattern.sub(lambda match: replacements[match.group(0)], template)

    def _save_prompt(self, prompt: PromptVersion):
        """Write ``prompt`` as JSON to ``<name>_v<version>.json`` in the storage directory."""
        path = os.path.join(
            self.storage_path,
            f"{prompt.name}_v{prompt.version}.json"
        )
        with open(path, "w", encoding="utf-8") as f:
            json.dump(asdict(prompt), f, indent=2)
# Example: store a summarization template, then render its latest version.
prompt_mgr = PromptManager("./prompts")
summarize_template = "Summarize the following text concisely:\n\n{text}"
prompt_mgr.create_prompt(
    name="summarize",
    template=summarize_template,
    variables=["text"],
)
rendered = prompt_mgr.render_prompt("summarize", text="Long document...")
Token Usage Tracking
from dataclasses import dataclass
from typing import Dict, List
import tiktoken
@dataclass
class TokenUsage:
    """Token counts and computed cost for a single request."""

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int  # prompt_tokens + completion_tokens
    cost: float  # expressed in the same unit as the pricing table


class TokenTracker:
    """Count tokens per request, price them and keep an in-memory usage log."""

    def __init__(self, pricing: Dict[str, Dict[str, float]] = None):
        """Set up the pricing table and an empty usage log.

        Args:
            pricing: Maps a model name to ``{"input": price, "output": price}``,
                where each price applies per 1,000 tokens. A built-in table
                for "gpt-4" and "gpt-3.5-turbo" is used when omitted.
        """
        self.pricing = pricing or {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.001, "output": 0.002}
        }
        self.usage_log: List[Dict] = []

    def count_tokens(self, text: str, model: str = "gpt-4") -> int:
        """Return the number of tokens ``text`` encodes to for ``model``."""
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            # tiktoken cannot map this model name to a tokenizer (for example
            # a custom entry in the pricing table). Fall back to a general
            # encoding so the request can still be tracked; the count is then
            # an approximation.
            encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))

    def log_usage(
        self,
        model: str,
        prompt: str,
        response: str,
        request_id: str = None
    ) -> TokenUsage:
        """Compute token usage and cost for one request and append it to the log.

        Args:
            model: Model name; must be a key of the pricing table.
            prompt: Prompt text sent to the model.
            response: Completion text returned by the model.
            request_id: Optional identifier stored with the log entry.

        Returns:
            The ``TokenUsage`` for this request.

        Raises:
            KeyError: If no pricing is configured for ``model``.
        """
        # Resolve pricing first so an unknown model fails before any
        # tokenization work, and with a message that names the problem.
        try:
            rates = self.pricing[model]
        except KeyError:
            raise KeyError(f"No pricing configured for model {model!r}") from None

        prompt_tokens = self.count_tokens(prompt, model)
        completion_tokens = self.count_tokens(response, model)
        total_tokens = prompt_tokens + completion_tokens
        # Prices are quoted per 1,000 tokens.
        cost = (
            prompt_tokens * rates["input"] / 1000 +
            completion_tokens * rates["output"] / 1000
        )
        usage = TokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            cost=cost,
        )
        self.usage_log.append({
            "request_id": request_id,
            "model": model,
            "usage": usage,
            "timestamp": datetime.now().isoformat()
        })
        return usage

    def get_summary(self) -> Dict:
        """Aggregate the usage log into request count, token total, cost total and average."""
        request_count = len(self.usage_log)
        total_cost = sum(entry["usage"].cost for entry in self.usage_log)
        total_tokens = sum(entry["usage"].total_tokens for entry in self.usage_log)
        return {
            "total_requests": request_count,
            "total_tokens": total_tokens,
            "total_cost": total_cost,
            # Guard the division so an empty log reports 0 instead of raising.
            "avg_tokens_per_request": total_tokens / request_count if request_count else 0
        }
# Example: log the token usage and cost of a single request.
tracker = TokenTracker()
# Sample texts so the example runs on its own; replace them with the real
# prompt sent to the model and the completion it returned.
prompt_text = "Summarize the following text concisely:\n\nLong document..."
response_text = "A short summary of the document."
usage = tracker.log_usage("gpt-4", prompt_text, response_text)
print(f"Cost: ${usage.cost:.4f}")
Model Monitoring
from collections import deque
import statistics
class LLMMonitor:
    """Track latency, token counts and quality scores over a sliding window."""

    def __init__(
        self,
        window_size: int = 1000,
        *,
        latency_p95_threshold_ms: float = 5000,
        quality_threshold: float = 0.7,
    ):
        """Create empty bounded windows and store the alert thresholds.

        Args:
            window_size: Maximum number of recent samples kept per metric.
            latency_p95_threshold_ms: ``check_alerts`` reports high latency
                when the p95 latency exceeds this many milliseconds.
            quality_threshold: ``check_alerts`` reports low quality when the
                mean quality score falls below this value.
        """
        self.window_size = window_size
        self.latency_p95_threshold_ms = latency_p95_threshold_ms
        self.quality_threshold = quality_threshold
        # Bounded deques drop the oldest sample once the window is full.
        self.latencies = deque(maxlen=window_size)
        self.token_counts = deque(maxlen=window_size)
        self.quality_scores = deque(maxlen=window_size)

    def log_request(self, latency_ms: float, tokens: int, quality_score: float = None):
        """Record one request; the quality score is stored only when provided."""
        self.latencies.append(latency_ms)
        self.token_counts.append(tokens)
        if quality_score is not None:
            self.quality_scores.append(quality_score)

    def get_metrics(self) -> Dict:
        """Summarize the current windows.

        Returns:
            A dict with "latency" (mean, p95) and "tokens" (mean, total), plus
            "quality" (mean, min) when at least one quality score was logged.
            Empty windows report 0 for their statistics.
        """
        if self.latencies:
            ordered = sorted(self.latencies)
            latency_mean = statistics.mean(self.latencies)
            # int(n * 0.95) is at most n - 1 for n >= 1, so the index is
            # always in range.
            latency_p95 = ordered[int(len(ordered) * 0.95)]
        else:
            latency_mean = 0
            latency_p95 = 0

        metrics = {
            "latency": {
                "mean": latency_mean,
                "p95": latency_p95,
            },
            "tokens": {
                "mean": statistics.mean(self.token_counts) if self.token_counts else 0,
                "total": sum(self.token_counts)
            }
        }
        if self.quality_scores:
            metrics["quality"] = {
                "mean": statistics.mean(self.quality_scores),
                "min": min(self.quality_scores)
            }
        return metrics

    def check_alerts(self) -> List[str]:
        """Return alert messages for p95 latency or mean quality outside the thresholds."""
        alerts = []
        metrics = self.get_metrics()
        if metrics["latency"]["p95"] > self.latency_p95_threshold_ms:
            # ":g" drops a trailing ".0", so the default renders as "5s".
            limit_seconds = f"{self.latency_p95_threshold_ms / 1000:g}"
            alerts.append(f"High latency detected (p95 > {limit_seconds}s)")
        # With no quality scores logged the mean defaults to 1, so no alert fires.
        if metrics.get("quality", {}).get("mean", 1) < self.quality_threshold:
            alerts.append("Quality score below threshold")
        return alerts
# Example: record one request, then read the aggregated metrics and alerts.
monitor = LLMMonitor()
sample_request = {"latency_ms": 120, "tokens": 500, "quality_score": 0.85}
monitor.log_request(**sample_request)
metrics = monitor.get_metrics()
alerts = monitor.check_alerts()
Best Practices
- Implement comprehensive logging for all LLM calls
- Use prompt templates with version control
- Track token usage and costs per endpoint
- Set up automated quality evaluation
- Implement rate limiting and retry logic
- Use guardrails for safety and compliance