Design an End-to-End ML Platform
Building a production ML platform with feature store, training pipelines, and model serving at scale
Interview Question
"Design an end-to-end ML platform like Netflix's or Uber's that can support hundreds of data scientists, manage thousands of ML models, handle feature engineering at scale, provide automated training pipelines, and serve models in production with low latency and high reliability."
Difficulty: Hard | Frequently asked at Netflix, Uber, Airbnb, Google, Meta
1. Requirements Gathering
Functional Requirements
- Feature Store: Centralized feature management with online/offline stores
- Training Pipeline: Automated model training with experiment tracking
- Model Registry: Version control for models with metadata
- Model Serving: Low-latency model inference at scale
- Monitoring: Model performance and data drift monitoring
- Experimentation: A/B testing framework for models
- Collaboration: Tools for data scientists to collaborate
Non-Functional Requirements
- Scalability: Support 100+ data scientists, 1000+ models
- Latency: < 10ms for online feature serving, < 100ms for model inference
- Throughput: 1M+ predictions per second
- Reliability: 99.99% uptime for model serving
- Reproducibility: All experiments must be reproducible
- Governance: Model lineage and audit trail
- Cost Efficiency: Optimize compute costs
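Back-of-envelope arithmetic turns these targets into infrastructure. The sketch below sizes serving replicas and Redis shards for the 1M predictions/second target. The per-replica and per-shard capacities, the 50 feature lookups per request, and the 60% utilization headroom are illustrative assumptions, not figures from any real deployment.

```python
import math

# Target from the non-functional requirements
PEAK_PREDICTIONS_PER_SEC = 1_000_000

# Illustrative assumptions (replace with your own benchmarks)
FEATURES_PER_PREDICTION = 50        # online feature lookups per request
QPS_PER_SERVING_REPLICA = 2_000     # sustained predictions/s per replica
OPS_PER_REDIS_SHARD = 100_000       # feature reads/s per Redis shard
HEADROOM = 0.6                      # run at most 60% utilized at peak


def replicas_needed(load: float, capacity_per_unit: float, headroom: float = HEADROOM) -> int:
    """Smallest number of units that keeps utilization at or below `headroom`."""
    return math.ceil(load / (capacity_per_unit * headroom))


serving_replicas = replicas_needed(PEAK_PREDICTIONS_PER_SEC, QPS_PER_SERVING_REPLICA)
feature_reads = PEAK_PREDICTIONS_PER_SEC * FEATURES_PER_PREDICTION
redis_shards = replicas_needed(feature_reads, OPS_PER_REDIS_SHARD)

print(f"serving replicas: {serving_replicas}")
print(f"feature reads/s:  {feature_reads:,}")
print(f"redis shards:     {redis_shards}")
```

Counting one command per feature is conservative. Storing all of an entity's features in one Redis hash and reading them with a single HMGET is one way to cut the number of commands per request.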
Scale Perspective: Netflix runs 1000+ ML models in production. Uber's Michelangelo platform supports 1000+ data scientists. These platforms must handle massive scale while maintaining reliability and developer productivity.
2. High-Level Architecture Overview
```
USER LAYER
  Data Scientists | ML Engineers | Business Users | APIs
        |
        v
ML PLATFORM CORE
  [Feature Store]  [Training Pipeline]  [Model Registry]  [Serving Layer]
        |
        +--------------------+--------------------+
        v                    v                    v
  DATA LAYER            COMPUTE              STORAGE
  (Spark, Flink)        (Kubernetes)         (S3, HDFS, Redis)
        |
        v
OPERATIONS LAYER
  Monitoring | Alerting | Logging | Cost Management | Governance
```
Key Insight: An ML platform is not just about ML models. It's about enabling data scientists to be productive while ensuring production reliability. The platform must handle the full lifecycle from data to deployment.
3. Feature Store Design
3.1 Feature Store Architecture
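```python
# Assumes redis-py >= 4.3 (asyncio cluster client) and PySpark.
# FeatureRegistry and helpers marked "hypothetical" are platform code, not library APIs.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from redis.asyncio.cluster import ClusterNode, RedisCluster


class FeatureStore:
    def __init__(self):
        # Online store for real-time serving (async Redis Cluster client)
        self.online_store = RedisCluster(
            startup_nodes=[
                ClusterNode("redis-1", 6379),
                ClusterNode("redis-2", 6379),
            ]
        )
        # Offline store for training (Parquet on S3, read with Spark)
        self.offline_store = SparkSession.builder \
            .appName("FeatureStore") \
            .getOrCreate()
        # Feature registry (hypothetical metadata service)
        self.registry = FeatureRegistry()

    async def get_online_features(self, feature_names, entity_rows):
        """Get features for online serving in one pipelined round trip."""
        pipeline = self.online_store.pipeline()
        for entity in entity_rows:
            for feature_name in feature_names:
                key = self.generate_key(entity, feature_name)  # hypothetical helper
                pipeline.hget(key, 'value')
        results = await pipeline.execute()
        return self.format_results(results, feature_names, entity_rows)  # hypothetical helper

    def get_offline_features(self, feature_names, entity_df, date_range):
        """Get features for training, joined onto the requested entities.

        Assumes entity_df has entity_id and date columns, and each feature
        table has entity_id, date, and a column named after the feature.
        """
        result = entity_df
        for feature_name in feature_names:
            feature_df = self.offline_store.read.parquet(
                f"s3://features/{feature_name}/"
            ).filter(
                col('date').between(date_range[0], date_range[1])
            )
            # Join on entity AND date: joining on entity_id alone would pair
            # every date of one feature with every date of another.
            result = result.join(feature_df, on=['entity_id', 'date'], how='left')
        return result

    async def register_feature(self, feature_name, feature_config):
        """Register a new feature and provision its storage."""
        await self.registry.register(feature_name, feature_config)
        # Create online store table (hypothetical helper)
        await self.create_online_table(feature_name)
        # Create offline store table (hypothetical helper)
        await self.create_offline_table(feature_name)
```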
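`get_online_features` calls two helpers that the design leaves undefined: `generate_key` and `format_results`. Below is a minimal sketch of both. It assumes a hypothetical `fs:{<entity_id>}:<feature_name>` key layout that matches the `hget(key, 'value')` reads, and entity rows shaped like `{"entity_id": ...}`. For brevity they are plain functions; inside `FeatureStore` they would be methods.

```python
from typing import Any, Dict, List, Optional


def generate_key(entity: Dict[str, Any], feature_name: str) -> str:
    """Hypothetical key layout: fs:{<entity_id>}:<feature_name>.

    The braces form a Redis Cluster hash tag, so every feature of one entity
    maps to the same hash slot and a pipelined read touches a single node.
    """
    return f"fs:{{{entity['entity_id']}}}:{feature_name}"


def format_results(
    results: List[Optional[bytes]],
    feature_names: List[str],
    entity_rows: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Regroup flat, entity-major pipeline results into one dict per entity.

    Missing keys come back from Redis as None so callers can apply defaults.
    """
    n = len(feature_names)
    rows = []
    for i, entity in enumerate(entity_rows):
        chunk = results[i * n:(i + 1) * n]
        row: Dict[str, Any] = {"entity_id": entity["entity_id"]}
        for name, raw in zip(feature_names, chunk):
            row[name] = raw.decode() if isinstance(raw, bytes) else raw
        rows.append(row)
    return rows


if __name__ == "__main__":
    entities = [{"entity_id": "user_42"}, {"entity_id": "user_7"}]
    names = ["clicks_7d", "avg_order_value"]
    print(generate_key(entities[0], names[0]))
    print(format_results([b"12", b"31.5", None, b"8.0"], names, entities))
```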
3.2 Feature Computation Pipeline
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, from_json, window


class FeatureComputationPipeline:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("FeatureComputation") \
            .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
            .getOrCreate()

    def compute_features(self, feature_config):
        """Dispatch to the right computation path based on configuration."""
        feature_type = feature_config['type']
        if feature_type == 'batch':
            return self.compute_batch_features(feature_config)
        elif feature_type == 'streaming':
            return self.compute_streaming_features(feature_config)
        elif feature_type == 'real_time':
            # On-demand features computed at request time (hypothetical helper)
            return self.compute_real_time_features(feature_config)
        raise ValueError(f"Unknown feature type: {feature_type}")

    def compute_batch_features(self, config):
        """Compute batch features (daily/hourly)."""
        source_df = self.spark.read.parquet(config['source_path'])
        # Apply transformations (hypothetical config-driven helper)
        features_df = self.apply_transformations(source_df, config['transformations'])
        # With dynamic partition overwrite, only the dates being written are
        # replaced; static mode would wipe every historical partition.
        features_df.write \
            .mode("overwrite") \
            .partitionBy("date") \
            .parquet(f"s3://features/{config['feature_name']}/")
        return features_df

    def compute_streaming_features(self, config):
        """Compute streaming features (windowed real-time aggregation)."""
        raw_df = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", config['kafka_servers']) \
            .option("subscribe", config['topic']) \
            .load()
        # Kafka delivers binary key/value columns; parse the JSON payload.
        # config['schema'] is assumed to define entity_id, value, event_time.
        stream_df = raw_df \
            .select(from_json(col("value").cast("string"), config['schema']).alias("e")) \
            .select("e.*")
        # 5-minute tumbling windows; events more than 10 minutes late may be dropped
        windowed_df = stream_df \
            .withWatermark("event_time", "10 minutes") \
            .groupBy(window("event_time", "5 minutes"), col("entity_id")) \
            .agg(
                count("*").alias("count"),
                avg("value").alias("avg_value")
            )
        # Push each micro-batch to the online store (hypothetical helper)
        query = windowed_df.writeStream \
            .outputMode("update") \
            .option("checkpointLocation", config['checkpoint_path']) \
            .foreachBatch(self.write_to_online_store) \
            .start()
        return query
```
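The streaming query groups events into 5-minute tumbling windows and uses a 10-minute watermark to bound state. The pure-Python model below illustrates those semantics on in-memory events. It is a simplification: Spark advances the watermark once per micro-batch, whereas this model advances it per event. The `(entity_id, event_time_seconds, value)` tuple layout is also assumed.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

WINDOW = 5 * 60      # 5-minute tumbling windows, in seconds
WATERMARK = 10 * 60  # tolerate events up to 10 minutes behind the max event time

Event = Tuple[str, int, float]  # (entity_id, event_time_seconds, value)


def windowed_aggregates(events: Iterable[Event]):
    """Count and average per (entity, window start).

    An event is dropped once its window has ended before the watermark
    (max event time seen so far minus WATERMARK), i.e. the window is final.
    Returns (aggregates, number_of_dropped_events).
    """
    state: Dict[Tuple[str, int], List[float]] = defaultdict(list)
    max_event_time = float("-inf")
    dropped = 0
    for entity_id, ts, value in events:
        max_event_time = max(max_event_time, ts)
        watermark = max_event_time - WATERMARK
        window_start = ts - ts % WINDOW
        if window_start + WINDOW <= watermark:
            dropped += 1  # too late: this window is already finalized
            continue
        state[(entity_id, window_start)].append(value)
    aggregates = {
        key: {"count": len(vals), "avg_value": sum(vals) / len(vals)}
        for key, vals in state.items()
    }
    return aggregates, dropped


if __name__ == "__main__":
    events = [("u1", 0, 10.0), ("u1", 60, 20.0), ("u1", 400, 5.0),
              ("u1", 1300, 1.0), ("u1", 800, 3.0), ("u1", 100, 99.0)]
    print(windowed_aggregates(events))
```

In the example, the event at t=800 is late but within tolerance, so it is kept. The event at t=100 arrives after its window [0, 300) has fallen behind the watermark (1300 - 600 = 700), so it is dropped.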
Critical Feature Store Considerations:
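- Consistency: Online and offline values must come from the same transformation logic; otherwise models see different inputs in training and serving (training/serving skew)
- Freshness: Freshness requirements vary by feature, from daily batch aggregates to streaming features only seconds old
- Backfilling: Support computing new or corrected features over historical data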
- Governance: Feature lineage and documentation
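Consistency also has a time dimension. When assembling training data, each label may only use feature values that existed at the label's timestamp; otherwise future information leaks into training. Below is a minimal point-in-time ("as-of") lookup in pure Python. It assumes a hypothetical in-memory history of `(timestamp, value)` pairs per entity, sorted by timestamp.

```python
import bisect
from typing import Dict, List, Optional, Tuple

# entity_id -> [(timestamp, value), ...] sorted by timestamp (assumed layout)
FeatureHistory = Dict[str, List[Tuple[int, float]]]


def point_in_time_value(history: FeatureHistory, entity_id: str, as_of: int,
                        max_age: Optional[int] = None) -> Optional[float]:
    """Latest value with timestamp <= as_of, or None if absent or too stale."""
    rows = history.get(entity_id, [])
    timestamps = [ts for ts, _ in rows]  # O(n) per call; fine for a sketch
    idx = bisect.bisect_right(timestamps, as_of) - 1
    if idx < 0:
        return None  # no value existed yet at as_of
    ts, value = rows[idx]
    if max_age is not None and as_of - ts > max_age:
        return None  # value exists but is older than the allowed freshness
    return value


def build_training_rows(labels, history: FeatureHistory, max_age: Optional[int] = None):
    """labels: iterable of (entity_id, label_time, label)."""
    return [
        (entity_id, t, point_in_time_value(history, entity_id, t, max_age), label)
        for entity_id, t, label in labels
    ]
```

For DataFrames, `pandas.merge_asof` performs the same backward as-of match. In a Spark offline store, the equivalent is typically a join on entity followed by a window that keeps the latest row at or before each label time.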
4. Training Pipeline Design
4.1 Training Pipeline Architecture
```python
class TrainingPipeline:
    def __init__(self):
        self.feature_store = FeatureStore()
        self.experiment_tracker = ExperimentTracker()
        self.model_registry = ModelRegistry()

    async def train_model(self, config):
        """Execute the training pipeline end to end."""
        # Step 1: Get training data from the offline store
        training_data = await self.get_training_data(config)
        # Step 2: Feature engineering (hypothetical helper)
        features = await self.engineer_features(training_data, config)
        # Step 3: Train model (hypothetical helper)
        model, metrics = await self.train(features, config)
        # Step 4: Evaluate model; must return a 'passed_validation' flag
        evaluation = await self.evaluate_model(model, features, config)
        # Step 5: Log experiment
        await self.experiment_tracker.log(
            config=config,
            metrics=metrics,
            evaluation=evaluation,
            model=model
        )
        # Step 6: Register model only if it passes the validation gate
        if evaluation['passed_validation']:
            await self.model_registry.register(
                model=model,
                config=config,
                metrics=metrics
            )
        return model, metrics

    async def get_training_data(self, config):
        """Get training data from the feature store (blocking Spark call)."""
        return self.feature_store.get_offline_features(
            feature_names=config['features'],
            entity_df=config['entity_df'],
            date_range=config['date_range']
        )
```
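Step 6 registers a model only when `evaluation['passed_validation']` is true, but the pipeline does not say how that flag is computed. The sketch below shows one common policy, under assumptions: every gated metric must clear an absolute floor and must not regress against the current production model by more than a tolerance. The metric names, the thresholds, and the higher-is-better convention are hypothetical.

```python
from typing import Dict, Optional


def validation_gate(candidate: Dict[str, float],
                    baseline: Optional[Dict[str, float]],
                    minimums: Dict[str, float],
                    max_regression: float = 0.01) -> Dict[str, object]:
    """Return an evaluation dict carrying a `passed_validation` flag.

    candidate: metrics of the newly trained model
    baseline:  metrics of the current production model (None if there is none)
    minimums:  absolute floor per metric; higher is assumed better everywhere
    """
    failures = []
    for metric, floor in minimums.items():
        value = candidate.get(metric)
        if value is None or value < floor:
            failures.append(f"{metric} below minimum {floor}")
    if baseline is not None:
        for metric, base in baseline.items():
            value = candidate.get(metric)
            if value is not None and value < base - max_regression:
                failures.append(f"{metric} regressed vs production ({value:.3f} < {base:.3f})")
    return {**candidate, "passed_validation": not failures, "failures": failures}


if __name__ == "__main__":
    print(validation_gate({"auc": 0.81}, {"auc": 0.80}, {"auc": 0.75}))
    print(validation_gate({"auc": 0.78}, {"auc": 0.80}, {"auc": 0.75}))
```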
4.2 Experiment Tracking
```python
import numbers

import mlflow
import mlflow.sklearn


class ExperimentTracker:
    def __init__(self):
        self.client = mlflow.tracking.MlflowClient()

    async def log(self, config, metrics, evaluation, model):
        """Log experiment details to MLflow."""
        with mlflow.start_run(run_name=config['run_name']):
            # Log parameters
            mlflow.log_params(config['hyperparameters'])
            # Log training metrics
            for metric_name, metric_value in metrics.items():
                mlflow.log_metric(metric_name, metric_value)
            # Log evaluation results: numeric values as metrics, the rest as tags
            for eval_name, eval_value in evaluation.items():
                if isinstance(eval_value, numbers.Number):
                    mlflow.log_metric(f"eval_{eval_name}", float(eval_value))
                else:
                    mlflow.set_tag(f"eval_{eval_name}", str(eval_value))
            # Log model (assumes a scikit-learn model)
            mlflow.sklearn.log_model(model, "model")
            # Log artifacts
            mlflow.log_artifacts(config['artifacts_dir'])
            # Log feature importance (tree-based scikit-learn models)
            if hasattr(model, 'feature_importances_'):
                mlflow.log_dict(
                    {name: float(v) for name, v in zip(config['features'], model.feature_importances_)},
                    "feature_importance.json"
                )
```
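Reproducibility requires recording exactly what produced a run. The sketch below hashes a canonical serialization of the run's inputs into a stable fingerprint and derives the random seed from it. It assumes a JSON-serializable config and hypothetical `data_version` / `code_version` strings (for example, a dataset snapshot ID and a git commit).

```python
import hashlib
import json
import random
from typing import Any, Dict


def run_fingerprint(config: Dict[str, Any], data_version: str, code_version: str) -> str:
    """Stable SHA-256 ID for (config, data snapshot, code revision).

    sort_keys makes the JSON canonical, so dict key order does not change the hash.
    """
    payload = json.dumps(
        {"config": config, "data": data_version, "code": code_version},
        sort_keys=True, separators=(",", ":"), default=str,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def seed_from_fingerprint(fingerprint: str) -> int:
    """Derive a deterministic 32-bit seed and apply it to Python's RNG.

    Seeding numpy or a training framework would be added here as needed.
    """
    seed = int(fingerprint[:8], 16)
    random.seed(seed)
    return seed


if __name__ == "__main__":
    fp = run_fingerprint({"lr": 0.1, "max_depth": 6}, "snapshot-2024-01-01", "abc123")
    print(fp, seed_from_fingerprint(fp))
```

The fingerprint can be attached to the MLflow run with `mlflow.set_tag`. Two runs with the same fingerprint should then produce matching results, unless something unseeded or nondeterministic (such as certain GPU kernels) is involved.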
4.3 Model Registry
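```python
from datetime import datetime


class ModelRegistry:
    def __init__(self):
        # {model_name: {version: model_version}}; in production, use a database
        self.registry = {}

    def get_next_version(self, model_name):
        return len(self.registry.get(model_name, {})) + 1

    async def register(self, model, config, metrics):
        """Register a new model version in 'staging'."""
        model_name = config['model_name']
        model_version = {
            'model_name': model_name,
            'version': self.get_next_version(model_name),
            'model': model,
            'config': config,
            'metrics': metrics,
            'status': 'staging',
            'created_at': datetime.now()
        }
        # Keep every version so older ones stay available for rollback
        self.registry.setdefault(model_name, {})[model_version['version']] = model_version
        # Save model artifact to storage (hypothetical helper)
        await self.save_model(model, model_version)
        return model_version

    async def promote_to_production(self, model_name, version):
        """Promote a version to production and archive the previous one."""
        versions = self.registry[model_name]
        for other in versions.values():
            if other['status'] == 'production':
                other['status'] = 'archived'
        model_version = versions[version]
        model_version['status'] = 'production'
        # Update serving layer (hypothetical helper)
        await self.update_serving_layer(model_name, model_version)
        return model_version

    async def get_production_model(self, model_name):
        """Return the current production version (used by the serving layer)."""
        for model_version in self.registry.get(model_name, {}).values():
            if model_version['status'] == 'production':
                return model_version
        raise LookupError(f"No production version for {model_name}")
```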
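Promotion should be reversible. The in-memory sketch below restricts stage transitions and supports a one-step rollback to the previous production version. It reuses the registry's 'staging' and 'production' stages and adds an assumed 'archived' stage. `VersionLifecycle` is a hypothetical name, not part of any library.

```python
from typing import Dict, List, Optional

# Allowed stage transitions (assumed policy)
ALLOWED = {
    "staging": {"production", "archived"},
    "production": {"archived"},
    "archived": {"production"},  # lets an old version come back on rollback
}


class VersionLifecycle:
    def __init__(self) -> None:
        self.status: Dict[int, str] = {}
        self.history: List[int] = []  # production versions, oldest first

    def add(self, version: int) -> None:
        self.status[version] = "staging"

    def _transition(self, version: int, new: str) -> None:
        current = self.status[version]
        if new not in ALLOWED[current]:
            raise ValueError(f"v{version}: {current} -> {new} not allowed")
        self.status[version] = new

    def production(self) -> Optional[int]:
        return next((v for v, s in self.status.items() if s == "production"), None)

    def promote(self, version: int) -> None:
        live = self.production()
        self._transition(version, "production")  # raises if not allowed
        if live is not None:
            self._transition(live, "archived")
        self.history.append(version)

    def rollback(self) -> int:
        """Archive the current production version and restore the previous one."""
        if len(self.history) < 2:
            raise RuntimeError("no previous production version")
        current = self.history.pop()
        previous = self.history[-1]
        self._transition(current, "archived")
        self._transition(previous, "production")
        return previous
```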
Training Pipeline Best Practices:
- Use experiment tracking for reproducibility
- Implement model validation gates
- Support hyperparameter tuning
- Enable distributed training for large models
5. Model Serving Design
5.1 Model Serving Architecture
```python
import time


class ModelServingLayer:
    def __init__(self):
        self.model_cache = {}
        self.feature_store = FeatureStore()
        self.model_registry = ModelRegistry()
        self.monitor = ModelMonitor()

    async def predict(self, model_name, features):
        """Get a prediction from the current production model."""
        start = time.perf_counter()
        # Get model (cached after the first load)
        model = await self.get_model(model_name)
        # Fetch the model's input features from the online store
        feature_values = await self.feature_store.get_online_features(
            feature_names=model['config']['features'],
            entity_rows=features['entities']
        )
        # Make prediction
        prediction = model['model'].predict(feature_values)
        latency_ms = (time.perf_counter() - start) * 1000
        # Log prediction for monitoring (hypothetical monitor method)
        await self.monitor.log_prediction(
            model_name=model_name,
            features=features,
            prediction=prediction
        )
        return {
            'prediction': prediction,
            'model_version': model['version'],
            'latency_ms': latency_ms
        }

    async def get_model(self, model_name):
        """Get model from cache or load it from the registry."""
        if model_name in self.model_cache:
            return self.model_cache[model_name]
        model = await self.model_registry.get_production_model(model_name)
        # Cache model; this entry must be evicted when a new version is promoted
        self.model_cache[model_name] = model
        return model
```
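A plain dict cache grows without bound and can keep serving a stale version after a promotion. The sketch below is a bounded LRU model cache with a time-to-live and explicit invalidation. `ModelCache`, the TTL default, and the injectable clock (used for testing) are assumptions.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple


class ModelCache:
    """LRU cache of loaded models with a TTL, so a promotion is picked up
    within `ttl_seconds` even if no explicit invalidation arrives."""

    def __init__(self, max_models: int = 16, ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_models = max_models
        self.ttl = ttl_seconds
        self.clock = clock
        self._entries: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()

    def get(self, name: str) -> Optional[Any]:
        entry = self._entries.get(name)
        if entry is None:
            return None
        loaded_at, model = entry
        if self.clock() - loaded_at > self.ttl:
            del self._entries[name]  # expired: caller reloads from the registry
            return None
        self._entries.move_to_end(name)  # mark as most recently used
        return model

    def put(self, name: str, model: Any) -> None:
        self._entries[name] = (self.clock(), model)
        self._entries.move_to_end(name)
        while len(self._entries) > self.max_models:
            self._entries.popitem(last=False)  # evict least recently used

    def invalidate(self, name: str) -> None:
        """Call on promotion/rollback events to drop a stale version immediately."""
        self._entries.pop(name, None)
```

In this design, `get_model` would call `get` first, fall back to the registry on `None`, and then `put`. A promotion event handler would call `invalidate`.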
5.2 Model Serving Options
```python
class ModelServingOptions:
    """Different options for model serving.

    Latency and throughput values are rough, illustrative ballparks, not
    benchmarks; real figures depend on model size, hardware, batching,
    and replica count.
    """
    # Option 1: REST API
    REST_API = {
        'framework': 'FastAPI',
        'latency': '< 100ms',
        'throughput': '1000 QPS',
        'use_case': 'Low-traffic models'
    }
    # Option 2: gRPC
    GRPC = {
        'framework': 'gRPC',
        'latency': '< 50ms',
        'throughput': '10000 QPS',
        'use_case': 'High-traffic models'
    }
    # Option 3: TensorFlow Serving
    TF_SERVING = {
        'framework': 'TensorFlow Serving',
        'latency': '< 20ms',
        'throughput': '50000 QPS',
        'use_case': 'TensorFlow models'
    }
    # Option 4: Triton Inference Server
    TRITON = {
        'framework': 'Triton',
        'latency': '< 10ms',
        'throughput': '100000 QPS',
        'use_case': 'Multi-framework, GPU inference'
    }
```
5.3 A/B Testing Framework
```python
import hashlib
from datetime import datetime


class ABTestingFramework:
    def __init__(self):
        self.experiments = {}

    async def create_experiment(self, experiment_config):
        """Create a new A/B test."""
        variants = experiment_config['variants']
        # Each variant carries a traffic_percentage; together they must cover 100%
        if sum(v['traffic_percentage'] for v in variants) != 100:
            raise ValueError("Variant traffic percentages must sum to 100")
        experiment = {
            'name': experiment_config['name'],
            'variants': variants,
            'start_time': datetime.now(),
            'end_time': experiment_config.get('end_time'),
            'status': 'running',
            'outcomes': []
        }
        self.experiments[experiment['name']] = experiment
        return experiment

    async def get_variant(self, experiment_name, user_id):
        """Get the variant assignment for a user."""
        experiment = self.experiments[experiment_name]
        # Deterministic assignment: Python's built-in hash() is salted per
        # process, so use a stable digest to get the same bucket on every server.
        digest = hashlib.sha256(f"{user_id}:{experiment_name}".encode()).hexdigest()
        bucket = int(digest, 16) % 100
        cumulative = 0
        for variant in experiment['variants']:
            cumulative += variant['traffic_percentage']
            if bucket < cumulative:
                return variant['name']
        return experiment['variants'][-1]['name']

    async def log_outcome(self, experiment_name, user_id, variant, outcome):
        """Log an experiment outcome (in production, write to an event log)."""
        self.experiments[experiment_name]['outcomes'].append({
            'user_id': user_id,
            'variant': variant,
            'outcome': outcome,
            'timestamp': datetime.now()
        })
```
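Logged outcomes only become a decision after a statistical test. For a binary outcome such as conversion, a two-proportion z-test is one common choice. It is assumed here; the framework above does not prescribe a test.

```python
import math
from typing import Tuple


def two_proportion_z_test(successes_a: int, n_a: int,
                          successes_b: int, n_b: int) -> Tuple[float, float]:
    """Two-sided z-test for a difference in conversion rates between A and B.

    Uses the pooled proportion under H0: p_a == p_b. Returns (z, p_value).
    """
    p_a = successes_a / n_a
    p_b = successes_b / n_b
    pooled = (successes_a + successes_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        return 0.0, 1.0  # all successes or all failures: no evidence of a difference
    z = (p_b - p_a) / se
    # Two-sided p-value: 2 * (1 - Phi(|z|)) == erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


if __name__ == "__main__":
    # Control: 1000/10000 converted; treatment: 1100/10000 converted
    print(two_proportion_z_test(1000, 10_000, 1100, 10_000))
```

Checking the p-value repeatedly and stopping at the first significant result inflates the false-positive rate. Fix the sample size in advance, or use a sequential testing method.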
Model Serving Tips:
- Use model caching for low latency
- Implement request batching for GPU efficiency
- Support model versioning and rollback
- Monitor model performance in real-time
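Request batching trades a few milliseconds of queueing for much better accelerator utilization. Below is a minimal asyncio micro-batcher sketch. Concurrent `predict` calls are queued and flushed together to a hypothetical `predict_batch` function when the batch is full or `max_wait_s` elapses. The batch-size and wait defaults are illustrative.

```python
import asyncio
from typing import Any, Callable, List, Sequence


class MicroBatcher:
    """Queue concurrent requests and run them through `predict_batch` together.

    Create it inside a running event loop so the queue binds to that loop.
    """

    def __init__(self, predict_batch: Callable[[Sequence[Any]], List[Any]],
                 max_batch_size: int = 32, max_wait_s: float = 0.005) -> None:
        self.predict_batch = predict_batch
        self.max_batch_size = max_batch_size
        self.max_wait_s = max_wait_s
        self.queue: "asyncio.Queue" = asyncio.Queue()
        self.batch_sizes: List[int] = []  # recorded for observability

    async def predict(self, x: Any) -> Any:
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((x, future))
        return await future

    async def run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]  # block until the first request
            deadline = loop.time() + self.max_wait_s
            while len(batch) < self.max_batch_size:
                timeout = deadline - loop.time()
                if timeout <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), timeout))
                except asyncio.TimeoutError:
                    break  # wait budget spent: flush what we have
            self.batch_sizes.append(len(batch))
            try:
                outputs = self.predict_batch([x for x, _ in batch])
            except Exception as exc:
                for _, fut in batch:
                    if not fut.done():
                        fut.set_exception(exc)
                continue
            for (_, fut), out in zip(batch, outputs):
                if not fut.done():  # caller may have timed out and cancelled
                    fut.set_result(out)


async def main() -> None:
    batcher = MicroBatcher(lambda xs: [x * 2 for x in xs], max_batch_size=8, max_wait_s=0.01)
    worker = asyncio.create_task(batcher.run())
    results = await asyncio.gather(*(batcher.predict(i) for i in range(20)))
    worker.cancel()
    print(results, batcher.batch_sizes)


if __name__ == "__main__":
    asyncio.run(main())
```

Servers such as Triton provide built-in dynamic batching, so a hand-rolled batcher mainly matters for custom serving code.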
6. Monitoring and Observability
6.1 Model Monitoring
```python
import numpy as np


class ModelMonitor:
    def __init__(self):
        self.metrics_store = MetricsStore()  # hypothetical metrics backend
        self.alert_manager = AlertManager()  # hypothetical alerting client

    async def monitor_model(self, model_name):
        """Monitor model performance over a recent window of predictions."""
        # Get recent predictions (hypothetical helper)
        predictions = await self.get_recent_predictions(model_name)
        # Compute metrics
        metrics = self.compute_metrics(predictions)
        # Check for anomalies
        anomalies = self.detect_anomalies(metrics)
        # Send alerts if needed
        for anomaly in anomalies:
            await self.alert_manager.send_alert(anomaly)
        # Log metrics
        await self.metrics_store.log(model_name, metrics)
        return metrics

    def compute_metrics(self, predictions):
        """Compute monitoring metrics (distribution and drift helpers are hypothetical)."""
        return {
            'prediction_distribution': self.compute_distribution(predictions),
            'feature_drift': self.compute_feature_drift(predictions),
            'prediction_drift': self.compute_prediction_drift(predictions),
            'latency_p50': np.percentile(predictions['latency'], 50),
            'latency_p99': np.percentile(predictions['latency'], 99),
            'error_rate': self.compute_error_rate(predictions)
        }

    def detect_anomalies(self, metrics):
        """Flag drift scores above a threshold.

        0.1 is an illustrative cutoff; a sensible value depends on the drift
        statistic used and should be tuned per feature.
        """
        drift_threshold = 0.1
        anomalies = []
        # Check for feature drift
        if metrics['feature_drift'] > drift_threshold:
            anomalies.append({
                'type': 'FEATURE_DRIFT',
                'severity': 'HIGH',
                'message': f"Feature drift detected: {metrics['feature_drift']:.3f}"
            })
        # Check for prediction drift
        if metrics['prediction_drift'] > drift_threshold:
            anomalies.append({
                'type': 'PREDICTION_DRIFT',
                'severity': 'MEDIUM',
                'message': f"Prediction drift detected: {metrics['prediction_drift']:.3f}"
            })
        return anomalies
```
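`compute_feature_drift` is left undefined above. One widely used drift statistic is the Population Stability Index, PSI = Σ (c_i - r_i) ln(c_i / r_i), where r_i and c_i are the fractions of the reference and current samples in bin i. The sketch below computes it with NumPy. Choosing PSI, binning on reference quantiles, and the `eps` floor for empty bins are assumptions, not part of the original design.

```python
import numpy as np


def population_stability_index(reference, current, bins: int = 10, eps: float = 1e-6) -> float:
    """PSI between a reference sample (e.g. training data) and a current one.

    Bin edges are interior quantiles of the reference, so each bin holds about
    1/bins of it; values outside the reference range fall into the end bins.
    """
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    inner_edges = np.quantile(reference, np.linspace(0, 1, bins + 1)[1:-1])
    ref_counts = np.bincount(np.searchsorted(inner_edges, reference, side="right"), minlength=bins)
    cur_counts = np.bincount(np.searchsorted(inner_edges, current, side="right"), minlength=bins)
    ref_frac = np.clip(ref_counts / len(reference), eps, None)  # avoid log(0)
    cur_frac = np.clip(cur_counts / len(current), eps, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    train = rng.normal(0.0, 1.0, 10_000)
    print(population_stability_index(train, rng.normal(0.0, 1.0, 10_000)))  # same distribution
    print(population_stability_index(train, rng.normal(1.0, 1.0, 10_000)))  # mean shifted by 1 sd
```

A commonly cited rule of thumb reads PSI below 0.1 as little change, 0.1 to 0.25 as moderate shift, and above 0.25 as significant shift. This is consistent with the 0.1 alert threshold above.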
6.2 Monitoring Dashboard
```python
class MonitoringDashboard:
    def __init__(self):
        self.grafana = GrafanaClient()  # hypothetical wrapper around the Grafana HTTP API

    async def create_dashboard(self, model_name):
        """Create a monitoring dashboard for a model (simplified panel spec)."""
        selector = f'{{model="{model_name}"}}'
        dashboard = {
            'title': f"Model Dashboard: {model_name}",
            'panels': [
                {
                    'title': 'Prediction Latency',
                    'targets': [{
                        # histogram_quantile expects per-second bucket rates summed by `le`
                        'expr': f'histogram_quantile(0.99, sum by (le) (rate(model_latency_seconds_bucket{selector}[5m])))',
                        'legendFormat': 'p99 latency'
                    }]
                },
                {
                    'title': 'Prediction Distribution',
                    'targets': [{
                        'expr': f'histogram_quantile(0.5, sum by (le) (rate(model_prediction_bucket{selector}[5m])))',
                        'legendFormat': 'Median prediction'
                    }]
                },
                {
                    'title': 'Error Rate',
                    'targets': [{
                        'expr': f'sum(rate(model_errors_total{selector}[5m]))',
                        'legendFormat': 'Errors per second'
                    }]
                }
            ]
        }
        await self.grafana.create_dashboard(dashboard)
```
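`histogram_quantile` does not read raw latencies; it estimates a quantile from cumulative bucket counts. The pure-Python sketch below reproduces that estimate following the behavior described in the Prometheus documentation: linear interpolation within the target bucket, a lower bound of 0 for the first bucket, and the upper bound of the second-highest bucket when the quantile lands in `+Inf`. It assumes non-negative observations such as latencies.

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative buckets.

    buckets: (upper bound `le`, cumulative count), sorted by le, last le = +Inf.
    Observations are assumed spread uniformly inside each bucket.
    """
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_le, prev_count = 0.0, 0.0
    for le, count in buckets:
        if count >= rank:
            if math.isinf(le):
                return prev_le  # landed in +Inf bucket: best known upper bound
            if count == prev_count:
                return prev_le  # empty bucket (only reachable for q == 0)
            return prev_le + (le - prev_le) * (rank - prev_count) / (count - prev_count)
        prev_le, prev_count = le, count
    return prev_le


if __name__ == "__main__":
    latency_buckets = [(0.005, 100), (0.01, 600), (0.025, 950), (0.05, 990), (math.inf, 1000)]
    for q in (0.5, 0.99, 0.999):
        print(q, histogram_quantile(q, latency_buckets))
```

Because of the interpolation, a p99 reading is only as precise as the bucket boundaries around it. Bucket edges should therefore sit near the latency targets (for example 10ms and 100ms here).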
Critical Monitoring Points:
- Data drift: Monitor for changes in feature distributions
- Prediction drift: Monitor for changes in prediction patterns
- Model performance: Track accuracy metrics over time
- Latency: Monitor inference latency
7. Scale Considerations and Trade-offs
7.1 Horizontal Scaling
- Feature Store: Redis cluster with sharding
- Training: Kubernetes with GPU nodes
- Model Serving: Horizontal scaling with load balancing
- Monitoring: Distributed metrics collection
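Redis Cluster shards the keyspace into 16384 hash slots. Each key's slot is CRC16(key) mod 16384, and when the key contains a `{...}` hash tag, only the substring inside the tag is hashed. The sketch below reproduces that computation using the XMODEM CRC16 variant named in the Redis Cluster specification, to show why keys sharing a tag land on the same shard. The example key layout is hypothetical.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Hash slot (0..16383) that Redis Cluster assigns to `key`."""
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end > start + 1:  # non-empty tag: hash only the tag contents
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


if __name__ == "__main__":
    for k in ("fs:{user_42}:clicks_7d", "fs:{user_42}:avg_order_value", "fs:user_42:clicks_7d"):
        print(k, key_slot(k))
```

Co-locating an entity's features with a hash tag keeps a pipelined read on one node. The trade-off is that a single very hot entity concentrates load on one shard.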
7.2 Cost vs Performance Trade-offs
| Dimension | Option A (Cost Optimized) | Option B (Performance Optimized) |
|---|---|---|
| Feature Store | Batch features only | Real-time streaming features |
| Training | Single-node training | Distributed training |
| Serving | CPU inference | GPU inference |
| Monitoring | Sampled monitoring | Full monitoring |
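Cost per prediction turns the CPU vs GPU row into a number. The function is simple arithmetic. The hourly prices and throughputs in the example are made-up placeholders, not quotes from any cloud provider.

```python
def cost_per_million(hourly_price_usd: float, predictions_per_sec: float,
                     utilization: float = 0.6) -> float:
    """USD per 1M predictions for one instance at the given average utilization."""
    predictions_per_hour = predictions_per_sec * utilization * 3600
    return hourly_price_usd / predictions_per_hour * 1_000_000


if __name__ == "__main__":
    # Placeholder numbers: CPU instance $0.40/h at 500 pred/s, GPU $3.00/h at 10,000 pred/s
    print(f"CPU:           ${cost_per_million(0.40, 500):.3f} per 1M")
    print(f"GPU (busy):    ${cost_per_million(3.00, 10_000):.3f} per 1M")
    print(f"GPU (5% util): ${cost_per_million(3.00, 10_000, utilization=0.05):.3f} per 1M")
```

With these placeholder numbers, the GPU is cheaper per prediction only while it stays busy. At low utilization the comparison flips, which is why utilization (and batching) matters as much as the hardware choice.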
8. Advanced Topics
8.1 Feature Store Best Practices
```python
class FeatureStoreBestPractices:
    """Best practices for feature store design"""
    BEST_PRACTICES = {
        'feature_naming': 'Use consistent naming conventions',
        'feature_documentation': 'Document all features with descriptions',
        'feature_versioning': 'Version features for reproducibility',
        'feature_validation': 'Validate features before serving',
        'feature_monitoring': 'Monitor feature quality and drift'
    }
```
8.2 MLOps Best Practices
```python
class MLOpsBestPractices:
    """Best practices for MLOps"""
    BEST_PRACTICES = {
        'reproducibility': 'Ensure all experiments are reproducible',
        'version_control': 'Version control for code, data, and models',
        'automated_testing': 'Automated testing for models',
        'continuous_integration': 'CI/CD for ML pipelines',
        'monitoring': 'Comprehensive monitoring and alerting'
    }
```
9. Implementation Roadmap
Phase 1: Foundation (Weeks 1-4)
- Feature store implementation
- Basic training pipeline
- Model registry
Phase 2: Advanced Features (Weeks 5-8)
- Real-time feature computation
- Advanced training pipeline
- Model serving layer
Phase 3: MLOps (Weeks 9-12)
- Monitoring and alerting
- A/B testing framework
- Governance and compliance
Phase 4: Optimization (Weeks 13-16)
- Cost optimization
- Performance optimization
- Advanced features
10. Summary and Key Takeaways
Architecture Recap
- Feature Store: Centralized feature management
- Training Pipeline: Automated model training
- Model Registry: Version control for models
- Model Serving: Low-latency inference
- Monitoring: Comprehensive observability
Key Metrics
- Developer Productivity: Time from idea to production
- Model Performance: Accuracy, latency, throughput
- Cost Efficiency: Cost per prediction
- Reliability: Uptime, error rate
Common Interview Mistakes
- Not discussing feature store design
- Ignoring model versioning
- Forgetting about monitoring
- Not considering cost optimization
Final Interview Tip: Emphasize the end-to-end lifecycle. Discuss how you'd enable data scientists to be productive while ensuring production reliability. Show understanding of both ML techniques and platform engineering.