🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Data Pipelines for AI

🟢 Free Lesson

Advertisement

Data Pipelines for AI

Pipeline stages: Raw Data (Databases; APIs, Files) → Extract (Ingestion, Validation) → Transform (Clean, Feature Eng) → Store (Feature Store, Versioned Data) → Serve (Training Data, Inference Data) → ML. Cross-cutting: Data Quality Monitoring + Lineage Tracking.

Data pipelines for AI ensure that clean, versioned, feature-engineered data flows reliably from its sources to model training and inference.

Feature Engineering Pipeline

import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Callable

@dataclass
class Feature:
    """A named, derived column definition used by ``FeaturePipeline``.

    Attributes are documented inline below. ``description`` defaults to an
    empty string, matching the default used by ``FeaturePipeline.add_feature``.
    """

    # Column name the computed values are stored under.
    name: str
    # Callable invoked with the (partially built) DataFrame; its return value
    # becomes the column's values.
    transform: Callable
    # Optional human-readable explanation of the feature.
    description: str = ""

class FeaturePipeline:
    """Ordered collection of derived-column definitions applied to a DataFrame.

    Features are evaluated in registration order against the frame being built,
    so a later feature may read columns produced by an earlier one.
    """

    def __init__(self):
        # Registration order is significant: transform() applies features in it.
        self.features: List[Feature] = []

    def add_feature(self, name: str, transform: Callable, description: str = ""):
        """Register a feature whose values are ``transform(df)``, stored as column ``name``."""
        new_feature = Feature(name=name, transform=transform, description=description)
        self.features.append(new_feature)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with one column added per registered feature.

        Each feature's callable receives the copy (not ``df``), including any
        columns already added by earlier features.
        """
        output = df.copy()
        for feat in self.features:
            column_values = feat.transform(output)
            output[feat.name] = column_values
        return output

    def get_feature_names(self) -> List[str]:
        """Return the registered feature names in registration order."""
        return [registered.name for registered in self.features]

# Usage
pipeline = FeaturePipeline()
# Zero denominators are mapped to NaN so these ratios come out missing instead
# of +/-inf, which would otherwise flow into model training unnoticed.
pipeline.add_feature(
    "revenue_per_unit",
    lambda df: df["revenue"] / df["units_sold"].replace(0, np.nan),
    "Revenue divided by units sold (NaN when no units were sold)",
)
pipeline.add_feature(
    "profit_margin",
    lambda df: (df["revenue"] - df["cost"]) / df["revenue"].replace(0, np.nan),
    "Share of revenue left after cost (NaN when revenue is zero)",
)
pipeline.add_feature(
    "month",
    lambda df: pd.to_datetime(df["date"]).dt.month,
    "Calendar month (1-12) parsed from the date column",
)

Data Version Control

import hashlib
import json
from datetime import datetime

class DataVersionControl:
    """Minimal content-addressed store for DataFrame snapshots.

    Each commit writes the frame to
    ``<storage_path>/<dataset_name>_<hash>.parquet`` and records metadata in
    ``self.versions``. The metadata log lives only in this instance's memory;
    it is not written next to the parquet files.
    """

    def __init__(self, storage_path: str = "data_versions"):
        self.storage_path = storage_path
        # dataset_name -> list of version metadata dicts, oldest first.
        self.versions = {}

    def _snapshot_path(self, dataset_name: str, version_hash: str) -> str:
        """Build the parquet file path for one dataset version."""
        return f"{self.storage_path}/{dataset_name}_{version_hash}.parquet"

    def commit(self, dataset_name: str, df: pd.DataFrame, message: str) -> str:
        """Snapshot ``df`` under ``dataset_name`` and return its short content hash.

        Args:
            dataset_name: Logical dataset name; used in the snapshot file name.
            df: Frame to store.
            message: Free-text commit message kept in the log.

        Returns:
            The first 12 hex characters of the SHA-256 of ``df.to_csv()``
            (the index is part of the hashed text).

        Raises:
            Whatever ``DataFrame.to_parquet`` raises (e.g. ImportError when no
            parquet engine is installed). Nothing is logged if the write fails.
        """
        import os

        content_hash = hashlib.sha256(df.to_csv().encode()).hexdigest()[:12]
        # pandas will not create missing parent directories, so ensure the
        # storage directory exists before writing.
        os.makedirs(self.storage_path, exist_ok=True)
        # Write first so the log never references a snapshot that was not saved.
        df.to_parquet(self._snapshot_path(dataset_name, content_hash))
        version = {
            "hash": content_hash,
            "timestamp": datetime.now().isoformat(),  # local time, no tz offset
            "message": message,
            "rows": len(df),
            "columns": list(df.columns),
            "schema": df.dtypes.astype(str).to_dict(),
        }
        self.versions.setdefault(dataset_name, []).append(version)
        return content_hash

    def checkout(self, dataset_name: str, version_hash: str) -> pd.DataFrame:
        """Load the snapshot previously written by ``commit`` for this hash.

        Reading a hash that was never committed fails inside
        ``pd.read_parquet`` (typically a missing-file error).
        """
        return pd.read_parquet(self._snapshot_path(dataset_name, version_hash))

    def log(self, dataset_name: str) -> list:
        """Return this instance's version metadata for ``dataset_name``, oldest first.

        A new list is returned so callers cannot alter the stored log; unknown
        datasets yield an empty list.
        """
        return list(self.versions.get(dataset_name, []))

# Usage (illustrative frames; substitute your real training data)
train_df = pd.DataFrame({"feature": [1.0, 2.0, 3.0], "label": [0, 1, 0]})
updated_df = train_df.assign(feature_squared=train_df["feature"] ** 2)

dvc = DataVersionControl()
hash1 = dvc.commit("training_data", train_df, "Initial dataset")
hash2 = dvc.commit("training_data", updated_df, "Added new features")
train_df = dvc.checkout("training_data", hash1)  # restore the initial snapshot

Data Quality Monitor

class DataQualityMonitor:
    """Evaluate named boolean checks against a DataFrame and summarize them.

    Every rule is a callable ``check_fn(df)`` whose return value is judged by
    truthiness. A falsy result is routed by severity: ``"error"`` goes to
    ``failed`` (making ``overall`` False); any other severity goes to
    ``warnings``. A check that raises, including one whose result cannot be
    converted to ``bool`` (such as a pandas Series), is reported in ``failed``
    regardless of severity, because the rule could not be evaluated.
    """

    def __init__(self):
        # Registered rules in order, as {"name", "check", "severity"} dicts.
        self.rules = []
        # Independent snapshot of every check_quality() result, oldest first.
        self.metrics_history = []

    def add_rule(self, name: str, check_fn, severity: str = "error"):
        """Register a quality rule.

        Args:
            name: Label reported in the results.
            check_fn: Callable taking the DataFrame; a truthy result means pass.
            severity: ``"error"`` fails the run on violation; any other value
                (conventionally ``"warning"``) only records a warning.
        """
        self.rules.append({"name": name, "check": check_fn, "severity": severity})

    def check_quality(self, df: pd.DataFrame) -> dict:
        """Run every registered rule against ``df`` and return a summary.

        Returns:
            A dict with keys:
            ``passed``: names of rules that passed.
            ``failed``: ``{"name", "severity"}`` entries for failing
                ``"error"`` rules, plus ``{"name", "severity", "error"}``
                entries for rules whose check raised.
            ``warnings``: ``{"name", "severity"}`` entries for failing
                non-error rules.
            ``overall``: True iff ``failed`` is empty.
            ``score``: fraction of rules that passed (1.0 when there are none).
        """
        import copy

        results = {"passed": [], "failed": [], "warnings": []}
        for rule in self.rules:
            name, severity = rule["name"], rule["severity"]
            try:
                # bool() stays inside the try so an ambiguous truth value is
                # reported as a failure instead of aborting the whole run.
                passed = bool(rule["check"](df))
            except Exception as e:
                # Unevaluable rule: always a hard failure. "severity" is kept
                # so every "failed" entry carries the same keys.
                results["failed"].append(
                    {"name": name, "severity": severity, "error": str(e)}
                )
                continue

            if passed:
                results["passed"].append(name)
            elif severity == "error":
                results["failed"].append({"name": name, "severity": severity})
            else:
                results["warnings"].append({"name": name, "severity": severity})

        results["overall"] = len(results["failed"]) == 0
        results["score"] = len(results["passed"]) / len(self.rules) if self.rules else 1.0
        # Deep copy so callers mutating the returned dict cannot rewrite history.
        self.metrics_history.append(copy.deepcopy(results))
        return results

    def add_default_rules(self):
        """Register three baseline rules.

        ``no_nulls`` (warning): no null values anywhere in the frame.
        ``min_rows`` (error): strictly more than 100 rows.
        ``no_duplicates`` (warning): no rows flagged by ``df.duplicated()``.
        """
        self.add_rule("no_nulls", lambda df: df.isnull().sum().sum() == 0, "warning")
        self.add_rule("min_rows", lambda df: len(df) > 100, "error")
        self.add_rule("no_duplicates", lambda df: df.duplicated().sum() == 0, "warning")

# Usage (illustrative frame; substitute the data you want to validate)
df = pd.DataFrame({"revenue": [100.0, 250.0, None], "units_sold": [10, 25, 5]})
monitor = DataQualityMonitor()
monitor.add_default_rules()
quality = monitor.check_quality(df)
# With 3 rows and one null: "min_rows" fails (error), "no_nulls" is a warning,
# so quality["overall"] is False.

Key Takeaways

  • Feature pipelines automate feature creation and transformation
  • Data versioning tracks dataset changes for reproducibility
  • Quality monitoring catches data issues before training
  • Feature stores serve features consistently for training and inference
  • Lineage tracking documents data origins and transformations
⭐

Premium Content

Data Pipelines for AI

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯 End-to-end Projects
💼 Interview Prep
📜 Certificates
🤝 Community Access

Already a member? Log in

Need Expert Generative AI Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement