The Interview Question
ℹ️
Question: You're building a spam classifier for an email platform:
- Dataset: 1M emails, 2% spam rate (imbalanced)
- Features: email text, sender info, metadata
- Requirements: High precision (minimize false positives), real-time inference
Walk through your classification approach:
- How do you handle the class imbalance?
- Which algorithms would you consider and why?
- How do you evaluate the model given the business requirements?
- How do you deploy for real-time inference?
Detailed Answer
1. Handling Class Imbalance
Class imbalance is one of the most common challenges in classification problems. With a 2% spam rate, a naive classifier that predicts every email as "not spam" would achieve 98% accuracy while catching no spam at all, which makes it useless.
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix,
classification_report, precision_recall_curve,
average_precision_score)
from imblearn.over_sampling import SMOTE, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
from imblearn.combine import SMOTETomek, SMOTEENN
from imblearn.ensemble import BalancedRandomForestClassifier, BalancedBaggingClassifier
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
class ImbalanceHandler:
    """Inspect and correct class imbalance for a classification dataset.

    The handler stores the feature matrix ``X`` and label vector ``y`` it is
    constructed with and offers resampling helpers (SMOTE, ADASYN,
    SMOTE + Tomek links), class-weighted training, and decision-threshold
    selection from a precision-recall curve.
    """

    def __init__(self, X, y):
        self.X = X
        self.y = y
        # Per-class sample counts, indexed by class label.
        self.class_distribution = pd.Series(y).value_counts()

    def analyze_imbalance(self):
        """Print and return per-class counts, percentages and imbalance ratios.

        Returns:
            dict mapping each class label to a dict with keys ``'count'``,
            ``'percentage'`` and ``'imbalance_ratio'`` (minority count divided
            by that class's count, so the minority class itself has ratio 1.0).
            An empty dict is returned when there are no labels.
        """
        print("Class Distribution Analysis")
        print("=" * 60)

        total = len(self.y)
        if total == 0:
            print("No samples to analyze")
            return {}

        minority_count = self.class_distribution.min()
        majority_count = self.class_distribution.max()

        analysis = {
            class_label: {
                'count': count,
                'percentage': count / total * 100,
                'imbalance_ratio': minority_count / count,
            }
            for class_label, count in self.class_distribution.items()
        }

        for label, stats in analysis.items():
            print(f"Class {label}: {stats['count']:,} ({stats['percentage']:.2f}%)")

        # Report minority vs. majority directly instead of looking up a
        # hard-coded label ``1``: that lookup fails for any other label
        # encoding and always yields 1.0 when class 1 is the minority.
        print(f"\nImbalance Ratio: {minority_count / majority_count:.4f}")
        print(f"Minority class is {majority_count / minority_count:.1f}x smaller than majority")
        return analysis

    def _report_resampling(self, title, y_resampled):
        """Print class counts before and after a resampling step."""
        print(f"\n{title} Applied:")
        print(f"Original: {pd.Series(self.y).value_counts().to_dict()}")
        print(f"Resampled: {pd.Series(y_resampled).value_counts().to_dict()}")

    def apply_smote(self, sampling_strategy='auto', k_neighbors=5):
        """Apply SMOTE (Synthetic Minority Over-sampling Technique).

        Args:
            sampling_strategy: forwarded to ``SMOTE``.
            k_neighbors: forwarded to ``SMOTE``.

        Returns:
            ``(X_resampled, y_resampled)`` as produced by ``fit_resample``.
        """
        smote = SMOTE(
            sampling_strategy=sampling_strategy,
            k_neighbors=k_neighbors,
            random_state=42,
        )
        X_resampled, y_resampled = smote.fit_resample(self.X, self.y)
        self._report_resampling("SMOTE", y_resampled)
        return X_resampled, y_resampled

    def apply_adasyn(self):
        """Apply ADASYN (Adaptive Synthetic Sampling).

        Returns:
            ``(X_resampled, y_resampled)`` as produced by ``fit_resample``.
        """
        adasyn = ADASYN(random_state=42)
        X_resampled, y_resampled = adasyn.fit_resample(self.X, self.y)
        self._report_resampling("ADASYN", y_resampled)
        return X_resampled, y_resampled

    def apply_hybrid_sampling(self):
        """Apply hybrid sampling (SMOTE over-sampling + Tomek-link cleaning).

        Returns:
            ``(X_resampled, y_resampled)`` as produced by ``fit_resample``.
        """
        smote_tomek = SMOTETomek(random_state=42)
        X_resampled, y_resampled = smote_tomek.fit_resample(self.X, self.y)
        self._report_resampling("Hybrid Sampling", y_resampled)
        return X_resampled, y_resampled

    def apply_class_weights(self, model, X_train, y_train):
        """Fit ``model`` with 'balanced' class weights computed from ``y_train``.

        Args:
            model: estimator exposing ``set_params(class_weight=...)`` and ``fit``.
            X_train: training features.
            y_train: training labels.

        Returns:
            ``(model, class_weights)`` where ``class_weights`` maps each class
            label to its weight.
        """
        from sklearn.utils.class_weight import compute_class_weight

        classes = np.unique(y_train)
        weights = compute_class_weight('balanced', classes=classes, y=y_train)
        class_weights = dict(zip(classes, weights))
        print(f"\nClass Weights: {class_weights}")

        model.set_params(class_weight=class_weights)
        model.fit(X_train, y_train)
        return model, class_weights

    def threshold_optimization(self, y_true, y_prob, metric='f1'):
        """Pick a decision threshold from the precision-recall curve.

        Args:
            y_true: ground-truth binary labels.
            y_prob: predicted scores/probabilities for the positive class.
            metric: ``'f1'`` (maximise F1), ``'precision'`` (maximise recall
                among thresholds with precision >= 0.99, falling back to the
                highest-precision threshold) or ``'recall'`` (maximise recall).

        Returns:
            ``(optimal_threshold, precisions, recalls, f1_scores, thresholds)``.

        Raises:
            ValueError: if ``metric`` is not one of the supported names.
        """
        precisions, recalls, thresholds = precision_recall_curve(y_true, y_prob)
        f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-6)

        # precision_recall_curve returns one more precision/recall point than
        # thresholds (the final precision=1, recall=0 point has no threshold),
        # so restrict the search to points that map to a real threshold.
        cand_precisions = precisions[:-1]
        cand_recalls = recalls[:-1]
        cand_f1 = f1_scores[:-1]

        if metric == 'f1':
            optimal_idx = int(np.argmax(cand_f1))
        elif metric == 'precision':
            valid_idx = np.flatnonzero(cand_precisions >= 0.99)
            if valid_idx.size:
                # Map the argmax over the filtered subset back to an index
                # into the full arrays.
                optimal_idx = int(valid_idx[np.argmax(cand_recalls[valid_idx])])
            else:
                optimal_idx = int(np.argmax(cand_precisions))
        elif metric == 'recall':
            optimal_idx = int(np.argmax(cand_recalls))
        else:
            raise ValueError(
                f"Unknown metric {metric!r}; expected 'f1', 'precision' or 'recall'"
            )

        optimal_threshold = thresholds[optimal_idx]
        print(f"\nThreshold Optimization ({metric}):")
        print(f"Optimal threshold: {optimal_threshold:.4f}")
        print(f"Precision: {precisions[optimal_idx]:.4f}")
        print(f"Recall: {recalls[optimal_idx]:.4f}")
        print(f"F1: {f1_scores[optimal_idx]:.4f}")
        return optimal_threshold, precisions, recalls, f1_scores, thresholds
# Example usage
# imbalance_handler = ImbalanceHandler(X, y)
# imbalance_handler.analyze_imbalance()
# X_resampled, y_resampled = imbalance_handler.apply_smote()
2. Classification Algorithms Comparison
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
import time
class AlgorithmComparison:
    """Train several classifiers on the same split and compare their metrics."""

    # Maps a requirement name to (results column, whether higher is better).
    _REQUIREMENT_RULES = {
        'precision': ('precision', True),
        'recall': ('recall', True),
        'f1': ('f1', True),
        'speed': ('inference_time', False),
        'balanced': ('f1', True),  # F1 serves as the balanced metric
    }

    def __init__(self):
        self.models = {}
        self.results = {}

    def define_models(self, scale_pos_weight=49):
        """Create the candidate models with imbalance-aware settings.

        Args:
            scale_pos_weight: negative-to-positive ratio passed to XGBoost.
                The default of 49 corresponds to a 2% positive rate (98 / 2).

        Returns:
            dict mapping model name to an unfitted estimator (also stored on
            ``self.models``).
        """
        self.models = {
            'logistic_regression': LogisticRegression(
                max_iter=1000,
                class_weight='balanced',
                random_state=42,
            ),
            'decision_tree': DecisionTreeClassifier(
                max_depth=10,
                class_weight='balanced',
                random_state=42,
            ),
            'random_forest': RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                class_weight='balanced',
                random_state=42,
                n_jobs=-1,
            ),
            'gradient_boosting': GradientBoostingClassifier(
                n_estimators=100,
                max_depth=5,
                learning_rate=0.1,
                random_state=42,
            ),
            'xgboost': XGBClassifier(
                n_estimators=100,
                max_depth=5,
                learning_rate=0.1,
                scale_pos_weight=scale_pos_weight,
                random_state=42,
                use_label_encoder=False,
                eval_metric='logloss',
            ),
            'lightgbm': LGBMClassifier(
                n_estimators=100,
                max_depth=5,
                learning_rate=0.1,
                is_unbalance=True,
                random_state=42,
                verbose=-1,
            ),
            'catboost': CatBoostClassifier(
                iterations=100,
                depth=5,
                learning_rate=0.1,
                auto_class_weights='Balanced',
                random_state=42,
                verbose=0,
            ),
        }
        return self.models

    def compare_performance(self, X_train, y_train, X_test, y_test):
        """Fit every model in ``self.models`` and score it on the test split.

        Returns:
            DataFrame with one row per model and columns ``model``,
            ``accuracy``, ``precision``, ``recall``, ``f1``, ``roc_auc``,
            ``average_precision``, ``train_time`` and ``inference_time``
            (seconds). Also stored on ``self.results``.
        """
        results = []
        for name, model in self.models.items():
            print(f"\nTraining {name}...")

            # perf_counter is monotonic, so timings cannot be skewed by
            # system clock adjustments.
            start_time = time.perf_counter()
            model.fit(X_train, y_train)
            train_time = time.perf_counter() - start_time

            start_time = time.perf_counter()
            y_pred = model.predict(X_test)
            y_prob = (
                model.predict_proba(X_test)[:, 1]
                if hasattr(model, 'predict_proba')
                else None
            )
            inference_time = time.perf_counter() - start_time

            has_prob = y_prob is not None
            results.append({
                'model': name,
                'accuracy': accuracy_score(y_test, y_pred),
                'precision': precision_score(y_test, y_pred),
                'recall': recall_score(y_test, y_pred),
                'f1': f1_score(y_test, y_pred),
                'roc_auc': roc_auc_score(y_test, y_prob) if has_prob else None,
                'average_precision': (
                    average_precision_score(y_test, y_prob) if has_prob else None
                ),
                'train_time': train_time,
                'inference_time': inference_time,
            })
            print(f" Completed in {train_time:.2f}s")

        self.results = pd.DataFrame(results)
        return self.results

    def _require_results(self):
        """Raise if no comparison results are available yet."""
        if not isinstance(self.results, pd.DataFrame) or self.results.empty:
            raise RuntimeError(
                "No results available; call compare_performance() first"
            )

    def visualize_comparison(self):
        """Draw one horizontal bar chart per metric and save the figure.

        The figure is written to ``model_comparison.png`` and then shown.

        Raises:
            RuntimeError: if ``compare_performance`` has not produced results.
        """
        self._require_results()

        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        metrics = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc', 'average_precision']
        for idx, metric in enumerate(metrics):
            ax = axes[idx // 3, idx % 3]
            title = metric.replace('_', ' ').title()
            bars = ax.barh(self.results['model'], self.results[metric])
            ax.set_xlabel(title)
            ax.set_title(f'{title} Comparison')

            # Label each bar with its value; skip metrics that were not computed.
            for bar, value in zip(bars, self.results[metric]):
                if pd.notna(value):
                    ax.text(value + 0.005, bar.get_y() + bar.get_height() / 2,
                            f'{value:.3f}', va='center')

        plt.tight_layout()
        plt.savefig('model_comparison.png', dpi=150, bbox_inches='tight')
        plt.show()

    def recommend_model(self, requirement='precision'):
        """Return the results row of the best model for a requirement.

        Args:
            requirement: one of ``'precision'``, ``'recall'``, ``'f1'``,
                ``'speed'`` (lowest inference time) or ``'balanced'`` (F1).

        Returns:
            The matching row of ``self.results`` as a pandas Series.

        Raises:
            ValueError: if ``requirement`` is not a supported name.
            RuntimeError: if ``compare_performance`` has not produced results.
        """
        try:
            column, higher_is_better = self._REQUIREMENT_RULES[requirement]
        except KeyError:
            supported = ', '.join(sorted(self._REQUIREMENT_RULES))
            raise ValueError(
                f"Unknown requirement {requirement!r}; expected one of: {supported}"
            ) from None

        self._require_results()
        scores = self.results[column]
        best_index = scores.idxmax() if higher_is_better else scores.idxmin()
        return self.results.loc[best_index]
# Example usage
# comparator = AlgorithmComparison()
# comparator.define_models()
# results = comparator.compare_performance(X_train, y_train, X_test, y_test)
# comparator.visualize_comparison()
# best = comparator.recommend_model(requirement='precision')
3. Comprehensive Evaluation Metrics
class ClassificationEvaluator:
    """Compute metrics, plots and business cost for binary predictions.

    Labels are assumed to be encoded as 0 (negative) and 1 (positive); the
    confusion matrix is always built as 2x2 in that order.
    """

    def __init__(self, y_true, y_pred, y_prob=None):
        self.y_true = y_true
        self.y_pred = y_pred
        self.y_prob = y_prob
        # Fix the label order so the matrix stays 2x2 even when a class is
        # absent from both y_true and y_pred; otherwise the four-way unpack
        # used throughout this class fails.
        self.confusion = confusion_matrix(y_true, y_pred, labels=[0, 1])

    @staticmethod
    def _ratio(numerator, denominator):
        """Return numerator / denominator, or 0 when the denominator is 0."""
        return numerator / denominator if denominator > 0 else 0

    def calculate_all_metrics(self):
        """Calculate threshold-based, probabilistic and business metrics.

        Returns:
            dict with keys ``accuracy``, ``precision``, ``recall``, ``f1``,
            ``specificity``, ``npv``, ``mcc``, ``balanced_accuracy``,
            ``false_positive_rate`` and ``false_negative_rate``; plus
            ``roc_auc``, ``average_precision`` and ``log_loss`` when
            probabilities were supplied.
        """
        tn, fp, fn, tp = self.confusion.ravel()
        recall = recall_score(self.y_true, self.y_pred)
        specificity = self._ratio(tn, tn + fp)

        metrics = {
            'accuracy': accuracy_score(self.y_true, self.y_pred),
            'precision': precision_score(self.y_true, self.y_pred),
            'recall': recall,
            'f1': f1_score(self.y_true, self.y_pred),
            'specificity': specificity,
            'npv': self._ratio(tn, tn + fn),  # Negative Predictive Value
            'mcc': self._matthews_correlation(),
            'balanced_accuracy': (recall + specificity) / 2,
        }

        if self.y_prob is not None:
            metrics['roc_auc'] = roc_auc_score(self.y_true, self.y_prob)
            metrics['average_precision'] = average_precision_score(self.y_true, self.y_prob)
            metrics['log_loss'] = self._log_loss()

        # Business metrics
        metrics['false_positive_rate'] = self._ratio(fp, fp + tn)
        metrics['false_negative_rate'] = self._ratio(fn, fn + tp)
        return metrics

    def _matthews_correlation(self):
        """Calculate the Matthews Correlation Coefficient (0 if undefined)."""
        # Convert to Python floats first: the product of four marginals
        # exceeds the int64 range for datasets of roughly a million rows,
        # which silently corrupts the result with fixed-width integers.
        tn, fp, fn, tp = (float(cell) for cell in self.confusion.ravel())
        numerator = (tp * tn) - (fp * fn)
        denominator = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
        return numerator / denominator if denominator > 0 else 0

    def _log_loss(self):
        """Calculate log loss from the stored probabilities."""
        from sklearn.metrics import log_loss
        return log_loss(self.y_true, self.y_prob)

    def plot_confusion_matrix(self, normalize=True):
        """Plot the confusion matrix as counts and, optionally, row-normalised.

        The figure is written to ``confusion_matrix.png`` and then shown.

        Args:
            normalize: when true, the second panel shows each row divided by
                its total (per-actual-class rates).
        """
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))

        sns.heatmap(self.confusion, annot=True, fmt='d', cmap='Blues', ax=axes[0])
        axes[0].set_title('Confusion Matrix (Counts)')
        axes[0].set_ylabel('Actual')
        axes[0].set_xlabel('Predicted')

        if normalize:
            row_totals = self.confusion.sum(axis=1, keepdims=True)
            # Rows with no samples stay at zero instead of becoming NaN.
            cm_normalized = np.divide(
                self.confusion.astype('float'),
                row_totals,
                out=np.zeros(self.confusion.shape, dtype=float),
                where=row_totals > 0,
            )
            sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='Blues', ax=axes[1])
            axes[1].set_title('Confusion Matrix (Normalized)')
            axes[1].set_ylabel('Actual')
            axes[1].set_xlabel('Predicted')

        plt.tight_layout()
        plt.savefig('confusion_matrix.png', dpi=150, bbox_inches='tight')
        plt.show()

    def plot_precision_recall_curve(self):
        """Plot the precision-recall curve and mark the F1-optimal threshold.

        The figure is written to ``pr_curve.png`` and then shown. Does nothing
        (beyond printing a notice) when no probabilities were supplied.
        """
        if self.y_prob is None:
            print("Probability estimates required for PR curve")
            return

        precisions, recalls, thresholds = precision_recall_curve(self.y_true, self.y_prob)

        plt.figure(figsize=(8, 6))
        plt.plot(recalls, precisions, color='blue', linewidth=2)
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Curve')
        plt.grid(True, alpha=0.3)

        # The final curve point has no associated threshold, so exclude it
        # from the search to keep the index valid for `thresholds`.
        f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-6)
        optimal_idx = int(np.argmax(f1_scores[:-1]))
        plt.scatter(recalls[optimal_idx], precisions[optimal_idx],
                    color='red', s=100,
                    label=f'Optimal F1 Threshold: {thresholds[optimal_idx]:.3f}')
        plt.legend()
        plt.tight_layout()
        plt.savefig('pr_curve.png', dpi=150, bbox_inches='tight')
        plt.show()

    def calculate_cost_matrix(self, cost_fp, cost_fn, cost_tp=0, cost_tn=0):
        """Calculate and print the business cost of the predictions.

        Args:
            cost_fp: cost of one false positive.
            cost_fn: cost of one false negative.
            cost_tp: cost of one true positive.
            cost_tn: cost of one true negative.

        Returns:
            ``(total_cost, cost_per_sample)``; the per-sample cost is 0 when
            there are no samples.
        """
        tn, fp, fn, tp = self.confusion.ravel()
        total_cost = (fp * cost_fp + fn * cost_fn +
                      tp * cost_tp + tn * cost_tn)
        cost_per_sample = self._ratio(total_cost, len(self.y_true))

        print(f"Cost Analysis:")
        print(f" False Positives: {fp:,} × ${cost_fp} = ${fp * cost_fp:,.2f}")
        print(f" False Negatives: {fn:,} × ${cost_fn} = ${fn * cost_fn:,.2f}")
        print(f" True Positives: {tp:,} × ${cost_tp} = ${tp * cost_tp:,.2f}")
        print(f" True Negatives: {tn:,} × ${cost_tn} = ${tn * cost_tn:,.2f}")
        print(f" Total Cost: ${total_cost:,.2f}")
        print(f" Cost per Sample: ${cost_per_sample:.4f}")
        return total_cost, cost_per_sample
# Example usage for spam detection
# evaluator = ClassificationEvaluator(y_test, y_pred, y_prob)
# metrics = evaluator.calculate_all_metrics()
# evaluator.plot_confusion_matrix()
# evaluator.plot_precision_recall_curve()
# total_cost, cost_per_sample = evaluator.calculate_cost_matrix(
# cost_fp=0.10, # Cost of marking legitimate email as spam
# cost_fn=0.01 # Cost of letting spam through
# )
4. Real-World Application: Spam Detection
class SpamDetector:
    """End-to-end spam detection: preprocessing, training and thresholding.

    The decision threshold is stored on ``self.threshold`` (default 0.5) and
    should be calibrated on labelled validation data before production use.
    """

    def __init__(self):
        self.model = None
        self.threshold = 0.5
        self.feature_names = None

    def build_pipeline(self, scale_pos_weight=49):
        """Build the preprocessing + XGBoost pipeline.

        Text in the ``email_text`` column is TF-IDF encoded; the numeric
        columns ``sender_reputation``, ``num_links`` and ``num_attachments``
        are standardised.

        Args:
            scale_pos_weight: negative-to-positive ratio passed to XGBoost.
                The default of 49 corresponds to a 2% spam rate.

        Returns:
            An unfitted scikit-learn ``Pipeline``.
        """
        from sklearn.pipeline import Pipeline
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.compose import ColumnTransformer

        text_transformer = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=5000, ngram_range=(1, 2)))
        ])
        numeric_transformer = Pipeline([
            ('scaler', StandardScaler())
        ])
        preprocessor = ColumnTransformer([
            ('text', text_transformer, 'email_text'),
            ('numeric', numeric_transformer,
             ['sender_reputation', 'num_links', 'num_attachments'])
        ])

        return Pipeline([
            ('preprocessor', preprocessor),
            ('classifier', XGBClassifier(
                n_estimators=100,
                max_depth=5,
                scale_pos_weight=scale_pos_weight,
                random_state=42
            ))
        ])

    def train_with_imbalance_handling(self, X_train, y_train, X_val=None,
                                      y_val=None, target_precision=0.99):
        """Train the pipeline with class weighting derived from the labels.

        Imbalance is handled through XGBoost's ``scale_pos_weight`` rather
        than SMOTE: SMOTE interpolates numeric vectors and cannot be applied
        to raw rows that contain a text column, and stacking it on top of
        ``scale_pos_weight`` would compensate for the imbalance twice.

        Args:
            X_train: training features (must contain the columns used by
                ``build_pipeline``).
            y_train: binary training labels, 1 = spam.
            X_val: optional validation features for threshold calibration.
            y_val: optional validation labels for threshold calibration.
            target_precision: precision the calibrated threshold must reach.

        Returns:
            The fitted pipeline (also stored on ``self.model``).
        """
        imbalance_handler = ImbalanceHandler(X_train, y_train)
        imbalance_handler.analyze_imbalance()

        labels = np.asarray(y_train)
        n_positive = int((labels == 1).sum())
        n_negative = int(labels.size - n_positive)
        # Fall back to no re-weighting when there are no positives to weight.
        scale_pos_weight = n_negative / n_positive if n_positive else 1.0

        pipeline = self.build_pipeline(scale_pos_weight=scale_pos_weight)
        pipeline.fit(X_train, y_train)
        self.model = pipeline

        if X_val is not None and y_val is not None:
            self.calibrate_threshold(X_val, y_val, target_precision)
        return pipeline

    @staticmethod
    def _select_threshold(y_true, y_prob, target_precision):
        """Return the max-recall threshold meeting ``target_precision``.

        Returns ``None`` when no threshold reaches the target precision.
        """
        from sklearn.metrics import precision_recall_curve

        precisions, recalls, thresholds = precision_recall_curve(y_true, y_prob)
        # Drop the final (precision=1, recall=0) point: it has no threshold.
        precisions, recalls = precisions[:-1], recalls[:-1]

        eligible = np.flatnonzero(precisions >= target_precision)
        if eligible.size == 0:
            return None
        # Index back into the full arrays, not into the filtered subset.
        best = eligible[np.argmax(recalls[eligible])]
        return float(thresholds[best])

    def calibrate_threshold(self, X_val, y_val, target_precision=0.99):
        """Set ``self.threshold`` from labelled validation data.

        Keeps the current threshold (and prints a notice) when no threshold
        achieves ``target_precision``.

        Returns:
            The threshold now stored on ``self.threshold``.
        """
        if self.model is None:
            raise RuntimeError("Model is not trained; call train_with_imbalance_handling() first")

        y_prob = self.model.predict_proba(X_val)[:, 1]
        selected = self._select_threshold(y_val, y_prob, target_precision)
        if selected is None:
            print(f"No threshold reaches precision {target_precision}; "
                  f"keeping threshold {self.threshold:.4f}")
        else:
            self.threshold = selected
        return self.threshold

    def predict_with_optimal_threshold(self, X, target_precision=0.99, y_true=None):
        """Predict spam labels using the stored decision threshold.

        A threshold can only be tuned against real labels, so it is
        re-selected only when ``y_true`` is supplied; otherwise the threshold
        already stored on ``self.threshold`` is applied unchanged.

        Args:
            X: features to score.
            target_precision: precision target used when ``y_true`` is given.
            y_true: optional ground-truth labels for ``X``; when provided the
                threshold is re-selected on this data before predicting.

        Returns:
            ``(y_pred, y_prob)``: integer 0/1 predictions and spam scores.

        Raises:
            RuntimeError: if no model has been trained or assigned.
        """
        if self.model is None:
            raise RuntimeError("Model is not trained; call train_with_imbalance_handling() first")

        y_prob = self.model.predict_proba(X)[:, 1]

        if y_true is not None:
            selected = self._select_threshold(y_true, y_prob, target_precision)
            if selected is not None:
                self.threshold = selected

        y_pred = (y_prob >= self.threshold).astype(int)
        return y_pred, y_prob

    def evaluate_production(self, X_test, y_test):
        """Evaluate the model on held-out data against business targets.

        The stored threshold is applied as-is; it is not tuned on the test
        labels, so the reported metrics are not optimistically biased.

        Returns:
            ``(metrics, requirements_met)``: the metric dict from
            ``ClassificationEvaluator`` and a dict of pass/fail booleans.
        """
        y_pred, y_prob = self.predict_with_optimal_threshold(X_test)
        evaluator = ClassificationEvaluator(y_test, y_pred, y_prob)
        metrics = evaluator.calculate_all_metrics()

        print("Production Evaluation")
        print("=" * 60)
        print(f"Threshold: {self.threshold:.4f}")
        print(f"Precision: {metrics['precision']:.4f}")
        print(f"Recall: {metrics['recall']:.4f}")
        print(f"F1: {metrics['f1']:.4f}")
        print(f"ROC-AUC: {metrics['roc_auc']:.4f}")

        requirements_met = {
            'precision_99': metrics['precision'] >= 0.99,
            'recall_80': metrics['recall'] >= 0.80,
            'f1_85': metrics['f1'] >= 0.85
        }
        print(f"\nRequirements Check:")
        for req, met in requirements_met.items():
            # Distinct marks for pass and fail.
            print(f" {req}: {'✓' if met else '✗'}")
        return metrics, requirements_met
# Example usage
# spam_detector = SpamDetector()
# spam_detector.train_with_imbalance_handling(X_train, y_train)
# metrics, requirements = spam_detector.evaluate_production(X_test, y_test)
💡
Pro Tip: For spam detection, precision is often more critical than recall. Marking legitimate email as spam (false positive) is much worse than letting some spam through (false negative).
5. Common Follow-Up Questions
Follow-up 1: How do you handle concept drift in production?
class DriftDetector:
    """Detect data drift and concept drift against a reference dataset.

    Args:
        reference_data: 2-D array of reference features (rows are samples).
        reference_labels: labels for ``reference_data``.
        model: fitted estimator exposing ``predict`` and ``fit``. Required by
            ``detect_concept_drift`` and ``retrain_if_needed``; it may also be
            assigned to ``self.model`` after construction.
    """

    def __init__(self, reference_data, reference_labels, model=None):
        self.reference_data = reference_data
        self.reference_labels = reference_labels
        self.model = model
        self.drift_history = []

    def _require_model(self):
        """Raise if no model has been attached to the detector."""
        if self.model is None:
            raise RuntimeError(
                "DriftDetector has no model; pass model=... or set .model first"
            )

    @staticmethod
    def _accuracy(labels, predictions):
        """Return (number correct, total, accuracy) for a prediction set."""
        matches = np.asarray(labels) == np.asarray(predictions)
        total = int(matches.size)
        if total == 0:
            raise ValueError("Cannot compute accuracy on an empty dataset")
        correct = int(matches.sum())
        return correct, total, correct / total

    def detect_data_drift(self, new_data, threshold=0.05):
        """Detect per-feature data drift with the two-sample KS test.

        Args:
            new_data: 2-D array with the same columns as the reference data.
            threshold: p-value below which a feature is flagged as drifted.
                No multiple-comparison correction is applied.

        Returns:
            dict mapping ``'feature_<i>'`` to a dict with ``ks_statistic``,
            ``p_value`` and ``drifted``.

        Raises:
            ValueError: if the column counts differ.
        """
        from scipy.stats import ks_2samp

        reference = np.asarray(self.reference_data)
        current = np.asarray(new_data)
        if reference.shape[1] != current.shape[1]:
            raise ValueError(
                f"Feature count mismatch: reference has {reference.shape[1]}, "
                f"new data has {current.shape[1]}"
            )

        drift_results = {}
        for i in range(reference.shape[1]):
            stat, p_value = ks_2samp(reference[:, i], current[:, i])
            drift_results[f'feature_{i}'] = {
                'ks_statistic': stat,
                'p_value': p_value,
                'drifted': bool(p_value < threshold),
            }
        return drift_results

    def detect_concept_drift(self, new_data, new_labels, window_size=1000):
        """Detect concept drift by comparing accuracy with the reference set.

        Drift is flagged when accuracy on the new data falls below the lower
        bound of the 95% Wilson confidence interval for reference accuracy.

        Args:
            new_data: features of the recent labelled samples.
            new_labels: labels for ``new_data``.
            window_size: only the most recent ``window_size`` samples are
                scored; pass ``None`` to use all of them.

        Returns:
            dict with ``reference_accuracy``, ``new_accuracy``,
            ``reference_ci`` (``(low, high)`` tuple) and ``concept_drifted``.

        Raises:
            RuntimeError: if no model is attached.
            ValueError: if either dataset is empty.
        """
        from scipy.stats import binomtest

        self._require_model()

        if window_size is not None and len(new_data) > window_size:
            new_data = new_data[-window_size:]
            new_labels = new_labels[-window_size:]

        new_predictions = self.model.predict(new_data)
        _, _, new_accuracy = self._accuracy(new_labels, new_predictions)

        ref_predictions = self.model.predict(self.reference_data)
        ref_correct, ref_total, ref_accuracy = self._accuracy(
            self.reference_labels, ref_predictions
        )

        # The interval is built on the number of CORRECT reference
        # predictions (accuracy), not on the number of predicted positives.
        interval = binomtest(ref_correct, ref_total).proportion_ci(
            confidence_level=0.95, method='wilson'
        )
        ref_ci = (float(interval.low), float(interval.high))

        drifted = bool(new_accuracy < ref_ci[0])  # Below lower confidence bound
        return {
            'reference_accuracy': ref_accuracy,
            'new_accuracy': new_accuracy,
            'reference_ci': ref_ci,
            'concept_drifted': drifted,
        }

    def retrain_if_needed(self, new_data, new_labels, drift_detected):
        """Retrain on reference + new data when drift was detected.

        After retraining, the new data replaces the stored reference set.

        Returns:
            The (possibly retrained) model.

        Raises:
            RuntimeError: if no model is attached.
        """
        self._require_model()

        if drift_detected:
            print("Concept drift detected. Retraining model...")
            combined_data = np.vstack([self.reference_data, new_data])
            combined_labels = np.concatenate([self.reference_labels, new_labels])
            self.model.fit(combined_data, combined_labels)

            self.reference_data = new_data
            self.reference_labels = new_labels
            print("Model retrained successfully")
        return self.model
Follow-up 2: How do you interpret model predictions for stakeholders?
def explain_prediction(model, X_sample, feature_names, method='shap'):
    """Explain model predictions with SHAP, LIME or ELI5.

    Args:
        model: fitted classifier. The ``'shap'`` path builds a
            ``TreeExplainer`` from it; the ``'lime'`` path calls its
            ``predict_proba``.
        X_sample: samples to explain; indexed positionally (``X_sample[0]``),
            so a NumPy array is expected rather than a DataFrame.
        feature_names: names of the feature columns.
        method: ``'shap'``, ``'lime'`` or ``'eli5'``.

    Returns:
        The explanation artefact for the chosen method: the SHAP values for
        ``'shap'``, the LIME explanation of the first sample for ``'lime'``,
        or the object returned by ``eli5.show_weights`` for ``'eli5'``.

    Raises:
        ValueError: if ``method`` is not one of the supported names.
    """
    supported = ('shap', 'lime', 'eli5')
    if method not in supported:
        raise ValueError(
            f"Unknown method {method!r}; expected one of: {', '.join(supported)}"
        )

    if method == 'shap':
        import shap

        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_sample)

        # Force plot for the first sample.
        # NOTE(review): some shap versions return one array per class for
        # classifiers, in which case shap_values[0] is a class rather than a
        # sample; verify against the installed shap version.
        shap.initjs()
        shap.force_plot(explainer.expected_value, shap_values[0], X_sample[0])

        # Global summary across all supplied samples.
        shap.summary_plot(shap_values, X_sample, feature_names=feature_names)
        return shap_values

    if method == 'lime':
        from lime.lime_tabular import LimeTabularExplainer

        explainer = LimeTabularExplainer(
            X_sample,
            feature_names=feature_names,
            class_names=['Not Spam', 'Spam'],
            mode='classification'
        )
        explanation = explainer.explain_instance(
            X_sample[0],
            model.predict_proba,
            num_features=10
        )
        explanation.show_in_notebook()
        return explanation

    # method == 'eli5': return the rendered weights so the caller can display
    # them, instead of discarding the result.
    import eli5

    return eli5.show_weights(model, feature_names=feature_names)
Company-Specific Tips
ℹ️
Meta Tips:
- Meta values scalable classification systems
- Know how to handle multi-class and multi-label classification
- Be comfortable with online learning for classification
- Understand fairness in classification (demographic parity, equalized odds)
Microsoft Tips:
- Microsoft focuses on enterprise ML systems
- Know how to deploy classification models at scale
- Be comfortable with Azure ML and cognitive services
- Understand responsible AI principles
Quiz Section
Related Topics
- Imbalanced Learning — Probability-based approaches
- Model Deployment — Production ML systems
- Explainable AI — Model interpretability
- Fairness in ML — Ethical considerations