Building a Data Quality Framework from Scratch


Google & Airbnb Interview

Ensuring data reliability and trustworthiness at scale

Interview Question

"Design a data quality framework that: (1) validates data at ingestion, (2) monitors data freshness, (3) detects anomalies, (4) quarantines bad data, (5) alerts stakeholders. How do you measure data quality, and what metrics do you track?"

Difficulty: Hard | Frequently asked at Google, Airbnb, Uber, Netflix


Theoretical Foundation

Data Quality Dimensions

Data quality is commonly measured along six dimensions:

  1. Completeness: Are all expected records and values present?
     Metric: % of non-null values
     Formula: (non-null count / total count) × 100

  2. Accuracy: Does the data reflect reality?
     Metric: % of values matching a source of truth
     Formula: (correct count / total count) × 100

  3. Consistency: Is the data consistent across systems?
     Metric: % of values matching across sources
     Formula: (matching values / total values) × 100

  4. Timeliness: Is the data available when needed?
     Metric: Time from event to availability
     Formula: availability_time - event_time

  5. Uniqueness: Are records unique?
     Metric: % of unique records
     Formula: (unique count / total count) × 100

  6. Validity: Does the data conform to rules?
     Metric: % of values passing validation rules
     Formula: (valid count / total count) × 100

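The ratio-style formulas above translate directly into code. Below is a minimal plain-Python sketch, assuming rows arrive as a list of dicts; the sample rows and the helper names `completeness`, `uniqueness`, and `validity` are hypothetical. Uniqueness is read here as distinct values over total values, which is one of several reasonable interpretations.

```python
import re

# Hypothetical sample batch (rows as dicts)
rows = [
    {"user_id": "u1", "email": "a@example.com", "age": 34},
    {"user_id": "u2", "email": None, "age": 29},
    {"user_id": "u2", "email": "b@example", "age": 41},
    {"user_id": "u4", "email": "c@example.org", "age": 212},
]

def completeness(rows, col):
    """(non-null count / total count) x 100"""
    non_null = sum(1 for r in rows if r[col] is not None)
    return 100.0 * non_null / len(rows)

def uniqueness(rows, col):
    """(distinct count / total count) x 100"""
    return 100.0 * len({r[col] for r in rows}) / len(rows)

def validity(rows, col, rule):
    """(valid count / total count) x 100; nulls count as invalid here."""
    valid = sum(1 for r in rows if r[col] is not None and rule(r[col]))
    return 100.0 * valid / len(rows)

EMAIL_RE = re.compile(r"^[\w\.-]+@[\w\.-]+\.\w+$")

print(completeness(rows, "email"))                                 # 75.0
print(uniqueness(rows, "user_id"))                                 # 75.0
print(validity(rows, "email", lambda v: bool(EMAIL_RE.match(v))))  # 50.0
print(validity(rows, "age", lambda v: 0 <= v <= 150))              # 75.0
```

Accuracy and consistency follow the same counting pattern but need a second dataset (a source of truth or another system) to compare against.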
Data Quality Score

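$$DQS = w_1 \times C + w_2 \times A + w_3 \times Co + w_4 \times T + w_5 \times U + w_6 \times V$$

Where:

  • $C$ = Completeness score
  • $A$ = Accuracy score
  • $Co$ = Consistency score
  • $T$ = Timeliness score
  • $U$ = Uniqueness score
  • $V$ = Validity score
  • $w_i$ = Weights (sum to 1)

Each score must be normalized to the range [0, 1] before weighting. Timeliness is naturally a duration, so it first has to be mapped to a score (for example, the fraction of loads that met their freshness target).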

Example weights for different use cases:

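| Use Case | Completeness | Accuracy | Consistency | Timeliness | Uniqueness | Validity |
|---|---|---|---|---|---|---|
| Financial reporting | 0.3 | 0.3 | 0.2 | 0.1 | 0.05 | 0.05 |
| Real-time analytics | 0.1 | 0.1 | 0.1 | 0.4 | 0.1 | 0.2 |
| ML training | 0.2 | 0.2 | 0.1 | 0.1 | 0.2 | 0.2 |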
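To see how the weights change the outcome, here is a small sketch that applies the DQS formula to one set of dimension scores under each weighting from the table. The dimension scores are made up for illustration, and each is assumed to be already normalized to [0, 1].

```python
# Weights from the use-case table; each row must sum to 1
WEIGHTS = {
    "financial_reporting": {"completeness": 0.3, "accuracy": 0.3, "consistency": 0.2,
                            "timeliness": 0.1, "uniqueness": 0.05, "validity": 0.05},
    "realtime_analytics": {"completeness": 0.1, "accuracy": 0.1, "consistency": 0.1,
                           "timeliness": 0.4, "uniqueness": 0.1, "validity": 0.2},
    "ml_training": {"completeness": 0.2, "accuracy": 0.2, "consistency": 0.1,
                    "timeliness": 0.1, "uniqueness": 0.2, "validity": 0.2},
}

def data_quality_score(scores, weights):
    """Weighted sum of per-dimension scores, each assumed to already be in [0, 1]."""
    if abs(sum(weights.values()) - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    return sum(weights[dim] * scores[dim] for dim in weights)

# Hypothetical dimension scores for a single table
scores = {"completeness": 0.98, "accuracy": 0.95, "consistency": 0.90,
          "timeliness": 0.60, "uniqueness": 1.00, "validity": 0.96}

for use_case, w in WEIGHTS.items():
    print(use_case, round(data_quality_score(scores, w), 3))
```

With these scores the same table rates 0.917 for financial reporting but only 0.815 for real-time analytics, because the weak timeliness score (0.60) carries a weight of 0.4 in the latter.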

Data Quality Pipeline

[Figure: Data Quality Pipeline. Source → Ingestion → Validation → Quarantine → Load, with labels for Freshness, Rules Engine, Bad Data, Clean Data, Monitoring, Alerts, Reports, and Analytics]

Validation Rules

Schema Validation

schema_rules = {
    "columns": {
        "user_id": {"type": "string", "nullable": False},
        "email": {"type": "string", "nullable": False, "pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"},
        "age": {"type": "integer", "nullable": True, "min": 0, "max": 150},
        "signup_date": {"type": "date", "nullable": False},
    }
}
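A rule set shaped like `schema_rules` can be applied with a per-record validator that collects every violation, so bad records can be routed to quarantine along with their reasons. This is a sketch under assumptions: the type-name mapping `PY_TYPES`, the function `validate_record`, and the sample records are hypothetical. A production pipeline would more likely push these checks into Spark or a validation library.

```python
import re
from datetime import date

schema_rules = {
    "columns": {
        "user_id": {"type": "string", "nullable": False},
        "email": {"type": "string", "nullable": False, "pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"},
        "age": {"type": "integer", "nullable": True, "min": 0, "max": 150},
        "signup_date": {"type": "date", "nullable": False},
    }
}

# Assumed mapping from the rule's type names to Python types
PY_TYPES = {"string": str, "integer": int, "date": date}

def validate_record(record, rules):
    """Return a list of violations; an empty list means the record is valid."""
    errors = []
    for col, rule in rules["columns"].items():
        value = record.get(col)
        if value is None:
            if not rule.get("nullable", True):
                errors.append(f"{col}: null not allowed")
            continue
        expected = PY_TYPES[rule["type"]]
        # bool is a subclass of int, so reject it explicitly for integer columns
        if not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
            errors.append(f"{col}: expected {rule['type']}")
            continue
        if "pattern" in rule and not re.match(rule["pattern"], value):
            errors.append(f"{col}: does not match pattern")
        if "min" in rule and value < rule["min"]:
            errors.append(f"{col}: below min {rule['min']}")
        if "max" in rule and value > rule["max"]:
            errors.append(f"{col}: above max {rule['max']}")
    return errors

records = [
    {"user_id": "u1", "email": "a@example.com", "age": 34, "signup_date": date(2024, 1, 5)},
    {"user_id": None, "email": "bad-email", "age": 200, "signup_date": date(2024, 1, 6)},
]

# Route each record to the clean set or to quarantine with its reasons
clean, quarantined = [], []
for r in records:
    errors = validate_record(r, schema_rules)
    if errors:
        quarantined.append((r, errors))
    else:
        clean.append(r)

print(len(clean), quarantined[0][1])
# 1 ['user_id: null not allowed', 'email: does not match pattern', 'age: above max 150']
```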

Statistical Validation

statistical_rules = {
    "row_count": {
        "min": 1000,
        "max": 10000000,
        "expected": 50000,
        "tolerance": 0.2  # ±20%
    },
    "null_rate": {
        "user_id": {"max": 0.0},
        "email": {"max": 0.01},
        "age": {"max": 0.05},
    }
}
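One plausible reading of `statistical_rules` treats `min`/`max` as hard bounds (fail) and `expected` ± `tolerance` as a softer band (warn). That split is an assumption of this sketch, as are the function names.

```python
statistical_rules = {
    "row_count": {"min": 1000, "max": 10000000, "expected": 50000, "tolerance": 0.2},
    "null_rate": {"user_id": {"max": 0.0}, "email": {"max": 0.01}, "age": {"max": 0.05}},
}

def check_row_count(count, rule):
    """'fail' outside the hard min/max, 'warn' outside expected +/- tolerance, else 'pass'."""
    if not rule["min"] <= count <= rule["max"]:
        return "fail"
    low = rule["expected"] * (1 - rule["tolerance"])
    high = rule["expected"] * (1 + rule["tolerance"])
    return "pass" if low <= count <= high else "warn"

def check_null_rates(observed, rules):
    """Columns whose observed null rate exceeds the configured maximum.

    A column missing from `observed` is treated as having no nulls (an assumption).
    """
    return [col for col, rule in rules.items() if observed.get(col, 0.0) > rule["max"]]

print(check_row_count(52_000, statistical_rules["row_count"]))  # pass
print(check_row_count(65_000, statistical_rules["row_count"]))  # warn
print(check_row_count(500, statistical_rules["row_count"]))     # fail
print(check_null_rates({"user_id": 0.0, "email": 0.02, "age": 0.01},
                       statistical_rules["null_rate"]))         # ['email']
```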

Business Rules

business_rules = [
    # Foreign key relationships
    {"type": "referential", "table": "orders", "column": "user_id", 
     "references": {"table": "users", "column": "id"}},
    
    # Cross-column validation
    {"type": "expression", "condition": "start_date <= end_date"},
    
    # Business logic
    {"type": "custom", "function": "validate_email_domain"},
]
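The referential and expression rule types above can be evaluated as simple set-membership and predicate checks. The sketch below uses hypothetical in-memory tables; a `custom` rule such as `validate_email_domain` would plug in as another predicate, but its logic is not specified here.

```python
from datetime import date

# Hypothetical sample tables
users = [{"id": "u1"}, {"id": "u2"}]
orders = [
    {"order_id": 1, "user_id": "u1", "start_date": date(2024, 3, 1), "end_date": date(2024, 3, 5)},
    {"order_id": 2, "user_id": "u9", "start_date": date(2024, 3, 4), "end_date": date(2024, 3, 2)},
    {"order_id": 3, "user_id": None, "start_date": date(2024, 3, 6), "end_date": date(2024, 3, 6)},
]

def referential_violations(child_rows, fk, parent_rows, pk):
    """Child rows whose non-null foreign key has no matching parent key (orphans)."""
    parent_keys = {p[pk] for p in parent_rows}
    # Null foreign keys are left to a completeness check rather than flagged here
    return [r for r in child_rows if r[fk] is not None and r[fk] not in parent_keys]

def expression_violations(rows, predicate):
    """Rows for which a cross-column predicate evaluates to False."""
    return [r for r in rows if not predicate(r)]

orphans = referential_violations(orders, "user_id", users, "id")
bad_ranges = expression_violations(orders, lambda r: r["start_date"] <= r["end_date"])
print([r["order_id"] for r in orphans])     # [2]
print([r["order_id"] for r in bad_ranges])  # [2]
```

In Spark, the same orphan check is typically a `left_anti` join of the child table against the parent keys.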

Anomaly Detection Methods

1. Statistical Process Control (SPC)

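$$\text{UCL} = \mu + 3\sigma$$
$$\text{LCL} = \mu - 3\sigma$$

Here $\mu$ and $\sigma$ are the mean and standard deviation of the monitored metric (for example, daily row count) over a baseline period. Points outside the control limits are flagged as anomalies.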
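In SPC the limits come from a baseline period and stay fixed while new points are monitored. A minimal sketch with hypothetical daily row counts:

```python
from statistics import mean, stdev

# Hypothetical daily row counts from a period known to be healthy (the baseline)
baseline = [50_120, 49_870, 50_340, 50_010, 49_650, 50_200, 49_900, 50_060]
mu, sigma = mean(baseline), stdev(baseline)
ucl, lcl = mu + 3 * sigma, mu - 3 * sigma

# New observations are compared against the fixed limits, not against each other
new_counts = [50_150, 49_980, 38_400]
out_of_control = [x for x in new_counts if not lcl <= x <= ucl]
print(f"LCL={lcl:.0f} UCL={ucl:.0f}")
print(out_of_control)  # [38400]
```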

2. Z-Score

$$z = \frac{x - \mu}{\sigma}$$

If $|z| > 3$, the value is anomalous.
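Unlike SPC, a z-score check is often computed within a single batch, using that batch's own mean and standard deviation. A minimal sketch, assuming the population standard deviation and hypothetical sample values:

```python
from statistics import mean, pstdev

def zscore_outliers(values, threshold=3.0):
    """Return (value, z) pairs with |z| > threshold, using the batch's own mean and std."""
    mu, sigma = mean(values), pstdev(values)
    if sigma == 0:
        return []  # constant column: no value can be an outlier by this rule
    scored = [(x, (x - mu) / sigma) for x in values]
    return [(x, z) for x, z in scored if abs(z) > threshold]

# Hypothetical per-minute order amounts with one extreme value
amounts = [12, 11, 13, 12, 10, 11, 12, 13, 11, 12, 12, 11, 13, 12, 10, 11, 12, 13, 11, 95]
print([x for x, _ in zscore_outliers(amounts)])  # [95]
```

Two caveats apply. First, the outlier inflates $\sigma$ itself, which masks it. Second, with $n$ values and the population standard deviation, $|z|$ can never exceed $\sqrt{n-1}$. A batch of 10 or fewer values therefore can never trigger the $|z| > 3$ rule.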

3. IQR Method

$$\text{IQR} = Q_3 - Q_1$$
$$\text{Lower Bound} = Q_1 - 1.5 \times \text{IQR}$$
$$\text{Upper Bound} = Q_3 + 1.5 \times \text{IQR}$$

Here $Q_1$ and $Q_3$ are the 25th and 75th percentiles; values outside the bounds are flagged as anomalies.
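The IQR bounds (Tukey's fences) can be computed with Python's `statistics.quantiles`. Quantile definitions vary, and this sketch uses the `"inclusive"` method. Spark's `approxQuantile` is approximate and can produce slightly different bounds. The sample durations are hypothetical.

```python
from statistics import quantiles

def iqr_outliers(values, k=1.5):
    """Return (outliers, (lower, upper)) using Tukey's fences."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [x for x in values if x < lower or x > upper], (lower, upper)

# Hypothetical per-batch processing times in seconds
durations = [16, 12, 19, 14, 60, 15, 18, 14, 21, 17]
outliers, bounds = iqr_outliers(durations)
print(outliers, bounds)  # [60] (7.5, 25.5)
```

Because quartiles are barely moved by a single extreme value, the IQR method does not suffer from the masking effect that affects the z-score.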

4. Time Series Decomposition

$$Y_t = T_t + S_t + R_t$$

Where:

  • $T_t$ = Trend component
  • $S_t$ = Seasonal component
  • $R_t$ = Residual (anomaly if large)

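The sketch below implements $Y_t = T_t + S_t + R_t$ as a robust variant of classical additive decomposition. It substitutes a centered moving median for the trend and a per-position median for the seasonal component, in place of the moving averages of the classical method. It then flags residuals whose distance from the median exceeds `k` times a scaled median absolute deviation (MAD). The series, the odd `period`, and the `k=3.5` default are all assumptions.

```python
from statistics import median

def decompose_and_flag(y, period, k=3.5):
    """Robust additive decomposition Y_t = T_t + S_t + R_t; return indices with large |R_t|.

    Assumes an odd `period` so the centered trend window has exactly `period` points.
    """
    n, half = len(y), period // 2
    # Trend T_t: centered moving median; edges without a full window are left as None
    trend = [median(y[i - half:i + half + 1]) if half <= i < n - half else None
             for i in range(n)]
    # Seasonal S_t: median detrended value for each position in the cycle
    seasonal = [median(y[i] - trend[i] for i in range(p, n, period) if trend[i] is not None)
                for p in range(period)]
    # Residual R_t = Y_t - T_t - S_t, only where the trend is defined
    residual = {i: y[i] - trend[i] - seasonal[i % period]
                for i in range(n) if trend[i] is not None}
    # Robust spread: MAD scaled by 1.4826 (matches std for normal data), floored to avoid /0
    center = median(residual.values())
    mad = median(abs(r - center) for r in residual.values())
    scale = max(1.4826 * mad, 1e-9)
    return [i for i, r in residual.items() if abs(r - center) / scale > k]

# Hypothetical daily metric: weekdays 100, weekends 60, four weeks, one injected spike
series = [100, 100, 100, 100, 100, 60, 60] * 4
series[17] = 300
print(decompose_and_flag(series, period=7))  # [17]
```

The data here are noise-free, so every residual except the spike is exactly zero. On real data the MAD-based scale does the real work. For production use, statsmodels offers fuller decompositions (`seasonal_decompose`, `STL` in `statsmodels.tsa.seasonal`).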
Data Freshness

Data freshness is tracked with three related metrics:

  1. Latency: Time from event to availability
     Formula: available_time - event_time
     Example target: < 1 hour (batch), < 1 minute (streaming)

  2. Freshness: Time since the last update
     Formula: now - last_update_time
     Target: < schedule_interval + buffer

  3. Staleness: How outdated the data is
     Formula: now - max(timestamp_column)
     Target: < business requirement

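Freshness and staleness reduce to datetime arithmetic against their targets. A minimal sketch, assuming timezone-aware UTC timestamps; the function name, the hourly schedule, the 15-minute buffer, and the 2-hour staleness limit are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def freshness_status(last_update_time, max_event_time, schedule_interval,
                     buffer, max_staleness, now=None):
    """Compare freshness and staleness against their targets (tz-aware UTC datetimes)."""
    if now is None:
        now = datetime.now(timezone.utc)
    freshness = now - last_update_time  # time since the table was last written
    staleness = now - max_event_time    # now - max(timestamp_column)
    return {
        "freshness": freshness,
        "freshness_ok": freshness < schedule_interval + buffer,
        "staleness": staleness,
        "staleness_ok": staleness < max_staleness,
    }

# Hypothetical hourly batch table, checked at 12:00 UTC
status = freshness_status(
    last_update_time=datetime(2024, 6, 1, 10, 30, tzinfo=timezone.utc),
    max_event_time=datetime(2024, 6, 1, 10, 20, tzinfo=timezone.utc),
    schedule_interval=timedelta(hours=1),
    buffer=timedelta(minutes=15),
    max_staleness=timedelta(hours=2),
    now=datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc),
)
print(status["freshness"], status["freshness_ok"])  # 1:30:00 False
print(status["staleness"], status["staleness_ok"])  # 1:40:00 True
```

In this example the hourly job missed its 1:15 window, so freshness fails even though the newest data is still within the staleness limit. This is why the two metrics are tracked separately.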
Code Implementation

Data Quality Framework Core

from dataclasses import dataclass
from typing import List, Dict, Callable, Optional
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from enum import Enum

class QualityCheckResult(Enum):
    PASS = "pass"
    FAIL = "fail"
    WARN = "warn"

@dataclass
class QualityCheck:
    name: str
    description: str
    check_func: Callable
    severity: str  # critical, warning, info
    dimensions: List[str]  # completeness, accuracy, etc.

@dataclass
class QualityReport:
    timestamp: datetime
    table_name: str
    checks: List[Dict]
    overall_score: float
    passed: bool
    details: Dict

class DataQualityFramework:
    def __init__(self, spark_session=None):
        self.spark = spark_session
        self.checks = {}
        self.reports = []
        
    def register_check(self, check: QualityCheck):
        """Register a quality check"""
        self.checks[check.name] = check
    
    def run_checks(self, df, table_name: str) -> QualityReport:
        """Run all registered checks on a DataFrame"""
        
        results = []
        scores = {}
        
        for name, check in self.checks.items():
            entry = {
                'name': name,
                'description': check.description,
                'severity': check.severity,
                'dimensions': check.dimensions,
            }
            try:
                entry['result'] = check.check_func(df)
            except Exception as e:
                # A check that raises is recorded as a failure rather than skipped
                entry['result'] = QualityCheckResult.FAIL
                entry['error'] = str(e)
            results.append(entry)
            
            # Per-dimension scores: PASS counts as 1.0, WARN and FAIL as 0.0
            for dim in check.dimensions:
                scores.setdefault(dim, []).append(
                    1.0 if entry['result'] == QualityCheckResult.PASS else 0.0
                )
        
        # Overall score: unweighted mean of the per-dimension pass rates
        all_scores = []
        for dim, dim_scores in scores.items():
            all_scores.append(np.mean(dim_scores))
        
        overall_score = np.mean(all_scores) if all_scores else 0.0
        
        # Determine if passed (no critical failures)
        critical_failures = [r for r in results 
                           if r['result'] == QualityCheckResult.FAIL 
                           and r['severity'] == 'critical']
        
        report = QualityReport(
            timestamp=datetime.now(),
            table_name=table_name,
            checks=results,
            overall_score=overall_score,
            passed=len(critical_failures) == 0,
            details=scores,
        )
        
        self.reports.append(report)
        return report

Built-in Quality Checks

class BuiltinChecks:
    """Built-in quality checks"""
    
    @staticmethod
    def completeness_check(columns: List[str], max_null_rate: float = 0.0):
        """Check completeness of specified columns"""
        
        def check(df):
            total = df.count()
            if total == 0:
                return QualityCheckResult.FAIL  # an empty table cannot be complete
            for col in columns:
                null_rate = df.filter(df[col].isNull()).count() / total
                if null_rate > max_null_rate:
                    return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        
        return QualityCheck(
            name=f"completeness_{'_'.join(columns)}",
            description=f"Check completeness of {columns}",
            check_func=check,
            severity='critical',
            dimensions=['completeness'],
        )
    
    @staticmethod
    def uniqueness_check(columns: List[str]):
        """Check uniqueness of specified columns"""
        
        def check(df):
            total_count = df.count()
            distinct_count = df.select(columns).distinct().count()
            
            if total_count != distinct_count:
                return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        
        return QualityCheck(
            name=f"uniqueness_{'_'.join(columns)}",
            description=f"Check uniqueness of {columns}",
            check_func=check,
            severity='critical',
            dimensions=['uniqueness'],
        )
    
    @staticmethod
    def range_check(column: str, min_val=None, max_val=None):
        """Check if values are within range"""
        
        def check(df):
            if min_val is not None:
                below_min = df.filter(df[column] < min_val).count()
                if below_min > 0:
                    return QualityCheckResult.FAIL
            
            if max_val is not None:
                above_max = df.filter(df[column] > max_val).count()
                if above_max > 0:
                    return QualityCheckResult.FAIL
            
            return QualityCheckResult.PASS
        
        return QualityCheck(
            name=f"range_{column}",
            description=f"Check range of {column}",
            check_func=check,
            severity='warning',
            dimensions=['validity'],
        )
    
    @staticmethod
    def pattern_check(column: str, pattern: str):
        """Check if values match pattern"""
        
        def check(df):
            # Nulls are neither matched nor counted by rlike; pair with a completeness check
            non_matching = df.filter(~df[column].rlike(pattern)).count()
            if non_matching > 0:
                return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        
        return QualityCheck(
            name=f"pattern_{column}",
            description=f"Check pattern of {column}",
            check_func=check,
            severity='warning',
            dimensions=['validity'],
        )
    
    @staticmethod
    def freshness_check(timestamp_column: str, max_age_hours: int = 24):
        """Check data freshness"""
        
        def check(df):
            from pyspark.sql import functions as F
            max_timestamp = df.agg(F.max(timestamp_column)).collect()[0][0]
            
            if max_timestamp is None:
                return QualityCheckResult.FAIL
            
            age = datetime.now() - max_timestamp
            if age > timedelta(hours=max_age_hours):
                return QualityCheckResult.FAIL
            
            return QualityCheckResult.PASS
        
        return QualityCheck(
            name=f"freshness_{timestamp_column}",
            description=f"Check freshness of {timestamp_column}",
            check_func=check,
            severity='critical',
            dimensions=['timeliness'],
        )
    
    @staticmethod
    def row_count_check(min_rows: int, max_rows: Optional[int] = None):
        """Check row count is within bounds"""
        
        def check(df):
            count = df.count()
            if count < min_rows:
                return QualityCheckResult.FAIL
            if max_rows is not None and count > max_rows:
                return QualityCheckResult.FAIL
            return QualityCheckResult.PASS
        
        return QualityCheck(
            name="row_count",
            description=f"Check row count between {min_rows} and {max_rows}",
            check_func=check,
            severity='critical',
            dimensions=['completeness'],
        )

Anomaly Detection

class AnomalyDetector:
    """Detect anomalies in data"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def detect_statistical_anomalies(self, df, column, threshold=3.0):
        """Detect anomalies using Z-score"""
        from pyspark.sql import functions as F
        
        # Calculate mean and std
        stats = df.agg(
            F.mean(column).alias('mean'),
            F.stddev(column).alias('stddev')
        ).collect()[0]
        
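        mean = stats['mean']
        stddev = stats['stddev']
        
        # stddev is None for fewer than 2 rows and 0 for a constant column;
        # no z-score can be computed, so report no anomalies
        if not stddev:
            return df.limit(0)
        
        # Calculate Z-score
        anomalies = df.withColumn(
            'z_score',
            (F.col(column) - mean) / stddev
        ).filter(F.abs(F.col('z_score')) > threshold)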
        
        return anomalies
    
    def detect_iqr_anomalies(self, df, column):
        """Detect anomalies using IQR method"""
        from pyspark.sql import functions as F
        
        # Calculate quartiles
        quantiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
        q1, q3 = quantiles
        
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        
        # Find anomalies
        anomalies = df.filter(
            (F.col(column) < lower_bound) | 
            (F.col(column) > upper_bound)
        )
        
        return anomalies
    
    def detect_time_series_anomalies(self, df, timestamp_col, value_col, 
                                      window_size=7, threshold=2.0):
        """Detect anomalies in time series data"""
        from pyspark.sql import functions as F
        from pyspark.sql.window import Window
        
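        # Rolling average and std over the previous `window_size` rows (current row excluded).
        # Note: orderBy without partitionBy moves all rows into a single partition,
        # which is acceptable for an aggregated metric series but not for raw event data.
        window = Window.orderBy(timestamp_col).rowsBetween(-window_size, -1)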
        
        df_with_stats = df.withColumn(
            'rolling_avg', F.avg(value_col).over(window)
        ).withColumn(
            'rolling_std', F.stddev(value_col).over(window)
        )
        
        # Detect anomalies
        anomalies = df_with_stats.withColumn(
            'z_score',
            (F.col(value_col) - F.col('rolling_avg')) / F.col('rolling_std')
        ).filter(F.abs(F.col('z_score')) > threshold)
        
        return anomalies

Data Quarantine

class DataQuarantine:
    """Quarantine bad data for investigation"""
    
    def __init__(self, spark_session, quarantine_path):
        self.spark = spark_session
        self.quarantine_path = quarantine_path
    
    def quarantine_records(self, bad_records, reason, table_name):
        """Quarantine bad records"""
        from pyspark.sql import functions as F
        
        # Add metadata; partition by date rather than by the raw timestamp,
        # which would create a new partition directory for every write
        quarantined = bad_records \
            .withColumn('_quarantine_reason', F.lit(reason)) \
            .withColumn('_quarantine_timestamp', F.current_timestamp()) \
            .withColumn('_quarantine_date', F.to_date(F.current_timestamp())) \
            .withColumn('_source_table', F.lit(table_name))
        
        # Write to quarantine storage
        quarantined.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("_quarantine_date") \
            .save(f"{self.quarantine_path}/{table_name}")
        
        return quarantined.count()
    
    def get_quarantine_stats(self, table_name):
        """Get quarantine statistics"""
        from pyspark.sql import functions as F
        
        quarantine_df = self.spark.read \
            .format("delta") \
            .load(f"{self.quarantine_path}/{table_name}")
        
        stats = quarantine_df.groupBy('_quarantine_reason') \
            .agg(
                F.count("*").alias("count"),
                F.min("_quarantine_timestamp").alias("first_seen"),
                F.max("_quarantine_timestamp").alias("last_seen")
            )
        
        return stats

Monitoring and Alerting

class DataQualityMonitor:
    """Monitor data quality and send alerts"""
    
    def __init__(self, alert_config):
        self.alert_config = alert_config
    
    def send_alert(self, report: QualityReport):
        """Send alert based on quality report"""
        
        if not report.passed:
            self._send_critical_alert(report)
        elif report.overall_score < 0.9:
            self._send_warning_alert(report)
    
    def _send_critical_alert(self, report):
        """Send critical alert"""
        import requests
        
        critical_failures = [c for c in report.checks 
                           if c['result'] == QualityCheckResult.FAIL 
                           and c['severity'] == 'critical']
        
        message = f"""
🚨 CRITICAL DATA QUALITY FAILURE

Table: {report.table_name}
Time: {report.timestamp}
Score: {report.overall_score:.2%}

Failed Checks:
{chr(10).join(f"- {c['name']}: {c.get('error', 'Failed')}" for c in critical_failures)}

Please investigate immediately.
        """
        
        # Post to the Slack webhook (paging via PagerDuty would need a separate integration)
        requests.post(
            self.alert_config['slack_webhook'],
            json={'text': message},
            timeout=10,
        )
    
    def _send_warning_alert(self, report):
        """Send warning alert"""
        import requests
        
        message = f"""
⚠️ Data Quality Warning

Table: {report.table_name}
Time: {report.timestamp}
Score: {report.overall_score:.2%}

Some checks are below threshold.
        """
        
        requests.post(
            self.alert_config['slack_webhook'],
            json={'text': message},
            timeout=10,
        )

Complete Example

# ============================================================
# COMPLETE DATA QUALITY FRAMEWORK EXAMPLE
# ============================================================

from pyspark.sql import SparkSession
from airflow.models import Variable  # Variable.get below assumes this runs inside Airflow

spark = SparkSession.builder \
    .appName("DataQualityFramework") \
    .getOrCreate()

# Initialize framework
dq_framework = DataQualityFramework(spark)

# Register checks
dq_framework.register_check(
    BuiltinChecks.completeness_check(['user_id', 'email'], max_null_rate=0.01)
)

dq_framework.register_check(
    BuiltinChecks.uniqueness_check(['user_id'])
)

dq_framework.register_check(
    BuiltinChecks.range_check('age', min_val=0, max_val=150)
)

dq_framework.register_check(
    BuiltinChecks.freshness_check('updated_at', max_age_hours=24)
)

dq_framework.register_check(
    BuiltinChecks.row_count_check(min_rows=1000)
)

# Read data
df = spark.read.parquet("s3://data-lake/users/")

# Run quality checks
report = dq_framework.run_checks(df, "users")

# Process report
if not report.passed:
    # Quarantine bad data
    quarantine = DataQuarantine(spark, "s3://quarantine")  # no trailing slash: avoids "//users"
    
    # Find failing records (example)
    bad_records = df.filter(df.user_id.isNull())
    quarantine.quarantine_records(bad_records, "null_user_id", "users")

# Send alerts
monitor = DataQualityMonitor({
    'slack_webhook': Variable.get('slack_webhook_url')
})
monitor.send_alert(report)

# Generate quality report
print(f"Overall Score: {report.overall_score:.2%}")
print(f"Passed: {report.passed}")
for check in report.checks:
    print(f"  {check['name']}: {check['result'].value}")

Great Expectations Integration

import great_expectations as ge

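# ============================================================
# GREAT EXPECTATIONS INTEGRATION
# (legacy 0.x API: ge.from_pandas is not available in GX 1.x)
# ============================================================

# Convert to a Great Expectations DataFrame.
# toPandas() collects the entire Spark DataFrame to the driver,
# so only do this for small tables or samples.
ge_df = ge.from_pandas(df.toPandas())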

# Define expectations
ge_df.expect_column_values_to_not_be_null('user_id')
ge_df.expect_column_values_to_be_unique('user_id')
ge_df.expect_column_values_to_be_between('age', min_value=0, max_value=150)
ge_df.expect_column_values_to_match_regex('email', r'^[\w\.-]+@[\w\.-]+\.\w+$')

# Validate
results = ge_df.validate()

# Check results
if not results.success:
    print("Validation failed!")
    for result in results.results:
        if not result.success:
            print(f"  {result.expectation_config.expectation_type}: {result.result}")

💡

Production Tip: Start with critical checks (completeness, freshness) and add more sophisticated checks gradually. Don't try to implement everything at once; iterate based on the data issues you actually encounter.


Common Follow-Up Questions

Q1: How do you measure data quality ROI?

Track metrics such as:

  • Incidents prevented: Number of bad-data incidents caught before they reach consumers
  • Time saved: Reduction in hours spent debugging data
  • Revenue protected: Business impact avoided through data quality improvements
  • Customer satisfaction: Reduction in data-related complaints

Q2: How do you handle data quality in real-time?

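# Real-time quality checks with Spark Structured Streaming
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Assumed JSON message schema; the broker and topic names below are placeholders
event_schema = StructType([
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
])

streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("e")) \
    .select("e.*")

# Apply quality checks; coalesce so a null amount counts as invalid
# (otherwise is_valid is null and the row is dropped by both filters)
quality_checked = streaming_df \
    .withColumn("is_valid",
        F.col("user_id").isNotNull() &
        F.coalesce(F.col("amount") > 0, F.lit(False))
    )

# Split into good and bad streams
good_stream = quality_checked.filter(F.col("is_valid"))
bad_stream = quality_checked.filter(~F.col("is_valid"))

# Write to separate sinks; each streaming query needs its own checkpoint location
good_stream.writeStream.format("delta") \
    .option("checkpointLocation", "s3://checkpoints/good-data/") \
    .start("s3://good-data/")
bad_stream.writeStream.format("delta") \
    .option("checkpointLocation", "s3://checkpoints/quarantine/") \
    .start("s3://quarantine/")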

Q3: How do you handle data quality across teams?

  • Data contracts: Define quality expectations between producing and consuming teams
  • SLAs: Agree on freshness and quality targets
  • Shared metrics: Common quality dashboards
  • Escalation process: Clear ownership for quality issues

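A data contract can be made concrete as a small, versionable object that both teams review. The sketch below is hypothetical: the `DataContract` fields, team name, and thresholds are assumptions, not a standard format. It checks one observed batch against the contract's schema, freshness SLA, and completeness target.

```python
from dataclasses import dataclass
from datetime import timedelta

@dataclass
class DataContract:
    """Hypothetical contract a producing team publishes for its consumers."""
    dataset: str
    owner: str                 # escalation target when the contract is broken
    required_columns: dict     # column name -> type name
    freshness_sla: timedelta
    min_completeness: float = 0.99

def contract_violations(contract, observed_columns, observed_age, completeness):
    """Return human-readable violations for one observed batch."""
    violations = []
    for col, expected_type in contract.required_columns.items():
        actual = observed_columns.get(col)
        if actual is None:
            violations.append(f"missing column {col}")
        elif actual != expected_type:
            violations.append(f"{col}: expected {expected_type}, got {actual}")
    if observed_age > contract.freshness_sla:
        violations.append("freshness SLA breached")
    if completeness < contract.min_completeness:
        violations.append("completeness below contract")
    return violations

users_contract = DataContract(
    dataset="users",
    owner="identity-team",
    required_columns={"user_id": "string", "email": "string", "age": "integer"},
    freshness_sla=timedelta(hours=2),
)
print(contract_violations(
    users_contract,
    observed_columns={"user_id": "string", "email": "string", "age": "bigint"},
    observed_age=timedelta(hours=3),
    completeness=0.995,
))
# ['age: expected integer, got bigint', 'freshness SLA breached']
```

Routing each violation to `contract.owner` gives the escalation process a concrete owner.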
Q4: How do you prioritize quality checks?

Prioritize based on:

  1. Business impact: What's the cost of bad data?
  2. Frequency: How often does this issue occur?
  3. Detection difficulty: How easy is it to detect?
  4. Remediation cost: How hard is it to fix?

⚠️

Critical Consideration: Data quality is not just a technical problem; it's a business problem. Work with stakeholders to define quality requirements, and focus on the checks that provide the most business value.


Company-Specific Tips

Google Interview Tips

  • Discuss data contracts and schema evolution
  • Explain anomaly detection at scale
  • Mention data freshness monitoring
  • Talk about automated remediation

Airbnb Interview Tips

  • Focus on data trust and user-facing metrics
  • Discuss A/B testing data quality
  • Mention ML data quality for recommendations
  • Talk about cost optimization for quality checks

Uber Interview Tips

  • Discuss real-time data quality for ride matching
  • Explain geospatial data validation
  • Mention payment data accuracy
  • Talk about multi-region quality monitoring

ℹ️

Final Takeaway: Data quality is a continuous process, not a one-time project. Start with critical checks, monitor constantly, and iterate based on actual issues. The goal is not perfect data, but reliable data that stakeholders can trust.
