πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Data Quality Validation

⭐ Premium

Advertisement

Data Quality Validation

Bad data silently corrupts models and erodes trust. Learn to systematically validate data quality using modern frameworks and establish data contracts that prevent issues before they occur.

The Data Quality Spectrum

Data quality issues range from simple nulls to subtle distribution shifts that only surface in production metrics.

Data Quality DimensionsCompletenessMissing valuesNull ratesAccuracyValue mismatchesOut-of-rangeConsistencyDuplicate recordsCross-sourceTimelinessStale dataLatency issuesValidityFormat violationsSchema driftImpact: HighImpact: CriticalImpact: HighImpact: MediumImpact: High

Great Expectations

Great Expectations lets you define, validate, and document expectations about your data.

import great_expectations as gx

# Initialize a DataContext
context = gx.get_context()

# Create a DataFrame validator
import pandas as pd
df = pd.read_csv("customers.csv")
validator = context.sources.pandas_default.read_csv("customers.csv")

# Define expectations
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_unique("customer_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
validator.expect_column_values_to_be_in_set(
    "status", ["active", "inactive", "pending", "churned"]
)
validator.expect_column_pair_values_A_to_be_greater_than_B(
    "account_open_date", "last_activity_date"
)

# Statistical expectations
validator.expect_column_mean_to_be_between("purchase_amount", min_value=10, max_value=500)
validator.expect_column_stdev_to_be_between("response_time_ms", min_value=0, max_value=5000)
validator.expect_compound_columns_to_be_unique(["user_id", "transaction_date"])

# Save and run
results = validator.validate()
print(results)

Pandera for Typed Validation

Pandera integrates with pandas and provides schema-based validation with type safety.

import pandera as pa
from pandera import Column, DataFrameSchema, Check

# Define a strict schema
schema = DataFrameSchema({
    "customer_id": Column(int, Check.greater_than(0), unique=True, nullable=False),
    "name": Column(str, Check.str_length(min_value=1, max_value=200), nullable=False),
    "email": Column(str, Check.str_matches(r"^[\w\.-]+@[\w\.-]+\.\w+$"), nullable=False),
    "age": Column(int, Check.in_range(0, 120), nullable=True),
    "signup_date": Column(pa.DateTime, nullable=False),
    "lifetime_value": Column(float, Check.greater_than_or_equal_to(0), nullable=False),
    "segment": Column(str, Check.isin(["enterprise", "mid_market", "smb", "consumer"])),
})

# Validate
df = pd.read_csv("customers.csv", parse_dates=["signup_date"])
validated_df = schema.validate(df)

# Custom checks
schema_with_custom = DataFrameSchema(
    columns={...},
    checks=[
        Check(lambda df: df["end_date"] > df["start_date"], element_wise=False),
        Check(lambda df: df.groupby("customer_id").size().max() <= 1000, element_wise=False),
    ]
)

# Decorator for validation in pipelines
@pa.check_types
def process_customers(df: pa.typing.DataFrame[Schema]) -> pd.DataFrame:
    # df is guaranteed to conform to Schema
    return df[df["status"] == "active"]

Data Contracts

Data contracts formalize the agreement between data producers and consumers.

from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum
import json

class QualityLevel(Enum):
    CRITICAL = "critical"  # Pipeline fails
    WARNING = "warning"    # Alert but continue
    INFO = "info"          # Log only

@dataclass
class DataContract:
    schema_name: str
    owner: str
    sla_minutes: int
    columns: Dict[str, dict]
    quality_rules: List[dict]
    
    def validate(self, df):
        violations = []
        
        for col_name, col_spec in self.columns.items():
            if col_spec.get("required", True) and col_name not in df.columns:
                violations.append(f"Missing required column: {col_name}")
                continue
            
            if col_name in df.columns:
                if col_spec.get("nullable", False) == False:
                    null_count = df[col_name].isna().sum()
                    if null_count > 0:
                        violations.append(f"Null values in non-nullable column {col_name}: {null_count}")
        
        for rule in self.quality_rules:
            if not rule["check"](df):
                violations.append(f"Quality rule failed: {rule['name']}")
        
        return violations

# Define a contract
contract = DataContract(
    schema_name="customer_events",
    owner="data-platform-team",
    sla_minutes=15,
    columns={
        "event_id": {"type": "string", "required": True, "nullable": False},
        "user_id": {"type": "string", "required": True, "nullable": False},
        "event_type": {"type": "string", "required": True, "nullable": False,
                       "allowed_values": ["click", "view", "purchase", "signup"]},
        "timestamp": {"type": "datetime", "required": True, "nullable": False},
        "amount": {"type": "float", "required": False, "nullable": True,
                   "min": 0, "max": 1000000}
    },
    quality_rules=[
        {
            "name": "event_freshness",
            "check": lambda df: (pd.Timestamp.now() - df["timestamp"].max()).total_seconds() < 900,
            "level": QualityLevel.CRITICAL
        },
        {
            "name": "unique_events",
            "check": lambda df: df["event_id"].is_unique,
            "level": QualityLevel.CRITICAL
        },
        {
            "name": "reasonable_amounts",
            "check": lambda df: (df["amount"].dropna() >= 0).all(),
            "level": QualityLevel.WARNING
        }
    ]
)

Monitoring and Alerting

import time
from datetime import datetime
from dataclasses import dataclass
from typing import Callable
import logging

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable
    threshold: float
    window_minutes: int = 60

class DataQualityMonitor:
    def __init__(self):
        self.checks = []
        self.alert_history = []
        self.logger = logging.getLogger("data_quality")
    
    def add_check(self, check: QualityCheck):
        self.checks.append(check)
    
    def run_checks(self, data_source):
        results = []
        for check in self.checks:
            try:
                metric = check.check_fn(data_source)
                status = "PASS" if metric >= check.threshold else "FAIL"
                results.append({
                    "check": check.name,
                    "metric": metric,
                    "threshold": check.threshold,
                    "status": status,
                    "timestamp": datetime.now()
                })
                
                if status == "FAIL":
                    self._send_alert(check, metric)
                    
            except Exception as e:
                self.logger.error(f"Check {check.name} failed with error: {e}")
                results.append({
                    "check": check.name,
                    "status": "ERROR",
                    "error": str(e),
                    "timestamp": datetime.now()
                })
        
        return results
    
    def _send_alert(self, check, metric):
        alert = {
            "check": check.name,
            "metric": metric,
            "threshold": check.threshold,
            "timestamp": datetime.now(),
            "severity": "critical"
        }
        self.alert_history.append(alert)
        # Integrate with PagerDuty, Slack, etc.
        self.logger.warning(f"ALERT: {check.name} failed ({metric} < {check.threshold})")

# Usage
monitor = DataQualityMonitor()

monitor.add_check(QualityCheck(
    name="null_rate",
    check_fn=lambda df: 1 - df.isnull().mean().mean(),
    threshold=0.98
))

monitor.add_check(QualityCheck(
    name="freshness_minutes",
    check_fn=lambda df: (pd.Timestamp.now() - df["timestamp"].max()).total_seconds() / 60,
    threshold=30
))

results = monitor.run_checks(df)

Key Takeaways

  • Start with completeness and uniqueness checks, then add statistical validations
  • Data contracts prevent "garbage in, garbage out" by encoding expectations as code
  • Automate quality checks in CI/CD pipelines to catch issues before deployment
  • Track quality metrics over time to detect gradual degradation

Advertisement