15. Data Quality in PySpark

Data Quality

Data quality measures how well data fits its intended purpose across dimensions: accuracy, completeness, consistency, timeliness, validity, and uniqueness. Poor data quality leads to incorrect analytics, failed pipelines, and unreliable ML models.

Anomaly Detection

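Anomaly detection identifies data points that deviate significantly from expected patterns. In PySpark it is typically implemented with statistical methods (z-score, IQR), rule-based validation, or model-based methods such as isolation forests. Spark MLlib has no built-in isolation forest, so that approach requires a third-party library.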

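Z-Score

$$Z_i = \frac{X_i - \mu}{\sigma}$$

Here, μ and σ are the mean and standard deviation of the column; a common rule flags X_i as an anomaly when |Z_i| > 3.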

Data Quality Score

$$DQ = \frac{N_{valid}}{N_{total}} \times 100\%$$

Here,

  • DQ = data quality score (percentage of valid records)
  • N_valid = number of records passing all validation rules
  • N_total = total number of records

Consider Great Expectations or Deequ (Python wrapper: PyDeequ) for production data quality checks. Both run declarative validation rules and data profiling on Spark DataFrames, and Deequ can additionally detect anomalies in quality metrics tracked across runs.

Implement data quality checks at pipeline ingestion (before processing) to prevent corrupt data from propagating downstream. Use quarantine tables for records that fail validation.

IQR Outlier Detection

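Rule (Tukey's fences): A data point X is considered an outlier if X < Q_1 - 1.5 × IQR or X > Q_3 + 1.5 × IQR, where Q_1 and Q_3 are the first and third quartiles and IQR = Q_3 - Q_1. Because quartiles barely move when a few extreme values are added, this method is more robust than the z-score (whose mean and standard deviation are inflated by the very outliers it looks for), and it does not assume a normal distribution.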

  • Data quality dimensions: accuracy, completeness, consistency, timeliness, validity, uniqueness
  • Anomaly detection: z-score (normal distributions), IQR (robust), isolation forests (complex patterns)
  • DQ score = valid_records / total_records; target > 99% for production data
  • Validate at ingestion; quarantine invalid records; monitor DQ metrics over time

🏗️ Data Quality Framework Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                   DATA QUALITY FRAMEWORK ARCHITECTURE                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                       DATA QUALITY LAYERS                        │   │
│  │                                                                  │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Layer 1: VALIDATION                                     │    │   │
│  │  │  • Schema validation                                     │    │   │
│  │  │  • Data type checking                                    │    │   │
│  │  │  • Null/empty detection                                  │    │   │
│  │  │  • Business rule validation                              │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  │                              │                                   │   │
│  │                              ▼                                   │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Layer 2: PROFILING                                      │    │   │
│  │  │  • Statistical analysis                                  │    │   │
│  │  │  • Distribution analysis                                 │    │   │
│  │  │  • Correlation analysis                                  │    │   │
│  │  │  • Pattern detection                                     │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  │                              │                                   │   │
│  │                              ▼                                   │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Layer 3: MONITORING                                     │    │   │
│  │  │  • Real-time metrics                                     │    │   │
│  │  │  • Alerting                                              │    │   │
│  │  │  • Dashboards                                            │    │   │
│  │  │  • Trend analysis                                        │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  │                              │                                   │   │
│  │                              ▼                                   │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Layer 4: REMEDIATION                                    │    │   │
│  │  │  • Data cleansing                                        │    │   │
│  │  │  • Imputation                                            │    │   │
│  │  │  • Filtering                                             │    │   │
│  │  │  • Quarantine                                            │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                     QUALITY METRICS PIPELINE                     │   │
│  │                                                                  │   │
│  │  Data ──▶ Validate ──▶ Profile ──▶ Monitor ──▶ Remediate         │   │
│  │    │         │            │           │            │             │   │
│  │    ▼         ▼            ▼           ▼            ▼             │   │
│  │ ┌─────┐   ┌─────┐      ┌─────┐     ┌─────┐      ┌─────┐          │   │
│  │ │Raw  │   │Pass/│      │Stats│     │Alert│      │Clean│          │   │
│  │ │Data │   │Fail │      │     │     │     │      │Data │          │   │
│  │ └─────┘   └─────┘      └─────┘     └─────┘      └─────┘          │   │
│  └──────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

🔍 Anomaly Detection Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                        ANOMALY DETECTION METHODS                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                       STATISTICAL METHODS                        │   │
│  │                                                                  │   │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐   │   │
│  │  │  Z-Score        │  │  IQR Method     │  │  Percentile     │   │   │
│  │  │                 │  │                 │  │                 │   │   │
│  │  │  |x - μ| > 3σ   │  │  < Q1-1.5*IQR   │  │  < 1st or       │   │   │
│  │  │                 │  │  or > Q3+1.5*IQR│  │  > 99th pctile  │   │   │
│  │  └─────────────────┘  └─────────────────┘  └─────────────────┘   │   │
│  │                                                                  │   │
│  │  Data Distribution:                                              │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Anomaly   │             Normal             │ Anomaly    │    │   │
│  │  │  ▁▁▁▁▁▁▁▁▁▁▂▃▄▅▆▇██████████████████████▇▆▅▄▃▂▁▁▁▁▁▁▁▁▁▁  │    │   │
│  │  │  ◄─────►   │                                │ ◄─────►    │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                     MACHINE LEARNING METHODS                     │   │
│  │                                                                  │   │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐   │   │
│  │  │  Isolation      │  │  DBSCAN         │  │  Autoencoder    │   │   │
│  │  │  Forest         │  │  Clustering     │  │  Neural Net     │   │   │
│  │  │                 │  │                 │  │                 │   │   │
│  │  │  Isolates       │  │  Groups dense   │  │  Learns normal  │   │   │
│  │  │  anomalies      │  │  regions, noise │  │  patterns,      │   │   │
│  │  │  efficiently    │  │  = anomalies    │  │  flags outliers │   │   │
│  │  └─────────────────┘  └─────────────────┘  └─────────────────┘   │   │
│  │                                                                  │   │
│  │  Comparison:                                                     │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Method           │ Speed  │ Accuracy │ Scalability      │    │   │
│  │  │  ─────────────────┼────────┼──────────┼──────────────────│    │   │
│  │  │  Z-Score          │ Fast   │ Medium   │ High             │    │   │
│  │  │  IQR              │ Fast   │ Medium   │ High             │    │   │
│  │  │  Isolation Forest │ Medium │ High     │ Medium           │    │   │
│  │  │  DBSCAN           │ Slow   │ High     │ Low              │    │   │
│  │  │  Autoencoder      │ Slow   │ Very High│ Low              │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                        RULE-BASED METHODS                        │   │
│  │                                                                  │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Business Rules                                          │    │   │
│  │  │  • Value ranges (age: 0-150)                             │    │   │
│  │  │  • Format patterns (email regex)                         │    │   │
│  │  │  • Referential integrity (foreign keys)                  │    │   │
│  │  │  • Temporal constraints (start < end)                    │    │   │
│  │  │  • Cross-field validation (salary > 0 if employed)       │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  └──────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

📊 Data Profiling Pipeline

┌─────────────────────────────────────────────────────────────────────────┐
│                         DATA PROFILING PIPELINE                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Input Data                                                             │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │  ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐         │   │
│  │  │ id  │ name│email│ age │ sal │ dept│ date│ stat│ null│         │   │
│  │  ├─────┼─────┼─────┼─────┼─────┼─────┼─────┼─────┼─────┤         │   │
│  │  │ 1   │Alice│ a@b │ 25  │50000│ IT  │ 2024│ A   │ null│         │   │
│  │  │ 2   │ Bob │ b@c │ 30  │60000│ HR  │ 2024│ B   │ val │         │   │
│  │  │ 3   │ null│ null│ -5  │ null│ IT  │ 2024│ A   │ null│         │   │
│  │  │ ... │ ... │ ... │ ... │ ... │ ... │ ... │ ... │ ... │         │   │
│  │  └─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┘         │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼                                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                         PROFILING STAGES                         │   │
│  │                                                                  │   │
│  │  Stage 1: SCHEMA PROFILING                                       │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Column    │ Type    │ Nullable │ Unique │ Pattern       │    │   │
│  │  │  ──────────┼─────────┼──────────┼────────┼───────────────│    │   │
│  │  │  id        │ INT     │ No       │ 100%   │ Numeric       │    │   │
│  │  │  name      │ STRING  │ Yes      │ 95%    │ Text          │    │   │
│  │  │  email     │ STRING  │ Yes      │ 98%    │ Email regex   │    │   │
│  │  │  age       │ INT     │ Yes      │ 85%    │ Range 0-150   │    │   │
│  │  │  salary    │ DOUBLE  │ Yes      │ 90%    │ Positive num  │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  │                                                                  │   │
│  │  Stage 2: STATISTICAL PROFILING                                  │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Column    │ Mean  │ StdDev │ Min   │ Max   │ Skewness   │    │   │
│  │  │  ──────────┼───────┼────────┼───────┼───────┼────────────│    │   │
│  │  │  age       │ 32.5  │ 8.2    │ -5    │ 120   │ 0.8        │    │   │
│  │  │  salary    │ 55000 │ 12000  │ -1000 │ 200K  │ 1.2        │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  │                                                                  │   │
│  │  Stage 3: QUALITY SCORING                                        │   │
│  │  ┌──────────────────────────────────────────────────────────┐    │   │
│  │  │  Dimension        │ Score │ Status    │ Recommendation   │    │   │
│  │  │  ─────────────────┼───────┼───────────┼──────────────────│    │   │
│  │  │  Completeness     │ 85%   │ Warning   │ Handle nulls     │    │   │
│  │  │  Validity         │ 92%   │ Good      │ Minor fixes      │    │   │
│  │  │  Consistency      │ 78%   │ Poor      │ Investigate      │    │   │
│  │  │  Timeliness       │ 95%   │ Excellent │ No action        │    │   │
│  │  └──────────────────────────────────────────────────────────┘    │   │
│  └──────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

📚 Detailed Explanation

Data quality is a critical aspect of any data pipeline, ensuring that data is accurate, complete, consistent, and reliable for downstream consumption. In PySpark, data quality encompasses validation, profiling, monitoring, and remediation activities that collectively ensure data meets the required standards for analytics and decision-making.

Data validation is the first line of defense against poor data quality. It checks data against predefined rules and constraints to identify invalid or inconsistent records. Validation rules can be simple (e.g., not null, data type matching) or complex (e.g., cross-field validation, business rule compliance). PySpark provides the building blocks for this logic, such as isNull()/isNotNull() for null checks, cast() for type conversion, and rlike() for regular-expression pattern matching.

Data profiling provides a comprehensive understanding of data characteristics. It involves analyzing statistical properties, distributions, patterns, and relationships within the data. Profiling helps identify data quality issues that may not be apparent through simple validation, such as skewed distributions, unusual patterns, or correlation anomalies. PySpark's distributed processing capabilities make it feasible to profile large datasets efficiently.

Anomaly detection identifies unusual patterns or outliers in the data that may indicate quality issues or interesting events. Statistical methods like Z-score and IQR are simple and effective for many use cases, while machine learning approaches like Isolation Forest and autoencoders can capture more complex patterns. The choice of method depends on the data characteristics, performance requirements, and the nature of anomalies being detected.

Data quality monitoring provides continuous visibility into data quality metrics over time. It involves tracking key quality dimensions (completeness, validity, consistency, timeliness) and setting up alerts for quality degradation. Effective monitoring requires defining quality thresholds, establishing baselines, and implementing automated alerting mechanisms.

Remediation addresses identified quality issues through various techniques: data cleansing (correcting errors), imputation (filling missing values), filtering (removing invalid records), and quarantine (isolating problematic data for investigation). The choice of remediation strategy depends on the nature of the quality issue, the importance of the data, and the downstream requirements.

Quality scoring provides a quantitative measure of data quality, enabling comparison across datasets and tracking improvements over time. Quality scores are typically computed as weighted averages of individual quality dimensions, with weights reflecting the relative importance of each dimension.
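The weighted average can be written as DQ_overall = Σ w_d · S_d / Σ w_d, where S_d is the score of dimension d and w_d its weight. The plain-Python sketch below uses the weights from the quality_config example in the code section and illustrative dimension scores (three of them taken from the profiling diagram); dividing by the weight total is a design choice so the weights need not sum to 1.

```python
def weighted_quality_score(scores, weights):
    """Weighted average of per-dimension scores (0-100)."""
    missing = set(weights) - set(scores)
    if missing:
        raise ValueError(f"missing scores for: {sorted(missing)}")
    total_weight = sum(weights.values())
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive number")
    return sum(scores[d] * w for d, w in weights.items()) / total_weight

# Weights as in quality_config; scores are illustrative
weights = {"completeness": 0.3, "validity": 0.3, "uniqueness": 0.2, "consistency": 0.2}
scores = {"completeness": 85.0, "validity": 92.0, "uniqueness": 100.0, "consistency": 78.0}
print(f"Overall DQ score: {weighted_quality_score(scores, weights):.1f}")  # prints 88.7
```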

Best practices for data quality include: defining clear quality requirements upfront, implementing validation at multiple pipeline stages, profiling data regularly, monitoring quality metrics continuously, establishing remediation workflows, and documenting quality issues and their resolutions.

Advanced techniques include data contracts (formal agreements on data quality expectations), data observability (comprehensive monitoring of data health), and automated quality testing (continuous validation of data against expectations). These techniques are essential for maintaining data quality in complex, evolving data ecosystems.

📊 Key Concepts Table

| Dimension | Description | Measurement | Target |
|---|---|---|---|
| Completeness | Percentage of non-null values | Non-null count / total | > 95% |
| Validity | Percentage of values passing validation rules | Valid count / total | > 98% |
| Consistency | Agreement across related data sources | Cross-source comparison | > 90% |
| Timeliness | Data freshness and availability | Time since last update | < 1 hour |
| Accuracy | Correctness of values | Comparison with ground truth | > 99% |
| Uniqueness | Absence of duplicate records | Distinct count / total | > 99% |

💻 Code Examples

Basic Data Validation

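from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DataValidation") \
    .getOrCreate()

# Define validation rules. Type checks belong to schema validation (compare
# df.schema with an expected schema), so they are not row-level rules here.
EMAIL_PATTERN = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9.-]+$"
validation_rules = {
    "id": {"not_null": True, "unique": True},
    "name": {"not_null": True, "min_length": 2, "max_length": 100},
    "email": {"not_null": True, "pattern": EMAIL_PATTERN},
    "age": {"not_null": True, "min": 0, "max": 150},
    "salary": {"not_null": True, "min": 0, "max": 1000000}
}

def rule_condition(column, rules):
    """Translate one column's rule dictionary into a boolean Column expression"""
    c = F.col(column)
    cond = F.lit(True)
    if rules.get("not_null"):
        cond = cond & c.isNotNull()
    if "min_length" in rules:
        cond = cond & (F.length(c) >= rules["min_length"])
    if "max_length" in rules:
        cond = cond & (F.length(c) <= rules["max_length"])
    if "pattern" in rules:
        cond = cond & c.rlike(rules["pattern"])
    if "min" in rules:
        cond = cond & (c >= rules["min"])
    if "max" in rules:
        cond = cond & (c <= rules["max"])
    return cond

# Read data
df = spark.read.json("/path/to/data.json")

# Apply validation rules: a row is valid only if every column passes.
# coalesce() turns a NULL result into False so every row is classified.
all_rules = F.lit(True)
for column, rules in validation_rules.items():
    all_rules = all_rules & rule_condition(column, rules)

# cache() because the result is scanned once per count below
validated_df = df.withColumn("is_valid", F.coalesce(all_rules, F.lit(False))).cache()

# Separate valid and invalid records
valid_df = validated_df.filter(F.col("is_valid")).drop("is_valid")
invalid_df = validated_df.filter(~F.col("is_valid")).drop("is_valid")

# Show validation results
print(f"Valid records: {valid_df.count()}")
print(f"Invalid records: {invalid_df.count()}")

# The "unique" rule needs an aggregation: find ids that occur more than once
duplicate_ids = df.groupBy("id").count().filter(F.col("count") > 1)
print(f"Duplicated ids: {duplicate_ids.count()}")

# Show invalid records for investigation
invalid_df.show(10)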

Data Profiling

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType

spark = SparkSession.builder \
    .appName("DataProfiling") \
    .getOrCreate()

# Read data
df = spark.read.json("/path/to/data.json")

# Basic profiling
def profile_dataframe(df):
    """Per-column profile: null and distinct counts, plus summary statistics for numeric columns"""

    df = df.cache()            # every column below re-reads the data
    total_count = df.count()   # computed once instead of once per column

    schema_info = []
    for field in df.schema.fields:
        col_name = field.name
        col_type = field.dataType.simpleString()

        # Null count and distinct count in one aggregation (countDistinct ignores NULLs)
        counts = df.agg(
            F.count(F.when(F.col(col_name).isNull(), 1)).alias("nulls"),
            F.countDistinct(F.col(col_name)).alias("distinct")
        ).first()
        null_count = counts["nulls"]
        distinct_count = counts["distinct"]
        null_percentage = (null_count / total_count) * 100 if total_count > 0 else 0
        distinct_percentage = (distinct_count / total_count) * 100 if total_count > 0 else 0

        info = {
            "column": col_name,
            "type": col_type,
            "null_count": null_count,
            "null_percentage": round(null_percentage, 2),
            "distinct_count": distinct_count,
            "distinct_percentage": round(distinct_percentage, 2),
            "mean": None,
            "stddev": None,
            "min": None,
            "max": None
        }

        # For numeric columns (int, bigint, double, decimal, ...), compute statistics.
        # isinstance() is used because str(field.dataType) differs across Spark versions.
        if isinstance(field.dataType, NumericType):
            stats = df.agg(
                F.mean(col_name).alias("mean"),
                F.stddev(col_name).alias("stddev"),
                F.min(col_name).alias("min"),
                F.max(col_name).alias("max")
            ).first()
            # "is not None" so that a legitimate 0 is not reported as missing
            info["mean"] = round(stats["mean"], 2) if stats["mean"] is not None else None
            info["stddev"] = round(stats["stddev"], 2) if stats["stddev"] is not None else None
            info["min"] = stats["min"]
            info["max"] = stats["max"]

        schema_info.append(info)

    return schema_info

# Generate profile
profile = profile_dataframe(df)

# Display profile
for col_info in profile:
    print(f"\nColumn: {col_info['column']}")
    print(f"  Type: {col_info['type']}")
    print(f"  Null Count: {col_info['null_count']} ({col_info['null_percentage']}%)")
    print(f"  Distinct Count: {col_info['distinct_count']} ({col_info['distinct_percentage']}%)")
    if col_info['mean'] is not None:
        print(f"  Mean: {col_info['mean']}")
        print(f"  StdDev: {col_info['stddev']}")
        print(f"  Min: {col_info['min']}")
        print(f"  Max: {col_info['max']}")

Anomaly Detection with Z-Score

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("AnomalyDetection") \
    .getOrCreate()

# Read data
df = spark.read.json("/path/to/data.json")

# Z-Score anomaly detection
def detect_anomalies_zscore(df, column, threshold=3.0):
    """Detect anomalies using Z-score method"""
    
    # Calculate mean and standard deviation
    stats = df.select(
        mean(column).alias("mean"),
        stddev(column).alias("stddev")
    ).collect()[0]
    
    mean_val = stats["mean"]
    stddev_val = stats["stddev"]
    
    if stddev_val is None or stddev_val == 0:
        # No variation, or fewer than two non-null values: nothing can be an outlier
        return df.withColumn("z_score", lit(0.0)).withColumn("is_anomaly", lit(False))
    
    # Calculate Z-score and flag anomalies
    result_df = df.withColumn(
        "z_score",
        (col(column) - mean_val) / stddev_val
    ).withColumn(
        "is_anomaly",
        when(abs(col("z_score")) > threshold, True).otherwise(False)
    )
    
    return result_df

# Detect anomalies in salary column
anomalies_df = detect_anomalies_zscore(df, "salary", threshold=3.0)

# Cache: the result is scanned several times below
anomalies_df.cache()

# Show anomalies
anomalies_df.filter(col("is_anomaly")).show(10)

# Summary of anomalies
anomaly_count = anomalies_df.filter(col("is_anomaly")).count()
total_count = anomalies_df.count()
print(f"Anomalies detected: {anomaly_count} out of {total_count} records")
if total_count > 0:
    print(f"Anomaly percentage: {(anomaly_count / total_count) * 100:.2f}%")
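The z-score rule assumes a roughly normal distribution. The IQR rule stated earlier in the lesson is more robust for skewed columns such as salaries. Below is a minimal sketch that assumes approximate quartiles are good enough. It uses PySpark's `DataFrame.approxQuantile` to get Q1 and Q3, then flags values below Q1 - k·IQR or above Q3 + k·IQR. `iqr_bounds` and `detect_anomalies_iqr` are hypothetical names. `relative_error=0.01` is an assumed default; passing 0.0 requests exact quantiles at a higher cost.

```python
# Hypothetical sketch: IQR outlier flagging with fences Q1 - k*IQR and Q3 + k*IQR (k = 1.5 by convention)


def iqr_bounds(q1, q3, k=1.5):
    """Return the (lower, upper) fences of the IQR rule."""
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def detect_anomalies_iqr(df, column, k=1.5, relative_error=0.01):
    """Add an is_anomaly flag based on approximate quartiles of `column`."""
    # Imported here so iqr_bounds can be used without a Spark installation
    from pyspark.sql import functions as F

    # approxQuantile ignores nulls and returns an empty list for an all-null column
    quartiles = df.approxQuantile(column, [0.25, 0.75], relative_error)
    if len(quartiles) < 2:
        return df.withColumn("is_anomaly", F.lit(False))

    lower, upper = iqr_bounds(quartiles[0], quartiles[1], k)
    value = F.col(column)
    # Comparisons with null yield null, so coalesce them to False (not an anomaly)
    return df.withColumn(
        "is_anomaly",
        F.coalesce((value < lower) | (value > upper), F.lit(False)),
    )
```

For example, `detect_anomalies_iqr(df, "salary").filter(col("is_anomaly")).show(10)` mirrors the z-score call above.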

Data Quality Scoring

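from pyspark.sql import SparkSession
# Explicit imports: a wildcard import of pyspark.sql.functions would shadow
# Python's built-in sum(), which the scoring helpers below rely on
from pyspark.sql.functions import col, length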

spark = SparkSession.builder \
    .appName("DataQualityScoring") \
    .getOrCreate()

# Read data
df = spark.read.json("/path/to/data.json")

# Define quality dimensions and weights
quality_config = {
    "completeness": {
        "weight": 0.3,
        "columns": ["id", "name", "email", "age", "salary"]
    },
    "validity": {
        "weight": 0.3,
        "rules": {
            "age": lambda col: col.between(0, 150),
            "salary": lambda col: col > 0,
            "email": lambda col: col.rlike(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
        }
    },
    "uniqueness": {
        "weight": 0.2,
        "columns": ["id"]
    },
    "consistency": {
        "weight": 0.2,
        "rules": {
            "name_length": lambda col: length(col) >= 2
        }
    }
}

# Calculate completeness score
def calculate_completeness(df, columns):
    """Calculate completeness score based on non-null values"""
    total_records = df.count()
    if total_records == 0:
        return 0.0
    
    completeness_scores = []
    for col_name in columns:
        non_null_count = df.filter(col(col_name).isNotNull()).count()
        completeness_scores.append(non_null_count / total_records)
    
    return sum(completeness_scores) / len(completeness_scores)

# Calculate validity score
def calculate_validity(df, rules):
    """Calculate validity score based on validation rules"""
    total_records = df.count()
    if total_records == 0:
        return 0.0
    
    validity_scores = []
    for col_name, rule_func in rules.items():
        valid_count = df.filter(rule_func(col(col_name))).count()
        validity_scores.append(valid_count / total_records)
    
    return sum(validity_scores) / len(validity_scores)

# Calculate uniqueness score
def calculate_uniqueness(df, columns):
    """Calculate uniqueness score based on distinct values"""
    total_records = df.count()
    if total_records == 0:
        return 0.0
    
    uniqueness_scores = []
    for col_name in columns:
        distinct_count = df.select(col_name).distinct().count()
        uniqueness_scores.append(distinct_count / total_records)
    
    return sum(uniqueness_scores) / len(uniqueness_scores)

# Calculate overall quality score
def calculate_quality_score(df, config):
    """Calculate overall data quality score"""
    
    completeness_score = calculate_completeness(df, config["completeness"]["columns"])
    validity_score = calculate_validity(df, config["validity"]["rules"])
    uniqueness_score = calculate_uniqueness(df, config["uniqueness"]["columns"])
    
    # Consistency: this example's single rule ("name_length") checks the name column
    consistency_score = calculate_validity(
        df, {"name": config["consistency"]["rules"]["name_length"]}
    )
    
    # Calculate weighted average
    overall_score = (
        completeness_score * config["completeness"]["weight"] +
        validity_score * config["validity"]["weight"] +
        uniqueness_score * config["uniqueness"]["weight"] +
        consistency_score * config["consistency"]["weight"]
    )
    
    return {
        "completeness": completeness_score,
        "validity": validity_score,
        "uniqueness": uniqueness_score,
        "consistency": consistency_score,
        "overall": overall_score
    }

# Calculate quality scores
quality_scores = calculate_quality_score(df, quality_config)

# Display results
print("Data Quality Scores:")
print(f"  Completeness: {quality_scores['completeness']:.2%}")
print(f"  Validity: {quality_scores['validity']:.2%}")
print(f"  Uniqueness: {quality_scores['uniqueness']:.2%}")
print(f"  Consistency: {quality_scores['consistency']:.2%}")
print(f"  Overall: {quality_scores['overall']:.2%}")
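The weighted score above averages per-dimension rates. The DQ score defined earlier in the lesson is instead the share of records that pass *all* rules, and the same flag gives a natural split into clean and quarantine outputs. Below is a minimal sketch, assuming rules are given as a dict of column name to predicate, like `quality_config["validity"]["rules"]`. `split_valid_invalid` and `dq_score` are hypothetical names. Writing the quarantine DataFrame to a table is left to the pipeline.

```python
# Hypothetical sketch: DQ = N_valid / N_total * 100%, plus a quarantine split
from functools import reduce


def dq_score(n_valid, n_total):
    """Percentage of records passing all rules; 0.0 for an empty dataset."""
    return 100.0 * n_valid / n_total if n_total else 0.0


def split_valid_invalid(df, rules):
    """Return (valid_df, quarantine_df, score) for a dict of column -> predicate."""
    from pyspark.sql import functions as F  # local import: dq_score needs no Spark

    # A record is valid only if every rule is true; a null rule result counts as a failure
    checks = [F.coalesce(rule(F.col(name)), F.lit(False)) for name, rule in rules.items()]
    all_valid = reduce(lambda a, b: a & b, checks, F.lit(True))

    flagged = df.withColumn("_dq_valid", all_valid)
    counts = flagged.agg(
        F.count(F.lit(1)).alias("total"),
        F.sum(F.col("_dq_valid").cast("int")).alias("valid"),
    ).first()
    n_total = counts["total"]
    n_valid = counts["valid"] or 0  # sum over zero rows is null

    valid_df = flagged.filter(F.col("_dq_valid")).drop("_dq_valid")
    quarantine_df = flagged.filter(~F.col("_dq_valid")).drop("_dq_valid")
    return valid_df, quarantine_df, dq_score(n_valid, n_total)
```

For example, `split_valid_invalid(df, quality_config["validity"]["rules"])` would return the clean records, the records to quarantine, and the DQ percentage. Because each output is recomputed from `df`, caching `df` first avoids re-reading the source.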

📈 Performance Metrics

| Metric | Target | Warning | Critical | Optimization |
|---|---|---|---|---|
| Validation Pass Rate | > 98% | 95-98% | < 95% | Review validation rules |
| Profiling Time | < 5 min | 5-15 min | > 15 min | Sample data, optimize queries |
| Anomaly Detection Accuracy | > 90% | 80-90% | < 80% | Tune thresholds, add features |
| Quality Score | > 95% | 90-95% | < 90% | Address quality issues |
| Remediation Success Rate | > 95% | 90-95% | < 90% | Improve remediation logic |
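The percentage thresholds in the table can be turned into an alerting check. Below is a minimal sketch for the higher-is-better metrics; Profiling Time is lower-is-better and is left out. `classify_metric` and `THRESHOLDS` are hypothetical names. The table does not say which band an exact boundary value (for example, exactly 95%) belongs to, so placing boundaries in the warning band is an assumption.

```python
# Hypothetical sketch: map a metric value to the table's Target / Warning / Critical bands

# (target_above, critical_below) in percent, taken from the Performance Metrics table
THRESHOLDS = {
    "validation_pass_rate": (98.0, 95.0),
    "anomaly_detection_accuracy": (90.0, 80.0),
    "quality_score": (95.0, 90.0),
    "remediation_success_rate": (95.0, 90.0),
}


def classify_metric(metric, value_pct):
    """Return 'ok', 'warning', or 'critical' for a higher-is-better percentage metric."""
    target_above, critical_below = THRESHOLDS[metric]
    if value_pct > target_above:
        return "ok"
    if value_pct < critical_below:
        return "critical"
    # Assumption: boundary values (e.g. exactly 95.0 for quality_score) count as warnings
    return "warning"
```

Because the scoring code above returns fractions, a call would look like `classify_metric("quality_score", quality_scores["overall"] * 100)`.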

πŸ† Best Practices

  1. Define quality requirements upfront - Establish clear expectations for data quality
  2. Implement validation at multiple stages - Validate data at ingestion, processing, and output
  3. Profile data regularly - Schedule profiling jobs to detect quality changes
  4. Monitor quality metrics continuously - Set up dashboards and alerts for quality degradation
  5. Establish remediation workflows - Define processes for handling quality issues
  6. Document quality issues - Maintain records of quality problems and their resolutions
  7. Use automated testing - Implement continuous validation of data quality
  8. Balance quality with performance - Compute several metrics in one aggregation pass and cache DataFrames that multiple checks reuse
  9. Involve stakeholders - Ensure quality requirements align with business needs
  10. Continuously improve - Regularly review and enhance quality processes

🔗 Related Topics

  • 11-structured-streaming.mdx: Data quality in streaming pipelines
  • 16-schema-evolution.mdx: Schema validation and evolution
  • 20-monitoring-metrics.mdx: Monitoring data quality metrics
  • 14-merge-upsert.mdx: Data quality during merge operations

See Also

  • Kafka Streams (kafka/03): Data validation in Kafka message processing
  • Data Engineering Streaming (data-engineering/022): Data quality monitoring in streaming pipelines
