πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Data Validation & Testing

MLOpsData Quality⭐ Premium

Advertisement

Data Validation & Testing

Difficulty: Senior Level | Companies: Google, Meta, Netflix, Uber, Stripe

Data Quality Framework

Data issues cause 80% of ML failures. Proper validation prevents corrupted data from entering training.

ℹ️

Google's Data Validation framework (part of TFX) processes petabytes of data daily with automated quality checks.

Great Expectations Integration

# data_validation.py
import great_expectations as gx
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ValidationResult:
    suite_name: str
    passed: bool
    statistics: Dict[str, int]
    failures: List[Dict]
    timestamp: str

class DataValidator:
    def __init__(self, data_path: str):
        self.data_path = data_path
        self.context = gx.get_context()
        self.validation_results: List[ValidationResult] = []

    def create_expectation_suite(self, suite_name: str) -> Any:
        suite = self.context.add_expectation_suite(expectation_suite_name=suite_name)
        return suite

    def add_column_expectations(self, suite, column_name: str, expectations: List[Dict]):
        for expectation in expectations:
            exp_type = expectation.pop("type")
            if exp_type == "expect_column_values_to_not_be_null":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToNotBeNull(column=column_name)
                )
            elif exp_type == "expect_column_values_to_be_between":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToBeBetween(
                        column=column_name,
                        min_value=expectation.get("min_value"),
                        max_value=expectation.get("max_value")
                    )
                )
            elif exp_type == "expect_column_values_to_be_in_set":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToBeInSet(
                        column=column_name,
                        value_set=expectation.get("value_set")
                    )
                )
            elif exp_type == "expect_column_values_to_match_regex":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToMatchRegex(
                        column=column_name,
                        regex=expectation.get("regex")
                    )
                )
        return suite

    def validate_data(self, suite_name: str) -> ValidationResult:
        df = pd.read_parquet(self.data_path)
        batch_request = self.context.add_datasource(
            name="pandas_datasource",
            class_name="Datasource",
            module_name="datasource.pandas",
            batch_kwargs={"path": self.data_path, "datasource": "pandas_datasource"}
        )

        checkpoint = self.context.add_checkpoint(
            name="checkpoint",
            validations=[{
                "batch_request": batch_request,
                "expectation_suite_name": suite_name
            }]
        )

        result = checkpoint.run()
        validation_result = ValidationResult(
            suite_name=suite_name,
            passed=result.success,
            statistics=result.statistics,
            failures=[],
            timestamp=datetime.now().isoformat()
        )

        self.validation_results.append(validation_result)
        return validation_result


class StatisticalValidator:
    def __init__(self):
        self.validation_rules: Dict[str, Dict] = {}

    def add_rule(self, feature_name: str, rule_type: str, params: Dict):
        if feature_name not in self.validation_rules:
            self.validation_rules[feature_name] = []
        self.validation_rules[feature_name].append({"type": rule_type, **params})

    def validate(self, df: pd.DataFrame) -> Dict[str, List[str]]:
        errors = {}
        for feature_name, rules in self.validation_rules.items():
            if feature_name not in df.columns:
                errors[feature_name] = [f"Column {feature_name} not found"]
                continue

            column = df[feature_name]
            feature_errors = []

            for rule in rules:
                if rule["type"] == "range":
                    if column.min() < rule["min"] or column.max() > rule["max"]:
                        feature_errors.append(
                            f"Values outside range [{rule['min']}, {rule['max']}]"
                        )
                elif rule["type"] == "distribution":
                    mean_diff = abs(column.mean() - rule["expected_mean"])
                    if mean_diff > rule["tolerance"]:
                        feature_errors.append(
                            f"Mean {column.mean():.4f} differs from expected {rule['expected_mean']:.4f}"
                        )
                elif rule["type"] == "completeness":
                    null_ratio = column.isnull().mean()
                    if null_ratio > rule["max_null_ratio"]:
                        feature_errors.append(
                            f"Null ratio {null_ratio:.4f} exceeds max {rule['max_null_ratio']}"
                        )
                elif rule["type"] == "uniqueness":
                    unique_ratio = column.nunique() / len(column)
                    if unique_ratio < rule["min_unique_ratio"]:
                        feature_errors.append(
                            f"Unique ratio {unique_ratio:.4f} below min {rule['min_unique_ratio']}"
                        )

            if feature_errors:
                errors[feature_name] = feature_errors

        return errors


# Usage
validator = DataValidator("data/raw/latest.parquet")
suite = validator.create_expectation_suite("customer_features_suite")

column_expectations = [
    {"type": "expect_column_values_to_not_be_null"},
    {"type": "expect_column_values_to_be_between", "min_value": 0, "max_value": 1000000},
]

for col in ["customer_id", "lifetime_value", "total_purchases"]:
    validator.add_column_expectations(suite, col, column_expectations)

result = validator.validate_data("customer_features_suite")
print(f"Validation passed: {result.passed}")

stat_validator = StatisticalValidator()
stat_validator.add_rule("lifetime_value", "range", {"min": 0, "max": 1000000})
stat_validator.add_rule("lifetime_value", "completeness", {"max_null_ratio": 0.05})
stat_validator.add_rule("total_purchases", "range", {"min": 1, "max": 10000})

df = pd.DataFrame({
    "lifetime_value": np.random.uniform(0, 10000, 1000),
    "total_purchases": np.random.randint(1, 100, 1000)
})

errors = stat_validator.validate(df)
print(f"Validation errors: {errors}")

Data Quality Monitoring

# data_monitoring.py
import pandas as pd
import numpy as np
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class QualityMetric:
    name: str
    value: float
    threshold: float
    passed: bool
    timestamp: str

class DataQualityMonitor:
    def __init__(self):
        self.metrics_history: List[QualityMetric] = []
        self.baseline_stats: Dict[str, float] = {}

    def set_baseline(self, df: pd.DataFrame):
        self.baseline_stats = {
            "mean": df.mean().to_dict(),
            "std": df.std().to_dict(),
            "null_ratio": df.isnull().mean().to_dict(),
            "unique_ratio": (df.nunique() / len(df)).to_dict(),
        }

    def check_quality(self, df: pd.DataFrame) -> List[QualityMetric]:
        metrics = []
        timestamp = datetime.now().isoformat()

        overall_null_ratio = df.isnull().mean().mean()
        metrics.append(QualityMetric(
            name="overall_null_ratio",
            value=overall_null_ratio,
            threshold=0.1,
            passed=overall_null_ratio < 0.1,
            timestamp=timestamp
        ))

        for col in df.columns:
            if col in self.baseline_stats["mean"]:
                current_mean = df[col].mean()
                baseline_mean = self.baseline_stats["mean"][col]
                baseline_std = self.baseline_stats["std"][col]

                z_score = abs(current_mean - baseline_mean) / max(baseline_std, 1e-6)
                metrics.append(QualityMetric(
                    name=f"{col}_mean_drift",
                    value=z_score,
                    threshold=3.0,
                    passed=z_score < 3.0,
                    timestamp=timestamp
                ))

        self.metrics_history.extend(metrics)
        return metrics

    def get_quality_report(self) -> Dict:
        if not self.metrics_history:
            return {}

        failed_metrics = [m for m in self.metrics_history if not m.passed]
        return {
            "total_checks": len(self.metrics_history),
            "passed_checks": len(self.metrics_history) - len(failed_metrics),
            "failed_checks": len(failed_metrics),
            "pass_rate": (len(self.metrics_history) - len(failed_metrics)) / max(1, len(self.metrics_history)),
            "failed_metric_names": [m.name for m in failed_metrics]
        }


monitor = DataQualityMonitor()
baseline_df = pd.DataFrame({
    "feature_1": np.random.normal(0, 1, 1000),
    "feature_2": np.random.uniform(0, 100, 1000),
})
monitor.set_baseline(baseline_df)

current_df = pd.DataFrame({
    "feature_1": np.random.normal(2, 1.5, 1000),
    "feature_2": np.random.uniform(0, 150, 1000),
})

metrics = monitor.check_quality(current_df)
for metric in metrics:
    status = "PASS" if metric.passed else "FAIL"
    print(f"[{status}] {metric.name}: {metric.value:.4f} (threshold: {metric.threshold})")

Follow-Up Questions

  1. How do you handle data validation for streaming data?
  2. What are the trade-offs between strict and lenient validation?
  3. How would you implement data versioning alongside model versioning?
  4. What metrics best capture overall data quality?

Advertisement