Building a Production RAG Pipeline
LangChain + Pinecone + GPT-4 | Enterprise-Grade Implementation
Project Overview
Problem Statement
Traditional LLMs are limited by their training data cutoff and cannot access proprietary or real-time information. Organizations need systems that can combine the reasoning capabilities of LLMs with accurate, up-to-date information retrieval from their internal knowledge bases.
Objectives
- Build an end-to-end RAG pipeline that ingests, indexes, and retrieves documents
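- Implement hybrid search (semantic + keyword) for optimal retrieval (the reference code below covers the semantic side plus cross-encoder reranking; adding a BM25/sparse keyword retriever is left as an extension)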
- Create a production-ready API with streaming responses
- Deploy with monitoring, evaluation, and continuous improvement loops
- Achieve sub-200ms retrieval latency at scale
Tech Stack
| Component | Technology |
|---|---|
| Orchestration | LangChain 0.2+ |
| Vector Store | Pinecone (serverless) |
| LLM | GPT-4 / GPT-4-Turbo |
| Embeddings | OpenAI text-embedding-3-large |
| API Framework | FastAPI |
| Containerization | Docker + Docker Compose |
| Monitoring | LangSmith + Prometheus |
| Experiment Tracking | MLflow |
Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                  RAG Pipeline Architecture                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────┐   │
│  │ Document │───▶│  Chunking &  │───▶│    Embedding     │   │
│  │  Ingest  │    │  Processing  │    │    Generation    │   │
│  └──────────┘    └──────────────┘    └────────┬─────────┘   │
│                                               │             │
│                                               ▼             │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────┐   │
│  │   User   │───▶│    Query     │───▶│ Pinecone Vector  │   │
│  │  Query   │    │  Processing  │    │      Store       │   │
│  └──────────┘    └──────────────┘    └────────┬─────────┘   │
│                                               │             │
│                                               ▼             │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────┐   │
│  │ Response │◀───│     LLM      │◀───│     Context      │   │
│  │Streaming │    │  Generation  │    │     Assembly     │   │
│  └──────────┘    └──────────────┘    └──────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Step-by-Step Implementation
Step 1: Environment Setup
# Create project directory
mkdir rag-pipeline && cd rag-pipeline
# Initialize Python project
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# Install dependencies
# The Pinecone SDK is published as `pinecone`; the legacy `pinecone-client`
# name conflicts with it (langchain-pinecone already depends on `pinecone`).
# langchain-community provides the document loaders and the cross-encoder.
pip install langchain langchain-community langchain-openai langchain-pinecone pinecone
# pydantic-settings provides BaseSettings for src/config.py (pydantic v2)
pip install fastapi uvicorn python-dotenv pydantic pydantic-settings
# pypdf backs PyPDFLoader; unstructured[md] backs UnstructuredMarkdownLoader
pip install tiktoken "unstructured[md]" pypdf pdfplumber
# sentence-transformers is required by HuggingFaceCrossEncoder (reranking);
# numpy is used by the evaluation module
pip install sentence-transformers numpy
pip install mlflow langsmith
# redis is the client for the optional caching layer. Docker Engine and
# Compose are installed separately (the `docker` pip package is only the
# Python SDK and is not used by this project).
pip install prometheus-client redis
# Pin the resolved environment; the Dockerfile installs from requirements.txt
pip freeze > requirements.txt
# Create project structure
mkdir -p src/{ingestion,retrieval,api,monitoring}
mkdir -p tests/{unit,integration}
mkdir -p configs data docs
touch src/__init__.py src/ingestion/__init__.py src/retrieval/__init__.py
touch src/api/__init__.py src/monitoring/__init__.py
Step 2: Configuration Management
# src/config.py
from pydantic_settings import BaseSettings
from typing import Optional
from functools import lru_cache
class Settings(BaseSettings):
    """Application configuration loaded from environment variables and ``.env``.

    Field names are matched case-insensitively against environment variables
    (e.g. ``OPENAI_API_KEY`` -> ``openai_api_key``). Fields without a default
    are required; constructing ``Settings()`` without them raises a
    validation error.
    """

    # pydantic-settings v2 configuration (replaces the v1-style inner
    # ``class Config``, which v2 only accepts with a deprecation warning).
    # ``extra="ignore"``: by default pydantic-settings v2 rejects .env entries
    # that do not map to a field. Without this, startup fails as soon as the
    # shared .env also holds variables for other tools (e.g. LANGCHAIN_API_KEY
    # for LangSmith).
    model_config = {
        "env_file": ".env",
        "case_sensitive": False,
        "extra": "ignore",
    }

    # OpenAI
    openai_api_key: str
    openai_model: str = "gpt-4-turbo-preview"
    embedding_model: str = "text-embedding-3-large"
    # Must equal the Pinecone index dimension (the index is created with it).
    embedding_dimensions: int = 3072
    # Pinecone
    pinecone_api_key: str
    pinecone_index_name: str = "rag-production"
    pinecone_namespace: str = "default"
    # Chunking
    chunk_size: int = 1000
    chunk_overlap: int = 200
    chunking_strategy: str = "recursive"  # recursive, semantic, markdown
    # Retrieval
    top_k: int = 5
    score_threshold: float = 0.7
    rerank_top_k: int = 3
    # API
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    max_concurrent_requests: int = 100
    # Monitoring
    # NOTE(review): pydantic-settings does not export .env values to
    # os.environ, and LangChain reads LANGCHAIN_TRACING_V2 / LANGCHAIN_PROJECT
    # from the process environment -- confirm these are exported for tracing.
    langchain_tracing_v2: bool = True
    langchain_project: str = "rag-pipeline-prod"
    mlflow_tracking_uri: str = "http://localhost:5000"
    # Rate Limiting
    rate_limit_per_minute: int = 60
@lru_cache
def get_settings() -> Settings:
    """Return the process-wide ``Settings`` instance.

    The environment / ``.env`` is read on the first call only; later calls
    reuse the cached object, so configuration changes need a restart or
    ``get_settings.cache_clear()``.
    """
    settings = Settings()
    return settings
Step 3: Document Ingestion Pipeline
# src/ingestion/document_loader.py
from langchain_community.document_loaders import (
PyPDFLoader,
UnstructuredMarkdownLoader,
TextLoader,
CSVLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
import hashlib
import logging
logger = logging.getLogger(name=__name__)


@dataclass
class DocumentMetadata:
    """Typed description of the metadata carried by an ingested document.

    Declared for reference: the ingestion pipeline writes plain entries into
    ``Document.metadata`` dicts rather than instances of this class.
    """

    source: str  # path the document was loaded from
    page_number: Optional[int]  # page inside the source, when applicable
    section: Optional[str]  # section / heading label, when available
    doc_hash: str  # content hash used for deduplication
    ingestion_date: str  # when the document was ingested (format not fixed here)
    document_type: str  # file extension, e.g. ".pdf"
class DocumentIngestionPipeline:
    """Production document ingestion with deduplication and chunking.

    Loads files by extension and splits them according to
    ``settings.chunking_strategy``. Chunks with duplicate content are dropped,
    and the rest are upserted into a Pinecone namespace through LangChain's
    ``PineconeVectorStore``.
    """

    # Lower-case file extension (with leading dot) -> LangChain loader class.
    LOADER_MAP = {
        ".pdf": PyPDFLoader,
        ".md": UnstructuredMarkdownLoader,
        ".txt": TextLoader,
        ".csv": CSVLoader,
    }

    # Polling interval / upper bound while waiting for a newly created index.
    INDEX_READY_POLL_SECONDS = 1.0
    INDEX_READY_TIMEOUT_SECONDS = 300.0

    def __init__(self, settings):
        """Create the embedding client and connect to (or create) the index.

        Args:
            settings: configuration object exposing the embedding, chunking
                and Pinecone fields read below (e.g. ``Settings``).
        """
        self.settings = settings
        self.embeddings = OpenAIEmbeddings(
            model=settings.embedding_model,
            dimensions=settings.embedding_dimensions,
        )
        self._init_pinecone()

    def _init_pinecone(self):
        """Ensure the Pinecone index exists, then build the LangChain store.

        Raises:
            TimeoutError: if a newly created index does not report ready
                within ``INDEX_READY_TIMEOUT_SECONDS``.
        """
        import time

        self.pc = Pinecone(api_key=self.settings.pinecone_api_key)
        index_name = self.settings.pinecone_index_name
        existing_indexes = [idx.name for idx in self.pc.list_indexes()]
        if index_name not in existing_indexes:
            self.pc.create_index(
                name=index_name,
                # Must match the embedding size or upserts are rejected.
                dimension=self.settings.embedding_dimensions,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud="aws",
                    region="us-east-1",
                ),
            )
            logger.info(f"Created Pinecone index: {index_name}")
            # A freshly created index is not immediately usable; wait for it
            # to report ready before the first upsert/query hits it.
            deadline = time.monotonic() + self.INDEX_READY_TIMEOUT_SECONDS
            while not self.pc.describe_index(index_name).status["ready"]:
                if time.monotonic() > deadline:
                    raise TimeoutError(
                        f"Pinecone index {index_name} not ready after "
                        f"{self.INDEX_READY_TIMEOUT_SECONDS:.0f}s"
                    )
                time.sleep(self.INDEX_READY_POLL_SECONDS)
        self.vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=self.embeddings,
            namespace=self.settings.pinecone_namespace,
        )

    def _compute_hash(self, content: str) -> str:
        """Return the SHA-256 hex digest of ``content`` encoded as UTF-8."""
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    def _get_text_splitter(self):
        """Return the text splitter for ``settings.chunking_strategy``.

        "recursive" and any unrecognised value (including "semantic", which
        has no dedicated implementation here) yield a size-bounded
        ``RecursiveCharacterTextSplitter``; "markdown" yields a
        ``MarkdownHeaderTextSplitter`` that splits on h1-h3 headers.
        """
        if self.settings.chunking_strategy == "recursive":
            return RecursiveCharacterTextSplitter(
                chunk_size=self.settings.chunk_size,
                chunk_overlap=self.settings.chunk_overlap,
                length_function=len,
                separators=["\n\n", "\n", ". ", " ", ""],
            )
        elif self.settings.chunking_strategy == "markdown":
            from langchain.text_splitter import MarkdownHeaderTextSplitter

            headers = [
                ("#", "h1"),
                ("##", "h2"),
                ("###", "h3"),
            ]
            return MarkdownHeaderTextSplitter(headers_to_split_on=headers)
        else:
            return RecursiveCharacterTextSplitter(
                chunk_size=self.settings.chunk_size,
                chunk_overlap=self.settings.chunk_overlap,
            )

    def _split_documents(self, documents: List) -> List:
        """Split loaded documents into chunks using the configured strategy.

        ``MarkdownHeaderTextSplitter`` only offers ``split_text`` (there is no
        ``split_documents``), so for the markdown strategy each document is
        split on its own. The source document's metadata is merged into every
        section, and sections are then size-bounded with a recursive splitter
        so ``chunk_size`` still applies.
        """
        splitter = self._get_text_splitter()
        if self.settings.chunking_strategy != "markdown":
            return splitter.split_documents(documents)

        # NOTE(review): UnstructuredMarkdownLoader may strip the "#" header
        # markers; if so, each document yields a single section here. Consider
        # TextLoader for .md files when using this strategy.
        sections = []
        for doc in documents:
            for section in splitter.split_text(doc.page_content):
                section.metadata = {**doc.metadata, **section.metadata}
                sections.append(section)
        size_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.settings.chunk_size,
            chunk_overlap=self.settings.chunk_overlap,
        )
        return size_splitter.split_documents(sections)

    def load_documents(self, file_paths: List[str]) -> List:
        """Load documents from various file types.

        Unsupported extensions and files that fail to load are skipped with a
        log entry (best-effort batch ingestion). Each loaded document gets
        ``doc_hash``, ``source`` and ``document_type`` metadata, plus a
        1-based ``page_number`` when the loader reports a page index.
        """
        from pathlib import Path

        documents = []
        for path in file_paths:
            # Path.suffix is "" for names without an extension; splitting on
            # the last "." would instead treat the whole path as the suffix.
            suffix = Path(path).suffix.lower()
            loader_cls = self.LOADER_MAP.get(suffix)
            if not loader_cls:
                logger.warning(f"Unsupported file type: {suffix} for {path}")
                continue
            try:
                docs = loader_cls(path).load()
            except Exception:
                logger.exception(f"Error loading {path}")
                continue
            for doc in docs:
                doc.metadata["doc_hash"] = self._compute_hash(doc.page_content)
                doc.metadata["source"] = path
                doc.metadata["document_type"] = suffix
                # PyPDFLoader records a 0-based "page"; the retriever reads a
                # "page_number" key. Only set it when known -- Pinecone does
                # not accept null metadata values.
                page = doc.metadata.get("page")
                if isinstance(page, int) and "page_number" not in doc.metadata:
                    doc.metadata["page_number"] = page + 1
            documents.extend(docs)
            logger.info(f"Loaded {len(docs)} documents from {path}")
        return documents

    def process_and_index(
        self,
        file_paths: List[str],
        batch_size: int = 100,
    ) -> Dict[str, Any]:
        """Full ingestion pipeline: load, split, deduplicate, embed, index.

        Args:
            file_paths: files to ingest (see ``load_documents`` for skipping).
            batch_size: number of chunks embedded and upserted per call.

        Returns:
            Dict with ``total_raw``, ``total_chunks``, ``unique_chunks`` and
            ``batches_indexed`` counts.

        Raises:
            ValueError: if ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        raw_docs = self.load_documents(file_paths)
        logger.info(f"Loaded {len(raw_docs)} raw documents")

        chunks = self._split_documents(raw_docs)
        logger.info(f"Created {len(chunks)} chunks")

        # Deduplicate on each chunk's OWN content. The inherited "doc_hash"
        # describes the whole source page, so keying on it would keep only
        # the first chunk of every page.
        seen_hashes = set()
        unique_chunks = []
        chunk_ids = []
        for chunk in chunks:
            chunk_hash = self._compute_hash(chunk.page_content)
            if chunk_hash in seen_hashes:
                continue
            seen_hashes.add(chunk_hash)
            chunk.metadata["chunk_hash"] = chunk_hash
            unique_chunks.append(chunk)
            chunk_ids.append(chunk_hash)
        logger.info(f"After deduplication: {len(unique_chunks)} unique chunks")

        # Batch embed and upsert. Content-derived IDs make re-ingesting the
        # same files overwrite existing vectors instead of duplicating them.
        for start in range(0, len(unique_chunks), batch_size):
            end = start + batch_size
            self.vector_store.add_documents(
                unique_chunks[start:end], ids=chunk_ids[start:end]
            )
            logger.info(
                f"Indexed batch {start // batch_size + 1}: "
                f"documents {start + 1}-{min(end, len(unique_chunks))}"
            )

        stats = {
            "total_raw": len(raw_docs),
            "total_chunks": len(chunks),
            "unique_chunks": len(unique_chunks),
            "batches_indexed": (len(unique_chunks) + batch_size - 1) // batch_size,
        }
        logger.info(f"Ingestion complete: {stats}")
        return stats
Step 4: Query Processing and Retrieval
# src/retrieval/retriever.py
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document
from typing import List, Dict, Any, Optional, AsyncGenerator
from dataclasses import dataclass
import time
import logging
logger = logging.getLogger(name=__name__)


@dataclass
class RetrievalResult:
    """Outcome of one retrieval call."""

    documents: List[Document]  # chunks returned by the (reranking) retriever
    scores: List[float]  # per-document scores; may be empty
    query_time_ms: float  # wall-clock retrieval time in milliseconds
    total_results: int  # number of entries in ``documents``
class ProductionRetriever:
    """Multi-strategy retriever with reranking and filtering.

    Runs dense similarity search over Pinecone, drops hits below
    ``settings.score_threshold``, and reranks the survivors with a
    cross-encoder. It then answers with an OpenAI chat model, either
    synchronously or as a token stream.
    """

    SYSTEM_PROMPT = """You are a helpful assistant that answers questions based on the provided context.
If the context does not contain enough information to answer the question, say so clearly.
Always cite the source document when possible.
Context:
{context}
Answer the user's question based on the above context. Be concise and accurate."""

    # Number of most recent chat_history entries forwarded to the model.
    MAX_HISTORY_ENTRIES = 5

    def __init__(self, settings):
        """Build the embedding client, vector store, chat model and retriever."""
        self.settings = settings
        self.embeddings = OpenAIEmbeddings(
            model=settings.embedding_model,
            dimensions=settings.embedding_dimensions,
        )
        self.vector_store = PineconeVectorStore(
            index_name=settings.pinecone_index_name,
            embedding=self.embeddings,
            namespace=settings.pinecone_namespace,
        )
        self.llm = ChatOpenAI(
            model=settings.openai_model,
            temperature=0.0,
            streaming=True,
        )
        self._setup_retriever()

    def _setup_retriever(self):
        """Configure threshold-filtered dense retrieval plus cross-encoder reranking."""
        base_retriever = self.vector_store.as_retriever(
            # With search_type="similarity" the score_threshold kwarg is
            # silently ignored; this search type actually applies it. The
            # threshold is compared against LangChain's normalized relevance
            # score (for cosine in langchain-pinecone: (cos + 1) / 2 -- confirm
            # for your installed version), not the raw cosine similarity.
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": self.settings.top_k,
                "score_threshold": self.settings.score_threshold,
            },
        )
        # Cross-encoder reranking keeps the best rerank_top_k of the top_k hits.
        cross_encoder = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-base")
        compressor = CrossEncoderReranker(
            model=cross_encoder,
            top_n=self.settings.rerank_top_k,
        )
        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever,
        )

    def _format_context(self, documents: List[Document]) -> str:
        """Render documents as numbered blocks with source/page labels."""
        context_parts = []
        for i, doc in enumerate(documents, 1):
            source = doc.metadata.get("source", "Unknown")
            page = doc.metadata.get("page_number", "N/A")
            context_parts.append(
                f"[Document {i}] (Source: {source}, Page: {page})\n{doc.page_content}"
            )
        return "\n\n".join(context_parts)

    def _build_messages(
        self, query: str, context: str, chat_history: Optional[List[Dict]]
    ) -> List[tuple]:
        """Assemble the (role, content) messages for one RAG call.

        Messages go straight to the chat model instead of through a
        ChatPromptTemplate, because a template would treat any "{" or "}" in
        the retrieved context, the history or the query (code, JSON, ...) as
        placeholders and fail to render.

        Each chat_history entry may hold a "human" and/or an "ai" key; both
        are forwarded in that order. Only the last MAX_HISTORY_ENTRIES
        entries are used.
        """
        messages = [("system", self.SYSTEM_PROMPT.format(context=context))]
        for msg in (chat_history or [])[-self.MAX_HISTORY_ENTRIES:]:
            if "human" in msg:
                messages.append(("human", msg["human"]))
            if "ai" in msg:
                messages.append(("ai", msg["ai"]))
        messages.append(("human", query))
        return messages

    def retrieve(self, query: str) -> RetrievalResult:
        """Synchronous retrieval with timing metadata."""
        start = time.perf_counter()
        documents = self.retriever.invoke(query)
        query_time = (time.perf_counter() - start) * 1000
        return RetrievalResult(
            documents=documents,
            # Scores are not collected: the compression retriever returns
            # bare Documents.
            scores=[],
            query_time_ms=query_time,
            total_results=len(documents),
        )

    async def aretrieve(self, query: str) -> RetrievalResult:
        """Async retrieval with timing metadata."""
        start = time.perf_counter()
        documents = await self.retriever.ainvoke(query)
        query_time = (time.perf_counter() - start) * 1000
        return RetrievalResult(
            documents=documents,
            scores=[],
            query_time_ms=query_time,
            total_results=len(documents),
        )

    async def agenerate(
        self, query: str, chat_history: Optional[List[Dict]] = None
    ) -> AsyncGenerator[str, None]:
        """Streaming RAG generation: yield non-empty answer text chunks."""
        retrieval_result = await self.aretrieve(query)
        context = self._format_context(retrieval_result.documents)
        messages = self._build_messages(query, context, chat_history)
        async for chunk in self.llm.astream(messages):
            if chunk.content:
                yield chunk.content

    def generate(
        self, query: str, chat_history: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Synchronous RAG generation.

        Returns:
            Dict with ``answer``, ``sources`` (first 200 chars of each chunk
            plus its source/page metadata) and ``retrieval_time_ms``.
        """
        retrieval_result = self.retrieve(query)
        context = self._format_context(retrieval_result.documents)
        messages = self._build_messages(query, context, chat_history)
        response = self.llm.invoke(messages)
        return {
            "answer": response.content,
            "sources": [
                {
                    "content": doc.page_content[:200],
                    "source": doc.metadata.get("source"),
                    "page": doc.metadata.get("page_number"),
                }
                for doc in retrieval_result.documents
            ],
            "retrieval_time_ms": retrieval_result.query_time_ms,
        }
Step 5: FastAPI Application
# src/api/main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from contextlib import asynccontextmanager
import uvicorn
import logging
from src.config import get_settings
from src.retrieval.retriever import ProductionRetriever
from src.ingestion.document_loader import DocumentIngestionPipeline
from src.monitoring.metrics import MetricsCollector
logger = logging.getLogger(name=__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create shared services at startup; log at shutdown.

    The retriever, ingestion pipeline and metrics collector are attached to
    ``app.state`` so request handlers can reach them.
    """
    cfg = get_settings()
    state = app.state
    state.retriever = ProductionRetriever(cfg)
    state.ingestion = DocumentIngestionPipeline(cfg)
    state.metrics = MetricsCollector()
    logger.info("RAG Pipeline initialized")
    yield
    logger.info("RAG Pipeline shutting down")
app = FastAPI(
    title="Production RAG API",
    version="1.0.0",
    description="Enterprise RAG pipeline with LangChain + Pinecone + GPT-4",
    lifespan=lifespan,
)

# Wildcard origins must not be combined with credentials. Browsers reject
# that pairing, and Starlette works around it by echoing the caller's Origin
# for cookie-bearing requests, which would let any site make credentialed
# calls. The endpoints here use no cookies or auth headers, so credentials
# are disabled. List explicit origins if they are ever needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
class QueryRequest(BaseModel):
    """Request body shared by the blocking and streaming query endpoints."""

    # User question, 1-2000 characters.
    query: str = Field(..., min_length=1, max_length=2000)
    # Earlier conversation turns; the retriever reads "human" / "ai" keys.
    chat_history: Optional[List[Dict[str, str]]] = None
    # NOTE(review): validated (1-20) but not forwarded by the query handlers.
    top_k: Optional[int] = Field(default=5, ge=1, le=20)


class QueryResponse(BaseModel):
    """Payload of the non-streaming query endpoint."""

    answer: str
    # Excerpts of the retrieved chunks with their source/page metadata.
    sources: List[Dict[str, Any]]
    retrieval_time_ms: float
    # Optional token count; None unless supplied.
    total_tokens: Optional[int] = None


class IngestRequest(BaseModel):
    """Files to ingest, given as paths on the API server's filesystem."""

    file_paths: List[str]
@app.post("/api/v1/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
    """Answer a query with a complete (non-streaming) response.

    ``ProductionRetriever.agenerate`` is an async generator that streams
    tokens, so it cannot be awaited for a result dict. The full-response
    method is the synchronous ``generate``. It runs in the threadpool so its
    blocking network calls do not stall the event loop.

    Raises:
        HTTPException: 500 on any pipeline failure. Details are logged
            server-side rather than returned to the client.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        result = await run_in_threadpool(
            app.state.retriever.generate,
            query=request.query,
            chat_history=request.chat_history,
        )
        app.state.metrics.record_query(
            retrieval_time_ms=result["retrieval_time_ms"],
            num_sources=len(result["sources"]),
        )
        return QueryResponse(**result)
    except Exception as e:
        logger.exception("Query error")
        app.state.metrics.record_error("query_error")
        raise HTTPException(
            status_code=500, detail="Internal error while processing the query"
        ) from e
@app.post("/api/v1/query/stream")
async def query_rag_stream(request: QueryRequest):
    """Stream the RAG answer as Server-Sent Events.

    Each text chunk becomes one SSE event, and the stream ends with a
    ``data: [DONE]`` event. If generation fails mid-stream, an ``error``
    event is sent instead of cutting the connection silently.
    """

    def to_sse_event(text: str) -> str:
        # SSE is line-based: every line of a payload needs its own "data:"
        # field, and clients rejoin them with newlines. Writing a chunk that
        # contains a newline after a single "data:" prefix corrupts the stream.
        normalized = text.replace("\r\n", "\n").replace("\r", "\n")
        return "".join(f"data: {line}\n" for line in normalized.split("\n")) + "\n"

    async def generate():
        try:
            async for chunk in app.state.retriever.agenerate(
                query=request.query,
                chat_history=request.chat_history,
            ):
                yield to_sse_event(chunk)
        except Exception:
            logger.exception("Streaming query error")
            app.state.metrics.record_error("query_error")
            yield "event: error\ndata: Internal error while streaming the response\n\n"
            return
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
@app.post("/api/v1/ingest")
async def ingest_documents(request: IngestRequest, background_tasks: BackgroundTasks):
    """Queue ingestion of server-side files into the vector store.

    Paths come from the request body, so they are confined to the ``data``
    directory under the working directory (mounted at /app/data by
    docker-compose). Without this check, any caller could make the server
    read and index arbitrary files (e.g. ``.env``) and then retrieve their
    contents through the query endpoints.

    Raises:
        HTTPException: 400 if any path resolves outside the data directory.
    """
    from pathlib import Path

    data_dir = Path("data").resolve()
    for raw_path in request.file_paths:
        # resolve() collapses ".." and symlinks before the containment check.
        if not Path(raw_path).resolve().is_relative_to(data_dir):
            raise HTTPException(
                status_code=400,
                detail=f"Path outside the data directory: {raw_path}",
            )

    background_tasks.add_task(
        app.state.ingestion.process_and_index,
        request.file_paths,
    )
    return {"status": "ingestion_started", "file_count": len(request.file_paths)}
@app.get("/api/v1/health")
async def health_check():
    """Liveness probe: confirm the process is serving requests."""
    payload = dict(status="healthy", version="1.0.0")
    return payload
@app.get("/api/v1/metrics")
async def get_metrics():
    """Return the summary produced by the application's metrics collector."""
    collector = app.state.metrics
    return collector.get_summary()
if __name__ == "__main__":
    # Local development entry point with auto-reload. Host and port come from
    # the same Settings the app uses (API_HOST / API_PORT) rather than being
    # hard-coded; the defaults are still 0.0.0.0:8000.
    _settings = get_settings()
    uvicorn.run(
        "src.api.main:app",
        host=_settings.api_host,
        port=_settings.api_port,
        reload=True,
    )
Step 6: MLflow Experiment Tracking
# src/monitoring/experiment_tracking.py
import mlflow
import mlflow.openai
from typing import Dict, Any, Optional
from dataclasses import dataclass
import time
@dataclass
class ExperimentConfig:
    """Where and under which names MLflow runs are recorded."""

    experiment_name: str = "rag-pipeline"
    run_name: str = "production-run"
    tracking_uri: str = "http://localhost:5000"


class RAGExperimentTracker:
    """Track RAG pipeline experiments with MLflow.

    Each ``log_*`` / ``evaluate_*`` call opens and closes its own MLflow run.
    """

    def __init__(self, config: ExperimentConfig):
        """Point MLflow at ``config.tracking_uri`` and select the experiment."""
        mlflow.set_tracking_uri(config.tracking_uri)
        mlflow.set_experiment(config.experiment_name)
        self.config = config

    def log_retrieval_experiment(
        self,
        query: str,
        retrieved_docs: list,
        scores: list,
        ground_truth: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Log one retrieval call as an MLflow run.

        Args:
            query: the user query (first 500 chars logged as a param).
            retrieved_docs: retrieved documents; each needs ``page_content``.
            scores: per-document scores; may be empty.
            ground_truth: accepted but not currently logged.
            parameters: extra params logged verbatim.

        Returns:
            The MLflow run ID.
        """
        with mlflow.start_run(run_name=self.config.run_name) as run:
            if parameters:
                mlflow.log_params(parameters)
            mlflow.log_param("query", query[:500])
            mlflow.log_param("num_retrieved", len(retrieved_docs))
            mlflow.log_param("top_score", max(scores) if scores else 0)
            if scores:
                mlflow.log_metric("avg_retrieval_score", sum(scores) / len(scores))
                mlflow.log_metric("max_retrieval_score", max(scores))
                mlflow.log_metric("min_retrieval_score", min(scores))
            for i, doc in enumerate(retrieved_docs):
                mlflow.log_text(
                    doc.page_content,
                    f"retrieved_docs/doc_{i}.txt",
                )
        # The run ends when the ``with`` block exits, so mlflow.active_run()
        # would be None here; read the ID from the handle start_run returned.
        return run.info.run_id

    def log_generation_experiment(
        self,
        query: str,
        response: str,
        context: str,
        latency_ms: float,
        token_usage: Dict[str, int],
    ) -> str:
        """Log one generation call (sizes, latency, token usage, texts).

        Returns:
            The MLflow run ID.
        """
        with mlflow.start_run(run_name=f"{self.config.run_name}-generation") as run:
            mlflow.log_param("query_length", len(query))
            mlflow.log_param("context_length", len(context))
            mlflow.log_param("response_length", len(response))
            mlflow.log_metric("latency_ms", latency_ms)
            mlflow.log_metric("total_tokens", token_usage.get("total_tokens", 0))
            mlflow.log_metric("prompt_tokens", token_usage.get("prompt_tokens", 0))
            mlflow.log_metric("completion_tokens", token_usage.get("completion_tokens", 0))
            mlflow.log_text(query, "query.txt")
            mlflow.log_text(response, "response.txt")
            mlflow.log_text(context, "context.txt")
        return run.info.run_id

    @staticmethod
    def _score_retrieval(predictions: list, ground_truth: list, metric: str) -> float:
        """Compute hit rate or MRR without logging anything.

        ``predictions[i]`` is the ranked list of retrieved items for query i;
        ``ground_truth[i]`` is the collection of relevant items (anything
        supporting ``in``).

        Raises:
            ValueError: for an unknown metric or mismatched input lengths.
        """
        if metric not in ("hit_rate", "mrr"):
            raise ValueError(f"Unknown metric: {metric}")
        if len(predictions) != len(ground_truth):
            raise ValueError(
                f"predictions ({len(predictions)}) and ground_truth "
                f"({len(ground_truth)}) must have the same length"
            )
        if not ground_truth:
            return 0.0
        pairs = list(zip(predictions, ground_truth))
        if metric == "hit_rate":
            hits = sum(1 for pred, gt in pairs if any(p in gt for p in pred))
            return hits / len(ground_truth)
        # Mean Reciprocal Rank: 1/rank of the first relevant item, else 0.
        rr_sum = 0.0
        for pred_list, gt in pairs:
            rank = next(
                (r for r, pred in enumerate(pred_list, 1) if pred in gt), None
            )
            if rank is not None:
                rr_sum += 1.0 / rank
        return rr_sum / len(ground_truth)

    def evaluate_retrieval(
        self,
        predictions: list,
        ground_truth: list,
        metric: str = "hit_rate",
    ) -> float:
        """Evaluate retrieval quality ("hit_rate" or "mrr") and log the score.

        Raises:
            ValueError: for an unknown metric or mismatched input lengths.
        """
        score = self._score_retrieval(predictions, ground_truth, metric)
        with mlflow.start_run(run_name=f"eval-{metric}"):
            mlflow.log_metric(f"retrieval_{metric}", score)
            mlflow.log_param("eval_metric", metric)
            mlflow.log_param("num_samples", len(ground_truth))
        return score
Step 7: Docker Deployment
# Dockerfile
FROM python:3.11-slim
# Unbuffered logs (visible immediately in `docker logs`); no .pyc files.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1
WORKDIR /app
# Install system dependencies: compilers for packages without wheels, and
# curl for container healthchecks (the slim base image does not include it).
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Run as an unprivileged user instead of root.
RUN useradd --create-home --uid 1000 appuser && chown -R appuser /app
USER appuser
EXPOSE 8000
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
  rag-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - PINECONE_API_KEY=${PINECONE_API_KEY}
      - PINECONE_INDEX_NAME=rag-production
      # Inside the compose network MLflow is reached by service name;
      # the Settings default (localhost) points back at this container.
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    volumes:
      - ./data:/app/data
    depends_on:
      - mlflow
    healthcheck:
      # python:3.11-slim has no curl; the Python interpreter is always present.
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/api/v1/health', timeout=5)"]
      interval: 30s
      timeout: 10s
      retries: 3
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.10.0
    ports:
      - "5000:5000"
    # Store run metadata and artifacts on the named volume; by default the
    # server writes under its working directory, which is lost with the container.
    command: >
      mlflow server --host 0.0.0.0 --port 5000
      --backend-store-uri sqlite:////mlflow/mlflow.db
      --artifacts-destination /mlflow/artifacts
    volumes:
      - mlflow-data:/mlflow
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./configs/prometheus.yml:/etc/prometheus/prometheus.yml
volumes:
  mlflow-data:
Step 8: Evaluation Framework
# src/monitoring/evaluation.py
from typing import List, Dict, Any
from dataclasses import dataclass
import numpy as np
@dataclass
class EvaluationResult:
    """Score produced by one evaluation metric for one query."""

    metric_name: str  # "answer_relevance", "context_relevance" or "faithfulness"
    score: float  # normalized to [0, 1] by RAGEvaluator
    details: Dict[str, Any]  # metric-specific diagnostics


class RAGEvaluator:
    """Comprehensive RAG evaluation suite (LLM-as-judge).

    Every ``evaluate_*`` method takes a judge ``llm`` whose ``invoke(prompt)``
    returns an object with a string ``content`` attribute. Scores are
    normalized to [0, 1].
    """

    # Upper end of the 1-N rating scale requested in the judge prompts.
    RATING_SCALE = 5

    @staticmethod
    def _parse_rating(text: str, scale: int = 5) -> float:
        """Extract a rating from judge output and normalize it to [0, 1].

        Judges often reply "4", "4/5" or "Rating: 4.", so the first number in
        the reply is used rather than requiring the whole reply to parse as a
        float. Returns 0.0 when no number is present. Values above ``scale``
        are clamped to 1.0.
        """
        import re

        match = re.search(r"\d+(?:\.\d+)?", text)
        if match is None:
            return 0.0
        return min(float(match.group()), float(scale)) / scale

    def evaluate_answer_relevance(
        self,
        query: str,
        answer: str,
        llm,
    ) -> EvaluationResult:
        """Evaluate how relevant the answer is to the query."""
        prompt = f"""Rate the relevance of the answer to the query on a scale of 1-5.
Query: {query}
Answer: {answer}
Rating (1-5):"""
        response = llm.invoke(prompt)
        score = self._parse_rating(response.content, self.RATING_SCALE)
        return EvaluationResult(
            metric_name="answer_relevance",
            score=score,
            details={"query": query, "answer_preview": answer[:200]},
        )

    def evaluate_context_relevance(
        self,
        query: str,
        contexts: List[str],
        llm,
    ) -> EvaluationResult:
        """Evaluate how relevant each context is to the query; report the mean."""
        scores = []
        for ctx in contexts:
            prompt = f"""Rate how relevant this context is to the query on a scale of 1-5.
Query: {query}
Context: {ctx[:500]}
Rating (1-5):"""
            response = llm.invoke(prompt)
            scores.append(self._parse_rating(response.content, self.RATING_SCALE))
        avg_score = float(np.mean(scores)) if scores else 0.0
        return EvaluationResult(
            metric_name="context_relevance",
            score=avg_score,
            details={"num_contexts": len(contexts), "individual_scores": scores},
        )

    def evaluate_faithfulness(
        self,
        answer: str,
        contexts: List[str],
        llm,
    ) -> EvaluationResult:
        """Evaluate whether the answer is grounded in the first three contexts."""
        import re

        prompt = f"""Determine if the answer is fully supported by the contexts.
Answer: {answer}
Contexts: {" ".join(contexts[:3])}
Is the answer fully supported? (yes/no):"""
        response = llm.invoke(prompt)
        # Decide on the reply's leading word only: a substring test would also
        # count replies such as "No ... (it contradicts what the eyes see)".
        verdict = response.content.strip().lower()
        score = 1.0 if re.match(r"\W*yes\b", verdict) else 0.0
        return EvaluationResult(
            metric_name="faithfulness",
            score=score,
            details={"supported": score == 1.0},
        )

    def run_full_evaluation(
        self,
        test_cases: List[Dict[str, Any]],
        retriever,
        llm,
    ) -> List[EvaluationResult]:
        """Run the three metrics on each test case (reads ``case["query"]``).

        Returns three results per case: answer relevance, context relevance,
        faithfulness, in that order.
        """
        results = []
        for case in test_cases:
            query = case["query"]
            result = retriever.generate(query)
            # NOTE(review): with ProductionRetriever, source "content" is only
            # the first 200 chars of each chunk, so context relevance and
            # faithfulness are judged on previews, not the full context.
            contexts = [s["content"] for s in result["sources"]]
            results.append(
                self.evaluate_answer_relevance(query, result["answer"], llm)
            )
            results.append(self.evaluate_context_relevance(query, contexts, llm))
            results.append(
                self.evaluate_faithfulness(result["answer"], contexts, llm)
            )
        return results
Performance Metrics and Evaluation
| Metric | Target | Description |
|---|---|---|
| Retrieval Latency | < 200ms | P95 latency for vector search |
| End-to-End Latency | < 2s | Full pipeline response time |
| Hit Rate@5 | > 0.85 | Relevant doc in top 5 results |
| MRR | > 0.75 | Mean Reciprocal Rank |
| Faithfulness | > 0.90 | Answer grounded in context |
| Answer Relevance | > 0.80 (4.0/5) | LLM-judged relevance score, normalized to 0–1 by the evaluator |
ℹ️
Always set up a golden dataset of at least 100 query-document pairs before deploying to production. Use human-annotated ground truth for reliable evaluation metrics.
💡
Implement a feedback loop where users can thumbs-up/down responses. This data is invaluable for continuous improvement and fine-tuning your retrieval strategies.
Interview Talking Points
-
Chunking Strategy: Why recursive character splitting over fixed-size? Trade-offs between semantic and fixed chunking for different document types.
-
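Hybrid Search: Combining dense embeddings with BM25 sparse retrieval improves recall for keyword-heavy queries (exact identifiers, error codes, rare terms). Gains of 15-25% are commonly cited, but measure the actual improvement on your own evaluation set.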
-
Reranking: Cross-encoder reranking after initial retrieval improves precision but adds latency. The optimal setup uses a lightweight reranker for real-time and a heavier one for batch.
-
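Evaluation: The RAG triad (answer relevance, context relevance, faithfulness) provides a comprehensive quality framework that an LLM judge can score without per-query human labels. Calibrate the judge against a human-annotated golden set before trusting its numbers.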
-
Production Concerns: Rate limiting, token budget management, caching frequent queries, and handling concurrent requests are critical for production deployment.
-
Scaling: Pinecone serverless handles scaling automatically, but you should implement connection pooling and consider regional deployment for latency.
⚠️
Never store API keys in code. Always use environment variables or a secrets manager like AWS Secrets Manager or HashiCorp Vault.
Advanced Optimizations
Query Routing
from langchain.chains import RouterChain
from langchain.chains.router import MultiRouteChain
class QueryRouter:
    """Route different query types to specialized retrievers.

    Classification is keyword based. A query containing a word that starts
    with a code keyword routes to "code", a summary keyword routes to
    "summarization", and everything else routes to "factual".
    """

    CODE_KEYWORDS = ("code", "function", "api", "implement")
    SUMMARY_KEYWORDS = ("summarize", "summary", "overview")

    def __init__(self, factual_retriever=None, summary_retriever=None, code_retriever=None):
        """Register the retriever for each route.

        All retrievers are optional so the router can classify queries before
        targets are wired up; a route mapped to None has no retriever yet.
        """
        self.factual_retriever = factual_retriever
        self.summary_retriever = summary_retriever
        self.code_retriever = code_retriever
        self.routes = {
            "factual": self.factual_retriever,
            "summarization": self.summary_retriever,
            "code": self.code_retriever,
        }

    @staticmethod
    def _mentions(text: str, keywords) -> bool:
        """True if any word in ``text`` starts with one of ``keywords``.

        Anchoring at a word boundary keeps "api" from matching "capital" or
        "rapid" while still matching "apis"; "implement" still matches
        "implementation".
        """
        import re

        return any(re.search(rf"\b{re.escape(kw)}", text) for kw in keywords)

    def route(self, query: str) -> str:
        """Classify query and return its route name (a key of ``self.routes``)."""
        text = query.lower()
        if self._mentions(text, self.CODE_KEYWORDS):
            return "code"
        if self._mentions(text, self.SUMMARY_KEYWORDS):
            return "summarization"
        return "factual"
Caching Layer
from functools import lru_cache
from hashlib import md5
import json
from typing import Optional

import redis


class QueryCache:
    """Redis-based query caching for repeated queries.

    Results are stored as JSON under ``rag:<md5 of query>`` with a TTL in
    seconds.
    NOTE(review): the key covers only the query text. Answers that depend on
    chat_history or top_k should not be cached here, or the key should be
    extended.
    """

    def __init__(self, redis_url: str, ttl: int = 3600):
        """Connect to Redis at ``redis_url``; entries expire after ``ttl`` seconds."""
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    @staticmethod
    def _key(query: str) -> str:
        """Return the Redis key for ``query``.

        md5 serves only as a fast fingerprint here, not a security control.
        usedforsecurity=False keeps it working on FIPS-restricted builds.
        """
        digest = md5(query.encode("utf-8"), usedforsecurity=False).hexdigest()
        return f"rag:{digest}"

    def get(self, query: str) -> Optional[dict]:
        """Return the cached result for ``query``, or None on a miss."""
        cached = self.redis.get(self._key(query))
        return json.loads(cached) if cached else None

    def set(self, query: str, result: dict):
        """Cache ``result`` (must be JSON-serializable) for ``query``."""
        self.redis.setex(self._key(query), self.ttl, json.dumps(result))
ℹ️
This project demonstrates a complete production RAG pipeline. For the full implementation with all edge cases and optimizations, refer to the accompanying repository.