A/B Testing for ML Models
Difficulty: Senior Level | Companies: Google, Meta, Netflix, Uber, Stripe
A/B Testing Framework
A/B testing ML models requires careful experimental design, statistical rigor, and automated decision-making.
ℹ️ Note
Google runs thousands of ML A/B tests simultaneously, with automated systems handling 95% of test decisions.
Experiment Framework
# ab_testing.py
import numpy as np
from scipy import stats
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
class ExperimentStatus(Enum):
    """Lifecycle states of an experiment, stored as lowercase strings."""

    DRAFT = "draft"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    TERMINATED = "terminated"
@dataclass
class Variant:
    """One arm of an experiment: a named model version and its traffic share."""

    # Identifier stored with each logged observation and used as the key
    # in traffic-split and per-variant summaries.
    name: str
    # Version label of the model served to users in this arm.
    model_version: str
    # Share of traffic as a fraction; the usage example passes 0.5 for a
    # 50/50 split and assignment accumulates these against a [0, 1) bucket.
    traffic_percentage: float
    # True for the baseline arm that treatments are compared against.
    is_control: bool = False
@dataclass
class ExperimentConfig:
    """Static definition of an A/B experiment."""

    # Unique key under which the experiment is registered.
    experiment_id: str
    # Human-readable title and free-text description.
    name: str
    description: str
    # Arms of the experiment; analysis looks for the one flagged is_control.
    variants: List[Variant]
    # Metric name that decides the experiment.
    primary_metric: str
    # Additional metric names listed alongside the primary one.
    secondary_metrics: List[str]
    # Planned number of observations.
    min_sample_size: int
    # Threshold the p-value must fall below to count as significant.
    significance_level: float = 0.05
    # Target statistical power used when planning the experiment.
    power: float = 0.8
    # Planned upper bound on how long the experiment runs.
    max_duration_days: int = 14
    # When False, promotion is never recommended automatically.
    auto_promote: bool = True
    # Minimum effect size required before promotion is recommended.
    promotion_threshold: float = 0.05
@dataclass
class ExperimentResult:
    """Outcome of analysing one experiment."""

    # Experiment this result belongs to.
    experiment_id: str
    # Lifecycle state recorded at analysis time.
    status: ExperimentStatus
    # Name of the winning treatment, or None when no winner was declared.
    winner: Optional[str]
    # p-value of the reported treatment-versus-control comparison.
    p_value: float
    # (lower, upper) bounds around effect_size.
    confidence_interval: tuple
    # Standardized difference between the reported treatment and control.
    effect_size: float
    # Number of observations counted for a variant.
    sample_size_per_variant: int
    # Length of the experiment in days.
    duration_days: int
    # Per-variant summary statistics keyed by variant name.
    metrics_summary: Dict[str, Dict]
class ABTestManager:
    """In-memory coordinator for A/B experiments.

    Experiment configurations, raw metric observations and analysis results
    are kept in plain dictionaries keyed by experiment id; nothing is
    persisted. Analysis and promotion both treat a *larger* primary metric
    as better.
    """

    # Seconds in a day, used to turn a timestamp span into whole days.
    _SECONDS_PER_DAY = 86400

    def __init__(self):
        self.experiments: Dict[str, ExperimentConfig] = {}
        self.results: Dict[str, ExperimentResult] = {}
        self.experiment_data: Dict[str, List[Dict]] = {}

    def create_experiment(self, config: ExperimentConfig) -> str:
        """Register *config* and start an empty observation log for it.

        Registering an id that already exists replaces its configuration and
        clears its logged observations.

        Returns:
            The experiment id taken from *config*.
        """
        self.experiments[config.experiment_id] = config
        self.experiment_data[config.experiment_id] = []
        return config.experiment_id

    def assign_variant(self, experiment_id: str, user_id: str) -> Optional[Variant]:
        """Deterministically map a user to one of the experiment's variants.

        The MD5 digest of ``"<experiment_id>:<user_id>"`` is reduced to one of
        100 buckets, so the same user always lands in the same variant and
        traffic shares are honoured at 1% granularity.

        Returns:
            The first variant whose cumulative traffic share exceeds the
            user's bucket, or ``None`` when the shares do not cover that
            bucket (e.g. they sum to less than 1).

        Raises:
            KeyError: If *experiment_id* has not been registered.
        """
        config = self.experiments[experiment_id]
        digest = hashlib.md5(f"{experiment_id}:{user_id}".encode()).hexdigest()
        # Bucket is one of 0.00, 0.01, ..., 0.99.
        assignment = (int(digest, 16) % 100) / 100
        cumulative = 0
        for variant in config.variants:
            cumulative += variant.traffic_percentage
            if assignment < cumulative:
                return variant
        return None

    def log_metric(self, experiment_id: str, user_id: str, variant_name: str, metric_name: str, value: float):
        """Append one metric observation, stamped with the current local time.

        Raises:
            KeyError: If *experiment_id* has not been registered.
        """
        self.experiment_data[experiment_id].append({
            "user_id": user_id,
            "variant": variant_name,
            "metric": metric_name,
            "value": value,
            "timestamp": datetime.now().isoformat()
        })

    @staticmethod
    def _standardized_effect(mean_diff, scale):
        """Return *mean_diff* divided by *scale*, without dividing by zero."""
        if scale > 0:
            return mean_diff / scale
        if mean_diff == 0:
            return 0.0
        # Constant control but a different treatment mean: the standardized
        # effect is unbounded, so report a signed infinity explicitly.
        return float(np.copysign(np.inf, mean_diff))

    @classmethod
    def _observed_duration_days(cls, records: List[Dict]) -> int:
        """Return the whole days spanned by the logged timestamps (minimum 1)."""
        stamps = [datetime.fromisoformat(r["timestamp"]) for r in records if r.get("timestamp")]
        if not stamps:
            return 1
        elapsed_seconds = (max(stamps) - min(stamps)).total_seconds()
        return max(1, int(np.ceil(elapsed_seconds / cls._SECONDS_PER_DAY)))

    def analyze_experiment(self, experiment_id: str) -> ExperimentResult:
        """Compare every treatment with the control on the primary metric.

        Each treatment is tested against the control with a two-sample
        t-test. The reported comparison is the significant *improving*
        treatment with the smallest p-value when one exists; otherwise it is
        the treatment with the smallest p-value overall. A winner is declared
        only when the reported treatment is significant and its mean is
        higher than the control's, so a significantly worse treatment is
        never reported as the winner.

        The effect size is the mean difference (treatment minus control)
        divided by the control's standard deviation. Its confidence interval
        uses the normal approximation at the configured significance level.
        Treatments with fewer than two observations (or a control with fewer
        than two) are skipped because the test is undefined for them.

        NOTE: p-values are not corrected for multiple comparisons, and
        ``min_sample_size`` / ``max_duration_days`` are not enforced here.

        Returns:
            The result, which is also stored in ``self.results``.

        Raises:
            KeyError: If *experiment_id* has not been registered.
            ValueError: If no variant is flagged as the control.
        """
        config = self.experiments[experiment_id]
        records = self.experiment_data[experiment_id]

        # Group primary-metric values by variant in a single pass.
        variant_data: Dict[str, List[float]] = {v.name: [] for v in config.variants}
        for record in records:
            if record["metric"] == config.primary_metric and record["variant"] in variant_data:
                variant_data[record["variant"]].append(record["value"])

        control_name = next((v.name for v in config.variants if v.is_control), None)
        if control_name is None:
            raise ValueError(f"Experiment {experiment_id!r} has no control variant")
        treatment_names = [v.name for v in config.variants if not v.is_control]
        control_values = np.array(variant_data[control_name], dtype=float)
        alpha = config.significance_level

        # One (p_value, mean_diff, name) entry per testable treatment.
        comparisons = []
        for treatment_name in treatment_names:
            treatment_values = np.array(variant_data[treatment_name], dtype=float)
            if len(control_values) < 2 or len(treatment_values) < 2:
                continue
            _, p_value = stats.ttest_ind(control_values, treatment_values)
            # Also drops nan p-values (e.g. two identical constant samples).
            if p_value < 1.0:
                mean_diff = treatment_values.mean() - control_values.mean()
                comparisons.append((p_value, mean_diff, treatment_name))

        # The t-test is two-sided, so significance alone does not say which
        # arm is better; only positive differences can produce a winner.
        improving = [c for c in comparisons if c[0] < alpha and c[1] > 0]
        candidates = improving or comparisons

        if candidates:
            best_p_value, mean_diff, best_treatment = min(candidates, key=lambda c: c[0])
            effect_size = self._standardized_effect(mean_diff, control_values.std())
            z_crit = stats.norm.ppf(1 - alpha / 2)
            half_width = z_crit * np.sqrt(
                1 / len(control_values) + 1 / len(variant_data[best_treatment])
            )
            ci_lower = effect_size - half_width
            ci_upper = effect_size + half_width
            winner = best_treatment if improving else None
        else:
            best_p_value = 1.0
            effect_size = 0
            ci_lower, ci_upper = 0, 0
            winner = None

        metrics_summary = {}
        for name, values in variant_data.items():
            if values:
                metrics_summary[name] = {"mean": np.mean(values), "std": np.std(values)}
            else:
                # No observations: report nan without triggering numpy warnings.
                metrics_summary[name] = {"mean": float("nan"), "std": float("nan")}

        result = ExperimentResult(
            experiment_id=experiment_id,
            status=ExperimentStatus.COMPLETED,
            winner=winner,
            p_value=best_p_value,
            confidence_interval=(ci_lower, ci_upper),
            effect_size=effect_size,
            sample_size_per_variant=len(control_values),
            duration_days=self._observed_duration_days(records),
            metrics_summary=metrics_summary
        )
        self.results[experiment_id] = result
        return result

    def should_promote(self, experiment_id: str) -> bool:
        """Return True when the analysed treatment should be promoted.

        Requires a stored result, ``auto_promote`` enabled, a p-value below
        the significance level and an effect size above the promotion
        threshold.

        Raises:
            KeyError: If *experiment_id* has not been registered.
        """
        config = self.experiments[experiment_id]
        result = self.results.get(experiment_id)
        if result is None or not config.auto_promote:
            return False
        return bool(
            result.p_value < config.significance_level
            and result.effect_size > config.promotion_threshold
        )

    def get_traffic_split(self, experiment_id: str) -> Dict[str, float]:
        """Return the configured traffic share of each variant, keyed by name.

        Raises:
            KeyError: If *experiment_id* has not been registered.
        """
        config = self.experiments[experiment_id]
        return {v.name: v.traffic_percentage for v in config.variants}
# Usage
manager = ABTestManager()
config = ExperimentConfig(
    experiment_id="exp-001",
    name="New Recommendation Model",
    description="Testing improved recommendation algorithm",
    variants=[
        Variant(name="control", model_version="v1.0", traffic_percentage=0.5, is_control=True),
        Variant(name="treatment", model_version="v2.0", traffic_percentage=0.5)
    ],
    primary_metric="click_through_rate",
    secondary_metrics=["conversion_rate", "revenue_per_user"],
    min_sample_size=10000,
    significance_level=0.05
)
manager.create_experiment(config)

# Simulate 5000 users; the treatment draws from a slightly better CTR
# distribution than the control.
for i in range(5000):
    user_id = f"user_{i}"
    variant = manager.assign_variant("exp-001", user_id)
    if variant is None:
        # assign_variant returns None for users outside the configured
        # traffic shares; they take no part in the experiment.
        continue
    if variant.name == "control":
        ctr = np.random.beta(2, 10)
    else:
        ctr = np.random.beta(2.2, 10)
    manager.log_metric("exp-001", user_id, variant.name, "click_through_rate", ctr)

result = manager.analyze_experiment("exp-001")
print(f"Winner: {result.winner}, p-value: {result.p_value:.4f}")
Statistical Testing
# statistical_tests.py
import numpy as np
from scipy import stats
from typing import Dict, Tuple
from dataclasses import dataclass
@dataclass
class TestResult:
    """Outcome of a single two-sample significance test."""

    # Short identifier of the test that produced this result.
    test_name: str
    # Raw test statistic as returned by the underlying test.
    statistic: float
    # p-value of the test.
    p_value: float
    # Whether p_value fell below the tester's significance level.
    significant: bool
    # Standardized measure of the treatment-versus-control difference.
    effect_size: float
    # (lower, upper) bounds reported by the test.
    confidence_interval: Tuple[float, float]
class StatisticalTester:
    """Two-sample significance tests and sample-size planning for A/B tests.

    Effect sizes and confidence intervals are oriented as treatment minus
    control. Test statistics are reported exactly as the underlying SciPy
    call returns them for ``(control, treatment)``.
    """

    def __init__(self, significance_level: float = 0.05):
        self.significance_level = significance_level

    @staticmethod
    def _as_sample(values, label: str) -> np.ndarray:
        """Return *values* as a float array, rejecting empty samples."""
        sample = np.asarray(values, dtype=float)
        if sample.size == 0:
            raise ValueError(f"{label} sample is empty")
        return sample

    @staticmethod
    def _standardized_effect(mean_diff, scale):
        """Return *mean_diff* divided by *scale*, without dividing by zero."""
        if scale > 0:
            return mean_diff / scale
        if mean_diff == 0:
            return 0.0
        # Constant control but a different treatment mean: unbounded effect.
        return float(np.copysign(np.inf, mean_diff))

    def t_test(self, control: np.ndarray, treatment: np.ndarray) -> TestResult:
        """Run a pooled-variance two-sample t-test.

        The confidence interval is for the difference in means (treatment
        minus control) at level ``1 - significance_level``. It uses the same
        pooled standard error and degrees of freedom as the test itself, so
        the interval excludes zero exactly when the test is significant.

        Returns:
            A ``TestResult`` whose effect size is the mean difference divided
            by the control's standard deviation. The interval is
            ``(nan, nan)`` when there are no degrees of freedom.

        Raises:
            ValueError: If either sample is empty.
        """
        control = self._as_sample(control, "control")
        treatment = self._as_sample(treatment, "treatment")

        t_stat, p_value = stats.ttest_ind(control, treatment)
        mean_diff = treatment.mean() - control.mean()

        n_control, n_treatment = len(control), len(treatment)
        dof = n_control + n_treatment - 2
        if dof <= 0:
            ci = (float("nan"), float("nan"))
        else:
            # Standard error of the difference in means, not of the pooled
            # sample mean: sqrt(s_p^2 * (1/n_c + 1/n_t)).
            sum_squares = (
                np.sum((control - control.mean()) ** 2)
                + np.sum((treatment - treatment.mean()) ** 2)
            )
            se_diff = np.sqrt(sum_squares / dof * (1 / n_control + 1 / n_treatment))
            if se_diff > 0:
                lower, upper = stats.t.interval(
                    1 - self.significance_level,
                    dof,
                    loc=mean_diff,
                    scale=se_diff
                )
                ci = (float(lower), float(upper))
            else:
                # Both samples are constant: the difference is known exactly.
                ci = (float(mean_diff), float(mean_diff))

        return TestResult(
            test_name="t_test",
            statistic=t_stat,
            p_value=p_value,
            significant=bool(p_value < self.significance_level),
            effect_size=self._standardized_effect(mean_diff, control.std()),
            confidence_interval=ci
        )

    def mann_whitney_u(self, control: np.ndarray, treatment: np.ndarray) -> TestResult:
        """Run a two-sided Mann-Whitney U test.

        The effect size is ``1 - 2U / (n_control * n_treatment)`` with U
        taken from the SciPy call on ``(control, treatment)``.

        No confidence interval is computed; the ``(0, 0)`` in the result is
        a placeholder, not an estimate.

        Raises:
            ValueError: If either sample is empty.
        """
        control = self._as_sample(control, "control")
        treatment = self._as_sample(treatment, "treatment")

        u_stat, p_value = stats.mannwhitneyu(control, treatment, alternative='two-sided')
        effect_size = 1 - (2 * u_stat) / (len(control) * len(treatment))
        return TestResult(
            test_name="mann_whitney_u",
            statistic=u_stat,
            p_value=p_value,
            significant=bool(p_value < self.significance_level),
            effect_size=effect_size,
            confidence_interval=(0, 0)
        )

    def bootstrap_test(self, control: np.ndarray, treatment: np.ndarray, n_bootstrap: int = 10000) -> TestResult:
        """Resampling test for a difference in means.

        Two resampling schemes run side by side, each *n_bootstrap* times:

        * a permutation of the pooled data, which simulates the null
          hypothesis and yields the two-sided p-value;
        * a resample with replacement within each group, which yields a
          percentile confidence interval for the observed difference at
          level ``1 - significance_level``.

        The p-value is ``(extreme + 1) / (n_bootstrap + 1)`` so that a finite
        number of permutations can never report exactly zero. Draws come
        from the global ``np.random`` state, so seed it for reproducibility.

        Raises:
            ValueError: If either sample is empty or *n_bootstrap* < 1.
        """
        if n_bootstrap < 1:
            raise ValueError("n_bootstrap must be at least 1")
        control = self._as_sample(control, "control")
        treatment = self._as_sample(treatment, "treatment")

        observed_diff = treatment.mean() - control.mean()
        n_control, n_treatment = len(control), len(treatment)
        combined = np.concatenate([control, treatment])

        null_diffs = np.empty(n_bootstrap)
        resampled_diffs = np.empty(n_bootstrap)
        for i in range(n_bootstrap):
            shuffled = np.random.permutation(combined)
            null_diffs[i] = shuffled[n_control:].mean() - shuffled[:n_control].mean()
            resampled_diffs[i] = (
                np.random.choice(treatment, size=n_treatment, replace=True).mean()
                - np.random.choice(control, size=n_control, replace=True).mean()
            )

        extreme = np.count_nonzero(np.abs(null_diffs) >= np.abs(observed_diff))
        p_value = (extreme + 1) / (n_bootstrap + 1)

        alpha = self.significance_level
        lower, upper = np.percentile(resampled_diffs, [100 * alpha / 2, 100 * (1 - alpha / 2)])

        return TestResult(
            test_name="bootstrap",
            statistic=observed_diff,
            p_value=p_value,
            significant=p_value < self.significance_level,
            effect_size=self._standardized_effect(observed_diff, control.std()),
            confidence_interval=(float(lower), float(upper))
        )

    def calculate_sample_size(self, baseline_rate: float, minimum_detectable_effect: float, power: float = 0.8) -> int:
        """Return the per-group sample size for a two-proportion test.

        Uses the normal-approximation formula for detecting an absolute
        change of *minimum_detectable_effect* from *baseline_rate* with a
        two-sided test at the tester's significance level.

        Raises:
            ValueError: If the effect is zero, either rate falls outside
                [0, 1], or *power* / the significance level is not strictly
                between 0 and 1.
        """
        alpha = self.significance_level
        p1 = baseline_rate
        p2 = baseline_rate + minimum_detectable_effect

        if minimum_detectable_effect == 0:
            raise ValueError("minimum_detectable_effect must be non-zero")
        if not (0 <= p1 <= 1 and 0 <= p2 <= 1):
            raise ValueError("baseline and shifted rates must both lie in [0, 1]")
        if not 0 < power < 1:
            raise ValueError("power must be strictly between 0 and 1")
        if not 0 < alpha < 1:
            raise ValueError("significance_level must be strictly between 0 and 1")

        pooled_p = (p1 + p2) / 2
        z_alpha = stats.norm.ppf(1 - alpha / 2)
        z_beta = stats.norm.ppf(power)
        n = ((z_alpha * np.sqrt(2 * pooled_p * (1 - pooled_p)) +
              z_beta * np.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2) / (minimum_detectable_effect ** 2)
        return int(np.ceil(n))
# Demo: compare two simulated click-through-rate samples, then size a test
# that should detect a 2-point lift over a 20% baseline.
tester = StatisticalTester(0.05)
control = np.random.beta(a=2, b=10, size=1000)
treatment = np.random.beta(a=2.2, b=10, size=1000)

t_result = tester.t_test(control=control, treatment=treatment)
print(f"t-test: p={t_result.p_value:.4f}, significant={t_result.significant}")

sample_size = tester.calculate_sample_size(0.2, 0.02)
print(f"Required sample size: {sample_size}")
Follow-Up Questions
- How do you handle network effects in A/B testing?
- What metrics should be tracked during ML A/B tests?
- How do you implement multi-armed bandit testing for model selection?
- What are the ethical considerations in ML A/B testing?