Project 2: End-to-End ML Pipeline

Module 11: End-to-End ML

Build a production-grade ML pipeline that ingests raw data, engineers features, trains models, evaluates performance, and outputs deployment-ready artifacts.

Pipeline Architecture

Figure: End-to-End ML Pipeline Architecture. Main flow: Raw Data (CSV/DB/API) → Data Processing (Clean/Validate) → Feature Eng. (Transform) → Model Train (Fit/Tune) → Evaluate (Metrics) → Deploy. Supporting components: Data Validation, Feature Store, Model Registry. Each stage is modular, testable, and reproducible.

1. Data Ingestion and Validation

Schema Enforcement

import pandas as pd
from pandera import DataFrameSchema, Column, Check

schema = DataFrameSchema({
    "age": Column(int, Check.in_range(0, 120)),
    "income": Column(float, Check.greater_than(0)),
    "category": Column(str, Check.isin(["A", "B", "C"])),
    "target": Column(int, Check.isin([0, 1]))
}, coerce=True)  # cast columns to the declared dtypes, e.g. integer-valued income to float

def ingest_data(path: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    validated = schema.validate(df)
    return validated
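To make the schema's rules concrete, the sketch below restates the same four checks in plain pandas. The helper name `count_violations` and the sample rows are assumptions made for illustration and are not part of pandera. It mirrors the fact that `Check.in_range` is inclusive on both ends by default, while `Check.greater_than` is strict.

```python
import pandas as pd

def count_violations(df: pd.DataFrame) -> dict:
    """Count rows breaking each rule from the schema above (hypothetical helper)."""
    rules = {
        "age": df["age"].between(0, 120),            # inclusive range, like Check.in_range
        "income": df["income"] > 0,                  # strict, like Check.greater_than
        "category": df["category"].isin(["A", "B", "C"]),
        "target": df["target"].isin([0, 1]),
    }
    # ~ok selects the rows that fail a rule; sum() counts them
    return {col: int((~ok).sum()) for col, ok in rules.items()}

# Made-up rows: the second breaks the age and income rules, the third the category rule
sample = pd.DataFrame({
    "age": [35, 130, 42],
    "income": [75000.0, 0.0, 52000.0],
    "category": ["A", "B", "Z"],
    "target": [1, 0, 1],
})
print(count_violations(sample))
```

In the real pipeline pandera performs these checks and raises on failure; a count like this is mainly useful for logging how dirty an incoming file is.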

Data Drift Detection

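import pandas as pd
from scipy.stats import ks_2samp

def detect_drift(reference: pd.Series, current: pd.Series, threshold: float = 0.05) -> dict:
    # Two-sample Kolmogorov-Smirnov test; a p-value below threshold flags drift.
    # Missing values are dropped first so they cannot turn the result into NaN.
    stat, p_value = ks_2samp(reference.dropna(), current.dropna())
    return {"drifted": bool(p_value < threshold), "statistic": float(stat), "p_value": float(p_value)}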
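As a usage sketch, the drift check can be applied column by column to a reference batch and a new batch. The wrapper `drift_report` and the synthetic data are assumptions made for illustration. The KS function is repeated so the block runs on its own.

```python
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

def detect_drift(reference, current, threshold=0.05):
    """Same KS-test check as above, repeated so this block is self-contained."""
    stat, p_value = ks_2samp(reference, current)
    return {"drifted": bool(p_value < threshold),
            "statistic": float(stat), "p_value": float(p_value)}

def drift_report(reference_df, current_df, columns):
    """Run detect_drift on each listed column (hypothetical helper)."""
    return {col: detect_drift(reference_df[col], current_df[col]) for col in columns}

rng = np.random.default_rng(0)
reference_df = pd.DataFrame({
    "age": rng.normal(40, 10, 1000),
    "income": rng.normal(60000, 15000, 1000),
})
# Synthetic "current" batch: income shifts up by one standard deviation, age is unchanged
current_df = reference_df.copy()
current_df["income"] = rng.normal(75000, 15000, 1000)

report = drift_report(reference_df, current_df, ["age", "income"])
for col, result in report.items():
    print(col, result["drifted"], round(result["statistic"], 3))
```

The KS test only applies to numeric columns. With large batches even tiny shifts produce small p-values, so it can help to look at the statistic as well as the drifted flag.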

2. Feature Engineering Pipeline

Figure: Feature Engineering Pipeline. Raw Features → Numeric Transform / Categorical Enc. / Text Vectorize → FeatureUnion → Feature Selection → Output. sklearn Pipeline / ColumnTransformer for reproducibility.

Sklearn Pipeline Implementation

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif

numeric_features = ["age", "income", "tenure"]
categorical_features = ["category", "region"]

preprocessor = ColumnTransformer([
    ("num", Pipeline([
        ("scaler", StandardScaler()),
        # k cannot exceed the number of numeric features (3 here)
        ("selector", SelectKBest(f_classif, k=2))
    ]), numeric_features),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features)
])

full_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", None)  # placeholder; assign an estimator with set_params(model=...) before fitting
])
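The placeholder model step can be filled in with `set_params` before fitting. The sketch below shows this end to end on synthetic data. The `LogisticRegression` estimator, the `region` values, and the generated rows are assumptions made for illustration, and `k=2` is used so the selector stays within the three numeric columns.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

numeric_features = ["age", "income", "tenure"]
categorical_features = ["category", "region"]

preprocessor = ColumnTransformer([
    ("num", Pipeline([
        ("scaler", StandardScaler()),
        ("selector", SelectKBest(f_classif, k=2)),  # k <= number of numeric columns
    ]), numeric_features),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features),
])
pipeline = Pipeline([("preprocess", preprocessor), ("model", None)])

# Synthetic data (made up for illustration): the target depends on income only
rng = np.random.default_rng(42)
n = 200
X = pd.DataFrame({
    "age": rng.integers(18, 80, n),
    "income": rng.normal(60000, 15000, n),
    "tenure": rng.integers(0, 30, n),
    "category": rng.choice(["A", "B", "C"], n),
    "region": rng.choice(["north", "south"], n),
})
y = (X["income"] > 60000).astype(int)

# Swap an estimator into the placeholder step, then fit the whole pipeline
pipeline.set_params(model=LogisticRegression(max_iter=1000))
pipeline.fit(X, y)

# A category never seen in training is encoded as all zeros instead of raising
new_row = pd.DataFrame([{"age": 35, "income": 75000.0, "tenure": 4,
                         "category": "D", "region": "north"}])
print(pipeline.predict(new_row))
```

Because scaling, selection, and encoding are fitted inside the pipeline, cross-validation refits them on each training fold, which avoids leaking statistics from held-out rows.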

3. Model Training and Selection

Hyperparameter Search Space

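from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform

# Assumption: the placeholder "model" step is filled with a gradient-boosting
# classifier; xgboost.XGBClassifier accepts the same four parameters.
full_pipeline.set_params(model=GradientBoostingClassifier(random_state=42))

param_dist = {
    "model__n_estimators": randint(100, 1000),   # integers in [100, 1000)
    "model__max_depth": randint(3, 20),          # integers in [3, 20)
    "model__learning_rate": uniform(0.01, 0.3),  # uniform(loc, scale): [0.01, 0.31]
    "model__subsample": uniform(0.6, 0.4)        # [0.6, 1.0]
}

search = RandomizedSearchCV(
    full_pipeline, param_dist,
    n_iter=50, cv=5, scoring="roc_auc",
    random_state=42, n_jobs=-1
)
# Call search.fit(X_train, y_train) before reading best_params_ or best_score_.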
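The search above can be tried at a much smaller scale. This sketch first shows that scipy's `uniform(loc, scale)` samples from `[loc, loc + scale]`, then runs a small search. The synthetic data, the `GradientBoostingClassifier` stand-in, and the narrowed parameter ranges are assumptions chosen so the example finishes quickly.

```python
from scipy.stats import randint, uniform
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# uniform(0.6, 0.4) covers [0.6, 1.0]: the second argument is a width, not an upper bound
subsample_draws = uniform(0.6, 0.4).rvs(size=1000, random_state=0)
print(subsample_draws.min() >= 0.6, subsample_draws.max() <= 1.0)

# Synthetic data and a deliberately small search space
X, y = make_classification(n_samples=300, n_features=8, random_state=0)
pipe = Pipeline([
    ("scale", StandardScaler()),
    ("model", GradientBoostingClassifier(random_state=0)),  # stand-in estimator
])
small_space = {
    "model__n_estimators": randint(20, 60),
    "model__max_depth": randint(2, 5),
    "model__learning_rate": uniform(0.01, 0.3),
    "model__subsample": uniform(0.6, 0.4),
}
search = RandomizedSearchCV(pipe, small_space, n_iter=4, cv=3,
                            scoring="roc_auc", random_state=42)
search.fit(X, y)

print(sorted(search.best_params_))
print(round(search.best_score_, 3))
```

The `model__` prefix routes each parameter to the pipeline step named `model`, so the same dictionary works regardless of what preprocessing precedes it.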

4. Evaluation Framework

Evaluation Metrics Dashboard

  • Classification: Accuracy, F1, AUC, Precision, Recall, Confusion Matrix
  • Regression: RMSE, MAE, R², MAPE, Residuals, Prediction Dist.
  • Ranking: NDCG, MAP, MRR, Hit Rate, AUC-PR

Cross-Validation Strategy

  • Stratified K-Fold | Time Series Split | Group K-Fold
  • Nested CV for unbiased estimation | Bootstrap confidence intervals

Metrics must align with business objectives
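Two of the items above, stratified K-fold and bootstrap confidence intervals, can be combined in a few lines. This is a minimal sketch on synthetic data: the `LogisticRegression` model, the 0.5 decision threshold, and the 500 bootstrap resamples are assumptions chosen for illustration.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold, cross_val_predict

# Synthetic, mildly imbalanced binary problem (made up for illustration)
X, y = make_classification(n_samples=500, n_features=10, weights=[0.7, 0.3],
                           random_state=0)

# Stratified folds keep the class ratio roughly constant in every split
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
proba = cross_val_predict(LogisticRegression(max_iter=1000), X, y,
                          cv=cv, method="predict_proba")[:, 1]

auc = roc_auc_score(y, proba)
f1 = f1_score(y, (proba >= 0.5).astype(int))

# Percentile bootstrap: resample the out-of-fold predictions with replacement
rng = np.random.default_rng(0)
boot_aucs = []
for _ in range(500):
    idx = rng.integers(0, len(y), len(y))
    if len(np.unique(y[idx])) < 2:   # AUC is undefined with a single class
        continue
    boot_aucs.append(roc_auc_score(y[idx], proba[idx]))
ci_low, ci_high = np.percentile(boot_aucs, [2.5, 97.5])

print(f"AUC={auc:.3f}  95% CI=({ci_low:.3f}, {ci_high:.3f})  F1={f1:.3f}")
```

Here the AUC is computed once over the pooled out-of-fold predictions, which can differ slightly from the mean of per-fold AUCs. For time-ordered or grouped data, `TimeSeriesSplit` or `GroupKFold` would replace `StratifiedKFold`.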

5. Experiment Tracking

import mlflow
import mlflow.sklearn

# Assumes search has already been fit and feature_importance.png has been written to disk.
with mlflow.start_run(run_name="xgboost-v1"):
    mlflow.log_params(search.best_params_)
    mlflow.log_metric("cv_roc_auc", search.best_score_)
    mlflow.sklearn.log_model(search.best_estimator_, "model")
    mlflow.log_artifact("feature_importance.png")

6. Model Serialization

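import joblib
import json

# Persist the whole fitted pipeline (preprocessing + model) as one artifact
joblib.dump(search.best_estimator_, "pipeline.joblib")

# Sample inference payload (a real request needs every column the pipeline was trained on)
payload = {
    "features": {"age": 35, "income": 75000, "category": "A"},
    "model_version": "1.0.0"
}
# Use a context manager so the file handle is closed and flushed
with open("sample_payload.json", "w") as f:
    json.dump(payload, f)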
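The other half of serialization is loading the artifact and scoring a payload. The sketch below does a full round trip in a temporary directory. The helper `predict_from_payload`, the tiny training set, and the three-column stand-in pipeline are assumptions made for illustration, and the payload mirrors the sample above.

```python
import json
import tempfile
from pathlib import Path

import joblib
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Tiny stand-in pipeline trained on made-up rows, only to have something to save
train = pd.DataFrame({
    "age": [25, 40, 58, 33, 47, 29],
    "income": [30000.0, 82000.0, 95000.0, 41000.0, 77000.0, 36000.0],
    "category": ["A", "B", "C", "A", "B", "C"],
})
labels = [0, 1, 1, 0, 1, 0]
pipeline = Pipeline([
    ("preprocess", ColumnTransformer([
        ("num", StandardScaler(), ["age", "income"]),
        ("cat", OneHotEncoder(handle_unknown="ignore"), ["category"]),
    ])),
    ("model", LogisticRegression()),
]).fit(train, labels)

def predict_from_payload(model_path, payload_path):
    """Load the saved pipeline and score one JSON payload (hypothetical helper)."""
    model = joblib.load(model_path)
    with open(payload_path) as f:
        payload = json.load(f)
    # One-row DataFrame so the ColumnTransformer can select columns by name
    row = pd.DataFrame([payload["features"]])
    return {
        "prediction": int(model.predict(row)[0]),
        "model_version": payload["model_version"],
    }

with tempfile.TemporaryDirectory() as tmp:
    model_path = Path(tmp) / "pipeline.joblib"
    payload_path = Path(tmp) / "sample_payload.json"
    joblib.dump(pipeline, model_path)
    payload = {"features": {"age": 35, "income": 75000, "category": "A"},
               "model_version": "1.0.0"}
    with open(payload_path, "w") as f:
        json.dump(payload, f)
    result = predict_from_payload(model_path, payload_path)

print(result)
```

A joblib file is a pickle, so it should only be loaded from trusted sources and with library versions compatible with those used at training time.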

Project Structure

ml-pipeline/
├── data/
│   ├── raw/
│   └── processed/
├── src/
│   ├── ingest.py
│   ├── features.py
│   ├── train.py
│   ├── evaluate.py
│   └── predict.py
├── pipelines/
│   └── pipeline.yaml
├── models/
├── configs/
│   └── params.yaml
├── tests/
├── notebooks/
├── Dockerfile
└── main.py
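One way `main.py` could tie the stages together is sketched below. Every function here is a hypothetical placeholder standing in for the modules under `src/`, and the toy data and threshold "model" exist only to make the flow runnable. The point is the shape: each stage takes and returns a context dictionary, so stages can be run and tested one at a time.

```python
# Hypothetical stage functions. In the layout above they would live in
# src/ingest.py, src/features.py, src/train.py and src/evaluate.py.
def ingest(ctx):
    ctx["rows"] = [{"x": 1.0, "y": 0}, {"x": 3.0, "y": 1}]   # placeholder data
    return ctx

def build_features(ctx):
    ctx["features"] = [[row["x"]] for row in ctx["rows"]]
    ctx["labels"] = [row["y"] for row in ctx["rows"]]
    return ctx

def train(ctx):
    # Placeholder "model": a threshold halfway between the smallest and largest x
    xs = [f[0] for f in ctx["features"]]
    ctx["threshold"] = (min(xs) + max(xs)) / 2
    return ctx

def evaluate(ctx):
    preds = [int(f[0] > ctx["threshold"]) for f in ctx["features"]]
    hits = sum(p == t for p, t in zip(preds, ctx["labels"]))
    ctx["accuracy"] = hits / len(preds)
    return ctx

# Insertion order defines execution order
STAGES = {"ingest": ingest, "features": build_features,
          "train": train, "evaluate": evaluate}

def run_pipeline(until="evaluate"):
    """Run stages in order, stopping after the stage named by `until`."""
    ctx = {}
    for name, stage in STAGES.items():
        ctx = stage(ctx)
        if name == until:
            break
    return ctx

if __name__ == "__main__":
    print(run_pipeline())
```

Stopping early, for example `run_pipeline(until="features")`, makes it easy to inspect intermediate outputs, and the same stage functions can later be wrapped as tasks in an orchestrator.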

Key Takeaways

  • Modularity: Each stage is independently testable and replaceable
  • Reproducibility: Pipeline configs + versioned data = reproducible experiments
  • Automation: Orchestrate with Airflow, Prefect, or Kubeflow Pipelines
  • Monitoring: Track data drift, model performance, and system health post-deployment
