πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Recommender Systems: Collaborative Filtering, Content-Based

Data Science Interview PremiumRecommender Systems⭐ Premium

Advertisement

NETFLIX & AMAZON INTERVIEW QUESTION

Recommender Systems: Collaborative Filtering, Content-Based

Recommendation Algorithms & Personalization

The Interview Question

ℹ️

Question: You're building a recommendation engine for a streaming platform:

  • Dataset: 100M ratings from 1M users on 10K movies
  • Requirements: Real-time recommendations, handle cold start, explainable recommendations

Walk through your recommendation system design:

  1. How do you implement collaborative filtering at scale?
  2. How do you handle the cold start problem?
  3. How do you evaluate recommendation quality?
  4. How do you ensure diversity and serendipity in recommendations?

Detailed Answer

1. Collaborative Filtering Implementation

import pandas as pd
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import svals
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import TruncatedSVD
import implicit
from collections import defaultdict
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

class CollaborativeFiltering:
    """Collaborative filtering recommendation system"""
    
    def __init__(self, ratings_df):
        self.ratings = ratings_df
        self.user_item_matrix = None
        self.similarity_matrix = None
    
    def create_user_item_matrix(self):
        """Create sparse user-item matrix"""
        # Create mappings
        self.user_ids = self.ratings['user_id'].unique()
        self.item_ids = self.ratings['movie_id'].unique()
        
        self.user_to_idx = {uid: idx for idx, uid in enumerate(self.user_ids)}
        self.item_to_idx = {iid: idx for idx, iid in enumerate(self.item_ids)}
        self.idx_to_user = {idx: uid for uid, idx in self.user_to_idx.items()}
        self.idx_to_item = {idx: iid for iid, idx in self.item_to_idx.items()}
        
        # Create sparse matrix
        rows = self.ratings['user_id'].map(self.user_to_idx)
        cols = self.ratings['movie_id'].map(self.item_to_idx)
        values = self.ratings['rating']
        
        self.user_item_matrix = csr_matrix(
            (values, (rows, cols)),
            shape=(len(self.user_ids), len(self.item_ids))
        )
        
        print(f"User-Item Matrix: {self.user_item_matrix.shape}")
        print(f"Sparsity: {1 - self.user_item_matrix.nnz / (self.user_item_matrix.shape[0] * self.user_item_matrix.shape[1]):.4f}")
        
        return self.user_item_matrix
    
    def user_based_cf(self, user_id, n_recommendations=10):
        """User-based collaborative filtering"""
        if self.similarity_matrix is None:
            # Calculate user similarity using cosine similarity
            self.similarity_matrix = cosine_similarity(self.user_item_matrix)
        
        user_idx = self.user_to_idx[user_id]
        
        # Get similarity scores for target user
        user_similarities = self.similarity_matrix[user_idx]
        
        # Get items rated by target user
        user_ratings = self.user_item_matrix[user_idx].toarray().flatten()
        rated_items = np.where(user_ratings > 0)[0]
        
        # Find similar users who rated unrated items
        recommendations = {}
        
        for item_idx in range(self.user_item_matrix.shape[1]):
            if item_idx not in rated_items:
                # Calculate predicted rating
                numerator = 0
                denominator = 0
                
                for other_idx in range(self.user_item_matrix.shape[0]):
                    if other_idx != user_idx:
                        other_rating = self.user_item_matrix[other_idx, item_idx]
                        if other_rating > 0:
                            numerator += user_similarities[other_idx] * other_rating
                            denominator += abs(user_similarities[other_idx])
                
                if denominator > 0:
                    predicted_rating = numerator / denominator
                    recommendations[self.idx_to_item[item_idx]] = predicted_rating
        
        # Sort and return top N
        sorted_recs = sorted(recommendations.items(), key=lambda x: x[1], reverse=True)
        return sorted_recs[:n_recommendations]
    
    def item_based_cf(self, user_id, n_recommendations=10):
        """Item-based collaborative filtering"""
        # Calculate item similarity
        item_similarity = cosine_similarity(self.user_item_matrix.T)
        
        user_idx = self.user_to_idx[user_id]
        user_ratings = self.user_item_matrix[user_idx].toarray().flatten()
        
        # Get items rated by user
        rated_items = np.where(user_ratings > 0)[0]
        
        # Calculate predicted ratings for unrated items
        recommendations = {}
        
        for item_idx in range(self.user_item_matrix.shape[1]):
            if item_idx not in rated_items:
                # Weighted average of similar items
                numerator = 0
                denominator = 0
                
                for rated_idx in rated_items:
                    similarity = item_similarity[item_idx, rated_idx]
                    rating = user_ratings[rated_idx]
                    
                    numerator += similarity * rating
                    denominator += abs(similarity)
                
                if denominator > 0:
                    predicted_rating = numerator / denominator
                    recommendations[self.idx_to_item[item_idx]] = predicted_rating
        
        sorted_recs = sorted(recommendations.items(), key=lambda x: x[1], reverse=True)
        return sorted_recs[:n_recommendations]
    
    def matrix_factorization_svd(self, n_factors=100, n_recommendations=10):
        """Matrix factorization using SVD"""
        # Perform SVD
        svd = TruncatedSVD(n_components=n_factors, random_state=42)
        user_factors = svd.fit_transform(self.user_item_matrix)
        item_factors = svd.components_
        
        # Reconstruct the rating matrix
        reconstructed = np.dot(user_factors, item_factors)
        
        return user_factors, item_factors, reconstructed
    
    def als_implicit(self, user_id, n_recommendations=10, factors=100, regularization=0.01, iterations=20):
        """Alternating Least Squares for implicit feedback"""
        # Convert to implicit format
        model = implicit.als.AlternatingLeastSquares(
            factors=factors,
            regularization=regularization,
            iterations=iterations,
            random_state=42
        )
        
        # Train model (items_users format for implicit library)
        model.fit(self.user_item_matrix.T)
        
        # Get recommendations
        user_idx = self.user_to_idx[user_id]
        recommendations = model.recommend(
            user_idx,
            self.user_item_matrix[user_idx],
            N=n_recommendations
        )
        
        # Convert back to item IDs
        rec_items = [(self.idx_to_item[idx], score) for idx, score in zip(recommendations[0], recommendations[1])]
        
        return rec_items

# Example usage
# cf = CollaborativeFiltering(ratings_df)
# cf.create_user_item_matrix()
# recommendations = cf.user_based_cf(user_id=123, n_recommendations=10)

2. Content-Based Filtering

class ContentBasedRecommender:
    """Content-based recommendation system"""
    
    def __init__(self, items_df):
        self.items = items_df
        self.item_similarity = None
        self.feature_matrix = None
    
    def create_feature_matrix(self, text_columns, categorical_columns):
        """Create item feature matrix"""
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.preprocessing import OneHotEncoder
        from sklearn.compose import ColumnTransformer
        from sklearn.pipeline import Pipeline
        
        # Text features using TF-IDF
        text_transformer = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=1000, stop_words='english'))
        ])
        
        # Categorical features
        categorical_transformer = Pipeline([
            ('onehot', OneHotEncoder(handle_unknown='ignore'))
        ])
        
        # Combine features
        preprocessor = ColumnTransformer([
            ('text', text_transformer, text_columns[0] if text_columns else []),
            ('categorical', categorical_transformer, categorical_columns)
        ])
        
        # Fit and transform
        self.feature_matrix = preprocessor.fit_transform(self.items)
        
        # Calculate item similarity
        self.item_similarity = cosine_similarity(self.feature_matrix)
        
        print(f"Feature matrix shape: {self.feature_matrix.shape}")
        
        return self.feature_matrix
    
    def recommend_from_item(self, item_id, n_recommendations=10):
        """Recommend items similar to a given item"""
        item_idx = self.items.index.get_loc(item_id)
        
        # Get similarity scores
        similar_scores = list(enumerate(self.item_similarity[item_idx]))
        similar_scores = sorted(similar_scores, key=lambda x: x[1], reverse=True)
        
        # Get top N similar items (excluding itself)
        top_similar = similar_scores[1:n_recommendations + 1]
        
        recommendations = [
            (self.items.index[idx], score) for idx, score in top_similar
        ]
        
        return recommendations
    
    def recommend_for_user(self, user_history, n_recommendations=10):
        """Recommend based on user's viewing history"""
        # Get items user has interacted with
        user_items = self.items[self.items.index.isin(user_history)]
        
        if len(user_items) == 0:
            return []
        
        # Calculate user profile as weighted average of item features
        user_indices = [self.items.index.get_loc(idx) for idx in user_history]
        user_profile = self.feature_matrix[user_indices].mean(axis=0)
        
        # Calculate similarity between user profile and all items
        user_profile_dense = user_profile.toarray() if hasattr(user_profile, 'toarray') else user_profile
        similarities = cosine_similarity(user_profile_dense, self.feature_matrix)[0]
        
        # Get top N recommendations (excluding already seen items)
        similar_scores = list(enumerate(similarities))
        similar_scores = sorted(similar_scores, key=lambda x: x[1], reverse=True)
        
        recommendations = []
        for idx, score in similar_scores:
            if self.items.index[idx] not in user_history:
                recommendations.append((self.items.index[idx], score))
                if len(recommendations) >= n_recommendations:
                    break
        
        return recommendations
    
    def get_feature_importance(self, item_id, top_n=5):
        """Get most important features for an item"""
        item_idx = self.items.index.get_loc(item_id)
        
        # Get item features
        item_features = self.feature_matrix[item_idx].toarray().flatten()
        
        # Get feature names
        feature_names = self.get_feature_names()
        
        # Sort by importance
        feature_importance = list(zip(feature_names, item_features))
        feature_importance.sort(key=lambda x: x[1], reverse=True)
        
        return feature_importance[:top_n]
    
    def get_feature_names(self):
        """Get feature names from the preprocessor"""
        # This depends on how the preprocessor was configured
        # Simplified version
        return [f'feature_{i}' for i in range(self.feature_matrix.shape[1])]

# Example usage
# content_rec = ContentBasedRecommender(movies_df)
# content_rec.create_feature_matrix(['description'], ['genre', 'director'])
# recommendations = content_rec.recommend_from_item(movie_id=123)

3. Hybrid Recommendation System

class HybridRecommender:
    """Hybrid recommendation combining multiple approaches"""
    
    def __init__(self, cf_recommender, content_recommender):
        self.cf = cf_recommender
        self.content = content_recommender
        self.weights = {'cf': 0.6, 'content': 0.4}
    
    def recommend_hybrid(self, user_id, user_history, n_recommendations=10):
        """Generate hybrid recommendations"""
        # Get collaborative filtering recommendations
        cf_recs = self.cf.user_based_cf(user_id, n_recommendations * 2)
        cf_dict = {item: score for item, score in cf_recs}
        
        # Get content-based recommendations
        content_recs = self.content.recommend_for_user(user_history, n_recommendations * 2)
        content_dict = {item: score for item, score in content_recs}
        
        # Combine scores
        all_items = set(list(cf_dict.keys()) + list(content_dict.keys()))
        
        hybrid_scores = {}
        for item in all_items:
            cf_score = cf_dict.get(item, 0)
            content_score = content_dict.get(item, 0)
            
            # Weighted combination
            hybrid_score = (self.weights['cf'] * cf_score + 
                          self.weights['content'] * content_score)
            
            hybrid_scores[item] = hybrid_score
        
        # Sort and get top N
        sorted_recs = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
        
        return sorted_recs[:n_recommendations]
    
    def update_weights(self, validation_data):
        """Optimize weights using validation data"""
        from scipy.optimize import minimize
        
        def objective(weights):
            self.weights['cf'] = weights[0]
            self.weights['content'] = 1 - weights[0]
            
            # Calculate NDCG on validation data
            ndcg = self.evaluate_on_validation(validation_data)
            return -ndcg  # Minimize negative NDCG
        
        # Optimize
        result = minimize(objective, [0.5], bounds=[(0, 1)])
        
        self.weights['cf'] = result.x[0]
        self.weights['content'] = 1 - result.x[0]
        
        print(f"Optimized weights: CF={self.weights['cf']:.3f}, Content={self.weights['content']:.3f}")
    
    def evaluate_on_validation(self, validation_data):
        """Evaluate recommendations on validation data"""
        ndcg_scores = []
        
        for user_id, true_items in validation_data.items():
            # Get recommendations
            user_history = self.get_user_history(user_id)
            recommendations = self.recommend_hybrid(user_id, user_history, n_recommendations=10)
            
            rec_items = [item for item, score in recommendations]
            
            # Calculate NDCG
            ndcg = self.calculate_ndcg(true_items, rec_items)
            ndcg_scores.append(ndcg)
        
        return np.mean(ndcg_scores)
    
    def calculate_ndcg(self, true_items, predicted_items, k=10):
        """Calculate NDCG@k"""
        # DCG
        dcg = 0
        for i, item in enumerate(predicted_items[:k]):
            if item in true_items:
                dcg += 1 / np.log2(i + 2)
        
        # Ideal DCG
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(true_items), k)))
        
        if ideal_dcg == 0:
            return 0
        
        return dcg / ideal_dcg
    
    def get_user_history(self, user_id):
        """Get user's interaction history"""
        # This would typically come from a database
        # Simplified version
        return []

# Example usage
# hybrid = HybridRecommender(cf, content_rec)
# recommendations = hybrid.recommend_hybrid(user_id=123, user_history=[1, 2, 3])

4. Cold Start Solutions

class ColdStartHandler:
    """Handle cold start problems in recommendations"""
    
    def __init__(self, items_df, popular_items):
        self.items = items_df
        self.popular_items = popular_items
    
    def new_user_recommendations(self, n_recommendations=10):
        """Recommendations for new users (no history)"""
        # Strategy 1: Popular items
        recommendations = self.popular_items[:n_recommendations]
        
        return recommendations
    
    def new_user_with_demographics(self, demographics, n_recommendations=10):
        """Recommendations using demographic information"""
        # Find similar users based on demographics
        # Recommend what similar users liked
        
        # Simplified: use demographic-based rules
        if demographics.get('age', 0) < 25:
            # Young users tend to like certain genres
            recommendations = self.items[
                self.items['genre'].isin(['Action', 'Comedy', 'Sci-Fi'])
            ].nlargest(n_recommendations, 'popularity')
        elif demographics.get('age', 0) > 40:
            # Older users tend to like different genres
            recommendations = self.items[
                self.items['genre'].isin(['Drama', 'Documentary', 'Classic'])
            ].nlargest(n_recommendations, 'popularity')
        else:
            recommendations = self.popular_items[:n_recommendations]
        
        return recommendations
    
    def new_item_recommendations(self, item_features, n_recommendations=10):
        """Recommendations for new items (no interactions)"""
        # Strategy 1: Content-based similarity
        # Find similar existing items
        
        # Strategy 2: Use item features to find target audience
        # Recommend to users who liked similar items
        
        # Simplified: find similar items by content
        similarities = []
        for existing_idx, existing_item in self.items.iterrows():
            similarity = self.calculate_item_similarity(item_features, existing_item)
            similarities.append((existing_idx, similarity))
        
        # Get most similar items
        similarities.sort(key=lambda x: x[1], reverse=True)
        similar_items = similarities[:n_recommendations]
        
        return similar_items
    
    def calculate_item_similarity(self, item1_features, item2_features):
        """Calculate similarity between two items"""
        # This would use actual feature comparison
        # Simplified version
        return 0.5
    
    def handle_new_item_with_metadata(self, item_metadata):
        """Use metadata to make new item recommendations"""
        # Extract features from metadata
        genre = item_metadata.get('genre', 'Unknown')
        director = item_metadata.get('director', 'Unknown')
        year = item_metadata.get('year', 2020)
        
        # Find similar items
        similar_items = self.items[
            (self.items['genre'] == genre) | 
            (self.items['director'] == director)
        ].head(10)
        
        return similar_items

# Example usage
# cold_start = ColdStartHandler(movies_df, popular_movies)
# new_user_recs = cold_start.new_user_recommendations()
# new_item_recs = cold_start.new_item_recommendations(new_movie_features)

5. Evaluation Metrics

class RecommenderEvaluator:
    """Evaluate recommendation system performance"""
    
    def __init__(self, test_data, recommendations):
        self.test_data = test_data
        self.recommendations = recommendations
    
    def precision_at_k(self, k=10):
        """Precision@K"""
        precisions = []
        
        for user_id, true_items in self.test_data.items():
            if user_id in self.recommendations:
                rec_items = [item for item, score in self.recommendations[user_id][:k]]
                relevant_and_recommended = len(set(true_items) & set(rec_items))
                precision = relevant_and_recommended / k
                precisions.append(precision)
        
        return np.mean(precisions) if precisions else 0
    
    def recall_at_k(self, k=10):
        """Recall@K"""
        recalls = []
        
        for user_id, true_items in self.test_data.items():
            if user_id in self.recommendations and true_items:
                rec_items = [item for item, score in self.recommendations[user_id][:k]]
                relevant_and_recommended = len(set(true_items) & set(rec_items))
                recall = relevant_and_recommended / len(true_items)
                recalls.append(recall)
        
        return np.mean(recalls) if recalls else 0
    
    def ndcg_at_k(self, k=10):
        """Normalized Discounted Cumulative Gain@K"""
        ndcg_scores = []
        
        for user_id, true_items in self.test_data.items():
            if user_id in self.recommendations:
                rec_items = [item for item, score in self.recommendations[user_id][:k]]
                
                # DCG
                dcg = 0
                for i, item in enumerate(rec_items):
                    if item in true_items:
                        dcg += 1 / np.log2(i + 2)
                
                # Ideal DCG
                ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(true_items), k)))
                
                if ideal_dcg > 0:
                    ndcg_scores.append(dcg / ideal_dcg)
        
        return np.mean(ndcg_scores) if ndcg_scores else 0
    
    def map_at_k(self, k=10):
        """Mean Average Precision@K"""
        ap_scores = []
        
        for user_id, true_items in self.test_data.items():
            if user_id in self.recommendations and true_items:
                rec_items = [item for item, score in self.recommendations[user_id][:k]]
                
                # Calculate AP
                precision_sum = 0
                relevant_count = 0
                
                for i, item in enumerate(rec_items):
                    if item in true_items:
                        relevant_count += 1
                        precision_sum += relevant_count / (i + 1)
                
                ap = precision_sum / min(len(true_items), k) if true_items else 0
                ap_scores.append(ap)
        
        return np.mean(ap_scores) if ap_scores else 0
    
    def mrr(self):
        """Mean Reciprocal Rank"""
        rr_scores = []
        
        for user_id, true_items in self.test_data.items():
            if user_id in self.recommendations:
                rec_items = [item for item, score in self.recommendations[user_id]]
                
                # Find first relevant item
                for i, item in enumerate(rec_items):
                    if item in true_items:
                        rr_scores.append(1 / (i + 1))
                        break
        
        return np.mean(rr_scores) if rr_scores else 0
    
    def coverage(self, all_items):
        """Catalog coverage"""
        recommended_items = set()
        
        for user_id, recs in self.recommendations.items():
            for item, score in recs:
                recommended_items.add(item)
        
        return len(recommended_items) / len(all_items)
    
    def diversity(self, recommendations):
        """Recommendation diversity"""
        if len(recommendations) < 2:
            return 0
        
        # Calculate pairwise dissimilarity
        items = [item for item, score in recommendations]
        
        # This would use actual item similarity
        # Simplified version
        return 0.5
    
    def evaluate_all(self, all_items):
        """Run all evaluation metrics"""
        results = {
            'precision_at_10': self.precision_at_k(10),
            'recall_at_10': self.recall_at_k(10),
            'ndcg_at_10': self.ndcg_at_k(10),
            'map_at_10': self.map_at_k(10),
            'mrr': self.mrr(),
            'coverage': self.coverage(all_items)
        }
        
        return results

# Example usage
# evaluator = RecommenderEvaluator(test_data, recommendations)
# results = evaluator.evaluate_all(all_items)

πŸ’‘

Pro Tip: Balance between accuracy and diversity. Highly accurate recommendations can create filter bubbles. Consider adding diversity metrics and exploration strategies.

6. Common Follow-Up Questions

Follow-up 1: How do you handle scalability for millions of users?

class ScalableRecommender:
    """Scalable recommendation system"""
    
    def __init__(self):
        self.index = None
    
    def build_ann_index(self, item_factors):
        """Build Approximate Nearest Neighbor index"""
        import faiss
        
        # Normalize vectors
        faiss.normalize_L2(item_factors)
        
        # Build index
        dimension = item_factors.shape[1]
        n_items = item_factors.shape[0]
        
        # Use IVF index for large datasets
        nlist = int(np.sqrt(n_items))  # Number of clusters
        quantizer = faiss.IndexFlatL2(dimension)
        self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
        
        # Train and add vectors
        self.index.train(item_factors)
        self.index.add(item_factors)
        
        # Set search parameters
        self.index.nprobe = 10  # Number of clusters to search
        
        return self.index
    
    def recommend_ann(self, user_factor, k=10):
        """Fast recommendations using ANN"""
        import faiss
        
        # Normalize query
        query = user_factor.reshape(1, -1).astype('float32')
        faiss.normalize_L2(query)
        
        # Search
        distances, indices = self.index.search(query, k)
        
        return indices[0], distances[0]
    
    def distributed_recommendations(self, user_ids, n_workers=4):
        """Distribute recommendation computation"""
        from multiprocessing import Pool
        
        # Split users across workers
        user_chunks = np.array_split(user_ids, n_workers)
        
        with Pool(n_workers) as pool:
            results = pool.map(self.recommend_batch, user_chunks)
        
        # Combine results
        all_recommendations = {}
        for result in results:
            all_recommendations.update(result)
        
        return all_recommendations
    
    def recommend_batch(self, user_ids):
        """Recommend for a batch of users"""
        recommendations = {}
        
        for user_id in user_ids:
            # Get user recommendations
            recs = self.recommend_user(user_id)
            recommendations[user_id] = recs
        
        return recommendations

# Example usage
# scalable = ScalableRecommender()
# scalable.build_ann_index(item_factors)
# fast_recs = scalable.recommend_ann(user_factor, k=10)

Company-Specific Tips

ℹ️

Netflix Tips:

  • Netflix uses a combination of collaborative filtering, content-based, and deep learning
  • Focus on metrics like NDCG and watch time
  • Understand how to handle the cold start for new content
  • Be familiar with A/B testing for recommendation algorithms

Amazon Tips:

  • Amazon values real-time recommendations
  • Know how to implement "Customers who bought this also bought"
  • Understand item-to-item collaborative filtering
  • Be comfortable with large-scale matrix factorization

Quiz Section


Related Topics

Advertisement