MLOps Architecture Overview
Difficulty: Senior Level | Companies: Google, Meta, Netflix, Uber, Stripe
What is MLOps?
MLOps (Machine Learning Operations) is a set of practices that combines Machine Learning, DevOps, and Data Engineering to deploy and maintain ML systems in production reliably and efficiently.
ℹ️
MLOps is not just about tooling—it's about people, processes, and technology working together to deliver ML value at scale.
MLOps Maturity Model
Level 0: Manual Process
No pipeline automation. Data scientists build models in notebooks and hand off to engineers.
Level 1: ML Pipeline Automation
Automated training pipelines with experiment tracking and model versioning.
Level 2: CI/CD for ML
Full CI/CD with automated testing, deployment, and monitoring.
Architecture Components
# mlops_architecture.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
class PipelineStage(Enum):
    """Stages of the ML pipeline.

    Members are declared in the same order in which the default pipeline
    chains its nodes, from raw-data ingestion through production monitoring.
    Each value is the lowercase string form of the member name.
    """

    # Data preparation
    DATA_INGESTION = "data_ingestion"
    DATA_VALIDATION = "data_validation"
    FEATURE_ENGINEERING = "feature_engineering"

    # Model building
    TRAINING = "training"
    EVALUATION = "evaluation"
    REGISTRATION = "registration"

    # Serving
    DEPLOYMENT = "deployment"
    MONITORING = "monitoring"
@dataclass
class MLOpsConfig:
    """Names of the tools that make up one project's MLOps stack.

    Every field is a required free-form string; no validation is performed.
    Field order is part of the interface because the generated ``__init__``
    accepts the values positionally.
    """

    project_name: str
    cloud_provider: str
    experiment_tracker: str
    # Copied into the default "register_model" node's config.
    model_registry: str
    # Copied into the default "compute_features" node's config.
    feature_store: str
    orchestrator: str
    monitoring_tool: str
    ci_cd_platform: str
@dataclass
class PipelineNode:
    """One step of the pipeline graph.

    Nodes refer to each other by ``name``: ``dependencies`` lists the names
    of the nodes that must run before this one.
    """

    stage: PipelineStage
    # Identifier other nodes use in their ``dependencies`` lists.
    name: str
    # default_factory gives every instance its own list/dict, so mutating one
    # node's dependencies or config never leaks into another node.
    dependencies: List[str] = field(default_factory=list)
    config: Dict = field(default_factory=dict)
    # NOTE(review): the two settings below are not read by any code in this
    # file; presumably the orchestrator consumes them - confirm.
    retry_count: int = 3
    timeout_seconds: int = 3600
class MLOpsArchitecture:
    """Collection of pipeline nodes for one MLOps stack configuration.

    On construction the eight default nodes (one per ``PipelineStage``) are
    registered as a linear chain from data ingestion to monitoring.
    ``self.nodes`` is a plain list, so callers may append further nodes
    before validating or ordering the graph.
    """

    def __init__(self, config: MLOpsConfig):
        """Store *config* and register the default pipeline nodes."""
        self.config = config
        self.nodes: List[PipelineNode] = []
        self._setup_default_pipeline()

    def _setup_default_pipeline(self):
        """Append the default linear pipeline to ``self.nodes``.

        Each node depends on the one before it. The feature-store and
        model-registry names are taken from ``self.config``; all other
        settings are fixed defaults.
        """
        default_nodes = [
            PipelineNode(
                stage=PipelineStage.DATA_INGESTION,
                name="ingest_raw_data",
                config={"source": "s3://data-lake/raw", "format": "parquet"},
            ),
            PipelineNode(
                stage=PipelineStage.DATA_VALIDATION,
                name="validate_data",
                dependencies=["ingest_raw_data"],
                config={"schema_path": "schemas/input.json"},
            ),
            PipelineNode(
                stage=PipelineStage.FEATURE_ENGINEERING,
                name="compute_features",
                dependencies=["validate_data"],
                config={"feature_store": self.config.feature_store},
            ),
            PipelineNode(
                stage=PipelineStage.TRAINING,
                name="train_model",
                dependencies=["compute_features"],
                config={"framework": "pytorch", "gpu": True},
            ),
            PipelineNode(
                stage=PipelineStage.EVALUATION,
                name="evaluate_model",
                dependencies=["train_model"],
                config={"metrics": ["accuracy", "f1", "auc"]},
            ),
            PipelineNode(
                stage=PipelineStage.REGISTRATION,
                name="register_model",
                dependencies=["evaluate_model"],
                config={"registry": self.config.model_registry},
            ),
            PipelineNode(
                stage=PipelineStage.DEPLOYMENT,
                name="deploy_model",
                dependencies=["register_model"],
                config={"target": "kubernetes", "replicas": 3},
            ),
            PipelineNode(
                stage=PipelineStage.MONITORING,
                name="monitor_model",
                dependencies=["deploy_model"],
                config={"alert_threshold": 0.05},
            ),
        ]
        self.nodes.extend(default_nodes)

    def get_execution_order(self) -> List[str]:
        """Return node names ordered so every node follows its dependencies.

        Dependencies that name no registered node are skipped here; use
        ``validate_dependencies`` to report them.

        Raises:
            ValueError: if the dependency graph contains a cycle (including a
                node that depends on itself), because no valid order exists.
        """
        # Index nodes by name once instead of rescanning self.nodes on every
        # visit. Names map to a list so that nodes sharing a name are each
        # still emitted, as before.
        nodes_by_name: Dict[str, List[PipelineNode]] = {}
        for node in self.nodes:
            nodes_by_name.setdefault(node.name, []).append(node)

        finished = set()
        on_path = set()  # names on the current DFS path
        path: List[str] = []  # same names in visit order, for the error text
        order: List[str] = []

        def visit(node_name: str) -> None:
            if node_name in finished:
                return
            if node_name in on_path:
                # Reaching a name that is still being expanded means the
                # graph loops back on itself.
                cycle = path[path.index(node_name):] + [node_name]
                raise ValueError(
                    "Dependency cycle detected: " + " -> ".join(cycle)
                )
            on_path.add(node_name)
            path.append(node_name)
            for node in nodes_by_name.get(node_name, ()):
                for dep in node.dependencies:
                    visit(dep)
                order.append(node.name)
            path.pop()
            on_path.discard(node_name)
            finished.add(node_name)

        for node in self.nodes:
            visit(node.name)
        return order

    def validate_dependencies(self) -> List[str]:
        """Return one message per dependency that names no registered node.

        An empty list means every dependency resolves to a node in
        ``self.nodes``.
        """
        known_names = {node.name for node in self.nodes}
        return [
            f"Node '{node.name}' depends on unknown node '{dep}'"
            for node in self.nodes
            for dep in node.dependencies
            if dep not in known_names
        ]
# Usage
# Pick one tool per layer of the stack.
config = MLOpsConfig(
    project_name="recommendation-engine",
    cloud_provider="aws",
    experiment_tracker="mlflow",
    model_registry="mlflow",
    feature_store="feast",
    orchestrator="kubeflow",
    monitoring_tool="evidently",
    ci_cd_platform="github_actions",
)

# Constructing the architecture registers the default pipeline nodes.
architecture = MLOpsArchitecture(config)

print("Execution order:", architecture.get_execution_order())
print("Validation errors:", architecture.validate_dependencies())
System Design Patterns
# mlops-stack.yaml
# Non-sensitive, plain-text settings for the ML platform.
apiVersion: v1
kind: ConfigMap
metadata:
  # No namespace is set here, so the object is created in whichever
  # namespace the apply command targets.
  name: mlops-stack-config
data:
  PROJECT_NAME: "ml-platform"
  CLOUD_PROVIDER: "gcp"
  REGION: "us-central1"
  CLUSTER_NAME: "ml-prod-cluster"
---
# Credentials and connection strings for the ML platform.
apiVersion: v1
kind: Secret
metadata:
  name: mlops-secrets
type: Opaque
data:
  # Values under `data` must be base64-encoded; replace each placeholder
  # with the encoded value before applying this manifest.
  MLFLOW_TRACKING_URI: <base64-encoded>
  FEAST_REDIS_URL: <base64-encoded>
  DOCKER_REGISTRY_TOKEN: <base64-encoded>
---
# Dedicated namespace for production ML workloads.
apiVersion: v1
kind: Namespace
metadata:
  name: mlops-production
  labels:
    environment: production
    team: ml-platform
Key Metrics to Track
# metrics_dashboard.py
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime
@dataclass
class MLSystemMetrics:
    """One snapshot of model-quality and serving metrics.

    All fields are required and are stored as given; no validation or unit
    conversion is performed.
    """

    model_accuracy: float
    # Latency in milliseconds, per the field name.
    prediction_latency_ms: float
    # NOTE(review): presumably requests per second - confirm with the producer.
    throughput_rps: float
    error_rate: float
    data_drift_score: float
    feature_drift_score: float
    model_version: str
    timestamp: datetime
class MetricsCollector:
    """Keep a history of metric snapshots and alert on threshold breaches.

    Thresholds are class attributes so a subclass or an individual instance
    can override them. A value exactly equal to its threshold is not a
    breach, and the same rule is used for both alerts and the summary's
    drift status.
    """

    MIN_ACCURACY = 0.85
    MAX_LATENCY_MS = 100
    MAX_ERROR_RATE = 0.01
    MAX_DATA_DRIFT = 0.3

    def __init__(self):
        """Start with an empty history."""
        # Grows by one entry per record() call and is never trimmed.
        self.metrics_history: List[MLSystemMetrics] = []

    def record(self, metrics: MLSystemMetrics):
        """Append *metrics* to the history, then emit any alerts it triggers."""
        self.metrics_history.append(metrics)
        self._check_alerts(metrics)

    def _has_data_drift(self, metrics: MLSystemMetrics) -> bool:
        """Return True when the data-drift score exceeds ``MAX_DATA_DRIFT``."""
        return metrics.data_drift_score > self.MAX_DATA_DRIFT

    def _check_alerts(self, metrics: MLSystemMetrics):
        """Send one alert per breached threshold, in a fixed order.

        Order: accuracy, latency, error rate, data drift.
        ``feature_drift_score`` and ``throughput_rps`` are not checked.
        """
        alerts = []
        if metrics.model_accuracy < self.MIN_ACCURACY:
            alerts.append(f"LOW_ACCURACY: {metrics.model_accuracy}")
        if metrics.prediction_latency_ms > self.MAX_LATENCY_MS:
            alerts.append(f"HIGH_LATENCY: {metrics.prediction_latency_ms}ms")
        if metrics.error_rate > self.MAX_ERROR_RATE:
            alerts.append(f"HIGH_ERROR_RATE: {metrics.error_rate}")
        if self._has_data_drift(metrics):
            alerts.append(f"DATA_DRIFT: {metrics.data_drift_score}")
        for alert in alerts:
            self._send_alert(alert)

    def _send_alert(self, alert: str):
        """Deliver a single alert; this implementation prints it to stdout."""
        print(f"[ALERT] {alert}")

    def get_summary(self) -> Dict:
        """Summarise the recorded history.

        Returns:
            ``{}`` when nothing has been recorded; otherwise a dict with
            ``current_accuracy`` (latest snapshot), ``avg_latency`` (mean
            latency over all snapshots), ``total_predictions`` (the number of
            recorded snapshots, despite the key name) and ``drift_status``
            (``"OK"`` or ``"DRIFT_DETECTED"``, from the latest snapshot).
        """
        history = self.metrics_history
        if not history:
            return {}
        latest = history[-1]
        total_latency = sum(m.prediction_latency_ms for m in history)
        return {
            "current_accuracy": latest.model_accuracy,
            "avg_latency": total_latency / len(history),
            "total_predictions": len(history),
            # Same predicate as the DATA_DRIFT alert, so the summary and the
            # alerts can never disagree about the latest snapshot.
            "drift_status": "DRIFT_DETECTED" if self._has_data_drift(latest) else "OK",
        }
Follow-Up Questions
- How would you design an MLOps architecture for a real-time recommendation system?
- What are the trade-offs between monolithic and microservices ML architectures?
- How does MLOps maturity level affect team structure and responsibilities?
- What security considerations are critical in production ML systems?