Data Pipelines for AI
Data pipelines for AI ensure that clean, versioned, feature-engineered data flows reliably from its sources into model training and inference.
Feature Engineering Pipeline
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Callable
@dataclass
class Feature:
    """A named derived column together with the function that computes it.

    ``transform`` is called with a DataFrame and its return value is stored as
    the column ``name``.
    """

    # Name of the column the computed values are written to.
    name: str
    # Callable that receives the DataFrame and returns the new column's values.
    transform: Callable
    # Optional human-readable explanation; defaults to "" so direct
    # construction matches FeaturePipeline.add_feature, where it is optional.
    description: str = ""
class FeaturePipeline:
    """Ordered registry of derived columns applied to a DataFrame.

    Features are evaluated in registration order. Each transform receives the
    partially enriched frame, so a later feature may use columns produced by
    earlier ones.
    """

    def __init__(self):
        self.features: List[Feature] = []

    def add_feature(self, name: str, transform: Callable, description: str = ""):
        """Register a derived column ``name`` computed as ``transform(frame)``."""
        new_feature = Feature(name, transform, description)
        self.features.append(new_feature)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with every registered feature column added.

        The caller's frame is left untouched; a column whose name already
        exists is overwritten in the copy.
        """
        enriched = df.copy()
        for feat in self.features:
            column_values = feat.transform(enriched)
            enriched[feat.name] = column_values
        return enriched

    def get_feature_names(self) -> List[str]:
        """Return the registered feature names in registration order."""
        return [feat.name for feat in self.features]
# Usage
pipeline = FeaturePipeline()
# Zero denominators are mapped to NaN so ratio features come out as missing
# values (detectable by null checks) instead of +/-inf.
pipeline.add_feature(
    "revenue_per_unit",
    lambda df: df["revenue"] / df["units_sold"].replace(0, np.nan),
    "Revenue divided by units sold (NaN when no units were sold)",
)
pipeline.add_feature(
    "profit_margin",
    lambda df: (df["revenue"] - df["cost"]) / df["revenue"].replace(0, np.nan),
    "Share of revenue left after cost (NaN when revenue is zero)",
)
pipeline.add_feature(
    "month",
    lambda df: pd.to_datetime(df["date"]).dt.month,
    "Calendar month (1-12) parsed from the date column",
)
Data Version Control
import hashlib
import json
import os
from datetime import datetime
class DataVersionControl:
    """Content-addressed snapshot store for pandas DataFrames.

    Each commit writes the frame to
    ``<storage_path>/<dataset_name>_<hash>.parquet`` and appends a metadata
    record to an in-memory log. Only the parquet files are persisted; the log
    in ``self.versions`` lives as long as this object does.
    """

    def __init__(self, storage_path: str = "data_versions"):
        # Directory holding the parquet snapshots; created on the first commit.
        self.storage_path = storage_path
        # dataset_name -> list of version metadata dicts, oldest first.
        self.versions = {}

    def _snapshot_path(self, dataset_name: str, version_hash: str) -> str:
        """Return the parquet file path for one version of a dataset."""
        return os.path.join(self.storage_path, f"{dataset_name}_{version_hash}.parquet")

    def commit(self, dataset_name: str, df: pd.DataFrame, message: str) -> str:
        """Snapshot ``df`` under ``dataset_name`` and return its version hash.

        Args:
            dataset_name: Logical dataset name; also used in the file name.
            df: Frame to store.
            message: Free-text description recorded in the version log.

        Returns:
            The first 12 hex characters of a SHA-256 digest over the frame's
            CSV rendering (index, column labels, values) and its column dtypes.

        Raises:
            ImportError: Raised by pandas when no parquet engine is installed.
            OSError: If the storage directory or snapshot cannot be written.
            In either case no version is added to the log.
        """
        schema = df.dtypes.astype(str).to_dict()
        hasher = hashlib.sha256(df.to_csv().encode())
        # CSV text erases types (the string "1" and the integer 1 render the
        # same), so fold the dtypes into the digest; otherwise such frames
        # share a hash and overwrite each other's snapshot file.
        hasher.update(repr(schema).encode())
        content_hash = hasher.hexdigest()[:12]

        os.makedirs(self.storage_path, exist_ok=True)
        # Write first, log second: a failed write must not leave a log entry
        # pointing at a snapshot that does not exist.
        df.to_parquet(self._snapshot_path(dataset_name, content_hash))

        version = {
            "hash": content_hash,
            "timestamp": datetime.now().isoformat(),
            "message": message,
            "rows": len(df),
            "columns": list(df.columns),
            "schema": schema,
        }
        self.versions.setdefault(dataset_name, []).append(version)
        return content_hash

    def checkout(self, dataset_name: str, version_hash: str) -> pd.DataFrame:
        """Load the snapshot of ``dataset_name`` identified by ``version_hash``.

        Reading a hash that was never committed fails inside pandas' parquet
        reader.
        """
        return pd.read_parquet(self._snapshot_path(dataset_name, version_hash))

    def log(self, dataset_name: str) -> list:
        """Return the version records for ``dataset_name``, oldest first.

        This is the live internal list; an unknown dataset yields a new empty
        list.
        """
        return self.versions.get(dataset_name, [])
# Usage
# NOTE(review): `train_df` and `updated_df` are not defined in this snippet;
# they are assumed to be DataFrames prepared earlier - define them before running.
dvc = DataVersionControl()
# Each commit returns the short content hash identifying that snapshot.
hash1 = dvc.commit("training_data", train_df, "Initial dataset")
hash2 = dvc.commit("training_data", updated_df, "Added new features")
# Restore the first snapshot by its hash. This rebinds `train_df` to the frame
# read back from storage.
train_df = dvc.checkout("training_data", hash1)
Data Quality Monitor
class DataQualityMonitor:
    """Run named pass/fail checks against a DataFrame and keep a history.

    A rule is a callable ``check(df)`` whose truthy result means "passed".
    A failing rule with severity ``"error"`` makes the overall result fail;
    any other severity is reported as a warning only.
    """

    def __init__(self):
        # Registered rules: dicts with "name", "check" and "severity" keys.
        self.rules = []
        # Every result dict returned by check_quality, in call order.
        self.metrics_history = []

    def add_rule(self, name: str, check_fn, severity: str = "error"):
        """Register ``check_fn`` under ``name``.

        ``severity="error"`` makes a failure block ``overall``; any other
        value is treated as a warning.
        """
        self.rules.append({"name": name, "check": check_fn, "severity": severity})

    def check_quality(self, df: pd.DataFrame) -> dict:
        """Evaluate every registered rule against ``df``.

        Returns:
            A dict with
            - "passed": names of rules whose check returned a truthy value;
            - "failed": entries for failing "error" rules, plus entries for
              any rule whose check raised (these also carry an "error" message);
            - "warnings": entries for failing non-"error" rules;
            - "overall": True when "failed" is empty;
            - "score": fraction of rules that passed (1.0 when there are none).
            Every failed/warning entry has "name" and "severity" keys. The same
            dict is appended to ``metrics_history``.
        """
        results = {"passed": [], "failed": [], "warnings": []}
        for rule in self.rules:
            entry = {"name": rule["name"], "severity": rule["severity"]}
            try:
                # bool() is inside the try so an ambiguous truth value (e.g. a
                # Series returned by the check) is reported, not propagated.
                passed = bool(rule["check"](df))
            except Exception as e:
                # A rule that cannot be evaluated counts as a failure whatever
                # its severity: the data could not be validated.
                entry["error"] = str(e)
                results["failed"].append(entry)
                continue
            if passed:
                results["passed"].append(rule["name"])
            elif rule["severity"] == "error":
                results["failed"].append(entry)
            else:
                results["warnings"].append(entry)
        results["overall"] = len(results["failed"]) == 0
        results["score"] = len(results["passed"]) / len(self.rules) if self.rules else 1.0
        self.metrics_history.append(results)
        return results

    def add_default_rules(self, min_rows: int = 101):
        """Register baseline rules: no nulls, enough rows, no duplicate rows.

        Args:
            min_rows: Minimum row count (inclusive) required by the
                "min_rows" error rule. The default of 101 keeps the original
                ``len(df) > 100`` threshold.
        """
        self.add_rule("no_nulls", lambda df: df.isnull().sum().sum() == 0, "warning")
        self.add_rule("min_rows", lambda df: len(df) >= min_rows, "error")
        self.add_rule("no_duplicates", lambda df: df.duplicated().sum() == 0, "warning")
# Usage
# NOTE(review): `df` is not defined in this snippet; it is assumed to be the
# DataFrame to validate - define it before running.
monitor = DataQualityMonitor()
# Registers the built-in no_nulls, min_rows and no_duplicates rules.
monitor.add_default_rules()
# `quality` holds "passed"/"failed"/"warnings" lists plus "overall" and "score".
quality = monitor.check_quality(df)
Key Takeaways
- Feature pipelines automate feature creation and transformation
- Data versioning tracks dataset changes for reproducibility
- Quality monitoring catches data issues before training
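- Feature stores (not covered in the examples above) serve identical feature values to training and inference, preventing training/serving skew
- Lineage tracking (also not covered above) records where data came from and which transformations produced it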