The Interview Question
βΉοΈ
Question: You need to estimate the probability that a new feature will increase revenue by at least 10%:
- Historical data shows revenue follows a log-normal distribution
- You have limited data (100 observations)
- You need confidence intervals for your estimate
Walk through your Monte Carlo approach:
- How do you design a simulation study?
- How do you use bootstrap for uncertainty quantification?
- How do you validate your simulation results?
- When are Monte Carlo methods preferred over analytical solutions?
Detailed Answer
1. Monte Carlo Simulation Framework
Monte Carlo methods use random sampling to solve problems that might be deterministic in principle but are difficult to solve analytically.
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Callable, List, Dict, Tuple
import warnings
warnings.filterwarnings('ignore')
class MonteCarloSimulator:
"""Monte Carlo simulation framework"""
def __init__(self, n_simulations=10000, random_seed=42):
self.n_simulations = n_simulations
self.random_seed = random_seed
np.random.seed(random_seed)
self.results = {}
def estimate_probability(self,
simulation_func: Callable,
condition: Callable,
**kwargs) -> Dict:
"""
Estimate probability using Monte Carlo simulation
Parameters:
-----------
simulation_func : function that generates simulated outcomes
condition : function that returns True if condition is met
"""
# Run simulations
outcomes = []
for _ in range(self.n_simulations):
outcome = simulation_func(**kwargs)
outcomes.append(outcome)
outcomes = np.array(outcomes)
# Check condition
successes = condition(outcomes)
probability = np.mean(successes)
# Confidence interval using normal approximation
se = np.sqrt(probability * (1 - probability) / self.n_simulations)
ci_lower = probability - 1.96 * se
ci_upper = probability + 1.96 * se
results = {
'probability': probability,
'confidence_interval': (ci_lower, ci_upper),
'n_simulations': self.n_simulations,
'outcomes': outcomes,
'successes': successes.sum(),
'se': se
}
self.results['probability_estimation'] = results
return results
def estimate_expectation(self,
simulation_func: Callable,
**kwargs) -> Dict:
"""Estimate expected value using Monte Carlo"""
outcomes = []
for _ in range(self.n_simulations):
outcome = simulation_func(**kwargs)
outcomes.append(outcome)
outcomes = np.array(outcomes)
results = {
'mean': np.mean(outcomes),
'std': np.std(outcomes),
'se': np.std(outcomes) / np.sqrt(self.n_simulations),
'ci_95': (np.percentile(outcomes, 2.5), np.percentile(outcomes, 97.5)),
'outcomes': outcomes
}
self.results['expectation'] = results
return results
def sensitivity_analysis(self,
simulation_func: Callable,
param_name: str,
param_values: np.ndarray,
**fixed_kwargs) -> Dict:
"""Sensitivity analysis over parameter values"""
sensitivity_results = []
for value in param_values:
kwargs = fixed_kwargs.copy()
kwargs[param_name] = value
outcomes = []
for _ in range(self.n_simulations):
outcome = simulation_func(**kwargs)
outcomes.append(outcome)
sensitivity_results.append({
'param_value': value,
'mean': np.mean(outcomes),
'std': np.std(outcomes),
'percentile_25': np.percentile(outcomes, 25),
'percentile_75': np.percentile(outcomes, 75)
})
return pd.DataFrame(sensitivity_results)
def variance_reduction(self,
simulation_func: Callable,
method: str = 'antithetic',
**kwargs) -> Dict:
"""Apply variance reduction techniques"""
if method == 'antithetic':
# Antithetic variates
outcomes = []
for _ in range(self.n_simulations // 2):
# Generate random sample
sample1 = simulation_func(**kwargs)
# Generate antithetic (negatively correlated) sample
# This requires the simulation function to use uniform random variables
sample2 = simulation_func(**kwargs)
outcomes.extend([sample1, sample2])
outcomes = np.array(outcomes)
elif method == 'control':
# Control variates (simplified)
outcomes = []
for _ in range(self.n_simulations):
outcome = simulation_func(**kwargs)
outcomes.append(outcome)
outcomes = np.array(outcomes)
results = {
'mean': np.mean(outcomes),
'std': np.std(outcomes),
'method': method,
'variance_reduction': None # Would compare with naive MC
}
return results
# Example usage
def simulate_revenue_increase(base_revenue=1000000,
feature_effect_mean=0.12,
feature_effect_std=0.05,
noise_std=0.1):
"""Simulate revenue with new feature"""
# Feature effect (log-normal for positive skew)
feature_effect = np.random.lognormal(
np.log(feature_effect_mean),
feature_effect_std
)
# Random noise
noise = np.random.normal(1, noise_std)
# New revenue
new_revenue = base_revenue * (1 + feature_effect) * noise
return new_revenue
# Run simulation
simulator = MonteCarloSimulator(n_simulations=10000)
results = simulator.estimate_probability(
simulation_func=simulate_revenue_increase,
condition=lambda x: (x / 1000000 - 1) >= 0.10, # 10% increase
feature_effect_mean=0.12,
feature_effect_std=0.05
)
print("Monte Carlo Probability Estimation")
print("=" * 60)
print(f"Probability of β₯10% revenue increase: {results['probability']:.4f}")
print(f"95% CI: ({results['confidence_interval'][0]:.4f}, {results['confidence_interval'][1]:.4f})")
print(f"Standard error: {results['se']:.6f}")
2. Bootstrap Resampling
class BootstrapAnalyzer:
"""Bootstrap resampling for uncertainty quantification"""
def __init__(self, data, n_bootstrap=10000, random_seed=42):
self.data = np.array(data)
self.n_bootstrap = n_bootstrap
self.random_seed = random_seed
np.random.seed(random_seed)
def bootstrap_statistic(self, statistic: Callable,
confidence_level: float = 0.95) -> Dict:
"""Calculate bootstrap confidence interval for a statistic"""
# Calculate original statistic
original_stat = statistic(self.data)
# Bootstrap samples
bootstrap_stats = []
for _ in range(self.n_bootstrap):
# Sample with replacement
sample = np.random.choice(self.data, size=len(self.data), replace=True)
boot_stat = statistic(sample)
bootstrap_stats.append(boot_stat)
bootstrap_stats = np.array(bootstrap_stats)
# Confidence intervals
alpha = 1 - confidence_level
ci_lower = np.percentile(bootstrap_stats, alpha/2 * 100)
ci_upper = np.percentile(bootstrap_stats, (1 - alpha/2) * 100)
results = {
'original_statistic': original_stat,
'bootstrap_mean': np.mean(bootstrap_stats),
'bootstrap_std': np.std(bootstrap_stats),
'confidence_interval': (ci_lower, ci_upper),
'confidence_level': confidence_level,
'bootstrap_distribution': bootstrap_stats
}
return results
def bootstrap_mean(self, confidence_level=0.95):
"""Bootstrap confidence interval for mean"""
return self.bootstrap_statistic(np.mean, confidence_level)
def bootstrap_median(self, confidence_level=0.95):
"""Bootstrap confidence interval for median"""
return self.bootstrap_statistic(np.median, confidence_level)
def bootstrap_correlation(self, y, confidence_level=0.95):
"""Bootstrap confidence interval for correlation"""
x = self.data
y = np.array(y)
def corr_stat(data_pair):
x_sample, y_sample = data_pair
return np.corrcoef(x_sample, y_sample)[0, 1]
# Joint bootstrap
bootstrap_corrs = []
for _ in range(self.n_bootstrap):
indices = np.random.choice(len(x), size=len(x), replace=True)
x_sample = x[indices]
y_sample = y[indices]
boot_corr = np.corrcoef(x_sample, y_sample)[0, 1]
bootstrap_corrs.append(boot_corr)
bootstrap_corrs = np.array(bootstrap_corrs)
alpha = 1 - confidence_level
ci_lower = np.percentile(bootstrap_corrs, alpha/2 * 100)
ci_upper = np.percentile(bootstrap_corrs, (1 - alpha/2) * 100)
original_corr = np.corrcoef(x, y)[0, 1]
return {
'original_correlation': original_corr,
'bootstrap_mean': np.mean(bootstrap_corrs),
'bootstrap_std': np.std(bootstrap_corrs),
'confidence_interval': (ci_lower, ci_upper),
'confidence_level': confidence_level
}
def bootstrap_regression(self, X, y, confidence_level=0.95):
"""Bootstrap confidence intervals for regression coefficients"""
from sklearn.linear_model import LinearRegression
# Fit original model
model = LinearRegression()
model.fit(X, y)
original_coefs = model.coef_
# Bootstrap
bootstrap_coefs = []
for _ in range(self.n_bootstrap):
n_samples = len(X)
indices = np.random.choice(n_samples, size=n_samples, replace=True)
X_boot = X[indices] if isinstance(X, np.ndarray) else X.iloc[indices]
y_boot = y[indices]
boot_model = LinearRegression()
boot_model.fit(X_boot, y_boot)
bootstrap_coefs.append(boot_model.coef_)
bootstrap_coefs = np.array(bootstrap_coefs)
# Confidence intervals for each coefficient
alpha = 1 - confidence_level
ci_lower = np.percentile(bootstrap_coefs, alpha/2 * 100, axis=0)
ci_upper = np.percentile(bootstrap_coefs, (1 - alpha/2) * 100, axis=0)
return {
'original_coefficients': original_coefs,
'bootstrap_mean': np.mean(bootstrap_coefs, axis=0),
'bootstrap_std': np.std(bootstrap_coefs, axis=0),
'confidence_intervals': list(zip(ci_lower, ci_upper)),
'confidence_level': confidence_level
}
def visualize_bootstrap(self, statistic_name='mean'):
"""Visualize bootstrap distribution"""
if statistic_name == 'mean':
results = self.bootstrap_mean()
elif statistic_name == 'median':
results = self.bootstrap_median()
bootstrap_dist = results['bootstrap_distribution']
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# Bootstrap distribution
axes[0].hist(bootstrap_dist, bins=50, edgecolor='black', alpha=0.7)
axes[0].axvline(x=results['original_statistic'], color='red',
linestyle='--', linewidth=2, label='Original')
axes[0].axvline(x=results['confidence_interval'][0], color='green',
linestyle='--', label='95% CI Lower')
axes[0].axvline(x=results['confidence_interval'][1], color='green',
linestyle='--', label='95% CI Upper')
axes[0].set_xlabel(statistic_name.title())
axes[0].set_ylabel('Frequency')
axes[0].set_title(f'Bootstrap Distribution of {statistic_name.title()}')
axes[0].legend()
# Bootstrap vs Original
axes[1].boxplot([self.data, bootstrap_dist],
labels=['Original Data', 'Bootstrap Samples'])
axes[1].set_ylabel('Value')
axes[1].set_title('Comparison: Original vs Bootstrap')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'bootstrap_{statistic_name}.png', dpi=150, bbox_inches='tight')
plt.show()
# Example usage
# bootstrap = BootstrapAnalyzer(data['revenue'], n_bootstrap=10000)
# mean_results = bootstrap.bootstrap_mean()
# median_results = bootstrap.bootstrap_median()
# bootstrap.visualize_bootstrap('mean')
3. Markov Chain Monte Carlo (MCMC)
class MCMCSampler:
"""Simple MCMC sampler for posterior inference"""
def __init__(self):
self.samples = None
self.acceptance_rate = None
def metropolis_hastings(self, log_posterior, initial_value,
proposal_width=1.0, n_samples=10000, burn_in=1000):
"""Metropolis-Hastings algorithm"""
# Initialize
current = initial_value
samples = [current]
n_accepted = 0
for i in range(n_samples + burn_in):
# Propose new value
proposed = current + np.random.normal(0, proposal_width)
# Calculate acceptance ratio
log_ratio = log_posterior(proposed) - log_posterior(current)
# Accept or reject
if np.log(np.random.random()) < log_ratio:
current = proposed
n_accepted += 1
samples.append(current)
# Remove burn-in
self.samples = np.array(samples[burn_in:])
self.acceptance_rate = n_accepted / (n_samples + burn_in)
return self.samples
def gibbs_sampler(self, conditional_samplers, initial_values,
n_samples=10000, burn_in=1000):
"""Gibbs sampling"""
n_params = len(initial_values)
current = list(initial_values)
samples = []
for i in range(n_samples + burn_in):
# Sample each parameter conditional on others
for j in range(n_params):
current[j] = conditional_samplers[j](*current)
samples.append(current.copy())
# Remove burn-in
self.samples = np.array(samples[burn_in:])
return self.samples
def diagnose_convergence(self, n_chains=4, n_samples=10000):
"""Diagnose MCMC convergence"""
# Run multiple chains
chains = []
for _ in range(n_chains):
chain = self.metropolis_hastings(
lambda x: -0.5 * x**2, # Standard normal target
initial_value=np.random.normal(0, 2),
n_samples=n_samples
)
chains.append(chain)
chains = np.array(chains)
# Calculate diagnostics
diagnostics = {
'r_hat': self._calculate_r_hat(chains),
'effective_sample_size': self._calculate_ess(chains),
'autocorrelation': self._calculate_autocorrelation(chains[0])
}
return diagnostics
def _calculate_r_hat(self, chains):
"""Calculate R-hat (Gelman-Rubin statistic)"""
n_chains, n_samples = chains.shape
# Between-chain variance
chain_means = chains.mean(axis=1)
overall_mean = chain_means.mean()
B = n_samples / (n_chains - 1) * np.sum((chain_means - overall_mean)**2)
# Within-chain variance
chain_vars = chains.var(axis=1)
W = chain_vars.mean()
# R-hat
var_plus = (1 - 1/n_samples) * W + 1/n_samples * B
r_hat = np.sqrt(var_plus / W)
return r_hat
def _calculate_ess(self, chains):
"""Calculate effective sample size"""
# Simplified ESS calculation
n_chains, n_samples = chains.shape
ess_per_chain = []
for chain in chains:
# Calculate autocorrelation
autocorr = self._calculate_autocorrelation(chain)
# Integrate autocorrelation
tau = 1 + 2 * np.sum(autocorr[:100])
ess = n_samples / tau
ess_per_chain.append(ess)
return np.sum(ess_per_chain)
def _calculate_autocorrelation(self, chain, max_lag=100):
"""Calculate autocorrelation function"""
n = len(chain)
mean = chain.mean()
var = chain.var()
autocorr = []
for lag in range(max_lag):
if lag == 0:
autocorr.append(1.0)
else:
cov = np.mean((chain[:-lag] - mean) * (chain[lag:] - mean))
autocorr.append(cov / var)
return np.array(autocorr)
def visualize_diagnostics(self):
"""Visualize MCMC diagnostics"""
if self.samples is None:
print("Run sampler first")
return
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Trace plot
axes[0, 0].plot(self.samples, linewidth=0.5)
axes[0, 0].set_xlabel('Iteration')
axes[0, 0].set_ylabel('Value')
axes[0, 0].set_title('Trace Plot')
axes[0, 0].grid(True, alpha=0.3)
# Histogram
axes[0, 1].hist(self.samples, bins=50, edgecolor='black', alpha=0.7)
axes[0, 1].set_xlabel('Value')
axes[0, 1].set_ylabel('Frequency')
axes[0, 1].set_title('Posterior Distribution')
# Autocorrelation
autocorr = self._calculate_autocorrelation(self.samples)
axes[1, 0].bar(range(len(autocorr)), autocorr)
axes[1, 0].axhline(y=0, color='gray', linestyle='--')
axes[1, 0].set_xlabel('Lag')
axes[1, 0].set_ylabel('Autocorrelation')
axes[1, 0].set_title('Autocorrelation Function')
# Running mean
running_mean = np.cumsum(self.samples) / np.arange(1, len(self.samples) + 1)
axes[1, 1].plot(running_mean, linewidth=2)
axes[1, 1].axhline(y=np.mean(self.samples), color='red', linestyle='--')
axes[1, 1].set_xlabel('Iteration')
axes[1, 1].set_ylabel('Running Mean')
axes[1, 1].set_title('Convergence of Mean')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('mcmc_diagnostics.png', dpi=150, bbox_inches='tight')
plt.show()
# Example usage
# mcmc = MCMCSampler()
# samples = mcmc.metropolis_hastings(
# log_posterior=lambda x: -0.5 * (x - 2)**2, # Target: N(2, 1)
# initial_value=0,
# proposal_width=0.5,
# n_samples=10000
# )
# mcmc.visualize_diagnostics()
π‘
Pro Tip: Always check MCMC convergence using multiple diagnostics: trace plots, R-hat (< 1.1), effective sample size, and autocorrelation. Poor convergence invalidates your inference.
4. Real-World Application: Revenue Simulation
class RevenueSimulator:
"""Simulate revenue scenarios for business decisions"""
def __init__(self, historical_data=None):
self.historical_data = historical_data
self.simulator = MonteCarloSimulator(n_simulations=10000)
def estimate_new_feature_impact(self,
base_revenue,
feature_effect_distribution='lognormal',
feature_params=None,
uncertainty_factor=0.2):
"""Estimate impact of new feature on revenue"""
if feature_params is None:
feature_params = {'mean': 0.15, 'std': 0.05} # 15% expected increase
def simulate_revenue():
# Feature effect
if feature_effect_distribution == 'lognormal':
effect = np.random.lognormal(
np.log(feature_params['mean']),
feature_params['std']
)
elif feature_effect_distribution == 'normal':
effect = np.random.normal(
feature_params['mean'],
feature_params['std']
)
effect = max(effect, -0.5) # Floor at -50%
# Market uncertainty
market_factor = np.random.normal(1, uncertainty_factor)
# Revenue
new_revenue = base_revenue * (1 + effect) * market_factor
return new_revenue
# Run simulation
results = self.simulator.estimate_expectation(simulate_revenue)
# Add business metrics
base_assumption = base_revenue
results['lift'] = (results['mean'] - base_assumption) / base_assumption
results['probability_positive'] = np.mean(results['outcomes'] > base_assumption)
results['probability_10pct'] = np.mean(results['outcomes'] > base_assumption * 1.10)
return results
def simulate_customer_acquisition(self,
marketing_budget,
conversion_rate_mean,
conversion_rate_std,
customer_value_mean,
customer_value_std,
budget_allocation=None):
"""Simulate customer acquisition and revenue"""
def simulate_campaign():
# Actual conversion rate (uncertainty)
actual_cr = np.random.normal(conversion_rate_mean, conversion_rate_std)
actual_cr = max(0, min(1, actual_cr)) # Bound between 0 and 1
# Customer value (uncertainty)
actual_cv = np.random.lognormal(
np.log(customer_value_mean),
customer_value_std
)
# Simulate customers
n_customers = int(marketing_budget * actual_cr / 100) # Assume $100 CAC
# Revenue
revenue = n_customers * actual_cv
# ROI
roi = (revenue - marketing_budget) / marketing_budget
return {
'n_customers': n_customers,
'revenue': revenue,
'roi': roi,
'actual_cr': actual_cr,
'actual_cv': actual_cv
}
# Run simulations
results = []
for _ in range(self.simulator.n_simulations):
result = simulate_campaign()
results.append(result)
results_df = pd.DataFrame(results)
summary = {
'expected_customers': results_df['n_customers'].mean(),
'expected_revenue': results_df['revenue'].mean(),
'expected_roi': results_df['roi'].mean(),
'prob_positive_roi': (results_df['roi'] > 0).mean(),
'prob_2x_roi': (results_df['roi'] > 1).mean(),
'var_revenue': results_df['revenue'].var(),
'var_roi': results_df['roi'].var()
}
return summary, results_df
def portfolio_simulation(self,
assets,
correlations,
weights,
n_years=10,
initial_investment=1000000):
"""Simulate portfolio returns"""
n_assets = len(assets)
# Generate correlated returns
mean_returns = [a['expected_return'] for a in assets]
std_returns = [a['std_return'] for a in assets]
# Create correlation matrix
corr_matrix = np.array(correlations)
cov_matrix = np.outer(std_returns, std_returns) * corr_matrix
# Simulate
portfolio_values = []
for _ in range(self.simulator.n_simulations):
portfolio_value = initial_investment
for year in range(n_years):
# Generate correlated returns
returns = np.random.multivariate_normal(mean_returns, cov_matrix)
# Portfolio return
portfolio_return = np.sum(weights * returns)
# Update value
portfolio_value *= (1 + portfolio_return)
portfolio_values.append(portfolio_value)
portfolio_values = np.array(portfolio_values)
results = {
'expected_value': np.mean(portfolio_values),
'std_value': np.std(portfolio_values),
'percentile_5': np.percentile(portfolio_values, 5),
'percentile_95': np.percentile(portfolio_values, 95),
'prob_loss': np.mean(portfolio_values < initial_investment),
'prob_double': np.mean(portfolio_values > 2 * initial_investment),
'sharpe_ratio': (np.mean(portfolio_values) / initial_investment - 1) / np.std(portfolio_values / initial_investment)
}
return results, portfolio_values
# Example usage
# revenue_sim = RevenueSimulator()
# results = revenue_sim.estimate_new_feature_impact(
# base_revenue=10000000,
# feature_params={'mean': 0.15, 'std': 0.05}
# )
# print(f"Expected revenue: ${results['mean']:,.0f}")
# print(f"Probability of 10% increase: {results['probability_10pct']:.2%}")
5. Common Follow-Up Questions
Follow-up 1: When is Monte Carlo preferred over analytical solutions?
def compare_approaches(scenario):
"""Compare Monte Carlo vs analytical approaches"""
comparisons = {
'high_dimensional': {
'monte_carlo': 'Curse of dimensionality manageable',
'analytical': 'Intractable integrals',
'recommendation': 'Monte Carlo'
},
'complex_distributions': {
'monte_carlo': 'Can sample from any distribution',
'analytical': 'May not have closed-form solution',
'recommendation': 'Monte Carlo'
},
'simple_problems': {
'monte_carlo': 'Overkill, slow convergence',
'analytical': 'Exact and fast',
'recommendation': 'Analytical'
},
'real_time': {
'monte_carlo': 'Too slow for real-time',
'analytical': 'Fast computation',
'recommendation': 'Analytical (if available)'
},
'uncertainty_quantification': {
'monte_carlo': 'Provides full distribution',
'analytical': 'Often just point estimates',
'recommendation': 'Monte Carlo'
}
}
return comparisons.get(scenario, "Unknown scenario")
# Example
# print(compare_approaches('high_dimensional'))
Follow-up 2: How do you validate Monte Carlo simulations?
def validate_simulation(simulated_data, observed_data=None,
theoretical_distribution=None):
"""Validate Monte Carlo simulation results"""
validation_results = {}
# Check convergence
n_simulations = len(simulated_data)
running_mean = np.cumsum(simulated_data) / np.arange(1, n_simulations + 1)
# Check if running mean stabilized
last_10pct = running_mean[int(0.9 * n_simulations):]
validation_results['convergence'] = {
'final_mean': running_mean[-1],
'std_of_last_10pct': np.std(last_10pct),
'converged': np.std(last_10pct) < 0.01 * abs(running_mean[-1])
}
# Compare with observed data if available
if observed_data is not None:
from scipy.stats import ks_2samp
ks_stat, ks_p = ks_2samp(simulated_data, observed_data)
validation_results['empirical'] = {
'ks_statistic': ks_stat,
'ks_p_value': ks_p,
'consistent_with_observed': ks_p > 0.05
}
# Compare with theoretical distribution if available
if theoretical_distribution is not None:
from scipy.stats import chisquare
# (Simplified - would need proper binning)
validation_results['theoretical'] = {
'distribution': str(theoretical_distribution),
'note': 'Full goodness-of-fit test needed'
}
return validation_results
# Example
# validation = validate_simulation(results['outcomes'], observed_revenue)
Company-Specific Tips
βΉοΈ
Meta Tips:
- Meta values simulation for A/B test analysis
- Know how to use Monte Carlo for causal inference
- Understand variance reduction techniques
- Be comfortable with MCMC for Bayesian models
Netflix Tips:
- Netflix uses simulation for content valuation
- Know how to model user behavior probabilistically
- Understand portfolio simulation for content investment
- Be familiar with Monte Carlo for recommendation uncertainty
Quiz Section
Related Topics
- Bayesian Statistics β MCMC for posterior sampling
- Statistical Testing β Bootstrap hypothesis testing
- Simulation β Probabilistic simulation
- Uncertainty Quantification β Uncertainty in experiments