Project 3: Deploy a Deep Learning Model

Module 3: Advanced ML + Deep LearningFree Lesson

Advertisement

Project 3: Deploy a Deep Learning Model

šŸ’” This project brings together everything you've learned — from data preprocessing to production deployment. You'll build a complete ML system: train a deep learning model, create a REST API, containerize it, and deploy with monitoring.


1. Project Overview

System Architecture

Architecture Diagram
ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
│                    Production System                         │
ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤
│                                                             │
│  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”    ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”    ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”              │
│  │ Client  │───→│   Nginx  │───→│ FastAPI  │              │
│  │ (React) │    │ (Proxy)  │    │  (API)   │              │
│  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜    ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜    ā””ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”˜              │
│                                      │                      │
│                              ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”              │
│                              │               │              │
│                         ā”Œā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”    ā”Œā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”        │
│                         │ Model   │    │  Redis  │        │
│                         │ Server  │    │ (Cache) │        │
│                         ā””ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”˜    ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜        │
│                              │                              │
│                         ā”Œā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”                         │
│                         │ Docker  │                         │
│                         │ (GPU)   │                         │
│                         ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜                         │
│                                                             │
ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜

Tech Stack

ComponentTechnologyPurpose
ModelPyTorch/TensorFlowDeep learning
APIFastAPIREST endpoints
ContainerDockerPackaging
OrchestrationDocker ComposeMulti-service
MonitoringPrometheus + GrafanaMetrics
LoggingELK StackCentralized logs

DfEnd-to-End ML System

An end-to-end ML system encompasses the complete lifecycle: data ingestion, preprocessing, model training, evaluation, deployment, monitoring, and feedback loops. Production ML systems require careful consideration of latency, throughput, reliability, and observability.


2. Step 1: Train the Model

Data Pipeline

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pathlib import Path
from PIL import Image

class ImageDataset(Dataset):
    def __init__(self, root_dir, split="train"):
        self.root_dir = Path(root_dir) / split
        self.images = list(self.root_dir.glob("**/*.jpg"))
        self.labels = [img.parent.name for img in self.images]

        # Create label mapping
        self.label_map = {label: i for i, label in enumerate(set(self.labels))}

        # Transforms
        if split == "train":
            self.transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])
        else:
            self.transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert("RGB")
        label = self.label_map[self.labels[idx]]
        return self.transform(image), label

Model Architecture

import torch.nn as nn
import torchvision.models as models

class ImageClassifier(nn.Module):
    def __init__(self, num_classes, pretrained=True):
        super().__init__()
        self.backbone = models.resnet50(pretrained=pretrained)

        # Freeze early layers
        for param in list(self.backbone.parameters())[:-20]:
            param.requires_grad = False

        # Replace classifier
        in_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        return self.backbone(x)

    def predict(self, x):
        self.eval()
        with torch.no_grad():
            logits = self.forward(x)
            probs = torch.softmax(logits, dim=1)
            return probs

ā„¹ļø Transfer Learning Strategy

Freezing early layers preserves pre-trained features (edges, textures) while allowing later layers to adapt to your specific task. This is especially effective when you have limited training data — you need fewer parameters to train.

Training Script

import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
import mlflow
import mlflow.pytorch

def train_model(model, train_loader, val_loader, config):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=config["lr"], weight_decay=config["weight_decay"])
    scheduler = CosineAnnealingLR(optimizer, T_max=config["epochs"])

    mlflow.set_experiment("image-classification")

    with mlflow.start_run(run_name=config.get("run_name", "resnet50")):
        mlflow.log_params(config)

        best_val_acc = 0.0

        for epoch in range(config["epochs"]):
            # Training
            model.train()
            train_loss = 0.0
            correct = 0
            total = 0

            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device)

                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

            train_acc = 100.0 * correct / total
            avg_train_loss = train_loss / len(train_loader)

            # Validation
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0

            with torch.no_grad():
                for images, labels in val_loader:
                    images, labels = images.to(device), labels.to(device)
                    outputs = model(images)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += labels.size(0)
                    correct += predicted.eq(labels).sum().item()

            val_acc = 100.0 * correct / total
            avg_val_loss = val_loss / len(val_loader)

            scheduler.step()

            # Log metrics
            mlflow.log_metrics({
                "train_loss": avg_train_loss,
                "train_acc": train_acc,
                "val_loss": avg_val_loss,
                "val_acc": val_acc,
                "lr": scheduler.get_last_lr()[0],
            }, step=epoch)

            print(f"Epoch {epoch+1}/{config['epochs']}: "
                  f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.2f}%, "
                  f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%")

            # Save best model
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(model.state_dict(), "models/best_model.pth")

        # Log final model
        mlflow.pytorch.log_model(model, "model")
        mlflow.log_metric("best_val_acc", best_val_acc)

    return model

3. Step 2: Create the API

FastAPI Application

from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel
import torch
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import io
from typing import List

app = FastAPI(title="Image Classifier API", version="1.0.0")

# Load model
model = None
class_names = None

@app.on_event("startup")
async def load_model():
    global model, class_names
    from model import ImageClassifier

    model = ImageClassifier(num_classes=10)
    model.load_state_dict(torch.load("models/best_model.pth", map_location="cpu"))
    model.eval()

    class_names = ["cat", "dog", "bird", "fish", "frog",
                   "hamster", "rabbit", "snake", "turtle", "hamster"]
    print("Model loaded successfully")

# Transform for inference
inference_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

class Prediction(BaseModel):
    class_name: str
    confidence: float

class PredictionResponse(BaseModel):
    predictions: List[Prediction]
    top_prediction: str
    top_confidence: float

@app.get("/health")
def health():
    return {
        "status": "healthy",
        "model_loaded": model is not None,
        "device": "cuda" if torch.cuda.is_available() else "cpu"
    }

@app.post("/predict", response_model=PredictionResponse)
async def predict(file: UploadFile = File(...)):
    if not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="File must be an image")

    try:
        # Read and preprocess image
        contents = await file.read()
        image = Image.open(io.BytesIO(contents)).convert("RGB")
        input_tensor = inference_transform(image).unsqueeze(0)

        # Predict
        with torch.no_grad():
            outputs = model(input_tensor)
            probs = F.softmax(outputs, dim=1)[0]

        # Get top predictions
        top_probs, top_indices = torch.topk(probs, 3)
        predictions = [
            Prediction(
                class_name=class_names[idx.item()],
                confidence=prob.item()
            )
            for prob, idx in zip(top_probs, top_indices)
        ]

        return PredictionResponse(
            predictions=predictions,
            top_prediction=class_names[top_indices[0].item()],
            top_confidence=top_probs[0].item()
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict/batch")
async def predict_batch(files: List[UploadFile] = File(...)):
    results = []
    for file in files:
        try:
            contents = await file.read()
            image = Image.open(io.BytesIO(contents)).convert("RGB")
            input_tensor = inference_transform(image).unsqueeze(0)

            with torch.no_grad():
                outputs = model(input_tensor)
                probs = F.softmax(outputs, dim=1)[0]

            top_prob, top_idx = torch.max(probs, 0)
            results.append({
                "filename": file.filename,
                "prediction": class_names[top_idx.item()],
                "confidence": top_prob.item()
            })
        except Exception as e:
            results.append({
                "filename": file.filename,
                "error": str(e)
            })

    return {"results": results}

4. Step 3: Containerize

Dockerfile

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    libgl1-mesa-glx \
    libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create models directory
RUN mkdir -p models

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
torchvision==0.16.0
pillow==10.1.0
python-multipart==0.0.6
prometheus-client==0.19.0

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models:ro
    environment:
      - MODEL_PATH=/app/models/best_model.pth
      - LOG_LEVEL=info
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./certs:/etc/nginx/certs:ro
    depends_on:
      - api

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

DfContainer Orchestration

Docker Compose defines and runs multi-container applications. For production at scale, use Kubernetes for auto-scaling, rolling updates, self-healing, and service discovery. Docker Compose is ideal for development and small-scale deployments.


5. Step 4: Deploy

Deployment Script

import subprocess
import sys
from pathlib import Path

def build_and_deploy(environment="staging"):
    """Build and deploy the ML API"""

    print(f"Deploying to {environment}...")

    # Build Docker image
    subprocess.run(["docker", "build", "-t", f"ml-api:{environment}", "."], check=True)

    # Tag for registry
    registry = "your-registry.com"
    subprocess.run(["docker", "tag", f"ml-api:{environment}",
                    f"{registry}/ml-api:{environment}"], check=True)

    # Push to registry
    subprocess.run(["docker", "push", f"{registry}/ml-api:{environment}"], check=True)

    # Deploy (Kubernetes example)
    if environment == "production":
        subprocess.run(["kubectl", "apply", "-f", "k8s/production.yaml"], check=True)
    else:
        subprocess.run(["kubectl", "apply", "-f", "k8s/staging.yaml"], check=True)

    print(f"Deployment to {environment} complete!")

if __name__ == "__main__":
    env = sys.argv[1] if len(sys.argv) > 1 else "staging"
    build_and_deploy(env)

Kubernetes Deployment

# k8s/production.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-api
  labels:
    app: ml-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-api
  template:
    metadata:
      labels:
        app: ml-api
    spec:
      containers:
      - name: ml-api
        image: your-registry.com/ml-api:production
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
            nvidia.com/gpu: 1
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-api
spec:
  selector:
    app: ml-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

ā„¹ļø Kubernetes Probes

Liveness probes check if the container is running; if it fails, Kubernetes restarts the pod. Readiness probes check if the pod is ready to receive traffic; if it fails, the pod is removed from the load balancer. This ensures traffic only goes to healthy pods.


6. Step 5: Monitor

Prometheus Metrics

from prometheus_client import Counter, Histogram, generate_latest
import time

PREDICTION_COUNT = Counter("predictions_total", "Total predictions")
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Prediction latency")
ERROR_COUNT = Counter("prediction_errors_total", "Total prediction errors")

@app.middleware("http")
async def add_middleware(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    PREDICTION_LATENCY.observe(process_time)
    return response

@app.post("/predict/monitored")
async def predict_monitored(file: UploadFile = File(...)):
    try:
        PREDICTION_COUNT.inc()
        # ... prediction logic ...
    except Exception as e:
        ERROR_COUNT.inc()
        raise

@app.get("/metrics")
def metrics():
    return generate_latest()

Grafana Dashboard Queries

# Request rate
rate(predictions_total[5m])

# Latency percentiles
histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m]))
histogram_quantile(0.99, rate(prediction_latency_seconds_bucket[5m]))

# Error rate
rate(prediction_errors_total[5m]) / rate(predictions_total[5m])

šŸ’” Monitoring Strategy

Key metrics to monitor: (1) Request rate (throughput), (2) Latency percentiles (p50, p95, p99), (3) Error rate, (4) Model accuracy (if ground truth available), (5) Input data distribution (detect drift). Set up alerts for anomalies in any of these.


7. Testing

Unit Tests

import pytest
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_health():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_predict():
    with open("test_image.jpg", "rb") as f:
        response = client.post("/predict", files={"file": ("test.jpg", f, "image/jpeg")})
    assert response.status_code == 200
    assert "predictions" in response.json()

def test_predict_batch():
    files = [
        ("files", ("test1.jpg", open("test1.jpg", "rb"), "image/jpeg")),
        ("files", ("test2.jpg", open("test2.jpg", "rb"), "image/jpeg")),
    ]
    response = client.post("/predict/batch", files=files)
    assert response.status_code == 200
    assert "results" in response.json()

Load Testing

# locustfile.py
from locust import HttpUser, task, between

class MLApiUser(HttpUser):
    wait_time = between(1, 3)

    @task
    def predict(self):
        with open("test_image.jpg", "rb") as f:
            self.client.post("/predict", files={"file": ("test.jpg", f, "image/jpeg")})

    @task(3)
    def health_check(self):
        self.client.get("/health")

8. Project Deliverables

Checklist

  • Train model with > 90% accuracy
  • Create FastAPI with /predict, /health endpoints
  • Docker container builds and runs
  • Docker Compose with all services
  • Prometheus metrics endpoint
  • Grafana dashboard configured
  • Unit tests passing
  • Load test shows < 200ms latency at p99
  • Documentation complete
  • CI/CD pipeline configured

Documentation

## API Documentation
- `GET /health` - Health check
- `POST /predict` - Single image prediction
- `POST /predict/batch` - Batch prediction
- `GET /metrics` - Prometheus metrics

## Deployment
1. Build: `docker build -t ml-api .`
2. Run: `docker-compose up -d`
3. Test: `curl http://localhost:8000/health`

## Monitoring
- Grafana: http://localhost:3000
- Prometheus: http://localhost:9090

9. Key Takeaways

šŸ“‹Summary: Deploy a Deep Learning Model

  • Complete pipeline: Data → Training → API → Docker → Deploy → Monitor
  • FastAPI handles inference requests with async support
  • Docker ensures reproducible deployment environments
  • Kubernetes provides scaling and orchestration for production
  • Monitoring with Prometheus and Grafana tracks performance
  • Testing validates functionality and performance under load
  • Documentation enables team collaboration and maintenance
  • Start with a simple deployment, then add complexity as needed
  • Always include health checks and graceful error handling
  • Monitor both system metrics (latency, throughput) and model metrics (accuracy, drift)

10. Extension Ideas

Advanced Features

  1. A/B Testing: Deploy multiple model versions, route traffic
  2. Canary Releases: Gradually roll out new versions
  3. Model Optimization: ONNX, TensorRT for faster inference
  4. Edge Deployment: Convert to TFLite for mobile
  5. Auto-scaling: Scale based on request load

Monitoring Enhancements

  1. Data Drift: Monitor input distribution changes
  2. Model Performance: Track accuracy over time
  3. Alerting: Notify on errors or degradation
  4. Logging: Centralized log aggregation

11. Resources

Advertisement

Need Expert Data Science Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement