The Interview Question
βΉοΈ
Question: You're building a model to predict customer lifetime value (CLV) for an e-commerce platform:
- Dataset: 100K customers with 50 features
- Target: CLV (continuous, right-skewed)
- Requirements: Interpretable model, production-ready
Walk through your regression modeling process:
- How do you prepare the data for regression?
- Which regression techniques would you consider and why?
- How do you evaluate and diagnose your model?
- How do you handle issues like heteroscedasticity and multicollinearity?
Detailed Answer
1. Data Preparation for Regression
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
class RegressionDataPreparer:
"""Prepare data for regression analysis"""
def __init__(self, df, target_column):
self.df = df.copy()
self.target = target_column
self.preparation_log = {}
def analyze_target_distribution(self):
"""Analyze and transform target variable"""
y = self.df[self.target]
# Basic statistics
stats_info = {
'mean': y.mean(),
'std': y.std(),
'skewness': y.skew(),
'kurtosis': y.kurtosis(),
'min': y.min(),
'max': y.max()
}
# Check for skewness
if abs(stats_info['skewness']) > 1:
print(f"Warning: Target is highly skewed ({stats_info['skewness']:.2f})")
print("Consider log transformation")
# Log transformation
if (y > 0).all():
self.df[f'{self.target}_log'] = np.log1p(y)
stats_info['log_skewness'] = self.df[f'{self.target}_log'].skew()
# Box-Cox transformation
if (y > 0).all():
transformed, lambda_val = stats.boxcox(y)
self.df[f'{self.target}_boxcox'] = transformed
stats_info['boxcox_lambda'] = lambda_val
stats_info['boxcox_skewness'] = pd.Series(transformed).skew()
self.preparation_log['target_analysis'] = stats_info
return stats_info
def handle_missing_values(self):
"""Handle missing values for regression"""
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if col == self.target:
continue
missing_pct = self.df[col].isnull().mean() * 100
if missing_pct > 0:
if missing_pct < 5:
# Simple imputation
self.df[col] = self.df[col].fillna(self.df[col].median())
elif missing_pct < 20:
# KNN imputation
from sklearn.impute import KNNImputer
imputer = KNNImputer(n_neighbors=5)
self.df[col] = imputer.fit_transform(self.df[[col]])
else:
# Create missing indicator and impute
self.df[f'{col}_missing'] = self.df[col].isnull().astype(int)
self.df[col] = self.df[col].fillna(self.df[col].median())
return self
def handle_outliers(self, method='winsorize'):
"""Handle outliers in features"""
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
numeric_cols = [col for col in numeric_cols if col != self.target]
for col in numeric_cols:
if method == 'winsorize':
# Winsorization at 1st and 99th percentiles
lower = self.df[col].quantile(0.01)
upper = self.df[col].quantile(0.99)
self.df[col] = self.df[col].clip(lower, upper)
elif method == 'iqr':
Q1, Q3 = self.df[col].quantile(0.25), self.df[col].quantile(0.75)
IQR = Q3 - Q1
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
self.df[col] = self.df[col].clip(lower, upper)
return self
def encode_categoricals(self):
"""Encode categorical variables"""
categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
if self.df[col].nunique() <= 10:
# One-hot encoding
dummies = pd.get_dummies(self.df[col], prefix=col, drop_first=True)
self.df = pd.concat([self.df, dummies], axis=1)
else:
# Target encoding
target_mean = self.df.groupby(col)[self.target].mean()
self.df[f'{col}_encoded'] = self.df[col].map(target_mean)
# Drop original categorical columns
self.df = self.df.drop(columns=categorical_cols)
return self
def create_interactions(self, important_features, max_degree=2):
"""Create interaction and polynomial features"""
poly = PolynomialFeatures(degree=max_degree, interaction_only=False, include_bias=False)
# Select only important features for interaction
X_poly = poly.fit_transform(self.df[important_features])
feature_names = poly.get_feature_names_out(important_features)
# Create DataFrame with polynomial features
poly_df = pd.DataFrame(X_poly, columns=feature_names, index=self.df.index)
# Drop duplicate columns
poly_df = poly_df.loc[:, ~poly_df.columns.duplicated()]
# Merge with original dataframe
self.df = pd.concat([self.df, poly_df], axis=1)
return self
def scale_features(self):
"""Scale numeric features"""
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
numeric_cols = [col for col in numeric_cols if col != self.target]
scaler = StandardScaler()
self.df[numeric_cols] = scaler.fit_transform(self.df[numeric_cols])
self.preparation_log['scaler'] = scaler
return self
def prepare_final_dataset(self):
"""Prepare final dataset for modeling"""
# Separate features and target
X = self.df.drop(columns=[self.target])
y = self.df[self.target]
# Handle any remaining NaN
X = X.fillna(0)
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
return X_train, X_test, y_train, y_test
# Example usage
# preparer = RegressionDataPreparer(df, 'customer_lifetime_value')
# preparer.analyze_target_distribution()
# preparer.handle_missing_values()
# preparer.handle_outliers()
# preparer.encode_categoricals()
# X_train, X_test, y_train, y_test = preparer.prepare_final_dataset()
2. Linear Regression and Assumptions
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
class LinearRegressionAnalysis:
"""Comprehensive linear regression analysis"""
def __init__(self):
self.model = None
self.results = {}
def fit_ols(self, X, y):
"""Fit OLS regression with statsmodels for detailed diagnostics"""
X_with_const = sm.add_constant(X)
model = sm.OLS(y, X_with_const).fit()
self.model = model
return model
def check_assumptions(self, X, y):
"""Check all regression assumptions"""
assumptions = {}
# 1. Linearity
X_with_const = sm.add_constant(X)
model = sm.OLS(y, X_with_const).fit()
y_pred = model.predict(X_with_const)
residuals = y - y_pred
# Reset test for linearity
from statsmodels.stats.diagnostic import linear_reset
reset_test = linear_reset(model, power=3, use_f=True)
assumptions['linearity'] = {
'reset_f_statistic': reset_test.fvalue,
'reset_p_value': reset_test.pvalue,
'linear': reset_test.pvalue > 0.05
}
# 2. Independence (Durbin-Watson)
from statsmodels.stats.stattools import durbin_watson
dw_stat = durbin_watson(residuals)
assumptions['independence'] = {
'durbin_watson': dw_stat,
'independent': 1.5 < dw_stat < 2.5
}
# 3. Homoscedasticity (Breusch-Pagan)
from statsmodels.stats.diagnostic import het_breuschpagan
bp_test = het_breuschpagan(residuals, X_with_const)
assumptions['homoscedasticity'] = {
'bp_statistic': bp_test[0],
'bp_p_value': bp_test[1],
'homoscedastic': bp_test[1] > 0.05
}
# 4. Normality of residuals
_, shapiro_p = stats.shapiro(residuals.sample(min(5000, len(residuals))))
_, jarque_bera_p = stats.jarque_bera(residuals)
assumptions['normality'] = {
'shapiro_p_value': shapiro_p,
'jarque_bera_p_value': jarque_bera_p,
'normal': shapiro_p > 0.05 and jarque_bera_p > 0.05
}
# 5. Multicollinearity (VIF)
vif_data = pd.DataFrame()
vif_data['feature'] = X.columns
vif_data['VIF'] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
assumptions['multicollinearity'] = {
'vif_scores': vif_data.to_dict('records'),
'high_vif_features': vif_data[vif_data['VIF'] > 5]['feature'].tolist(),
'no_multicollinearity': (vif_data['VIF'] < 5).all()
}
self.results['assumptions'] = assumptions
return assumptions
def handle_violations(self, assumptions, X, y):
"""Handle assumption violations"""
recommendations = []
# Handle non-linearity
if not assumptions['linearity']['linear']:
recommendations.append("Consider polynomial features or non-linear model")
# Handle heteroscedasticity
if not assumptions['homoscedasticity']['homoscedastic']:
recommendations.append("Use weighted least squares or robust standard errors")
# Implement WLS
from statsmodels.regression.linear_model import WLS
weights = 1 / (residuals ** 2 + 1e-6)
# Handle multicollinearity
if not assumptions['multicollinearity']['no_multicollinearity']:
high_vif = assumptions['multicollinearity']['high_vif_features']
recommendations.append(f"Remove or combine high VIF features: {high_vif}")
# Handle non-normality
if not assumptions['normality']['normal']:
recommendations.append("Consider transforming target or using robust regression")
return recommendations
3. Regularized Regression
class RegularizedRegression:
"""Ridge, Lasso, and ElasticNet regression"""
def __init__(self):
self.models = {}
self.results = {}
def fit_all_regularizations(self, X_train, y_train, X_test, y_test):
"""Fit all regularized regression models"""
alphas = [0.001, 0.01, 0.1, 1, 10, 100]
# Ridge (L2)
ridge_scores = []
for alpha in alphas:
ridge = Ridge(alpha=alpha)
scores = cross_val_score(ridge, X_train, y_train, cv=5, scoring='r2')
ridge_scores.append({'alpha': alpha, 'mean_score': scores.mean(), 'std_score': scores.std()})
best_ridge_alpha = max(ridge_scores, key=lambda x: x['mean_score'])['alpha']
self.models['ridge'] = Ridge(alpha=best_ridge_alpha).fit(X_train, y_train)
# Lasso (L1)
lasso_scores = []
for alpha in alphas:
lasso = Lasso(alpha=alpha)
scores = cross_val_score(lasso, X_train, y_train, cv=5, scoring='r2')
lasso_scores.append({'alpha': alpha, 'mean_score': scores.mean(), 'std_score': scores.std()})
best_lasso_alpha = max(lasso_scores, key=lambda x: x['mean_score'])['alpha']
self.models['lasso'] = Lasso(alpha=best_lasso_alpha).fit(X_train, y_train)
# ElasticNet
elastic_scores = []
l1_ratios = [0.1, 0.3, 0.5, 0.7, 0.9]
for alpha in alphas:
for l1_ratio in l1_ratios:
elastic = ElasticNet(alpha=alpha, l1_ratio=l1_ratio)
scores = cross_val_score(elastic, X_train, y_train, cv=5, scoring='r2')
elastic_scores.append({
'alpha': alpha,
'l1_ratio': l1_ratio,
'mean_score': scores.mean()
})
best_elastic = max(elastic_scores, key=lambda x: x['mean_score'])
self.models['elastic'] = ElasticNet(
alpha=best_elastic['alpha'],
l1_ratio=best_elastic['l1_ratio']
).fit(X_train, y_train)
# Evaluate all models
for name, model in self.models.items():
y_pred = model.predict(X_test)
self.results[name] = {
'r2': r2_score(y_test, y_pred),
'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
'mae': mean_absolute_error(y_test, y_pred),
'feature_importance': pd.Series(model.coef_, index=X_train.columns).sort_values(ascending=False)
}
return self.results
def compare_with_ols(self, X_train, y_train, X_test, y_test):
"""Compare regularized models with OLS"""
# Fit OLS
ols = LinearRegression().fit(X_train, y_train)
y_pred_ols = ols.predict(X_test)
self.results['ols'] = {
'r2': r2_score(y_test, y_pred_ols),
'rmse': np.sqrt(mean_squared_error(y_test, y_pred_ols)),
'mae': mean_absolute_error(y_test, y_pred_ols)
}
# Create comparison table
comparison = pd.DataFrame(self.results).T
comparison = comparison[['r2', 'rmse', 'mae']]
return comparison
# Example usage
reg_analysis = RegularizedRegression()
results = reg_analysis.fit_all_regularizations(X_train, y_train, X_test, y_test)
comparison = reg_analysis.compare_with_ols(X_train, y_train, X_test, y_test)
print("Model Comparison")
print("=" * 60)
print(comparison)
4. Logistic Regression
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix,
classification_report, roc_curve)
class LogisticRegressionAnalysis:
"""Comprehensive logistic regression analysis"""
def __init__(self):
self.model = None
self.results = {}
def fit_logistic(self, X_train, y_train, penalty='l2', C=1.0):
"""Fit logistic regression"""
self.model = LogisticRegression(
penalty=penalty,
C=C,
solver='lbfgs',
max_iter=1000,
random_state=42
)
self.model.fit(X_train, y_train)
return self
def evaluate(self, X_test, y_test):
"""Comprehensive model evaluation"""
y_pred = self.model.predict(X_test)
y_prob = self.model.predict_proba(X_test)[:, 1]
self.results = {
'accuracy': accuracy_score(y_test, y_pred),
'precision': precision_score(y_test, y_pred),
'recall': recall_score(y_test, y_pred),
'f1': f1_score(y_test, y_pred),
'roc_auc': roc_auc_score(y_test, y_prob),
'confusion_matrix': confusion_matrix(y_test, y_pred),
'classification_report': classification_report(y_test, y_pred)
}
return self.results
def odds_ratios(self, feature_names):
"""Calculate and interpret odds ratios"""
odds_ratios = pd.DataFrame({
'feature': feature_names,
'coefficient': self.model.coef_[0],
'odds_ratio': np.exp(self.model.coef_[0]),
'p_value': self._calculate_p_values()
})
odds_ratios = odds_ratios.sort_values('odds_ratio', ascending=False)
return odds_ratios
def _calculate_p_values(self):
"""Calculate p-values for coefficients"""
# Wald test
from scipy.stats import norm
y_pred = self.model.predict(self.model.X_train_)
residuals = self.model.y_train_ - y_pred
# Calculate standard errors (simplified)
X = self.model.X_train_
V = np.diag(y_pred * (1 - y_pred))
cov_matrix = np.linalg.inv(X.T @ V @ X)
std_errors = np.sqrt(np.diag(cov_matrix))
# Z-scores and p-values
z_scores = self.model.coef_[0] / std_errors
p_values = 2 * (1 - norm.cdf(np.abs(z_scores)))
return p_values
def plot_roc_curve(self, X_test, y_test):
"""Plot ROC curve"""
y_prob = self.model.predict_proba(X_test)[:, 1]
fpr, tpr, thresholds = roc_curve(y_test, y_prob)
roc_auc = roc_auc_score(y_test, y_prob)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='blue', label=f'ROC curve (AUC = {roc_auc:.3f})')
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.grid(True)
plt.savefig('roc_curve.png', dpi=150, bbox_inches='tight')
plt.show()
return roc_auc
# Example usage
log_analysis = LogisticRegressionAnalysis()
log_analysis.fit_logistic(X_train_binary, y_train_binary)
results = log_analysis.evaluate(X_test_binary, y_test_binary)
print("Logistic Regression Results")
print("=" * 60)
print(f"Accuracy: {results['accuracy']:.4f}")
print(f"Precision: {results['precision']:.4f}")
print(f"Recall: {results['recall']:.4f}")
print(f"F1-Score: {results['f1']:.4f}")
print(f"ROC-AUC: {results['roc_auc']:.4f}")
π‘
Pro Tip: For interpretability, focus on odds ratios in logistic regression. An odds ratio of 1.5 means the odds of the outcome increase by 50% for a one-unit increase in the predictor.
5. Model Evaluation Metrics
class RegressionEvaluator:
"""Comprehensive regression model evaluation"""
def __init__(self, y_true, y_pred):
self.y_true = y_true
self.y_pred = y_pred
self.residuals = y_true - y_pred
def calculate_all_metrics(self):
"""Calculate comprehensive evaluation metrics"""
metrics = {
'r2': r2_score(self.y_true, self.y_pred),
'adjusted_r2': self._adjusted_r2(),
'rmse': np.sqrt(mean_squared_error(self.y_true, self.y_pred)),
'mae': mean_absolute_error(self.y_true, self.y_pred),
'mape': self._mape(),
'max_error': np.max(np.abs(self.residuals)),
'median_absolute_error': np.median(np.abs(self.residuals))
}
return metrics
def _adjusted_r2(self):
"""Calculate adjusted R-squared"""
n = len(self.y_true)
p = 1 # Number of predictors (simplified)
r2 = r2_score(self.y_true, self.y_pred)
return 1 - (1 - r2) * (n - 1) / (n - p - 1)
def _mape(self):
"""Calculate Mean Absolute Percentage Error"""
mask = self.y_true != 0
return np.mean(np.abs((self.y_true[mask] - self.y_pred[mask]) / self.y_true[mask])) * 100
def calculate_prediction_intervals(self, X_train, model, confidence=0.95):
"""Calculate prediction intervals"""
from scipy.stats import t
# Get predictions
X_with_const = sm.add_constant(X_train)
model_sm = sm.OLS(self.y_true, X_with_const).fit()
# Predict
X_test_with_const = sm.add_constant(np.zeros((1, X_train.shape[1])))
prediction = model_sm.get_prediction(X_test_with_const)
# Prediction interval
pred_summary = prediction.summary_frame(alpha=1 - confidence)
return pred_summary
def cross_validate(self, X, y, model, cv=5):
"""Perform cross-validation"""
from sklearn.model_selection import cross_val_score, cross_val_predict
# Cross-validation scores
cv_scores = cross_val_score(model, X, y, cv=cv, scoring='r2')
# Cross-validated predictions
y_pred_cv = cross_val_predict(model, X, y, cv=cv)
return {
'cv_r2_mean': cv_scores.mean(),
'cv_r2_std': cv_scores.std(),
'cv_predictions': y_pred_cv
}
# Example usage
evaluator = RegressionEvaluator(y_test, y_pred)
metrics = evaluator.calculate_all_metrics()
print("Regression Evaluation Metrics")
print("=" * 60)
for metric, value in metrics.items():
print(f"{metric}: {value:.4f}")
6. Common Follow-Up Questions
Follow-up 1: How do you handle heteroscedasticity?
def handle_heteroscedasticity(X, y):
"""Methods to handle heteroscedasticity"""
methods = {}
# 1. Weighted Least Squares
from statsmodels.regression.linear_model import WLS
# Fit initial model to estimate variance
model_ols = LinearRegression().fit(X, y)
residuals = y - model_ols.predict(X)
# Estimate variance function
abs_residuals = np.abs(residuals)
variance_model = LinearRegression().fit(X, abs_residuals)
weights = 1 / (variance_model.predict(X) ** 2 + 1e-6)
# Fit WLS
model_wls = WLS(y, sm.add_constant(X), weights=weights).fit()
methods['wls'] = model_wls
# 2. Robust Standard Errors (HC3)
model_ols_sm = sm.OLS(y, sm.add_constant(X)).fit()
robust_model = model_ols_sm.get_robustcov_results(cov_type='HC3')
methods['robust_se'] = robust_model
# 3. Log transformation
if (y > 0).all():
y_log = np.log(y)
model_log = LinearRegression().fit(X, y_log)
methods['log_transform'] = model_log
# 4. Square root transformation
y_sqrt = np.sqrt(y)
model_sqrt = LinearRegression().fit(X, y_sqrt)
methods['sqrt_transform'] = model_sqrt
return methods
Follow-up 2: How do you handle multicollinearity?
def handle_multicollinearity(X, threshold=5):
"""Handle multicollinearity using VIF"""
# Calculate VIF
vif_data = pd.DataFrame()
vif_data['feature'] = X.columns
vif_data['VIF'] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
# Remove features with high VIF iteratively
features_to_drop = []
while True:
# Recalculate VIF
vif_data = pd.DataFrame()
vif_data['feature'] = X.columns
vif_data['VIF'] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
max_vif = vif_data['VIF'].max()
if max_vif > threshold:
# Drop feature with highest VIF
feature_to_drop = vif_data.loc[vif_data['VIF'].idxmax(), 'feature']
features_to_drop.append(feature_to_drop)
X = X.drop(columns=[feature_to_drop])
else:
break
return X, features_to_drop, vif_data
Company-Specific Tips
βΉοΈ
Amazon Tips:
- Amazon values interpretable models for business decisions
- Be prepared to explain regression coefficients to non-technical stakeholders
- Know how to implement regression in production
- Understand A/B testing for model comparison
Google Tips:
- Google often asks about regularized regression for high-dimensional data
- Be comfortable with both statsmodels and scikit-learn implementations
- Know how to handle imbalanced data in logistic regression
- Understand online learning for regression models
Quiz Section
Related Topics
- Feature Engineering β Creating features for regression
- Statistical Testing β Testing regression coefficients
- Regularization β Bayesian perspective on regularization
- Model Selection β Choosing between models