MLOps & Experiment Tracking
💡 MLOps bridges machine learning development and production operations. This lesson covers experiment tracking, model versioning, reproducibility, and CI/CD pipelines for ML systems.
1. Why MLOps?
The ML Lifecycle Problem
Traditional ML Workflow
───────────────────────
Data  →  Training  →  Evaluation  →  Deployment  →  Monitoring
  │         │             │              │              │
  ▼         ▼             ▼              ▼              ▼
Messy    Notebook       Manual         Manual           No
files      soup         evals          deploy        feedback
MLOps Solution
MLOps Pipeline
──────────────
Data Versioning  →  Experiment Tracking  →  Model Registry  →  CI/CD
       │                     │                    │              │
       ▼                     ▼                    ▼              ▼
      DVC               MLflow/W&B           Model Store     Automated
                                                         Testing & Deploy
Definition: MLOps
MLOps (Machine Learning Operations) is a set of practices that combines Machine Learning, DevOps, and Data Engineering to deploy and maintain ML systems in production reliably and efficiently. It addresses the full lifecycle from data preparation to model monitoring.
ℹ️ Why Traditional DevOps Falls Short
ML systems have unique challenges beyond traditional software: data dependencies, model drift, non-deterministic training, and the need for continuous evaluation. MLOps extends DevOps practices to handle these ML-specific concerns.
2. Experiment Tracking
Key Metadata to Track
| Category | What to Track | Examples |
|---|---|---|
| Code | Git commit, branch, diff | a1b2c3d, main |
| Data | Dataset version, size, hash | data_v2.parquet, 1.2M rows |
| Parameters | Hyperparameters, config | lr=0.001, epochs=50 |
| Metrics | Training/validation metrics | acc=0.95, loss=0.12 |
| Artifacts | Model weights, plots | model.pth, confusion.png |
| Environment | Python version, packages | torch=2.0, cuda=11.8 |
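As a rough illustration of how the code, data, and environment rows of this table could be captured automatically, here is a minimal standard-library sketch. The helper names (`file_sha256`, `git_commit`, `collect_run_metadata`) are hypothetical and not part of any tracking library; the resulting dictionary could be passed to whichever tracker you use.

```python
import hashlib
import platform
import subprocess


def file_sha256(path, chunk_size=1 << 20):
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def git_commit():
    """Return the current Git commit hash, or None if it cannot be read."""
    try:
        out = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True, text=True, check=True,
        )
        return out.stdout.strip()
    except (OSError, subprocess.CalledProcessError):
        return None


def collect_run_metadata(data_path=None):
    """Gather code, data, and environment metadata for one run."""
    return {
        "git_commit": git_commit(),
        "data_sha256": file_sha256(data_path) if data_path else None,
        "python_version": platform.python_version(),
        "platform": platform.platform(),
    }
```

Hashing the dataset file gives a version identifier that changes whenever the bytes change, even if the file name stays the same.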
MLflow Tracking
Definition: MLflow
An open-source platform for managing the ML lifecycle, including experimentation, reproducibility, and deployment.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Load data (fixed random_state so the split is identical on every run)
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Set tracking URI (local or remote server; assumes a server is listening here)
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("iris-classification")

# Start a run
with mlflow.start_run(run_name="rf-baseline"):
    # Parameters
    params = {
        "n_estimators": 100,
        "max_depth": 5,
        "min_samples_split": 2,
        "random_state": 42,
    }
    mlflow.log_params(params)
    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_macro": f1_score(y_test, y_pred, average="macro"),
    }
    mlflow.log_metrics(metrics)
    # Log model artifact
    mlflow.sklearn.log_model(model, "model")
    # Log data version: the built-in iris data has no file on disk, so record a tag
    mlflow.set_tag("data_version", "sklearn-load_iris")
    # When training from a file, log the file itself instead (the path must exist):
    # mlflow.log_artifact("data.parquet")
    print(f"Run ID: {mlflow.active_run().info.run_id}")
    print(f"Metrics: {metrics}")
💡 Experiment Tracking Best Practices
Always track: (1) code version via Git, (2) hyperparameters, (3) training and validation metrics, (4) data version, (5) environment details. With all five recorded, any experiment can be rerun later under the same conditions.
MLflow Projects
# MLproject
name: iris-classification
conda_env: environment.yml
entry_points:
  main:
    parameters:
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 5}
    command: "python train.py --n_estimators {n_estimators} --max_depth {max_depth}"

# Run project
mlflow run . -P n_estimators=200 -P max_depth=10
# Run from git
mlflow run https://github.com/user/repo -P n_estimators=200
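The `command` entry above passes each parameter to `train.py` as a command-line flag, so the script has to parse them. The lesson does not show `train.py`; the following is only a sketch of what its argument handling might look like, using `argparse` with the same flag names and defaults as the MLproject file.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags that the MLproject `command` passes to train.py."""
    parser = argparse.ArgumentParser(description="Train the iris classifier")
    parser.add_argument("--n_estimators", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=5)
    return parser.parse_args(argv)


# Equivalent to: mlflow run . -P n_estimators=200 -P max_depth=10
args = parse_args(["--n_estimators", "200", "--max_depth", "10"])
print(f"n_estimators={args.n_estimators}, max_depth={args.max_depth}")
```

In the real script you would call `parse_args()` with no arguments so that the values come from the command line.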
3. Weights & Biases
W&B Setup
import joblib
import wandb
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Initialize run
wandb.init(
    project="iris-classification",
    name="rf-experiment-1",
    config={
        "n_estimators": 100,
        "max_depth": 5,
        "random_state": 42,
    },
)
# Access config
config = wandb.config

# Load data
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=config.random_state
)

# Train
model = RandomForestClassifier(
    n_estimators=config.n_estimators,
    max_depth=config.max_depth,
    random_state=config.random_state,
)
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average="macro")

# Log metrics (values, not the metric functions themselves)
wandb.log({"accuracy": acc, "f1": f1})

# Log model: save it to disk, then upload the file as a W&B artifact
joblib.dump(model, "model.joblib")
artifact = wandb.Artifact("random-forest", type="model")
artifact.add_file("model.joblib")
wandb.log_artifact(artifact)

# Finish run
wandb.finish()
W&B Sweeps (Hyperparameter Search)
# Reuses the imports and the X_train/X_test split from the setup above
sweep_config = {
    "method": "bayes",
    "metric": {"name": "accuracy", "goal": "maximize"},
    "parameters": {
        "n_estimators": {"values": [50, 100, 200, 300]},
        "max_depth": {"values": [3, 5, 10, 15, None]},
        "min_samples_split": {"values": [2, 5, 10]},
    },
}
sweep_id = wandb.sweep(sweep_config, project="iris-classification")


def train():
    # The sweep agent fills wandb.config with the values chosen for this trial
    wandb.init()
    config = wandb.config
    model = RandomForestClassifier(
        n_estimators=config.n_estimators,
        max_depth=config.max_depth,
        min_samples_split=config.min_samples_split,
    )
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    wandb.log({"accuracy": accuracy})
    wandb.finish()


# Run sweep: 20 trials
wandb.agent(sweep_id, function=train, count=20)
ℹ️ Bayesian vs Grid Search
W&B Sweeps support Bayesian optimization, which uses the results of past trials to choose the next hyperparameters to try. This usually finds good configurations in fewer trials than grid search, especially when the search space is large or some parameters matter much more than others.
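To make the size argument concrete, this small sketch counts how many runs an exhaustive grid search would need for the sweep space used above and compares that with the 20-trial budget passed to `wandb.agent`. It uses only the standard library and does not call W&B.

```python
import itertools

# Same search space as the sweep configuration above
search_space = {
    "n_estimators": [50, 100, 200, 300],
    "max_depth": [3, 5, 10, 15, None],
    "min_samples_split": [2, 5, 10],
}

# Grid search must try every combination of values
grid = list(itertools.product(*search_space.values()))
grid_size = len(grid)  # 4 * 5 * 3 = 60
sweep_budget = 20      # the `count` given to the sweep agent

print(f"grid search: {grid_size} runs, sweep budget: {sweep_budget} runs")
```

Adding one more parameter with four candidate values would quadruple the grid, while the sweep budget stays whatever you set it to.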
4. Data Versioning
DVC (Data Version Control)
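# Initialize DVC (inside an existing Git repository)
dvc init
# Track the raw data file; commit the generated .dvc pointer file with Git
dvc add data/raw_data.csv
git add data/raw_data.csv.dvc data/.gitignore
# Define pipeline stages (written to dvc.yaml)
dvc stage add -n prepare \
    -d prepare.py -d data/raw_data.csv \
    -o data/training_data.parquet -o data/test_data.parquet \
    python prepare.py
dvc stage add -n train \
    -d train.py -d data/training_data.parquet \
    -o models/model.pkl \
    python train.py
# Run pipeline
dvc repro
# Push to remote (requires a configured DVC remote)
dvc push
# Pull data
dvc pull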
DVC Pipeline
# dvc.yaml
stages:
  prepare:
    cmd: python prepare.py
    deps:
      - prepare.py
      - data/raw_data.csv
    outs:
      - data/training_data.parquet
      - data/test_data.parquet
  train:
    cmd: python train.py
    deps:
      - train.py
      - data/training_data.parquet
    params:
      - params.json:
          - n_estimators
          - max_depth
    outs:
      - models/model.pkl
  evaluate:
    cmd: python evaluate.py
    deps:
      - evaluate.py
      - models/model.pkl
      - data/test_data.parquet
    # metrics.json is declared in one stage only: DVC rejects the same
    # output path in two stages
    metrics:
      - metrics.json:
          cache: false
# params.json
{
  "n_estimators": 100,
  "max_depth": 5,
  "learning_rate": 0.01
}
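The pipeline only works if the stage scripts read `params.json` and write `metrics.json` in the places DVC expects. The lesson does not show those scripts, so the following is a sketch of the two file-handling helpers they might share; the function names are hypothetical.

```python
import json
from pathlib import Path


def load_params(path="params.json"):
    """Read hyperparameters from the JSON file listed under `params:`."""
    return json.loads(Path(path).read_text())


def write_metrics(metrics, path="metrics.json"):
    """Write metrics as JSON so that `dvc metrics show` can display them."""
    Path(path).write_text(json.dumps(metrics, indent=2))
```

A training script would call `load_params()` before building the model, and the evaluation script would call `write_metrics({"accuracy": ...})` as its final step.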
5. Model Registry
MLflow Model Registry
import mlflow
from mlflow.tracking import MlflowClient

# Register model (replace <run_id> with the ID of a finished run)
model_uri = "runs:/<run_id>/model"
model_name = "iris-classifier"
result = mlflow.register_model(model_uri, model_name)

# Transition the version that was just registered
client = MlflowClient()
client.transition_model_version_stage(
    name=model_name,
    version=result.version,
    stage="Production",
)

# Load production model
model = mlflow.pyfunc.load_model(f"models:/{model_name}/Production")
Model Versioning Strategies
Definition: Model Versioning
Model versioning tracks successive versions of the same model, together with metadata about each version's training data, parameters, and performance.
| Strategy | When to Use |
|---|---|
| Version by commit | Every code change |
| Version by data | When data changes |
| Version by performance | When model improves |
| A/B testing | Compare versions in production |
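ℹ️ Model Stages
MLflow uses stages: None → Staging → Production → Archived. This lifecycle allows a safe rollout, with the ability to roll back to a previous version if issues arise. Recent MLflow releases deprecate stages in favor of model version aliases, so check which mechanism your version supports.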
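A stage transition is usually guarded by a rule that decides whether the new version deserves promotion. The function below is one possible rule, not an MLflow feature: it assumes metrics are plain dictionaries and that a candidate must beat the production model by a minimum margin (`min_gain`, a hypothetical parameter).

```python
def should_promote(candidate, production, metric="accuracy", min_gain=0.01):
    """Return True if the candidate beats production by at least min_gain.

    candidate and production map metric names to values. If there is no
    production model yet (production is None), the candidate is promoted.
    """
    if production is None:
        return True
    return candidate[metric] - production[metric] >= min_gain


print(should_promote({"accuracy": 0.97}, {"accuracy": 0.95}))   # True
print(should_promote({"accuracy": 0.951}, {"accuracy": 0.95}))  # False
```

Requiring a margin rather than any improvement avoids swapping production models over differences that are within evaluation noise.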
6. Reproducibility
Environment Management
# environment.yml
name: ml-project
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.9
  - scikit-learn=1.2
  - pandas=1.5
  - numpy=1.23
  - pip
  - pip:
      - mlflow==2.0.1
      - wandb==0.13.0
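# Dockerfile
# Base image that ships with conda (python:3.9-slim does not include it)
FROM continuumio/miniconda3
WORKDIR /app
COPY environment.yml .
RUN conda env create -f environment.yml && \
    conda clean -afy
COPY . .
# Run inside the "ml-project" environment defined in environment.yml
CMD ["conda", "run", "--no-capture-output", "-n", "ml-project", "python", "train.py"]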
Random Seeds
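import random

import numpy as np
import torch
from sklearn.ensemble import RandomForestClassifier


def set_seed(seed=42):
    # Seed every random number generator the training code may touch
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Trade cuDNN speed for deterministic kernels
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


# Use in training
set_seed(42)
# scikit-learn estimators take their seed through random_state
model = RandomForestClassifier(random_state=42)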
Theorem: Reproducibility Requirements
For an ML experiment to be fully reproducible, you must fix: (1) random seeds, (2) library versions, (3) hardware (CPU/GPU), (4) data version, (5) code version. Even with all five fixed, floating-point non-determinism can cause minor numerical differences across platforms.
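The floating-point caveat comes from the fact that floating-point addition is not associative: summing the same numbers in a different order can give a slightly different result, and parallel hardware does not guarantee a fixed order. A minimal demonstration in plain Python:

```python
import math

left_to_right = (0.1 + 0.2) + 0.3
right_to_left = 0.1 + (0.2 + 0.3)

print(left_to_right == right_to_left)  # False
print(left_to_right, right_to_left)    # 0.6000000000000001 0.6

# Compare results with a tolerance instead of exact equality
print(math.isclose(left_to_right, right_to_left, rel_tol=1e-9))  # True
```

For the same reason, tests that compare metrics across machines should use a tolerance rather than exact equality.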
7. CI/CD for ML
GitHub Actions Workflow
# .github/workflows/ml-pipeline.yml
name: ML Pipeline
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest flake8
      - name: Run tests
        run: pytest tests/
      - name: Lint
        run: flake8 src/
  train:
    needs: test
    # Train and deploy only on pushes to main, not on pull requests
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Train model
        run: python train.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Evaluate model
        run: python evaluate.py
      - name: Deploy to staging
        if: success()
        run: python deploy.py --env staging
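For the "Evaluate model" step to block a bad deployment, `evaluate.py` has to exit with a non-zero status when the model underperforms, since a failing step stops the job before "Deploy to staging". The lesson does not show `evaluate.py`; this sketch assumes it has already written a `metrics.json` file, and `check_metrics` is a hypothetical helper name. The 0.85 threshold is only an example value.

```python
import json
import sys
from pathlib import Path


def check_metrics(path="metrics.json", metric="accuracy", threshold=0.85):
    """Return 0 if the metric meets the threshold, 1 otherwise."""
    metrics = json.loads(Path(path).read_text())
    value = metrics[metric]
    if value < threshold:
        print(f"FAIL: {metric}={value:.3f} is below {threshold}", file=sys.stderr)
        return 1
    print(f"OK: {metric}={value:.3f}")
    return 0
```

Ending the script with `sys.exit(check_metrics())` turns the return value into the process exit code that GitHub Actions checks.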
ML Testing
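import time

import numpy as np
import pytest
from sklearn.metrics import accuracy_score

from model import load_model  # project-local helper, assumed to exist


@pytest.fixture
def sample_data():
    # Seeded generator so the fixture is identical on every run
    return np.random.default_rng(0).standard_normal((100, 10))


@pytest.fixture
def trained_model():
    return load_model("models/production.pkl")


@pytest.fixture
def test_data():
    # Held-out features; the path is a placeholder for your own test set
    return np.load("data/test_features.npy")


@pytest.fixture
def test_labels():
    # Labels matching test_data; placeholder path
    return np.load("data/test_labels.npy")


def test_model_output_shape(trained_model, sample_data):
    predictions = trained_model.predict(sample_data)
    assert predictions.shape == (100,)


def test_model_predictions_range(trained_model, sample_data):
    # Assumes a binary classifier or a model that outputs values in [0, 1]
    predictions = trained_model.predict(sample_data)
    assert predictions.min() >= 0
    assert predictions.max() <= 1


def test_model_performance(trained_model, test_data, test_labels):
    predictions = trained_model.predict(test_data)
    accuracy = accuracy_score(test_labels, predictions)
    assert accuracy > 0.85, f"Model accuracy {accuracy} below threshold"


def test_model_inference_time(trained_model, sample_data):
    # perf_counter is a monotonic, high-resolution clock suited to timing
    start = time.perf_counter()
    for _ in range(100):
        trained_model.predict(sample_data[:1])
    elapsed = time.perf_counter() - start
    assert elapsed / 100 < 0.1, "Inference too slow"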
💡 ML Testing Strategy
ML testing goes beyond unit tests. Include: (1) data validation tests, (2) model performance threshold tests, (3) inference latency tests, (4) input/output schema tests, (5) fairness/bias tests. Each catches different failure modes.
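The tests above cover output shape, value range, performance, and latency. As an illustration of the first item in this list, data validation, here is a sketch of a schema and missing-value check written with plain Python dictionaries. The column names and the `validate_rows` helper are hypothetical; a real project might use a dataframe or a validation library instead.

```python
# Hypothetical schema: column name -> expected Python type
EXPECTED_SCHEMA = {
    "sepal_length": float,
    "sepal_width": float,
    "petal_length": float,
    "petal_width": float,
}


def validate_rows(rows, schema=EXPECTED_SCHEMA):
    """Return a list of human-readable problems found in the rows."""
    problems = []
    for i, row in enumerate(rows):
        missing = set(schema) - set(row)
        if missing:
            problems.append(f"row {i}: missing columns {sorted(missing)}")
        for column, expected_type in schema.items():
            if column not in row:
                continue
            value = row[column]
            if value is None:
                problems.append(f"row {i}: {column} is null")
            elif not isinstance(value, expected_type):
                problems.append(f"row {i}: {column} has type {type(value).__name__}")
    return problems


def test_rows_match_schema():
    rows = [
        {"sepal_length": 5.1, "sepal_width": 3.5,
         "petal_length": 1.4, "petal_width": 0.2},
    ]
    assert validate_rows(rows) == []
```

Returning a list of problems instead of raising on the first one makes the failure message show every issue in the batch at once.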
8. Key Takeaways
Summary: MLOps & Experiment Tracking
- Experiment tracking (MLflow, W&B) logs parameters, metrics, and artifacts for reproducibility
- Data versioning (DVC) tracks dataset changes alongside code
- Model registry manages model versions, stages, and deployment
- Reproducibility requires environment management, random seeds, and version control
- CI/CD for ML automates testing, training, and deployment pipelines
- Start simple: MLflow tracking + Git → DVC → Full pipeline automation
- Track not just metrics but also data versions and environment details
- Use model stages (Staging → Production → Archived) for safe rollouts
9. Practice Exercises
Exercise 1: MLflow Experiment Tracking
# TODO: Set up MLflow locally
# Track 3+ experiments with different hyperparameters
# Compare results in MLflow UI
# Log: params, metrics, model, artifacts
Exercise 2: W&B Sweep
# TODO: Run a W&B sweep on your project
# Use: Bayesian optimization
# Compare: 20+ hyperparameter combinations
# Analyze: which parameters matter most
Exercise 3: DVC Pipeline
# TODO: Create a DVC pipeline with 3 stages
# Stages: prepare, train, evaluate
# Add: parameter tracking
# Test: reproduce pipeline with different params
Exercise 4: ML Testing
# TODO: Write tests for your ML model
# Tests: output shape, value range, performance threshold, inference time
# Add: data validation tests (schema, missing values)
# Integrate: into CI/CD pipeline