Interview Question (Hard) - Asked at: Google, Netflix, Uber, Meta, Airbnb
"Walk us through the end-to-end MLOps lifecycle at scale. How do you manage data versioning, ensure reproducible training, handle model evaluation gates, and orchestrate deployments across multiple environments?"
The Complete MLOps Lifecycle Architecture
The MLOps lifecycle is a systematic framework that governs the journey of machine learning models from conception to production and beyond. At scale, this involves coordinating data pipelines, feature engineering, model training, evaluation, deployment, monitoring, and retraining in a continuous feedback loop.
MLOps Maturity Levels
Organizations typically evolve through maturity levels:
| Level | Name | Characteristics |
|---|---|---|
| 0 | Manual | Manual process, no pipeline |
| 1 | Pipeline Automation | ML pipeline automation |
| 2 | CI/CD for ML | CI/CD with pipeline automation |
| 3 | Full Automation | Full MLOps with continuous training |
ℹ️
At Google scale, the MLOps lifecycle involves millions of models serving billions of predictions daily. The infrastructure must handle versioning, rollback, A/B testing, and canary deployments simultaneously.
Data Versioning Strategies
Data versioning is the foundation of reproducible ML. Unlike code, data volumes can reach petabytes, making traditional version control impractical.
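As a minimal sketch of the idea behind most data versioning tools, the snippet below keeps large files in bulk storage and tracks only a small manifest of content hashes alongside the code. The function names `file_digest`, `build_manifest`, and `manifest_version` are illustrative assumptions, not part of any tool's API.

```python
import hashlib
import json
from pathlib import Path


def file_digest(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large files never have to fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_manifest(data_dir: Path) -> dict:
    """Map each file (by relative POSIX path) to its content hash."""
    return {
        p.relative_to(data_dir).as_posix(): file_digest(p)
        for p in sorted(data_dir.rglob("*"))
        if p.is_file()
    }


def manifest_version(manifest: dict) -> str:
    """Derive one short version ID for the whole dataset from its manifest."""
    canonical = json.dumps(manifest, sort_keys=True).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()[:12]
```

Because the version ID depends only on file contents, two runs that see the same bytes get the same ID, and any changed byte produces a different one. The manifest itself is small enough to commit to Git even when the data is not.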
Data Versioning with DVC (Data Version Control)
# dvc.yaml - DVC pipeline definition
stages:
  prepare:
    cmd: python src/data/prepare.py
    deps:
      - src/data/prepare.py
      - data/raw/dataset.csv
    params:
      - prepare.yaml:
          - test_size
          - random_state
    outs:
      - data/prepared/train.csv
      - data/prepared/test.csv
    metrics:
      - metrics/prepare.json:
          cache: false
  featurize:
    cmd: python src/features/build_features.py
    deps:
      - src/features/build_features.py
      - data/prepared/train.csv
      - data/prepared/test.csv
    outs:
      - data/features/train_features.csv
      - data/features/test_features.csv
  train:
    cmd: python src/models/train.py
    deps:
      - src/models/train.py
      - data/features/train_features.csv
    params:
      - train.yaml:
          - learning_rate
          - n_estimators
          - max_depth
    outs:
      - models/model.pkl
    metrics:
      - metrics/train.json:
          cache: false
Delta Lake for ACID Transactions on Data Lakes
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Requires the Delta Lake package (delta-spark) to be available to Spark
spark = (
    SparkSession.builder
    .appName("MLDataVersioning")
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Version 0 - initial load from a raw source (this source path is illustrative)
df = spark.read.parquet("/data/initial_records")
df.write.format("delta").mode("overwrite").save("/data/training_set")

# Version 1 - append new data; every committed write creates a new table version
new_records = spark.read.parquet("/data/new_records")
new_records.write.format("delta").mode("append").save("/data/training_set")

# Time travel to any version
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/data/training_set")
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/data/training_set")

# Compare versions
print(f"Version 0 rows: {df_v0.count()}")
print(f"Version 1 rows: {df_v1.count()}")

# Get the commit history (version, timestamp, operation, ...)
delta_table = DeltaTable.forPath(spark, "/data/training_set")
history = delta_table.history()
history.show()
LakeFS for Git-like Data Operations
# Create a lakeFS repository backed by an object-store namespace
lakectl repo create lakefs://mlops-data s3://my-lakefs-storage \
    --default-branch main
# Create a branch for experimentation
lakectl branch create lakefs://mlops-data/experiment-v2 \
    --source lakefs://mlops-data/main
# Commit data changes on the experiment branch
lakectl commit lakefs://mlops-data/experiment-v2 \
    --message "Added 100K new training samples" \
    --meta author=ml-engineer \
    --meta pipeline_id=12345
# Diff between branches
lakectl diff lakefs://mlops-data/main \
    lakefs://mlops-data/experiment-v2
# Merge the experiment branch back into main (source first, then destination)
lakectl merge lakefs://mlops-data/experiment-v2 \
    lakefs://mlops-data/main
⚠️
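Data versioning at scale requires careful attention to storage costs. lakeFS branches are metadata-only, so creating one copies no data, while Delta Lake supports time travel by retaining the data files of earlier versions until they are vacuumed, which means frequent full overwrites multiply storage. Petabyte-scale datasets therefore also need retention policies and tiered storage strategies.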
Training Pipeline Design
Reproducible Training with Configuration Management
# config/train_config.yaml
experiment:
  name: "fraud_detection_v3"
  seed: 42
  framework: "xgboost"
data:
  version: "v2.1"
  path: "s3://ml-data/training/fraud_v2.1/"
  features:
    - "transaction_amount"
    - "time_since_last_transaction"
    - "merchant_category"
    - "user_transaction_count_24h"
  target: "is_fraud"
  split:
    train: 0.7
    validation: 0.15
    test: 0.15
model:
  type: "XGBClassifier"
  params:
    n_estimators: 500
    max_depth: 8
    learning_rate: 0.01
    subsample: 0.8
    colsample_bytree: 0.8
    min_child_weight: 3
    gamma: 0.1
    reg_alpha: 0.1
    reg_lambda: 1.0
training:
  early_stopping_rounds: 50
  eval_metric: "auc"
  verbose: 100
evaluation:
  metrics:
    - "auc_roc"
    - "precision"
    - "recall"
    - "f1_score"
  thresholds:
    auc_roc: 0.95
    precision: 0.90
    recall: 0.85
  fairness:
    protected_attributes:
      - "age_group"
      - "gender"
    max_disparity: 0.1
deployment:
  strategy: "canary"
  environments:
    - "staging"
    - "production"
  rollback:
    enabled: true
    trigger: "accuracy_drop > 5%"
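A configuration file only helps reproducibility if malformed values are caught before a long training job starts. The following is a minimal sketch of such a pre-flight check over the already-parsed config dictionary; `validate_config` and the specific rules it applies are assumptions chosen for illustration, not part of any framework.

```python
def validate_config(config: dict) -> list[str]:
    """Return human-readable problems; an empty list means the config looks sane."""
    problems = []

    # The train/validation/test fractions should cover the whole dataset.
    split = config.get("data", {}).get("split", {})
    total = sum(split.values())
    if abs(total - 1.0) > 1e-9:
        problems.append(f"data.split sums to {total}, expected 1.0")

    # Every threshold should refer to a tracked metric and be a valid ratio.
    evaluation = config.get("evaluation", {})
    tracked = set(evaluation.get("metrics", []))
    for name, value in evaluation.get("thresholds", {}).items():
        if name not in tracked:
            problems.append(f"threshold set for untracked metric '{name}'")
        if not 0.0 <= value <= 1.0:
            problems.append(f"threshold {name}={value} is outside [0, 1]")

    # A missing or non-integer seed silently breaks run-to-run comparability.
    if not isinstance(config.get("experiment", {}).get("seed"), int):
        problems.append("experiment.seed must be an integer for reproducible runs")

    return problems
```

Returning a list instead of raising on the first problem lets a CI job report every issue in one pass.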
Python Training Pipeline with MLflow
import hashlib
from datetime import datetime

import mlflow
import mlflow.xgboost
import numpy as np
import pandas as pd
import xgboost as xgb
import yaml
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold


class MLTrainingPipeline:
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
        mlflow.set_experiment(self.config['experiment']['name'])

    def _compute_data_hash(self, df: pd.DataFrame) -> str:
        """Compute deterministic hash of dataset for versioning."""
        return hashlib.md5(
            pd.util.hash_pandas_object(df).values.tobytes()
        ).hexdigest()

    def prepare_data(self):
        """Load and prepare training data."""
        df = pd.read_parquet(self.config['data']['path'])
        # Features are assumed to be numeric or already encoded for XGBoost
        X = df[self.config['data']['features']]
        y = df[self.config['data']['target']]
        return X, y, self._compute_data_hash(df)

    def train_with_cv(self, X, y):
        """Train model with cross-validation."""
        seed = self.config['experiment']['seed']
        training = self.config['training']
        params = dict(self.config['model']['params'])
        # n_estimators belongs to the scikit-learn wrapper; xgb.train uses num_boost_round
        num_boost_round = params.pop('n_estimators', 1000)
        # Without a binary objective, predict() would not return probabilities
        params.setdefault('objective', 'binary:logistic')
        params.setdefault('eval_metric', training['eval_metric'])
        params.setdefault('seed', seed)

        skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
        oof_predictions = np.zeros(len(X))
        models = []
        cv_scores = []
        for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
            dtrain = xgb.DMatrix(X_train, label=y_train)
            dval = xgb.DMatrix(X_val, label=y_val)
            model = xgb.train(
                params,
                dtrain,
                num_boost_round=num_boost_round,
                evals=[(dval, 'val')],
                early_stopping_rounds=training['early_stopping_rounds'],
                verbose_eval=training['verbose'],
            )
            val_pred = model.predict(dval)
            oof_predictions[val_idx] = val_pred
            auc = roc_auc_score(y_val, val_pred)
            cv_scores.append(auc)
            models.append(model)
            mlflow.log_metric(f"fold_{fold}_auc", auc)
        return models, oof_predictions, cv_scores

    def evaluate(self, y_true, y_pred):
        """Compute metrics and collect threshold failures without raising."""
        y_label = (y_pred > 0.5).astype(int)
        metrics = {
            'auc_roc': roc_auc_score(y_true, y_pred),
            'precision': precision_score(y_true, y_label),
            'recall': recall_score(y_true, y_label),
            'f1_score': f1_score(y_true, y_label),
        }
        failures = []
        for metric, threshold in self.config['evaluation']['thresholds'].items():
            if metric in metrics:
                passed = metrics[metric] >= threshold
                # MLflow metrics must be numeric, so store the flag as 0.0 or 1.0
                metrics[f'{metric}_passed'] = float(passed)
                if not passed:
                    failures.append(
                        f"Metric {metric}={metrics[metric]:.4f} "
                        f"below threshold {threshold}"
                    )
        return metrics, failures

    def run(self):
        """Execute full training pipeline."""
        with mlflow.start_run(run_name=f"run_{datetime.now():%Y%m%d_%H%M%S}"):
            # Log configuration
            mlflow.log_params(self.config['model']['params'])
            mlflow.log_param("data_version", self.config['data']['version'])
            mlflow.log_param("seed", self.config['experiment']['seed'])
            # Prepare data
            X, y, data_hash = self.prepare_data()
            mlflow.log_param("data_hash", data_hash)
            mlflow.log_param("dataset_size", len(X))
            # Train
            models, oof_preds, cv_scores = self.train_with_cv(X, y)
            # Evaluate on out-of-fold predictions and log before enforcing thresholds,
            # so that failed runs still leave their metrics in MLflow
            metrics, failures = self.evaluate(y, oof_preds)
            mlflow.log_metrics(metrics)
            mlflow.log_metric("cv_mean_auc", np.mean(cv_scores))
            mlflow.log_metric("cv_std_auc", np.std(cv_scores))
            if failures:
                raise ValueError("; ".join(failures))
            # Register the best fold's model only after all thresholds pass
            best_model = models[int(np.argmax(cv_scores))]
            mlflow.xgboost.log_model(
                best_model,
                "model",
                registered_model_name=self.config['experiment']['name']
            )
            return metrics


if __name__ == "__main__":
    pipeline = MLTrainingPipeline("config/train_config.yaml")
    metrics = pipeline.run()
    print(f"Training completed. Metrics: {metrics}")
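The pipeline records the seed and a data hash, but a run is only reproducible if the software environment matches as well. As a hedged sketch, the helpers below capture the interpreter, platform, and selected package versions, then fold them together with the config and data hash into one fingerprint. The names `environment_snapshot` and `run_fingerprint` are hypothetical, and which packages to record is an assumption left to the caller.

```python
import hashlib
import json
import platform
import sys
from importlib import metadata


def environment_snapshot(packages: list[str]) -> dict:
    """Record interpreter, OS, and the installed versions of the named packages."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = "not installed"
    return {
        "python": sys.version.split()[0],
        "platform": platform.platform(),
        "packages": versions,
    }


def run_fingerprint(config: dict, data_hash: str, env: dict) -> str:
    """Hash everything that should determine the training result."""
    payload = json.dumps(
        {"config": config, "data_hash": data_hash, "env": env},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
```

Logging the fingerprint as a run parameter makes it easy to spot two runs that should have produced identical models but did not.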
Model Evaluation Gates
Multi-Dimensional Evaluation Framework
import mlflow
import numpy as np
from sklearn.metrics import roc_auc_score


class ModelEvaluationGates:
    """Enforce quality gates before model deployment."""

    def __init__(self, config: dict):
        self.config = config
        self.gates_passed = []
        self.gates_failed = []

    def performance_gate(self, metrics: dict) -> bool:
        """Gate 1: Model performance meets minimum thresholds."""
        thresholds = self.config['evaluation']['thresholds']
        for metric, threshold in thresholds.items():
            if metric not in metrics:
                self.gates_failed.append(f"Missing metric: {metric}")
                return False
            if metrics[metric] < threshold:
                self.gates_failed.append(
                    f"Performance: {metric}={metrics[metric]:.4f} "
                    f"< {threshold}"
                )
                return False
        self.gates_passed.append("Performance gate PASSED")
        return True

    def fairness_gate(self, y_true, predictions, protected_attrs) -> bool:
        """Gate 2: Per-group AUC disparity stays within the configured limit.

        y_true and predictions must be row-aligned with protected_attrs.
        """
        max_disparity = self.config['evaluation']['fairness']['max_disparity']
        for attr in protected_attrs.columns:
            group_aucs = {}
            for group in protected_attrs[attr].unique():
                mask = (protected_attrs[attr] == group).to_numpy()
                if mask.sum() > 100:  # Minimum sample size
                    group_aucs[group] = roc_auc_score(
                        y_true[mask],
                        predictions[mask]
                    )
            if len(group_aucs) > 1:
                # The largest pairwise gap is simply max - min
                max_disp = max(group_aucs.values()) - min(group_aucs.values())
                if max_disp > max_disparity:
                    self.gates_failed.append(
                        f"Fairness: {attr} disparity {max_disp:.4f} "
                        f"> {max_disparity}"
                    )
                    return False
        self.gates_passed.append("Fairness gate PASSED")
        return True

    def robustness_gate(self, model, X_test, y_test) -> bool:
        """Gate 3: Model robustness under Gaussian feature noise.

        model.predict is assumed to return scores or probabilities.
        """
        # Seeded generator so the gate gives the same verdict on every run
        rng = np.random.default_rng(self.config['experiment']['seed'])
        baseline_auc = roc_auc_score(y_test, model.predict(X_test))
        perturbations = [('noise_001', 0.001), ('noise_01', 0.01), ('noise_1', 0.1)]
        for name, scale in perturbations:
            X_perturbed = X_test + rng.normal(0, scale, X_test.shape)
            perturbed_auc = roc_auc_score(y_test, model.predict(X_perturbed))
            degradation = baseline_auc - perturbed_auc
            mlflow.log_metric(f"robustness_{name}_degradation", degradation)
            if degradation > 0.05:  # Maximum allowed absolute AUC drop
                self.gates_failed.append(
                    f"Robustness: {name} degradation {degradation:.4f} > 0.05"
                )
                return False
        self.gates_passed.append("Robustness gate PASSED")
        return True

    def data_quality_gate(self, data_stats: dict) -> bool:
        """Gate 4: Training data quality checks."""
        checks = {
            'null_ratio': lambda x: x < 0.05,
            'feature_variance': lambda x: x > 1e-6,
            'class_balance': lambda x: 0.2 < x < 0.8,
        }
        for metric, check_fn in checks.items():
            if metric in data_stats:
                if not check_fn(data_stats[metric]):
                    self.gates_failed.append(
                        f"Data Quality: {metric}={data_stats[metric]}"
                    )
                    return False
        self.gates_passed.append("Data quality gate PASSED")
        return True

    def evaluate_all(self, metrics, predictions, protected_attrs,
                     model, X_test, y_test, data_stats) -> bool:
        """Run all evaluation gates; every gate runs even if an earlier one fails."""
        gates = [
            ("Performance", lambda: self.performance_gate(metrics)),
            ("Fairness", lambda: self.fairness_gate(y_test, predictions, protected_attrs)),
            ("Robustness", lambda: self.robustness_gate(model, X_test, y_test)),
            ("Data Quality", lambda: self.data_quality_gate(data_stats)),
        ]
        all_passed = True
        for name, gate_fn in gates:
            try:
                passed = gate_fn()
                if not passed:
                    all_passed = False
            except Exception as e:
                self.gates_failed.append(f"{name}: Exception - {str(e)}")
                all_passed = False
        return all_passed
❌
Never skip evaluation gates for "quick" deployments. At scale, a single bad model can affect millions of users and cost millions in revenue. Always enforce automated quality gates.
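The fairness check is the easiest gate to get subtly wrong, so it helps to isolate its arithmetic from any model or DataFrame code. Below is a minimal sketch that works on per-group scores that have already been computed. `max_group_disparity` and `check_fairness` are illustrative names, and the choice of score (AUC, recall, or something else) is left to the caller.

```python
def max_group_disparity(group_scores: dict[str, float]) -> float:
    """Largest absolute gap between any two groups' scores.

    The largest pairwise gap always equals max - min, so no nested loop is needed.
    """
    if len(group_scores) < 2:
        return 0.0
    values = list(group_scores.values())
    return max(values) - min(values)


def check_fairness(scores_by_attribute: dict[str, dict[str, float]],
                   max_disparity: float) -> list[str]:
    """Return one failure message per protected attribute that exceeds the limit."""
    failures = []
    for attribute, group_scores in scores_by_attribute.items():
        gap = max_group_disparity(group_scores)
        if gap > max_disparity:
            failures.append(f"{attribute}: disparity {gap:.4f} > {max_disparity}")
    return failures
```

Keeping this logic as pure functions means it can be unit-tested with hand-written scores, independent of how the per-group metrics were produced.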
Deployment Strategies
Canary Deployment with Traffic Shifting
# kubernetes/canary-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-canary
  labels:
    app: ml-model
    version: v2
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ml-model
      version: v2
  template:
    metadata:
      labels:
        app: ml-model
        version: v2
    spec:
      containers:
        - name: model-server
          image: registry.example.com/ml-model:v2
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
              nvidia.com/gpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2000m"
              nvidia.com/gpu: "1"
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 30
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: ml-model-vs
spec:
  hosts:
    - ml-model
  http:
    - route:
        - destination:
            host: ml-model
            subset: stable
          weight: 90
        - destination:
            host: ml-model
            subset: canary
          weight: 10
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: ml-model-dr
spec:
  host: ml-model
  subsets:
    - name: stable
      labels:
        version: v1
    - name: canary
      labels:
        version: v2
Automated Rollback on Metric Degradation
import time

from kubernetes import client, config
from prometheus_api_client import PrometheusConnect


class CanaryDeploymentManager:
    def __init__(self, prometheus_url: str, kube_config_path: str):
        config.load_kube_config(config_file=kube_config_path)
        self.apps_v1 = client.AppsV1Api()
        # Istio resources are custom resources, so they go through CustomObjectsApi
        self.custom_api = client.CustomObjectsApi()
        self.prom = PrometheusConnect(url=prometheus_url)

    def get_model_metrics(self, model_name: str, window: str = "5m") -> dict:
        """Fetch current model metrics from Prometheus."""
        sel = f'{{model="{model_name}"}}'
        queries = {
            # Aggregate buckets by le so the quantile covers all replicas
            'latency_p99': f'histogram_quantile(0.99, sum by (le) (rate(model_latency_seconds_bucket{sel}[{window}])))',
            'error_rate': f'sum(rate(model_errors_total{sel}[{window}])) / sum(rate(model_requests_total{sel}[{window}]))',
            # avg() cannot take a range vector; average over time first, then across series
            'accuracy': f'avg(avg_over_time(model_accuracy{sel}[{window}]))',
        }
        metrics = {}
        for name, query in queries.items():
            result = self.prom.custom_query(query=query)
            if result:
                metrics[name] = float(result[0]['value'][1])
        return metrics

    def shift_traffic(self, canary_weight: int):
        """Shift traffic between stable and canary."""
        vs = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {"name": "ml-model-vs"},
            "spec": {
                "hosts": ["ml-model"],
                "http": [{
                    "route": [
                        {
                            "destination": {"host": "ml-model", "subset": "stable"},
                            "weight": 100 - canary_weight
                        },
                        {
                            "destination": {"host": "ml-model", "subset": "canary"},
                            "weight": canary_weight
                        }
                    ]
                }]
            }
        }
        self.custom_api.patch_namespaced_custom_object(
            group="networking.istio.io",
            version="v1beta1",
            namespace="ml-production",
            plural="virtualservices",
            name="ml-model-vs",
            body=vs
        )

    def rollback(self, model_name: str):
        """Emergency rollback to stable version."""
        print(f"ROLLBACK: Reverting {model_name} to stable version")
        self.shift_traffic(canary_weight=0)
        self.apps_v1.patch_namespaced_deployment(
            name="model-canary",
            namespace="ml-production",
            body={"spec": {"replicas": 0}}
        )
        self.apps_v1.patch_namespaced_deployment(
            name="model-stable",
            namespace="ml-production",
            body={"spec": {"replicas": 3}}
        )

    def canary_deploy(self, model_name: str,
                      canary_weight: int = 10,
                      increment: int = 10,
                      monitor_interval: int = 300,
                      max_failures: int = 3) -> bool:
        """Execute canary deployment with automated rollback."""
        # Upper bounds for latency and error rate, lower bound for accuracy
        thresholds = {
            'latency_p99': 0.5,   # 500ms
            'error_rate': 0.01,   # 1%
            'accuracy': 0.85,     # 85%
        }
        failures = 0
        current_weight = canary_weight
        while True:
            print(f"Canary: shifting {current_weight}% traffic to canary")
            self.shift_traffic(current_weight)
            time.sleep(monitor_interval)
            metrics = self.get_model_metrics(model_name)
            print(f"Current metrics: {metrics}")
            # Check thresholds (metrics missing from Prometheus are skipped)
            violated = False
            for metric, threshold in thresholds.items():
                if metric not in metrics:
                    continue
                if metric == 'accuracy':
                    violated = violated or metrics[metric] < threshold
                else:
                    violated = violated or metrics[metric] > threshold
            if violated:
                failures += 1
                print(f"Threshold violation! Failure {failures}/{max_failures}")
                if failures >= max_failures:
                    self.rollback(model_name)
                    return False
                # Hold the current weight and re-check instead of advancing
                continue
            failures = 0
            if current_weight >= 100:
                print("Canary deployment complete!")
                return True
            current_weight = min(current_weight + increment, 100)
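Rollout logic that is tangled up with Kubernetes and Prometheus calls is hard to test. One option, sketched below under stated assumptions, is to pull the decision into pure functions that take metrics in and return an action. `violations`, `CanaryState`, and `next_step` are hypothetical names, and unlike the manager above this sketch fails closed, treating a missing or NaN metric as a violation.

```python
from dataclasses import dataclass

# Metrics where a larger value is better; all others are treated as upper bounds.
HIGHER_IS_BETTER = {"accuracy"}


def violations(metrics: dict[str, float], thresholds: dict[str, float]) -> list[str]:
    """Names of metrics that are missing, NaN, or on the wrong side of their threshold."""
    bad = []
    for name, limit in thresholds.items():
        value = metrics.get(name)
        if value is None or value != value:  # missing or NaN: fail closed
            bad.append(name)
        elif name in HIGHER_IS_BETTER and value < limit:
            bad.append(name)
        elif name not in HIGHER_IS_BETTER and value > limit:
            bad.append(name)
    return bad


@dataclass
class CanaryState:
    weight: int        # percentage of traffic currently sent to the canary
    failures: int = 0  # consecutive failed checks at this weight


def next_step(state: CanaryState, bad: list[str], increment: int = 10,
              max_failures: int = 3) -> tuple[str, CanaryState]:
    """Decide the next action: 'rollback', 'hold', 'advance', or 'done'."""
    if bad:
        failures = state.failures + 1
        action = "rollback" if failures >= max_failures else "hold"
        return action, CanaryState(state.weight, failures)
    if state.weight >= 100:
        return "done", CanaryState(100, 0)
    return "advance", CanaryState(min(state.weight + increment, 100), 0)
```

A thin driver loop can then call the metrics backend, pass the result through `violations` and `next_step`, and perform the traffic shift or rollback that the returned action names.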
Continuous Training Pipeline
ML Pipeline Architecture Diagram
MLOps Continuous Training (pipeline flow)

Data Ingest -> Feature Store -> Training Pipeline -> Evaluate & Gate
Backing stores, one under each stage: Data Lake, Feature Registry, Model Registry, Model Registry
Evaluate & Gate -> Promote to Prod -> Canary Testing -> Deploy Service -> Monitor & Alert
Monitor & Alert -> Feedback Loop: Retraining Trigger
Airflow DAG for End-to-End ML Pipeline
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import DeviceRequest

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'mlops_continuous_training',
    default_args=default_args,
    description='End-to-end ML pipeline',
    schedule='0 2 * * *',  # Daily at 2 AM (use schedule_interval before Airflow 2.4)
    start_date=datetime(2024, 1, 1),  # Any fixed past date; days_ago() is deprecated
    catchup=False,
    tags=['mlops', 'production'],
) as dag:
    # Each task runs in its own container, so /data must be a shared volume
    # (configure it with the operator's mounts argument) for stages to see each other's output.
    ingest = DockerOperator(
        task_id='data_ingestion',
        image='registry.example.com/ml-pipeline/ingest:latest',
        command='python ingest.py --date {{ ds }}',
        environment={
            'DATA_SOURCE': 's3://raw-data-bucket/',
            'OUTPUT_PATH': '/data/raw/{{ ds }}/'
        },
        mount_tmp_folder=False,
        auto_remove='success',
    )
    validate = DockerOperator(
        task_id='data_validation',
        image='registry.example.com/ml-pipeline/validate:latest',
        command='python validate.py --input /data/raw/{{ ds }}/',
    )
    feature = DockerOperator(
        task_id='feature_engineering',
        image='registry.example.com/ml-pipeline/features:latest',
        command='python build_features.py --input /data/raw/{{ ds }}/',
    )
    train = DockerOperator(
        task_id='model_training',
        image='registry.example.com/ml-pipeline/train:latest',
        command='python train.py',
        # DockerOperator has no gpu_request argument; GPUs are exposed via device_requests
        device_requests=[DeviceRequest(count=1, capabilities=[['gpu']])],
    )
    evaluate = DockerOperator(
        task_id='model_evaluation',
        image='registry.example.com/ml-pipeline/evaluate:latest',
        command='python evaluate.py',
    )
    deploy = DockerOperator(
        task_id='model_deployment',
        image='registry.example.com/ml-pipeline/deploy:latest',
        command='python deploy.py --env staging',
    )

    ingest >> validate >> feature >> train >> evaluate >> deploy
ℹ️
The entire pipeline should complete within a defined SLA. For daily retraining, aim for < 2 hours total. For real-time systems, consider incremental training with smaller windows.
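An SLA is only useful if something measures it. As a small sketch, the function below sums per-stage durations, compares the total with a two-hour budget, and names the slowest stage. `check_sla` and the shape of its input are assumptions for illustration; in practice the durations would come from the orchestrator's task records.

```python
from datetime import timedelta


def check_sla(stage_durations: dict[str, timedelta],
              sla: timedelta = timedelta(hours=2)) -> dict:
    """Summarize total runtime against the SLA and name the slowest stage."""
    total = sum(stage_durations.values(), timedelta())
    slowest = max(stage_durations, key=stage_durations.get)
    return {
        "total": total,
        "within_sla": total <= sla,
        "headroom": sla - total,  # negative when the SLA is exceeded
        "slowest_stage": slowest,
    }
```

Tracking the headroom over time, not just the pass/fail flag, gives early warning that a growing dataset is about to push the pipeline past its budget.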
Key Metrics for MLOps Monitoring
Track these metrics across your lifecycle:
MLOPS_METRICS = {
    'data': {
        'pipeline_duration_seconds': 'gauge',
        'data_freshness_hours': 'gauge',
        'schema_violations_total': 'counter',
        'feature_drift_score': 'gauge',
    },
    'training': {
        'training_duration_seconds': 'gauge',
        'training_loss': 'gauge',
        'validation_loss': 'gauge',
        'model_size_bytes': 'gauge',
    },
    'serving': {
        'prediction_latency_seconds': 'histogram',
        'prediction_throughput_rps': 'gauge',
        'model_accuracy': 'gauge',
        'error_rate': 'gauge',
    },
    'infrastructure': {
        'gpu_utilization': 'gauge',
        'memory_usage_bytes': 'gauge',
        'cpu_utilization': 'gauge',
        'network_io_bytes': 'counter',
    }
}
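The metric list names a `feature_drift_score` without saying how to compute one. The Population Stability Index (PSI) is one common choice, sketched here in plain Python for a single numeric feature. The function name, the equal-width binning on the reference range, and the `eps` floor for empty bins are assumptions of this sketch, not a standard implementation.

```python
import math


def population_stability_index(expected: list[float], actual: list[float],
                               bins: int = 10, eps: float = 1e-6) -> float:
    """PSI between a reference sample and a live sample of one numeric feature."""
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / bins or 1.0  # guard against a constant reference feature

    def proportions(values: list[float]) -> list[float]:
        counts = [0] * bins
        for v in values:
            # Clamp out-of-range live values into the first or last bin.
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # Floor empty bins at eps so the logarithm below stays defined.
        return [max(c / len(values), eps) for c in counts]

    p, q = proportions(expected), proportions(actual)
    return sum((qi - pi) * math.log(qi / pi) for pi, qi in zip(p, q))
```

A widely quoted rule of thumb treats PSI below 0.1 as stable and above 0.25 as a significant shift, though suitable cutoffs depend on the feature and should be calibrated on historical data before they drive retraining.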
Summary
The MLOps lifecycle requires careful orchestration of:
- Data Versioning: Use DVC, Delta Lake, or LakeFS for reproducibility
- Training Pipelines: Automated, configurable, with experiment tracking
- Evaluation Gates: Multi-dimensional quality checks before deployment
- Deployment Strategies: Canary, blue-green, with automated rollback
- Continuous Training: Feedback loops for model freshness
Master this lifecycle to build production ML systems at scale.