Building Production LLM Applications

ProductionDeploymentFree Lesson

Advertisement

Building Production LLM Applications

Moving from prototype to production requires careful consideration of reliability, cost, latency, and user experience. This tutorial covers the full stack of production LLM application development.

End-to-End Architecture

A system architecture that includes API layers, LLM inference, caching, rate limiting, monitoring, evaluation, and fallback mechanisms to deliver reliable, cost-effective AI-powered features.

Core Components

Architecture Diagram
User Request
    |
    v
[API Gateway] -> Rate Limiting, Authentication
    |
    v
[Cache Layer] -> Response Cache, Embedding Cache
    |
    v
[Orchestration] -> Prompt Management, Routing
    |
    v
[LLM Inference] -> Model Selection, Fallbacks
    |
    v
[Post-Processing] -> Output Validation, Formatting
    |
    v
[Monitoring] -> Latency, Cost, Quality Metrics

API Design

OpenAI-Compatible API

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Union
import time
import hashlib

app = FastAPI(title="LLM API")

class ChatMessage(BaseModel):
    role: str = Field(..., pattern="^(system|user|assistant)$")
    content: str

class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    temperature: Optional[float] = Field(0.7, ge=0, le=2)
    max_tokens: Optional[int] = Field(100, ge=1, le=4096)
    top_p: Optional[float] = Field(1.0, ge=0, le=1)
    stream: Optional[bool] = False

class Usage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class ChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[dict]
    usage: Usage

@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def create_chat_completion(request: ChatCompletionRequest):
    # Generate response
    response_text = await generate_response(request)

    # Calculate usage
    prompt_tokens = count_tokens(request.messages)
    completion_tokens = count_tokens(response_text)
    total_tokens = prompt_tokens + completion_tokens

    return ChatCompletionResponse(
        id=f"chatcmpl-{hashlib.md5(str(time.time()).encode()).hexdigest()[:8]}",
        created=int(time.time()),
        model=request.model,
        choices=[{
            "index": 0,
            "message": {"role": "assistant", "content": response_text},
            "finish_reason": "stop"
        }],
        usage=Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens
        )
    )

Rate Limiting

from collections import defaultdict
import asyncio
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, requests_per_minute: int = 60, tokens_per_minute: int = 100000):
        self.rpm = requests_per_minute
        self.tpm = tokens_per_minute
        self.request_counts = defaultdict(list)
        self.token_counts = defaultdict(list)

    def _clean_old_entries(self, key: str):
        now = datetime.now()
        cutoff = now - timedelta(minutes=1)
        self.request_counts[key] = [t for t in self.request_counts[key] if t > cutoff]
        self.token_counts[key] = [(t, c) for t, c in self.token_counts[key] if t > cutoff]

    def check_rate_limit(self, api_key: str, estimated_tokens: int = 100) -> dict:
        self._clean_old_entries(api_key)

        request_count = len(self.request_counts[api_key])
        token_count = sum(c for _, c in self.token_counts[api_key])

        allowed = (
            request_count < self.rpm and
            token_count + estimated_tokens <= self.tpm
        )

        return {
            "allowed": allowed,
            "requests_remaining": max(0, self.rpm - request_count),
            "tokens_remaining": max(0, self.tpm - token_count - estimated_tokens),
            "retry_after": 60 if not allowed else 0
        }

    def record_usage(self, api_key: str, tokens_used: int):
        now = datetime.now()
        self.request_counts[api_key].append(now)
        self.token_counts[api_key].append((now, tokens_used))

rate_limiter = RateLimiter(requests_per_minute=60, tokens_per_minute=100000)

@app.middleware("http")
async def rate_limit_middleware(request, call_next):
    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")

    if api_key:
        limit_check = rate_limiter.check_rate_limit(api_key)
        if not limit_check["allowed"]:
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded",
                headers={
                    "Retry-After": str(limit_check["retry_after"]),
                    "X-RateLimit-Remaining-Requests": str(limit_check["requests_remaining"]),
                    "X-RateLimit-Remaining-Tokens": str(limit_check["tokens_remaining"])
                }
            )

    response = await call_next(request)
    return response

Caching

import redis
import json
import hashlib
from typing import Optional

class LLMCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def _make_key(self, request: ChatCompletionRequest) -> str:
        content = json.dumps({
            "model": request.model,
            "messages": [m.dict() for m in request.messages],
            "temperature": request.temperature,
            "max_tokens": request.max_tokens
        }, sort_keys=True)
        return f"llm:cache:{hashlib.sha256(content.encode()).hexdigest()}"

    def get(self, request: ChatCompletionRequest) -> Optional[dict]:
        key = self._make_key(request)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, request: ChatCompletionRequest, response: dict, ttl: int = None):
        key = self._make_key(request)
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(response)
        )

    def invalidate_pattern(self, pattern: str):
        keys = self.redis.keys(f"llm:cache:{pattern}")
        if keys:
            self.redis.delete(*keys)

cache = LLMCache()

@app.post("/v1/chat/completions/cached")
async def create_cached_completion(request: ChatCompletionRequest):
    # Check cache (only for non-streaming, deterministic requests)
    if not request.stream and request.temperature < 0.1:
        cached = cache.get(request)
        if cached:
            return cached

    # Generate response
    response = await generate_response(request)

    # Cache deterministic responses
    if not request.stream and request.temperature < 0.1:
        cache.set(request, response)

    return response

Cost Optimization

Cost Per Token Calculation

textCost=(PtimesCp)+(GtimesCg)\\text{Cost} = (P \\times C_p) + (G \\times C_g)

Here,

  • =
  • =
  • =
  • =
class CostTracker:
    def __init__(self):
        self.pricing = {
            "gpt-4o": {"input": 2.50, "output": 10.00},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "claude-3.5-sonnet": {"input": 3.00, "output": 15.00},
            "llama-3.1-8b": {"input": 0.05, "output": 0.20},
            "llama-3.1-70b": {"input": 0.30, "output": 1.20}
        }
        self.total_cost = 0.0
        self.token_counts = {"input": 0, "output": 0}

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        if model not in self.pricing:
            return 0.0

        pricing = self.pricing[model]
        cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1000000

        self.total_cost += cost
        self.token_counts["input"] += input_tokens
        self.token_counts["output"] += output_tokens

        return cost

    def get_monthly_projection(self, daily_avg_cost: float) -> float:
        return daily_avg_cost * 30

    def optimize_model_selection(self, task_complexity: str) -> str:
        recommendations = {
            "simple": "llama-3.1-8b",
            "moderate": "gpt-4o-mini",
            "complex": "gpt-4o",
            "critical": "claude-3.5-sonnet"
        }
        return recommendations.get(task_complexity, "gpt-4o-mini")

cost_tracker = CostTracker()

Latency Monitoring

import time
from dataclasses import dataclass
from typing import List
import statistics

@dataclass
class LatencyRecord:
    endpoint: str
    model: str
    first_token_latency: float
    total_latency: float
    tokens_generated: int
    timestamp: float

class LatencyMonitor:
    def __init__(self):
        self.records: List[LatencyRecord] = []

    def record(self, endpoint: str, model: str, first_token: float,
               total: float, tokens: int):
        self.records.append(LatencyRecord(
            endpoint=endpoint,
            model=model,
            first_token_latency=first_token,
            total_latency=total,
            tokens_generated=tokens,
            timestamp=time.time()
        ))

    def get_stats(self, model: str = None) -> dict:
        filtered = self.records
        if model:
            filtered = [r for r in self.records if r.model == model]

        if not filtered:
            return {}

        ttft_values = [r.first_token_latency for r in filtered]
        total_values = [r.total_latency for r in filtered]
        tps_values = [r.tokens_generated / r.total_latency for r in filtered if r.total_latency > 0]

        return {
            "count": len(filtered),
            "ttft_p50": statistics.median(ttft_values),
            "ttft_p95": sorted(ttft_values)[int(len(ttft_values) * 0.95)],
            "ttft_p99": sorted(ttft_values)[int(len(ttft_values) * 0.99)],
            "total_latency_p50": statistics.median(total_values),
            "tokens_per_second_median": statistics.median(tps_values),
            "tokens_per_second_p95": sorted(tps_values)[int(len(tps_values) * 0.95)] if tps_values else 0
        }

latency_monitor = LatencyMonitor()

Full RAG Application

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

class ProductionRAG:
    def __init__(self, model_name: str = "llama-3.1-8b"):
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        self.vectorstore = Chroma(
            embedding_function=self.embeddings,
            persist_directory="./chroma_db"
        )
        self.llm = self._load_model(model_name)
        self.cache = LLMCache()
        self.cost_tracker = CostTracker()
        self.latency_monitor = LatencyMonitor()

    def _load_model(self, model_name):
        from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
        model = AutoModelForCausalLM.from_pretrained(
            model_name, device_map="auto", torch_dtype="auto"
        )
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        return pipeline("text-generation", model=model, tokenizer=tokenizer)

    def ingest_documents(self, documents: list, chunk_size: int = 1000):
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=200,
            length_function=len
        )
        chunks = text_splitter.split_documents(documents)
        self.vectorstore.add_documents(chunks)
        return len(chunks)

    def query(self, question: str, k: int = 3) -> dict:
        start_time = time.time()

        # Retrieve relevant documents
        docs = self.vectorstore.similarity_search(question, k=k)
        context = "\n\n".join([doc.page_content for doc in docs])

        # Generate answer
        prompt = PromptTemplate(
            template="""Answer the question based on the context below.
Context: {context}
Question: {question}
Answer:""",
            input_variables=["context", "question"]
        )

        response = self.llm(
            prompt.format(context=context, question=question),
            max_new_tokens=512,
            temperature=0.7
        )

        answer = response[0]["generated_text"]
        total_time = time.time() - start_time

        # Track metrics
        input_tokens = len(prompt.format(context=context, question=question).split())
        output_tokens = len(answer.split())
        cost = self.cost_tracker.calculate_cost("llama-3.1-8b", input_tokens, output_tokens)

        return {
            "answer": answer,
            "sources": [doc.metadata for doc in docs],
            "latency": total_time,
            "cost": cost
        }

# Initialize production RAG
rag = ProductionRAG("meta-llama/Llama-3.1-8B-Instruct")

@app.post("/rag/query")
async def rag_query(question: str):
    result = rag.query(question)
    return result

@app.post("/rag/ingest")
async def ingest_documents(documents: list):
    count = rag.ingest_documents(documents)
    return {"chunks_ingested": count}

Evaluation in Production

class ProductionEvaluator:
    def __init__(self, rag_system):
        self.rag = rag_system
        self.feedback_data = []

    def log_feedback(self, query: str, response: str, rating: int, feedback: str = ""):
        self.feedback_data.append({
            "query": query,
            "response": response,
            "rating": rating,
            "feedback": feedback,
            "timestamp": time.time()
        })

    def compute_metrics(self) -> dict:
        if not self.feedback_data:
            return {}

        ratings = [d["rating"] for d in self.feedback_data]
        return {
            "average_rating": statistics.mean(ratings),
            "rating_distribution": {
                i: ratings.count(i) for i in range(1, 6)
            },
            "total_feedback": len(self.feedback_data),
            "positive_rate": sum(1 for r in ratings if r >= 4) / len(ratings)
        }

    def a_b_test(self, query: str, model_a, model_b) -> dict:
        response_a = model_a.query(query)
        response_b = model_b.query(query)

        return {
            "query": query,
            "response_a": response_a,
            "response_b": response_b,
            "latency_a": response_a["latency"],
            "latency_b": response_b["latency"],
            "cost_a": response_a["cost"],
            "cost_b": response_b["cost"]
        }

In production, log everything: user queries, model responses, latency, cost, and user feedback. This data is invaluable for identifying issues, optimizing costs, and improving quality over time.

Summary

  • Production LLM apps require API design, rate limiting, caching, monitoring, and evaluation
  • OpenAI-compatible APIs provide standard interfaces for LLM services
  • Rate limiting prevents abuse; token-based limits are more accurate than request counts
  • Caching deterministic responses (low temperature) significantly reduces costs
  • Cost per token = (prompt_tokens x input_cost) + (generated_tokens x output_cost)
  • Latency monitoring should track TTFT and tokens-per-second separately
  • RAG applications need document ingestion, vector search, and generation pipelines
  • Production evaluation combines automated metrics with human feedback

Practice Exercises

  1. API Implementation: Build a complete OpenAI-compatible API with rate limiting and caching. Test with 100 concurrent requests.

  2. Cost Optimization: Implement model routing that selects the cheapest model capable of handling each query. Compare total costs.

  3. Caching Strategy: Implement semantic caching that matches similar queries. Measure cache hit rate and cost savings.

  4. RAG Pipeline: Build a production RAG system with document ingestion, chunking, and retrieval. Evaluate on 50 test queries.

  5. Monitoring Dashboard: Create a monitoring dashboard that tracks latency, cost, and quality metrics in real-time.


Previous: 24 - Scaling Laws & Chinchilla <- | See also: 11 - Retrieval Augmented Generation <-

Advertisement

Need Expert LLM Help?

Get personalized tutoring, RAG system design, or production LLM consulting.

Advertisement