Data Mesh Architecture with PySpark

Architecture Diagram: Data Mesh Conceptual Model

DATA MESH ARCHITECTURE (Conceptual)

FOUR FOUNDATIONAL PRINCIPLES

  1. DOMAIN-ORIENTED OWNERSHIP
     • Business domains own their data
     • Cross-domain access via products
     • No centralized data team

  2. DATA AS A PRODUCT
     • Discoverable
     • Addressable
     • Trustworthy
     • Self-describing
     • Interoperable
     • Secure
     • Versioned

  3. SELF-SERVE DATA PLATFORM
     • Infrastructure as Code
     • Domain teams build their own pipelines
     • Platform team provides abstractions

  4. FEDERATED COMPUTATIONAL GOVERNANCE
     • Global standards
     • Domain autonomy
     • Automated policy enforcement

DOMAIN OWNERSHIP STRUCTURE

  PLATFORM TEAM (Infrastructure & Tooling)
     • Data lakehouse infrastructure
     • Compute orchestration (Spark)
     • Identity & access management
     • Observability & monitoring
        │
        ▼
  SALES DOMAIN (Domain Team)
     • Data Product: customer_lifetime_value
     • Data Product: order_analytics

  MARKETING DOMAIN (Domain Team)
     • Data Product: campaign_attribution_model
     • Data Product: customer_segments

  SUPPLY CHAIN DOMAIN (Domain Team)
     • Data Product: inventory_forecast
     • Data Product: supplier_performance

Architecture Diagram: Data Product Lifecycle

DATA PRODUCT LIFECYCLE IN PYSPARK

PHASE 1: DESIGN
  • Domain Discovery
    └─ Identify business domain boundaries
  • Product Charter
    └─ Define SLA, freshness, quality
  • Schema Design
    └─ Define output schema + types
  • Access Policy
    └─ Define who can consume product

PHASE 2: BUILD
  • Source Extraction
    └─ PySpark read from domain sources
  • Data Transformation
    └─ PySpark transforms (clean, enrich)
  • Schema Validation
    └─ Enforce contracts via PySpark checks
  • Quality Checks
    └─ Automated tests in pipeline

PHASE 3: PUBLISH
  • Register in Catalog
    └─ Unity Catalog / Data Catalog
  • Version Release
    └─ Semantic versioning (v1.0, v1.1)
  • Documentation
    └─ Auto-generated data dictionary
  • SLA Commitment
    └─ Published uptime and freshness

PHASE 4: OPERATE
  • Monitor Freshness
    └─ Alert if SLA breached
  • Track Usage Metrics
    └─ Query count, consumer count
  • Quality Monitoring
    └─ Schema drift detection
  • Cost Attribution
    └─ Track compute + storage costs
        │
        ▼
CONSUMER DISCOVERY & ACCESS
  Search & Discover (Catalog) ──▶ Request Access (API/Policies) ──▶ Consume & Analyze (PySpark) ──▶ Feedback & Iterate (Improve)

Architecture Diagram: Federated Governance Model

FEDERATED COMPUTATIONAL GOVERNANCE

GOVERNANCE COUNCIL
(Cross-domain representatives, meets quarterly)

  Responsibilities:
  • Define global data standards (naming, security, quality)
  • Approve cross-domain data product contracts
  • Resolve domain disputes over shared data
  • Review and approve platform capabilities
        │
        ▼
GOVERNANCE AS CODE (Automated)

  POLICY ENFORCEMENT (PySpark + Delta Lake)

    Schema Registry
      • Enforce required columns
      • Validate data types
      • Reject invalid schemas
    Quality Gates
      • Null checks
      • Range validation
      • Uniqueness assertions
      • Completeness checks
    Security Policies
      • Column masking
      • Row-level filtering
      • Encryption at rest
    SLA Monitor
      • Freshness tracking
      • Alert on breach
      • Auto-scaling

  IMPLEMENTATION (PySpark Code)

    # Schema enforcement
    enforced_df = validate_schema(df, expected_schema)

    # Quality gates
    quality_report = run_quality_checks(df, quality_rules)
    if quality_report.has_critical_failures:
        raise DataQualityException(quality_report)

    # Security policies
    masked_df = apply_column_masking(df, security_policies)

    # SLA monitoring
    record_freshness(product_name, current_timestamp())
        │
        ▼
DOMAIN-LEVEL GOVERNANCE

  Each domain team:
  • Implements global policies within their domain
  • Defines domain-specific quality rules
  • Manages access policies for their data products
  • Reports compliance to governance council
  • Has autonomy over internal domain data structures

  Sales Domain Policies | Marketing Domain Policies | Supply Chain Domain Policies | Finance Domain Policies

Detailed Explanation

Data Mesh is a decentralized sociotechnical approach to data architecture in which domain teams, rather than a centralized data engineering team, own their data and treat it as a product. Introduced by Zhamak Dehghani, Data Mesh addresses the scaling limitations of centralized data architectures (data warehouses, monolithic data lakes) by distributing ownership, computation, and governance across autonomous business domains.

The four foundational principles of Data Mesh are:

(1) Domain-Oriented Ownership: each business domain (sales, marketing, supply chain, finance) owns and manages its data as a first-class product.
(2) Data as a Product: domain data is published as discoverable, addressable, trustworthy, self-describing, interoperable, and secure products with published SLAs.
(3) Self-Serve Data Platform: a platform team provides infrastructure abstractions (compute, storage, orchestration, cataloging) that enable domain teams to build and operate their own data products without deep platform expertise.
(4) Federated Computational Governance: global policies (security, quality, interoperability) are defined by a governance council but enforced programmatically through automated policies embedded in the platform.

In PySpark implementations, each domain team maintains its own set of Spark applications that extract data from operational sources, transform it into well-defined data products, and publish them to a shared data lakehouse. The platform team provides reusable Spark libraries, shared cluster configurations, and standardized deployment patterns. Domain teams have full autonomy over their transformation logic while adhering to global standards for schema format, quality metrics, and access controls.

Data products in Data Mesh are more than just tables: they are self-contained units of data with metadata (schema, description, lineage), quality metrics (freshness, completeness, accuracy), access policies (who can read, who can write), and documentation. Each data product has a published SLA specifying maximum staleness (e.g., data must be refreshed every 4 hours), minimum quality thresholds (e.g., 99.9% completeness), and availability guarantees.
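
To make the SLA terms concrete, here is a minimal sketch of a freshness and completeness check using the example thresholds from the paragraph above (4 hours, 99.9%). The function names, timestamps, and row counts are hypothetical; in a pipeline the counts would come from the data product itself.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(last_refresh: datetime, now: datetime, max_staleness_hours: float) -> bool:
    """True if the product was refreshed within the allowed staleness window."""
    return now - last_refresh <= timedelta(hours=max_staleness_hours)


def completeness(non_null_count: int, total_count: int) -> float:
    """Fraction of non-null values; an empty dataset is treated as 0.0 (an assumption)."""
    if total_count == 0:
        return 0.0
    return non_null_count / total_count


# Hypothetical observations for one data product.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
last_refresh = datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc)  # 2.5 hours ago

fresh = is_fresh(last_refresh, now, max_staleness_hours=4)
complete_enough = completeness(9_995, 10_000) >= 0.999

print("freshness SLA met:", fresh)
print("completeness SLA met:", complete_enough)
```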

The federated governance model replaces traditional centralized data governance with automated, code-based policy enforcement. Instead of a governance team manually reviewing data access requests, policies are encoded as PySpark validation functions, Delta Lake constraints, and Unity Catalog ACLs that are automatically enforced when data products are published or consumed.
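
As an illustration of policy-as-code, the sketch below evaluates an access policy automatically instead of through manual review. It is plain Python, not the Unity Catalog or Delta Lake mechanism itself. The policy shape mirrors the `access_policy` dictionary used in Example 1; the meaning of `partial_mask` (keep the first character) and the rule that the owning domain sees unmasked values are assumptions made for this example.

```python
from typing import Any, Dict

policy = {
    "owner_domain": "sales",
    "allowed_consumers": ["marketing", "finance", "executive"],
    "pii_columns": ["customer_name"],
    "masking_rules": {"customer_name": "partial_mask"},
}


def can_read(policy: Dict[str, Any], consumer_domain: str) -> bool:
    """The owning domain and explicitly allowed consumers may read."""
    return (
        consumer_domain == policy["owner_domain"]
        or consumer_domain in policy["allowed_consumers"]
    )


def partial_mask(value: str) -> str:
    """Hypothetical masking rule: keep the first character, star out the rest."""
    if not value:
        return value
    return value[0] + "*" * (len(value) - 1)


def apply_masking(
    row: Dict[str, Any], policy: Dict[str, Any], consumer_domain: str
) -> Dict[str, Any]:
    """Return a copy of the row as the given consumer is allowed to see it."""
    if not can_read(policy, consumer_domain):
        raise PermissionError(f"{consumer_domain} may not read this data product")
    masked = dict(row)
    if consumer_domain == policy["owner_domain"]:
        return masked  # Assumption: the owner sees unmasked data.
    for column, rule in policy["masking_rules"].items():
        if rule == "partial_mask" and masked.get(column) is not None:
            masked[column] = partial_mask(masked[column])
    return masked


row = {"customer_id": "c1", "customer_name": "Alice"}
print(apply_masking(row, policy, "marketing"))
```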

Key Concepts Table

Mathematical Foundations

Definition: Data Product

A data product $P$ is a self-contained, discoverable data asset defined as:

$$P = (D, S, Q, A, M)$$

where $D$ is the dataset, $S$ is the schema, $Q$ is the quality SLAs, $A$ is the access controls, and $M$ is the metadata. Data products must be addressable, trustworthy, and self-describing.

Domain Ownership Metric

Domain independence is measured by the cross-domain dependency ratio:

$$\text{CDR}(d) = \frac{|\{p \in P_d : \exists d' \neq d,\ p.\text{depends\_on}(d')\}|}{|P_d|}$$

Target: $\text{CDR}(d) < 0.2$ for healthy domain boundaries.
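
A direct translation of the ratio into code may help. The sketch assumes each product in a domain is mapped to the set of domains it depends on; the function name and the two products beyond those in the architecture diagram are hypothetical.

```python
from typing import Dict, Set


def cross_domain_dependency_ratio(domain: str, product_deps: Dict[str, Set[str]]) -> float:
    """Share of the domain's products that depend on at least one other domain."""
    if not product_deps:
        return 0.0  # Assumption: a domain with no products has no cross-domain dependencies.
    dependent = sum(
        1 for deps in product_deps.values() if any(d != domain for d in deps)
    )
    return dependent / len(product_deps)


# Hypothetical dependency data for the sales domain.
sales_products = {
    "customer_lifetime_value": {"sales"},
    "order_analytics": {"sales"},
    "regional_revenue": {"sales", "finance"},  # depends on another domain
    "returns_summary": set(),
}

cdr = cross_domain_dependency_ratio("sales", sales_products)
print(f"CDR(sales) = {cdr:.2f}, healthy: {cdr < 0.2}")
```

Here one of four products crosses the domain boundary, so the ratio is 0.25 and the domain misses the 0.2 target.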

Data Mesh Scalability Theorem

Adding a new domain $d_{\text{new}}$ increases system capacity by $\Delta C = C(d_{\text{new}})$ without modifying existing domains if:

$$\forall d \neq d_{\text{new}}:\ d.\text{interfaces} \cap d_{\text{new}}.\text{interfaces} = \emptyset$$

This is the "independent deployability" property.
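
The disjointness condition can be checked mechanically. The sketch below assumes an "interface" is identified by the name of a published data product, which the text does not specify; the function name and the finance products are hypothetical.

```python
from typing import Dict, Set


def can_deploy_independently(
    new_interfaces: Set[str], existing: Dict[str, Set[str]]
) -> bool:
    """True if the new domain's interfaces overlap with no existing domain's."""
    return all(new_interfaces.isdisjoint(ifaces) for ifaces in existing.values())


# Interfaces of the existing domains, taken from the architecture diagram.
existing = {
    "sales": {"customer_lifetime_value", "order_analytics"},
    "marketing": {"campaign_attribution_model", "customer_segments"},
}

# Hypothetical interfaces for a new finance domain.
finance_ok = can_deploy_independently({"ledger_balances"}, existing)
finance_clash = can_deploy_independently({"customer_segments", "ledger_balances"}, existing)

print(finance_ok, finance_clash)
```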

Federated Governance Cost

Governance overhead scales as:

$$C_{\text{governance}} = \sum_{i=1}^{n} c_{\text{policy}}(d_i) + n \times c_{\text{coordination}}$$

where the coordination cost grows linearly with domain count.
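
A short worked example of the cost formula follows. The cost values are arbitrary units invented for illustration, and the function name is hypothetical.

```python
from typing import Sequence


def governance_cost(policy_costs: Sequence[float], coordination_cost: float) -> float:
    """Sum of per-domain policy costs plus a per-domain coordination cost."""
    n = len(policy_costs)
    return sum(policy_costs) + n * coordination_cost


# Three domains with hypothetical policy costs and a coordination cost of 1.5 each.
three_domains = governance_cost([3.0, 2.0, 5.0], coordination_cost=1.5)

# Adding a fourth domain adds its own policy cost plus one more coordination unit.
four_domains = governance_cost([3.0, 2.0, 5.0, 4.0], coordination_cost=1.5)

print(three_domains, four_domains)
```

With these numbers the total rises from 14.5 to 20.0: the fourth domain contributes its policy cost of 4.0 plus one coordination unit of 1.5, which is the linear growth the formula describes.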

Data Product Discovery

Findability score for a data product with metadata completeness $m$:

$$\text{Score}_{\text{discover}} = \alpha \cdot m_{\text{schema}} + \beta \cdot m_{\text{quality}} + \gamma \cdot m_{\text{lineage}}$$

where $\alpha$, $\beta$, and $\gamma$ are weighting factors with $\alpha + \beta + \gamma = 1$.
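
The score is a weighted average, which the following sketch computes. The default weights (0.5, 0.3, 0.2) and the sample completeness values are assumptions chosen for illustration; the text does not prescribe specific weights.

```python
def discovery_score(
    m_schema: float,
    m_quality: float,
    m_lineage: float,
    alpha: float = 0.5,
    beta: float = 0.3,
    gamma: float = 0.2,
) -> float:
    """Weighted findability score; each m_* is a completeness value in [0, 1]."""
    if abs(alpha + beta + gamma - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    return alpha * m_schema + beta * m_quality + gamma * m_lineage


# A product with a fully documented schema, half of its quality metadata,
# and no lineage information.
score = discovery_score(m_schema=1.0, m_quality=0.5, m_lineage=0.0)
print(f"discovery score: {score:.2f}")
```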

Key Insight

Data mesh shifts from centralized data teams to domain-oriented ownership. The trade-off is increased coordination overhead but reduced single points of failure. Platform teams provide self-service infrastructure to minimize domain onboarding cost.

Summary

Data mesh distributes data ownership to domains, each of which manages its data as products. The cross-domain dependency ratio measures boundary health. Governance overhead scales linearly with the number of domains. The architecture trades coordination complexity for organizational scalability.

Key Concepts Table (cont.)

| Concept | Description | PySpark Implementation | Ownership |
|---|---|---|---|
| Domain | Business capability boundary | Spark apps per domain | Domain Team |
| Data Product | Discoverable, trustworthy data unit | Delta tables + metadata | Domain Team |
| Data Product Contract | Published SLA + schema guarantee | PySpark validation + Unity Catalog | Domain Team |
| Self-Serve Platform | Infrastructure abstractions | Shared Spark configs, libraries | Platform Team |
| Federated Governance | Automated policy enforcement | PySpark quality gates, ACLs | Governance Council |
| Schema Registry | Centralized schema management | Delta Lake schema + Unity Catalog | Platform Team |
| Lineage Tracking | Data flow documentation | OpenLineage + Spark listeners | Platform Team |
| Quality Gates | Automated validation | PySpark quality checks in pipeline | Domain Team |
| Cost Attribution | Per-domain cost tracking | Spark metrics + cloud billing | Platform Team |
| Cross-Domain Join | Inter-domain data integration | Data product composition | Consumer |

Code Examples

Example 1: Defining a Data Product with Quality Contracts

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json

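# Delta Lake's setup instructions pair the SQL extension with the Delta catalog,
# so both settings are configured here.
spark = SparkSession.builder \
    .appName("Data-Product-Sales") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()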

# ─── Data Product Definition ───
@dataclass
class DataProductContract:
    """Defines the contract for a data product."""
    name: str
    version: str
    domain: str
    owner: str
    description: str
    schema: StructType
    sla_freshness_hours: int = 4
    quality_rules: Dict = field(default_factory=dict)
    access_policy: Dict = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)

# Define the Sales Domain Data Product
customer_ltv_product = DataProductContract(
    name="customer_lifetime_value",
    version="1.2.0",
    domain="sales",
    owner="sales-data-team@company.com",
    description="Customer lifetime value calculation based on historical orders",
    schema=StructType([
        StructField("customer_id", StringType(), False),
        StructField("customer_name", StringType(), True),
        StructField("ltv_score", DoubleType(), True),
        StructField("total_revenue", DecimalType(18, 2), True),
        StructField("order_count", IntegerType(), True),
        StructField("avg_order_value", DecimalType(18, 2), True),
        StructField("first_order_date", DateType(), True),
        StructField("last_order_date", DateType(), True),
        StructField("segment", StringType(), True),
        StructField("_computed_at", TimestampType(), False),
        StructField("_product_version", StringType(), False),
    ]),
    sla_freshness_hours=4,
    quality_rules={
        "customer_id": {"not_null": True, "unique": True},
        "ltv_score": {"not_null": True, "min": 0, "max": 1000000},
        "total_revenue": {"not_null": True, "min": 0},
        "order_count": {"not_null": True, "min": 1},
        "completeness_threshold": 0.99,
    },
    access_policy={
        "owner_domain": "sales",
        "allowed_consumers": ["marketing", "finance", "executive"],
        "pii_columns": ["customer_name"],
        "masking_rules": {"customer_name": "partial_mask"},
    },
    dependencies=["orders_raw", "customers_master"]
)

# ─── Build the Data Product ───
def build_customer_ltv(spark, contract):
    """Build the customer lifetime value data product."""
    
    # Read source data from domain-owned Bronze layer
    orders = spark.read.format("delta").load("/mnt/sales/bronze/orders")
    customers = spark.read.format("delta").load("/mnt/sales/bronze/customers")
    
    # Transform: Calculate LTV metrics
    ltv_df = (
        orders
        .join(customers, "customer_id", "left")
        .groupBy(
            orders["customer_id"],
            customers["customer_name"]
        )
        .agg(
            count("*").alias("order_count"),
            sum("amount").alias("total_revenue"),
            avg("amount").alias("avg_order_value"),
            min("order_date").alias("first_order_date"),
            max("order_date").alias("last_order_date"),
        )
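        .withColumn("ltv_score",
            col("total_revenue") * 0.3 +
            col("order_count") * 10 +
            # datediff(end, start) is a function, not a Column method;
            # each day since the last order lowers the score by 0.1
            (datediff(current_date(), col("last_order_date")) * -0.1)
        )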
        .withColumn("segment",
            when(col("ltv_score") > 10000, "platinum")
            .when(col("ltv_score") > 5000, "gold")
            .when(col("ltv_score") > 1000, "silver")
            .otherwise("bronze")
        )
        .withColumn("_computed_at", current_timestamp())
        .withColumn("_product_version", lit(contract.version))
    )
    
    return ltv_df

# Build the product
customer_ltv = build_customer_ltv(spark, customer_ltv_product)

# ─── Quality Validation ───
def validate_data_product(df, contract):
    """Validate data product against its contract."""
    validation_results = []
    # Count rows once up front; every count() call triggers a separate Spark job
    total_count = df.count()
    threshold = contract.quality_rules.get("completeness_threshold", 0.99)
    
    for col_name, rules in contract.quality_rules.items():
        if col_name == "completeness_threshold":
            continue
        
        col_obj = df[col_name]
        
        if rules.get("not_null"):
            null_count = df.filter(col_obj.isNull()).count()
            completeness = 1 - (null_count / total_count) if total_count > 0 else 0
            
            validation_results.append({
                "column": col_name,
                "check": "not_null",
                "passed": completeness >= threshold,
                "completeness": completeness,
                "threshold": threshold,
            })
        
        if rules.get("unique"):
            duplicate_count = (
                df.groupBy(col_name)
                .count()
                .filter(col("count") > 1)
                .count()
            )
            validation_results.append({
                "column": col_name,
                "check": "unique",
                "passed": duplicate_count == 0,
                "duplicate_count": duplicate_count,
            })
        
        if "min" in rules:
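            min_val = df.agg(min(col_obj)).first()[0]
            validation_results.append({
                "column": col_name,
                "check": "min_value",
                # The aggregate is None for an empty or all-null column; count that as a failure
                "passed": min_val is not None and min_val >= rules["min"],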
                "actual_min": min_val,
                "expected_min": rules["min"],
            })
        
        if "max" in rules:
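            max_val = df.agg(max(col_obj)).first()[0]
            validation_results.append({
                "column": col_name,
                "check": "max_value",
                # The aggregate is None for an empty or all-null column; count that as a failure
                "passed": max_val is not None and max_val <= rules["max"],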
                "actual_max": max_val,
                "expected_max": rules["max"],
            })
    
    return validation_results

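# Run validation
results = validate_data_product(customer_ltv, customer_ltv_product)
for r in results:
    status = "PASS" if r["passed"] else "FAIL"
    print(f"[{status}] {r['column']}.{r['check']}: {r}")

# Stop before publishing if any contract check failed
failed_checks = [r for r in results if not r["passed"]]
if failed_checks:
    raise ValueError(
        f"{customer_ltv_product.name} failed {len(failed_checks)} contract check(s)"
    )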

# ─── Publish Data Product ───
(
    customer_ltv
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/sales/gold/customer_lifetime_value")
)

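# Register in the catalog. The two-level name and /mnt path below fit a Hive-style
# metastore; Unity Catalog typically expects a three-level name (catalog.schema.table)
# and a cloud storage URI registered as an external location.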
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales.customer_lifetime_value
    USING DELTA
    LOCATION '/mnt/sales/gold/customer_lifetime_value'
    COMMENT 'Customer lifetime value calculation - Sales Domain Product v1.2.0'
    TBLPROPERTIES (
        'product.owner' = 'sales-data-team@company.com',
        'product.version' = '1.2.0',
        'product.sla.freshness.hours' = '4',
        'product.domain' = 'sales',
        'product.quality.completeness' = '0.99'
    )
""")
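The contract above declares `sla_freshness_hours`, but `validate_data_product` never checks it. A minimal sketch of such a check follows, assuming the newest `_computed_at` value has already been collected to the driver (for example with an aggregate over the published table). `check_freshness` is a hypothetical helper, not part of PySpark or Delta Lake.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def check_freshness(
    last_computed_at: datetime,
    sla_freshness_hours: int,
    now: Optional[datetime] = None,
) -> dict:
    """Compare the age of the latest build with the contract's freshness SLA."""
    now = now or datetime.now(timezone.utc)
    age = now - last_computed_at
    return {
        "check": "freshness",
        "passed": age <= timedelta(hours=sla_freshness_hours),
        "age_hours": round(age.total_seconds() / 3600, 2),
        "sla_hours": sla_freshness_hours,
    }


# Illustrative timestamps; both sides are timezone-aware so subtraction is valid.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fresh = check_freshness(datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc), 4, now)
stale = check_freshness(datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc), 4, now)
print(fresh["passed"], stale["passed"])
```

The result dictionary uses the same `check` and `passed` keys as the other validation results, so it could be appended to that list. Timestamps collected from Spark are usually naive `datetime` objects, so a time zone would need to be attached before calling this helper.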

Example 2: Cross-Domain Data Product Composition

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

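# Open-source Delta Lake expects both settings below; Databricks runtimes preconfigure them
spark = SparkSession.builder \
    .appName("Cross-Domain-Composition") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()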

# ─── Compose Data Products from Multiple Domains ───
# Marketing domain wants a unified customer view combining:
# - Sales domain: customer_lifetime_value
# - Marketing domain: customer_segments
# - Supply chain domain: delivery_preferences

# Read data products from each domain (via catalog or direct path)
sales_ltv = (
    spark.read
    .format("delta")
    .load("/mnt/sales/gold/customer_lifetime_value")
    .select(
        "customer_id",
        col("ltv_score").alias("sales_ltv_score"),
        col("segment").alias("sales_segment"),
        "total_revenue",
        "order_count",
    )
)

marketing_segments = (
    spark.read
    .format("delta")
    .load("/mnt/marketing/gold/customer_segments")
    .select(
        "customer_id",
        col("segment").alias("marketing_segment"),
        col("engagement_score"),
        col("preferred_channel"),
        col("last_campaign_response"),
    )
)

supply_preferences = (
    spark.read
    .format("delta")
    .load("/mnt/supply_chain/gold/delivery_preferences")
    .select(
        "customer_id",
        col("preferred_delivery_speed"),
        col("avg_delivery_satisfaction"),
        col("return_rate"),
    )
)

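# Compose: Join all data products into a unified view.
# The left joins keep every sales customer; customers missing from the marketing or
# supply chain products get nulls there, so their unified_score is also null.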
unified_customer = (
    sales_ltv
    .join(marketing_segments, "customer_id", "left")
    .join(supply_preferences, "customer_id", "left")
    .withColumn("unified_score",
        (col("sales_ltv_score") * 0.4) +
        (col("engagement_score") * 0.3) +
        ((1 - col("return_rate")) * 100 * 0.3)
    )
    .withColumn("recommended_action",
        when(
            (col("sales_ltv_score") > 5000) &
            (col("engagement_score") > 70) &
            (col("avg_delivery_satisfaction") > 4.0),
            "premium_retention_offer"
        ).when(
            (col("sales_ltv_score") < 1000) &
            (col("engagement_score") < 30),
            "win_back_campaign"
        ).otherwise("standard_nurture")
    )
    .withColumn("_composed_at", current_timestamp())
    .withColumn("_source_products", array(
        lit("sales.customer_lifetime_value"),
        lit("marketing.customer_segments"),
        lit("supply_chain.delivery_preferences")
    ))
)

# Write composed data product
(
    unified_customer
    .write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/marketing/gold/unified_customer_view")
)

# Register with lineage metadata
spark.sql("""
    CREATE TABLE IF NOT EXISTS marketing.unified_customer_view
    USING DELTA
    LOCATION '/mnt/marketing/gold/unified_customer_view'
    COMMENT 'Unified customer view composed from sales, marketing, and supply chain domains'
    TBLPROPERTIES (
        'product.owner' = 'marketing-data-team@company.com',
        'product.version' = '1.0.0',
        'product.domain' = 'marketing',
        'product.composed_from' = 'sales.customer_lifetime_value,marketing.customer_segments,supply_chain.delivery_preferences',
        'product.sla.freshness.hours' = '8'
    )
""")
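Because the composition uses left joins, some customers reach the `recommended_action` rules with missing inputs. In Spark SQL a comparison against NULL does not satisfy a `when` branch, so those rows fall through to later branches. The sketch below mirrors the rules in plain Python so the thresholds can be unit tested without a cluster. The function is a hypothetical test helper, with `None` standing in for NULL.

```python
from typing import Optional


def recommended_action(
    sales_ltv_score: Optional[float],
    engagement_score: Optional[float],
    avg_delivery_satisfaction: Optional[float],
) -> str:
    """Plain-Python mirror of the recommended_action rules.

    None stands in for a SQL NULL: any comparison involving it counts as
    not satisfied, so evaluation falls through to the next branch.
    """

    def gt(value: Optional[float], limit: float) -> bool:
        return value is not None and value > limit

    def lt(value: Optional[float], limit: float) -> bool:
        return value is not None and value < limit

    if (
        gt(sales_ltv_score, 5000)
        and gt(engagement_score, 70)
        and gt(avg_delivery_satisfaction, 4.0)
    ):
        return "premium_retention_offer"
    if lt(sales_ltv_score, 1000) and lt(engagement_score, 30):
        return "win_back_campaign"
    return "standard_nurture"


print(recommended_action(8000, 85, 4.5))
print(recommended_action(500, 10, None))
print(recommended_action(8000, None, 4.5))
```

The last call shows a high-value customer with no marketing record landing in `standard_nurture`, which may or may not be the intended outcome for unmatched customers.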

Example 3: Federated Governance with Automated Quality Gates

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

spark = SparkSession.builder \
    .appName("Federated-Governance") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

# ─── Global Governance Policies (Defined by Council) ───
GOVERNANCE_POLICIES = {
    "naming_conventions": {
        "table_names": "snake_case",
        "column_names": "snake_case",
        "prefix_required": True,  # e.g., dim_, fact_, agg_
    },
    "security": {
        "pii_detection": True,
        "encryption_at_rest": True,
        "column_masking_required": True,
        "row_level_security": True,
    },
    "quality": {
        "minimum_completeness": 0.99,
        "freshness_sla_hours": 24,
        "uniqueness_required": True,
        "schema_evolution_requires_approval": True,
    },
    "interoperability": {
        "standard_date_columns": ["created_at", "updated_at"],
        "standard_id_columns": ["_id", "id"],
        "timestamp_timezone": "UTC",
        "decimal_precision": {"precision": 18, "scale": 2},
    }
}

# ─── Automated Quality Gate Function ───
class GovernanceGate:
    """Automated governance validation for data products."""
    
    def __init__(self, policies):
        self.policies = policies
        self.violations = []
    
    def validate_schema(self, df, product_name):
        """Validate naming conventions."""
        columns = df.columns
        
        for col_name in columns:
            # Check snake_case
            if not all(c.islower() or c == '_' or c.isdigit() for c in col_name):
                self.violations.append({
                    "product": product_name,
                    "rule": "naming_conventions",
                    "severity": "WARNING",
                    "message": f"Column '{col_name}' not in snake_case"
                })
        
        return len(self.violations) == 0
    
    def validate_quality(self, df, product_name):
        """Validate quality thresholds."""
        total_rows = df.count()
        if total_rows == 0:
            self.violations.append({
                "product": product_name,
                "rule": "quality",
                "severity": "CRITICAL",
                "message": "Data product is empty"
            })
            return False
        
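        # Count nulls for all columns in one aggregation instead of one job per column
        null_counts = df.select([
            count(when(col(c).isNull(), c)).alias(c) for c in df.columns
        ]).first().asDict()
        threshold = self.policies["quality"]["minimum_completeness"]
        passed = True
        
        for col_name, null_count in null_counts.items():
            completeness = 1 - (null_count / total_rows)
            
            if completeness < threshold:
                passed = False
                self.violations.append({
                    "product": product_name,
                    "rule": "quality.completeness",
                    "severity": "CRITICAL",
                    "column": col_name,
                    "completeness": completeness,
                    "threshold": threshold
                })
        
        # Report failure when any column falls below the completeness threshold
        return passed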
    
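    def validate_security(self, df, product_name, pii_columns):
        """Check that every declared PII column exists so masking can be applied.

        This does not verify that masking is actually configured.
        """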
        for col_name in pii_columns:
            if col_name not in df.columns:
                self.violations.append({
                    "product": product_name,
                    "rule": "security.pii",
                    "severity": "CRITICAL",
                    "message": f"PII column '{col_name}' not found for masking"
                })
        
        return True
    
    def validate_interoperability(self, df, product_name):
        """Validate interoperability standards."""
        # Check that *_at and *_timestamp columns use a timestamp or date type
        # (the time zone itself is not verified here)
        for col_name in df.columns:
            if col_name.endswith("_at") or col_name.endswith("_timestamp"):
                col_type = dict(df.dtypes).get(col_name)
                if col_type not in ("timestamp", "date"):
                    self.violations.append({
                        "product": product_name,
                        "rule": "interoperability.timestamp",
                        "severity": "WARNING",
                        "column": col_name,
                        "actual_type": col_type,
                        "expected_type": "timestamp"
                    })
        
        return True
    
    def get_report(self):
        """Generate governance validation report."""
        critical = [v for v in self.violations if v["severity"] == "CRITICAL"]
        warnings = [v for v in self.violations if v["severity"] == "WARNING"]
        
        return {
            "total_violations": len(self.violations),
            "critical": len(critical),
            "warnings": len(warnings),
            "passed": len(critical) == 0,
            "violations": self.violations
        }

# ─── Apply Governance to a Data Product ───
gate = GovernanceGate(GOVERNANCE_POLICIES)

# Build a data product
customer_data = spark.read.format("delta").load("/mnt/sales/bronze/customers")
customer_product = (
    customer_data
    .withColumn("ltv_score", col("total_revenue") * 0.3)
    .withColumn("_computed_at", current_timestamp())
)

# Run governance checks
gate.validate_schema(customer_product, "sales.customer_lifetime_value")
gate.validate_quality(customer_product, "sales.customer_lifetime_value")
gate.validate_security(customer_product, "sales.customer_lifetime_value",
                       pii_columns=["customer_name", "email"])
gate.validate_interoperability(customer_product, "sales.customer_lifetime_value")

# Get report
report = gate.get_report()
print(json.dumps(report, indent=2, default=str))

# Block publication if critical violations
if not report["passed"]:
    # RuntimeError is easier for callers to catch selectively than a bare Exception
    raise RuntimeError(
        "Data product 'sales.customer_lifetime_value' failed governance validation. "
        f"Critical violations: {report['critical']}. "
        "Fix issues before publishing."
    )
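`GOVERNANCE_POLICIES` sets `prefix_required`, but `validate_schema` does not enforce it, and its character-by-character test accepts names such as `__` or `9abc`. A stricter check could look like the sketch below. The regular expression, the decision to allow one leading underscore for metadata columns such as `_computed_at`, and the prefix list (taken from the comment in the policy) are all assumptions.

```python
import re
from typing import List

# Lowercase words separated by single underscores, with an optional leading
# underscore for metadata columns. This pattern is an assumed convention.
SNAKE_CASE = re.compile(r"^_?[a-z][a-z0-9]*(_[a-z0-9]+)*$")
# Prefixes from the policy comment; the exact list is an assumption.
TABLE_PREFIXES = ("dim_", "fact_", "agg_")


def naming_violations(table_name: str, columns: List[str]) -> List[str]:
    """Return human-readable naming violations for a table and its columns."""
    messages = []
    if not SNAKE_CASE.match(table_name):
        messages.append(f"Table '{table_name}' not in snake_case")
    if not table_name.startswith(TABLE_PREFIXES):
        messages.append(f"Table '{table_name}' lacks a required prefix")
    for name in columns:
        if not SNAKE_CASE.match(name):
            messages.append(f"Column '{name}' not in snake_case")
    return messages


found = naming_violations(
    "customer_lifetime_value",
    ["customer_id", "customerName", "_computed_at", "order__count"],
)
for message in found:
    print(message)
```

Each message could be appended to `GovernanceGate.violations` with a `WARNING` severity, matching the structure the class already uses.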

Performance Metrics

Metric | Centralized Data Team | Data Mesh (Decentralized) | Improvement
Time to New Data Product | 4-8 weeks | 1-2 weeks | 75% faster
Data Product Count | 20-50 (central bottleneck) | 200-500 (distributed) | 10x scaling
Data Quality Incidents | 15/month | 5/month | 67% reduction
Mean Time to Recovery | 4-8 hours | 1-2 hours | 75% faster
Data Consumer Satisfaction | 3.2/5.0 | 4.5/5.0 | 41% improvement
Cost per Data Product | $5,000/month | $1,500/month | 70% reduction
Cross-Domain Query Latency | 30 seconds | 5 seconds | 83% faster
Governance Policy Compliance | 65% (manual) | 98% (automated) | 51% improvement
Developer Productivity | 2 products/engineer | 8 products/engineer | 4x improvement
Data Freshness (avg) | 24 hours | 4 hours | 83% faster
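The Improvement column appears to report relative change between the two approaches. The short sketch below reproduces a few of the figures under that assumption. The helper names are illustrative.

```python
def percent_reduction(before: float, after: float) -> float:
    """Relative decrease from before to after, in percent."""
    return (before - after) / before * 100


def percent_increase(before: float, after: float) -> float:
    """Relative increase from before to after, in percent."""
    return (after - before) / before * 100


incidents = round(percent_reduction(15, 5))       # quality incidents per month
cost = round(percent_reduction(5000, 1500))       # cost per data product
satisfaction = round(percent_increase(3.2, 4.5))  # consumer satisfaction score
compliance = round(percent_increase(65, 98))      # policy compliance rate
print(incidents, cost, satisfaction, compliance)
```

Note that the compliance figure is a relative gain. In absolute terms, moving from 65% to 98% is a 33 percentage point increase.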

Best Practices

  1. Start with clear domain boundaries: Use Event Storming or Domain-Driven Design to identify bounded contexts before assigning data ownership. Each domain should have clear business capabilities and minimal coupling to other domains.

  2. Define data product contracts upfront: Every data product must have a published contract specifying schema, SLA (freshness, availability), quality thresholds, and access policies. Use PySpark validation functions to enforce these contracts programmatically.

  3. Build a self-serve platform first: Invest in platform abstractions (shared Spark configs, cluster templates, deployment pipelines, catalog integration) before scaling domain teams. The platform should reduce the cognitive load for domain teams.

  4. Implement federated governance as code: Replace manual governance processes with automated PySpark validation functions, Delta Lake constraints, and Unity Catalog policies. Governance should be enforced at publish time, not reviewed manually.

  5. Enable discoverability through a data catalog: Use Unity Catalog, DataHub, or Amundsen to register data products with metadata, lineage, and quality metrics. Domain teams should be able to discover and request access to other domains' products.

  6. Track data product usage and value: Monitor query patterns, consumer count, and business impact for each data product. Use this data to prioritize improvements and justify domain team investments.

  7. Implement cross-domain composition carefully: When composing data products from multiple domains, document dependencies, validate that all source products meet their SLAs, and handle partial failures gracefully.

  8. Maintain backward compatibility: Version data products using semantic versioning (v1.0, v1.1, v2.0). Breaking changes require a new major version and a deprecation period for existing consumers.

  9. Invest in data observability: Deploy monitoring for schema drift, data quality degradation, freshness violations, and cost anomalies. Alert domain teams proactively rather than waiting for consumers to report issues.

  10. Foster a data product culture: Train domain teams on data engineering best practices, provide templates and examples, and create communities of practice across domains for knowledge sharing.
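The versioning practice in item 8 can be partly automated by comparing the published schema with the proposed one. The sketch below assumes a simple convention: removing a column or changing its type is breaking (major bump), and adding a column is not (minor bump). Both function names are hypothetical, and real compatibility rules may be more nuanced.

```python
from typing import Dict


def classify_schema_change(old: Dict[str, str], new: Dict[str, str]) -> str:
    """Return 'major', 'minor', or 'none' for a column-name to type mapping."""
    removed = old.keys() - new.keys()
    retyped = {c for c in old.keys() & new.keys() if old[c] != new[c]}
    if removed or retyped:
        return "major"  # existing consumers could break
    if new.keys() - old.keys():
        return "minor"  # purely additive change
    return "none"


def bump_version(version: str, change: str) -> str:
    """Apply a semantic-version bump to a 'MAJOR.MINOR.PATCH' string."""
    major, minor, _patch = (int(part) for part in version.split("."))
    if change == "major":
        return f"{major + 1}.0.0"
    if change == "minor":
        return f"{major}.{minor + 1}.0"
    return version


published = {"customer_id": "string", "ltv_score": "double"}
with_segment = {**published, "segment": "string"}
retyped = {"customer_id": "string", "ltv_score": "decimal(18,2)"}

additive = bump_version("1.2.0", classify_schema_change(published, with_segment))
breaking = bump_version("1.2.0", classify_schema_change(published, retyped))
print(additive, breaking)
```

In a PySpark pipeline the two mappings could be built with `dict(df.dtypes)` for the published table and the candidate DataFrame.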

See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)
