Real-Time Recommendation System
Embeddings + ANN + Redis | Sub-Millisecond Serving
Project Overview
Problem Statement
E-commerce and content platforms need to deliver personalized recommendations in real time. Scoring every candidate item with a collaborative-filtering model at request time is often too slow for online serving. This system instead pre-computes embeddings and uses approximate nearest neighbor (ANN) search in Redis, targeting ANN lookups in under 1 ms.
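For contrast, here is a minimal sketch of the exact (brute-force) search that ANN approximates. The function name `exact_top_k` and the toy vectors are illustrative assumptions, not part of the project code. Every request scores all N items, so the cost is O(N x d) per query, which is the work an HNSW index avoids.

```python
import numpy as np

def exact_top_k(query: np.ndarray, items: np.ndarray, k: int) -> np.ndarray:
    """Return indices of the k items with the highest cosine similarity to query."""
    # Normalize so that a dot product equals cosine similarity.
    q = query / np.linalg.norm(query)
    m = items / np.linalg.norm(items, axis=1, keepdims=True)
    sims = m @ q                    # one score per item: O(N * d) work
    return np.argsort(-sims)[:k]    # most similar first

# Toy catalog of four 2-dimensional item vectors.
items = np.array([[1.0, 0.0], [0.0, 1.0], [0.9, 0.1], [-1.0, 0.0]])
top = exact_top_k(np.array([1.0, 0.0]), items, k=2)
```

The same function is a useful ground truth when measuring how much recall an ANN index gives up.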
Objectives
- Build user and item embedding models from interaction data
- Implement ANN search using HNSW indexes in Redis
- Create a real-time feature store for user context
- Deploy with A/B testing infrastructure
- Achieve sub-millisecond ANN search latency (under 10 ms end to end) at 10K QPS
| Component | Technology |
|---|---|
| Embedding Model | Sentence Transformers + Custom |
| Vector Store | Redis + HNSW |
| Feature Store | Redis (custom user feature store) |
| API Framework | FastAPI + gRPC |
| A/B Testing | Optimizely / LaunchDarkly |
| Monitoring | Prometheus + Grafana |
| Experiment Tracking | MLflow |
Architecture Diagram
+-------------------------------------------------------------------+
| Real-Time Recommendation Architecture |
+-------------------------------------------------------------------+
| +--------------+ +--------------+ +------------------+ |
| | User Events |--->| Stream Proc |--->| Embedding Gen | |
| | (Kafka) | | (Flink) | | (GPU Workers) | |
| +--------------+ +--------------+ +--------+---------+ |
| | |
| v |
| +--------------+ +--------------+ +------------------+ |
| | Real-time |<---| Redis HNSW |<---| Embedding Store | |
| | Serving API | | Index | | (Pre-computed) | |
| +--------------+ +--------------+ +------------------+ |
| | | |
| v v |
| +--------------+ +--------------+ +------------------+ |
| | Response | | A/B Testing | | Feature Store | |
| | Ranking | | Layer | | (User Context) | |
| +--------------+ +--------------+ +------------------+ |
+-------------------------------------------------------------------+
Step-by-Step Implementation
Step 1: Environment Setup
mkdir rec-system && cd rec-system
python -m venv venv && source venv/bin/activate
pip install fastapi uvicorn "redis[hiredis]" sentence-transformers
pip install numpy faiss-cpu torch pydantic
pip install mlflow kafka-python asyncpg
pip install prometheus-client grpcio
Step 2: Embedding Model Training
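Fine-tune a Sentence Transformers model on user interaction data, using a cosine-similarity loss that pulls user-profile and item-description embeddings together in proportion to engagement.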
# src/embeddings/train_embeddings.py
from typing import Dict, List

import mlflow
import numpy as np
from sentence_transformers import InputExample, SentenceTransformer, losses
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator
from torch.utils.data import DataLoader, Dataset


class InteractionDataset(Dataset):
    def __init__(self, interactions: List[Dict]):
        self.pairs = []
        for inter in interactions:
            # Each example pairs a user profile text with an item description.
            # The label is the target cosine similarity for CosineSimilarityLoss,
            # so engagement_score should already be scaled (for example to [0, 1]).
            self.pairs.append(InputExample(
                texts=[inter["user_profile"], inter["item_description"]],
                label=float(inter["engagement_score"])
            ))

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        return self.pairs[idx]


class EmbeddingTrainer:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        # model_name may be a hub model ID or a local path to a saved model.
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)

    def train(
        self,
        train_data: List[Dict],
        eval_data: List[Dict],
        epochs: int = 10,
        batch_size: int = 64,
        learning_rate: float = 2e-5,
    ):
        train_dataset = InteractionDataset(train_data)
        train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
        # Cosine similarity loss
        train_loss = losses.CosineSimilarityLoss(self.model)
        # Evaluator
        eval_examples = [InputExample(
            texts=[e["user_profile"], e["item_description"]],
            label=float(e["engagement_score"])
        ) for e in eval_data]
        evaluator = EmbeddingSimilarityEvaluator.from_input_examples(
            eval_examples, name="eval"
        )
        mlflow.set_experiment("embedding-training")
        with mlflow.start_run():
            mlflow.log_params({
                "model_name": self.model_name,  # log the model actually loaded
                "epochs": epochs,
                "batch_size": batch_size,
                "learning_rate": learning_rate,
            })
            self.model.fit(
                train_objectives=[(train_dataloader, train_loss)],
                evaluator=evaluator,
                epochs=epochs,
                warmup_steps=100,
                optimizer_params={"lr": learning_rate},  # apply the requested rate
                output_path="./trained_embeddings",
                show_progress_bar=True,
            )

    def encode(self, texts: List[str], batch_size: int = 256) -> np.ndarray:
        return self.model.encode(texts, batch_size=batch_size, show_progress_bar=True)
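With its default settings, `CosineSimilarityLoss` computes the cosine similarity of the two encoded texts and penalizes the squared difference from the label. The NumPy sketch below reproduces that arithmetic on toy vectors to show why `engagement_score` should be scaled into the range a cosine similarity can reach. The function name and the numbers are illustrative assumptions, not project code.

```python
import numpy as np

def cosine_similarity_mse(u: np.ndarray, v: np.ndarray, labels: np.ndarray) -> float:
    """Mean squared error between row-wise cosine similarity and target labels."""
    cos = np.sum(u * v, axis=1) / (
        np.linalg.norm(u, axis=1) * np.linalg.norm(v, axis=1)
    )
    return float(np.mean((cos - labels) ** 2))

user_vecs = np.array([[1.0, 0.0], [1.0, 0.0]])
item_vecs = np.array([[1.0, 0.0], [0.0, 1.0]])

# Labels in [0, 1]: the similarities 1.0 and 0.0 can match the targets exactly.
loss_scaled = cosine_similarity_mse(user_vecs, item_vecs, np.array([1.0, 0.0]))
# A raw score such as 5.0 can never be reached by a cosine value, so the
# loss stays large no matter how well the embeddings align.
loss_raw = cosine_similarity_mse(user_vecs, item_vecs, np.array([5.0, 0.0]))
```

A simple option is to divide raw engagement scores by their maximum before building the training examples.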
Step 3: Redis ANN Index Setup
Build and manage HNSW indexes in Redis for fast approximate nearest neighbor search.
# src/storage/redis_index.py
import json
import logging
from typing import List, Optional, Tuple

import numpy as np
import redis.asyncio as redis

logger = logging.getLogger(__name__)


class RedisVectorIndex:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        # Keep raw bytes: vectors are stored as binary float32 blobs.
        self.redis = redis.from_url(redis_url, decode_responses=False)

    async def create_index(
        self, index_name: str, dimension: int = 384,
        ef_construction: int = 200, m: int = 16
    ):
        try:
            await self.redis.execute_command(
                "FT.CREATE", index_name,
                "ON", "HASH",
                "PREFIX", "1", f"{index_name}:",
                "SCHEMA",
                # "10" counts the attribute arguments that follow (5 name/value pairs).
                "vector", "VECTOR", "HNSW", "10",
                "TYPE", "FLOAT32",
                "DIM", str(dimension),
                "DISTANCE_METRIC", "COSINE",
                "EF_CONSTRUCTION", str(ef_construction),
                "M", str(m),
            )
            logger.info(f"Created index {index_name} with dim={dimension}")
        except redis.ResponseError as e:
            if "Index already exists" not in str(e):
                raise

    async def upsert_embeddings(
        self, index_name: str,
        ids: List[str], embeddings: np.ndarray,
        metadata: Optional[List[dict]] = None
    ):
        pipe = self.redis.pipeline()
        for i, (item_id, embedding) in enumerate(zip(ids, embeddings)):
            key = f"{index_name}:{item_id}"
            data = {"vector": embedding.astype(np.float32).tobytes()}
            if metadata and i < len(metadata):
                data["meta"] = json.dumps(metadata[i])
            pipe.hset(key, mapping=data)
        await pipe.execute()
        logger.info(f"Upserted {len(ids)} embeddings to {index_name}")

    async def search(
        self, index_name: str, query_vector: np.ndarray,
        top_k: int = 10,
    ) -> List[Tuple[str, float]]:
        """Return (item_id, cosine_distance) pairs, closest first."""
        query = f"*=>[KNN {top_k} @vector $query_vec AS score]"
        result = await self.redis.execute_command(
            "FT.SEARCH", index_name, query,
            "PARAMS", "2", "query_vec", query_vector.astype(np.float32).tobytes(),
            "SORTBY", "score", "ASC",
            "RETURN", "1", "score",    # skip the vector blob in the reply
            "LIMIT", "0", str(top_k),  # the default LIMIT returns only 10 rows
            "DIALECT", "2",
        )
        prefix = f"{index_name}:"
        matches = []
        # Default (RESP2) reply layout: [total, key1, fields1, key2, fields2, ...]
        for i in range(1, len(result), 2):
            key = result[i].decode()
            fields = result[i + 1]
            score = float(fields[fields.index(b"score") + 1])
            # Strip only the index prefix so item IDs containing ":" survive.
            matches.append((key[len(prefix):], score))
        return matches
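Redis stores each vector as a raw byte string, so the bytes written to the `vector` field must match the `TYPE` and `DIM` declared for the index. The sketch below shows that round trip with two hypothetical helpers (`to_redis_bytes` and `from_redis_bytes` are not part of the project). A 384-dimensional FLOAT32 vector takes 384 x 4 = 1536 bytes, which puts the raw vector data at roughly 1.5 GB per million items before any HNSW graph overhead.

```python
import numpy as np

DIM = 384  # matches the index dimension used in this project

def to_redis_bytes(vec: np.ndarray, dim: int = DIM) -> bytes:
    """Serialize a vector as float32 bytes after checking its length."""
    if vec.shape != (dim,):
        raise ValueError(f"expected shape ({dim},), got {vec.shape}")
    return vec.astype(np.float32).tobytes()

def from_redis_bytes(blob: bytes) -> np.ndarray:
    """Decode bytes written by to_redis_bytes back into a float32 vector."""
    return np.frombuffer(blob, dtype=np.float32)

vec = np.linspace(0.0, 1.0, DIM)   # float64 input, as NumPy often produces
blob = to_redis_bytes(vec)
restored = from_redis_bytes(blob)
raw_bytes_per_million = len(blob) * 1_000_000
```

Checking the shape before writing catches a model or dimension mismatch early, rather than leaving it to surface at indexing or query time.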
Step 4: Real-Time Serving API
Build a high-performance FastAPI service for real-time recommendation serving.
# src/api/main.py
import os
import time
from contextlib import asynccontextmanager
from typing import List, Optional

from fastapi import FastAPI
from pydantic import BaseModel, Field

from src.embeddings.train_embeddings import EmbeddingTrainer
from src.features.user_features import UserFeatureStore
from src.storage.redis_index import RedisVectorIndex

# Read connection settings from the environment (set in docker-compose.yml).
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
MODEL_PATH = os.getenv("MODEL_PATH", "./trained_embeddings")


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = RedisVectorIndex(REDIS_URL)
    app.state.embedder = EmbeddingTrainer(MODEL_PATH)
    app.state.features = UserFeatureStore(REDIS_URL)
    await app.state.redis.create_index("items", dimension=384)
    yield
    await app.state.redis.redis.close()
    await app.state.features.redis.close()


app = FastAPI(title="Rec System API", version="1.0.0", lifespan=lifespan)


class RecRequest(BaseModel):
    user_id: str
    context: Optional[dict] = None
    num_results: int = Field(10, ge=1, le=100)
    exclude_items: Optional[List[str]] = None


class RecItem(BaseModel):
    item_id: str
    score: float  # cosine distance from the ANN search: lower means closer
    metadata: Optional[dict] = None


class RecResponse(BaseModel):
    recommendations: List[RecItem]
    latency_ms: float
    model_version: str


@app.post("/api/v1/recommend", response_model=RecResponse)
async def recommend(request: RecRequest):
    start = time.perf_counter()  # monotonic clock, suited to measuring latency
    # Get user embedding from feature store
    user_embedding = await app.state.features.get_user_embedding(
        request.user_id, request.context
    )
    # ANN search in Redis (over-fetch 2x to leave room for exclusions)
    results = await app.state.redis.search(
        "items", user_embedding, top_k=request.num_results * 2
    )
    # Filter excluded items (set lookup is O(1) per candidate)
    if request.exclude_items:
        excluded = set(request.exclude_items)
        results = [r for r in results if r[0] not in excluded]
    # Take top_k results
    results = results[:request.num_results]
    latency = (time.perf_counter() - start) * 1000
    return RecResponse(
        recommendations=[RecItem(item_id=r[0], score=r[1]) for r in results],
        latency_ms=latency,
        model_version="v1.2.0",
    )
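The endpoint over-fetches twice the requested number of candidates, drops excluded items, and truncates. A small standalone sketch of that step (the helper name `filter_and_truncate` and the sample data are illustrative) also shows its limit: when more than half of the candidates are excluded, the response comes back shorter than requested.

```python
from typing import Iterable, List, Tuple

def filter_and_truncate(
    results: List[Tuple[str, float]],
    exclude_items: Iterable[str],
    num_results: int,
) -> List[Tuple[str, float]]:
    """Drop excluded item IDs, then keep the first num_results matches."""
    excluded = set(exclude_items)  # O(1) membership checks
    kept = [(item_id, score) for item_id, score in results if item_id not in excluded]
    return kept[:num_results]

# Candidates in ANN order (closest first), over-fetched 2x for num_results=2.
candidates = [("a", 0.05), ("b", 0.10), ("c", 0.20), ("d", 0.30)]
page = filter_and_truncate(candidates, exclude_items=["a", "c"], num_results=2)
short = filter_and_truncate(candidates, exclude_items=["a", "b", "c"], num_results=2)
```

If short responses matter, one option is to size the over-fetch from the length of the exclusion list instead of using a fixed factor.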
Step 5: User Feature Store
Build a real-time feature store for computing and caching user embeddings from recent interactions.
# src/features/user_features.py
import json
from datetime import timedelta
from typing import Dict, List, Optional

import numpy as np
import redis.asyncio as redis


class UserFeatureStore:
    def __init__(self, redis_url: str = "redis://localhost:6379", item_index: str = "items"):
        # decode_responses=False: embeddings are binary and must not be UTF-8 decoded.
        self.redis = redis.from_url(redis_url, decode_responses=False)
        self.item_index = item_index
        self.TTL = timedelta(days=30)

    async def get_user_embedding(
        self, user_id: str, context: Optional[Dict] = None
    ) -> np.ndarray:
        # Try cache first
        cached = await self.redis.get(f"user_emb:{user_id}")
        if cached:
            return np.frombuffer(cached, dtype=np.float32)
        # Compute from recent interactions
        interactions = await self.get_recent_interactions(user_id)
        blobs = []
        if interactions:
            # Interactions store only item IDs, so read each item's vector from
            # the hash written by RedisVectorIndex.upsert_embeddings.
            pipe = self.redis.pipeline()
            for inter in interactions:
                pipe.hget(f"{self.item_index}:{inter['item_id']}", "vector")
            blobs = await pipe.execute()
        # Skip interactions whose item has no stored vector.
        pairs = [(np.frombuffer(b, dtype=np.float32), inter["weight"])
                 for b, inter in zip(blobs, interactions) if b]
        weights = np.array([w for _, w in pairs], dtype=np.float64)
        if not pairs or weights.sum() <= 0:
            # Cold start placeholder: a random vector yields arbitrary results.
            return np.random.randn(384).astype(np.float32)
        # Weighted average of item embeddings
        embeddings = np.array([vec for vec, _ in pairs])
        user_emb = np.average(embeddings, axis=0, weights=weights).astype(np.float32)
        # Cache result
        await self.redis.setex(
            f"user_emb:{user_id}",
            int(self.TTL.total_seconds()),
            user_emb.tobytes(),
        )
        return user_emb

    async def get_recent_interactions(
        self, user_id: str, limit: int = 50
    ) -> List[Dict]:
        entries = await self.redis.lrange(f"user_interactions:{user_id}", 0, limit - 1)
        # Entries are stored as JSON strings by record_interaction.
        return [json.loads(entry) for entry in entries]

    async def record_interaction(
        self, user_id: str, item_id: str,
        interaction_type: str, weight: float
    ):
        entry = json.dumps({"item_id": item_id, "type": interaction_type, "weight": weight})
        await self.redis.lpush(f"user_interactions:{user_id}", entry)
        await self.redis.ltrim(f"user_interactions:{user_id}", 0, 999)
        # Invalidate cached embedding
        await self.redis.delete(f"user_emb:{user_id}")
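The user embedding is a weighted average of the embeddings of recently interacted items. The sketch below isolates that arithmetic with a hypothetical helper and made-up weights (a purchase counted three times as heavily as a view). Neither the name nor the weight values come from the project.

```python
import numpy as np

def weighted_user_embedding(item_vecs: np.ndarray, weights: np.ndarray) -> np.ndarray:
    """Weighted mean of item vectors, one row per interaction."""
    w = weights / weights.sum()                 # normalize weights to sum to 1
    return (item_vecs * w[:, None]).sum(axis=0).astype(np.float32)

# Two interactions: a purchase (weight 3.0) and a view (weight 1.0).
item_vecs = np.array([[1.0, 0.0], [0.0, 1.0]])
weights = np.array([3.0, 1.0])
user_vec = weighted_user_embedding(item_vecs, weights)
```

The result sits three quarters of the way toward the purchased item, so heavier interaction types pull the user's position in embedding space more strongly.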
Step 6: Docker Deployment
# docker-compose.yml
version: "3.8"
services:
  rec-api:
    build: .
    # Expose the port only on the Compose network: publishing "8000:8000"
    # would make the three replicas compete for the same host port.
    expose:
      - "8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - MODEL_PATH=/models/embeddings
    depends_on:
      - redis
    deploy:
      replicas: 3
  redis:
    image: redis/redis-stack-server:latest
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - rec-api
volumes:
  redis-data:
Note: Pre-compute embeddings in batch and upsert them to Redis. Computing embeddings at request time adds unnecessary latency. Use a background job to refresh embeddings periodically.
Tip: For cold-start users, combine content-based features (demographics, device info) with popularity-based fallbacks until enough interaction data is collected.
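One way to sketch the popularity fallback: serve a popular-items list to users with too little history, and use it to pad personalized lists that come back short. The function name, the `min_interactions` threshold of 5, and the sample IDs are all assumptions made for illustration.

```python
from typing import List

def cold_start_recommendations(
    personalized: List[str],
    popular: List[str],
    num_interactions: int,
    num_results: int,
    min_interactions: int = 5,  # hypothetical threshold, tune per product
) -> List[str]:
    """Serve popular items to new users and pad short personalized lists."""
    if num_interactions < min_interactions:
        return popular[:num_results]
    merged = list(personalized)
    seen = set(merged)
    for item_id in popular:
        if len(merged) >= num_results:
            break
        if item_id not in seen:      # avoid recommending the same item twice
            merged.append(item_id)
            seen.add(item_id)
    return merged[:num_results]

popular = ["p1", "p2", "p3"]
new_user = cold_start_recommendations([], popular, num_interactions=0, num_results=2)
padded = cold_start_recommendations(["x", "p1"], popular, num_interactions=10, num_results=3)
```

In the feature store, a check like this could replace a random cold-start vector, which otherwise produces arbitrary recommendations.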
Performance Metrics
| Metric | Target | Description |
|---|---|---|
| ANN Latency | < 1ms | Redis HNSW search |
| End-to-End Latency | < 10ms | Full pipeline |
| Throughput | > 10K QPS | Per API instance |
| Recall@10 | > 0.95 | vs exact NN search |
| NDCG@10 | > 0.80 | Ranking quality |
| Cache Hit Rate | > 90% | User embedding cache |
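The two quality metrics in the table can be computed offline. The sketch below uses illustrative helper names: Recall@k compares ANN results with exact nearest neighbors for the same query, and NDCG@k scores a ranked list of relevance grades. This NDCG variant takes the ideal ordering from the returned list itself, which assumes relevance is only known for the items that were returned.

```python
import math
from typing import List, Sequence

def recall_at_k(approx: Sequence[str], exact: Sequence[str], k: int) -> float:
    """Fraction of the exact top-k neighbors that the ANN search also returned."""
    exact_k = set(exact[:k])
    return len(set(approx[:k]) & exact_k) / len(exact_k)

def ndcg_at_k(relevances: List[float], k: int) -> float:
    """NDCG@k for relevance grades given in ranked order (best rank first)."""
    def dcg(rels: List[float]) -> float:
        # Rank i (0-based) is discounted by log2(i + 2).
        return sum(r / math.log2(i + 2) for i, r in enumerate(rels[:k]))
    ideal = dcg(sorted(relevances, reverse=True))
    return dcg(relevances) / ideal if ideal > 0 else 0.0

recall = recall_at_k(["a", "b", "c", "x"], ["a", "b", "c", "d"], k=4)
perfect = ndcg_at_k([3.0, 2.0, 1.0], k=3)
swapped = ndcg_at_k([0.0, 1.0], k=2)
```

Averaging `recall_at_k` over a sample of real queries, with exact neighbors computed by brute force, gives the number to compare against the 0.95 target.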
Interview Talking Points
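- ANN vs Exact NN: Brute-force search scans every vector, so its cost grows linearly with catalog size, while HNSW query time grows roughly logarithmically. On large catalogs this can mean orders-of-magnitude speedups while typically retaining 95%+ recall.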
- Two-Tower Architecture: Separate user and item towers enable pre-computing item embeddings and real-time user embedding computation.
- Cold Start Strategy: Content-based features plus popularity fallback for new users, transitioning to collaborative filtering as data accumulates.
- Feature Store Design: Redis provides sub-millisecond reads for real-time features while maintaining high throughput.
- A/B Testing: Infrastructure for comparing model versions and ranking strategies with statistical significance.
- Scaling: Horizontal scaling through Redis clustering and API replicas behind a load balancer.
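For the A/B testing point, a common building block is deterministic bucketing: hash the user ID together with the experiment name so that a user always sees the same variant without any stored assignment. The sketch below is a hand-rolled illustration with assumed names. A managed service such as Optimizely or LaunchDarkly would normally handle this assignment itself.

```python
import hashlib

def assign_variant(user_id: str, experiment: str, treatment_share: float = 0.5) -> str:
    """Deterministically map a user to 'control' or 'treatment' for one experiment."""
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).digest()
    # Interpret the first 8 bytes as an integer and scale it to [0, 1).
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return "treatment" if bucket < treatment_share else "control"

first = assign_variant("user-42", "ranker-v2")
second = assign_variant("user-42", "ranker-v2")
treated = sum(
    assign_variant(f"user-{i}", "ranker-v2") == "treatment" for i in range(10_000)
)
```

Including the experiment name in the hash keeps assignments independent across experiments, so the same users do not always land in treatment together.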
Warning: Monitor ANN index quality over time. As the index grows, HNSW parameters may need tuning to maintain recall targets.
Note: This project demonstrates a complete real-time recommendation pipeline. For production deployment, add model retraining automation and drift detection.