Interview Question (Hard) — Asked at: Google, Microsoft, Amazon, Netflix, Meta
"Design an AutoML system that automatically selects models, tunes hyperparameters, and performs feature engineering. How do you balance exploration vs exploitation and manage computational budget?"
AutoML Architecture Overview
AutoML automates the machine learning pipeline, from feature engineering to model selection and hyperparameter optimization.
AutoML Pipeline Diagram
┌────────────────────────────────────────────────────────────────────────┐
│                            AutoML Pipeline                             │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐  │
│  │    Data    │───▶│  Feature   │───▶│   Model    │───▶│   Hyper-   │  │
│  │  Analysis  │    │Engineering │    │ Selection  │    │ parameter  │  │
│  │            │    │            │    │            │    │Optimization│  │
│  └─────┬──────┘    └─────┬──────┘    └─────┬──────┘    └─────┬──────┘  │
│        ▼                 ▼                 ▼                 ▼         │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐  │
│  │  Dataset   │    │  Feature   │    │   Model    │    │    Best    │  │
│  │ Profiling  │    │   Store    │    │  Registry  │    │   Config   │  │
│  └────────────┘    └────────────┘    └────────────┘    └────────────┘  │
└────────────────────────────────────────────────────────────────────────┘
Hyperparameter Optimization
Bayesian Optimization
import numpy as np
from typing import Dict, List, Tuple, Callable
from dataclasses import dataclass
from scipy.stats import norm
from scipy.optimize import minimize
import warnings
@dataclass
class Hyperparameter:
    """Declarative description of a single tunable hyperparameter.

    ``type`` selects how the other fields are interpreted: the optimizer
    in this file reads ``bounds`` as a ``(low, high)`` pair for ``'int'``
    and ``'float'`` parameters and ``choices`` as the candidate list for
    ``'categorical'`` parameters. ``log_scale`` is only consulted for
    ``'float'`` parameters.
    """

    # Key under which the sampled value is stored in a configuration dict.
    name: str
    # One of 'int', 'float', 'categorical'.
    type: str
    # (low, high) for numeric parameters; left as None otherwise.
    bounds: Tuple = None
    # Candidate values for categorical parameters; left as None otherwise.
    choices: List = None
    # When True, float values are sampled and encoded in log space.
    log_scale: bool = False
class GaussianProcess:
    """Gaussian-process regressor with a unit-variance RBF kernel.

    Serves as the surrogate model for Bayesian optimization: ``fit``
    stores the observations together with the inverse of the noisy kernel
    matrix, and ``predict`` returns the posterior mean and standard
    deviation at new points.
    """

    def __init__(self, length_scale: float = 1.0,
                 noise_level: float = 0.1):
        """
        Args:
            length_scale: RBF kernel length scale.
            noise_level: Value added to the kernel diagonal before
                inversion (observation noise / jitter).
        """
        self.length_scale = length_scale
        self.noise_level = noise_level
        self.X_train = None
        self.y_train = None
        self.K_inv = None

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the GP to observed data.

        Args:
            X: Observed inputs, shape (n_samples, n_features).
            y: Observed targets, shape (n_samples,).
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float).ravel()
        self.X_train = X
        self.y_train = y
        # Noisy kernel matrix K + noise * I, inverted once and reused by
        # every subsequent predict() call.
        K = self._compute_kernel(X, X)
        K += self.noise_level * np.eye(len(X))
        self.K_inv = np.linalg.inv(K)

    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Predict the posterior mean and standard deviation.

        Args:
            X: Query points, shape (n_queries, n_features).

        Returns:
            ``(mu, sigma)``, each of shape (n_queries,). Before ``fit``
            has been called the prior (mean 0, std 1) is returned.
        """
        X = np.asarray(X, dtype=float)
        if self.X_train is None:
            return np.zeros(len(X)), np.ones(len(X))
        K_star = self._compute_kernel(X, self.X_train)
        weighted = K_star @ self.K_inv
        mu = weighted @ self.y_train
        # Only the diagonal of the posterior covariance is needed, and
        # k(x, x) == exp(0) == 1 for this kernel, so the full
        # query-by-query matrix never has to be built.
        variance = 1.0 - np.sum(weighted * K_star, axis=1)
        # Round-off can push tiny variances below zero; clip so that the
        # square root never yields NaN.
        sigma = np.sqrt(np.clip(variance, 0.0, None))
        return mu, sigma

    def _compute_kernel(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
        """Compute the RBF kernel matrix between the rows of X1 and X2."""
        # ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, broadcast to (n1, n2).
        sqdist = (
            np.sum(X1 ** 2, axis=1).reshape(-1, 1)
            + np.sum(X2 ** 2, axis=1).reshape(1, -1)
            - 2 * X1 @ X2.T
        )
        # Guard against slightly negative distances caused by round-off.
        sqdist = np.maximum(sqdist, 0.0)
        return np.exp(-0.5 * sqdist / self.length_scale ** 2)
class BayesianOptimizer:
    """Bayesian optimization (maximization) for hyperparameter tuning."""

    def __init__(self, objective: Callable,
                 hyperparameters: List[Hyperparameter],
                 n_initial_points: int = 10,
                 acquisition_function: str = 'ei'):
        """
        Args:
            objective: Function to maximize (takes dict, returns float)
            hyperparameters: List of hyperparameters to optimize
            n_initial_points: Number of random initial points
            acquisition_function: 'ei' (Expected Improvement) or 'ucb'
        """
        self.objective = objective
        self.hyperparameters = hyperparameters
        self.n_initial_points = n_initial_points
        self.acquisition_function = acquisition_function
        self.gp = GaussianProcess()
        self.observations = []
        self.results = []

    def _random_sample(self) -> Dict:
        """Draw one configuration uniformly from the search space.

        Log-scale floats are drawn uniformly in log space; integer bounds
        are inclusive on both ends.
        """
        sample = {}
        for hp in self.hyperparameters:
            if hp.type == 'float':
                if hp.log_scale:
                    sample[hp.name] = np.exp(
                        np.random.uniform(
                            np.log(hp.bounds[0]),
                            np.log(hp.bounds[1])
                        )
                    )
                else:
                    sample[hp.name] = np.random.uniform(
                        hp.bounds[0], hp.bounds[1]
                    )
            elif hp.type == 'int':
                sample[hp.name] = np.random.randint(
                    hp.bounds[0], hp.bounds[1] + 1
                )
            elif hp.type == 'categorical':
                sample[hp.name] = np.random.choice(hp.choices)
        return sample

    def _expected_improvement(self, X: np.ndarray,
                              best_y: float,
                              xi: float = 0.01) -> np.ndarray:
        """Calculate Expected Improvement over ``best_y`` (maximization).

        Args:
            X: Encoded candidate points, shape (n_candidates, n_dims).
            best_y: Best objective value observed so far.
            xi: Exploration margin subtracted from the improvement.
        """
        mu, sigma = self.gp.predict(X)
        improvement = mu - best_y - xi
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            z = improvement / (sigma + 1e-9)
            ei = improvement * norm.cdf(z) + sigma * norm.pdf(z)
        # No uncertainty means no expected improvement.
        ei[sigma == 0.0] = 0.0
        return ei

    def _upper_confidence_bound(self, X: np.ndarray,
                                beta: float = 2.0) -> np.ndarray:
        """Calculate Upper Confidence Bound: ``mu + beta * sigma``."""
        mu, sigma = self.gp.predict(X)
        return mu + beta * sigma

    def _encode_config(self, config: Dict) -> np.ndarray:
        """Encode a hyperparameter config as a flat float vector.

        Floats are passed through (or log-transformed when ``log_scale``
        is set), ints are cast to float and categoricals are one-hot
        encoded in the order of ``hp.choices``.
        """
        encoded = []
        for hp in self.hyperparameters:
            value = config[hp.name]
            if hp.type == 'float':
                if hp.log_scale:
                    encoded.append(np.log(value))
                else:
                    encoded.append(value)
            elif hp.type == 'int':
                encoded.append(float(value))
            elif hp.type == 'categorical':
                # One-hot encoding
                one_hot = [0.0] * len(hp.choices)
                one_hot[hp.choices.index(value)] = 1.0
                encoded.extend(one_hot)
        return np.array(encoded)

    def optimize(self, n_iterations: int = 50,
                 verbose: bool = True) -> Dict:
        """Run Bayesian optimization.

        ``n_initial_points`` random configurations are always evaluated
        first; the remaining ``n_iterations - n_initial_points``
        evaluations (if any) are chosen by maximizing the acquisition
        function on a GP fitted to all observations so far.

        Returns:
            Dict with 'best_config', 'best_value', 'all_configs' and
            'all_values'.
        """
        # Initial random exploration
        for i in range(self.n_initial_points):
            config = self._random_sample()
            value = self.objective(config)
            self.observations.append(config)
            self.results.append(value)
            if verbose:
                print(f"Iteration {i+1}: {value:.4f}")
        # Bayesian optimization loop. The budget is the n_iterations
        # argument; there is no attribute of that name on the instance.
        for i in range(n_iterations - self.n_initial_points):
            # Encode observations
            X = np.array([
                self._encode_config(obs) for obs in self.observations
            ])
            y = np.array(self.results, dtype=float)
            # Fit GP
            self.gp.fit(X, y)
            # Optimize acquisition function against the incumbent (the
            # best value seen so far), not the whole result vector.
            best_config = self._optimize_acquisition(float(np.max(y)))
            # Evaluate
            value = self.objective(best_config)
            self.observations.append(best_config)
            self.results.append(value)
            if verbose:
                print(f"Iteration {self.n_initial_points + i + 1}: "
                      f"{value:.4f} (best: {max(self.results):.4f})")
        # Return best configuration
        best_idx = np.argmax(self.results)
        return {
            'best_config': self.observations[best_idx],
            'best_value': self.results[best_idx],
            'all_configs': self.observations,
            'all_values': self.results
        }

    def _optimize_acquisition(self, best_y: float,
                              n_candidates: int = 1000) -> Dict:
        """Pick the random candidate with the highest acquisition value.

        Args:
            best_y: Incumbent objective value. An array of observed
                values is also accepted and reduced to its maximum.
            n_candidates: Number of random configurations to score.

        Returns:
            The best-scoring configuration, or a fresh random sample when
            no candidate produced a comparable (non-NaN, > -inf) score.
        """
        best_y = float(np.max(best_y))
        best_acquisition = -np.inf
        best_config = None
        # Random search over acquisition function
        for _ in range(n_candidates):
            config = self._random_sample()
            X = self._encode_config(config).reshape(1, -1)
            if self.acquisition_function == 'ei':
                acquisition_value = self._expected_improvement(
                    X, best_y
                )[0]
            else:
                acquisition_value = self._upper_confidence_bound(X)[0]
            if acquisition_value > best_acquisition:
                best_acquisition = acquisition_value
                best_config = config
        if best_config is None:
            # Never hand None to the objective.
            best_config = self._random_sample()
        return best_config
ℹ️
Bayesian optimization balances exploration and exploitation using Gaussian Processes. It's sample-efficient but scales poorly to high-dimensional spaces. Use it for expensive-to-evaluate models.
Tree-Structured Parzen Estimator (TPE)
import numpy as np
from typing import Dict, List, Tuple, Callable
from collections import defaultdict
class TreeStructuredParzenEstimator:
    """TPE-style hyperparameter optimization (minimization).

    After a random start-up phase, trials are split into a "good" and a
    "bad" group by objective value, a density is fitted to each group per
    hyperparameter, and the random candidate with the highest summed
    log density ratio good/bad is evaluated next.
    """

    def __init__(self, objective: Callable,
                 hyperparameters: List[Dict],
                 gamma: float = 0.25,
                 n_startup_trials: int = 10,
                 n_candidates: int = 1000):
        """
        Args:
            objective: Function to minimize
            hyperparameters: List of hyperparameter definitions (dicts
                with 'name', 'type' and either 'low'/'high' or 'choices';
                float parameters may set 'log_scale')
            gamma: Split ratio for good/bad trials
            n_startup_trials: Number of random trials before TPE
            n_candidates: Random candidates scored per suggestion
        """
        self.objective = objective
        self.hyperparameters = hyperparameters
        self.gamma = gamma
        self.n_startup_trials = n_startup_trials
        self.n_candidates = n_candidates
        self.trials = []
        self.results = []

    def _sample_from_prior(self, hp: Dict) -> float:
        """Sample one value for ``hp`` uniformly from its declared range."""
        if hp['type'] == 'float':
            if hp.get('log_scale', False):
                return np.exp(np.random.uniform(
                    np.log(hp['low']),
                    np.log(hp['high'])
                ))
            else:
                return np.random.uniform(hp['low'], hp['high'])
        elif hp['type'] == 'int':
            return np.random.randint(hp['low'], hp['high'] + 1)
        elif hp['type'] == 'categorical':
            return np.random.choice(hp['choices'])

    def _fit_kde(self, values: List[float], hp: Dict):
        """Fit a density estimate to ``values`` of hyperparameter ``hp``.

        Returns:
            A callable mapping a raw hyperparameter value to its
            estimated density (numeric types, Gaussian KDE) or relative
            frequency (categorical).

        Raises:
            ValueError: If ``hp['type']`` is not a supported type.
        """
        if hp['type'] == 'categorical':
            counts = defaultdict(int)
            for v in values:
                counts[v] += 1
            total = len(values)

            def kde(x):
                return counts.get(x, 0) / total if total else 0.0
            return kde

        if hp['type'] not in ('float', 'int'):
            raise ValueError(f"Unsupported hyperparameter type: {hp['type']!r}")

        use_log = hp['type'] == 'float' and hp.get('log_scale', False)
        points = np.asarray(values, dtype=float)
        low, high = float(hp['low']), float(hp['high'])
        if use_log:
            points = np.log(points)
            low, high = np.log(low), np.log(high)
        # A single point (or identical points) has zero spread; floor the
        # bandwidth so the density stays finite. Integers get at least
        # half a step, floats a small fraction of the search range.
        floor = 0.5 if hp['type'] == 'int' else 1e-3 * (high - low)
        spread = np.std(points) if points.size else 0.0
        bandwidth = max(spread * 0.5, floor, 1e-12)

        def kde(x):
            if points.size == 0:
                return 0.0
            # Evaluate in the same space the density was fitted in.
            x = np.log(x) if use_log else float(x)
            return np.mean(
                np.exp(-0.5 * ((x - points) / bandwidth) ** 2) /
                (bandwidth * np.sqrt(2 * np.pi))
            )
        return kde

    def _suggest_next(self) -> Dict:
        """Suggest next hyperparameter configuration."""
        n_done = len(self.trials)
        # At least two trials are needed to form non-empty good/bad sets.
        if n_done < max(self.n_startup_trials, 2):
            # Random exploration
            return {
                hp['name']: self._sample_from_prior(hp)
                for hp in self.hyperparameters
            }
        # Split trials into good/bad (lower objective is better); both
        # groups are kept non-empty.
        sorted_indices = np.argsort(self.results)
        n_good = min(max(1, int(n_done * self.gamma)), n_done - 1)
        good_indices = sorted_indices[:n_good]
        bad_indices = sorted_indices[n_good:]
        # Fit KDEs
        configs = []
        for hp in self.hyperparameters:
            hp_name = hp['name']
            good_values = [self.trials[i][hp_name] for i in good_indices]
            bad_values = [self.trials[i][hp_name] for i in bad_indices]
            good_kde = self._fit_kde(good_values, hp)
            bad_kde = self._fit_kde(bad_values, hp)
            configs.append((hp, good_kde, bad_kde))
        # Sample and score
        best_config = None
        best_score = -np.inf
        for _ in range(self.n_candidates):
            config = {}
            log_ratio = 0.0
            for hp, good_kde, bad_kde in configs:
                # Candidates are drawn from the prior and ranked by the
                # density ratio good/bad.
                value = self._sample_from_prior(hp)
                config[hp['name']] = value
                # The epsilon keeps the logs finite, so a value the bad
                # density has never seen is rewarded rather than skipped.
                log_ratio += np.log(good_kde(value) + 1e-10) - \
                    np.log(bad_kde(value) + 1e-10)
            if log_ratio > best_score:
                best_score = log_ratio
                best_config = config
        if best_config is None:
            # No candidate was scored (e.g. n_candidates == 0).
            best_config = {
                hp['name']: self._sample_from_prior(hp)
                for hp in self.hyperparameters
            }
        return best_config

    def optimize(self, n_trials: int = 100,
                 verbose: bool = True) -> Dict:
        """Run TPE optimization for ``n_trials`` objective evaluations.

        Returns:
            Dict with 'best_config', 'best_value' (the minimum observed),
            'all_configs' and 'all_values'.
        """
        for i in range(n_trials):
            # Suggest next configuration
            config = self._suggest_next()
            # Evaluate
            value = self.objective(config)
            self.trials.append(config)
            self.results.append(value)
            if verbose:
                print(f"Trial {i+1}: {value:.4f} "
                      f"(best: {min(self.results):.4f})")
        best_idx = np.argmin(self.results)
        return {
            'best_config': self.trials[best_idx],
            'best_value': self.results[best_idx],
            'all_configs': self.trials,
            'all_values': self.results
        }
Hyperband
import numpy as np
from typing import Dict, List, Callable
from math import log, ceil
class Hyperband:
    """Hyperband for early-stopping based HPO (minimization)."""

    def __init__(self, objective: Callable,
                 max_resource: int = 81,
                 eta: int = 3):
        """
        Args:
            objective: Function to minimize (takes config and resource)
            max_resource: Maximum resource (e.g., epochs)
            eta: Elimination rate
        """
        self.objective = objective
        self.max_resource = max_resource
        self.eta = eta

    def _generate_random_config(self) -> Dict:
        """Generate random hyperparameter configuration."""
        return {
            'learning_rate': 10 ** np.random.uniform(-5, -1),
            'batch_size': np.random.choice([16, 32, 64, 128]),
            'dropout': np.random.uniform(0.1, 0.5),
            'hidden_size': np.random.choice([64, 128, 256, 512])
        }

    def _max_bracket(self) -> int:
        """Return the largest s with eta**s <= max_resource.

        Computed by repeated multiplication because a floating-point
        ratio of logarithms can land just below an exact integer and
        truncate one bracket too low.
        """
        s_max = 0
        power = self.eta
        while power <= self.max_resource:
            s_max += 1
            power *= self.eta
        return s_max

    def optimize(self, n_iterations: int = 1,
                 verbose: bool = True) -> Dict:
        """Run Hyperband optimization.

        Each sweep visits brackets s = s_max .. 0. Bracket s starts with
        n = ceil((s_max + 1) * eta**s / (s + 1)) random configurations at
        resource max_resource / eta**s, then runs s + 1 successive-halving
        rounds: after every round the best 1/eta of the configurations
        survive and the resource is multiplied by eta, so the last round
        of every bracket runs at ``max_resource``.

        Args:
            n_iterations: Number of full bracket sweeps (at least one is
                always run).
            verbose: Print per-bracket and per-round progress.

        Returns:
            Dict with 'best_config', 'best_value' (minimum over every
            evaluation, regardless of resource), 'all_configs' and
            'all_values'.

        Raises:
            ValueError: If ``eta < 2`` or ``max_resource < 1``.
        """
        if self.eta < 2:
            raise ValueError("eta must be at least 2")
        if self.max_resource < 1:
            raise ValueError("max_resource must be at least 1")
        s_max = self._max_bracket()
        all_configs = []
        all_values = []
        for _ in range(max(1, n_iterations)):
            for s in range(s_max, -1, -1):
                # Ceiling division keeps the bracket size exact.
                n = int(-(-((s_max + 1) * self.eta ** s) // (s + 1)))
                r = self.max_resource / self.eta ** s
                if verbose:
                    print(f"\n--- Bracket {s}: n={n}, r={r} ---")
                # Generate random configs
                configs = [self._generate_random_config() for _ in range(n)]
                for j in range(s + 1):
                    # Resource grows by a factor of eta every round.
                    resource = max(
                        1, int(self.max_resource / self.eta ** (s - j))
                    )
                    # Evaluate all configs with current resource
                    values = [
                        self.objective(config, resource)
                        for config in configs
                    ]
                    all_configs.extend(configs)
                    all_values.extend(values)
                    if verbose:
                        print(f" Round {j}: evaluated {len(configs)} configs, "
                              f"best: {min(values):.4f}")
                    # Keep top 1/eta configs
                    n_keep = max(1, int(len(configs) / self.eta))
                    survivors = np.argsort(values)[:n_keep]
                    configs = [configs[k] for k in survivors]
        best_idx = np.argmin(all_values)
        return {
            'best_config': all_configs[best_idx],
            'best_value': all_values[best_idx],
            'all_configs': all_configs,
            'all_values': all_values
        }
class SuccessiveHalving:
    """Successive Halving for resource-efficient optimization (minimization)."""

    def __init__(self, objective: Callable,
                 min_resource: int = 1,
                 max_resource: int = 64,
                 reduction_factor: int = 3):
        """
        Args:
            objective: Function to minimize (takes config and resource)
            min_resource: Resource given to every config in round one
            max_resource: Upper limit on the per-config resource
            reduction_factor: Factor by which the surviving population
                shrinks and the resource grows between rounds
        """
        self.objective = objective
        self.min_resource = min_resource
        self.max_resource = max_resource
        self.reduction_factor = reduction_factor

    def _count_rounds(self, n_configs: int) -> int:
        """Return floor(log_rf(n_configs)), but never less than one round.

        Uses repeated multiplication instead of a ratio of floating-point
        logarithms, which can truncate one round too low.
        """
        rounds = 0
        power = self.reduction_factor
        while power <= n_configs:
            rounds += 1
            power *= self.reduction_factor
        return max(1, rounds)

    def optimize(self, configs: List[Dict] = None,
                 n_configs: int = 27,
                 verbose: bool = True) -> Dict:
        """Run Successive Halving.

        Args:
            configs: Configurations to race; when None, ``n_configs``
                random ones are generated.
            n_configs: Number of random configurations to generate when
                ``configs`` is None.
            verbose: Print per-round progress.

        Returns:
            Dict with 'best_config', 'best_value' (minimum over every
            evaluation), 'all_configs' and 'all_values'.

        Raises:
            ValueError: If there are no configurations to evaluate or
                ``reduction_factor`` is not greater than 1.
        """
        if self.reduction_factor <= 1:
            raise ValueError("reduction_factor must be greater than 1")
        if configs is None:
            configs = [self._generate_random_config() for _ in range(n_configs)]
        configs = list(configs)
        if not configs:
            raise ValueError("at least one configuration is required")
        n_iterations = self._count_rounds(len(configs))
        all_configs = []
        all_values = []
        for i in range(n_iterations):
            # Grow the resource geometrically, but never past the cap.
            n_resources = min(
                self.max_resource,
                int(self.min_resource * self.reduction_factor ** i)
            )
            n_configs = len(configs)
            if verbose:
                print(f"\nIteration {i+1}: {n_configs} configs, "
                      f"{n_resources} resources")
            # Evaluate all configs
            values = []
            for config in configs:
                value = self.objective(config, n_resources)
                values.append(value)
                all_configs.append(config)
                all_values.append(value)
            # Keep top 1/reduction_factor configs
            n_keep = max(1, int(n_configs / self.reduction_factor))
            survivors = np.argsort(values)[:n_keep]
            configs = [configs[k] for k in survivors]
        best_idx = np.argmin(all_values)
        return {
            'best_config': all_configs[best_idx],
            'best_value': all_values[best_idx],
            'all_configs': all_configs,
            'all_values': all_values
        }

    def _generate_random_config(self) -> Dict:
        """Generate a random configuration for the built-in demo space."""
        return {
            'learning_rate': 10 ** np.random.uniform(-5, -1),
            'batch_size': np.random.choice([16, 32, 64]),
            'n_layers': np.random.randint(1, 5)
        }
⚠️
Hyperband is excellent for models where training can be early-stopped (e.g., neural networks). For models without this capability, use Bayesian optimization or TPE.
Neural Architecture Search (NAS)
DARTS-Based NAS
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Dict, Tuple
class _StridedZero(nn.Module):
    """The 'zero' (no connection) candidate: an all-zero output.

    The input is subsampled by ``stride`` first so that the result has
    the same spatial size as the strided convolution/pooling candidates.
    """

    def __init__(self, stride: int):
        super().__init__()
        self.stride = stride

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        if self.stride == 1:
            return x.mul(0.0)
        return x[:, :, ::self.stride, ::self.stride].mul(0.0)


class MixedOperation(nn.Module):
    """Mixed operation for differentiable NAS.

    Holds seven candidate operations (identity, zero, conv3x3, conv5x5,
    conv7x7, maxpool, avgpool) and outputs their softmax-weighted sum;
    the mixing logits ``betas`` are learnable.
    """

    def __init__(self, C: int, stride: int):
        """
        Args:
            C: Number of input and output channels of every candidate.
            stride: Spatial stride applied by every candidate.
        """
        super().__init__()
        self.ops = nn.ModuleList([
            # Identity cannot change resolution, so a strided average
            # pool stands in for it when stride > 1.
            nn.Identity() if stride == 1 else nn.AvgPool2d(3, stride=stride, padding=1),
            # A real zero op: ZeroPad2d(0) would pass the input through
            # unchanged and ignore the stride.
            _StridedZero(stride),
            nn.Conv2d(C, C, 3, stride=stride, padding=1, bias=False),
            nn.Conv2d(C, C, 5, stride=stride, padding=2, bias=False),
            nn.Conv2d(C, C, 7, stride=stride, padding=3, bias=False),
            nn.MaxPool2d(3, stride=stride, padding=1),
            nn.AvgPool2d(3, stride=stride, padding=1),
        ])
        # Small random init keeps the initial mixture close to uniform.
        self.betas = nn.Parameter(1e-3 * torch.randn(len(self.ops)))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the softmax(betas)-weighted sum of all candidate outputs."""
        weights = F.softmax(self.betas, dim=0)
        out = sum(w * op(x) for w, op in zip(weights, self.ops))
        return out
class DARTSArchitecture:
    """Differentiable Architecture Search (simplified).

    Holds one row of architecture logits per node and reuses those rows
    cyclically across layers in the forward pass.
    """

    # Candidate operations, in the column order of the 'alphas' logits.
    OP_NAMES = (
        'identity', 'zero', 'conv3x3', 'conv5x5',
        'conv7x7', 'maxpool', 'avgpool'
    )

    def __init__(self, C: int = 16, n_layers: int = 8,
                 n_nodes: int = 4):
        """
        Args:
            C: Channel count (stored; not used by the simplified forward).
            n_layers: Number of layers traversed by ``forward``.
            n_nodes: Number of nodes, i.e. rows of architecture logits.
        """
        self.C = C
        self.n_layers = n_layers
        self.n_nodes = n_nodes
        # Architecture parameters
        self.arch_params = nn.ParameterDict({
            'alphas': nn.Parameter(
                1e-3 * torch.randn(n_nodes, len(self.OP_NAMES))
            )
        })

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass with architecture parameters."""
        s0 = s1 = x
        for layer in range(self.n_layers):
            s0, s1 = s1, self._mixed_forward(s0, s1, layer)
        return s1

    def _mixed_forward(self, s0: torch.Tensor,
                       s1: torch.Tensor,
                       layer: int) -> torch.Tensor:
        """Forward through mixed operations."""
        weights = F.softmax(self.arch_params['alphas'], dim=-1)
        # There are only n_nodes rows of logits; wrap the layer index so
        # networks deeper than n_nodes do not index out of range.
        node_weights = weights[layer % self.n_nodes]
        # Simplified: just use weights to combine operations
        s2 = sum(w * s0 for w in node_weights)
        return s2

    def get_architecture(self) -> Dict:
        """Extract best architecture from learned parameters.

        Returns:
            Dict with 'operations' (highest-weight op name per node) and
            'weights' (the softmax weights as nested lists).
        """
        weights = F.softmax(self.arch_params['alphas'], dim=-1)
        best_ops = weights.argmax(dim=-1).tolist()
        architecture = {
            'operations': [self.OP_NAMES[op] for op in best_ops],
            'weights': weights.detach().numpy().tolist()
        }
        return architecture
class NASBench201:
    """NAS-Bench-201 style search space.

    ``ops`` maps each operation name to a factory that takes the channel
    count and builds the module; an architecture assigns one operation
    name to each of the six edges of a four-node cell.
    """

    def __init__(self):
        factories = {}
        factories['zero'] = lambda C: Zero()
        factories['identity'] = lambda C: nn.Identity()
        factories['conv1x1'] = lambda C: nn.Conv2d(C, C, 1, padding=0)
        factories['conv3x3'] = lambda C: nn.Conv2d(C, C, 3, padding=1)
        factories['maxpool3x3'] = lambda C: nn.MaxPool2d(3, stride=1, padding=1)
        factories['avgpool3x3'] = lambda C: nn.AvgPool2d(3, stride=1, padding=1)
        self.ops = factories

    def sample_architecture(self) -> Dict:
        """Draw one operation name uniformly at random for every edge."""
        candidates = list(self.ops.keys())
        edge_names = (
            'edge_0_1', 'edge_0_2', 'edge_1_2',
            'edge_0_3', 'edge_1_3', 'edge_2_3',
        )
        architecture = {}
        # Edges are filled in this fixed order, one random draw per edge.
        for edge in edge_names:
            architecture[edge] = np.random.choice(candidates)
        return architecture
class Zero(nn.Module):
    """Operation that multiplies its input by zero."""

    def forward(self, x):
        zeroed = x * 0
        return zeroed
Population-Based Training
import numpy as np
from typing import Dict, List, Callable
from dataclasses import dataclass
import copy
@dataclass
class Individual:
    """One member of the evolutionary population."""

    # Hyperparameter name -> value for this candidate.
    config: Dict
    # Objective value; None until the individual has been evaluated.
    fitness: float = None
    # Generation counter, incremented by crossover and mutation.
    generation: int = 0
class PopulationBasedTraining:
    """Population-based (evolutionary) hyperparameter optimization.

    Minimizes ``objective`` with elitism, tournament selection, uniform
    crossover and per-parameter mutation.
    """

    def __init__(self, objective: Callable,
                 hyperparameters: List[Dict],
                 population_size: int = 20,
                 generations: int = 10,
                 mutation_rate: float = 0.3,
                 crossover_rate: float = 0.7):
        """
        Args:
            objective: Function to minimize (takes a config dict)
            hyperparameters: List of hyperparameter definitions (dicts
                with 'name', 'type' and either 'low'/'high' or 'choices')
            population_size: Number of individuals per generation
            generations: Number of generations to evolve
            mutation_rate: Per-hyperparameter mutation probability
            crossover_rate: Probability that a child is produced by
                crossover rather than by copying the first parent
        """
        self.objective = objective
        self.hyperparameters = hyperparameters
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.population = []
        self.hall_of_fame = []

    def _initialize_population(self) -> List[Individual]:
        """Initialize random population."""
        population = []
        for _ in range(self.population_size):
            config = {}
            for hp in self.hyperparameters:
                if hp['type'] == 'float':
                    config[hp['name']] = np.random.uniform(
                        hp['low'], hp['high']
                    )
                elif hp['type'] == 'int':
                    config[hp['name']] = np.random.randint(
                        hp['low'], hp['high'] + 1
                    )
                elif hp['type'] == 'categorical':
                    config[hp['name']] = np.random.choice(hp['choices'])
            population.append(Individual(config=config))
        return population

    def _evaluate_population(self, population: List[Individual]):
        """Evaluate every individual that has no fitness yet."""
        for individual in population:
            if individual.fitness is None:
                individual.fitness = self.objective(individual.config)

    def _select_parents(self, population: List[Individual],
                        n_parents: int = 2) -> List[Individual]:
        """Tournament selection (lowest fitness wins each tournament)."""
        # A tournament cannot draw more distinct members than exist.
        tournament_size = min(3, len(population))
        parents = []
        for _ in range(n_parents):
            # Sample indices rather than objects so NumPy never has to
            # coerce the individuals into an array.
            picks = np.random.choice(
                len(population), size=tournament_size, replace=False
            )
            best = min(
                (population[i] for i in picks), key=lambda x: x.fitness
            )
            parents.append(best)
        return parents

    def _crossover(self, parent1: Individual,
                   parent2: Individual) -> Individual:
        """Uniform crossover: each value comes from either parent."""
        child_config = {}
        for hp in self.hyperparameters:
            hp_name = hp['name']
            if np.random.random() < 0.5:
                child_config[hp_name] = parent1.config[hp_name]
            else:
                child_config[hp_name] = parent2.config[hp_name]
        return Individual(
            config=child_config,
            generation=max(parent1.generation, parent2.generation) + 1
        )

    def _mutate(self, individual: Individual) -> Individual:
        """Return a mutated, unevaluated copy of ``individual``."""
        mutated_config = copy.deepcopy(individual.config)
        for hp in self.hyperparameters:
            if np.random.random() < self.mutation_rate:
                hp_name = hp['name']
                if hp['type'] == 'float':
                    # Gaussian perturbation scaled to 10% of the range
                    current = mutated_config[hp_name]
                    std = (hp['high'] - hp['low']) * 0.1
                    new_value = current + np.random.normal(0, std)
                    new_value = np.clip(new_value, hp['low'], hp['high'])
                    mutated_config[hp_name] = new_value
                elif hp['type'] == 'int':
                    # Step by +/-1, clipped to the allowed range
                    current = mutated_config[hp_name]
                    delta = np.random.choice([-1, 1])
                    new_value = current + delta
                    new_value = np.clip(new_value, hp['low'], hp['high'])
                    mutated_config[hp_name] = new_value
                elif hp['type'] == 'categorical':
                    mutated_config[hp_name] = np.random.choice(hp['choices'])
        return Individual(
            config=mutated_config,
            fitness=None,
            generation=individual.generation + 1
        )

    def _update_hall_of_fame(self, ranked: List[Individual]):
        """Merge the top five of ``ranked`` into the ten-entry hall of fame.

        Elites survive across generations as the same objects, so members
        already present are skipped; otherwise one elite would occupy
        several slots.
        """
        known = {id(ind) for ind in self.hall_of_fame}
        self.hall_of_fame.extend(
            ind for ind in ranked[:5] if id(ind) not in known
        )
        self.hall_of_fame.sort(key=lambda x: x.fitness)
        self.hall_of_fame = self.hall_of_fame[:10]

    def optimize(self, verbose: bool = True) -> Dict:
        """Run the evolutionary search.

        Returns:
            Dict with 'best_config', 'best_fitness' and 'hall_of_fame'
            (list of {'config', 'fitness'} dicts, best first).
        """
        # Initialize
        self.population = self._initialize_population()
        self._evaluate_population(self.population)
        for gen in range(self.generations):
            # Sort by fitness
            self.population.sort(key=lambda x: x.fitness)
            # Update hall of fame
            self._update_hall_of_fame(self.population)
            if verbose:
                best = self.population[0]
                print(f"Generation {gen+1}: best fitness = {best.fitness:.4f}")
            # Create new population
            new_population = []
            # Elitism: keep top 10%
            elite_count = max(1, int(self.population_size * 0.1))
            new_population.extend(self.population[:elite_count])
            # Fill rest with crossover and mutation
            while len(new_population) < self.population_size:
                # Selection
                parents = self._select_parents(self.population)
                # Crossover
                if np.random.random() < self.crossover_rate:
                    child = self._crossover(parents[0], parents[1])
                else:
                    child = Individual(
                        config=copy.deepcopy(parents[0].config),
                        generation=parents[0].generation + 1
                    )
                # Mutation
                child = self._mutate(child)
                new_population.append(child)
            self.population = new_population
            self._evaluate_population(self.population)
        # Return best
        self.population.sort(key=lambda x: x.fitness)
        # The final generation has not been ranked into the hall of fame
        # yet; include it so the reported best individual is listed.
        self._update_hall_of_fame(self.population)
        return {
            'best_config': self.population[0].config,
            'best_fitness': self.population[0].fitness,
            'hall_of_fame': [
                {'config': ind.config, 'fitness': ind.fitness}
                for ind in self.hall_of_fame
            ]
        }
ℹ️
Population-Based Training combines evolutionary algorithms with learning rate schedules. It's particularly effective for deep learning where the optimal hyperparameters change during training.
AutoML Framework Integration
Optuna Integration
import optuna
from typing import Dict, List, Callable
import numpy as np
class OptunaAutoML:
    """AutoML using Optuna framework (minimization with a TPE sampler)."""

    def __init__(self, objective: Callable,
                 n_trials: int = 100,
                 timeout: int = 3600):
        """
        Args:
            objective: Function to minimize (takes a config dict)
            n_trials: Maximum number of trials
            timeout: Value passed to ``study.optimize`` as ``timeout``
        """
        self.objective = objective
        self.n_trials = n_trials
        self.timeout = timeout
        self.study = None

    def define_search_space(self, trial: optuna.Trial) -> Dict:
        """Define hyperparameter search space.

        Returns:
            Dict of values suggested by ``trial`` for a gradient-boosting
            style model.
        """
        return {
            'learning_rate': trial.suggest_float(
                'learning_rate', 1e-5, 1e-1, log=True
            ),
            'n_estimators': trial.suggest_int(
                'n_estimators', 50, 1000
            ),
            'max_depth': trial.suggest_int(
                'max_depth', 3, 15
            ),
            'min_child_weight': trial.suggest_int(
                'min_child_weight', 1, 10
            ),
            'subsample': trial.suggest_float(
                'subsample', 0.5, 1.0
            ),
            'colsample_bytree': trial.suggest_float(
                'colsample_bytree', 0.5, 1.0
            ),
            'reg_alpha': trial.suggest_float(
                'reg_alpha', 1e-8, 10.0, log=True
            ),
            'reg_lambda': trial.suggest_float(
                'reg_lambda', 1e-8, 10.0, log=True
            ),
            'gamma': trial.suggest_float(
                'gamma', 1e-8, 5.0, log=True
            )
        }

    def optimize(self, verbose: bool = True) -> Dict:
        """Run Optuna optimization.

        Returns:
            Dict with 'best_config', 'best_value', 'n_trials' (number of
            trials actually recorded) and the 'study' itself.
        """
        def optuna_objective(trial):
            config = self.define_search_space(trial)
            return self.objective(config)

        # Create study
        self.study = optuna.create_study(
            direction='minimize',
            sampler=optuna.samplers.TPESampler(seed=42),
            pruner=optuna.pruners.MedianPruner()
        )
        # Optimize
        self.study.optimize(
            optuna_objective,
            n_trials=self.n_trials,
            timeout=self.timeout,
            show_progress_bar=verbose
        )
        return {
            'best_config': self.study.best_params,
            'best_value': self.study.best_value,
            'n_trials': len(self.study.trials),
            'study': self.study
        }

    def get_feature_importance(self) -> Dict:
        """Get hyperparameter importance ({} before ``optimize`` has run)."""
        if self.study is None:
            return {}
        importances = optuna.importance.get_param_importances(self.study)
        return importances

    def get_optimization_history(self) -> Dict:
        """Get optimization history.

        Returns:
            {} before ``optimize`` has run; otherwise a dict with
            'trials' (number, value, params and state name per trial) and
            'best_values', the running minimum after each trial. Entries
            of 'best_values' are None until the first trial with a value.
        """
        if self.study is None:
            return {}
        trials = self.study.trials
        # Single pass: trials without a value leave the running minimum
        # unchanged instead of triggering min() on an empty sequence.
        best_values = []
        running_best = None
        for trial in trials:
            value = trial.value
            if value is not None and (running_best is None
                                      or value < running_best):
                running_best = value
            best_values.append(running_best)
        return {
            'trials': [
                {
                    'number': trial.number,
                    'value': trial.value,
                    'params': trial.params,
                    'state': trial.state.name
                }
                for trial in trials
            ],
            'best_values': best_values
        }
Ray Tune Integration
import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch
from typing import Dict, List, Callable
class RayTuneAutoML:
    """AutoML using Ray Tune for distributed optimization."""

    def __init__(self, objective: Callable,
                 n_trials: int = 100,
                 n_cpus: int = 8,
                 n_gpus: float = 1.0,
                 max_concurrent_trials: int = None):
        """
        Args:
            objective: Function taking a config dict and returning a dict
                with at least a 'loss' entry (optionally 'accuracy' and
                a 'metrics' dict)
            n_trials: Total number of configurations to sample
            n_cpus: CPUs made available to Ray
            n_gpus: GPUs made available to Ray
            max_concurrent_trials: Number of trials the CPU/GPU pool is
                divided between; defaults to min(n_trials, n_cpus)
        """
        self.objective = objective
        self.n_trials = n_trials
        self.n_cpus = n_cpus
        self.n_gpus = n_gpus
        self.max_concurrent_trials = max_concurrent_trials
        # Initialize Ray
        if not ray.is_initialized():
            ray.init(num_cpus=n_cpus, num_gpus=n_gpus)

    def _resources_per_trial(self) -> Dict:
        """Split the CPU/GPU pool across concurrently running trials.

        The pool is divided by the concurrency, not by the total number
        of samples: dividing 8 CPUs by 100 samples would request 0 CPUs
        per trial.
        """
        concurrency = self.max_concurrent_trials or min(
            self.n_trials, self.n_cpus
        )
        concurrency = max(1, concurrency)
        return {
            'cpu': max(1, self.n_cpus // concurrency),
            'gpu': self.n_gpus / concurrency
        }

    def train_func(self, config: Dict):
        """Training function for Ray Tune."""
        # Add Ray-specific settings on a copy so the dict handed in by
        # the caller is left untouched.
        config = dict(config)
        config['device'] = 'cuda' if self.n_gpus > 0 else 'cpu'
        # Run objective
        result = self.objective(config)
        # Report results to Ray Tune
        # NOTE(review): keyword-style tune.report is the legacy reporting
        # API; confirm it matches the installed Ray version.
        tune.report(
            loss=result['loss'],
            accuracy=result.get('accuracy', 0),
            **result.get('metrics', {})
        )

    def optimize(self, verbose: bool = True) -> Dict:
        """Run distributed optimization with Ray Tune.

        Returns:
            Dict with 'best_config', 'best_value' (last reported loss of
            the best trial) and the Ray Tune 'analysis' object.
        """
        # Define search space
        search_space = {
            'learning_rate': tune.loguniform(1e-5, 1e-1),
            'batch_size': tune.choice([16, 32, 64, 128]),
            'hidden_size': tune.choice([64, 128, 256, 512]),
            'n_layers': tune.randint(1, 6),
            'dropout': tune.uniform(0.1, 0.5),
        }
        # Define scheduler (early stopping)
        scheduler = ASHAScheduler(
            max_t=100,
            grace_period=10,
            reduction_factor=2
        )
        # Define search algorithm
        search = OptunaSearch()
        # Run optimization
        analysis = tune.run(
            self.train_func,
            config=search_space,
            num_samples=self.n_trials,
            scheduler=scheduler,
            search_alg=search,
            metric='loss',
            mode='min',
            resources_per_trial=self._resources_per_trial(),
            verbose=verbose
        )
        # Get best result
        best_trial = analysis.get_best_trial('loss', 'min', 'last')
        return {
            'best_config': best_trial.config,
            'best_value': best_trial.last_result['loss'],
            'analysis': analysis
        }

    def cleanup(self):
        """Cleanup Ray resources."""
        if ray.is_initialized():
            ray.shutdown()
ℹ️
Ray Tune provides distributed hyperparameter optimization with early stopping and resource allocation. Use it for large-scale AutoML experiments requiring multiple GPUs.
Summary
AutoML and HPO automate ML pipeline optimization:
- Bayesian Optimization: Sample-efficient, uses GP surrogate
- TPE: Tree-structured Parzen Estimator, a density-ratio surrogate used as an alternative to a GP
- Hyperband: Early-stopping based resource allocation
- NAS: Neural Architecture Search for model design
- Population-Based Training: Evolutionary search that adapts hyperparameters during training
Choose the method based on your computational budget and model type.