Production MLOps Pipeline
Kubeflow + Seldon + Prometheus | Enterprise ML Infrastructure
Project Overview
Problem Statement
Deploying ML models to production requires more than just model code. Teams need CI/CD pipelines for models, automated testing, canary deployments, rollback mechanisms, and infrastructure monitoring. This project builds a complete MLOps platform using Kubernetes-native tools.
Objectives
- Build end-to-end ML pipeline with Kubeflow Pipelines
- Deploy models with Seldon Core for A/B testing and canary releases
- Implement model versioning and registry
- Create CI/CD pipelines for ML with GitHub Actions
- Set up comprehensive monitoring with Prometheus + Grafana
| Component | Technology |
|---|---|
| Pipeline Orchestration | Kubeflow Pipelines |
| Model Serving | Seldon Core + Triton |
| Container Orchestration | Kubernetes (EKS/GKE) |
| CI/CD | GitHub Actions + ArgoCD |
| Model Registry | MLflow + S3 |
| Monitoring | Prometheus + Grafana |
| Feature Store | Feast on Kubernetes |
| Secrets | HashiCorp Vault |
Architecture Diagram
+--------------------------------------------------------------------+
|                   Production MLOps Architecture                    |
+--------------------------------------------------------------------+
|  +--------------+    +--------------+    +----------------------+  |
|  | Git Push     |--->| GitHub       |--->| Kubeflow             |  |
|  | (Code/Data)  |    | Actions CI   |    | Pipeline Run         |  |
|  +--------------+    +--------------+    +----------+-----------+  |
|                                                     |              |
|                                                     v              |
|  +--------------+    +--------------+    +----------------------+  |
|  | Model        |--->| Seldon Core  |--->| Canary Deploy        |  |
|  | Registry     |    | Serving      |    | (5% -> 25% -> 100%)  |  |
|  +--------------+    +--------------+    +----------------------+  |
|                              |                      |              |
|                              v                      v              |
|  +--------------+    +--------------+    +----------------------+  |
|  | Prometheus   |--->| Grafana      |--->| AlertManager         |  |
|  | Metrics      |    | Dashboard    |    | (PagerDuty)          |  |
|  +--------------+    +--------------+    +----------------------+  |
+--------------------------------------------------------------------+
Step-by-Step Implementation
Step 1: Kubernetes Cluster Setup
# Install required tools
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
# Create cluster (example with kind for local dev)
kind create cluster --name ml-cluster --config kind-config.yaml
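# Install Kubeflow Pipelines (cluster-scoped resources first, then the environment)
export PIPELINE_VERSION=2.0.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"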
# Install Seldon Core
kubectl create namespace seldon-system || true
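# istio.enabled=true assumes Istio is already installed in the cluster
helm install seldon-core seldon-core-operator \
    --repo https://storage.googleapis.com/seldon-charts \
    -n seldon-system \
    --set usageMetrics.enabled=true \
    --set istio.enabled=true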
# Install Prometheus + Grafana
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install monitoring prometheus-community/kube-prometheus-stack \
--namespace monitoring --create-namespace
Step 2: Kubeflow Pipeline Definition
Define an ML training pipeline as a Kubeflow Pipeline with reusable components.
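# pipelines/training_pipeline.py
from kfp import dsl


@dsl.component(
    base_image="python:3.10-slim",
    # The slim image ships without these libraries, so install them at run time.
    packages_to_install=["pandas", "scikit-learn", "joblib"],
)
def preprocess_data(input_path: str, output_path: str):
    import os

    import joblib
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    # Note: os.makedirs and joblib.dump assume a local or mounted filesystem path.
    df = pd.read_csv(input_path)
    feature_cols = [c for c in df.columns if c != "target"]

    # Split first, then fit the scaler on the training rows only,
    # so test-set statistics do not leak into training.
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
    train_df, test_df = train_df.copy(), test_df.copy()
    scaler = StandardScaler()
    train_df[feature_cols] = scaler.fit_transform(train_df[feature_cols])
    test_df[feature_cols] = scaler.transform(test_df[feature_cols])

    os.makedirs(output_path, exist_ok=True)
    train_df.to_csv(f"{output_path}/train.csv", index=False)
    test_df.to_csv(f"{output_path}/test.csv", index=False)
    joblib.dump(scaler, f"{output_path}/scaler.joblib")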


@dsl.component(
    base_image="python:3.10-slim",
    # lightgbm also needs the libgomp1 system library, which slim images
    # typically lack; a custom base image is assumed for real runs.
    packages_to_install=["pandas", "lightgbm", "mlflow"],
)
def train_model(data_path: str, model_path: str, params: dict):
    import os

    import lightgbm as lgb
    import mlflow
    import mlflow.lightgbm
    import pandas as pd

    train_df = pd.read_csv(f"{data_path}/train.csv")
    X_train = train_df.drop("target", axis=1)
    y_train = train_df["target"]

    mlflow.set_experiment("production-training")
    with mlflow.start_run():
        mlflow.log_params(params)
        train_data = lgb.Dataset(X_train, label=y_train)
        model = lgb.train(
            params, train_data, num_boost_round=500,
            valid_sets=[train_data], callbacks=[lgb.log_evaluation(50)],
        )
        mlflow.lightgbm.log_model(model, "model")

        # Also save the booster as a plain file for the serving image
        os.makedirs(model_path, exist_ok=True)
        model.save_model(f"{model_path}/model.txt")
        mlflow.log_artifact(f"{model_path}/model.txt")


@dsl.component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas", "lightgbm", "scikit-learn"],
)
def evaluate_model(data_path: str, model_path: str, metrics_path: str):
    import json
    import os

    import lightgbm as lgb
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

    test_df = pd.read_csv(f"{data_path}/test.csv")
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]

    model = lgb.Booster(model_file=f"{model_path}/model.txt")
    # Score once; reuse the probabilities for both the labels and the AUC
    y_prob = model.predict(X_test)
    y_pred = (y_prob > 0.5).astype(int)

    metrics = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "f1": float(f1_score(y_test, y_pred)),
        "auc": float(roc_auc_score(y_test, y_prob)),
    }

    os.makedirs(metrics_path, exist_ok=True)
    with open(f"{metrics_path}/metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)
    print(f"Metrics: {metrics}")


@dsl.component(base_image="python:3.10-slim")
def validate_model(metrics_path: str, threshold: float = 0.85):
    import json

    with open(f"{metrics_path}/metrics.json") as f:
        metrics = json.load(f)

    # Raise instead of assert so the gate still fails under `python -O`
    if metrics["accuracy"] < threshold:
        raise ValueError(
            f"Model accuracy {metrics['accuracy']} below threshold {threshold}"
        )
    print(f"Model validation passed: accuracy={metrics['accuracy']:.4f}")


@dsl.pipeline(
    name="training-pipeline",
    description="End-to-end ML training pipeline",
)
def training_pipeline(
    input_path: str = "gs://ml-data/input/",
    output_path: str = "gs://ml-data/processed/",
    model_path: str = "gs://ml-models/production/",
    metrics_path: str = "gs://ml-metrics/",
):
    # The components take plain string paths and declare no outputs, so the
    # same path parameters are passed downstream and the execution order is
    # set explicitly with .after().
    preprocess_task = preprocess_data(input_path=input_path, output_path=output_path)

    train_task = train_model(
        data_path=output_path,
        model_path=model_path,
        params={"num_leaves": 31, "learning_rate": 0.05, "objective": "binary"},
    )
    train_task.after(preprocess_task)

    eval_task = evaluate_model(
        data_path=output_path,
        model_path=model_path,
        metrics_path=metrics_path,
    )
    eval_task.after(train_task)

    validate_task = validate_model(metrics_path=metrics_path)
    validate_task.after(eval_task)
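The validation step gates on accuracy alone, although the evaluation step also writes F1 and AUC. As a minimal sketch, a gate over several metrics could look like the following; the function name `check_metrics` and the threshold values are illustrative assumptions, not part of the pipeline above.

```python
from typing import Dict, List


def check_metrics(metrics: Dict[str, float], thresholds: Dict[str, float]) -> List[str]:
    """Return human-readable failures; an empty list means the gate passes."""
    failures = []
    for name, minimum in thresholds.items():
        if name not in metrics:
            failures.append(f"{name}: missing from metrics")
        elif metrics[name] < minimum:
            failures.append(f"{name}: {metrics[name]:.4f} < {minimum:.4f}")
    return failures


# Hypothetical values shaped like the metrics.json written by evaluate_model
example_metrics = {"accuracy": 0.91, "f1": 0.78, "auc": 0.95}
example_thresholds = {"accuracy": 0.85, "f1": 0.80}  # assumed thresholds
for failure in check_metrics(example_metrics, example_thresholds):
    print("FAILED", failure)
```

Returning the full list of failures, rather than stopping at the first one, makes the pipeline log show every metric that regressed in a single run.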
Step 3: Seldon Core Model Deployment
Deploy models with Seldon Core for A/B testing, canary releases, and monitoring.
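# seldon/model.py
import logging
import os
from typing import List, Optional

import lightgbm as lgb
import numpy as np

logger = logging.getLogger(__name__)


class SeldonModel:
    def __init__(self, model_path: Optional[str] = None):
        # The Seldon Python wrapper calls load() with no arguments, so the
        # model location is resolved here. The MODEL_PATH variable and the
        # /mnt/models default are assumptions; adjust them to your image.
        self.model_path = model_path or os.environ.get("MODEL_PATH", "/mnt/models")
        self.model = None
        # Assumption: no scaler is loaded, so inputs must arrive already scaled.
        self.scaler = None

    def load(self):
        self.model = lgb.Booster(model_file=f"{self.model_path}/model.txt")
        logger.info("Loaded model from %s", self.model_path)

    def predict(self, X: np.ndarray, features_names: Optional[List[str]] = None) -> np.ndarray:
        """Seldon predict interface: return hard 0/1 labels."""
        predictions = self.model.predict(X)
        return (predictions > 0.5).astype(int).reshape(-1, 1)

    def predict_proba(self, X: np.ndarray, features_names: Optional[List[str]] = None) -> np.ndarray:
        """Return class probabilities as [P(class 0), P(class 1)] per row."""
        proba = self.model.predict(X)
        return np.column_stack([1 - proba, proba])

    def send_feedback(self, features, feature_names, reward, truth, routing=None):
        """Seldon feedback hook; this version only logs what it receives."""
        logger.info("Received feedback: reward=%s truth=%s", reward, truth)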
Seldon Deployment YAML
# seldon/deployment.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: ml-model-production
  # No namespace here: it is supplied at apply time (kubectl apply -n <namespace>)
spec:
  predictors:
    - name: stable
      replicas: 3
      traffic: 90
      graph:
        name: model
        implementation: UNKNOWN_IMPLEMENTATION
        modelUri: gs://ml-models/production/v1.0.0
        children: []
      componentSpecs:
        - spec:
            containers:
              - name: model
                image: ml-model:latest
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
    - name: canary
      replicas: 1
      traffic: 10
      graph:
        name: model
        implementation: UNKNOWN_IMPLEMENTATION
        modelUri: gs://ml-models/production/v1.1.0
        children: []
      componentSpecs:
        - spec:
            containers:
              - name: model
                image: ml-model:canary
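The manifest splits traffic 90/10 between the `stable` and `canary` predictors. Since the weights are percentages, they should add up to 100 whenever the split is changed. The helpers below are a small sketch of generating and checking such a split before a manifest is rendered; `traffic_split` and `validate_traffic` are hypothetical names, not part of Seldon.

```python
from typing import Dict, List


def traffic_split(canary_percent: int) -> List[Dict[str, object]]:
    """Build the name/traffic pairs for a stable + canary predictor pair."""
    if not 0 <= canary_percent <= 100:
        raise ValueError("canary_percent must be between 0 and 100")
    return [
        {"name": "stable", "traffic": 100 - canary_percent},
        {"name": "canary", "traffic": canary_percent},
    ]


def validate_traffic(predictors: List[Dict[str, object]]) -> None:
    """Raise ValueError if the predictor traffic weights do not sum to 100."""
    total = sum(int(p["traffic"]) for p in predictors)
    if total != 100:
        raise ValueError(f"traffic weights sum to {total}, expected 100")


# The 90/10 split used in the manifest above
split = traffic_split(10)
validate_traffic(split)
print(split)
```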
Step 4: CI/CD Pipeline with GitHub Actions
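# .github/workflows/ml-pipeline.yaml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - "src/**"
      - "pipelines/**"
      - "seldon/**"

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v --cov=src

  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v4
      # Log in to GHCR so the push below is authorized
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: ghcr.io/${{ github.repository }}:${{ github.sha }}

  deploy-staging:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Assumes cluster credentials (kubeconfig) are already configured on the runner
      - run: kubectl apply -f seldon/deployment.yaml -n staging
      # kubectl rollout status only handles built-in workloads, so wait on the
      # SeldonDeployment status instead; the manifest names it ml-model-production
      - run: >
          kubectl wait seldondeployment/ml-model-production -n staging
          --for=jsonpath='{.status.state}'=Available --timeout=300s

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - run: kubectl apply -f seldon/deployment.yaml -n production
      - run: >
          kubectl wait seldondeployment/ml-model-production -n production
          --for=jsonpath='{.status.state}'=Available --timeout=300s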
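After a deploy step succeeds, a quick prediction request confirms that the model actually answers. The sketch below builds a request for the Seldon v1 REST protocol (`/seldon/<namespace>/<deployment>/api/v1.0/predictions` with a `data.ndarray` payload). The ingress address and the feature values are placeholders, and the helper names are hypothetical.

```python
import json
import urllib.request
from typing import List


def prediction_url(ingress: str, namespace: str, deployment: str) -> str:
    """URL of the Seldon v1 prediction endpoint behind the ingress gateway."""
    return f"{ingress.rstrip('/')}/seldon/{namespace}/{deployment}/api/v1.0/predictions"


def prediction_payload(rows: List[List[float]]) -> bytes:
    """Encode feature rows in the Seldon 'ndarray' request format."""
    return json.dumps({"data": {"ndarray": rows}}).encode("utf-8")


def smoke_test(ingress: str, namespace: str, deployment: str,
               rows: List[List[float]], timeout: float = 10.0) -> dict:
    """POST one prediction request and return the decoded JSON response."""
    request = urllib.request.Request(
        prediction_url(ingress, namespace, deployment),
        data=prediction_payload(rows),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises on connection errors and on HTTP 4xx/5xx responses
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())
```

A CI job could call `smoke_test("http://<ingress-host>", "staging", "ml-model-production", [[0.1, 0.2, 0.3]])` after the wait step and fail the job if it raises.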
Step 5: Prometheus Monitoring Rules
# monitoring/prometheus-rules.yaml
# The metric names below must match what your Seldon version and custom
# exporters actually expose.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: ml-model-alerts
  namespace: monitoring
  labels:
    # kube-prometheus-stack selects rules by Helm release label by default
    release: monitoring
spec:
  groups:
    - name: ml-model
      rules:
        - alert: HighPredictionLatency
          # Aggregate buckets across series before taking the quantile
          expr: histogram_quantile(0.99, sum by (le) (rate(seldon_prediction_duration_seconds_bucket[5m]))) > 0.5
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "Model prediction latency is high"
            description: "P99 latency is {{ $value }}s (threshold: 0.5s)"
        - alert: HighErrorRate
          expr: sum(rate(seldon_prediction_errors_total[5m])) / sum(rate(seldon_predictions_total[5m])) > 0.05
          for: 2m
          labels:
            severity: critical
          annotations:
            summary: "Model error rate exceeds 5%"
        - alert: LowAccuracy
          expr: ml_model_accuracy < 0.85
          for: 30m
          labels:
            severity: warning
          annotations:
            summary: "Model accuracy has dropped below threshold"
        - alert: DataDriftDetected
          expr: ml_data_drift_score > 0.1
          for: 15m
          labels:
            severity: warning
          annotations:
            summary: "Data drift detected in production features"
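The latency alert relies on `histogram_quantile`, which estimates a quantile from cumulative bucket counts rather than from raw observations. The sketch below mirrors that estimate in plain Python to show why the result depends on bucket boundaries: it interpolates linearly inside the bucket that contains the target rank. It is an illustration with made-up bucket counts, not Prometheus's own code.

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    The highest bucket must have upper bound +Inf, as in Prometheus histograms.
    """
    if not 0 <= q <= 1:
        raise ValueError("q must be between 0 and 1")
    buckets = sorted(buckets)
    if len(buckets) < 2 or not math.isinf(buckets[-1][0]):
        return math.nan
    total = buckets[-1][1]
    if total == 0:
        return math.nan

    rank = q * total
    # Find the first bucket whose cumulative count reaches the rank
    index = next(i for i, (_, count) in enumerate(buckets) if count >= rank)
    upper, count = buckets[index]
    if math.isinf(upper):
        # The rank falls in the +Inf bucket: report the highest finite bound
        return buckets[-2][0]

    lower, below = (0.0, 0.0) if index == 0 else buckets[index - 1]
    if count == below:
        return lower
    # Linear interpolation inside the bucket
    return lower + (upper - lower) * (rank - below) / (count - below)


# Hypothetical latency buckets in seconds: (le, cumulative count)
latency_buckets = [(0.1, 80), (0.25, 95), (0.5, 99), (1.0, 100), (math.inf, 100)]
print(histogram_quantile(0.99, latency_buckets))
```

Because the estimate can never exceed the highest finite bucket bound, the bucket layout should include boundaries around the 0.5 s alert threshold.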
Step 6: Feature Store with Feast
# feature_store/feature_definitions.py
# Written against the Field/schema API of recent Feast releases.
from datetime import timedelta

import pandas as pd
from feast import Entity, FeatureStore, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64

# Define entities
user_entity = Entity(name="user_id", join_keys=["user_id"], value_type=ValueType.INT64, description="User ID")
product_entity = Entity(name="product_id", join_keys=["product_id"], value_type=ValueType.INT64, description="Product ID")

# Define feature views
user_features = FeatureView(
    name="user_features",
    entities=[user_entity],
    ttl=timedelta(days=1),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,
    source=FileSource(path="data/user_features.parquet", timestamp_field="event_timestamp"),
)

product_features = FeatureView(
    name="product_features",
    entities=[product_entity],
    ttl=timedelta(days=7),
    schema=[
        Field(name="price", dtype=Float32),
        Field(name="category_id", dtype=Int64),
        Field(name="avg_rating", dtype=Float32),
        Field(name="review_count", dtype=Int64),
    ],
    online=True,
    source=FileSource(path="data/product_features.parquet", timestamp_field="event_timestamp"),
)

# Feature retrieval (in a real repo this would live in a separate script,
# run after `feast apply` and, for online reads, after materialization)
store = FeatureStore(repo_path=".")

# Entity rows to look up; the timestamp drives the point-in-time join
entity_df = pd.DataFrame(
    {
        "user_id": [1],
        "product_id": [42],
        "event_timestamp": [pd.Timestamp.now(tz="UTC")],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["user_features:purchase_count_7d", "product_features:price"],
).to_df()

online_features = store.get_online_features(
    features=["user_features:purchase_count_7d", "product_features:price"],
    entity_rows=[{"user_id": 1, "product_id": 42}],
).to_dict()
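Historical retrieval is a point-in-time join: for each entity row, the store returns the most recent feature value recorded at or before the row's timestamp, and ignores values older than the view's TTL. The plain-Python sketch below illustrates that rule for a single feature; it is a conceptual model with made-up data, not Feast's implementation.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

FeatureRow = Tuple[datetime, float]  # (event_timestamp, value)


def point_in_time_lookup(history: List[FeatureRow], as_of: datetime,
                         ttl: timedelta) -> Optional[float]:
    """Latest value with as_of - ttl <= event_timestamp <= as_of, else None."""
    candidates = [(ts, value) for ts, value in history if as_of - ttl <= ts <= as_of]
    if not candidates:
        return None
    # Pick the row with the newest timestamp inside the window
    return max(candidates, key=lambda row: row[0])[1]


# Hypothetical purchase_count_7d history for one user
history = [
    (datetime(2024, 1, 1), 3.0),
    (datetime(2024, 1, 2), 5.0),
    (datetime(2024, 1, 5), 9.0),
]
# The value from Jan 5 lies in the future of this lookup and is never used
print(point_in_time_lookup(history, datetime(2024, 1, 2, 12), timedelta(days=1)))
```

Excluding values newer than the lookup time is what keeps training data free of information that would not have been available at prediction time.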
ℹ️
Implement progressive rollouts for model deployments: start with 5% traffic, monitor for 30 minutes, then gradually increase to 100%. Set automatic rollback triggers on error rate spikes.
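The promotion rule can be expressed as a small decision function that a rollout job evaluates after each monitoring window. This is a sketch under assumptions: the 5/25/100 steps come from the architecture diagram, the 5% error-rate limit matches the HighErrorRate alert, and `next_traffic` is a hypothetical helper rather than a Seldon or ArgoCD feature.

```python
from typing import Sequence

ROLLOUT_STEPS = (5, 25, 100)  # canary traffic percentages


def next_traffic(current: int, error_rate: float, max_error_rate: float = 0.05,
                 steps: Sequence[int] = ROLLOUT_STEPS) -> int:
    """Return the canary percentage for the next stage.

    0 means roll back; otherwise the next larger step, or the current
    value once the rollout has reached the final step.
    """
    if error_rate > max_error_rate:
        return 0  # rollback trigger: send all traffic back to stable
    for step in steps:
        if step > current:
            return step
    return current


# Walk a healthy rollout from 0% through every step
traffic = 0
while traffic < 100:
    traffic = next_traffic(traffic, error_rate=0.01)
    print(f"canary traffic -> {traffic}%")
```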
💡
Use ArgoCD for GitOps-based deployment. Store all Kubernetes manifests in Git and let ArgoCD sync the desired state. This provides audit trails and easy rollbacks.
Performance Metrics
| Metric | Target | Description |
|---|---|---|
| Deployment Frequency | Daily | Model deployments per day |
| Lead Time | < 1hr | Code commit to production |
| Rollback Time | < 5min | Time to rollback bad deploy |
| Pipeline Success Rate | > 95% | CI/CD pipeline reliability |
| Model Serving Latency | < 100ms | P99 inference latency |
| Infrastructure Cost | Optimized | Per-prediction cost |
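Two of these targets, lead time and pipeline success rate, can be computed directly from deployment records. The sketch below assumes a hypothetical `Deployment` record with commit and deploy timestamps; where those records come from (CI logs, ArgoCD history) is left open.

```python
import statistics
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Deployment:
    committed_at: datetime
    deployed_at: datetime
    succeeded: bool


def success_rate(deployments: List[Deployment]) -> float:
    """Fraction of deployments that succeeded (0.0 for an empty list)."""
    if not deployments:
        return 0.0
    return sum(d.succeeded for d in deployments) / len(deployments)


def median_lead_time(deployments: List[Deployment]) -> Optional[timedelta]:
    """Median commit-to-production time over successful deployments."""
    seconds = [
        (d.deployed_at - d.committed_at).total_seconds()
        for d in deployments
        if d.succeeded
    ]
    if not seconds:
        return None
    return timedelta(seconds=statistics.median(seconds))
```

The median is used instead of the mean so that a single slow deployment does not dominate the lead-time figure.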
Interview Talking Points
- Kubeflow Pipelines: Provides reproducible, version-controlled ML pipelines on Kubernetes. Each pipeline run creates immutable artifacts.
- Seldon Core: Kubernetes-native model serving with built-in A/B testing, canary deployments, and explainability.
- Feature Store: Feast provides a central repository for feature definitions, ensuring consistency between training and serving.
- GitOps: ArgoCD + GitHub Actions provide automated, auditable deployments. Because the desired state lives in Git, a rollback is a revert commit that ArgoCD then syncs.
- Monitoring: Prometheus + Grafana provide infrastructure and model-specific monitoring with configurable alerting.
- Cost Optimization: Spot instances for training, auto-scaling for serving, and resource quotas for cost control.
⚠️
Kubeflow has a steep learning curve. Start with simple pipelines and gradually add complexity. Ensure your team is comfortable with Kubernetes before adopting the full platform.
ℹ️
For teams not ready for Kubernetes, consider managed ML platforms like AWS SageMaker, GCP Vertex AI, or Azure ML which provide similar capabilities with less operational overhead.