Production MLOps Pipeline (Kubeflow + Seldon + Prometheus)

Kubeflow + Seldon + Prometheus | Enterprise ML Infrastructure

Expert | 20+ Hours | Kubernetes Required

Project Overview

Problem Statement

Deploying ML models to production requires more than just model code. Teams need CI/CD pipelines for models, automated testing, canary deployments, rollback mechanisms, and infrastructure monitoring. This project builds a complete MLOps platform using Kubernetes-native tools.

Objectives

  • Build end-to-end ML pipeline with Kubeflow Pipelines
  • Deploy models with Seldon Core for A/B testing and canary releases
  • Implement model versioning and registry
  • Create CI/CD pipelines for ML with GitHub Actions
  • Set up comprehensive monitoring with Prometheus + Grafana

Component | Technology
Pipeline Orchestration | Kubeflow Pipelines
Model Serving | Seldon Core + Triton
Container Orchestration | Kubernetes (EKS/GKE)
CI/CD | GitHub Actions + ArgoCD
Model Registry | MLflow + S3
Monitoring | Prometheus + Grafana
Feature Store | Feast on Kubernetes
Secrets | HashiCorp Vault

Architecture Diagram

+-------------------------------------------------------------------+
|              Production MLOps Architecture                        |
+-------------------------------------------------------------------+
|  +--------------+    +--------------+    +------------------+     |
|  | Git Push     |--->| GitHub       |--->| Kubeflow         |     |
|  | (Code/Data)  |    | Actions CI   |    | Pipeline Run     |     |
|  +--------------+    +--------------+    +--------+---------+     |
|                                                  |               |
|                                                  v               |
|  +--------------+    +--------------+    +------------------+     |
|  |  Model       |--->| Seldon Core  |--->| Canary Deploy    |     |
|  |  Registry    |    | Serving      |    | (5%->25%->100%)  |     |
|  +--------------+    +--------------+    +------------------+     |
|        |                                                   |     |
|        v                                                   v     |
|  +--------------+    +--------------+    +------------------+     |
|  |  Prometheus  |--->|  Grafana     |--->|  AlertManager    |     |
|  |  Metrics     |    |  Dashboard   |    |  (PagerDuty)     |     |
|  +--------------+    +--------------+    +------------------+     |
+-------------------------------------------------------------------+

Step-by-Step Implementation

Step 1: Kubernetes Cluster Setup

# Install required tools (kubectl and Helm)
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
curl -fsSL https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash

# Create cluster (example with kind for local dev)
kind create cluster --name ml-cluster --config kind-config.yaml

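# Install Kubeflow Pipelines (standalone): cluster-scoped resources first, then the environment
export PIPELINE_VERSION=2.0.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"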

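# Install Seldon Core (the operator chart is pulled from Seldon's chart repository)
# istio.enabled=true assumes Istio is already installed in the cluster
kubectl create namespace seldon-system || true
helm install seldon-core seldon-core-operator \
  --repo https://storage.googleapis.com/seldon-charts \
  -n seldon-system \
  --set usageMetrics.enabled=true \
  --set istio.enabled=true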

# Install Prometheus + Grafana (kube-prometheus-stack bundles both, plus Alertmanager)
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm install monitoring prometheus-community/kube-prometheus-stack \
  --namespace monitoring --create-namespace

Step 2: Kubeflow Pipeline Definition

Define an ML training pipeline as a Kubeflow Pipeline with reusable components.

# pipelines/training_pipeline.py
from kfp import dsl


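# NOTE: the paths are assumed to resolve to a filesystem mounted into the step.
# The gs:// defaults used by the pipeline would also need gcsfs and an upload step.
@dsl.component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"],
)
def preprocess_data(input_path: str, output_path: str):
    import os

    import joblib
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(input_path)
    feature_cols = [c for c in df.columns if c != "target"]

    # Split first, then fit the scaler on the training rows only,
    # so test-set statistics do not leak into training.
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
    train_df, test_df = train_df.copy(), test_df.copy()
    scaler = StandardScaler()
    train_df[feature_cols] = scaler.fit_transform(train_df[feature_cols])
    test_df[feature_cols] = scaler.transform(test_df[feature_cols])

    os.makedirs(output_path, exist_ok=True)
    train_df.to_csv(f"{output_path}/train.csv", index=False)
    test_df.to_csv(f"{output_path}/test.csv", index=False)
    joblib.dump(scaler, f"{output_path}/scaler.joblib")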


# NOTE: LightGBM needs the libgomp1 system library, which slim images do not ship;
# swap in a base image that includes it if the import fails.
@dsl.component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas", "lightgbm", "mlflow"],
)
def train_model(data_path: str, model_path: str, params: dict):
    import os

    import lightgbm as lgb
    import mlflow
    import pandas as pd

    train_df = pd.read_csv(f"{data_path}/train.csv")
    X_train = train_df.drop("target", axis=1)
    y_train = train_df["target"]

    # MLFLOW_TRACKING_URI is assumed to be set in the step's environment.
    mlflow.set_experiment("production-training")
    with mlflow.start_run():
        mlflow.log_params(params)

        train_data = lgb.Dataset(X_train, label=y_train)
        model = lgb.train(
            params, train_data, num_boost_round=500,
            valid_sets=[train_data], callbacks=[lgb.log_evaluation(50)],
        )

        # Log the model to MLflow and also save it where the serving step expects it.
        mlflow.lightgbm.log_model(model, "model")
        os.makedirs(model_path, exist_ok=True)
        model.save_model(f"{model_path}/model.txt")
        mlflow.log_artifact(f"{model_path}/model.txt")


@dsl.component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas", "lightgbm", "scikit-learn"],
)
def evaluate_model(data_path: str, model_path: str, metrics_path: str):
    import json
    import os

    import lightgbm as lgb
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

    test_df = pd.read_csv(f"{data_path}/test.csv")
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]

    model = lgb.Booster(model_file=f"{model_path}/model.txt")
    # Predict once and reuse the probabilities for thresholded and ranking metrics.
    y_proba = model.predict(X_test)
    y_pred = (y_proba > 0.5).astype(int)

    metrics = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "f1": float(f1_score(y_test, y_pred)),
        "auc": float(roc_auc_score(y_test, y_proba)),
    }

    os.makedirs(metrics_path, exist_ok=True)
    with open(f"{metrics_path}/metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)
    print(f"Metrics: {metrics}")


@dsl.component(base_image="python:3.10-slim")
def validate_model(metrics_path: str, threshold: float = 0.85):
    import json

    with open(f"{metrics_path}/metrics.json") as f:
        metrics = json.load(f)
    # Raise instead of assert: assertions are skipped when Python runs with -O.
    if metrics["accuracy"] < threshold:
        raise ValueError(
            f"Model accuracy {metrics['accuracy']} below threshold {threshold}"
        )
    print(f"Model validation passed: accuracy={metrics['accuracy']:.4f}")


@dsl.pipeline(
    name="training-pipeline",
    description="End-to-end ML training pipeline",
)
def training_pipeline(
    input_path: str = "gs://ml-data/input/",
    output_path: str = "gs://ml-data/processed/",
    model_path: str = "gs://ml-models/production/",
    metrics_path: str = "gs://ml-metrics/",
):
    # The components above declare no outputs, so each step receives the shared
    # path parameters directly and the ordering is made explicit with .after().
    preprocess_task = preprocess_data(input_path=input_path, output_path=output_path)

    train_task = train_model(
        data_path=output_path,
        model_path=model_path,
        params={"num_leaves": 31, "learning_rate": 0.05, "objective": "binary"},
    )
    train_task.after(preprocess_task)

    eval_task = evaluate_model(
        data_path=output_path,
        model_path=model_path,
        metrics_path=metrics_path,
    )
    eval_task.after(train_task)

    validate_task = validate_model(metrics_path=metrics_path)
    validate_task.after(eval_task)
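
The validation step gates on accuracy alone. The sketch below shows one way the same gate could cover several metrics at once. The helper name `failed_metrics`, the `thresholds` mapping, and all numbers are hypothetical and not part of the pipeline above.

```python
import json
from typing import Dict, List


def failed_metrics(metrics: Dict[str, float], thresholds: Dict[str, float]) -> List[str]:
    """Return one message per metric that is missing or below its minimum."""
    failures = []
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing from metrics")
        elif value < minimum:
            failures.append(f"{name}: {value:.4f} < {minimum:.4f}")
    return failures


# Example payload in the shape written by evaluate_model (values are made up).
example_metrics = json.loads('{"accuracy": 0.91, "f1": 0.78, "auc": 0.95}')
example_thresholds = {"accuracy": 0.85, "f1": 0.80, "auc": 0.90}  # hypothetical minimums

problems = failed_metrics(example_metrics, example_thresholds)
if problems:
    print("Validation failed:", "; ".join(problems))
else:
    print("Validation passed")
```

Collecting every failure before reporting makes a failed run easier to diagnose than stopping at the first metric that falls short.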

Step 3: Seldon Core Model Deployment

Deploy models with Seldon Core for A/B testing, canary releases, and monitoring.

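# seldon/model.py
import logging
import os
from typing import List, Optional

import joblib
import lightgbm as lgb
import numpy as np

logger = logging.getLogger(__name__)


class SeldonModel:
    # The default model_path is an assumption; Seldon passes any parameters
    # declared for the container to __init__ as keyword arguments.
    def __init__(self, model_path: str = "/mnt/models"):
        self.model_path = model_path
        self.model = None
        self.scaler = None

    def load(self):
        """Called by the Seldon Python wrapper, without arguments, at startup."""
        self.model = lgb.Booster(model_file=f"{self.model_path}/model.txt")
        # Assumes scaler.joblib from preprocessing was copied next to model.txt,
        # so serving applies the same scaling the model was trained with.
        scaler_file = f"{self.model_path}/scaler.joblib"
        if os.path.exists(scaler_file):
            self.scaler = joblib.load(scaler_file)
        logger.info("Loaded model from %s", self.model_path)

    def _scores(self, X: np.ndarray) -> np.ndarray:
        """Scale the features (if a scaler is loaded) and return probabilities."""
        if self.model is None:
            self.load()
        X = np.asarray(X)
        if self.scaler is not None:
            X = self.scaler.transform(X)
        return self.model.predict(X)

    def predict(self, X: np.ndarray, features_names: Optional[List[str]] = None) -> np.ndarray:
        """Seldon predict interface: return 0/1 labels as a column vector."""
        return (self._scores(X) > 0.5).astype(int).reshape(-1, 1)

    def predict_proba(self, X: np.ndarray, features_names: Optional[List[str]] = None) -> np.ndarray:
        """Return [P(class 0), P(class 1)] for each row."""
        proba = self._scores(X)
        return np.column_stack([1 - proba, proba])

    def send_feedback(self, features, feature_names, reward, truth, routing=None):
        """Seldon feedback hook; this sketch only logs what it receives."""
        logger.info("Received feedback: reward=%s truth=%s", reward, truth)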
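
To call the deployed model, a client sends feature rows to the prediction endpoint. The sketch below builds a request body and URL in the shape used by the Seldon Core v1 REST protocol (`data.ndarray`, path `/seldon/<namespace>/<name>/api/v1.0/predictions`). The host, namespace, helper names, and feature values are hypothetical; the path should be checked against your ingress setup.

```python
import json
from typing import List, Optional, Sequence


def prediction_url(host: str, namespace: str, deployment: str) -> str:
    """Build the ingress URL for a SeldonDeployment's prediction endpoint."""
    return f"http://{host}/seldon/{namespace}/{deployment}/api/v1.0/predictions"


def prediction_payload(
    rows: Sequence[Sequence[float]], names: Optional[List[str]] = None
) -> str:
    """Serialize feature rows as a JSON body with data.ndarray (and optional names)."""
    if not rows:
        raise ValueError("at least one feature row is required")
    width = len(rows[0])
    if any(len(row) != width for row in rows):
        raise ValueError("all feature rows must have the same length")
    data = {"ndarray": [list(row) for row in rows]}
    if names is not None:
        if len(names) != width:
            raise ValueError("names must match the number of features")
        data["names"] = list(names)
    return json.dumps({"data": data})


# Example values only: host and namespace are placeholders.
url = prediction_url("localhost:8080", "production", "ml-model-production")
body = prediction_payload([[3, 42.5, 2]], names=["purchase_count_7d", "avg_order_value", "last_purchase_days_ago"])
print(url)
print(body)
```

The body can then be sent with any HTTP client using a `Content-Type: application/json` header.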

Seldon Deployment YAML

# seldon/deployment.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
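metadata:
  name: ml-model-production
  # The namespace is supplied at apply time (kubectl apply -n <namespace>);
  # a hard-coded namespace here would conflict with the -n flag used in CI.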
spec:
  predictors:
    - name: stable
      replicas: 3
      traffic: 90
      graph:
        name: model
        implementation: UNKNOWN_IMPLEMENTATION
        modelUri: gs://ml-models/production/v1.0.0
        children: []
      componentSpecs:
        - spec:
            containers:
              - name: model
                image: ml-model:latest
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
    - name: canary
      replicas: 1
      traffic: 10
      graph:
        name: model
        implementation: UNKNOWN_IMPLEMENTATION
        modelUri: gs://ml-models/production/v1.1.0
        children: []
      componentSpecs:
        - spec:
            containers:
              - name: model
                image: ml-model:canary
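
The manifest above splits requests 90/10 between the `stable` and `canary` predictors through their `traffic` weights. The sketch below recomputes the two weights for a new canary percentage so that they always sum to 100. It works on plain dictionaries that mirror the `predictors` list; the helper name is hypothetical and nothing here talks to a cluster.

```python
from typing import Dict, List


def set_canary_traffic(predictors: List[Dict], canary_percent: int) -> List[Dict]:
    """Return copies of the stable/canary predictors with updated traffic weights."""
    if not 0 <= canary_percent <= 100:
        raise ValueError("canary_percent must be between 0 and 100")
    names = sorted(p["name"] for p in predictors)
    if names != ["canary", "stable"]:
        raise ValueError("expected exactly one 'stable' and one 'canary' predictor")
    updated = []
    for predictor in predictors:
        if predictor["name"] == "canary":
            traffic = canary_percent
        else:
            traffic = 100 - canary_percent
        updated.append({**predictor, "traffic": traffic})  # copy, do not mutate
    return updated


# Mirrors the two predictors in seldon/deployment.yaml (other fields omitted).
current = [
    {"name": "stable", "replicas": 3, "traffic": 90},
    {"name": "canary", "replicas": 1, "traffic": 10},
]

for percent in (5, 25, 100):
    split = set_canary_traffic(current, percent)
    print(percent, [(p["name"], p["traffic"]) for p in split])
```

Writing the result back into the manifest and committing it keeps each traffic change visible in Git history.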

Step 4: CI/CD Pipeline with GitHub Actions

# .github/workflows/ml-pipeline.yaml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - "src/**"
      - "pipelines/**"
      - "seldon/**"

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - run: pip install -r requirements.txt
      - run: pytest tests/ -v --cov=src

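  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write  # needed to push to ghcr.io with GITHUB_TOKEN
    steps:
      - uses: actions/checkout@v4
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: ghcr.io/${{ github.repository }}:${{ github.sha }}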

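  # Both deploy jobs assume cluster credentials (kubeconfig) are configured for the
  # runner and that the manifest leaves metadata.namespace unset.
  deploy-staging:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: kubectl apply -f seldon/deployment.yaml -n staging
      # kubectl rollout status only handles built-in workloads, so wait on the
      # SeldonDeployment status instead. The resource name comes from the manifest.
      - run: kubectl wait --for=jsonpath='{.status.state}'=Available seldondeployment/ml-model-production -n staging --timeout=300s

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - run: kubectl apply -f seldon/deployment.yaml -n production
      - run: kubectl wait --for=jsonpath='{.status.state}'=Available seldondeployment/ml-model-production -n production --timeout=300s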

Step 5: Prometheus Monitoring Rules

# monitoring/prometheus-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
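metadata:
  name: ml-model-alerts
  namespace: monitoring
  labels:
    # By default kube-prometheus-stack only loads rules labelled with its Helm release name.
    release: monitoring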
spec:
  groups:
    - name: ml-model
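      rules:
        # Metric names below are the ones used in this guide; confirm them against
        # the metrics your Seldon Core version actually exports.
        - alert: HighPredictionLatency
          expr: histogram_quantile(0.99, sum by (le) (rate(seldon_prediction_duration_seconds_bucket[5m]))) > 0.5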
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "Model prediction latency is high"
            description: "P99 latency is {{ $value }}s (threshold: 0.5s)"

        - alert: HighErrorRate
          expr: sum(rate(seldon_prediction_errors_total[5m])) / sum(rate(seldon_predictions_total[5m])) > 0.05
          for: 2m
          labels:
            severity: critical
          annotations:
            summary: "Model error rate exceeds 5%"

        - alert: LowAccuracy
          expr: ml_model_accuracy < 0.85
          for: 30m
          labels:
            severity: warning
          annotations:
            summary: "Model accuracy has dropped below threshold"

        - alert: DataDriftDetected
          expr: ml_data_drift_score > 0.1
          for: 15m
          labels:
            severity: warning
          annotations:
            summary: "Data drift detected in production features"
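
The `HighPredictionLatency` rule relies on `histogram_quantile`, which estimates a quantile from cumulative bucket counts by linear interpolation inside the bucket that contains the target rank. The sketch below is a simplified Python rendering of that idea, assuming positive bucket bounds. The bucket values are made up, and this is an illustration rather than Prometheus's own implementation.

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from (upper_bound, cumulative_count) pairs.

    The last bucket must have upper bound +inf, as Prometheus histograms do.
    """
    if not 0 <= q <= 1:
        raise ValueError("q must be between 0 and 1")
    buckets = sorted(buckets)
    if len(buckets) < 2 or not math.isinf(buckets[-1][0]):
        raise ValueError("need at least two buckets, the last with bound +inf")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    # First bucket whose cumulative count reaches the target rank.
    index = next(i for i, (_, count) in enumerate(buckets) if count >= rank)
    if index == len(buckets) - 1:
        # Rank falls in the +inf bucket: report the highest finite bound.
        return buckets[-2][0]
    upper = buckets[index][0]
    lower = buckets[index - 1][0] if index > 0 else 0.0
    below = buckets[index - 1][1] if index > 0 else 0.0
    in_bucket = buckets[index][1] - below
    if in_bucket == 0:
        return lower
    # Linear interpolation between the bucket's lower and upper bound.
    return lower + (upper - lower) * (rank - below) / in_bucket


# Made-up cumulative request counts per latency bucket (seconds).
latency_buckets = [(0.1, 900), (0.25, 960), (0.5, 985), (1.0, 998), (math.inf, 1000)]
p99 = histogram_quantile(0.99, latency_buckets)
print(f"estimated P99: {p99:.3f}s, alert condition met: {p99 > 0.5}")
```

Because the estimate is interpolated, its precision depends on how finely the buckets are spaced around the alert threshold.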

Step 6: Feature Store with Feast

# feature_store/feature_definitions.py
# Uses the Field/schema API of recent Feast releases, which replaced Feature/ValueType.
from datetime import timedelta

import pandas as pd
from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64


# Define entities (join_keys name the column used to join feature rows)
user_entity = Entity(name="user_id", join_keys=["user_id"], description="User ID")
product_entity = Entity(name="product_id", join_keys=["product_id"], description="Product ID")


# Define feature views
user_features = FeatureView(
    name="user_features",
    entities=[user_entity],
    ttl=timedelta(days=1),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,
    source=FileSource(path="data/user_features.parquet", timestamp_field="event_timestamp"),
)

product_features = FeatureView(
    name="product_features",
    entities=[product_entity],
    ttl=timedelta(days=7),
    schema=[
        Field(name="price", dtype=Float32),
        Field(name="category_id", dtype=Int64),
        Field(name="avg_rating", dtype=Float32),
        Field(name="review_count", dtype=Int64),
    ],
    online=True,
    source=FileSource(path="data/product_features.parquet", timestamp_field="event_timestamp"),
)


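# Feature retrieval (run `feast apply` first; online reads also need `feast materialize`)
store = FeatureStore(repo_path=".")

# Entity rows to enrich, each with the timestamp at which features are looked up.
# The IDs are example values.
entity_df = pd.DataFrame(
    {
        "user_id": [1],
        "product_id": [42],
        "event_timestamp": [pd.Timestamp.now(tz="UTC")],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["user_features:purchase_count_7d", "product_features:price"],
).to_df()

online_features = store.get_online_features(
    features=["user_features:purchase_count_7d", "product_features:price"],
    entity_rows=[{"user_id": 1, "product_id": 42}],
).to_dict()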
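
Historical retrieval is a point-in-time join: for each entity row, only feature values recorded at or before that row's timestamp, and no older than the view's `ttl`, are eligible. The sketch below illustrates that rule on plain Python data. The helper name, the sample rows, and the inclusive boundary handling are choices of this sketch, not Feast's implementation.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


def point_in_time_lookup(
    feature_rows: List[Dict[str, Any]],
    entity_key: str,
    entity_id: int,
    feature: str,
    as_of: datetime,
    ttl: timedelta,
) -> Optional[Any]:
    """Return the latest feature value in [as_of - ttl, as_of], or None."""
    eligible = [
        row
        for row in feature_rows
        if row[entity_key] == entity_id
        and as_of - ttl <= row["event_timestamp"] <= as_of
    ]
    if not eligible:
        return None
    latest = max(eligible, key=lambda row: row["event_timestamp"])
    return latest[feature]


# Made-up feature history for one user.
rows = [
    {"user_id": 1, "event_timestamp": datetime(2024, 1, 1), "purchase_count_7d": 3},
    {"user_id": 1, "event_timestamp": datetime(2024, 1, 3), "purchase_count_7d": 5},
    {"user_id": 1, "event_timestamp": datetime(2024, 1, 10), "purchase_count_7d": 9},
]

fresh = point_in_time_lookup(
    rows, "user_id", 1, "purchase_count_7d", datetime(2024, 1, 3, 12), timedelta(days=1)
)
stale = point_in_time_lookup(
    rows, "user_id", 1, "purchase_count_7d", datetime(2024, 1, 2, 12), timedelta(days=1)
)
print(fresh, stale)
```

The value recorded on 10 January is never returned for an earlier lookup, which is what keeps future information out of training data.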

ℹ️

Implement progressive rollouts for model deployments: start with 5% traffic, monitor for 30 minutes, then gradually increase to 100%. Set automatic rollback triggers on error rate spikes.
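
The progressive rollout can be reduced to a small decision rule: after each monitoring window, either advance the canary to the next traffic step or roll it back. The sketch below assumes the 5% -> 25% -> 100% steps from the architecture diagram and reuses the 5% error-rate threshold of the `HighErrorRate` alert. The function name and the observed error rates are hypothetical.

```python
from typing import Sequence

ROLLOUT_STEPS = (5, 25, 100)  # canary traffic percentages, from the diagram
MAX_ERROR_RATE = 0.05         # same threshold as the HighErrorRate alert


def next_canary_traffic(
    current_percent: int,
    error_rate: float,
    steps: Sequence[int] = ROLLOUT_STEPS,
    max_error_rate: float = MAX_ERROR_RATE,
) -> int:
    """Return the canary traffic for the next stage; 0 means roll back."""
    if error_rate > max_error_rate:
        return 0
    for step in sorted(steps):
        if step > current_percent:
            return step
    return current_percent  # already at the final step


# Simulate three healthy monitoring windows starting from no canary traffic.
traffic = 0
history = []
for observed_error_rate in (0.0, 0.01, 0.02):
    traffic = next_canary_traffic(traffic, observed_error_rate)
    history.append(traffic)
print(history)
```

In practice the error rate would come from a Prometheus query evaluated over the monitoring window rather than from a hard-coded list.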

Tip: Use ArgoCD for GitOps-based deployment. Store all Kubernetes manifests in Git and let ArgoCD sync the desired state. This provides audit trails and easy rollbacks.

Performance Metrics

Metric | Target | Description
Deployment Frequency | Daily | Model deployments per day
Lead Time | < 1 hr | Code commit to production
Rollback Time | < 5 min | Time to roll back a bad deploy
Pipeline Success Rate | > 95% | CI/CD pipeline reliability
Model Serving Latency | < 100 ms | P99 inference latency
Infrastructure Cost | Optimized | Per-prediction cost
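
Two of these targets can be checked directly from raw measurements. The sketch below computes a nearest-rank P99 latency and a pipeline success rate and compares them with the 100 ms and 95% targets. The helper names and the sample data are made up for illustration.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of data at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


def success_rate(outcomes: Sequence[bool]) -> float:
    """Fraction of pipeline runs that succeeded."""
    if not outcomes:
        raise ValueError("outcomes must not be empty")
    return sum(outcomes) / len(outcomes)


# Made-up measurements: 100 request latencies (ms) and 20 pipeline runs.
latencies_ms = list(range(1, 101))
pipeline_runs = [True] * 19 + [False]

p99_ms = percentile(latencies_ms, 99)
rate = success_rate(pipeline_runs)
print(f"P99 latency: {p99_ms} ms (target < 100 ms): {p99_ms < 100}")
print(f"Success rate: {rate:.0%} (target > 95%): {rate > 0.95}")
```

With this sample the latency target is met while the success rate sits exactly at 95% and so misses the strict "> 95%" target.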

Interview Talking Points

  1. Kubeflow Pipelines: Provides reproducible, version-controlled ML pipelines on Kubernetes. Each pipeline run creates immutable artifacts.
  2. Seldon Core: Kubernetes-native model serving with built-in A/B testing, canary deployments, and explainability.
  3. Feature Store: Feast provides a central repository for feature definitions, ensuring consistency between training and serving.
  4. GitOps: ArgoCD + GitHub Actions provide automated, auditable deployments; because the desired state lives in Git, a rollback is a revert of the offending commit followed by an ArgoCD sync.
  5. Monitoring: Prometheus + Grafana provide infrastructure and model-specific monitoring with configurable alerting.
  6. Cost Optimization: Spot instances for training, auto-scaling for serving, and resource quotas for cost control.

⚠️

Kubeflow has a steep learning curve. Start with simple pipelines and gradually add complexity. Ensure your team is comfortable with Kubernetes before adopting the full platform.

ℹ️

For teams not ready for Kubernetes, consider managed ML platforms like AWS SageMaker, GCP Vertex AI, or Azure ML which provide similar capabilities with less operational overhead.
