Project 3: Deploy a Deep Learning Model
This project brings together everything you've learned, from data preprocessing to production deployment. You'll build a complete ML system: train a deep learning model, create a REST API, containerize it, and deploy it with monitoring.
1. Project Overview
System Architecture
┌──────────────────────────────────────────────────────────┐
│                    Production System                     │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌─────────┐     ┌──────────┐     ┌──────────┐           │
│  │ Client  │────▶│  Nginx   │────▶│ FastAPI  │           │
│  │ (React) │     │ (Proxy)  │     │  (API)   │           │
│  └─────────┘     └──────────┘     └─────┬────┘           │
│                                         │                │
│                                ┌────────┴────────┐       │
│                                │                 │       │
│                           ┌────┴────┐       ┌────┴────┐  │
│                           │  Model  │       │  Redis  │  │
│                           │ Server  │       │ (Cache) │  │
│                           └────┬────┘       └─────────┘  │
│                                │                         │
│                           ┌────┴────┐                    │
│                           │ Docker  │                    │
│                           │  (GPU)  │                    │
│                           └─────────┘                    │
│                                                          │
└──────────────────────────────────────────────────────────┘
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Model | PyTorch/TensorFlow | Deep learning |
| API | FastAPI | REST endpoints |
| Container | Docker | Packaging |
| Orchestration | Docker Compose | Multi-service |
| Monitoring | Prometheus + Grafana | Metrics |
| Logging | ELK Stack | Centralized logs |
Definition: End-to-End ML System
An end-to-end ML system encompasses the complete lifecycle: data ingestion, preprocessing, model training, evaluation, deployment, monitoring, and feedback loops. Production ML systems require careful consideration of latency, throughput, reliability, and observability.
2. Step 1: Train the Model
Data Pipeline
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pathlib import Path
from PIL import Image


class ImageDataset(Dataset):
    """Images laid out as <root_dir>/<split>/<class_name>/*.jpg."""

    def __init__(self, root_dir, split="train"):
        self.root_dir = Path(root_dir) / split
        self.images = sorted(self.root_dir.glob("**/*.jpg"))
        self.labels = [img.parent.name for img in self.images]
        # Create label mapping (sorted, so class indices are identical on every run)
        self.label_map = {label: i for i, label in enumerate(sorted(set(self.labels)))}
        # Transforms
        if split == "train":
            self.transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])
        else:
            self.transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert("RGB")
        label = self.label_map[self.labels[idx]]
        return self.transform(image), label
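The class indices produced by the dataset have to match the order of the class names used later at inference time. The following is a minimal, standard-library sketch of one way to keep the two in sync; `build_label_map` and `index_to_class` are hypothetical helper names and the label list is made-up example data, not part of the project code.

```python
import json


def build_label_map(labels):
    """Map each distinct class name to an index, in sorted order."""
    return {name: i for i, name in enumerate(sorted(set(labels)))}


def index_to_class(label_map):
    """Invert the mapping into a list where position i holds class i's name."""
    names = [None] * len(label_map)
    for name, i in label_map.items():
        names[i] = name
    return names


labels = ["dog", "cat", "dog", "bird", "cat"]  # assumed example folder names
label_map = build_label_map(labels)
class_list = index_to_class(label_map)

# Serialising the list (for example next to the model weights) would let the
# API load the same ordering instead of hard-coding it.
restored = json.loads(json.dumps(class_list))
print(label_map)
print(restored)
```

Sorting the names is a design choice: iterating over a plain `set` of strings can yield a different order from one Python process to the next, which would silently change which index means which class.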
Model Architecture
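import torch
import torch.nn as nn
import torchvision.models as models


class ImageClassifier(nn.Module):
    def __init__(self, num_classes, pretrained=True):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=` flag (torchvision >= 0.13);
        # IMAGENET1K_V1 is the checkpoint that `pretrained=True` used to load
        weights = models.ResNet50_Weights.IMAGENET1K_V1 if pretrained else None
        self.backbone = models.resnet50(weights=weights)
        # Freeze early layers (everything except the last 20 parameter tensors)
        for param in list(self.backbone.parameters())[:-20]:
            param.requires_grad = False
        # Replace classifier (the new head is trainable by default)
        in_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        return self.backbone(x)

    def predict(self, x):
        # Switch to eval mode so dropout and batch-norm behave deterministically
        self.eval()
        with torch.no_grad():
            logits = self.forward(x)
            probs = torch.softmax(logits, dim=1)
        return probs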
Transfer Learning Strategy
Freezing the early layers preserves pre-trained features (edges, textures) while allowing the later layers to adapt to your specific task. This is especially effective when you have limited training data: with far fewer trainable parameters, the model trains faster and is less prone to overfitting.
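To make "fewer parameters to train" concrete, here is a small arithmetic sketch that counts the parameters in the replacement classifier head. It assumes ResNet-50's final layer receives 2048 input features and uses 10 classes as an example; `linear_params` and `head_params` are hypothetical helper names. It counts the head only, not the last backbone tensors that the model code also leaves unfrozen.

```python
def linear_params(in_features, out_features):
    """Weights plus biases of one fully connected layer."""
    return in_features * out_features + out_features


def head_params(in_features, hidden, num_classes):
    """Parameters in Linear(in_features, hidden) -> Linear(hidden, num_classes).

    Dropout and ReLU contribute no parameters.
    """
    return linear_params(in_features, hidden) + linear_params(hidden, num_classes)


# 2048 is ResNet-50's fc.in_features; 10 classes is an assumed example.
total = head_params(in_features=2048, hidden=512, num_classes=10)
print(total)  # 1054218
```

Roughly one million head parameters is a small fraction of the full network, which is why fine-tuning in this way is feasible on a modest dataset.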
Training Script
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from pathlib import Path
import mlflow
import mlflow.pytorch


def train_model(model, train_loader, val_loader, config):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=config["lr"], weight_decay=config["weight_decay"])
    scheduler = CosineAnnealingLR(optimizer, T_max=config["epochs"])
    # Make sure the checkpoint directory exists before the first save
    Path("models").mkdir(parents=True, exist_ok=True)
    mlflow.set_experiment("image-classification")
    with mlflow.start_run(run_name=config.get("run_name", "resnet50")):
        mlflow.log_params(config)
        best_val_acc = 0.0
        for epoch in range(config["epochs"]):
            # Training
            model.train()
            train_loss = 0.0
            correct = 0
            total = 0
            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()
            train_acc = 100.0 * correct / total
            avg_train_loss = train_loss / len(train_loader)
            # Validation
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in val_loader:
                    images, labels = images.to(device), labels.to(device)
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                    val_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += labels.size(0)
                    correct += predicted.eq(labels).sum().item()
            val_acc = 100.0 * correct / total
            avg_val_loss = val_loss / len(val_loader)
            scheduler.step()
            # Log metrics
            mlflow.log_metrics({
                "train_loss": avg_train_loss,
                "train_acc": train_acc,
                "val_loss": avg_val_loss,
                "val_acc": val_acc,
                "lr": scheduler.get_last_lr()[0],
            }, step=epoch)
            print(f"Epoch {epoch+1}/{config['epochs']}: "
                  f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.2f}%, "
                  f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%")
            # Save best model
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(model.state_dict(), "models/best_model.pth")
        # Log final model
        mlflow.pytorch.log_model(model, "model")
        mlflow.log_metric("best_val_acc", best_val_acc)
    return model
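The training script relies on `CosineAnnealingLR` to shrink the learning rate over the run. As an illustration of what that schedule does, the sketch below evaluates the closed-form cosine annealing expression in plain Python. The base learning rate of 1e-3 and the 10 epochs are assumed example values, and `cosine_lr` is a hypothetical helper, not a PyTorch function.

```python
import math


def cosine_lr(base_lr, epoch, t_max, eta_min=0.0):
    """Closed-form cosine annealing.

    Decays smoothly from base_lr at epoch 0 to eta_min at epoch t_max.
    """
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + math.cos(math.pi * epoch / t_max))


schedule = [cosine_lr(1e-3, epoch, t_max=10) for epoch in range(11)]
for epoch, lr in enumerate(schedule):
    print(f"epoch {epoch:2d}: lr = {lr:.6f}")
```

The rate changes slowly at the start and end of training and fastest in the middle, reaching half the base value at the midpoint.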
3. Step 2: Create the API
FastAPI Application
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel
import torch
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image, UnidentifiedImageError
import io
from typing import List

app = FastAPI(title="Image Classifier API", version="1.0.0")

# Load model
model = None
class_names = None


@app.on_event("startup")
async def load_model():
    global model, class_names
    from model import ImageClassifier
    # pretrained=False: the fine-tuned weights are loaded from disk right below,
    # so there is no need to download the ImageNet checkpoint at startup
    model = ImageClassifier(num_classes=10, pretrained=False)
    model.load_state_dict(torch.load("models/best_model.pth", map_location="cpu"))
    model.eval()
    # One name per class index, in the same order as the training label_map.
    # NOTE: "hamster" is listed twice; replace the last entry with your tenth class.
    class_names = ["cat", "dog", "bird", "fish", "frog",
                   "hamster", "rabbit", "snake", "turtle", "hamster"]
    print("Model loaded successfully")


# Transform for inference
inference_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])


class Prediction(BaseModel):
    class_name: str
    confidence: float


class PredictionResponse(BaseModel):
    predictions: List[Prediction]
    top_prediction: str
    top_confidence: float


@app.get("/health")
def health():
    return {
        "status": "healthy",
        "model_loaded": model is not None,
        "device": "cuda" if torch.cuda.is_available() else "cpu"
    }


@app.post("/predict", response_model=PredictionResponse)
async def predict(file: UploadFile = File(...)):
    # content_type may be None when the client does not send one
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="File must be an image")
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    # Read and decode the image; an undecodable upload is a client error, not a 500
    contents = await file.read()
    try:
        image = Image.open(io.BytesIO(contents)).convert("RGB")
    except UnidentifiedImageError:
        raise HTTPException(status_code=400, detail="Could not decode image")
    try:
        # Preprocess
        input_tensor = inference_transform(image).unsqueeze(0)
        # Predict
        with torch.no_grad():
            outputs = model(input_tensor)
            probs = F.softmax(outputs, dim=1)[0]
        # Get top predictions
        top_probs, top_indices = torch.topk(probs, 3)
        predictions = [
            Prediction(
                class_name=class_names[idx.item()],
                confidence=prob.item()
            )
            for prob, idx in zip(top_probs, top_indices)
        ]
        return PredictionResponse(
            predictions=predictions,
            top_prediction=class_names[top_indices[0].item()],
            top_confidence=top_probs[0].item()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/predict/batch")
async def predict_batch(files: List[UploadFile] = File(...)):
    results = []
    for file in files:
        # A failure on one file is reported in its result entry; the rest still run
        try:
            contents = await file.read()
            image = Image.open(io.BytesIO(contents)).convert("RGB")
            input_tensor = inference_transform(image).unsqueeze(0)
            with torch.no_grad():
                outputs = model(input_tensor)
                probs = F.softmax(outputs, dim=1)[0]
            top_prob, top_idx = torch.max(probs, 0)
            results.append({
                "filename": file.filename,
                "prediction": class_names[top_idx.item()],
                "confidence": top_prob.item()
            })
        except Exception as e:
            results.append({
                "filename": file.filename,
                "error": str(e)
            })
    return {"results": results}
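The `/predict` endpoint turns raw model outputs into probabilities with softmax and returns the three most likely classes. The following pure-Python sketch shows that post-processing step in isolation, without PyTorch. The class names and logits are assumed example values, and `softmax` and `top_k` are illustrative helpers rather than the functions the API calls.

```python
import math


def softmax(logits):
    """Numerically stable softmax: subtract the max before exponentiating."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def top_k(probs, names, k=3):
    """Return the k most probable (name, probability) pairs, highest first."""
    ranked = sorted(zip(names, probs), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]


names = ["cat", "dog", "bird", "fish"]  # assumed example classes
logits = [2.0, 1.0, 0.1, -1.0]          # assumed example model outputs
probs = softmax(logits)
for name, p in top_k(probs, names):
    print(f"{name}: {p:.3f}")
```

Because softmax preserves the ordering of the logits, the top class is the same before and after it; the conversion matters only for reporting a confidence between 0 and 1.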
4. Step 3: Containerize
Dockerfile
# Dockerfile
# Pin the Debian release so the apt package names below stay valid
FROM python:3.11-slim-bookworm
WORKDIR /app
# Install system dependencies
# (libgl1 replaces libgl1-mesa-glx, which is not packaged in Debian bookworm)
RUN apt-get update && apt-get install -y --no-install-recommends \
    libgl1 \
    libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Create models directory
RUN mkdir -p models
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
torchvision==0.16.0
pillow==10.1.0
python-multipart==0.0.6
prometheus-client==0.19.0
Docker Compose
# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models:ro
    environment:
      - MODEL_PATH=/app/models/best_model.pth
      - LOG_LEVEL=info
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./certs:/etc/nginx/certs:ro
    depends_on:
      - api
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
Definition: Container Orchestration
Docker Compose defines and runs multi-container applications. For production at scale, use Kubernetes for auto-scaling, rolling updates, self-healing, and service discovery. Docker Compose is ideal for development and small-scale deployments.
5. Step 4: Deploy
Deployment Script
import subprocess
import sys


def build_and_deploy(environment="staging"):
    """Build and deploy the ML API"""
    print(f"Deploying to {environment}...")
    # Build Docker image
    subprocess.run(["docker", "build", "-t", f"ml-api:{environment}", "."], check=True)
    # Tag for registry
    registry = "your-registry.com"
    subprocess.run(["docker", "tag", f"ml-api:{environment}",
                    f"{registry}/ml-api:{environment}"], check=True)
    # Push to registry
    subprocess.run(["docker", "push", f"{registry}/ml-api:{environment}"], check=True)
    # Deploy (Kubernetes example)
    if environment == "production":
        subprocess.run(["kubectl", "apply", "-f", "k8s/production.yaml"], check=True)
    else:
        subprocess.run(["kubectl", "apply", "-f", "k8s/staging.yaml"], check=True)
    print(f"Deployment to {environment} complete!")


if __name__ == "__main__":
    env = sys.argv[1] if len(sys.argv) > 1 else "staging"
    build_and_deploy(env)
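Before running a script like this against a real registry, it can help to see exactly which commands it would issue. The sketch below is a dry-run variant that only builds the command lists. `deployment_commands` and `ALLOWED_ENVIRONMENTS` are hypothetical names, and unlike the script above it rejects unknown environment names instead of falling back to the staging manifest.

```python
ALLOWED_ENVIRONMENTS = {"staging", "production"}  # assumed set of deploy targets


def deployment_commands(environment, registry="your-registry.com"):
    """Return the commands a deployment would run, without executing them."""
    if environment not in ALLOWED_ENVIRONMENTS:
        raise ValueError(f"Unknown environment: {environment!r}")
    local = f"ml-api:{environment}"
    remote = f"{registry}/{local}"
    return [
        ["docker", "build", "-t", local, "."],
        ["docker", "tag", local, remote],
        ["docker", "push", remote],
        ["kubectl", "apply", "-f", f"k8s/{environment}.yaml"],
    ]


for cmd in deployment_commands("staging"):
    print(" ".join(cmd))
```

Each inner list could be passed to `subprocess.run(cmd, check=True)` once the output looks right; keeping the commands as lists rather than shell strings avoids quoting problems.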
Kubernetes Deployment
# k8s/production.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-api
  labels:
    app: ml-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-api
  template:
    metadata:
      labels:
        app: ml-api
    spec:
      containers:
        - name: ml-api
          image: your-registry.com/ml-api:production
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
              nvidia.com/gpu: 1
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-api
spec:
  selector:
    app: ml-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
Kubernetes Probes
A liveness probe checks whether the container is still running properly; if it fails, Kubernetes restarts the container. A readiness probe checks whether the pod is ready to receive traffic; if it fails, the pod is removed from the Service's endpoints until it passes again. Together they ensure that traffic only goes to healthy pods.
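The manifest above points both probes at `/health`, which reports "healthy" even while `model_loaded` is false. One possible refinement, sketched here in plain Python, is to give the two probes different logic so that a pod is not marked ready before the model is in memory. `ServiceState`, `liveness` and `readiness` are hypothetical names; wiring them to separate HTTP routes is left out.

```python
import time


class ServiceState:
    """Tracks what each probe cares about (hypothetical helper)."""

    def __init__(self):
        self.started_at = time.monotonic()
        self.model_loaded = False


def liveness(state):
    """The process is up and able to answer: always 200 once this runs."""
    uptime = round(time.monotonic() - state.started_at, 1)
    return 200, {"status": "alive", "uptime_s": uptime}


def readiness(state):
    """Only accept traffic after the model has been loaded."""
    if not state.model_loaded:
        return 503, {"status": "loading"}
    return 200, {"status": "ready"}


state = ServiceState()
print(liveness(state)[0], readiness(state)[0])  # while the model is still loading
state.model_loaded = True
print(liveness(state)[0], readiness(state)[0])  # after loading has finished
```

Returning a non-2xx status from the readiness handler is what makes an `httpGet` probe fail, so the 503 keeps the pod out of rotation without triggering a restart.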
6. Step 5: Monitor
Prometheus Metrics
# Added to main.py, which already defines `app`, `UploadFile` and `File`
from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
import time

PREDICTION_COUNT = Counter("predictions_total", "Total predictions")
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Prediction latency")
ERROR_COUNT = Counter("prediction_errors_total", "Total prediction errors")


@app.middleware("http")
async def record_latency(request, call_next):
    start_time = time.perf_counter()
    response = await call_next(request)
    # Only time prediction routes so /health and /metrics scrapes don't skew the histogram
    if request.url.path.startswith("/predict"):
        PREDICTION_LATENCY.observe(time.perf_counter() - start_time)
    return response


@app.post("/predict/monitored")
async def predict_monitored(file: UploadFile = File(...)):
    try:
        PREDICTION_COUNT.inc()
        # ... prediction logic ...
    except Exception:
        ERROR_COUNT.inc()
        raise


@app.get("/metrics")
def metrics():
    # Return the raw exposition format; a bare return value would be JSON-encoded
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
Grafana Dashboard Queries
# Request rate
sum(rate(predictions_total[5m]))
# Latency percentiles (aggregate buckets across replicas before taking the quantile)
histogram_quantile(0.95, sum by (le) (rate(prediction_latency_seconds_bucket[5m])))
histogram_quantile(0.99, sum by (le) (rate(prediction_latency_seconds_bucket[5m])))
# Error rate
sum(rate(prediction_errors_total[5m])) / sum(rate(predictions_total[5m]))
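The latency queries depend on `histogram_quantile`, which estimates a percentile from cumulative bucket counts rather than from individual request timings. The sketch below is a simplified re-implementation of that idea in Python, using linear interpolation inside the bucket that contains the target rank. The bucket bounds and counts are assumed example data, and edge cases that Prometheus handles are omitted.

```python
import math


def histogram_quantile(q, buckets):
    """Estimate the q-quantile from cumulative buckets.

    `buckets` is a list of (upper_bound, cumulative_count) pairs sorted by
    bound and ending with the +Inf bucket, mirroring Prometheus `le` buckets.
    """
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Quantile falls in the +Inf bucket: report the highest finite bound
                return prev_bound
            in_bucket = count - prev_count
            if in_bucket == 0:
                return prev_bound
            # Linear interpolation inside the bucket that contains the rank
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / in_bucket
        prev_bound, prev_count = bound, count
    return prev_bound


# Assumed example: 100 requests, cumulative counts per upper bound in seconds
buckets = [(0.1, 50), (0.5, 90), (1.0, 100), (math.inf, 100)]
p50 = histogram_quantile(0.50, buckets)
p95 = histogram_quantile(0.95, buckets)
print(p50, p95)
```

The estimate is only as fine-grained as the bucket boundaries, so a latency target such as 200 ms is easier to verify when one of the histogram's buckets ends at or near that value.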
Monitoring Strategy
Key metrics to monitor: (1) Request rate (throughput), (2) Latency percentiles (p50, p95, p99), (3) Error rate, (4) Model accuracy (if ground truth available), (5) Input data distribution (detect drift). Set up alerts for anomalies in any of these.
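For item (5), one commonly used drift score is the Population Stability Index (PSI), which compares a histogram of a feature at training time against the same histogram computed on live traffic. The sketch below is a minimal version under assumed data: the four bins could be, for example, mean image brightness ranges, and the counts are invented for illustration. `psi` is a hypothetical helper name.

```python
import math


def psi(expected_counts, actual_counts, eps=1e-6):
    """Population Stability Index between two histograms over the same bins."""
    e_total = sum(expected_counts)
    a_total = sum(actual_counts)
    score = 0.0
    for e, a in zip(expected_counts, actual_counts):
        # eps avoids log(0) and division by zero for empty bins
        e_frac = max(e / e_total, eps)
        a_frac = max(a / a_total, eps)
        score += (a_frac - e_frac) * math.log(a_frac / e_frac)
    return score


training = [100, 300, 400, 200]  # assumed bin counts from the training set
live_same = [10, 30, 40, 20]     # same shape, smaller sample
live_shifted = [400, 300, 200, 100]

print(f"no drift: {psi(training, live_same):.4f}")
print(f"shifted:  {psi(training, live_shifted):.4f}")
```

A frequently quoted rule of thumb treats values above roughly 0.25 as a significant shift, but the right alert threshold depends on the feature and should be tuned on your own data.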
7. Testing
Unit Tests
import pytest
from fastapi.testclient import TestClient
from main import app


@pytest.fixture(scope="module")
def client():
    # Using TestClient as a context manager runs the startup event, which loads the model
    with TestClient(app) as c:
        yield c


def test_health(client):
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"


def test_predict(client):
    with open("test_image.jpg", "rb") as f:
        response = client.post("/predict", files={"file": ("test.jpg", f, "image/jpeg")})
    assert response.status_code == 200
    assert "predictions" in response.json()


def test_predict_batch(client):
    # Context managers make sure both file handles are closed after the request
    with open("test1.jpg", "rb") as f1, open("test2.jpg", "rb") as f2:
        files = [
            ("files", ("test1.jpg", f1, "image/jpeg")),
            ("files", ("test2.jpg", f2, "image/jpeg")),
        ]
        response = client.post("/predict/batch", files=files)
    assert response.status_code == 200
    assert "results" in response.json()
Load Testing
# locustfile.py
from locust import HttpUser, task, between


class MLApiUser(HttpUser):
    # Each simulated user waits 1 to 3 seconds between tasks
    wait_time = between(1, 3)

    @task
    def predict(self):
        with open("test_image.jpg", "rb") as f:
            self.client.post("/predict", files={"file": ("test.jpg", f, "image/jpeg")})

    @task(3)
    def health_check(self):
        self.client.get("/health")
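Once a load test has produced per-request latencies, they can be checked against a p99 target such as the 200 ms figure in the deliverables. The sketch below uses the nearest-rank percentile definition on an assumed sample of 100 timings. `percentile` and `meets_latency_target` are hypothetical helpers, and the numbers are illustrative rather than measured results.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def meets_latency_target(latencies_ms, pct=99, target_ms=200):
    """True when the chosen percentile is strictly below the target."""
    return percentile(latencies_ms, pct) < target_ms


# Assumed sample: 98 fast requests plus two slower outliers (milliseconds)
latencies_ms = [50] * 98 + [180, 450]
print(percentile(latencies_ms, 50), percentile(latencies_ms, 99))
print(meets_latency_target(latencies_ms))
```

Note that a single 450 ms outlier in 100 requests does not break a p99 target, whereas two such outliers would; percentile targets tolerate rare spikes by design.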
8. Project Deliverables
Checklist
- Train model with > 90% validation accuracy
- Create FastAPI app with /predict and /health endpoints
- Docker container builds and runs
- Docker Compose with all services
- Prometheus metrics endpoint
- Grafana dashboard configured
- Unit tests passing
- Load test shows < 200ms latency at p99
- Documentation complete
- CI/CD pipeline configured
Documentation
## API Documentation
- `GET /health` - Health check
- `POST /predict` - Single image prediction
- `POST /predict/batch` - Batch prediction
- `GET /metrics` - Prometheus metrics
## Deployment
1. Build: `docker build -t ml-api .`
2. Run: `docker-compose up -d`
3. Test: `curl http://localhost:8000/health`
## Monitoring
- Grafana: http://localhost:3000
- Prometheus: http://localhost:9090
9. Key Takeaways
Summary: Deploy a Deep Learning Model
- Complete pipeline: Data → Training → API → Docker → Deploy → Monitor
- FastAPI handles inference requests with async support
- Docker ensures reproducible deployment environments
- Kubernetes provides scaling and orchestration for production
- Monitoring with Prometheus and Grafana tracks performance
- Testing validates functionality and performance under load
- Documentation enables team collaboration and maintenance
- Start with a simple deployment, then add complexity as needed
- Always include health checks and graceful error handling
- Monitor both system metrics (latency, throughput) and model metrics (accuracy, drift)
10. Extension Ideas
Advanced Features
- A/B Testing: Deploy multiple model versions, route traffic
- Canary Releases: Gradually roll out new versions
- Model Optimization: ONNX, TensorRT for faster inference
- Edge Deployment: Convert to TFLite for mobile
- Auto-scaling: Scale based on request load
Monitoring Enhancements
- Data Drift: Monitor input distribution changes
- Model Performance: Track accuracy over time
- Alerting: Notify on errors or degradation
- Logging: Centralized log aggregation