Automated Data Quality Pipeline
Great Expectations + Airflow + Soda Core + Data Contracts
ℹ️
Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS/GCP
Build a comprehensive data quality framework that validates, monitors, and reports on data quality across your entire data platform.
Project Overview
Problem Statement
Poor data quality costs organizations an average of $12.9M annually. Without automated quality checks, data issues go undetected until they cause incorrect analytics, failed ML models, or bad business decisions.
Objectives
- Automate data quality validation at every pipeline stage
- Implement data contracts between producers and consumers
- Provide real-time quality monitoring and alerting
- Enable data quality scoring and SLA tracking
- Support data profiling and anomaly detection
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Quality Framework | Great Expectations | Expectation management |
| Orchestration | Apache Airflow | Pipeline scheduling |
| Data Contracts | Pandera / Soda Core | Schema validation |
| Monitoring | DataHub / Grafana | Quality dashboards |
| Alerting | PagerDuty / Slack | Incident notifications |
Architecture Diagram
Data Source Setup and Schema
Data Quality Expectations
# expectations/quality_definitions.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import RuntimeBatchRequest
from typing import Dict, List, Any
from datetime import datetime
import logging
# Configure logging once at import time. The level must be passed as the
# ``level`` keyword; the bare name ``level`` is undefined in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataQualityDefinitions:
    """Build the Great Expectations suites for the core tables.

    Every ``create_*`` method returns a new ``ExpectationSuite``. The suites
    are not registered with the data context by this class.
    """

    # NOTE(review): suites are constructed with the ``expectation_suite_name``
    # keyword while expectations use the ``gx.expectations.Expect*`` classes --
    # confirm both spellings are supported by the pinned GX version.

    def __init__(self, context):
        # Kept on the instance; the builders below do not read it.
        self.context = context

    def create_customer_expectations(self) -> ExpectationSuite:
        """Return the ``customer_data_quality`` suite.

        Covers column order, required fields, key uniqueness, email format,
        allowed status values, row-count bounds and ``updated_at`` freshness.
        """
        suite = ExpectationSuite(expectation_suite_name="customer_data_quality")

        # Schema expectations
        suite.add_expectation(
            gx.expectations.ExpectTableColumnsToMatchOrderedList(
                column_list=[
                    "customer_id", "email", "first_name", "last_name",
                    "phone", "created_at", "updated_at", "status",
                ]
            )
        )

        # Null checks
        for col in ["customer_id", "email", "first_name", "last_name"]:
            suite.add_expectation(
                gx.expectations.ExpectColumnValuesToNotBeNull(column=col)
            )

        # Uniqueness
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeUnique(column="customer_id")
        )

        # Email format
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToMatchRegex(
                column="email",
                regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
            )
        )

        # Status values
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeInSet(
                column="status",
                value_set=["active", "inactive", "suspended", "pending"],
            )
        )

        # Record count bounds
        suite.add_expectation(
            gx.expectations.ExpectTableRowCountToBeBetween(
                min_value=1000,
                max_value=10000000,
            )
        )

        # Freshness: the upper bound is captured when the suite is built, so
        # rebuild the suite for each run rather than caching it.
        suite.add_expectation(
            gx.expectations.ExpectColumnMaxToBeBetween(
                column="updated_at",
                min_value=datetime(2024, 1, 1),
                max_value=datetime.now(),
            )
        )
        return suite

    def create_order_expectations(self, valid_customer_ids: Any = None) -> ExpectationSuite:
        """Return the ``order_data_quality`` suite.

        Args:
            valid_customer_ids: Optional iterable of known customer ids. When
                given, a referential-integrity expectation is added that
                requires every ``customer_id`` to be one of them. A value set
                must be concrete values; a SQL string is not evaluated, so the
                caller has to fetch the ids first.
        """
        suite = ExpectationSuite(expectation_suite_name="order_data_quality")
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
        )
        # "Greater than zero" is expressed as a between-check with a strict
        # lower bound and no upper bound.
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="total_amount",
                min_value=0,
                strict_min=True,
            )
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeInSet(
                column="status",
                value_set=["pending", "processing", "shipped", "delivered", "cancelled"],
            )
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToMatchRegex(
                column="order_number",
                regex=r"^ORD-\d{8}-\d{4}$",
            )
        )

        # Referential integrity, only when the caller supplies the key set.
        if valid_customer_ids is not None:
            suite.add_expectation(
                gx.expectations.ExpectColumnValuesToBeInSet(
                    column="customer_id",
                    value_set=list(valid_customer_ids),
                )
            )
        return suite

    def create_product_expectations(self) -> ExpectationSuite:
        """Return the ``product_data_quality`` suite."""
        suite = ExpectationSuite(expectation_suite_name="product_data_quality")
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="product_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeUnique(column="product_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="price", min_value=0, strict_min=True
            )
        )
        # Zero stock is allowed here, matching the >= 0 rule in
        # ProductDataContract.
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="stock_quantity", min_value=0
            )
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValueLengthsToBeBetween(
                column="sku", min_value=3, max_value=50
            )
        )
        return suite

    def create_financial_expectations(self) -> ExpectationSuite:
        """Return the ``financial_data_quality`` suite."""
        suite = ExpectationSuite(expectation_suite_name="financial_data_quality")
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="transaction_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeUnique(column="transaction_id")
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="amount", min_value=0, strict_min=True
            )
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToMatchRegex(
                column="currency",
                regex=r"^[A-Z]{3}$",
            )
        )

        # Statistical expectations
        suite.add_expectation(
            gx.expectations.ExpectColumnMeanToBeBetween(
                column="amount",
                min_value=10,
                max_value=10000,
            )
        )
        suite.add_expectation(
            gx.expectations.ExpectColumnStdevToBeBetween(
                column="amount",
                min_value=0,
                max_value=50000,
            )
        )
        return suite
Pandera Data Contracts
# contracts/data_contracts.py
import pandera as pa
from pandera import Column, Check, DataFrameSchema, Index
from typing import Optional
from datetime import datetime
class CustomerDataContract(DataFrameSchema):
    """Pandera contract for the customer table.

    ``DataFrameSchema`` only enforces columns passed to its constructor;
    ``Column`` objects declared as class attributes are never registered.
    The columns are therefore built in ``__init__`` so that
    ``CustomerDataContract().validate(df)`` applies them.
    """

    def __init__(self) -> None:
        super().__init__(
            columns={
                "customer_id": Column(str, unique=True, nullable=False),
                "email": Column(str, nullable=False, checks=[
                    Check.str_matches(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"),
                ]),
                "first_name": Column(str, nullable=False, checks=[
                    Check.str_length(min_value=1, max_value=100),
                ]),
                "last_name": Column(str, nullable=False, checks=[
                    Check.str_length(min_value=1, max_value=100),
                ]),
                "phone": Column(str, nullable=True, checks=[
                    Check.str_matches(r"^\+?[\d\-\(\)]+$"),
                ]),
                "created_at": Column(pa.DateTime, nullable=False),
                "updated_at": Column(pa.DateTime, nullable=False),
                "status": Column(str, nullable=False, checks=[
                    Check.isin(["active", "inactive", "suspended", "pending"]),
                ]),
            },
            strict=True,   # reject columns that are not part of the contract
            coerce=True,   # cast columns to the declared dtypes before checking
        )
class OrderDataContract(DataFrameSchema):
    """Pandera contract for the order table.

    ``DataFrameSchema`` only enforces columns passed to its constructor;
    ``Column`` objects declared as class attributes are never registered.
    The columns are therefore built in ``__init__`` so that
    ``OrderDataContract().validate(df)`` applies them.
    """

    def __init__(self) -> None:
        super().__init__(
            columns={
                "order_id": Column(str, unique=True, nullable=False, checks=[
                    Check.str_matches(r"^ORD-\d{8}-\d{4}$"),
                ]),
                "customer_id": Column(str, nullable=False),
                "total_amount": Column(float, nullable=False, checks=[
                    Check.greater_than(0),
                    Check.less_than(1000000),
                ]),
                "tax_amount": Column(float, nullable=False, checks=[
                    Check.greater_than_or_equal_to(0),
                ]),
                "status": Column(str, nullable=False, checks=[
                    Check.isin(["pending", "processing", "shipped", "delivered", "cancelled"]),
                ]),
                "created_at": Column(pa.DateTime, nullable=False),
                "updated_at": Column(pa.DateTime, nullable=False),
                "item_count": Column(int, nullable=False, checks=[
                    Check.greater_than(0),
                    Check.less_than(100),
                ]),
            },
            strict=True,   # reject columns that are not part of the contract
            coerce=True,   # cast columns to the declared dtypes before checking
        )
class ProductDataContract(DataFrameSchema):
    """Pandera contract for the product table.

    ``DataFrameSchema`` only enforces columns passed to its constructor;
    ``Column`` objects declared as class attributes are never registered.
    The columns are therefore built in ``__init__`` so that
    ``ProductDataContract().validate(df)`` applies them.
    """

    def __init__(self) -> None:
        super().__init__(
            columns={
                "product_id": Column(str, unique=True, nullable=False),
                "sku": Column(str, nullable=False, checks=[
                    Check.str_length(min_value=3, max_value=50),
                    Check.str_matches(r"^SKU-[A-Z]{3}-\d{4}$"),
                ]),
                "name": Column(str, nullable=False, checks=[
                    Check.str_length(min_value=1, max_value=255),
                ]),
                "category": Column(str, nullable=False, checks=[
                    Check.isin(["Electronics", "Clothing", "Home", "Sports", "Books"]),
                ]),
                "price": Column(float, nullable=False, checks=[
                    Check.greater_than(0),
                    Check.less_than(100000),
                ]),
                "cost": Column(float, nullable=True, checks=[
                    Check.greater_than_or_equal_to(0),
                ]),
                "stock_quantity": Column(int, nullable=False, checks=[
                    Check.greater_than_or_equal_to(0),
                ]),
            },
            strict=True,   # reject columns that are not part of the contract
            coerce=True,   # cast columns to the declared dtypes before checking
        )
class FinancialDataContract(DataFrameSchema):
    """Pandera contract for the financial transaction table.

    ``DataFrameSchema`` only enforces columns passed to its constructor;
    ``Column`` objects declared as class attributes are never registered.
    The columns are therefore built in ``__init__`` so that
    ``FinancialDataContract().validate(df)`` applies them.
    """

    def __init__(self) -> None:
        super().__init__(
            columns={
                "transaction_id": Column(str, unique=True, nullable=False),
                "order_id": Column(str, nullable=False),
                "amount": Column(float, nullable=False, checks=[
                    Check.greater_than(0),
                ]),
                "currency": Column(str, nullable=False, checks=[
                    Check.str_matches(r"^[A-Z]{3}$"),
                    Check.isin(["USD", "EUR", "GBP", "JPY", "CAD", "AUD"]),
                ]),
                "payment_method": Column(str, nullable=False, checks=[
                    Check.isin(["credit_card", "debit_card", "paypal", "bank_transfer"]),
                ]),
                "status": Column(str, nullable=False, checks=[
                    Check.isin(["pending", "completed", "failed", "refunded"]),
                ]),
                "created_at": Column(pa.DateTime, nullable=False),
            },
            strict=True,   # reject columns that are not part of the contract
            coerce=True,   # cast columns to the declared dtypes before checking
        )
Step-by-Step Implementation Guide
Step 1: Great Expectations Suite Manager
# quality/suite_manager.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.data_context.types.base import (
DataContextConfig,
DatasourceConfig,
FilesystemStoreBackendDefaults,
)
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import json
import logging
# Configure logging once at import time. The level must be passed as the
# ``level`` keyword; the bare name ``level`` is undefined in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ExpectationSuiteManager:
    """Own a Great Expectations context and run suites through checkpoints.

    The context is built in code (no YAML) with one Spark and one Pandas
    datasource, each exposing a runtime data connector so in-memory
    DataFrames can be validated.
    """

    _CONNECTOR_NAME = "default_runtime_data_connector_name"
    _ASSET_NAME = "default_asset_name"

    def __init__(self, base_path: str = "./great_expectations"):
        self.base_path = base_path
        self.context = self._initialize_context()
        self.suites: Dict[str, ExpectationSuite] = {}

    @classmethod
    def _runtime_datasource(cls, engine_class: str) -> DatasourceConfig:
        """Return a block-config datasource with a single runtime connector."""
        # A RuntimeDataConnector needs the generic ``Datasource`` class plus an
        # execution engine; the legacy ``PandasDatasource``/``SparkDFDatasource``
        # classes have no data connectors.
        return DatasourceConfig(
            class_name="Datasource",
            execution_engine={"class_name": engine_class},
            data_connectors={
                cls._CONNECTOR_NAME: {
                    "class_name": "RuntimeDataConnector",
                    "batch_identifiers": ["default_identifier_name"],
                }
            },
        )

    def _initialize_context(self):
        """Create the data context with filesystem stores under ``base_path``."""
        config = DataContextConfig(
            config_version=3.0,
            datasources={
                "spark_datasource": self._runtime_datasource("SparkDFExecutionEngine"),
                "pandas_datasource": self._runtime_datasource("PandasExecutionEngine"),
            },
            store_backend_defaults=FilesystemStoreBackendDefaults(
                root_directory=self.base_path
            ),
        )
        return gx.get_context(project_config=config)

    def register_suite(self, suite_name: str, suite: ExpectationSuite):
        """Add ``suite`` to the context and remember it under ``suite_name``."""
        self.context.add_expectation_suite(expectation_suite=suite)
        self.suites[suite_name] = suite
        logger.info(f"Registered expectation suite: {suite_name}")

    def create_checkpoint(self, checkpoint_name: str, suite_names: List[str],
                          data_source_name: str = "pandas_datasource"):
        """Register a checkpoint that validates one batch against each suite.

        Args:
            checkpoint_name: Name the checkpoint is stored under.
            suite_names: Expectation suites to run; one validation per name.
            data_source_name: Datasource the runtime batch is read through.

        Raises:
            ValueError: If ``suite_names`` is empty.
        """
        if not suite_names:
            raise ValueError("suite_names must contain at least one expectation suite name")

        checkpoint_config = {
            # ``name`` is required by ``add_checkpoint``.
            "name": checkpoint_name,
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "module_name": "great_expectations.checkpoint",
            "run_name_template": "%Y%m%d-%H%M%S",
            # The batch request is shared; each validation only names a suite.
            "batch_request": {
                "datasource_name": data_source_name,
                "data_connector_name": self._CONNECTOR_NAME,
                "data_asset_name": self._ASSET_NAME,
            },
            "validations": [
                {"expectation_suite_name": name} for name in suite_names
            ],
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {
                        "class_name": "StoreValidationResultAction",
                        # Results go to the validations store, not the
                        # expectations store.
                        "target_store_name": "validations_store",
                    },
                },
                {
                    "name": "update_data_docs",
                    "action": {
                        "class_name": "UpdateDataDocsAction",
                    },
                },
                {
                    "name": "send_alert",
                    "action": {
                        "class_name": "SlackNotificationAction",
                        "slack_webhook": "${SLACK_WEBHOOK_URL}",
                        "notify_on": "all",
                        "renderer": {
                            "module_name": "great_expectations.render.renderer.slack_renderer",
                            "class_name": "SlackRenderer",
                        },
                    },
                },
            ],
            # The former top-level "site_name" key was dropped: it is not an
            # ``add_checkpoint`` argument.
        }
        self.context.add_checkpoint(**checkpoint_config)
        logger.info(f"Created checkpoint: {checkpoint_name}")

    def run_validation(self, checkpoint_name: str, dataframe,
                       suite_name: str) -> Dict:
        """Run ``checkpoint_name`` against ``dataframe`` and summarise the outcome.

        Args:
            checkpoint_name: Checkpoint previously registered with the context.
            dataframe: In-memory batch handed to the runtime data connector.
            suite_name: Label recorded in the returned summary.

        Returns:
            A dict with ``suite_name``, ``success``, aggregated ``statistics``,
            an ISO ``timestamp`` and one entry per evaluated expectation.
        """
        batch_request = RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name=self._CONNECTOR_NAME,
            data_asset_name=self._ASSET_NAME,
            runtime_parameters={"batch_data": dataframe},
            batch_identifiers={"default_identifier_name": "default_identifier"},
        )
        result = self.context.run_checkpoint(
            checkpoint_name=checkpoint_name,
            batch_request=batch_request,
        )

        # The checkpoint result has no ``statistics`` of its own; each entry
        # in ``run_results`` carries them, so aggregate across entries.
        totals = {
            "evaluated_expectations": 0,
            "successful_expectations": 0,
            "unsuccessful_expectations": 0,
        }
        details = []
        for run in result.run_results.values():
            suite_result = run["validation_result"]
            stats = suite_result["statistics"]
            for key in totals:
                totals[key] += stats[key]
            for res in suite_result["results"]:
                details.append({
                    "expectation_type": res["expectation_config"]["expectation_type"],
                    "success": res["success"],
                    "kwargs": res["expectation_config"]["kwargs"],
                    "result": res.get("result", {}),
                })

        evaluated = totals["evaluated_expectations"]
        success_percent = (
            totals["successful_expectations"] / evaluated * 100 if evaluated else None
        )
        return {
            "suite_name": suite_name,
            "success": result.success,
            "statistics": {**totals, "success_percent": success_percent},
            "timestamp": datetime.utcnow().isoformat(),
            "results": details,
        }

    def get_quality_score(self, validation_result: Dict) -> float:
        """Return the success percentage, or 0.0 when nothing was evaluated."""
        percent = validation_result["statistics"]["success_percent"]
        return float(percent) if percent is not None else 0.0

    def generate_quality_report(self, results: List[Dict]) -> Dict:
        """Roll several ``run_validation`` summaries into one report dict."""
        scores = [self.get_quality_score(r) for r in results]
        passed = sum(1 for r in results if r["success"])
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "total_suites": len(results),
            "passed_suites": passed,
            "failed_suites": len(results) - passed,
            "average_quality_score": sum(scores) / len(scores) if scores else 0,
            "suites": [
                {
                    "name": result["suite_name"],
                    "success": result["success"],
                    "quality_score": score,
                    "failed_expectations": [
                        r for r in result["results"] if not r["success"]
                    ],
                }
                for result, score in zip(results, scores)
            ],
        }
Step 2: Airflow Quality DAG
# dags/data_quality_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
from airflow.models import Variable
from datetime import datetime, timedelta
# Defaults applied to every task in the DAG: retry twice, five minutes apart,
# and email on failure only.
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
def run_quality_checks(**context):
    """Validate one table with its checkpoint and publish the outcome via XCom.

    Reads ``params['table_name']``, loads the table into a DataFrame, runs the
    ``<table>_quality_checkpoint`` checkpoint and pushes ``quality_score``,
    ``validation_success`` and ``failed_expectations``.

    Returns:
        The checkpoint's overall success flag.

    Raises:
        ValueError: If the quality score is below 95%.
    """
    import great_expectations as gx
    import pandas as pd

    ti = context['ti']
    table_name = context['params']['table_name']
    connection_string = Variable.get('data_quality_db_connection')
    # Read the table once; the earlier duplicate read was never used.
    df = pd.read_sql_table(table_name, con=connection_string)

    context_obj = gx.get_context()
    checkpoint_name = f"{table_name}_quality_checkpoint"
    batch_request = {
        "datasource_name": "pandas_datasource",
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": "default_asset_name",
        # The runtime connector needs the DataFrame itself, not a JSON string.
        "runtime_parameters": {"batch_data": df},
        "batch_identifiers": {"default_identifier_name": table_name},
    }
    result = context_obj.run_checkpoint(
        checkpoint_name=checkpoint_name,
        batch_request=batch_request,
    )

    # Statistics live on each validation result, not on the checkpoint result.
    validation_results = [
        run['validation_result'] for run in result.run_results.values()
    ]
    evaluated = sum(
        vr['statistics']['evaluated_expectations'] for vr in validation_results
    )
    successful = sum(
        vr['statistics']['successful_expectations'] for vr in validation_results
    )
    # No evaluated expectations counts as a failing score.
    quality_score = successful / evaluated * 100 if evaluated else 0.0

    failed_expectations = [
        res['expectation_config']['expectation_type']
        for vr in validation_results
        for res in vr['results']
        if not res['success']
    ]

    ti.xcom_push(key='quality_score', value=quality_score)
    ti.xcom_push(key='validation_success', value=result.success)
    ti.xcom_push(key='failed_expectations', value=failed_expectations)

    # Raise only after the XComs are pushed so they are recorded on failure too.
    if quality_score < 95:
        raise ValueError(f"Quality score {quality_score}% is below threshold 95%")
    return result.success
def profile_data(**context):
    """Profile one table and push the generated suite to XCom.

    Reads ``params['table_name']``, loads the table into a DataFrame, profiles
    it into ``<table>_profiled_suite`` and pushes the suite's JSON dict under
    the ``profiled_suite`` key. Returns ``True``.
    """
    # ``gx`` is imported here because no module-level import provides it.
    import great_expectations as gx
    import pandas as pd
    from great_expectations.profile.basic_suite_builder_profiler import BasicSuiteBuilderProfiler

    ti = context['ti']
    table_name = context['params']['table_name']
    connection_string = Variable.get('data_quality_db_connection')
    df = pd.read_sql_table(table_name, con=connection_string)

    context_obj = gx.get_context()
    # NOTE(review): in the legacy API ``profile_data_asset`` takes a datasource
    # name as its first argument, not a DataFrame -- confirm this call against
    # the pinned Great Expectations version.
    expectation_suite = context_obj.profile_data_asset(
        df,
        profiler=BasicSuiteBuilderProfiler,
        expectation_suite_name=f"{table_name}_profiled_suite"
    )
    ti.xcom_push(key='profiled_suite', value=expectation_suite.to_json_dict())
    return True
def check_schema_drift(**context):
    """Compare a table's current dtypes with its stored reference schema.

    The reference is read as JSON from the ``<table>_reference_schema``
    Airflow Variable. The drift report (added columns, removed columns,
    changed dtypes and a ``has_drift`` flag) is pushed to XCom under
    ``schema_drift`` and also returned.
    """
    import pandas as pd
    import json

    task_instance = context['ti']
    table_name = context['params']['table_name']
    connection_string = Variable.get('data_quality_db_connection')
    frame = pd.read_sql_table(table_name, con=connection_string)

    current_schema = {}
    for col, dtype in frame.dtypes.items():
        current_schema[col] = str(dtype)
    reference_schema = json.loads(Variable.get(f'{table_name}_reference_schema'))

    new_columns = set(current_schema.keys()) - set(reference_schema.keys())
    removed_columns = set(reference_schema.keys()) - set(current_schema.keys())

    # Only columns present on both sides can have a dtype change.
    type_changes = {}
    for col in set(current_schema.keys()) & set(reference_schema.keys()):
        if current_schema[col] != reference_schema[col]:
            type_changes[col] = {
                "old": reference_schema[col],
                "new": current_schema[col],
            }

    has_drift = bool(new_columns or removed_columns or type_changes)
    drift_report = {
        "new_columns": list(new_columns),
        "removed_columns": list(removed_columns),
        "type_changes": type_changes,
        "has_drift": has_drift,
    }
    task_instance.xcom_push(key='schema_drift', value=drift_report)

    if has_drift:
        import logging
        logging.warning(f"Schema drift detected for {table_name}: {drift_report}")
    return drift_report
def generate_quality_dashboard(**context):
    """Assemble the dashboard payload for one table from upstream XComs.

    Pulls the quality score, validation flag, failed expectations and schema
    drift report pushed by the ``validate_data`` and ``check_drift`` tasks of
    the table's ``<table>_quality`` task group, then derives alerts.

    Returns:
        A dict with ``execution_date``, ``table``, the pulled values and an
        ``alerts`` list of ``{"severity", "message"}`` entries.
    """
    ti = context['ti']
    table_name = context['params']['table_name']

    # Task ids are prefixed with the task group id used in the DAG definition.
    validate_task_id = f"{table_name}_quality.validate_data"
    drift_task_id = f"{table_name}_quality.check_drift"

    quality_scores = ti.xcom_pull(key='quality_score', task_ids=validate_task_id)
    validation_success = ti.xcom_pull(key='validation_success', task_ids=validate_task_id)
    failed_expectations = ti.xcom_pull(key='failed_expectations', task_ids=validate_task_id)
    schema_drift = ti.xcom_pull(key='schema_drift', task_ids=drift_task_id)

    dashboard_data = {
        "execution_date": context['execution_date'].isoformat(),
        "table": table_name,
        "quality_score": quality_scores,
        "validation_passed": validation_success,
        "failed_expectations": failed_expectations,
        "schema_drift": schema_drift,
        "alerts": []
    }

    if quality_scores is None:
        # Nothing was pushed upstream; comparing None with a number would raise.
        dashboard_data["alerts"].append({
            "severity": "critical",
            "message": "Quality score unavailable"
        })
    elif quality_scores < 90:
        dashboard_data["alerts"].append({
            "severity": "critical",
            "message": f"Quality score {quality_scores}% is critically low"
        })
    elif quality_scores < 95:
        dashboard_data["alerts"].append({
            "severity": "warning",
            "message": f"Quality score {quality_scores}% is below target"
        })

    if schema_drift and schema_drift.get('has_drift'):
        dashboard_data["alerts"].append({
            "severity": "info",
            "message": "Schema drift detected"
        })
    return dashboard_data
# Tables validated by the pipeline; shared by the DAG params and the loop that
# builds one task group per table.
TABLE_NAMES = ['customers', 'orders', 'products', 'transactions']

with DAG(
    'data_quality_pipeline',
    default_args=default_args,
    description='Automated data quality validation pipeline',
    schedule_interval='0 */4 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['data-quality', 'validation'],
    params={
        'table_names': TABLE_NAMES
    }
) as dag:
    start_task = PythonOperator(
        task_id='start_quality_pipeline',
        python_callable=lambda: print("Starting data quality pipeline"),
    )
    end_task = PythonOperator(
        task_id='end_quality_pipeline',
        python_callable=lambda: print("Quality pipeline completed"),
    )

    # ``params`` is not a name in this scope, so iterate the module constant.
    for table_name in TABLE_NAMES:
        # Task ids inside the group become "<table>_quality.<task_id>".
        with TaskGroup(group_id=f'{table_name}_quality') as quality_group:
            profile_task = PythonOperator(
                task_id='profile_data',
                python_callable=profile_data,
                params={'table_name': table_name},
            )
            schema_check = PythonOperator(
                task_id='check_drift',
                python_callable=check_schema_drift,
                params={'table_name': table_name},
            )
            validate_task = PythonOperator(
                task_id='validate_data',
                python_callable=run_quality_checks,
                params={'table_name': table_name},
            )
            dashboard_task = PythonOperator(
                task_id='update_dashboard',
                python_callable=generate_quality_dashboard,
                params={'table_name': table_name},
            )
            profile_task >> schema_check >> validate_task >> dashboard_task

        # Each table's group runs between the shared start and end markers.
        start_task >> quality_group >> end_task
Infrastructure Setup (Terraform)
# infrastructure/data_quality.tf
# Deployment environment name; interpolated into resource names in this file.
variable "environment" {
  default = "production"
}
# Bucket referenced by GREAT_EXPECTATIONS_STORE_URL in the MWAA environment.
resource "aws_s3_bucket" "great_expectations" {
  bucket = "great-expectations-${var.environment}"
}
# Enable object versioning on the Great Expectations bucket.
resource "aws_s3_bucket_versioning" "great_expectations" {
  bucket = aws_s3_bucket.great_expectations.id

  versioning_configuration {
    status = "Enabled"
  }
}
# Default server-side encryption (KMS) for the Great Expectations bucket.
resource "aws_s3_bucket_server_side_encryption_configuration" "great_expectations" {
  bucket = aws_s3_bucket.great_expectations.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "aws:kms"
    }
  }
}
# Bucket referenced by QUALITY_REPORTS_BUCKET in the MWAA environment.
resource "aws_s3_bucket" "quality_reports" {
  bucket = "data-quality-reports-${var.environment}"
}
# Tier quality reports down to cheaper storage as they age, then expire them.
resource "aws_s3_bucket_lifecycle_configuration" "quality_reports" {
  bucket = aws_s3_bucket.quality_reports.id

  rule {
    id     = "archive-old-reports"
    status = "Enabled"

    # Empty filter: the rule applies to every object in the bucket.
    filter {}

    # HCL has no ";" separator, so each argument sits on its own line.
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 90
      storage_class = "GLACIER"
    }

    expiration {
      days = 365
    }
  }
}
# Managed Airflow environment that runs the data quality DAGs.
resource "aws_mwaa_environment" "airflow" {
  name               = "data-quality-airflow-${var.environment}"
  airflow_version    = "2.7.0"
  environment_class  = "mw1.medium"
  source_bucket_arn  = aws_s3_bucket.airflow_dags.arn
  dag_s3_path        = "dags/"
  execution_role_arn = aws_iam_role.airflow.arn

  network_configuration {
    security_group_ids = [aws_security_group.airflow.id]
    subnet_ids         = var.private_subnets
  }

  # HCL has no ";" separator, so each argument sits on its own line.
  logging_configuration {
    dag_processing_logs {
      enabled   = true
      log_level = "WARNING"
    }

    scheduler_logs {
      enabled   = true
      log_level = "WARNING"
    }

    task_logs {
      enabled   = true
      log_level = "INFO"
    }

    webserver_logs {
      enabled   = true
      log_level = "WARNING"
    }

    worker_logs {
      enabled   = true
      log_level = "WARNING"
    }
  }

  webserver_access_mode = "PRIVATE_ONLY"
  max_workers           = 10
  min_workers           = 2

  # NOTE(review): confirm the pinned AWS provider accepts
  # "environment_variables" on aws_mwaa_environment; MWAA settings are usually
  # passed via airflow_configuration_options or a startup script.
  environment_variables = {
    GREAT_EXPECTATIONS_STORE_URL = "s3://${aws_s3_bucket.great_expectations.id}/expectations/"
    QUALITY_REPORTS_BUCKET       = aws_s3_bucket.quality_reports.id
    SLACK_WEBHOOK_URL            = var.slack_webhook_url
  }
}
# Source bucket for the MWAA environment; DAGs are read from its "dags/" path.
resource "aws_s3_bucket" "airflow_dags" {
  bucket = "airflow-dags-${var.environment}"
}
# Aurora PostgreSQL cluster for the quality metadata database.
resource "aws_rds_cluster" "quality_metadata" {
  cluster_identifier = "quality-metadata-${var.environment}"
  engine             = "aurora-postgresql"
  engine_version     = "15.4"
  database_name      = "quality_metadata"

  # "admin" is a reserved word for the PostgreSQL engine and is rejected as a
  # master username.
  master_username = "quality_admin"
  master_password = var.db_password

  db_subnet_group_name   = aws_db_subnet_group.quality.name
  vpc_security_group_ids = [aws_security_group.quality_db.id]
  storage_encrypted      = true

  # Production keeps a final snapshot on destroy, which requires an identifier.
  skip_final_snapshot       = var.environment != "production"
  final_snapshot_identifier = "quality-metadata-${var.environment}-final"
}
# Cluster instances: two in production, one in every other environment.
resource "aws_rds_cluster_instance" "quality_metadata" {
  count              = var.environment == "production" ? 2 : 1
  identifier         = "quality-metadata-${count.index}"
  cluster_identifier = aws_rds_cluster.quality_metadata.id
  instance_class     = "db.r6g.large"

  # "engine" is required on cluster instances and must match the cluster.
  engine         = aws_rds_cluster.quality_metadata.engine
  engine_version = aws_rds_cluster.quality_metadata.engine_version
}
# Alarm when the average DataQualityScore stays below 90 for two consecutive
# 300-second periods; notifies the quality alerts SNS topic.
resource "aws_cloudwatch_metric_alarm" "quality_score_low" {
  alarm_name        = "data-quality-score-low"
  alarm_description = "Data quality score dropped below 90%"

  namespace   = "DataQuality"
  metric_name = "DataQualityScore"
  statistic   = "Average"
  period      = 300

  comparison_operator = "LessThanThreshold"
  threshold           = 90
  evaluation_periods  = 2

  alarm_actions = [aws_sns_topic.quality_alerts.arn]
}
# SNS topic targeted by the data quality CloudWatch alarm.
resource "aws_sns_topic" "quality_alerts" {
  name = "data-quality-alerts-${var.environment}"
}
# Webserver URL of the MWAA environment.
output "airflow_webserver_url" {
  value = aws_mwaa_environment.airflow.webserver_url
}
# Name of the bucket that holds quality reports.
output "quality_reports_bucket" {
  value = aws_s3_bucket.quality_reports.bucket
}
Testing and Validation
# tests/test_data_quality.py
import pytest
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class TestDataQualityPipeline:
    """Tests for the data contracts and the quality metrics used by the pipeline."""

    @pytest.fixture
    def valid_customers(self):
        """100 customer rows that satisfy the customer contract."""
        return pd.DataFrame({
            'customer_id': [f'CUST-{i:06d}' for i in range(100)],
            'email': [f'user{i}@example.com' for i in range(100)],
            'first_name': [f'First{i}' for i in range(100)],
            'last_name': [f'Last{i}' for i in range(100)],
            'phone': [f'+1-555-{i:04d}' for i in range(100)],
            'created_at': [datetime.now() - timedelta(days=i) for i in range(100)],
            'updated_at': [datetime.now() for _ in range(100)],
            'status': ['active'] * 80 + ['inactive'] * 20
        })

    @pytest.fixture
    def invalid_customers(self):
        """55 customer rows with null keys, bad emails and unknown statuses."""
        return pd.DataFrame({
            'customer_id': [f'CUST-{i:06d}' for i in range(50)] + [None] * 5,
            'email': [f'user{i}@example.com' for i in range(45)] + ['invalid'] * 10,
            'first_name': [f'First{i}' for i in range(50)] + [None] * 5,
            'last_name': [f'Last{i}' for i in range(50)] + [None] * 5,
            'phone': [f'+1-555-{i:04d}' for i in range(55)],
            'created_at': [datetime.now() for _ in range(55)],
            'updated_at': [datetime.now() for _ in range(55)],
            'status': ['active'] * 40 + ['invalid_status'] * 15
        })

    def test_valid_customer_expectations(self, valid_customers):
        from contracts.data_contracts import CustomerDataContract
        schema = CustomerDataContract()
        validated = schema.validate(valid_customers)
        assert len(validated) == len(valid_customers)

    def test_invalid_customer_catches_errors(self, invalid_customers):
        # Imported locally: the module-level imports do not provide ``pa``.
        import pandera as pa
        from contracts.data_contracts import CustomerDataContract
        schema = CustomerDataContract()
        # lazy=True collects every failure and raises them as SchemaErrors.
        with pytest.raises(pa.errors.SchemaErrors):
            schema.validate(invalid_customers, lazy=True)

    def test_quality_score_calculation(self):
        total_expectations = 20
        successful_expectations = 18
        quality_score = (successful_expectations / total_expectations) * 100
        assert quality_score == 90.0

    def test_schema_drift_detection(self):
        reference_schema = {'col1': 'int64', 'col2': 'object', 'col3': 'float64'}
        current_schema = {'col1': 'int64', 'col2': 'object', 'col4': 'bool'}
        new_cols = set(current_schema.keys()) - set(reference_schema.keys())
        removed_cols = set(reference_schema.keys()) - set(current_schema.keys())
        assert 'col4' in new_cols
        assert 'col3' in removed_cols

    def test_anomaly_detection(self):
        np.random.seed(42)
        values = np.random.normal(100, 10, 1000).tolist()
        values.append(500)  # Anomaly
        mean = np.mean(values[:-1])
        std = np.std(values[:-1])
        threshold = 3 * std
        # The comparison yields a NumPy bool, which is never identical to the
        # ``True`` singleton; convert before the identity check.
        is_anomaly = bool(abs(values[-1] - mean) > threshold)
        assert is_anomaly is True

    def test_data_freshness_check(self):
        last_updated = datetime.now() - timedelta(hours=2)
        max_age_hours = 4
        is_fresh = (datetime.now() - last_updated).total_seconds() / 3600 < max_age_hours
        assert is_fresh is True

    def test_completeness_check(self):
        df = pd.DataFrame({'a': [1, None, 3], 'b': [4, 5, None]})
        completeness = 1 - df.isnull().sum().sum() / (df.shape[0] * df.shape[1])
        assert completeness < 1.0

    def test_uniqueness_check(self):
        values = [1, 2, 3, 4, 5, 5]
        is_unique = len(values) == len(set(values))
        assert is_unique is False

    def test_quality_report_generation(self):
        results = [
            {'suite': 'customers', 'success': True, 'score': 99.5},
            {'suite': 'orders', 'success': False, 'score': 85.0},
            {'suite': 'products', 'success': True, 'score': 97.0}
        ]
        report = {
            'total_suites': len(results),
            'passed': sum(1 for r in results if r['success']),
            'failed': sum(1 for r in results if not r['success']),
            'avg_score': sum(r['score'] for r in results) / len(results)
        }
        assert report['total_suites'] == 3
        assert report['passed'] == 2
        assert report['failed'] == 1
        assert report['avg_score'] > 90
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| MWAA (Airflow) | mw1.medium, 2 workers | $400 |
| RDS Aurora | db.r6g.large, Multi-AZ | $350 |
| S3 Storage | 500GB expectations + reports | $15 |
| CloudWatch | Logs and metrics | $50 |
| SNS | Alert notifications | $5 |
| Data Transfer | Cross-AZ | $30 |
| Total | | $850 |
Cost Optimization Strategies
💡
Tip: Optimize data quality costs:
- Selective Validation: Only validate critical tables at high frequency
- Sampling: Use statistical sampling for large datasets
- Caching: Cache validation results to avoid redundant runs
- Right-sizing: Scale MWAA workers based on DAG complexity
- S3 Lifecycle: Archive old quality reports automatically
ROI Analysis
| Metric | Before Quality Pipeline | After Quality Pipeline | Improvement |
|---|---|---|---|
| Data Issues Detected | Manual (sporadic) | Automated (continuous) | 100% coverage |
| Time to Detection | Days/weeks | Minutes | 100-1000x faster |
| Data Downtime | 8 hours/month | 30 minutes/month | 16x reduction |
| Cost of Bad Data | $50K/month | $5K/month | 90% reduction |
Interview Talking Points
Architecture Decisions
ℹ️
Best Practice: Focus on these data quality concepts in interviews:
- Why Great Expectations over custom validators?
  - Declarative expectation syntax
  - Built-in data documentation
  - Integration with Airflow and Spark
  - Community-maintained expectations
- Why data contracts?
  - Formal agreement between producers and consumers
  - Prevents breaking changes
  - Enables schema evolution
  - Improves data reliability
- How to handle quality failures?
  - Alert immediately for critical failures
  - Quarantine bad data in staging
  - Auto-retry with backoff for transient issues
  - Escalation path for persistent failures
Common Interview Questions
Q: "How do you balance quality checks with pipeline performance?"
# Strategies for keeping quality checks from slowing the pipeline down.
quality_optimization = {
    "Tiered Checks": "Critical checks at every stage, comprehensive at end",
    "Sampling": "Full validation on sample, lightweight on all",
    "Async Validation": "Non-blocking checks with async alerts",
    "Incremental": "Only validate changed data since last check",
}
Q: "How do you track data quality over time?"
# Ways to track data quality over time.
quality_tracking = {
    "Quality Score": "Percentage of passing expectations per dataset",
    "SLA Monitoring": "Track freshness, completeness, accuracy metrics",
    "Trend Analysis": "Quality score trends over time",
    "Root Cause": "Correlate quality drops with pipeline changes",
}
Q: "How do you implement data contracts at scale?"
# Building blocks for rolling out data contracts at scale.
contract_strategy = {
    "Schema Registry": "Central schema store with versioning",
    "CI/CD Integration": "Validate contracts in pull requests",
    "Runtime Enforcement": "Block bad data at ingestion",
    "Consumer Validation": "Consumers verify contract compliance",
}
Deployment Checklist
- Set up Great Expectations context and stores
- Create expectation suites for all critical tables
- Configure Pandera data contracts
- Deploy Airflow quality DAGs
- Set up quality monitoring dashboards
- Configure alerting thresholds and channels
- Test quality checks with sample bad data
- Document quality SLAs for each dataset
- Train teams on quality expectations
- Establish quality review process
⚠️
Warning: Start with a few critical tables and expand gradually. Too many expectations too quickly can overwhelm teams and reduce adoption.
This project demonstrates data quality engineering skills and is highly relevant for data engineering interviews at companies with strict data governance requirements.