Data Quality at Scale: Frameworks & Automation
Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Stripe, Spotify
1. Data Quality Dimensions
| Dimension | Definition | Example |
|---|---|---|
| Completeness | No missing values | null_ratio < 0.01 |
| Accuracy | Values match reality | amount >= 0 |
| Consistency | No contradictions | order_date <= ship_date |
| Timeliness | Data is fresh | now() - last_updated < 1 hour |
| Uniqueness | No duplicates | count(*) = count(DISTINCT id) |
| Validity | Follows format rules | email LIKE '%@%.%' |
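The six dimensions can be scored mechanically once each one has a concrete rule. The following is a minimal pure-Python sketch, not a production implementation. The function name `dimension_report`, the sample rows, and the choice of `email` as the completeness column are assumptions made for illustration. The field names mirror the examples in the table.

```python
import re
from datetime import datetime, timedelta

# Loose email shape check, roughly equivalent to LIKE '%@%.%'.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def dimension_report(rows, now, max_null_ratio=0.01, max_age=timedelta(hours=1)):
    """Return one pass/fail flag per quality dimension for a list of dict rows."""
    total = len(rows)
    if total == 0:
        raise ValueError("cannot score an empty batch")
    null_emails = sum(1 for r in rows if r["email"] is None)
    ids = [r["id"] for r in rows]
    return {
        "completeness": null_emails / total < max_null_ratio,
        "accuracy": all(r["amount"] >= 0 for r in rows),
        "consistency": all(r["order_date"] <= r["ship_date"] for r in rows),
        "timeliness": now - max(r["last_updated"] for r in rows) < max_age,
        "uniqueness": len(ids) == len(set(ids)),
        # Nulls are counted under completeness, so they are skipped here.
        "validity": all(r["email"] is None or EMAIL_RE.match(r["email"]) for r in rows),
    }


# Hypothetical sample batch: the second row reuses an id and has a malformed email.
rows = [
    {"id": 1, "email": "a@example.com", "amount": 10.0,
     "order_date": datetime(2024, 1, 1), "ship_date": datetime(2024, 1, 2),
     "last_updated": datetime(2024, 1, 1, 11, 30)},
    {"id": 1, "email": "not-an-email", "amount": 25.0,
     "order_date": datetime(2024, 1, 1), "ship_date": datetime(2024, 1, 3),
     "last_updated": datetime(2024, 1, 1, 11, 45)},
]
report = dimension_report(rows, now=datetime(2024, 1, 1, 12, 0))
failed = sorted(name for name, ok in report.items() if not ok)
print(failed)
```

Keeping one flag per dimension makes it straightforward to roll results up into a per-table score later.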
2. Great Expectations Framework
import great_expectations as gx

# Written against the fluent API of Great Expectations 0.17/0.18;
# other releases expose different entry points.
context = gx.get_context()

# Define expectations
validator = context.sources.pandas_default.read_csv("data/orders.csv")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=10000000)

# Persist the suite so the checkpoint can reuse it
validator.save_expectation_suite(discard_failed_expectations=False)

# Run validation
results = validator.validate()

# Build checkpoint from the validator (carries both the batch and the suite)
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validator=validator,
)
checkpoint_result = checkpoint.run()
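Running a checkpoint only helps if the pipeline acts on the outcome. The sketch below shows one way to gate downstream work, assuming the result object exposes a boolean `success` attribute, as Great Expectations validation and checkpoint results do. `DataQualityError` and `require_success` are hypothetical names, and a `SimpleNamespace` stands in for a real result so the example runs without the library installed.

```python
from types import SimpleNamespace


class DataQualityError(RuntimeError):
    """Raised when a validation run reports failure (hypothetical name)."""


def require_success(result, label):
    """Raise if a validation result reports failure; otherwise return it unchanged."""
    if not getattr(result, "success", False):
        raise DataQualityError(f"{label}: validation failed")
    return result


# Stand-in for the object returned by validator.validate() or checkpoint.run().
fake_result = SimpleNamespace(success=True)
require_success(fake_result, "orders_checkpoint")
```

Raising an exception, rather than logging and continuing, lets an orchestrator mark the task as failed and stop dependent jobs.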
3. Automated Quality Pipeline
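from datetime import datetime

from pyspark.sql import functions as F


class DataQualityPipeline:
    """Collects pass/fail results from a set of Spark data quality checks."""

    def __init__(self, spark):
        self.spark = spark
        self.results = []

    def check_completeness(self, df, columns, threshold=0.99):
        total = df.count()  # count once instead of once per column
        for col in columns:
            non_null = df.filter(F.col(col).isNotNull()).count()
            ratio = non_null / total if total else 0.0  # an empty input fails the check
            self.results.append({
                "check": f"completeness_{col}",
                "passed": ratio >= threshold,
                "value": ratio,
                "threshold": threshold,
            })

    def check_uniqueness(self, df, key_columns, threshold=0.999):
        total = df.count()
        distinct = df.select(*key_columns).distinct().count()
        ratio = distinct / total if total else 1.0  # an empty input has no duplicates
        self.results.append({
            "check": f"uniqueness_{'_'.join(key_columns)}",
            "passed": ratio >= threshold,
            "value": ratio,
            "threshold": threshold,
        })

    def check_freshness(self, table_name, timestamp_col, max_age_minutes=60):
        max_ts = self.spark.sql(f"SELECT MAX({timestamp_col}) FROM {table_name}").collect()[0][0]
        # An empty table has no timestamp, so treat it as infinitely stale.
        # Assumes naive timestamps in the driver's local time zone, matching datetime.now().
        if max_ts is None:
            age_minutes = float("inf")
        else:
            age_minutes = (datetime.now() - max_ts).total_seconds() / 60
        self.results.append({
            "check": f"freshness_{table_name}",
            "passed": age_minutes <= max_age_minutes,
            "age_minutes": age_minutes,
        })

    def check_volume_anomaly(self, table_name, current_count, historical_avg, historical_std):
        # Flag row counts more than 3 standard deviations from the historical mean.
        if historical_std > 0:
            z_score = (current_count - historical_avg) / historical_std
        else:
            # No historical variance: any deviation at all is anomalous.
            z_score = 0.0 if current_count == historical_avg else float("inf")
        self.results.append({
            "check": f"volume_{table_name}",
            "passed": abs(z_score) <= 3,
            "z_score": z_score,
        })

    def get_summary(self):
        passed = sum(1 for r in self.results if r["passed"])
        return {"total": len(self.results), "passed": passed, "failed": len(self.results) - passed}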
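The volume check takes `historical_avg` and `historical_std` as inputs but does not show where they come from. As a worked example, the sketch below derives both from a short list of past row counts using the standard library and applies the same 3-sigma rule. The helper name `volume_z_score` and the sample counts are assumptions for illustration. In this sketch a flat history (zero variance) treats any change as anomalous.

```python
from statistics import mean, stdev


def volume_z_score(current_count, history):
    """Z-score of the current row count against a list of historical counts."""
    if len(history) < 2:
        raise ValueError("need at least two historical counts")
    avg = mean(history)
    std = stdev(history)  # sample standard deviation
    if std == 0:
        # Flat history: any change at all is treated as anomalous.
        return 0.0 if current_count == avg else float("inf")
    return (current_count - avg) / std


# Hypothetical row counts from the previous five runs (mean 100, std about 1.58).
history = [100, 102, 98, 101, 99]
z_normal = volume_z_score(101, history)
z_spike = volume_z_score(120, history)
print(f"{z_normal:.2f}", abs(z_normal) <= 3)  # 0.63 True
print(f"{z_spike:.2f}", abs(z_spike) <= 3)    # 12.65 False
```

A fixed 3-sigma threshold assumes roughly stable daily volume. Tables with strong weekly seasonality usually need the history restricted to comparable days.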
Best Practice: Fail fast. Run quality checks at ingestion, not after transformation. A bad record should never enter your pipeline.
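One way to apply this at ingestion is to validate each record before it is written and route failures to a quarantine area instead of the main pipeline. The sketch below is a minimal illustration under assumed rules. `validate_order`, `ingest`, and the two rules are hypothetical, and a real system would write quarantined rows to a dead-letter table or queue.

```python
def validate_order(record):
    """Return a list of rule violations for one record (an empty list means clean)."""
    errors = []
    if record.get("order_id") is None:
        errors.append("order_id is null")
    amount = record.get("amount")
    if not isinstance(amount, (int, float)) or amount < 0:
        errors.append("amount must be a non-negative number")
    return errors


def ingest(records):
    """Split incoming records into accepted rows and quarantined (row, errors) pairs."""
    accepted, quarantined = [], []
    for record in records:
        errors = validate_order(record)
        if errors:
            quarantined.append((record, errors))
        else:
            accepted.append(record)
    return accepted, quarantined


# Hypothetical incoming batch: one clean row, one with a null key and a negative amount.
batch = [
    {"order_id": "A-1", "amount": 19.99},
    {"order_id": None, "amount": -5},
]
accepted, quarantined = ingest(batch)
print(len(accepted), len(quarantined))
```

Keeping the violation list alongside each quarantined row preserves the evidence needed to trace a failure back to its source.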
Follow-Up Questions
- How would you implement data quality monitoring for 10,000+ tables?
- Design a data quality scoring system for executive dashboards.
- How do you handle quality checks for streaming data?
- Design an automated root cause analysis system for quality failures.
- How would you measure the business impact of data quality issues?