Data Validation & Testing
Difficulty: Senior Level | Companies: Google, Meta, Netflix, Uber, Stripe
Data Quality Framework
Data issues cause 80% of ML failures. Proper validation prevents corrupted data from entering training.
ℹ️
Google's Data Validation framework (part of TFX) processes petabytes of data daily with automated quality checks.
Great Expectations Integration
# data_validation.py
import great_expectations as gx
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ValidationResult:
    """Record of a single expectation-suite run."""

    # Name of the expectation suite that was evaluated.
    suite_name: str
    # Overall success flag for the run.
    passed: bool
    # Summary statistics reported for the run.
    statistics: Dict[str, int]
    # Details of failed expectations, one dict per failure.
    failures: List[Dict]
    # When the record was created, as a string.
    timestamp: str
class DataValidator:
    """Build Great Expectations suites and run them against a parquet file.

    Every completed run is appended to ``validation_results``.
    """

    def __init__(self, data_path: str):
        """Remember the dataset path and obtain a Great Expectations context.

        Args:
            data_path: Path of the parquet file read by ``validate_data``.
        """
        self.data_path = data_path
        self.context = gx.get_context()
        self.validation_results: "List[ValidationResult]" = []

    def create_expectation_suite(self, suite_name: str) -> Any:
        """Add an expectation suite named ``suite_name`` to the context and return it."""
        suite = self.context.add_expectation_suite(expectation_suite_name=suite_name)
        return suite

    def add_column_expectations(self, suite, column_name: str, expectations: List[Dict]):
        """Attach column-level expectations to ``suite``.

        Args:
            suite: Suite object exposing ``add_expectation``.
            column_name: Column every expectation in this call applies to.
            expectations: Specs, each a dict with a ``"type"`` key plus the
                options that type reads (``min_value``/``max_value``,
                ``value_set`` or ``regex``). The specs are only read, never
                modified, so one list can be reused for several columns.

        Returns:
            The same ``suite`` object that was passed in.

        Raises:
            KeyError: If a spec has no ``"type"`` key.
        """
        for expectation in expectations:
            # Look the type up instead of popping it: popping altered the
            # caller's dict, so reusing a spec for a second column raised
            # KeyError.
            exp_type = expectation["type"]
            if exp_type == "expect_column_values_to_not_be_null":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToNotBeNull(column=column_name)
                )
            elif exp_type == "expect_column_values_to_be_between":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToBeBetween(
                        column=column_name,
                        min_value=expectation.get("min_value"),
                        max_value=expectation.get("max_value"),
                    )
                )
            elif exp_type == "expect_column_values_to_be_in_set":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToBeInSet(
                        column=column_name,
                        value_set=expectation.get("value_set"),
                    )
                )
            elif exp_type == "expect_column_values_to_match_regex":
                suite.add_expectation(
                    gx.expectations.ExpectColumnValuesToMatchRegex(
                        column=column_name,
                        regex=expectation.get("regex"),
                    )
                )
            # Any other type falls through and is skipped without error.
        return suite

    def validate_data(self, suite_name: str) -> "ValidationResult":
        """Run the named suite through a checkpoint and record the outcome.

        Args:
            suite_name: Name of a suite previously added to the context.

        Returns:
            A ``ValidationResult`` built from the checkpoint result; it is
            also appended to ``validation_results``.
        """
        # The loaded frame is not handed to Great Expectations below (the
        # datasource is configured with the path instead); the read is kept so
        # a missing or unreadable file still fails here, before any GX objects
        # are registered.
        pd.read_parquet(self.data_path)
        # NOTE(review): the return value of add_datasource is used as the
        # checkpoint's batch_request, and the datasource/checkpoint names are
        # fixed, so repeated calls re-register the same names. Confirm both
        # against the Great Expectations version this project pins.
        batch_request = self.context.add_datasource(
            name="pandas_datasource",
            class_name="Datasource",
            module_name="datasource.pandas",
            batch_kwargs={"path": self.data_path, "datasource": "pandas_datasource"},
        )
        checkpoint = self.context.add_checkpoint(
            name="checkpoint",
            validations=[{
                "batch_request": batch_request,
                "expectation_suite_name": suite_name,
            }],
        )
        result = checkpoint.run()
        validation_result = ValidationResult(
            suite_name=suite_name,
            passed=result.success,
            statistics=result.statistics,
            # Failure details are not extracted from the checkpoint result,
            # so this list is always empty.
            failures=[],
            timestamp=datetime.now().isoformat(),
        )
        self.validation_results.append(validation_result)
        return validation_result
class StatisticalValidator:
    """Check DataFrame columns against simple per-feature statistical rules.

    Supported rule types and the parameters each one reads:

    * ``"range"``: ``min``, ``max``
    * ``"distribution"``: ``expected_mean``, ``tolerance``
    * ``"completeness"``: ``max_null_ratio``
    * ``"uniqueness"``: ``min_unique_ratio``

    Rules of any other type are stored but have no effect in ``validate``.
    """

    def __init__(self):
        """Start with no rules registered."""
        # Feature name -> rules registered for it, in insertion order.
        self.validation_rules: Dict[str, List[Dict]] = {}

    def add_rule(self, feature_name: str, rule_type: str, params: Dict):
        """Register one rule for ``feature_name``.

        Args:
            feature_name: Column the rule applies to.
            rule_type: One of the rule types listed on the class.
            params: Parameters for that rule type; merged into the stored
                rule dict next to its ``"type"`` entry.
        """
        self.validation_rules.setdefault(feature_name, []).append(
            {"type": rule_type, **params}
        )

    def validate(self, df: pd.DataFrame) -> Dict[str, List[str]]:
        """Evaluate every registered rule against ``df``.

        Args:
            df: Frame to check.

        Returns:
            Mapping of feature name to its error messages. Features with no
            violations are omitted; a feature whose column is missing from
            ``df`` gets a single "not found" message.
        """
        errors: Dict[str, List[str]] = {}
        for feature_name, rules in self.validation_rules.items():
            if feature_name not in df.columns:
                errors[feature_name] = [f"Column {feature_name} not found"]
                continue
            column = df[feature_name]
            feature_errors = []
            for rule in rules:
                rule_type = rule["type"]
                if rule_type == "range":
                    if column.min() < rule["min"] or column.max() > rule["max"]:
                        feature_errors.append(
                            f"Values outside range [{rule['min']}, {rule['max']}]"
                        )
                elif rule_type == "distribution":
                    observed_mean = column.mean()
                    if abs(observed_mean - rule["expected_mean"]) > rule["tolerance"]:
                        feature_errors.append(
                            f"Mean {observed_mean:.4f} differs from expected {rule['expected_mean']:.4f}"
                        )
                elif rule_type == "completeness":
                    null_ratio = column.isnull().mean()
                    if null_ratio > rule["max_null_ratio"]:
                        feature_errors.append(
                            f"Null ratio {null_ratio:.4f} exceeds max {rule['max_null_ratio']}"
                        )
                elif rule_type == "uniqueness":
                    # The ratio is undefined for an empty column (the division
                    # would raise ZeroDivisionError), so the rule is skipped.
                    if len(column) == 0:
                        continue
                    unique_ratio = column.nunique() / len(column)
                    if unique_ratio < rule["min_unique_ratio"]:
                        feature_errors.append(
                            f"Unique ratio {unique_ratio:.4f} below min {rule['min_unique_ratio']}"
                        )
            if feature_errors:
                errors[feature_name] = feature_errors
        return errors
# Usage
validator = DataValidator("data/raw/latest.parquet")
suite = validator.create_expectation_suite("customer_features_suite")
column_expectations = [
    {"type": "expect_column_values_to_not_be_null"},
    {"type": "expect_column_values_to_be_between", "min_value": 0, "max_value": 1000000},
]
for col in ["customer_id", "lifetime_value", "total_purchases"]:
    # Give each column its own copies of the specs so the shared templates
    # above are never modified by the callee and stay reusable for every column.
    validator.add_column_expectations(
        suite, col, [dict(spec) for spec in column_expectations]
    )
result = validator.validate_data("customer_features_suite")
print(f"Validation passed: {result.passed}")

# Rule-based statistical checks on a synthetic frame.
stat_validator = StatisticalValidator()
stat_validator.add_rule("lifetime_value", "range", {"min": 0, "max": 1000000})
stat_validator.add_rule("lifetime_value", "completeness", {"max_null_ratio": 0.05})
stat_validator.add_rule("total_purchases", "range", {"min": 1, "max": 10000})
df = pd.DataFrame({
    "lifetime_value": np.random.uniform(0, 10000, 1000),
    "total_purchases": np.random.randint(1, 100, 1000),
})
errors = stat_validator.validate(df)
print(f"Validation errors: {errors}")
Data Quality Monitoring
# data_monitoring.py
import pandas as pd
import numpy as np
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
import json
@dataclass
class QualityMetric:
    """One data-quality measurement together with its pass/fail verdict."""

    # Identifier of the metric.
    name: str
    # Measured value.
    value: float
    # Limit the value was compared against.
    threshold: float
    # Whether the check passed.
    passed: bool
    # When the measurement was taken, as a string.
    timestamp: str
class DataQualityMonitor:
    """Track data-quality metrics for DataFrames against a stored baseline."""

    def __init__(self, null_ratio_threshold: float = 0.1, drift_z_threshold: float = 3.0):
        """Create a monitor with no baseline and an empty history.

        Args:
            null_ratio_threshold: The overall null ratio must be strictly
                below this value to pass.
            drift_z_threshold: A column's mean-drift score must be strictly
                below this value to pass.
        """
        self.metrics_history: "List[QualityMetric]" = []
        # Statistic name -> {column name -> value}; filled by set_baseline.
        self.baseline_stats: Dict[str, Dict[str, float]] = {}
        self.null_ratio_threshold = null_ratio_threshold
        self.drift_z_threshold = drift_z_threshold

    def set_baseline(self, df: pd.DataFrame):
        """Store per-column reference statistics computed from ``df``.

        Mean and standard deviation are taken over numeric columns only, so a
        frame that also carries non-numeric columns does not raise; null and
        unique ratios cover every column.
        """
        self.baseline_stats = {
            "mean": df.mean(numeric_only=True).to_dict(),
            "std": df.std(numeric_only=True).to_dict(),
            "null_ratio": df.isnull().mean().to_dict(),
            "unique_ratio": (df.nunique() / len(df)).to_dict(),
        }

    def check_quality(self, df: pd.DataFrame) -> "List[QualityMetric]":
        """Measure ``df`` and append the resulting metrics to the history.

        Emits one ``overall_null_ratio`` metric, plus one ``<col>_mean_drift``
        metric for every column of ``df`` that has a baseline mean. If no
        baseline has been set, only the null-ratio metric is produced.

        Returns:
            The metrics created by this call, in the order they were computed.
        """
        metrics = []
        timestamp = datetime.now().isoformat()

        # Values are cast to builtin float/bool so metrics hold plain Python
        # types rather than NumPy scalars.
        overall_null_ratio = float(df.isnull().mean().mean())
        metrics.append(QualityMetric(
            name="overall_null_ratio",
            value=overall_null_ratio,
            threshold=self.null_ratio_threshold,
            passed=bool(overall_null_ratio < self.null_ratio_threshold),
            timestamp=timestamp,
        ))

        # .get keeps a call made before set_baseline from raising KeyError.
        baseline_means = self.baseline_stats.get("mean", {})
        baseline_stds = self.baseline_stats.get("std", {})
        for col in df.columns:
            if col not in baseline_means:
                continue
            current_mean = df[col].mean()
            baseline_mean = baseline_means[col]
            baseline_std = baseline_stds[col]
            # Shift of the current mean in units of the baseline std; the
            # 1e-6 floor avoids dividing by zero for constant baseline columns.
            z_score = float(abs(current_mean - baseline_mean) / max(baseline_std, 1e-6))
            metrics.append(QualityMetric(
                name=f"{col}_mean_drift",
                value=z_score,
                threshold=self.drift_z_threshold,
                passed=bool(z_score < self.drift_z_threshold),
                timestamp=timestamp,
            ))

        self.metrics_history.extend(metrics)
        return metrics

    def get_quality_report(self) -> Dict:
        """Summarise every metric recorded so far.

        Returns:
            An empty dict when nothing has been recorded; otherwise the total,
            passed and failed counts, the pass rate, and the names of the
            failed metrics.
        """
        if not self.metrics_history:
            return {}
        failed_metrics = [m for m in self.metrics_history if not m.passed]
        total = len(self.metrics_history)
        passed = total - len(failed_metrics)
        return {
            "total_checks": total,
            "passed_checks": passed,
            "failed_checks": len(failed_metrics),
            "pass_rate": passed / max(1, total),
            "failed_metric_names": [m.name for m in failed_metrics],
        }
# Example: record a baseline, then score a shifted sample against it.
monitor = DataQualityMonitor()

baseline_df = pd.DataFrame(
    dict(
        feature_1=np.random.normal(0, 1, 1000),
        feature_2=np.random.uniform(0, 100, 1000),
    )
)
monitor.set_baseline(baseline_df)

# Same columns as the baseline, drawn with different distribution parameters.
current_df = pd.DataFrame(
    dict(
        feature_1=np.random.normal(2, 1.5, 1000),
        feature_2=np.random.uniform(0, 150, 1000),
    )
)

metrics = monitor.check_quality(current_df)
for metric in metrics:
    if metric.passed:
        status = "PASS"
    else:
        status = "FAIL"
    print(f"[{status}] {metric.name}: {metric.value:.4f} (threshold: {metric.threshold})")
Follow-Up Questions
- How do you handle data validation for streaming data?
- What are the trade-offs between strict and lenient validation?
- How would you implement data versioning alongside model versioning?
- What metrics best capture overall data quality?