🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Classification: Algorithms, Evaluation, Imbalanced Data

Data Science Interview Premium › Classification ⭐ Premium

Advertisement

META & MICROSOFT INTERVIEW QUESTION

Classification: Algorithms, Evaluation, Imbalanced Data

Supervised Learning & Model Evaluation

The Interview Question

ℹ️

Question: You're building a spam classifier for an email platform:

  • Dataset: 1M emails, 2% spam rate (imbalanced)
  • Features: email text, sender info, metadata
  • Requirements: High precision (minimize false positives), real-time inference

Walk through your classification approach:

  1. How do you handle the class imbalance?
  2. Which algorithms would you consider and why?
  3. How do you evaluate the model given the business requirements?
  4. How do you deploy for real-time inference?

Detailed Answer

1. Handling Class Imbalance

Class imbalance is one of the most common challenges in classification problems. With 2% spam rate, a naive classifier predicting all emails as "not spam" would achieve 98% accuracy but be useless.

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score, 
                            f1_score, roc_auc_score, confusion_matrix,
                            classification_report, precision_recall_curve,
                            average_precision_score)
from imblearn.over_sampling import SMOTE, ADASYN
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
from imblearn.combine import SMOTETomek, SMOTEENN
from imblearn.ensemble import BalancedRandomForestClassifier, BalancedBaggingClassifier
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

class ImbalanceHandler:
    """Inspect and mitigate class imbalance for a classification dataset.

    Holds a feature matrix ``X`` and a label vector ``y`` and offers
    resampling (SMOTE, ADASYN, SMOTE + Tomek links), class-weighted training
    and decision-threshold tuning.
    """

    def __init__(self, X, y):
        """Store the data and pre-compute the per-class sample counts."""
        self.X = X
        self.y = y
        self.class_distribution = pd.Series(y).value_counts()

    def analyze_imbalance(self):
        """Print and return per-class statistics of the label distribution.

        Returns:
            dict mapping each class label to ``count``, ``percentage`` and
            ``imbalance_ratio`` (smallest class count divided by this class's
            count, so the minority class itself has ratio 1.0).
        """
        print("Class Distribution Analysis")
        print("=" * 60)

        if self.class_distribution.empty:
            # No labels at all: nothing to summarise.
            return {}

        total = len(self.y)
        minority_count = self.class_distribution.min()
        majority_count = self.class_distribution.max()

        analysis = {
            class_label: {
                'count': count,
                'percentage': count / total * 100,
                'imbalance_ratio': minority_count / count,
            }
            for class_label, count in self.class_distribution.items()
        }

        for label, stats in analysis.items():
            print(f"Class {label}: {stats['count']:,} ({stats['percentage']:.2f}%)")

        # Compare minority against majority directly instead of looking up a
        # hard-coded label ``1``; this works for any label encoding and
        # reports the real ratio rather than the minority's ratio to itself.
        print(f"\nImbalance Ratio: {minority_count / majority_count:.4f}")
        print(f"Minority class is {majority_count / minority_count:.1f}x smaller than majority")

        return analysis

    def _report_resampling(self, title, y_resampled):
        """Print the class counts before and after a resampling step."""
        print(f"\n{title} Applied:")
        print(f"Original: {pd.Series(self.y).value_counts().to_dict()}")
        print(f"Resampled: {pd.Series(y_resampled).value_counts().to_dict()}")

    def apply_smote(self, sampling_strategy='auto', k_neighbors=5):
        """Apply SMOTE (Synthetic Minority Over-sampling Technique).

        Returns:
            ``(X_resampled, y_resampled)`` as produced by ``fit_resample``.
        """
        smote = SMOTE(
            sampling_strategy=sampling_strategy,
            k_neighbors=k_neighbors,
            random_state=42
        )
        X_resampled, y_resampled = smote.fit_resample(self.X, self.y)
        self._report_resampling("SMOTE", y_resampled)
        return X_resampled, y_resampled

    def apply_adasyn(self):
        """Apply ADASYN (Adaptive Synthetic Sampling).

        Returns:
            ``(X_resampled, y_resampled)`` as produced by ``fit_resample``.
        """
        adasyn = ADASYN(random_state=42)
        X_resampled, y_resampled = adasyn.fit_resample(self.X, self.y)
        self._report_resampling("ADASYN", y_resampled)
        return X_resampled, y_resampled

    def apply_hybrid_sampling(self):
        """Apply hybrid sampling (SMOTE + Tomek Links).

        Returns:
            ``(X_resampled, y_resampled)`` as produced by ``fit_resample``.
        """
        smote_tomek = SMOTETomek(random_state=42)
        X_resampled, y_resampled = smote_tomek.fit_resample(self.X, self.y)
        self._report_resampling("Hybrid Sampling", y_resampled)
        return X_resampled, y_resampled

    def apply_class_weights(self, model, X_train, y_train):
        """Fit ``model`` with 'balanced' class weights derived from ``y_train``.

        The model is modified in place: its ``class_weight`` parameter is set
        and it is fitted on the given data.

        Returns:
            ``(model, class_weights)`` where ``class_weights`` maps each class
            label to its weight.
        """
        from sklearn.utils.class_weight import compute_class_weight

        classes = np.unique(y_train)
        weights = compute_class_weight('balanced', classes=classes, y=y_train)
        class_weights = dict(zip(classes, weights))

        print(f"\nClass Weights: {class_weights}")

        model.set_params(class_weight=class_weights)
        model.fit(X_train, y_train)

        return model, class_weights

    @staticmethod
    def _select_threshold_index(precisions, recalls, f1_scores, metric,
                                target_precision=0.99):
        """Return the index into ``thresholds`` that is optimal for ``metric``.

        ``precision_recall_curve`` returns one more precision/recall value
        than thresholds: the final (precision=1, recall=0) point has no
        threshold. That point is dropped here so every returned index is a
        valid threshold index.

        Raises:
            ValueError: if ``metric`` is not 'f1', 'precision' or 'recall'.
        """
        precisions = np.asarray(precisions)[:-1]
        recalls = np.asarray(recalls)[:-1]
        f1_scores = np.asarray(f1_scores)[:-1]

        if metric == 'f1':
            return int(np.argmax(f1_scores))
        if metric == 'precision':
            # Positions (in the unmasked arrays) that meet the precision
            # target; pick the one with the highest recall among them.
            candidates = np.flatnonzero(precisions >= target_precision)
            if candidates.size:
                return int(candidates[np.argmax(recalls[candidates])])
            # Target unreachable: fall back to the most precise threshold.
            return int(np.argmax(precisions))
        if metric == 'recall':
            return int(np.argmax(recalls))
        raise ValueError(
            f"Unknown metric {metric!r}; expected 'f1', 'precision' or 'recall'"
        )

    def threshold_optimization(self, y_true, y_prob, metric='f1',
                               target_precision=0.99):
        """Choose a decision threshold from the precision-recall curve.

        Args:
            y_true: ground-truth binary labels.
            y_prob: predicted scores for the positive class.
            metric: 'f1' (maximise F1), 'precision' (highest recall among
                thresholds whose precision is at least ``target_precision``)
                or 'recall' (maximise recall).
            target_precision: precision floor used when ``metric`` is
                'precision'.

        Returns:
            ``(optimal_threshold, precisions, recalls, f1_scores, thresholds)``.

        Raises:
            ValueError: if ``metric`` is not recognised.
        """
        precisions, recalls, thresholds = precision_recall_curve(y_true, y_prob)

        # Small epsilon avoids 0/0 where precision and recall are both zero.
        f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-6)

        optimal_idx = self._select_threshold_index(
            precisions, recalls, f1_scores, metric, target_precision
        )
        optimal_threshold = thresholds[optimal_idx]

        print(f"\nThreshold Optimization ({metric}):")
        print(f"Optimal threshold: {optimal_threshold:.4f}")
        print(f"Precision: {precisions[optimal_idx]:.4f}")
        print(f"Recall: {recalls[optimal_idx]:.4f}")
        print(f"F1: {f1_scores[optimal_idx]:.4f}")

        return optimal_threshold, precisions, recalls, f1_scores, thresholds

# Example usage
# imbalance_handler = ImbalanceHandler(X, y)
# imbalance_handler.analyze_imbalance()
# X_resampled, y_resampled = imbalance_handler.apply_smote()

2. Classification Algorithms Comparison

from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
import time

class AlgorithmComparison:
    """Train several classifiers on the same split and compare their metrics."""

    # Maps a requirement name to (results column, which extreme is best).
    _SELECTION_RULES = {
        'precision': ('precision', 'max'),
        'recall': ('recall', 'max'),
        'f1': ('f1', 'max'),
        'speed': ('inference_time', 'min'),
        'balanced': ('f1', 'max'),  # F1 score as balanced metric
    }

    def __init__(self):
        self.models = {}
        # Replaced by a DataFrame once compare_performance() has run.
        self.results = {}

    def define_models(self, scale_pos_weight=49):
        """Build the candidate models with imbalance-aware settings.

        Args:
            scale_pos_weight: positive-class weight passed to XGBoost. The
                default of 49 is the negative/positive ratio for a 2% spam
                rate (98 / 2).

        Returns:
            dict mapping model name to the unfitted estimator.
        """
        self.models = {
            'logistic_regression': LogisticRegression(
                max_iter=1000,
                class_weight='balanced',
                random_state=42
            ),
            'decision_tree': DecisionTreeClassifier(
                max_depth=10,
                class_weight='balanced',
                random_state=42
            ),
            'random_forest': RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                class_weight='balanced',
                random_state=42,
                n_jobs=-1
            ),
            'gradient_boosting': GradientBoostingClassifier(
                n_estimators=100,
                max_depth=5,
                learning_rate=0.1,
                random_state=42
            ),
            'xgboost': XGBClassifier(
                n_estimators=100,
                max_depth=5,
                learning_rate=0.1,
                scale_pos_weight=scale_pos_weight,
                random_state=42,
                use_label_encoder=False,
                eval_metric='logloss'
            ),
            'lightgbm': LGBMClassifier(
                n_estimators=100,
                max_depth=5,
                learning_rate=0.1,
                is_unbalance=True,
                random_state=42,
                verbose=-1
            ),
            'catboost': CatBoostClassifier(
                iterations=100,
                depth=5,
                learning_rate=0.1,
                auto_class_weights='Balanced',
                random_state=42,
                verbose=0
            )
        }

        return self.models

    def compare_performance(self, X_train, y_train, X_test, y_test):
        """Fit every model in ``self.models`` and score it on the test split.

        Returns:
            DataFrame with one row per model holding accuracy, precision,
            recall, F1, ROC-AUC, average precision, training time and
            inference time (seconds). Also stored on ``self.results``.
        """
        results = []

        for name, model in self.models.items():
            print(f"\nTraining {name}...")

            # perf_counter is a monotonic clock meant for measuring intervals.
            started = time.perf_counter()
            model.fit(X_train, y_train)
            train_time = time.perf_counter() - started

            started = time.perf_counter()
            y_pred = model.predict(X_test)
            y_prob = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
            inference_time = time.perf_counter() - started

            results.append({
                'model': name,
                'accuracy': accuracy_score(y_test, y_pred),
                'precision': precision_score(y_test, y_pred),
                'recall': recall_score(y_test, y_pred),
                'f1': f1_score(y_test, y_pred),
                'roc_auc': roc_auc_score(y_test, y_prob) if y_prob is not None else None,
                'average_precision': average_precision_score(y_test, y_prob) if y_prob is not None else None,
                'train_time': train_time,
                'inference_time': inference_time
            })
            print(f"  Completed in {train_time:.2f}s")

        self.results = pd.DataFrame(results)
        return self.results

    def visualize_comparison(self):
        """Draw one horizontal bar chart per metric and save the figure.

        Writes ``model_comparison.png`` to the working directory and shows
        the figure.
        """
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))

        metrics = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc', 'average_precision']

        for ax, metric in zip(axes.flat, metrics):
            pretty_name = metric.replace('_', ' ').title()
            bars = ax.barh(self.results['model'], self.results[metric])
            ax.set_xlabel(pretty_name)
            ax.set_title(f'{pretty_name} Comparison')

            # Add value labels just past the end of each bar.
            for bar, value in zip(bars, self.results[metric]):
                if pd.notna(value):
                    ax.text(value + 0.005, bar.get_y() + bar.get_height()/2,
                           f'{value:.3f}', va='center')

        plt.tight_layout()
        plt.savefig('model_comparison.png', dpi=150, bbox_inches='tight')
        plt.show()

    def recommend_model(self, requirement='precision'):
        """Return the results row of the best model for ``requirement``.

        Args:
            requirement: one of 'precision', 'recall', 'f1', 'speed'
                (lowest inference time) or 'balanced' (highest F1).

        Returns:
            pandas Series: the winning row of ``self.results``.

        Raises:
            ValueError: if ``requirement`` is unknown or no results exist yet.
        """
        try:
            column, direction = self._SELECTION_RULES[requirement]
        except KeyError:
            raise ValueError(
                f"Unknown requirement {requirement!r}; "
                f"expected one of {sorted(self._SELECTION_RULES)}"
            ) from None

        if not isinstance(self.results, pd.DataFrame) or self.results.empty:
            raise ValueError("No results available; run compare_performance() first")

        scores = self.results[column]
        best_label = scores.idxmax() if direction == 'max' else scores.idxmin()
        return self.results.loc[best_label]

# Example usage
# comparator = AlgorithmComparison()
# comparator.define_models()
# results = comparator.compare_performance(X_train, y_train, X_test, y_test)
# comparator.visualize_comparison()
# best = comparator.recommend_model(requirement='precision')

3. Comprehensive Evaluation Metrics

class ClassificationEvaluator:
    """Binary-classification evaluation: metrics, plots and cost analysis.

    The positive class is label 1 (the scikit-learn default ``pos_label``
    used by the precision/recall/F1 calls below).
    """

    def __init__(self, y_true, y_pred, y_prob=None):
        """Store labels/predictions and pre-compute the confusion matrix.

        Args:
            y_true: ground-truth labels.
            y_pred: predicted labels.
            y_prob: optional positive-class scores, needed for ROC-AUC,
                average precision, log loss and the PR curve.
        """
        self.y_true = y_true
        self.y_pred = y_pred
        self.y_prob = y_prob

        confusion = confusion_matrix(y_true, y_pred)
        if confusion.shape == (1, 1):
            # Only one class occurs in both y_true and y_pred; pin the 0/1
            # labels so the matrix is 2x2 and can be unpacked into
            # tn/fp/fn/tp. Assumes 0/1 label encoding.
            confusion = confusion_matrix(y_true, y_pred, labels=[0, 1])
        self.confusion = confusion

    def _counts(self):
        """Return ``(tn, fp, fn, tp)`` as Python ints.

        Python ints have arbitrary precision, so products of these counts
        cannot overflow the way fixed-width numpy integers do.
        """
        tn, fp, fn, tp = (int(v) for v in self.confusion.ravel())
        return tn, fp, fn, tp

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return ``numerator / denominator``, or 0 when the denominator is 0."""
        return numerator / denominator if denominator > 0 else 0

    def calculate_all_metrics(self):
        """Compute threshold-based, probabilistic and business metrics.

        Returns:
            dict of metric name to value. ``roc_auc``, ``average_precision``
            and ``log_loss`` are present only when ``y_prob`` was supplied.
            Ratios whose denominator is zero are reported as 0.
        """
        tn, fp, fn, tp = self._counts()
        recall = recall_score(self.y_true, self.y_pred)
        specificity = self._safe_ratio(tn, tn + fp)

        metrics = {
            'accuracy': accuracy_score(self.y_true, self.y_pred),
            'precision': precision_score(self.y_true, self.y_pred),
            'recall': recall,
            'f1': f1_score(self.y_true, self.y_pred),
            'specificity': specificity,
            'npv': self._safe_ratio(tn, tn + fn),  # Negative Predictive Value
            'mcc': self._matthews_correlation(),
            'balanced_accuracy': (recall + specificity) / 2
        }

        if self.y_prob is not None:
            metrics['roc_auc'] = roc_auc_score(self.y_true, self.y_prob)
            metrics['average_precision'] = average_precision_score(self.y_true, self.y_prob)
            metrics['log_loss'] = self._log_loss()

        # Business metrics
        metrics['false_positive_rate'] = self._safe_ratio(fp, fp + tn)
        metrics['false_negative_rate'] = self._safe_ratio(fn, fn + tp)

        return metrics

    def _matthews_correlation(self):
        """Calculate the Matthews Correlation Coefficient (0 if undefined)."""
        tn, fp, fn, tp = self._counts()

        numerator = (tp * tn) - (fp * fn)
        # The product of the four marginals exceeds the int64 range for
        # datasets around a million samples; it is computed with Python ints
        # and converted to float before taking the square root.
        denominator = np.sqrt(float((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)))

        return numerator / denominator if denominator > 0 else 0

    def _log_loss(self):
        """Calculate log loss of ``y_prob`` against ``y_true``."""
        from sklearn.metrics import log_loss
        return log_loss(self.y_true, self.y_prob)

    def plot_confusion_matrix(self, normalize=True):
        """Plot the confusion matrix as counts and, optionally, row-normalised.

        Writes ``confusion_matrix.png`` to the working directory and shows
        the figure.
        """
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))

        # Raw confusion matrix
        sns.heatmap(self.confusion, annot=True, fmt='d', cmap='Blues', ax=axes[0])
        axes[0].set_title('Confusion Matrix (Counts)')
        axes[0].set_ylabel('Actual')
        axes[0].set_xlabel('Predicted')

        # Normalized confusion matrix
        if normalize:
            row_totals = self.confusion.sum(axis=1, keepdims=True)
            # Rows with no samples stay at 0 instead of producing NaN.
            cm_normalized = np.divide(
                self.confusion.astype('float'),
                row_totals,
                out=np.zeros(self.confusion.shape, dtype=float),
                where=row_totals > 0,
            )
            sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='Blues', ax=axes[1])
            axes[1].set_title('Confusion Matrix (Normalized)')
            axes[1].set_ylabel('Actual')
            axes[1].set_xlabel('Predicted')
        else:
            # Hide the unused second panel rather than leaving empty axes.
            axes[1].set_visible(False)

        plt.tight_layout()
        plt.savefig('confusion_matrix.png', dpi=150, bbox_inches='tight')
        plt.show()

    def plot_precision_recall_curve(self):
        """Plot the precision-recall curve and mark the F1-optimal threshold.

        Requires ``y_prob``; prints a message and returns otherwise. Writes
        ``pr_curve.png`` to the working directory and shows the figure.
        """
        if self.y_prob is None:
            print("Probability estimates required for PR curve")
            return

        precisions, recalls, thresholds = precision_recall_curve(self.y_true, self.y_prob)

        plt.figure(figsize=(8, 6))
        plt.plot(recalls, precisions, color='blue', linewidth=2)
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Curve')
        plt.grid(True, alpha=0.3)

        # Find and mark F1-optimal threshold. The final curve point has no
        # threshold, so it is excluded to keep the index valid.
        f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-6)
        optimal_idx = np.argmax(f1_scores[:-1])
        plt.scatter(recalls[optimal_idx], precisions[optimal_idx],
                   color='red', s=100, label=f'Optimal F1 Threshold: {thresholds[optimal_idx]:.3f}')
        plt.legend()

        plt.tight_layout()
        plt.savefig('pr_curve.png', dpi=150, bbox_inches='tight')
        plt.show()

    def calculate_cost_matrix(self, cost_fp, cost_fn, cost_tp=0, cost_tn=0):
        """Calculate and print the business cost of the predictions.

        Args:
            cost_fp: cost of one false positive.
            cost_fn: cost of one false negative.
            cost_tp: cost of one true positive.
            cost_tn: cost of one true negative.

        Returns:
            ``(total_cost, cost_per_sample)``; the per-sample cost is 0 when
            there are no samples.
        """
        tn, fp, fn, tp = self._counts()

        total_cost = (fp * cost_fp + fn * cost_fn +
                     tp * cost_tp + tn * cost_tn)

        cost_per_sample = self._safe_ratio(total_cost, len(self.y_true))

        print(f"Cost Analysis:")
        print(f"  False Positives: {fp:,} × ${cost_fp} = ${fp * cost_fp:,.2f}")
        print(f"  False Negatives: {fn:,} × ${cost_fn} = ${fn * cost_fn:,.2f}")
        print(f"  True Positives: {tp:,} × ${cost_tp} = ${tp * cost_tp:,.2f}")
        print(f"  True Negatives: {tn:,} × ${cost_tn} = ${tn * cost_tn:,.2f}")
        print(f"  Total Cost: ${total_cost:,.2f}")
        print(f"  Cost per Sample: ${cost_per_sample:.4f}")

        return total_cost, cost_per_sample

# Example usage for spam detection
# evaluator = ClassificationEvaluator(y_test, y_pred, y_prob)
# metrics = evaluator.calculate_all_metrics()
# evaluator.plot_confusion_matrix()
# evaluator.plot_precision_recall_curve()
# total_cost, cost_per_sample = evaluator.calculate_cost_matrix(
#     cost_fp=0.10,  # Cost of marking legitimate email as spam
#     cost_fn=0.01   # Cost of letting spam through
# )

4. Real-World Application: Spam Detection

class SpamDetector:
    """Complete spam detection system: pipeline, training, thresholding."""

    def __init__(self):
        self.model = None
        # Decision threshold on the positive-class probability.
        self.threshold = 0.5
        self.feature_names = None

    def build_pipeline(self):
        """Build the preprocessing + XGBoost pipeline.

        Expects a tabular input with an ``email_text`` column and the numeric
        columns ``sender_reputation``, ``num_links`` and ``num_attachments``.

        Returns:
            An unfitted scikit-learn ``Pipeline``.
        """
        from sklearn.pipeline import Pipeline
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.compose import ColumnTransformer

        # Text features
        text_transformer = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=5000, ngram_range=(1, 2)))
        ])

        # Numeric features
        numeric_transformer = Pipeline([
            ('scaler', StandardScaler())
        ])

        # Combine
        preprocessor = ColumnTransformer([
            ('text', text_transformer, 'email_text'),
            ('numeric', numeric_transformer, ['sender_reputation', 'num_links', 'num_attachments'])
        ])

        # Full pipeline
        pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('classifier', XGBClassifier(
                n_estimators=100,
                max_depth=5,
                scale_pos_weight=49,
                random_state=42
            ))
        ])

        return pipeline

    def train_with_imbalance_handling(self, X_train, y_train):
        """Train the pipeline with SMOTE applied after feature extraction.

        SMOTE interpolates between numeric feature vectors, so it cannot run
        on the raw frame that still holds the ``email_text`` column. It is
        inserted between the preprocessor and the classifier using the
        imbalanced-learn pipeline, which applies samplers only while fitting.

        Returns:
            The fitted pipeline (also stored on ``self.model``).
        """
        from imblearn.pipeline import Pipeline as ImbPipeline

        # Analyze imbalance
        ImbalanceHandler(X_train, y_train).analyze_imbalance()

        base_pipeline = self.build_pipeline()
        classifier = base_pipeline.named_steps['classifier']
        # SMOTE's default strategy oversamples the minority class up to the
        # majority count, so the positive-class weight from build_pipeline
        # would compensate for the imbalance a second time.
        classifier.set_params(scale_pos_weight=1)

        pipeline = ImbPipeline([
            ('preprocessor', base_pipeline.named_steps['preprocessor']),
            ('smote', SMOTE(random_state=42)),
            ('classifier', classifier),
        ])
        pipeline.fit(X_train, y_train)

        self.model = pipeline

        return pipeline

    @staticmethod
    def _threshold_for_precision(precisions, recalls, thresholds, target_precision):
        """Return the threshold with the best recall at the target precision.

        ``precisions`` and ``recalls`` are the arrays returned by
        ``precision_recall_curve``; their final (precision=1, recall=0) point
        has no threshold and is ignored.

        Returns:
            The chosen threshold as a float, or ``None`` when no threshold
            reaches ``target_precision``.
        """
        precisions = np.asarray(precisions)[:-1]
        recalls = np.asarray(recalls)[:-1]
        candidates = np.flatnonzero(precisions >= target_precision)
        if not candidates.size:
            return None
        # Index back through ``candidates`` so the position refers to the
        # full thresholds array, not to the filtered subset.
        best = candidates[np.argmax(recalls[candidates])]
        return float(np.asarray(thresholds)[best])

    def predict_with_optimal_threshold(self, X, target_precision=0.99, y_true=None):
        """Predict labels using the decision threshold.

        Precision can only be estimated against real labels. When ``y_true``
        is given (use a validation set), the threshold is re-tuned to the
        highest-recall point whose precision is at least ``target_precision``
        and stored on ``self.threshold``. Without labels the stored threshold
        is applied unchanged.

        Returns:
            ``(y_pred, y_prob)``: 0/1 predictions and positive-class scores.

        Raises:
            RuntimeError: if no model has been trained yet.
        """
        if self.model is None:
            raise RuntimeError("Model is not trained; call train_with_imbalance_handling() first")

        y_prob = self.model.predict_proba(X)[:, 1]

        if y_true is not None:
            from sklearn.metrics import precision_recall_curve
            precisions, recalls, thresholds = precision_recall_curve(y_true, y_prob)
            tuned = self._threshold_for_precision(
                precisions, recalls, thresholds, target_precision
            )
            if tuned is not None:
                self.threshold = tuned

        # Apply threshold
        y_pred = (y_prob >= self.threshold).astype(int)

        return y_pred, y_prob

    def evaluate_production(self, X_test, y_test, X_val=None, y_val=None,
                            target_precision=0.99):
        """Evaluate the model against the production requirements.

        Args:
            X_test, y_test: held-out data used only for scoring.
            X_val, y_val: optional validation data; when both are given the
                decision threshold is tuned on them first, so the test set
                is never used to pick the threshold.
            target_precision: precision floor used when tuning.

        Returns:
            ``(metrics, requirements_met)``.
        """
        if X_val is not None and y_val is not None:
            self.predict_with_optimal_threshold(
                X_val, target_precision=target_precision, y_true=y_val
            )

        y_pred, y_prob = self.predict_with_optimal_threshold(X_test)

        evaluator = ClassificationEvaluator(y_test, y_pred, y_prob)
        metrics = evaluator.calculate_all_metrics()

        print("Production Evaluation")
        print("=" * 60)
        print(f"Threshold: {self.threshold:.4f}")
        print(f"Precision: {metrics['precision']:.4f}")
        print(f"Recall: {metrics['recall']:.4f}")
        print(f"F1: {metrics['f1']:.4f}")
        print(f"ROC-AUC: {metrics['roc_auc']:.4f}")

        # Business requirements check
        requirements_met = {
            'precision_99': metrics['precision'] >= 0.99,
            'recall_80': metrics['recall'] >= 0.80,
            'f1_85': metrics['f1'] >= 0.85
        }

        print(f"\nRequirements Check:")
        for req, met in requirements_met.items():
            print(f"  {req}: {'✓' if met else '✗'}")

        return metrics, requirements_met

# Example usage
# spam_detector = SpamDetector()
# spam_detector.train_with_imbalance_handling(X_train, y_train)
# metrics, requirements = spam_detector.evaluate_production(X_test, y_test)

💡

Pro Tip: For spam detection, precision is often more critical than recall. Marking legitimate email as spam (false positive) is much worse than letting some spam through (false negative).

5. Common Follow-Up Questions

Follow-up 1: How do you handle concept drift in production?

class DriftDetector:
    """Detect data drift and concept drift against a reference dataset."""

    def __init__(self, reference_data, reference_labels, model=None):
        """Store the reference data and, optionally, the monitored model.

        Args:
            reference_data: 2-D array of reference features (rows are
                samples, columns are features).
            reference_labels: labels for ``reference_data``.
            model: fitted estimator exposing ``predict`` and ``fit``. It is
                required by ``detect_concept_drift`` and by retraining; it
                can also be assigned later via ``self.model``.
        """
        self.reference_data = reference_data
        self.reference_labels = reference_labels
        self.model = model
        # NOTE: no method in this class currently appends to this list.
        self.drift_history = []

    def _require_model(self):
        """Return the monitored model or raise if none was provided."""
        if self.model is None:
            raise RuntimeError(
                "DriftDetector has no model; pass model=... to the constructor "
                "or assign detector.model before calling this method"
            )
        return self.model

    @staticmethod
    def _proportion_confint(successes, n_obs, alpha=0.05):
        """Return the Wilson score interval ``(low, high)`` for a proportion.

        Args:
            successes: number of successes.
            n_obs: number of trials (must be positive).
            alpha: significance level; the interval has coverage ``1 - alpha``.

        Raises:
            ValueError: if ``n_obs`` is not positive.
        """
        from scipy.stats import norm

        if n_obs <= 0:
            raise ValueError("n_obs must be positive")

        z = norm.ppf(1 - alpha / 2)
        p_hat = successes / n_obs
        scale = 1 + z ** 2 / n_obs
        centre = (p_hat + z ** 2 / (2 * n_obs)) / scale
        half_width = z * np.sqrt(
            p_hat * (1 - p_hat) / n_obs + z ** 2 / (4 * n_obs ** 2)
        ) / scale
        return float(centre - half_width), float(centre + half_width)

    def detect_data_drift(self, new_data, threshold=0.05):
        """Detect per-feature data drift with a two-sample KS test.

        Args:
            new_data: 2-D array with the same number of columns as the
                reference data.
            threshold: p-value below which a feature is flagged as drifted.

        Returns:
            dict mapping ``feature_<i>`` to its KS statistic, p-value and a
            ``drifted`` flag.

        Raises:
            ValueError: if the column counts differ.
        """
        from scipy.stats import ks_2samp

        reference = np.asarray(self.reference_data)
        candidate = np.asarray(new_data)
        if reference.shape[1] != candidate.shape[1]:
            raise ValueError(
                f"new_data has {candidate.shape[1]} features, "
                f"reference data has {reference.shape[1]}"
            )

        drift_results = {}
        # Transposing iterates column by column.
        for i, (ref_column, new_column) in enumerate(zip(reference.T, candidate.T)):
            stat, p_value = ks_2samp(ref_column, new_column)
            drift_results[f'feature_{i}'] = {
                'ks_statistic': stat,
                'p_value': p_value,
                'drifted': bool(p_value < threshold)
            }

        return drift_results

    def detect_concept_drift(self, new_data, new_labels, window_size=1000):
        """Detect concept drift by comparing accuracy on new vs. reference data.

        Drift is flagged when accuracy on the new data falls below the lower
        bound of the 95% confidence interval of the reference accuracy.

        Args:
            new_data: features of the recent labelled batch.
            new_labels: labels of the recent labelled batch.
            window_size: accepted for interface compatibility; not used by
                this implementation.

        Returns:
            dict with ``reference_accuracy``, ``new_accuracy``,
            ``reference_ci`` and ``concept_drifted``.

        Raises:
            RuntimeError: if no model is attached.
        """
        model = self._require_model()

        # Calculate performance on new data
        new_predictions = model.predict(new_data)
        new_accuracy = accuracy_score(new_labels, new_predictions)

        # Compare with reference performance
        ref_predictions = model.predict(self.reference_data)
        ref_accuracy = accuracy_score(self.reference_labels, ref_predictions)

        # The interval must describe the accuracy, so it is built from the
        # number of CORRECT reference predictions, not the number of
        # predicted positives.
        n_correct = int(np.sum(
            np.asarray(ref_predictions) == np.asarray(self.reference_labels)
        ))
        ref_ci = self._proportion_confint(n_correct, len(ref_predictions), alpha=0.05)

        drifted = bool(new_accuracy < ref_ci[0])  # Below lower confidence bound

        return {
            'reference_accuracy': ref_accuracy,
            'new_accuracy': new_accuracy,
            'reference_ci': ref_ci,
            'concept_drifted': drifted
        }

    def retrain_if_needed(self, new_data, new_labels, drift_detected):
        """Retrain the model on reference + new data if drift was detected.

        After retraining, the new batch becomes the reference data.

        Returns:
            The (possibly retrained) model.

        Raises:
            RuntimeError: if drift was detected but no model is attached.
        """
        if drift_detected:
            model = self._require_model()
            print("Concept drift detected. Retraining model...")

            # Combine old and new data
            combined_data = np.vstack([self.reference_data, new_data])
            combined_labels = np.concatenate([self.reference_labels, new_labels])

            # Retrain
            model.fit(combined_data, combined_labels)

            # Update reference data
            self.reference_data = new_data
            self.reference_labels = new_labels

            print("Model retrained successfully")

        return self.model

Follow-up 2: How do you interpret model predictions for stakeholders?

def explain_prediction(model, X_sample, feature_names, method='shap'):
    """Explain model predictions with SHAP, LIME or ELI5.

    Args:
        model: fitted model. The 'shap' branch passes it to
            ``shap.TreeExplainer``; the 'lime' branch calls its
            ``predict_proba``.
        X_sample: 2-D array of samples; the first row is the instance
            explained individually by the 'shap' and 'lime' branches.
        feature_names: names of the columns of ``X_sample``.
        method: 'shap', 'lime' or 'eli5'.

    Returns:
        The explanation artefact of the chosen method: the SHAP values for
        'shap', the LIME explanation object for 'lime', or the object
        returned by ``eli5.show_weights`` for 'eli5'.

    Raises:
        ValueError: if ``method`` is not one of the supported names.
    """
    if method == 'shap':
        import shap
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_sample)

        # Plot
        shap.initjs()
        shap.force_plot(explainer.expected_value, shap_values[0], X_sample[0])

        # Summary
        shap.summary_plot(shap_values, X_sample, feature_names=feature_names)

        return shap_values

    if method == 'lime':
        from lime.lime_tabular import LimeTabularExplainer

        explainer = LimeTabularExplainer(
            X_sample,
            feature_names=feature_names,
            class_names=['Not Spam', 'Spam'],
            mode='classification'
        )

        # Explain prediction
        explanation = explainer.explain_instance(
            X_sample[0],
            model.predict_proba,
            num_features=10
        )

        explanation.show_in_notebook()

        return explanation

    if method == 'eli5':
        import eli5
        return eli5.show_weights(model, feature_names=feature_names)

    raise ValueError(
        f"Unknown method {method!r}; expected 'shap', 'lime' or 'eli5'"
    )

Company-Specific Tips

ℹ️

Meta Tips:

  • Meta values scalable classification systems
  • Know how to handle multi-class and multi-label classification
  • Be comfortable with online learning for classification
  • Understand fairness in classification (demographic parity, equalized odds)

Microsoft Tips:

  • Microsoft focuses on enterprise ML systems
  • Know how to deploy classification models at scale
  • Be comfortable with Azure ML and cognitive services
  • Understand responsible AI principles

Quiz Section


Related Topics

Advertisement