Building a Production RAG Pipeline (LangChain + Pinecone + GPT-4)

LangChain + Pinecone + GPT-4 | Enterprise-Grade Implementation

Advanced | 12+ Hours | Production-Ready

Project Overview

Problem Statement

Traditional LLMs are limited by their training data cutoff and cannot access proprietary or real-time information. Organizations need systems that can combine the reasoning capabilities of LLMs with accurate, up-to-date information retrieval from their internal knowledge bases.

Objectives

  • Build an end-to-end RAG pipeline that ingests, indexes, and retrieves documents
  • Implement hybrid search (semantic + keyword) for optimal retrieval
  • Create a production-ready API with streaming responses
  • Deploy with monitoring, evaluation, and continuous improvement loops
  • Achieve sub-200ms retrieval latency at scale

Tech Stack

Component           | Technology
Orchestration       | LangChain 0.2+
Vector Store        | Pinecone (serverless)
LLM                 | GPT-4 / GPT-4-Turbo
Embeddings          | OpenAI text-embedding-3-large
API Framework       | FastAPI
Containerization    | Docker + Docker Compose
Monitoring          | LangSmith + Prometheus
Experiment Tracking | MLflow

Architecture Diagram

┌─────────────────────────────────────────────────────────────┐
│                    RAG Pipeline Architecture                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌───────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │ Document  │───▶│  Chunking &  │───▶│   Embedding      │  │
│  │  Ingest   │    │ Processing   │    │   Generation     │  │
│  └───────────┘    └──────────────┘    └────────┬─────────┘  │
│                                                │            │
│                                                ▼            │
│  ┌───────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │  User     │───▶│  Query       │───▶│  Pinecone Vector │  │
│  │  Query    │    │  Processing  │    │     Store        │  │
│  └───────────┘    └──────────────┘    └────────┬─────────┘  │
│                                                │            │
│                                                ▼            │
│  ┌───────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │ Response  │◀───│  LLM         │◀───│  Context         │  │
│  │ Streaming │    │  Generation  │    │  Assembly        │  │
│  └───────────┘    └──────────────┘    └──────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Step-by-Step Implementation

Step 1: Environment Setup

# Create project directory
mkdir rag-pipeline && cd rag-pipeline

# Initialize Python project
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate   # Windows

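# Install dependencies
pip install langchain langchain-community langchain-openai langchain-pinecone pinecone-client
pip install fastapi uvicorn python-dotenv pydantic pydantic-settings  # pydantic-settings is imported by src/config.py
pip install tiktoken unstructured markdown pdfplumber pypdf           # pypdf backs PyPDFLoader in Step 3
pip install sentence-transformers                                     # backs the cross-encoder reranker in Step 4
pip install mlflow langsmith numpy redis
pip install docker prometheus-client

# Pin the installed versions; the Dockerfile in Step 7 installs from this file
pip freeze > requirements.txt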

# Create project structure
mkdir -p src/{ingestion,retrieval,api,monitoring}
mkdir -p tests/{unit,integration}
mkdir -p configs data docs
touch src/__init__.py src/ingestion/__init__.py src/retrieval/__init__.py
touch src/api/__init__.py src/monitoring/__init__.py

Step 2: Configuration Management

# src/config.py
from pydantic_settings import BaseSettings
from typing import Optional
from functools import lru_cache

class Settings(BaseSettings):
    # OpenAI
    openai_api_key: str
    openai_model: str = "gpt-4-turbo-preview"
    embedding_model: str = "text-embedding-3-large"
    embedding_dimensions: int = 3072

    # Pinecone
    pinecone_api_key: str
    pinecone_index_name: str = "rag-production"
    pinecone_namespace: str = "default"

    # Chunking
    chunk_size: int = 1000
    chunk_overlap: int = 200
    chunking_strategy: str = "recursive"  # recursive, semantic, markdown

    # Retrieval
    top_k: int = 5
    score_threshold: float = 0.7
    rerank_top_k: int = 3

    # API
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    max_concurrent_requests: int = 100

    # Monitoring
    langchain_tracing_v2: bool = True
    langchain_project: str = "rag-pipeline-prod"
    mlflow_tracking_uri: str = "http://localhost:5000"

    # Rate Limiting
    rate_limit_per_minute: int = 60

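    # pydantic-settings v2 configuration (replaces the deprecated inner
    # `class Config`); extra="ignore" tolerates unrelated keys in .env.
    model_config = {
        "env_file": ".env",
        "case_sensitive": False,
        "extra": "ignore",
    }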

@lru_cache()
def get_settings() -> Settings:
    return Settings()

Step 3: Document Ingestion Pipeline

# src/ingestion/document_loader.py
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    TextLoader,
    CSVLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
import hashlib
import logging

logger = logging.getLogger(__name__)

@dataclass
class DocumentMetadata:
    source: str
    page_number: Optional[int]
    section: Optional[str]
    doc_hash: str
    ingestion_date: str
    document_type: str

class DocumentIngestionPipeline:
    """Production document ingestion with deduplication and chunking."""

    LOADER_MAP = {
        ".pdf": PyPDFLoader,
        ".md": UnstructuredMarkdownLoader,
        ".txt": TextLoader,
        ".csv": CSVLoader,
    }

    def __init__(self, settings):
        self.settings = settings
        self.embeddings = OpenAIEmbeddings(
            model=settings.embedding_model,
            dimensions=settings.embedding_dimensions,
        )
        self._init_pinecone()

    def _init_pinecone(self):
        """Initialize Pinecone index with optimal configuration."""
        self.pc = Pinecone(api_key=self.settings.pinecone_api_key)

        existing_indexes = [idx.name for idx in self.pc.list_indexes()]

        if self.settings.pinecone_index_name not in existing_indexes:
            self.pc.create_index(
                name=self.settings.pinecone_index_name,
                dimension=self.settings.embedding_dimensions,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud="aws",
                    region="us-east-1",
                ),
            )
            logger.info(f"Created Pinecone index: {self.settings.pinecone_index_name}")

        self.vector_store = PineconeVectorStore(
            index_name=self.settings.pinecone_index_name,
            embedding=self.embeddings,
            namespace=self.settings.pinecone_namespace,
        )

    def _compute_hash(self, content: str) -> str:
        return hashlib.sha256(content.encode()).hexdigest()

    def _get_text_splitter(self):
        """Return appropriate text splitter based on configuration."""
        if self.settings.chunking_strategy == "recursive":
            return RecursiveCharacterTextSplitter(
                chunk_size=self.settings.chunk_size,
                chunk_overlap=self.settings.chunk_overlap,
                length_function=len,
                separators=["\n\n", "\n", ". ", " ", ""],
            )
        elif self.settings.chunking_strategy == "markdown":
            from langchain.text_splitter import MarkdownHeaderTextSplitter
            headers = [
                ("#", "h1"),
                ("##", "h2"),
                ("###", "h3"),
            ]
            return MarkdownHeaderTextSplitter(headers_to_split_on=headers)
        else:
            return RecursiveCharacterTextSplitter(
                chunk_size=self.settings.chunk_size,
                chunk_overlap=self.settings.chunk_overlap,
            )

    def load_documents(self, file_paths: List[str]) -> List:
        """Load documents from various file types."""
        documents = []
        for path in file_paths:
            suffix = "." + path.rsplit(".", 1)[-1].lower()
            loader_cls = self.LOADER_MAP.get(suffix)
            if not loader_cls:
                logger.warning(f"Unsupported file type: {suffix} for {path}")
                continue

            try:
                loader = loader_cls(path)
                docs = loader.load()
                for doc in docs:
                    doc.metadata["doc_hash"] = self._compute_hash(doc.page_content)
                    doc.metadata["source"] = path
                    doc.metadata["document_type"] = suffix
                documents.extend(docs)
                logger.info(f"Loaded {len(docs)} documents from {path}")
            except Exception as e:
                logger.error(f"Error loading {path}: {e}")
                continue

        return documents

    def process_and_index(
        self,
        file_paths: List[str],
        batch_size: int = 100,
    ) -> Dict[str, Any]:
        """Full ingestion pipeline: load, split, embed, index."""
        # Load raw documents
        raw_docs = self.load_documents(file_paths)
        logger.info(f"Loaded {len(raw_docs)} raw documents")

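        # Split into chunks
        text_splitter = self._get_text_splitter()
        if hasattr(text_splitter, "split_documents"):
            chunks = text_splitter.split_documents(raw_docs)
        else:
            # MarkdownHeaderTextSplitter only exposes split_text(), so split
            # each document's text and carry the parent metadata over.
            chunks = []
            for doc in raw_docs:
                for piece in text_splitter.split_text(doc.page_content):
                    piece.metadata = {**doc.metadata, **piece.metadata}
                    chunks.append(piece)
        logger.info(f"Created {len(chunks)} chunks")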

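        # Deduplicate by chunk content. The doc_hash in metadata is computed
        # per loaded document and inherited by every chunk, so deduplicating
        # on it would keep only the first chunk of each document.
        seen_hashes = set()
        unique_chunks = []
        for chunk in chunks:
            chunk_hash = self._compute_hash(chunk.page_content)
            if chunk_hash not in seen_hashes:
                seen_hashes.add(chunk_hash)
                chunk.metadata["chunk_hash"] = chunk_hash
                unique_chunks.append(chunk)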

        logger.info(f"After deduplication: {len(unique_chunks)} unique chunks")

        # Batch embed and upsert to Pinecone
        for i in range(0, len(unique_chunks), batch_size):
            batch = unique_chunks[i : i + batch_size]
            self.vector_store.add_documents(batch)
            logger.info(
                f"Indexed batch {i // batch_size + 1}: "
                f"documents {i + 1}-{min(i + batch_size, len(unique_chunks))}"
            )

        stats = {
            "total_raw": len(raw_docs),
            "total_chunks": len(chunks),
            "unique_chunks": len(unique_chunks),
            "batches_indexed": (len(unique_chunks) + batch_size - 1) // batch_size,
        }
        logger.info(f"Ingestion complete: {stats}")
        return stats
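
To make the chunk_size and chunk_overlap settings concrete, here is a minimal sketch of fixed-window chunking with overlap. It is a simplification for illustration only: RecursiveCharacterTextSplitter prefers to break on the configured separators rather than at fixed offsets, and `sliding_window_chunks` is a hypothetical helper that does not appear in the project.

```python
from typing import List


def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into fixed-size windows that share `chunk_overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


chunks = sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij']
```

With the defaults from Step 2 (chunk_size=1000, chunk_overlap=200) each window advances 800 characters, so a 2,600-character document yields three chunks under this scheme. Larger overlap reduces the chance of cutting an answer in half at the cost of more vectors to embed and store.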

Step 4: Query Processing and Retrieval

# src/retrieval/retriever.py
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document
from typing import List, Dict, Any, Optional, AsyncGenerator
from dataclasses import dataclass
import time
import logging

logger = logging.getLogger(__name__)

@dataclass
class RetrievalResult:
    documents: List[Document]
    scores: List[float]
    query_time_ms: float
    total_results: int

class ProductionRetriever:
    """Multi-strategy retriever with reranking and filtering."""

    SYSTEM_PROMPT = """You are a helpful assistant that answers questions based on the provided context.
    If the context does not contain enough information to answer the question, say so clearly.
    Always cite the source document when possible.

    Context:
    {context}

    Answer the user's question based on the above context. Be concise and accurate."""

    def __init__(self, settings):
        self.settings = settings
        self.embeddings = OpenAIEmbeddings(
            model=settings.embedding_model,
            dimensions=settings.embedding_dimensions,
        )
        self.vector_store = PineconeVectorStore(
            index_name=settings.pinecone_index_name,
            embedding=self.embeddings,
            namespace=settings.pinecone_namespace,
        )
        self.llm = ChatOpenAI(
            model=settings.openai_model,
            temperature=0.0,
            streaming=True,
        )
        self._setup_retriever()

    def _setup_retriever(self):
        """Configure retriever with reranking."""
        base_retriever = self.vector_store.as_retriever(
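            # score_threshold is only honored by this search type; with plain
            # "similarity" the threshold is not applied to the results.
            search_type="similarity_score_threshold",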
            search_kwargs={
                "k": self.settings.top_k,
                "score_threshold": self.settings.score_threshold,
            },
        )

        # Optional: Cross-encoder reranking
        cross_encoder = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-base")
        compressor = CrossEncoderReranker(
            model=cross_encoder,
            top_n=self.settings.rerank_top_k,
        )
        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever,
        )

    def _format_context(self, documents: List[Document]) -> str:
        """Format retrieved documents into context string."""
        context_parts = []
        for i, doc in enumerate(documents, 1):
            source = doc.metadata.get("source", "Unknown")
            # PyPDFLoader stores the (zero-based) page index under "page"
            page = doc.metadata.get("page_number", doc.metadata.get("page", "N/A"))
            context_parts.append(
                f"[Document {i}] (Source: {source}, Page: {page})\n{doc.page_content}"
            )
        return "\n\n".join(context_parts)

    def retrieve(self, query: str) -> RetrievalResult:
        """Synchronous retrieval with metadata."""
        start = time.time()
        documents = self.retriever.invoke(query)
        scores = []  # The compression retriever returns documents only, so scores stay empty here
        query_time = (time.time() - start) * 1000

        return RetrievalResult(
            documents=documents,
            scores=scores,
            query_time_ms=query_time,
            total_results=len(documents),
        )

    async def aretrieve(self, query: str) -> RetrievalResult:
        """Async retrieval."""
        start = time.time()
        documents = await self.retriever.ainvoke(query)
        query_time = (time.time() - start) * 1000

        return RetrievalResult(
            documents=documents,
            scores=[],
            query_time_ms=query_time,
            total_results=len(documents),
        )

    async def agenerate(
        self, query: str, chat_history: Optional[List[Dict]] = None
    ) -> AsyncGenerator[str, None]:
        """Streaming RAG generation."""
        # Retrieve relevant documents
        retrieval_result = await self.aretrieve(query)
        context = self._format_context(retrieval_result.documents)

        # Build prompt
        messages = [
            ("system", self.SYSTEM_PROMPT.format(context=context)),
        ]
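        if chat_history:
            # Last 5 exchanges; an entry may carry a "human" turn, an "ai" turn, or both
            for msg in chat_history[-5:]:
                if "human" in msg:
                    messages.append(("human", msg["human"]))
                if "ai" in msg:
                    messages.append(("ai", msg["ai"]))
        messages.append(("human", query))

        # Stream directly from the model. Routing these already-formatted
        # messages through ChatPromptTemplate would re-parse any "{" or "}"
        # in the retrieved context as template variables and fail at runtime.
        async for chunk in self.llm.astream(messages):
            if chunk.content:
                yield chunk.content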

    def generate(
        self, query: str, chat_history: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Synchronous RAG generation with full response."""
        retrieval_result = self.retrieve(query)
        context = self._format_context(retrieval_result.documents)

        messages = [
            ("system", self.SYSTEM_PROMPT.format(context=context)),
        ]
        if chat_history:
            for msg in chat_history[-5:]:
                if "human" in msg:
                    messages.append(("human", msg["human"]))
                if "ai" in msg:
                    messages.append(("ai", msg["ai"]))
        messages.append(("human", query))

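        # Call the model with the already-formatted messages. Routing them
        # through ChatPromptTemplate would re-parse any "{" or "}" in the
        # retrieved context as template variables and fail at runtime.
        response = self.llm.invoke(messages)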

        return {
            "answer": response.content,
            "sources": [
                {
                    "content": doc.page_content[:200],
                    "source": doc.metadata.get("source"),
                    "page": doc.metadata.get("page_number", doc.metadata.get("page")),
                }
                for doc in retrieval_result.documents
            ],
            "retrieval_time_ms": retrieval_result.query_time_ms,
        }
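
`_format_context` above concatenates every retrieved document into the prompt, which can overrun the model's context window or inflate cost when chunks are long. The following is a minimal sketch of token budget management, under stated assumptions: `estimate_tokens` uses a rough four-characters-per-token heuristic (tiktoken, installed in Step 1, would give exact counts), and both helper names are hypothetical.

```python
from typing import List


def estimate_tokens(text: str) -> int:
    """Rough token estimate: about 4 characters per token (an assumption)."""
    return max(1, len(text) // 4)


def fit_to_budget(chunks: List[str], max_tokens: int) -> List[str]:
    """Keep chunks in ranked order until the next one would exceed the budget."""
    kept: List[str] = []
    used = 0
    for chunk in chunks:
        cost = estimate_tokens(chunk)
        if used + cost > max_tokens:
            break  # stop at the first chunk that does not fit
        kept.append(chunk)
        used += cost
    return kept


# Three ranked chunks of roughly 100 estimated tokens each
ranked = ["a" * 400, "b" * 400, "c" * 400]
selected = fit_to_budget(ranked, max_tokens=250)
print(len(selected))  # 2
```

Because the chunks arrive in ranked order, stopping at the first one that does not fit keeps the most relevant context and drops the tail.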

Step 5: FastAPI Application

# src/api/main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from contextlib import asynccontextmanager
import uvicorn
import logging

from src.config import get_settings
from src.retrieval.retriever import ProductionRetriever
from src.ingestion.document_loader import DocumentIngestionPipeline
from src.monitoring.metrics import MetricsCollector

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management."""
    settings = get_settings()
    app.state.retriever = ProductionRetriever(settings)
    app.state.ingestion = DocumentIngestionPipeline(settings)
    app.state.metrics = MetricsCollector()
    logger.info("RAG Pipeline initialized")
    yield
    logger.info("RAG Pipeline shutting down")

app = FastAPI(
    title="Production RAG API",
    version="1.0.0",
    description="Enterprise RAG pipeline with LangChain + Pinecone + GPT-4",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
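    # Wildcard origins are for local development only; list explicit origins
    # in production, especially when allow_credentials=True.
    allow_origins=["*"],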
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class QueryRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=2000)
    chat_history: Optional[List[Dict[str, str]]] = None
    top_k: Optional[int] = Field(5, ge=1, le=20)

class QueryResponse(BaseModel):
    answer: str
    sources: List[Dict[str, Any]]
    retrieval_time_ms: float
    total_tokens: Optional[int] = None

class IngestRequest(BaseModel):
    file_paths: List[str]

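@app.post("/api/v1/query", response_model=QueryResponse)
def query_rag(request: QueryRequest):
    """Query the RAG pipeline (non-streaming)."""
    try:
        # agenerate() is an async generator and cannot be awaited for a full
        # response, so call the synchronous generate(). FastAPI runs plain
        # `def` endpoints in a worker thread, keeping the event loop free.
        result = app.state.retriever.generate(
            query=request.query,
            chat_history=request.chat_history,
        )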
        app.state.metrics.record_query(
            retrieval_time_ms=result["retrieval_time_ms"],
            num_sources=len(result["sources"]),
        )
        return QueryResponse(**result)
    except Exception as e:
        logger.error(f"Query error: {e}")
        app.state.metrics.record_error("query_error")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/v1/query/stream")
async def query_rag_stream(request: QueryRequest):
    """Stream RAG response."""
    async def generate():
        async for chunk in app.state.retriever.agenerate(
            query=request.query,
            chat_history=request.chat_history,
        ):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

@app.post("/api/v1/ingest")
async def ingest_documents(request: IngestRequest, background_tasks: BackgroundTasks):
    """Ingest documents into the vector store."""
    background_tasks.add_task(
        app.state.ingestion.process_and_index,
        request.file_paths,
    )
    return {"status": "ingestion_started", "file_count": len(request.file_paths)}

@app.get("/api/v1/health")
async def health_check():
    return {"status": "healthy", "version": "1.0.0"}

@app.get("/api/v1/metrics")
async def get_metrics():
    return app.state.metrics.get_summary()

if __name__ == "__main__":
    uvicorn.run("src.api.main:app", host="0.0.0.0", port=8000, reload=True)
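
The API imports MetricsCollector from src/monitoring/metrics.py, but that module is not shown in this walkthrough. Below is a minimal in-memory sketch that provides the three methods the endpoints call (record_query, record_error, get_summary). The internals and the summary field names are assumptions made for illustration; exporting the same values through prometheus-client would be a separate step.

```python
import threading
from collections import Counter
from typing import Any, Dict, List


class MetricsCollector:
    """In-memory metrics store; a stand-in for src/monitoring/metrics.py."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # endpoints may run in worker threads
        self._latencies_ms: List[float] = []
        self._source_counts: List[int] = []
        self._errors: Counter = Counter()

    def record_query(self, retrieval_time_ms: float, num_sources: int) -> None:
        """Record one successful query."""
        with self._lock:
            self._latencies_ms.append(retrieval_time_ms)
            self._source_counts.append(num_sources)

    def record_error(self, error_type: str) -> None:
        """Count one error of the given type."""
        with self._lock:
            self._errors[error_type] += 1

    def get_summary(self) -> Dict[str, Any]:
        """Return aggregate counts and averages as a JSON-serializable dict."""
        with self._lock:
            n = len(self._latencies_ms)
            return {
                "total_queries": n,
                "avg_retrieval_time_ms": sum(self._latencies_ms) / n if n else 0.0,
                "avg_sources": sum(self._source_counts) / n if n else 0.0,
                "errors": dict(self._errors),
            }


metrics = MetricsCollector()
metrics.record_query(retrieval_time_ms=120.0, num_sources=3)
metrics.record_query(retrieval_time_ms=180.0, num_sources=5)
metrics.record_error("query_error")
print(metrics.get_summary())
```

This sketch keeps every sample in a list, which grows without bound; a long-running service would cap the history or keep running aggregates instead.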

Step 6: MLflow Experiment Tracking

# src/monitoring/experiment_tracking.py
import mlflow
import mlflow.openai
from typing import Dict, Any, Optional
from dataclasses import dataclass
import time

@dataclass
class ExperimentConfig:
    experiment_name: str = "rag-pipeline"
    run_name: str = "production-run"
    tracking_uri: str = "http://localhost:5000"

class RAGExperimentTracker:
    """Track RAG pipeline experiments with MLflow."""

    def __init__(self, config: ExperimentConfig):
        mlflow.set_tracking_uri(config.tracking_uri)
        mlflow.set_experiment(config.experiment_name)
        self.config = config

    def log_retrieval_experiment(
        self,
        query: str,
        retrieved_docs: list,
        scores: list,
        ground_truth: Optional[str] = None,
        parameters: Dict[str, Any] = None,
    ):
        with mlflow.start_run(run_name=self.config.run_name):
            # Log parameters
            if parameters:
                mlflow.log_params(parameters)

            mlflow.log_param("query", query[:500])
            mlflow.log_param("num_retrieved", len(retrieved_docs))
            mlflow.log_param("top_score", max(scores) if scores else 0)

            # Log retrieval metrics
            if scores:
                mlflow.log_metric("avg_retrieval_score", sum(scores) / len(scores))
                mlflow.log_metric("max_retrieval_score", max(scores))
                mlflow.log_metric("min_retrieval_score", min(scores))

            # Log documents as artifacts
            for i, doc in enumerate(retrieved_docs):
                mlflow.log_text(
                    doc.page_content,
                    f"retrieved_docs/doc_{i}.txt",
                )

            return mlflow.active_run().info.run_id

    def log_generation_experiment(
        self,
        query: str,
        response: str,
        context: str,
        latency_ms: float,
        token_usage: Dict[str, int],
    ):
        with mlflow.start_run(run_name=f"{self.config.run_name}-generation"):
            mlflow.log_param("query_length", len(query))
            mlflow.log_param("context_length", len(context))
            mlflow.log_param("response_length", len(response))

            mlflow.log_metric("latency_ms", latency_ms)
            mlflow.log_metric("total_tokens", token_usage.get("total_tokens", 0))
            mlflow.log_metric("prompt_tokens", token_usage.get("prompt_tokens", 0))
            mlflow.log_metric("completion_tokens", token_usage.get("completion_tokens", 0))

            # Log artifacts
            mlflow.log_text(query, "query.txt")
            mlflow.log_text(response, "response.txt")
            mlflow.log_text(context, "context.txt")

            return mlflow.active_run().info.run_id

    def evaluate_retrieval(
        self,
        predictions: list,
        ground_truth: list,
        metric: str = "hit_rate",
    ) -> float:
        """Evaluate retrieval quality."""
        if metric == "hit_rate":
            hits = sum(
                1 for pred, gt in zip(predictions, ground_truth)
                if any(p in gt for p in pred)
            )
            score = hits / len(ground_truth) if ground_truth else 0
        elif metric == "mrr":
            # Mean Reciprocal Rank
            rr_sum = 0
            for pred_list, gt in zip(predictions, ground_truth):
                for rank, pred in enumerate(pred_list, 1):
                    if pred in gt:
                        rr_sum += 1 / rank
                        break
            score = rr_sum / len(ground_truth) if ground_truth else 0
        else:
            raise ValueError(f"Unknown metric: {metric}")

        with mlflow.start_run(run_name=f"eval-{metric}"):
            mlflow.log_metric(f"retrieval_{metric}", score)
            mlflow.log_param("eval_metric", metric)
            mlflow.log_param("num_samples", len(ground_truth))

        return score
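
A worked example helps when checking the two retrieval metrics by hand. The sketch below mirrors the hit-rate and MRR logic of evaluate_retrieval without the MLflow logging; the function names and the document IDs are made up for illustration.

```python
from typing import List, Set


def hit_rate(predictions: List[List[str]], ground_truth: List[Set[str]]) -> float:
    """Fraction of queries with at least one relevant document retrieved."""
    if not ground_truth:
        return 0.0
    hits = sum(
        1
        for preds, relevant in zip(predictions, ground_truth)
        if any(p in relevant for p in preds)
    )
    return hits / len(ground_truth)


def mean_reciprocal_rank(
    predictions: List[List[str]], ground_truth: List[Set[str]]
) -> float:
    """Average of 1/rank of the first relevant document (0 if none is found)."""
    if not ground_truth:
        return 0.0
    rr_sum = 0.0
    for preds, relevant in zip(predictions, ground_truth):
        for rank, pred in enumerate(preds, start=1):
            if pred in relevant:
                rr_sum += 1.0 / rank
                break
    return rr_sum / len(ground_truth)


predictions = [
    ["d1", "d2", "d3"],  # relevant document at rank 1
    ["d4", "d5", "d6"],  # relevant document at rank 2
    ["d7", "d8", "d9"],  # no relevant document retrieved
]
ground_truth = [{"d1"}, {"d5"}, {"d0"}]

print(hit_rate(predictions, ground_truth))
print(mean_reciprocal_rank(predictions, ground_truth))
```

Two of the three queries retrieve a relevant document, so the hit rate is 2/3, and the MRR is (1 + 1/2 + 0) / 3 = 0.5. MRR rewards placing the relevant document higher, which hit rate alone does not capture.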

Step 7: Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (curl is required by the docker-compose healthcheck)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: '3.8'

services:
  rag-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - PINECONE_API_KEY=${PINECONE_API_KEY}
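      - PINECONE_INDEX_NAME=rag-production
      # Inside the Compose network the MLflow server is reachable by service name
      - MLFLOW_TRACKING_URI=http://mlflow:5000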
    volumes:
      - ./data:/app/data
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.10.0
    ports:
      - "5000:5000"
    command: mlflow server --host 0.0.0.0 --port 5000
    volumes:
      - mlflow-data:/mlflow

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./configs/prometheus.yml:/etc/prometheus/prometheus.yml

volumes:
  mlflow-data:

Step 8: Evaluation Framework

# src/monitoring/evaluation.py
from typing import List, Dict, Any
from dataclasses import dataclass
import numpy as np

@dataclass
class EvaluationResult:
    metric_name: str
    score: float
    details: Dict[str, Any]

class RAGEvaluator:
    """Comprehensive RAG evaluation suite."""

    def evaluate_answer_relevance(
        self,
        query: str,
        answer: str,
        llm,
    ) -> EvaluationResult:
        """Evaluate how relevant the answer is to the query."""
        prompt = f"""Rate the relevance of the answer to the query on a scale of 1-5.

Query: {query}
Answer: {answer}

Rating (1-5):"""

        response = llm.invoke(prompt)
        try:
            score = float(response.content.strip()) / 5.0
        except ValueError:
            score = 0.0

        return EvaluationResult(
            metric_name="answer_relevance",
            score=score,
            details={"query": query, "answer_preview": answer[:200]},
        )

    def evaluate_context_relevance(
        self,
        query: str,
        contexts: List[str],
        llm,
    ) -> EvaluationResult:
        """Evaluate how relevant contexts are to the query."""
        scores = []
        for ctx in contexts:
            prompt = f"""Rate how relevant this context is to the query on a scale of 1-5.

Query: {query}
Context: {ctx[:500]}

Rating (1-5):"""
            response = llm.invoke(prompt)
            try:
                scores.append(float(response.content.strip()) / 5.0)
            except ValueError:
                scores.append(0.0)

        avg_score = np.mean(scores) if scores else 0.0
        return EvaluationResult(
            metric_name="context_relevance",
            score=avg_score,
            details={"num_contexts": len(contexts), "individual_scores": scores},
        )

    def evaluate_faithfulness(
        self,
        answer: str,
        contexts: List[str],
        llm,
    ) -> EvaluationResult:
        """Evaluate if the answer is grounded in the provided contexts."""
        prompt = f"""Determine if the answer is fully supported by the contexts.

Answer: {answer}
Contexts: {" ".join(contexts[:3])}

Is the answer fully supported? (yes/no):"""

        response = llm.invoke(prompt)
        score = 1.0 if "yes" in response.content.lower() else 0.0

        return EvaluationResult(
            metric_name="faithfulness",
            score=score,
            details={"supported": score == 1.0},
        )

    def run_full_evaluation(
        self,
        test_cases: List[Dict[str, Any]],
        retriever,
        llm,
    ) -> List[EvaluationResult]:
        """Run complete evaluation on test cases."""
        results = []
        for case in test_cases:
            query = case["query"]
            result = retriever.generate(query)

            results.append(
                self.evaluate_answer_relevance(query, result["answer"], llm)
            )
            results.append(
                self.evaluate_context_relevance(
                    query,
                    [s["content"] for s in result["sources"]],
                    llm,
                )
            )
            results.append(
                self.evaluate_faithfulness(
                    result["answer"],
                    [s["content"] for s in result["sources"]],
                    llm,
                )
            )

        return results
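
The evaluators above call float(response.content.strip()), so a judge reply such as "4/5" or "Rating: 4" raises ValueError and is silently scored as 0.0. The sketch below shows one way to parse such replies more tolerantly. `parse_rating` is a hypothetical helper; it deliberately accepts only a few simple reply shapes and returns None for anything else, so parse failures can be told apart from genuinely low scores.

```python
import re
from typing import Optional

_RATING_RE = re.compile(
    r"\s*(?:rating\s*[:=]?\s*)?"   # optional "Rating:" label
    r"([1-5](?:\.\d+)?)"           # the score itself
    r"\s*(?:/\s*5|out\s+of\s+5)?"  # optional "/5" or "out of 5" suffix
    r"\s*\.?\s*",                  # optional trailing period
    re.IGNORECASE,
)


def parse_rating(text: str) -> Optional[float]:
    """Return the 1-5 rating normalized to 0-1, or None if it cannot be parsed."""
    match = _RATING_RE.fullmatch(text)
    if match is None:
        return None
    value = float(match.group(1))
    if value > 5.0:
        return None  # e.g. "5.5" is outside the scale
    return value / 5.0


for reply in ["4", "Rating: 4", "4/5", "4 out of 5", "I cannot rate this", "10"]:
    print(repr(reply), "->", parse_rating(reply))
```

Replies that parse to None can be logged and excluded from the average rather than counted as zeros, which keeps a chatty judge model from dragging the metric down.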

Performance Metrics and Evaluation

Metric             | Target   | Description
Retrieval Latency  | < 200 ms | P95 latency for vector search
End-to-End Latency | < 2 s    | Full pipeline response time
Hit Rate@5         | > 0.85   | Relevant doc in top 5 results
MRR                | > 0.75   | Mean Reciprocal Rank
Faithfulness       | > 0.90   | Answer grounded in context
Answer Relevance   | > 4.0/5  | LLM-judged relevance score
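
The retrieval-latency target is stated as a P95 rather than an average, because a few slow requests can hide behind a healthy mean. As a rough illustration, the sketch below computes a nearest-rank percentile over recorded latencies; `percentile` is a hypothetical helper and the sample latencies are invented.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Invented sample: nine fast requests and one slow outlier (milliseconds)
latencies_ms = [110, 95, 130, 120, 105, 480, 115, 100, 125, 90]

mean_ms = sum(latencies_ms) / len(latencies_ms)
p95_ms = percentile(latencies_ms, 95)
print(f"mean={mean_ms} ms, p95={p95_ms} ms")
```

In this sample the mean is 147 ms, comfortably under the 200 ms target, while the P95 is 480 ms and fails it. Tracking the percentile is what exposes the outlier.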

ℹ️

Always set up a golden dataset of at least 100 query-document pairs before deploying to production. Use human-annotated ground truth for reliable evaluation metrics.

💡 Implement a feedback loop where users can thumbs-up/down responses. This data is invaluable for continuous improvement and fine-tuning your retrieval strategies.

Interview Talking Points

  1. Chunking Strategy: Why recursive character splitting over fixed-size? Trade-offs between semantic and fixed chunking for different document types.

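  2. Hybrid Search: Combining dense embeddings with BM25 sparse retrieval improves recall by 15-25% for keyword-heavy queries; treat that range as a target to validate against your own golden dataset, since the gain depends on the corpus and query mix. Note that the retriever in Step 4 is dense-only, so hybrid search needs a sparse index added alongside it.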

  3. Reranking: Cross-encoder reranking after initial retrieval improves precision but adds latency. A common compromise is a lightweight reranker on the real-time path and a heavier one for batch jobs.

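  4. Evaluation: The RAG triad (answer relevance, context relevance, faithfulness) can be scored by an LLM judge, so it scales without per-response human annotation. It complements rather than replaces the human-annotated golden dataset, which is still needed for retrieval metrics such as hit rate and MRR.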

  5. Production Concerns: Rate limiting, token budget management, caching frequent queries, and handling concurrent requests are critical for production deployment.

  6. Scaling: Pinecone serverless handles scaling automatically, but you should implement connection pooling and consider regional deployment for latency.
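
For the hybrid search point above, one widely used way to merge a dense ranking with a BM25 ranking is reciprocal rank fusion (RRF). This is a technique swapped in for illustration, not something this project's code implements. The sketch assumes each retriever returns an ordered list of document IDs; the IDs are invented, and k=60 is the conventional smoothing constant.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked ID lists: each document scores sum(1 / (k + rank)) over the lists."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties break alphabetically for determinism
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


dense_results = ["d1", "d2", "d3"]   # from the vector store
sparse_results = ["d3", "d1", "d4"]  # from a BM25 index
fused = reciprocal_rank_fusion([dense_results, sparse_results])
print(fused)  # ['d1', 'd3', 'd2', 'd4']
```

RRF works on ranks only, so it avoids having to normalize cosine similarities against BM25 scores, which live on different scales. Documents that appear in both lists (d1 and d3 here) rise above those found by only one retriever.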

⚠️

Never store API keys in code. Always use environment variables or a secrets manager like AWS Secrets Manager or HashiCorp Vault.

Advanced Optimizations

Query Routing

from typing import Any, Dict

class QueryRouter:
    """Route different query types to specialized retrievers."""

    def __init__(self, retrievers: Dict[str, Any]):
        # Expected keys: "factual", "summarization", "code". The retrievers
        # are injected because this snippet does not define them itself.
        self.routes = retrievers

    def route(self, query: str) -> str:
        """Classify query and route to appropriate retriever."""
        if any(kw in query.lower() for kw in ["code", "function", "api", "implement"]):
            return "code"
        elif any(kw in query.lower() for kw in ["summarize", "summary", "overview"]):
            return "summarization"
        return "factual"

Caching Layer

import json
from hashlib import md5
from typing import Optional

import redis

class QueryCache:
    """Redis-based query caching for repeated queries."""

    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def get(self, query: str) -> Optional[dict]:
        key = f"rag:{md5(query.encode()).hexdigest()}"
        cached = self.redis.get(key)
        return json.loads(cached) if cached else None

    def set(self, query: str, result: dict):
        key = f"rag:{md5(query.encode()).hexdigest()}"
        self.redis.setex(key, self.ttl, json.dumps(result))
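
QueryCache keys on the raw query string, so "What is RAG?" and " what is rag? " miss each other, and results cached for one namespace or top_k would be served for another. The following is a minimal sketch of a more careful key, assuming that lowercasing and whitespace collapsing are acceptable for your queries; `make_cache_key` and its parameters are hypothetical.

```python
import hashlib
import json


def make_cache_key(query: str, namespace: str = "default", top_k: int = 5) -> str:
    """Build a cache key from the normalized query plus retrieval parameters."""
    normalized = " ".join(query.lower().split())  # lowercase, collapse whitespace
    payload = json.dumps(
        {"q": normalized, "ns": namespace, "k": top_k},
        sort_keys=True,  # stable field order gives a stable hash
    )
    return "rag:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = make_cache_key("What is RAG?")
key_b = make_cache_key("  what   is rag? ")
key_c = make_cache_key("What is RAG?", top_k=3)
print(key_a == key_b, key_a == key_c)  # True False
```

Cached answers also go stale when documents are re-ingested, so a short TTL or an index version field in the key is worth considering.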

ℹ️

This project demonstrates a complete production RAG pipeline. For the full implementation with all edge cases and optimizations, refer to the accompanying repository.
