Building Production LLM Applications
Moving from prototype to production requires careful consideration of reliability, cost, latency, and user experience. This tutorial covers the full stack of production LLM application development.
End-to-End Architecture
A production LLM system architecture combines API layers, LLM inference, caching, rate limiting, monitoring, evaluation, and fallback mechanisms to deliver reliable, cost-effective AI-powered features.
Core Components
User Request
|
v
[API Gateway] -> Rate Limiting, Authentication
|
v
[Cache Layer] -> Response Cache, Embedding Cache
|
v
[Orchestration] -> Prompt Management, Routing
|
v
[LLM Inference] -> Model Selection, Fallbacks
|
v
[Post-Processing] -> Output Validation, Formatting
|
v
[Monitoring] -> Latency, Cost, Quality Metrics
API Design
OpenAI-Compatible API
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Union
import time
import hashlib
# Shared FastAPI application instance; routes and middleware in this file register on it.
app = FastAPI(title="LLM API")
class ChatMessage(BaseModel):
    """One turn of a chat conversation."""

    # Who produced the turn; validation only admits system, user or assistant.
    role: str = Field(..., pattern="^(system|user|assistant)$")
    # Text of the turn.
    content: str
class ChatCompletionRequest(BaseModel):
    """Request body accepted by the chat-completion endpoints."""

    # Name of the model the caller wants to use.
    model: str
    # Conversation so far, in order.
    messages: List[ChatMessage]
    # Sampling temperature, constrained to [0, 2]; defaults to 0.7.
    temperature: Optional[float] = Field(default=0.7, ge=0, le=2)
    # Upper bound on generated tokens, constrained to [1, 4096]; defaults to 100.
    max_tokens: Optional[int] = Field(default=100, ge=1, le=4096)
    # Nucleus-sampling mass, constrained to [0, 1]; defaults to 1.0.
    top_p: Optional[float] = Field(default=1.0, ge=0, le=1)
    # Whether the caller asked for a streamed response.
    stream: Optional[bool] = False
class Usage(BaseModel):
    """Token accounting reported alongside a completion."""

    # Tokens counted for the request messages.
    prompt_tokens: int
    # Tokens counted for the generated text.
    completion_tokens: int
    # Sum of the two counts above, as supplied by the caller of this model.
    total_tokens: int
class ChatCompletionResponse(BaseModel):
    """Response body returned by the chat-completion endpoint."""

    # Identifier of this completion.
    id: str
    # Object type tag; fixed to "chat.completion" unless overridden.
    object: str = "chat.completion"
    # Creation time as an integer timestamp.
    created: int
    # Model name echoed back to the caller.
    model: str
    # Generated choices, each a plain dictionary.
    choices: List[dict]
    # Token usage for the request.
    usage: Usage
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def create_chat_completion(request: ChatCompletionRequest):
    """Generate a chat completion and wrap it in an OpenAI-style response.

    Args:
        request: Validated chat-completion request.

    Returns:
        A ``ChatCompletionResponse`` with a single assistant choice and the
        token usage for the call.

    Note:
        ``generate_response`` and ``count_tokens`` are defined outside this
        block; they are assumed to return the generated text and an integer
        token count respectively -- TODO confirm.
    """
    # Local import keeps this block self-contained.
    import uuid

    response_text = await generate_response(request)

    prompt_tokens = count_tokens(request.messages)
    completion_tokens = count_tokens(response_text)

    return ChatCompletionResponse(
        # A random UUID avoids the collisions a timestamp hash produces when
        # two requests are handled at the same clock reading.
        id=f"chatcmpl-{uuid.uuid4().hex}",
        created=int(time.time()),
        model=request.model,
        choices=[{
            "index": 0,
            "message": {"role": "assistant", "content": response_text},
            "finish_reason": "stop",
        }],
        usage=Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
        ),
    )
Rate Limiting
from collections import defaultdict
import asyncio
from datetime import datetime, timedelta
class RateLimiter:
    """In-memory sliding-window limiter on requests and tokens per API key.

    Usage is stored as timestamped entries per key; entries older than the
    window are discarded whenever the key is checked.
    """

    # Length of the window both limits are measured over.
    WINDOW = timedelta(minutes=1)

    def __init__(self, requests_per_minute: int = 60, tokens_per_minute: int = 100000):
        """Create a limiter.

        Args:
            requests_per_minute: Maximum requests per key inside one window.
            tokens_per_minute: Maximum tokens per key inside one window.
        """
        self.rpm = requests_per_minute
        self.tpm = tokens_per_minute
        self.request_counts = defaultdict(list)  # key -> [timestamp, ...]
        self.token_counts = defaultdict(list)    # key -> [(timestamp, tokens), ...]

    def _clean_old_entries(self, key: str):
        """Drop entries of ``key`` that fell out of the window.

        Keys left without entries are removed entirely so that idle or
        never-recorded keys do not accumulate in memory.
        """
        cutoff = datetime.now() - self.WINDOW

        recent_requests = [t for t in self.request_counts.get(key, ()) if t > cutoff]
        if recent_requests:
            self.request_counts[key] = recent_requests
        else:
            self.request_counts.pop(key, None)

        recent_tokens = [(t, c) for t, c in self.token_counts.get(key, ()) if t > cutoff]
        if recent_tokens:
            self.token_counts[key] = recent_tokens
        else:
            self.token_counts.pop(key, None)

    def _retry_after(self, requests: list, tokens: list, estimated_tokens: int) -> int:
        """Return whole seconds until a request of this size would be admitted.

        Falls back to the full window length when no amount of waiting inside
        one window can help (a non-positive request limit, or a request larger
        than the token limit on its own).
        """
        full_window = int(self.WINDOW.total_seconds())
        now = datetime.now()
        freed_at = now

        if len(requests) >= self.rpm:
            if self.rpm <= 0:
                return full_window
            # Entries are appended chronologically, so this is the newest of
            # the oldest entries that must expire to get below the limit.
            freed_at = max(freed_at, requests[len(requests) - self.rpm] + self.WINDOW)

        excess = sum(c for _, c in tokens) + estimated_tokens - self.tpm
        if excess > 0:
            released = 0
            for stamp, count in tokens:
                released += count
                if released >= excess:
                    freed_at = max(freed_at, stamp + self.WINDOW)
                    break
            else:
                return full_window

        wait = (freed_at - now).total_seconds()
        # Round up so the caller never retries a fraction of a second early.
        return min(full_window, max(1, int(wait) + 1))

    def check_rate_limit(self, api_key: str, estimated_tokens: int = 100) -> dict:
        """Report whether ``api_key`` may issue a request right now.

        This only inspects state; call ``record_usage`` to account for a
        request that was actually served.

        Args:
            api_key: Key identifying the caller.
            estimated_tokens: Tokens the pending request is expected to use.

        Returns:
            A dict with ``allowed``, ``requests_remaining``,
            ``tokens_remaining`` and ``retry_after`` (seconds; 0 when allowed).
        """
        self._clean_old_entries(api_key)
        # .get() avoids creating defaultdict entries for keys that are only checked.
        requests = self.request_counts.get(api_key, [])
        tokens = self.token_counts.get(api_key, [])

        request_count = len(requests)
        token_count = sum(c for _, c in tokens)
        allowed = (
            request_count < self.rpm and
            token_count + estimated_tokens <= self.tpm
        )

        return {
            "allowed": allowed,
            "requests_remaining": max(0, self.rpm - request_count),
            "tokens_remaining": max(0, self.tpm - token_count - estimated_tokens),
            "retry_after": 0 if allowed else self._retry_after(requests, tokens, estimated_tokens),
        }

    def record_usage(self, api_key: str, tokens_used: int):
        """Record one served request and the tokens it consumed."""
        now = datetime.now()
        self.request_counts[api_key].append(now)
        self.token_counts[api_key].append((now, tokens_used))


rate_limiter = RateLimiter(requests_per_minute=60, tokens_per_minute=100000)
@app.middleware("http")
async def rate_limit_middleware(request, call_next):
    """Reject requests from API keys that exceeded their rate limit.

    Requests without an ``Authorization`` header pass through unchecked.

    Args:
        request: Incoming HTTP request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        A 429 JSON response with retry headers when the key is over its
        limit, otherwise the downstream response.
    """
    # Local import keeps this block self-contained.
    from fastapi.responses import JSONResponse

    auth_header = request.headers.get("Authorization", "")
    bearer_prefix = "Bearer "
    # Strip only a leading scheme so a key containing the same text is left intact.
    if auth_header.startswith(bearer_prefix):
        api_key = auth_header[len(bearer_prefix):]
    else:
        api_key = auth_header

    if api_key:
        limit_check = rate_limiter.check_rate_limit(api_key)
        if not limit_check["allowed"]:
            # An HTTPException raised from middleware is not routed through
            # FastAPI's exception handlers, so the 429 is returned directly.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={
                    "Retry-After": str(limit_check["retry_after"]),
                    "X-RateLimit-Remaining-Requests": str(limit_check["requests_remaining"]),
                    "X-RateLimit-Remaining-Tokens": str(limit_check["tokens_remaining"]),
                },
            )
        # Count the request so the limiter can actually trip. Token usage is
        # unknown at this layer, hence 0.
        rate_limiter.record_usage(api_key, 0)

    return await call_next(request)
Caching
import redis
import json
import hashlib
from typing import Optional
class LLMCache:
    """Redis-backed cache of completion responses keyed by request content."""

    # All keys written by this cache share this prefix.
    KEY_PREFIX = "llm:cache:"
    # Number of keys deleted per round trip during invalidation.
    _DELETE_BATCH = 500

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Create the cache client.

        Args:
            redis_url: Connection URL passed to ``redis.from_url``.
        """
        self.redis = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def _make_key(self, request: ChatCompletionRequest) -> str:
        """Derive a deterministic cache key from the request.

        Every field that influences generation takes part in the key;
        ``top_p`` is included so requests that differ only in nucleus
        sampling do not share an entry.
        """
        content = json.dumps({
            "model": request.model,
            "messages": [m.dict() for m in request.messages],
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
            "top_p": request.top_p,
        }, sort_keys=True)
        return f"{self.KEY_PREFIX}{hashlib.sha256(content.encode()).hexdigest()}"

    def get(self, request: ChatCompletionRequest) -> Optional[dict]:
        """Return the cached response for ``request``, or ``None`` on a miss."""
        cached = self.redis.get(self._make_key(request))
        if cached:
            return json.loads(cached)
        return None

    def set(self, request: ChatCompletionRequest, response: dict, ttl: Optional[int] = None):
        """Store ``response`` for ``request``.

        Args:
            request: Request the response belongs to.
            response: JSON-serializable response payload.
            ttl: Lifetime in seconds; a falsy value selects ``default_ttl``.
        """
        self.redis.setex(
            self._make_key(request),
            ttl or self.default_ttl,
            json.dumps(response),
        )

    def invalidate_pattern(self, pattern: str):
        """Delete every cache entry whose key suffix matches ``pattern``.

        Keys are walked incrementally with SCAN instead of KEYS, so a large
        keyspace does not block the Redis server, and deleted in batches.
        """
        batch = []
        for key in self.redis.scan_iter(match=f"{self.KEY_PREFIX}{pattern}",
                                        count=self._DELETE_BATCH):
            batch.append(key)
            if len(batch) >= self._DELETE_BATCH:
                self.redis.delete(*batch)
                batch = []
        if batch:
            self.redis.delete(*batch)


cache = LLMCache()
@app.post("/v1/chat/completions/cached")
async def create_cached_completion(request: ChatCompletionRequest):
    """Serve a completion, reusing cached results for near-deterministic requests.

    Only non-streaming requests with an explicit temperature below 0.1 are
    read from and written to the cache.

    Args:
        request: Validated chat-completion request.

    Returns:
        The cached payload on a hit, otherwise whatever ``generate_response``
        (defined outside this block) returns.
    """
    temperature = request.temperature
    # temperature is Optional: None cannot be compared with a float, and an
    # unspecified temperature cannot be assumed deterministic.
    cacheable = (
        not request.stream
        and temperature is not None
        and temperature < 0.1
    )

    if cacheable:
        cached = cache.get(request)
        # Compare against None so a falsy cached payload still counts as a hit.
        if cached is not None:
            return cached

    response = await generate_response(request)

    if cacheable:
        cache.set(request, response)
    return response
Cost Optimization
Cost Per Token Calculation
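$$\text{Cost} = N_{\text{prompt}} \cdot C_{\text{input}} + N_{\text{generated}} \cdot C_{\text{output}}$$
Here,
- $N_{\text{prompt}}$ = number of prompt (input) tokens
- $N_{\text{generated}}$ = number of generated (output) tokens
- $C_{\text{input}}$ = price per input token
- $C_{\text{output}}$ = price per output token
In the code below, prices are quoted per one million tokens, so the sum is divided by $10^6$.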
class CostTracker:
    """Accumulate token usage and spend across model calls."""

    # Price per one million tokens, by direction (currency is not stated in
    # the code; presumably USD -- confirm against provider price lists).
    DEFAULT_PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-3.5-sonnet": {"input": 3.00, "output": 15.00},
        "llama-3.1-8b": {"input": 0.05, "output": 0.20},
        "llama-3.1-70b": {"input": 0.30, "output": 1.20},
    }
    # Prices are quoted per this many tokens.
    TOKENS_PER_PRICE_UNIT = 1000000

    def __init__(self, pricing=None):
        """Create a tracker.

        Args:
            pricing: Optional ``{model: {"input": price, "output": price}}``
                table replacing ``DEFAULT_PRICING``. It is copied, so later
                edits to ``self.pricing`` do not touch the caller's table or
                the class default.
        """
        source = self.DEFAULT_PRICING if pricing is None else pricing
        self.pricing = {model: dict(rates) for model, rates in source.items()}
        self.total_cost = 0.0
        self.token_counts = {"input": 0, "output": 0}

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Price one call and add it to the running totals.

        Args:
            model: Key into the pricing table.
            input_tokens: Prompt tokens consumed.
            output_tokens: Tokens generated.

        Returns:
            Cost of the call, or 0.0 for a model missing from the pricing
            table (such calls are not added to the totals).
        """
        rates = self.pricing.get(model)
        if rates is None:
            return 0.0

        cost = (
            input_tokens * rates["input"] + output_tokens * rates["output"]
        ) / self.TOKENS_PER_PRICE_UNIT
        self.total_cost += cost
        self.token_counts["input"] += input_tokens
        self.token_counts["output"] += output_tokens
        return cost

    def get_monthly_projection(self, daily_avg_cost: float, days: int = 30) -> float:
        """Project spend over ``days`` days (30 by default) from a daily average."""
        return daily_avg_cost * days

    def optimize_model_selection(self, task_complexity: str) -> str:
        """Map a complexity label to a recommended model name.

        Unknown labels fall back to ``"gpt-4o-mini"``.
        """
        recommendations = {
            "simple": "llama-3.1-8b",
            "moderate": "gpt-4o-mini",
            "complex": "gpt-4o",
            "critical": "claude-3.5-sonnet",
        }
        return recommendations.get(task_complexity, "gpt-4o-mini")


cost_tracker = CostTracker()
Latency Monitoring
import time
from dataclasses import dataclass
from typing import List
import statistics
@dataclass
class LatencyRecord:
    """Timing measurements captured for a single model call."""

    # Endpoint that served the call.
    endpoint: str
    # Model that produced the output.
    model: str
    # Latency until the first token.
    first_token_latency: float
    # Latency of the whole call.
    total_latency: float
    # Number of tokens generated by the call.
    tokens_generated: int
    # Time at which the record was created.
    timestamp: float
class LatencyMonitor:
    """Collect per-call latency records and summarize them."""

    def __init__(self):
        self.records: List[LatencyRecord] = []

    def record(self, endpoint: str, model: str, first_token: float,
               total: float, tokens: int):
        """Append one measurement, stamped with the current time.

        Args:
            endpoint: Endpoint that served the call.
            model: Model that produced the output.
            first_token: Latency until the first token.
            total: Latency of the whole call.
            tokens: Number of tokens generated.
        """
        self.records.append(LatencyRecord(
            endpoint=endpoint,
            model=model,
            first_token_latency=first_token,
            total_latency=total,
            tokens_generated=tokens,
            timestamp=time.time(),
        ))

    @staticmethod
    def _percentile(values, fraction: float):
        """Return the element at rank ``int(len * fraction)`` of the sorted values.

        Returns 0 for an empty sequence. ``fraction`` must be below 1 so the
        rank stays inside the list.
        """
        if not values:
            return 0
        ordered = sorted(values)
        return ordered[int(len(ordered) * fraction)]

    def get_stats(self, model: str = None) -> dict:
        """Summarize the recorded latencies.

        Args:
            model: When given, only records of this model are considered.

        Returns:
            An empty dict when no record matches; otherwise counts, time to
            first token p50/p95/p99, total-latency p50, and tokens-per-second
            median/p95. The tokens-per-second figures are 0 when every
            matching record has a non-positive total latency.
        """
        filtered = self.records
        if model:
            filtered = [r for r in self.records if r.model == model]
        if not filtered:
            return {}

        ttft_values = [r.first_token_latency for r in filtered]
        total_values = [r.total_latency for r in filtered]
        # Records with zero duration are skipped to avoid dividing by zero.
        tps_values = [
            r.tokens_generated / r.total_latency
            for r in filtered
            if r.total_latency > 0
        ]

        return {
            "count": len(filtered),
            "ttft_p50": statistics.median(ttft_values),
            "ttft_p95": self._percentile(ttft_values, 0.95),
            "ttft_p99": self._percentile(ttft_values, 0.99),
            "total_latency_p50": statistics.median(total_values),
            # statistics.median raises on an empty list, so guard it the same
            # way as the percentile.
            "tokens_per_second_median": statistics.median(tps_values) if tps_values else 0,
            "tokens_per_second_p95": self._percentile(tps_values, 0.95),
        }


latency_monitor = LatencyMonitor()
Full RAG Application
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
class ProductionRAG:
    """Retrieval-augmented generation over a persistent Chroma vector store."""

    # Prompt sent to the model; {context} and {question} are filled per query.
    PROMPT_TEMPLATE = (
        "Answer the question based on the context below.\n"
        "Context: {context}\n"
        "Question: {question}\n"
        "Answer:"
    )

    def __init__(self, model_name: str = "llama-3.1-8b"):
        """Set up embeddings, vector store, generation model and trackers.

        Args:
            model_name: Identifier handed to ``_load_model``.
        """
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        self.vectorstore = Chroma(
            embedding_function=self.embeddings,
            persist_directory="./chroma_db"
        )
        self.llm = self._load_model(model_name)
        self.cache = LLMCache()
        self.cost_tracker = CostTracker()
        self.latency_monitor = LatencyMonitor()

    def _load_model(self, model_name):
        """Load a causal LM and tokenizer and wrap them in a text-generation pipeline."""
        from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

        model = AutoModelForCausalLM.from_pretrained(
            model_name, device_map="auto", torch_dtype="auto"
        )
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        return pipeline("text-generation", model=model, tokenizer=tokenizer)

    def ingest_documents(self, documents: list, chunk_size: int = 1000):
        """Split documents into overlapping chunks and add them to the store.

        Args:
            documents: Documents accepted by the splitter's ``split_documents``.
            chunk_size: Maximum chunk length, measured with ``len``.

        Returns:
            Number of chunks added.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=200,
            length_function=len
        )
        chunks = text_splitter.split_documents(documents)
        self.vectorstore.add_documents(chunks)
        return len(chunks)

    def query(self, question: str, k: int = 3) -> dict:
        """Answer ``question`` from the ``k`` most similar stored chunks.

        Args:
            question: User question.
            k: Number of chunks to retrieve as context.

        Returns:
            Dict with ``answer``, ``sources`` (metadata of the retrieved
            chunks), ``latency`` in seconds and ``cost``.
        """
        start_time = time.time()

        # Retrieve relevant documents
        docs = self.vectorstore.similarity_search(question, k=k)
        context = "\n\n".join(doc.page_content for doc in docs)

        # Format once and reuse for both generation and token accounting.
        prompt = PromptTemplate(
            template=self.PROMPT_TEMPLATE,
            input_variables=["context", "question"]
        )
        prompt_text = prompt.format(context=context, question=question)

        response = self.llm(
            prompt_text,
            max_new_tokens=512,
            # temperature only takes effect when sampling is enabled.
            do_sample=True,
            temperature=0.7,
            # Without this the pipeline echoes the prompt in front of the answer.
            return_full_text=False,
        )
        answer = response[0]["generated_text"]
        total_time = time.time() - start_time

        # Whitespace word counts are only a rough stand-in for token counts.
        input_tokens = len(prompt_text.split())
        output_tokens = len(answer.split())
        cost = self.cost_tracker.calculate_cost("llama-3.1-8b", input_tokens, output_tokens)

        return {
            "answer": answer,
            "sources": [doc.metadata for doc in docs],
            "latency": total_time,
            "cost": cost
        }


# Initialize production RAG
rag = ProductionRAG("meta-llama/Llama-3.1-8B-Instruct")
@app.post("/rag/query")
async def rag_query(question: str):
    """Answer a question through the shared RAG instance.

    Args:
        question: User question.

    Returns:
        The dict produced by ``rag.query``.
    """
    # Local import keeps this block self-contained.
    from fastapi.concurrency import run_in_threadpool

    # rag.query is synchronous (retrieval plus generation); running it in a
    # worker thread keeps the event loop free for other requests.
    return await run_in_threadpool(rag.query, question)
@app.post("/rag/ingest")
async def ingest_documents(documents: list):
    """Add documents to the shared RAG instance.

    Args:
        documents: Documents forwarded unchanged to ``rag.ingest_documents``.

    Returns:
        ``{"chunks_ingested": <number of chunks stored>}``.
    """
    # Local import keeps this block self-contained.
    from fastapi.concurrency import run_in_threadpool

    # Chunking and storing is synchronous work; run it off the event loop.
    count = await run_in_threadpool(rag.ingest_documents, documents)
    return {"chunks_ingested": count}
Evaluation in Production
class ProductionEvaluator:
    """Collect user feedback and compare systems side by side."""

    # Ratings are whole numbers on this closed scale; compute_metrics reports
    # a distribution over exactly these values.
    MIN_RATING = 1
    MAX_RATING = 5

    def __init__(self, rag_system):
        """Create an evaluator.

        Args:
            rag_system: System under evaluation; stored as ``self.rag``.
        """
        self.rag = rag_system
        self.feedback_data = []

    def log_feedback(self, query: str, response: str, rating: int, feedback: str = ""):
        """Store one piece of user feedback, stamped with the current time.

        Args:
            query: The user's query.
            response: The response that was rated.
            rating: Whole-number score from 1 to 5.
            feedback: Optional free-text comment.

        Raises:
            ValueError: If ``rating`` is not a whole number from 1 to 5. Such
                values would shift the average without appearing in the
                rating distribution.
        """
        if rating not in range(self.MIN_RATING, self.MAX_RATING + 1):
            raise ValueError(
                f"rating must be an integer between {self.MIN_RATING} "
                f"and {self.MAX_RATING}, got {rating!r}"
            )
        self.feedback_data.append({
            "query": query,
            "response": response,
            "rating": rating,
            "feedback": feedback,
            "timestamp": time.time()
        })

    def compute_metrics(self) -> dict:
        """Aggregate the logged feedback.

        Returns:
            An empty dict when nothing was logged; otherwise the average
            rating, the count per rating value, the number of entries, and
            the share of ratings of 4 or higher.
        """
        if not self.feedback_data:
            return {}

        ratings = [entry["rating"] for entry in self.feedback_data]
        return {
            "average_rating": statistics.mean(ratings),
            "rating_distribution": {
                score: ratings.count(score)
                for score in range(self.MIN_RATING, self.MAX_RATING + 1)
            },
            "total_feedback": len(self.feedback_data),
            "positive_rate": sum(1 for r in ratings if r >= 4) / len(ratings)
        }

    def a_b_test(self, query: str, model_a, model_b) -> dict:
        """Run the same query against two systems and pair up the results.

        Args:
            query: Query sent to both systems.
            model_a: Object whose ``query`` returns a dict with ``latency``
                and ``cost`` keys.
            model_b: Second such object.

        Returns:
            Dict holding the query, both full responses, and their latencies
            and costs.
        """
        response_a = model_a.query(query)
        response_b = model_b.query(query)
        return {
            "query": query,
            "response_a": response_a,
            "response_b": response_b,
            "latency_a": response_a["latency"],
            "latency_b": response_b["latency"],
            "cost_a": response_a["cost"],
            "cost_b": response_b["cost"]
        }
In production, log everything: user queries, model responses, latency, cost, and user feedback. This data is invaluable for identifying issues, optimizing costs, and improving quality over time.
Summary
- Production LLM apps require API design, rate limiting, caching, monitoring, and evaluation
- OpenAI-compatible APIs provide standard interfaces for LLM services
- Rate limiting prevents abuse; token-based limits are more accurate than request counts
- Caching deterministic responses (low temperature) significantly reduces costs
- Cost per request = (prompt_tokens x input price per token) + (generated_tokens x output price per token)
- Latency monitoring should track TTFT and tokens-per-second separately
- RAG applications need document ingestion, vector search, and generation pipelines
- Production evaluation combines automated metrics with human feedback
Practice Exercises
- API Implementation: Build a complete OpenAI-compatible API with rate limiting and caching. Test with 100 concurrent requests.
- Cost Optimization: Implement model routing that selects the cheapest model capable of handling each query. Compare total costs.
- Caching Strategy: Implement semantic caching that matches similar queries. Measure cache hit rate and cost savings.
- RAG Pipeline: Build a production RAG system with document ingestion, chunking, and retrieval. Evaluate on 50 test queries.
- Monitoring Dashboard: Create a monitoring dashboard that tracks latency, cost, and quality metrics in real-time.
Previous: 24 - Scaling Laws & Chinchilla <- | See also: 11 - Retrieval Augmented Generation <-