Project 2: End-to-End ML Pipeline
Build a production-grade ML pipeline that ingests raw data, engineers features, trains models, evaluates performance, and outputs deployment-ready artifacts.
Pipeline Architecture
1. Data Ingestion and Validation
Schema Enforcement
import pandas as pd
from pandera import DataFrameSchema, Column, Check

# Expected columns, dtypes, and value ranges for the raw input.
schema = DataFrameSchema({
    "age": Column(int, Check.in_range(0, 120)),
    # coerce=True converts integer-valued CSV columns to float before checking
    "income": Column(float, Check.greater_than(0), coerce=True),
    "category": Column(str, Check.isin(["A", "B", "C"])),
    "target": Column(int, Check.isin([0, 1])),
})

def ingest_data(path: str) -> pd.DataFrame:
    """Load a CSV and raise pandera's SchemaError if validation fails."""
    df = pd.read_csv(path)
    return schema.validate(df)
Data Drift Detection
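import pandas as pd
from scipy.stats import ks_2samp

def detect_drift(reference: pd.Series, current: pd.Series, threshold: float = 0.05) -> dict:
    """Two-sample Kolmogorov-Smirnov test; flags drift when p_value < threshold."""
    # Drop missing values first; NaNs would otherwise distort or invalidate the test.
    stat, p_value = ks_2samp(reference.dropna(), current.dropna())
    return {"drifted": bool(p_value < threshold), "statistic": float(stat), "p_value": float(p_value)}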
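A pipeline usually checks more than one column at a time. The sketch below applies the same KS test to several numeric columns and tabulates the results. The helper name drift_report and the synthetic data are illustrative assumptions, not part of the project code.

```python
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp


def drift_report(reference: pd.DataFrame, current: pd.DataFrame,
                 columns: list[str], threshold: float = 0.05) -> pd.DataFrame:
    """Run a two-sample KS test per column and tabulate the results."""
    rows = []
    for col in columns:
        stat, p_value = ks_2samp(reference[col].dropna(), current[col].dropna())
        rows.append({
            "column": col,
            "statistic": float(stat),
            "p_value": float(p_value),
            "drifted": bool(p_value < threshold),
        })
    return pd.DataFrame(rows).set_index("column")


# Synthetic stand-in data: "income" is shifted upward in the current batch.
rng = np.random.default_rng(0)
reference = pd.DataFrame({
    "age": rng.normal(40, 10, 2000),
    "income": rng.normal(50_000, 8_000, 2000),
})
current = pd.DataFrame({
    "age": rng.normal(40, 10, 2000),         # drawn from the same distribution
    "income": rng.normal(60_000, 8_000, 2000),  # mean shifted by 10,000
})

report = drift_report(reference, current, ["age", "income"])
print(report)
```

Testing many columns at the same threshold raises the chance of a false alarm on at least one of them, so a stricter per-column threshold may be appropriate when the feature set is large.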
2. Feature Engineering Pipeline
Sklearn Pipeline Implementation
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif

numeric_features = ["age", "income", "tenure"]
categorical_features = ["category", "region"]

preprocessor = ColumnTransformer([
    ("num", Pipeline([
        ("scaler", StandardScaler()),
        # k cannot exceed the number of numeric features (3 here), so keep all
        # of them by default and tune k downward if selection is wanted.
        ("selector", SelectKBest(f_classif, k="all")),
    ]), numeric_features),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features),
])

full_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", "passthrough"),  # placeholder; set dynamically with set_params(model=...)
])
3. Model Training and Selection
Hyperparameter Search Space
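from scipy.stats import randint, uniform
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBClassifier  # assumption, inferred from the "xgboost-v1" run name

# The pipeline's "model" step is only a placeholder, so plug in an estimator first.
full_pipeline.set_params(model=XGBClassifier(random_state=42))

param_dist = {
    "model__n_estimators": randint(100, 1000),   # integers in [100, 999]
    "model__max_depth": randint(3, 20),          # integers in [3, 19]
    "model__learning_rate": uniform(0.01, 0.3),  # uniform(loc, scale): [0.01, 0.31]
    "model__subsample": uniform(0.6, 0.4),       # [0.6, 1.0]
}
search = RandomizedSearchCV(
    full_pipeline, param_dist,
    n_iter=50, cv=5, scoring="roc_auc",
    random_state=42, n_jobs=-1
)
# Call search.fit(X_train, y_train) before reading best_params_ or best_score_.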
4. Evaluation Framework
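A minimal sketch of what this stage could look like, assuming a binary classifier that exposes predict_proba and a held-out test split. The function name evaluate_classifier, the choice of metrics, and the synthetic data are all illustrative assumptions.

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    average_precision_score,
    confusion_matrix,
    f1_score,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split


def evaluate_classifier(model, X_test, y_test, threshold: float = 0.5) -> dict:
    """Score a fitted binary classifier on held-out data."""
    proba = model.predict_proba(X_test)[:, 1]   # probability of the positive class
    pred = (proba >= threshold).astype(int)     # hard labels at the chosen threshold
    tn, fp, fn, tp = confusion_matrix(y_test, pred, labels=[0, 1]).ravel()
    return {
        "roc_auc": float(roc_auc_score(y_test, proba)),
        "average_precision": float(average_precision_score(y_test, proba)),
        "f1": float(f1_score(y_test, pred)),
        "confusion": {"tn": int(tn), "fp": int(fp), "fn": int(fn), "tp": int(tp)},
    }


# Synthetic stand-in data; a real run would use the project's held-out split.
X, y = make_classification(n_samples=1000, n_features=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
model = LogisticRegression(max_iter=1000).fit(X_train, y_train)

metrics = evaluate_classifier(model, X_test, y_test)
print(metrics)
```

Threshold-free metrics (ROC AUC, average precision) are computed from probabilities, while F1 and the confusion matrix depend on the chosen threshold, so the two groups are worth reporting side by side.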
5. Experiment Tracking
import mlflow
import mlflow.sklearn

# Assumes search.fit(...) has already run and feature_importance.png exists on disk.
with mlflow.start_run(run_name="xgboost-v1"):
    mlflow.log_params(search.best_params_)
    mlflow.log_metric("cv_roc_auc", search.best_score_)
    mlflow.sklearn.log_model(search.best_estimator_, "model")
    mlflow.log_artifact("feature_importance.png")
6. Model Serialization
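import joblib
import json

# Persist the whole fitted pipeline (preprocessing + model) as a single artifact.
joblib.dump(search.best_estimator_, "pipeline.joblib")

# Inference payload. A real request must carry every column the preprocessor
# expects, including "tenure" and "region", which this sample omits.
payload = {
    "features": {"age": 35, "income": 75000, "category": "A"},
    "model_version": "1.0.0"
}
with open("sample_payload.json", "w") as f:
    json.dump(payload, f, indent=2)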
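Before a payload reaches the serialized pipeline, it helps to check that it carries every feature the preprocessor was trained on. The sketch below uses only the standard library. The names REQUIRED_FEATURES and validate_payload are hypothetical, and the required set is assumed to mirror numeric_features plus categorical_features from the feature engineering stage.

```python
import json

# Assumed to mirror numeric_features + categorical_features from the pipeline.
REQUIRED_FEATURES = {"age", "income", "tenure", "category", "region"}


def validate_payload(raw: str) -> dict:
    """Parse a JSON payload and check it carries every expected feature."""
    payload = json.loads(raw)
    features = payload.get("features")
    if not isinstance(features, dict):
        raise ValueError("payload must contain a 'features' object")
    missing = REQUIRED_FEATURES - set(features)
    if missing:
        raise ValueError(f"missing features: {sorted(missing)}")
    return payload


# The sample payload from this section, serialized as a request body.
sample = json.dumps({
    "features": {"age": 35, "income": 75000, "category": "A"},
    "model_version": "1.0.0",
})

try:
    validate_payload(sample)
except ValueError as exc:
    error_message = str(exc)
    print(error_message)  # missing features: ['region', 'tenure']
```

Rejecting an incomplete request at the boundary gives the caller a clear error instead of a failure deep inside the column transformer.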
Project Structure
Architecture Diagram
ml-pipeline/
├── data/
│   ├── raw/
│   └── processed/
├── src/
│   ├── ingest.py
│   ├── features.py
│   ├── train.py
│   ├── evaluate.py
│   └── predict.py
├── pipelines/
│   └── pipeline.yaml
├── models/
├── configs/
│   └── params.yaml
├── tests/
├── notebooks/
├── Dockerfile
└── main.py
Key Takeaways
- Modularity: Each stage is independently testable and replaceable
- Reproducibility: Pipeline configs + versioned data = reproducible experiments
- Automation: Orchestrate with Airflow, Prefect, or Kubeflow Pipelines
- Monitoring: Track data drift, model performance, and system health post-deployment
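To make the reproducibility point concrete, one option is to derive a short run identifier from the config and the raw data together, so that any change to either produces a new identifier. This is a sketch using only the standard library; the function name fingerprint_run and the sample values are illustrative assumptions.

```python
import hashlib
import json


def fingerprint_run(params: dict, data_bytes: bytes) -> str:
    """Hash the config and the raw data together into a short run identifier."""
    digest = hashlib.sha256()
    # sort_keys makes the serialization independent of dict insertion order
    digest.update(json.dumps(params, sort_keys=True).encode("utf-8"))
    digest.update(b"\x00")  # separator between the config and the data
    digest.update(data_bytes)
    return digest.hexdigest()[:12]


params_a = {"n_estimators": 300, "max_depth": 6, "seed": 42}
params_b = {"seed": 42, "max_depth": 6, "n_estimators": 300}  # same values, reordered
data = b"age,income,category,target\n35,75000.0,A,1\n"

run_id = fingerprint_run(params_a, data)
print(run_id)
```

In practice the bytes would come from reading the versioned data file, and the identifier could be logged alongside the tracked parameters so a result can be traced back to its exact inputs.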