Data Mesh Architecture with PySpark
Architecture Diagram: Data Mesh Conceptual Model
DATA MESH ARCHITECTURE (Conceptual)
FOUR FOUNDATIONAL PRINCIPLES
1. Domain Oriented
   - Business domains own their data
   - Cross-domain via products
   - No centralized data team
2. Data as a Product
   - Discoverable
   - Addressable
   - Trustworthy
   - Self-describing
   - Interoperable
   - Secure
   - Versioned
3. Self-Serve Data Platform
   - Infrastructure as Code
   - Domain teams build their own pipelines
   - Platform team provides abstractions
4. Federated Computational Governance
   - Global standards
   - Domain autonomy
   - Automated policy enforcement
DOMAIN OWNERSHIP STRUCTURE
Platform Team (Infrastructure & Tooling)
   - Data lakehouse infrastructure
   - Compute orchestration (Spark)
   - Identity & access management
   - Observability & monitoring
Platform Team → Sales Domain, Marketing Domain, Supply Chain Domain
Sales Domain (Domain Team)
   - Data Product: customer_lifetime_value
   - Data Product: order_analytics
Marketing Domain (Domain Team)
   - Data Product: campaign_attribution_model
   - Data Product: customer_segments
Supply Chain Domain (Domain Team)
   - Data Product: inventory_forecast
   - Data Product: supplier_performance
Architecture Diagram: Data Product Lifecycle
DATA PRODUCT LIFECYCLE IN PYSPARK
PHASE 1: DESIGN
   - Domain Discovery: identify business domain boundaries
   - Product Charter: define SLA, freshness, quality
   - Schema Design: define output schema + types
   - Access Policy: define who can consume product
PHASE 2: BUILD
   - Source Extraction: PySpark read from domain sources
   - Data Transformation: PySpark transforms (clean, enrich)
   - Schema Validation: enforce contracts via PySpark checks
   - Quality Checks: automated tests in pipeline
PHASE 3: PUBLISH
   - Register in Catalog: Unity Catalog / Data Catalog
   - Version Release: semantic versioning (v1.0, v1.1)
   - Documentation: auto-generated data dictionary
   - SLA Commitment: published uptime and freshness
PHASE 4: OPERATE
   - Monitor Freshness: alert if SLA breached
   - Track Usage Metrics: query count, consumer count
   - Quality Monitoring: schema drift detection
   - Cost Attribution: track compute + storage costs
CONSUMER DISCOVERY & ACCESS (fed by the Publish and Operate phases)
   Search & Discover (Catalog) → Request Access (API/Policies) → Consume & Analyze (PySpark) → Feedback & Iterate (Improve)
Architecture Diagram: Federated Governance Model
FEDERATED COMPUTATIONAL GOVERNANCE
GOVERNANCE COUNCIL (cross-domain representatives, meets quarterly)
   Responsibilities:
   - Define global data standards (naming, security, quality)
   - Approve cross-domain data product contracts
   - Resolve domain disputes over shared data
   - Review and approve platform capabilities
      ↓
GOVERNANCE AS CODE (Automated)
   POLICY ENFORCEMENT (PySpark + Delta Lake)
   - Schema Registry: enforce required columns, validate data types, reject invalid schemas
   - Quality Gates: null checks, range validation, uniqueness assertions, completeness checks
   - Security Policies: column masking, row-level filtering, encryption at rest
   - SLA Monitor: freshness tracking, alert on breach, auto-scaling
   IMPLEMENTATION (PySpark Code)
      # Schema enforcement
      enforced_df = validate_schema(df, expected_schema)
      # Quality gates
      quality_report = run_quality_checks(df, quality_rules)
      if quality_report.has_critical_failures:
          raise DataQualityException(quality_report)
      # Security policies
      masked_df = apply_column_masking(df, security_policies)
      # SLA monitoring
      record_freshness(product_name, current_timestamp())
      ↓
DOMAIN-LEVEL GOVERNANCE
   Each domain team:
   - Implements global policies within their domain
   - Defines domain-specific quality rules
   - Manages access policies for their data products
   - Reports compliance to governance council
   - Has autonomy over internal domain data structures
   Domain policies: Sales Domain, Marketing Domain, Supply Chain Domain, Finance Domain
Detailed Explanation
Data Mesh is a decentralized sociotechnical approach to data architecture that treats data as a product owned by domain teams rather than a centralized data engineering team. Introduced by Zhamak Dehghani, Data Mesh addresses the scaling limitations of centralized data architectures (data warehouses, monolithic data lakes) by distributing ownership, computation, and governance across autonomous business domains.
The four foundational principles of Data Mesh are: (1) Domain-Oriented Ownership: each business domain (sales, marketing, supply chain, finance) owns and manages its data as a first-class product; (2) Data as a Product: domain data is published as discoverable, addressable, trustworthy, self-describing, interoperable, and secure products with published SLAs; (3) Self-Serve Data Platform: a platform team provides infrastructure abstractions (compute, storage, orchestration, cataloging) that enable domain teams to build and operate their own data products without deep platform expertise; (4) Federated Computational Governance: global policies (security, quality, interoperability) are defined by a governance council but enforced programmatically through automated policies embedded in the platform.
In PySpark implementations, each domain team maintains its own set of Spark applications that extract data from operational sources, transform it into well-defined data products, and publish them to a shared data lakehouse. The platform team provides reusable Spark libraries, shared cluster configurations, and standardized deployment patterns. Domain teams have full autonomy over their transformation logic while adhering to global standards for schema format, quality metrics, and access controls.
Data products in Data Mesh are more than just tables: they are self-contained units of data with metadata (schema, description, lineage), quality metrics (freshness, completeness, accuracy), access policies (who can read, who can write), and documentation. Each data product has a published SLA specifying maximum staleness (e.g., data must be refreshed every 4 hours), minimum quality thresholds (e.g., 99.9% completeness), and availability guarantees.
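As a rough illustration of how such an SLA could be evaluated, the sketch below checks staleness and completeness in plain Python. The names `ProductSla` and `evaluate_sla`, and the idea of passing in precomputed row counts, are assumptions made for this example and are not part of any library.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class ProductSla:
    """Hypothetical SLA record: a staleness limit and a completeness floor."""
    max_staleness_hours: float
    min_completeness: float


def evaluate_sla(sla, last_refreshed, non_null_rows, total_rows, now=None):
    """Return a dict describing whether each SLA term currently holds."""
    now = now or datetime.now(timezone.utc)
    staleness = now - last_refreshed
    completeness = non_null_rows / total_rows if total_rows else 0.0
    fresh = staleness <= timedelta(hours=sla.max_staleness_hours)
    complete = completeness >= sla.min_completeness
    return {
        "fresh": fresh,
        "complete": complete,
        "staleness_hours": staleness.total_seconds() / 3600,
        "completeness": completeness,
        "meets_sla": fresh and complete,
    }


# The 4-hour and 99.9% figures mirror the examples given in the text.
sla = ProductSla(max_staleness_hours=4, min_completeness=0.999)
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
report = evaluate_sla(
    sla,
    last_refreshed=now - timedelta(hours=5),  # refreshed 5 hours ago
    non_null_rows=9_995,
    total_rows=10_000,
    now=now,
)
print(report)
```

In this run the product is complete enough but stale, so `meets_sla` is false. In a pipeline, the row counts would typically come from a Spark aggregation and the refresh time from the product's own metadata.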
The federated governance model replaces traditional centralized data governance with automated, code-based policy enforcement. Instead of a governance team manually reviewing data access requests, policies are encoded as PySpark validation functions, Delta Lake constraints, and Unity Catalog ACLs that are automatically enforced when data products are published or consumed.
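One way to picture "policies encoded as Delta Lake constraints" is to generate `ALTER TABLE ... ADD CONSTRAINT ... CHECK (...)` statements from a rules dictionary. The sketch below only builds the SQL strings; the helper name `check_constraints_sql`, the rule keys, and the `<column>_valid` constraint naming are assumptions for illustration.

```python
def check_constraints_sql(table, rules):
    """Translate simple column rules into ADD CONSTRAINT ... CHECK statements."""
    statements = []
    for column, rule in rules.items():
        predicates = []
        if rule.get("not_null"):
            predicates.append(f"{column} IS NOT NULL")
        if "min" in rule:
            predicates.append(f"{column} >= {rule['min']}")
        if "max" in rule:
            predicates.append(f"{column} <= {rule['max']}")
        if predicates:
            expr = " AND ".join(predicates)
            statements.append(
                f"ALTER TABLE {table} ADD CONSTRAINT {column}_valid CHECK ({expr})"
            )
    return statements


rules = {
    "ltv_score": {"not_null": True, "min": 0, "max": 1000000},
    "order_count": {"not_null": True, "min": 1},
}
statements = check_constraints_sql("sales.customer_lifetime_value", rules)
for stmt in statements:
    print(stmt)  # each statement could then be passed to spark.sql(...)
```

Because the statements are built by string interpolation, this approach assumes the rule definitions come from a trusted, version-controlled source rather than user input.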
Mathematical Foundations
Definition: Data Product
A data product is a self-contained, discoverable data asset defined as:
$$DP = (D, S, Q, A, M)$$
where $D$ is the dataset, $S$ is the schema, $Q$ is the quality SLAs, $A$ is the access controls, and $M$ is the metadata. Data products must be addressable, trustworthy, and self-describing.
Domain Ownership Metric
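Domain independence is measured by the cross-domain dependency ratio:
$$R_{\text{cross}} = \frac{\text{number of cross-domain dependencies}}{\text{total number of dependencies}}$$
Target: a low $R_{\text{cross}}$ for healthy domain boundaries.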
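A minimal sketch of how a cross-domain dependency ratio might be computed, assuming it is defined as the share of a domain's dependencies that point at another domain. The `(consumer_domain, producer_domain)` edge format and the function name are hypothetical.

```python
from collections import Counter


def cross_domain_ratio(dependencies):
    """Per consuming domain, the share of dependencies that cross a domain boundary."""
    total = Counter()
    cross = Counter()
    for consumer, producer in dependencies:
        total[consumer] += 1
        if consumer != producer:
            cross[consumer] += 1
    return {domain: cross[domain] / total[domain] for domain in total}


# Each pair reads "consumer domain depends on a dataset owned by producer domain".
deps = [
    ("sales", "sales"),
    ("sales", "sales"),
    ("marketing", "marketing"),
    ("marketing", "sales"),
    ("marketing", "supply_chain"),
    ("supply_chain", "supply_chain"),
]
ratios = cross_domain_ratio(deps)
print(ratios)
```

Here the marketing domain draws two of its three inputs from other domains, which would flag it for a closer look at its boundaries.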
Data Mesh Scalability Theorem
Adding a new domain increases system capacity by without modifying existing domains if:
This is the "independent deployability" property.
Federated Governance Cost
Governance overhead scales as:
$$C_{\text{gov}}(n) = O(n)$$
where $n$ is the number of domains; the coordination cost grows linearly with domain count.
Data Product Discovery
Findability score for a data product with metadata completeness :
where are weighting factors.
Key Insight
Data mesh shifts from centralized data teams to domain-oriented ownership. The trade-off is increased coordination overhead but reduced single points of failure. Platform teams provide self-service infrastructure to minimize domain onboarding cost.
Summary
Data mesh distributes data ownership to domains, each managing data as products. Cross-domain dependency ratio measures boundary health. Governance overhead scales linearly with domains. The architecture trades coordination complexity for organizational scalability.
Key Concepts Table
| Concept | Description | PySpark Implementation | Ownership |
|---|---|---|---|
| Domain | Business capability boundary | Spark apps per domain | Domain Team |
| Data Product | Discoverable, trustworthy data unit | Delta tables + metadata | Domain Team |
| Data Product Contract | Published SLA + schema guarantee | PySpark validation + Unity Catalog | Domain Team |
| Self-Serve Platform | Infrastructure abstractions | Shared Spark configs, libraries | Platform Team |
| Federated Governance | Automated policy enforcement | PySpark quality gates, ACLs | Governance Council |
| Schema Registry | Centralized schema management | Delta Lake schema + Unity Catalog | Platform Team |
| Lineage Tracking | Data flow documentation | OpenLineage + Spark listeners | Platform Team |
| Quality Gates | Automated validation | PySpark quality checks in pipeline | Domain Team |
| Cost Attribution | Per-domain cost tracking | Spark metrics + cloud billing | Platform Team |
| Cross-Domain Join | Inter-domain data integration | Data product composition | Consumer |
Code Examples
Example 1: Defining a Data Product with Quality Contracts
from pyspark.sql import SparkSession
# Note: this wildcard import shadows the Python builtins sum, min, and max
# with their Spark column equivalents, which the code below relies on.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json
spark = SparkSession.builder \
    .appName("Data-Product-Sales") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()
# --- Data Product Definition ---
@dataclass
class DataProductContract:
    """Defines the contract for a data product."""
    name: str
    version: str
    domain: str
    owner: str
    description: str
    schema: StructType
    sla_freshness_hours: int = 4
    quality_rules: Dict = field(default_factory=dict)
    access_policy: Dict = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
# Define the Sales Domain Data Product
customer_ltv_product = DataProductContract(
    name="customer_lifetime_value",
    version="1.2.0",
    domain="sales",
    owner="sales-data-team@company.com",
    description="Customer lifetime value calculation based on historical orders",
    schema=StructType([
        StructField("customer_id", StringType(), False),
        StructField("customer_name", StringType(), True),
        StructField("ltv_score", DoubleType(), True),
        StructField("total_revenue", DecimalType(18, 2), True),
        StructField("order_count", IntegerType(), True),
        StructField("avg_order_value", DecimalType(18, 2), True),
        StructField("first_order_date", DateType(), True),
        StructField("last_order_date", DateType(), True),
        StructField("segment", StringType(), True),
        StructField("_computed_at", TimestampType(), False),
        StructField("_product_version", StringType(), False),
    ]),
    sla_freshness_hours=4,
    quality_rules={
        "customer_id": {"not_null": True, "unique": True},
        "ltv_score": {"not_null": True, "min": 0, "max": 1000000},
        "total_revenue": {"not_null": True, "min": 0},
        "order_count": {"not_null": True, "min": 1},
        "completeness_threshold": 0.99,
    },
    access_policy={
        "owner_domain": "sales",
        "allowed_consumers": ["marketing", "finance", "executive"],
        "pii_columns": ["customer_name"],
        "masking_rules": {"customer_name": "partial_mask"},
    },
    dependencies=["orders_raw", "customers_master"]
)
# --- Build the Data Product ---
def build_customer_ltv(spark, contract):
    """Build the customer lifetime value data product."""
    # Read source data from domain-owned Bronze layer
    orders = spark.read.format("delta").load("/mnt/sales/bronze/orders")
    customers = spark.read.format("delta").load("/mnt/sales/bronze/customers")
    # Transform: Calculate LTV metrics
    ltv_df = (
        orders
        .join(customers, "customer_id", "left")
        .groupBy(
            orders["customer_id"],
            customers["customer_name"]
        )
        .agg(
            count("*").alias("order_count"),
            sum("amount").alias("total_revenue"),
            avg("amount").alias("avg_order_value"),
            min("order_date").alias("first_order_date"),
            max("order_date").alias("last_order_date"),
        )
        .withColumn("ltv_score",
            col("total_revenue") * 0.3 +
            col("order_count") * 10 +
            # datediff is a function, not a Column method: days since last order
            (datediff(current_date(), col("last_order_date")) * -0.1)
        )
        .withColumn("segment",
            when(col("ltv_score") > 10000, "platinum")
            .when(col("ltv_score") > 5000, "gold")
            .when(col("ltv_score") > 1000, "silver")
            .otherwise("bronze")
        )
        .withColumn("_computed_at", current_timestamp())
        .withColumn("_product_version", lit(contract.version))
    )
    return ltv_df
# Build the product
customer_ltv = build_customer_ltv(spark, customer_ltv_product)
# --- Quality Validation ---
def validate_data_product(df, contract):
    """Validate data product against its contract."""
    validation_results = []
    threshold = contract.quality_rules.get("completeness_threshold", 0.99)
    total_count = df.count()  # computed once and reused for every column
    for col_name, rules in contract.quality_rules.items():
        if col_name == "completeness_threshold":
            continue
        col_obj = df[col_name]
        if rules.get("not_null"):
            null_count = df.filter(col_obj.isNull()).count()
            completeness = 1 - (null_count / total_count) if total_count > 0 else 0
            validation_results.append({
                "column": col_name,
                "check": "not_null",
                "passed": completeness >= threshold,
                "completeness": completeness,
                "threshold": threshold,
            })
        if rules.get("unique"):
            duplicate_count = (
                df.groupBy(col_name)
                .count()
                .filter(col("count") > 1)
                .count()
            )
            validation_results.append({
                "column": col_name,
                "check": "unique",
                "passed": duplicate_count == 0,
                "duplicate_count": duplicate_count,
            })
        if "min" in rules:
            # min/max here are the Spark aggregates from pyspark.sql.functions
            min_val = df.agg(min(col_obj)).first()[0]
            validation_results.append({
                "column": col_name,
                "check": "min_value",
                # An all-null or empty column yields None, which counts as a failure
                "passed": min_val is not None and min_val >= rules["min"],
                "actual_min": min_val,
                "expected_min": rules["min"],
            })
        if "max" in rules:
            max_val = df.agg(max(col_obj)).first()[0]
            validation_results.append({
                "column": col_name,
                "check": "max_value",
                "passed": max_val is not None and max_val <= rules["max"],
                "actual_max": max_val,
                "expected_max": rules["max"],
            })
    return validation_results
# Run validation
results = validate_data_product(customer_ltv, customer_ltv_product)
for r in results:
    status = "PASS" if r["passed"] else "FAIL"
    print(f"[{status}] {r['column']}.{r['check']}: {r}")
# --- Publish Data Product ---
# Block publication when any contract check failed
failed_checks = [r for r in results if not r["passed"]]
if failed_checks:
    raise ValueError(f"Contract validation failed: {failed_checks}")
(
    customer_ltv
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/sales/gold/customer_lifetime_value")
)
# Register in catalog (Unity Catalog example)
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales.customer_lifetime_value
    USING DELTA
    LOCATION '/mnt/sales/gold/customer_lifetime_value'
    COMMENT 'Customer lifetime value calculation - Sales Domain Product v1.2.0'
    TBLPROPERTIES (
        'product.owner' = 'sales-data-team@company.com',
        'product.version' = '1.2.0',
        'product.sla.freshness.hours' = '4',
        'product.domain' = 'sales',
        'product.quality.completeness' = '0.99'
    )
""")
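The contract above names a `partial_mask` rule for `customer_name`, but Example 1 never defines what that rule does. The sketch below shows one plausible interpretation in plain Python (keep the first character, mask the rest); the functions `partial_mask` and `apply_masking` are hypothetical, and a Spark pipeline would need an equivalent column expression or UDF.

```python
def partial_mask(value, visible=1, mask_char="*"):
    """Keep the first `visible` characters and replace the rest with `mask_char`."""
    if value is None:
        return None
    return value[:visible] + mask_char * max(len(value) - visible, 0)


def apply_masking(record, masking_rules):
    """Return a copy of `record` with the configured columns masked."""
    maskers = {"partial_mask": partial_mask}  # rule name -> masking function
    masked = dict(record)
    for column, rule in masking_rules.items():
        if column in masked:
            masked[column] = maskers[rule](masked[column])
    return masked


row = {"customer_id": "C-1001", "customer_name": "Alice", "ltv_score": 4200.0}
masked_row = apply_masking(row, {"customer_name": "partial_mask"})
print(masked_row)
```

Keeping the rule names in a lookup table means the contract stays declarative: a new masking strategy only requires registering another function.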
Example 2: Cross-Domain Data Product Composition
from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python builtins
from pyspark.sql.functions import array, col, current_timestamp, lit, when
spark = SparkSession.builder \
    .appName("Cross-Domain-Composition") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()
# --- Compose Data Products from Multiple Domains ---
# Marketing domain wants a unified customer view combining:
# - Sales domain: customer_lifetime_value
# - Marketing domain: customer_segments
# - Supply chain domain: delivery_preferences
# Read data products from each domain (via catalog or direct path)
sales_ltv = (
    spark.read
    .format("delta")
    .load("/mnt/sales/gold/customer_lifetime_value")
    .select(
        "customer_id",
        col("ltv_score").alias("sales_ltv_score"),
        col("segment").alias("sales_segment"),
        "total_revenue",
        "order_count",
    )
)
marketing_segments = (
    spark.read
    .format("delta")
    .load("/mnt/marketing/gold/customer_segments")
    .select(
        "customer_id",
        col("segment").alias("marketing_segment"),
        col("engagement_score"),
        col("preferred_channel"),
        col("last_campaign_response"),
    )
)
supply_preferences = (
    spark.read
    .format("delta")
    .load("/mnt/supply_chain/gold/delivery_preferences")
    .select(
        "customer_id",
        col("preferred_delivery_speed"),
        col("avg_delivery_satisfaction"),
        col("return_rate"),
    )
)
# Compose: Join all data products into a unified view
# Left joins keep every sales customer; columns from unmatched domains are
# null, so unified_score is null for those rows.
unified_customer = (
    sales_ltv
    .join(marketing_segments, "customer_id", "left")
    .join(supply_preferences, "customer_id", "left")
    .withColumn("unified_score",
        (col("sales_ltv_score") * 0.4) +
        (col("engagement_score") * 0.3) +
        ((1 - col("return_rate")) * 100 * 0.3)
    )
    .withColumn("recommended_action",
        when(
            (col("sales_ltv_score") > 5000) &
            (col("engagement_score") > 70) &
            (col("avg_delivery_satisfaction") > 4.0),
            "premium_retention_offer"
        ).when(
            (col("sales_ltv_score") < 1000) &
            (col("engagement_score") < 30),
            "win_back_campaign"
        ).otherwise("standard_nurture")
    )
    .withColumn("_composed_at", current_timestamp())
    .withColumn("_source_products", array(
        lit("sales.customer_lifetime_value"),
        lit("marketing.customer_segments"),
        lit("supply_chain.delivery_preferences")
    ))
)
# Write composed data product
(
    unified_customer
    .write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/marketing/gold/unified_customer_view")
)
# Register with lineage metadata
spark.sql("""
    CREATE TABLE IF NOT EXISTS marketing.unified_customer_view
    USING DELTA
    LOCATION '/mnt/marketing/gold/unified_customer_view'
    COMMENT 'Unified customer view composed from sales, marketing, and supply chain domains'
    TBLPROPERTIES (
        'product.owner' = 'marketing-data-team@company.com',
        'product.version' = '1.0.0',
        'product.domain' = 'marketing',
        'product.composed_from' = 'sales.customer_lifetime_value,marketing.customer_segments,supply_chain.delivery_preferences',
        'product.sla.freshness.hours' = '8'
    )
""")
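Because Example 2 uses left joins, customers without a marketing or supply chain record end up with null inputs, and arithmetic on a null yields null in Spark SQL. The sketch below mirrors the `unified_score` weights in plain Python, with `None` standing in for null, to show the effect and one possible fallback. The default values are assumptions; whether a missing engagement score should count as 0 is a business decision.

```python
def unified_score(sales_ltv_score, engagement_score, return_rate):
    """Weighted score from Example 2; None propagates like a SQL NULL."""
    if None in (sales_ltv_score, engagement_score, return_rate):
        return None
    return (
        sales_ltv_score * 0.4
        + engagement_score * 0.3
        + (1 - return_rate) * 100 * 0.3
    )


def unified_score_with_defaults(sales_ltv_score, engagement_score, return_rate,
                                default_engagement=0.0, default_return_rate=0.0):
    """Same score, but missing inputs fall back to explicit (assumed) defaults."""
    engagement = default_engagement if engagement_score is None else engagement_score
    returns = default_return_rate if return_rate is None else return_rate
    return unified_score(sales_ltv_score, engagement, returns)


full = unified_score(6000, 80, 0.05)            # all three domains matched
missing = unified_score(6000, None, 0.05)       # no marketing record
filled = unified_score_with_defaults(6000, None, 0.05)
print(full, missing, filled)
```

In PySpark the same fallback is usually expressed by wrapping the nullable columns in `coalesce` before computing the score.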
Example 3: Federated Governance with Automated Quality Gates
from pyspark.sql import SparkSession
# Explicit imports keep the Python builtins (all, len) used below unshadowed
from pyspark.sql.functions import col, current_timestamp
import json
spark = SparkSession.builder \
    .appName("Federated-Governance") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()
# --- Global Governance Policies (Defined by Council) ---
GOVERNANCE_POLICIES = {
    "naming_conventions": {
        "table_names": "snake_case",
        "column_names": "snake_case",
        "prefix_required": True,  # e.g., dim_, fact_, agg_
    },
    "security": {
        "pii_detection": True,
        "encryption_at_rest": True,
        "column_masking_required": True,
        "row_level_security": True,
    },
    "quality": {
        "minimum_completeness": 0.99,
        "freshness_sla_hours": 24,
        "uniqueness_required": True,
        "schema_evolution_requires_approval": True,
    },
    "interoperability": {
        "standard_date_columns": ["created_at", "updated_at"],
        "standard_id_columns": ["_id", "id"],
        "timestamp_timezone": "UTC",
        "decimal_precision": {"precision": 18, "scale": 2},
    }
}
# --- Automated Quality Gate Function ---
class GovernanceGate:
    """Automated governance validation for data products."""
    def __init__(self, policies):
        self.policies = policies
        self.violations = []
    def validate_schema(self, df, product_name):
        """Validate column naming conventions."""
        before = len(self.violations)
        for col_name in df.columns:
            # Allow only lowercase letters, digits, and underscores
            if not all(c.islower() or c == '_' or c.isdigit() for c in col_name):
                self.violations.append({
                    "product": product_name,
                    "rule": "naming_conventions",
                    "severity": "WARNING",
                    "message": f"Column '{col_name}' not in snake_case"
                })
        # True only if this check added no violations
        return len(self.violations) == before
    def validate_quality(self, df, product_name):
        """Validate quality thresholds."""
        before = len(self.violations)
        total_rows = df.count()
        if total_rows == 0:
            self.violations.append({
                "product": product_name,
                "rule": "quality",
                "severity": "CRITICAL",
                "message": "Data product is empty"
            })
            return False
        threshold = self.policies["quality"]["minimum_completeness"]
        for col_name in df.columns:
            null_count = df.filter(col(col_name).isNull()).count()
            completeness = 1 - (null_count / total_rows)
            if completeness < threshold:
                self.violations.append({
                    "product": product_name,
                    "rule": "quality.completeness",
                    "severity": "CRITICAL",
                    "column": col_name,
                    "completeness": completeness,
                    "threshold": threshold
                })
        return len(self.violations) == before
    def validate_security(self, df, product_name, pii_columns):
        """Validate that every declared PII column exists so it can be masked."""
        before = len(self.violations)
        for col_name in pii_columns:
            if col_name not in df.columns:
                self.violations.append({
                    "product": product_name,
                    "rule": "security.pii",
                    "severity": "CRITICAL",
                    "message": f"PII column '{col_name}' not found for masking"
                })
        return len(self.violations) == before
    def validate_interoperability(self, df, product_name):
        """Validate interoperability standards."""
        before = len(self.violations)
        column_types = dict(df.dtypes)
        # Check that timestamp-like columns use a timestamp or date type
        # (this verifies the type only, not the time zone)
        for col_name in df.columns:
            if col_name.endswith("_at") or col_name.endswith("_timestamp"):
                col_type = column_types.get(col_name)
                if col_type not in ("timestamp", "date"):
                    self.violations.append({
                        "product": product_name,
                        "rule": "interoperability.timestamp",
                        "severity": "WARNING",
                        "column": col_name,
                        "actual_type": col_type,
                        "expected_type": "timestamp"
                    })
        return len(self.violations) == before
    def get_report(self):
        """Generate governance validation report."""
        critical = [v for v in self.violations if v["severity"] == "CRITICAL"]
        warnings = [v for v in self.violations if v["severity"] == "WARNING"]
        return {
            "total_violations": len(self.violations),
            "critical": len(critical),
            "warnings": len(warnings),
            "passed": len(critical) == 0,
            "violations": self.violations
        }
# --- Apply Governance to a Data Product ---
gate = GovernanceGate(GOVERNANCE_POLICIES)
# Build a data product
customer_data = spark.read.format("delta").load("/mnt/sales/bronze/customers")
customer_product = (
    customer_data
    .withColumn("ltv_score", col("total_revenue") * 0.3)
    .withColumn("_computed_at", current_timestamp())
)
# Run governance checks
gate.validate_schema(customer_product, "sales.customer_lifetime_value")
gate.validate_quality(customer_product, "sales.customer_lifetime_value")
gate.validate_security(customer_product, "sales.customer_lifetime_value",
                       pii_columns=["customer_name", "email"])
gate.validate_interoperability(customer_product, "sales.customer_lifetime_value")
# Get report
report = gate.get_report()
print(json.dumps(report, indent=2, default=str))
# Block publication if critical violations
if not report["passed"]:
    raise Exception(
        f"Data product 'sales.customer_lifetime_value' failed governance validation. "
        f"Critical violations: {report['critical']}. "
        f"Fix issues before publishing."
    )
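The character-by-character test in `validate_schema` accepts names such as `_` or `order__count`, which most teams would not consider snake_case. A stricter check could use a regular expression, as sketched below. The exact convention (an optional leading underscore for technical columns like `_computed_at`, single underscores between words) is an assumption made for this example.

```python
import re

# Assumed convention: optional leading underscore, then lowercase words made of
# letters and digits, separated by single underscores.
SNAKE_CASE = re.compile(r"_?[a-z][a-z0-9]*(_[a-z0-9]+)*")


def non_snake_case(columns):
    """Return the column names that do not follow the assumed snake_case rule."""
    return [name for name in columns if not SNAKE_CASE.fullmatch(name)]


columns = [
    "customer_id",
    "_computed_at",
    "ltvScore",
    "order__count",
    "2nd_order",
    "total_revenue",
]
offenders = non_snake_case(columns)
print(offenders)
```

A compiled pattern like this can replace the `all(...)` expression inside the gate without changing how violations are recorded.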
Performance Metrics
| Metric | Centralized Data Team | Data Mesh (Decentralized) | Improvement |
|---|---|---|---|
| Time to New Data Product | 4-8 weeks | 1-2 weeks | 75% faster |
| Data Product Count | 20-50 (central bottleneck) | 200-500 (distributed) | 10x scaling |
| Data Quality Incidents | 15/month | 5/month | 67% reduction |
| Mean Time to Recovery | 4-8 hours | 1-2 hours | 75% faster |
| Data Consumer Satisfaction | 3.2/5.0 | 4.5/5.0 | 41% improvement |
| Cost per Data Product | $5,000/month | $1,500/month | 70% reduction |
| Cross-Domain Query Latency | 30 seconds | 5 seconds | 83% faster |
| Governance Policy Compliance | 65% (manual) | 98% (automated) | 51% improvement |
| Developer Productivity | 2 products/engineer | 8 products/engineer | 4x improvement |
| Data Freshness (avg) | 24 hours | 4 hours | 83% faster |
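The Improvement column can be reproduced from the other two columns with simple percentage arithmetic. The sketch below does so for the rows with plain numeric values; using the midpoint of each reported range (for example 6 weeks for "4-8 weeks") is an assumption about how the figures were derived.

```python
def midpoint(low, high):
    """Midpoint of a reported range such as '4-8 weeks'."""
    return (low + high) / 2


def pct_reduction(before, after):
    """Percentage decrease from `before` to `after` (the '% faster' rows)."""
    return (before - after) / before * 100


def pct_increase(before, after):
    """Percentage increase from `before` to `after`."""
    return (after - before) / before * 100


checks = {
    "Time to New Data Product": pct_reduction(midpoint(4, 8), midpoint(1, 2)),
    "Data Quality Incidents": pct_reduction(15, 5),
    "Mean Time to Recovery": pct_reduction(midpoint(4, 8), midpoint(1, 2)),
    "Data Consumer Satisfaction": pct_increase(3.2, 4.5),
    "Cross-Domain Query Latency": pct_reduction(30, 5),
    "Governance Policy Compliance": pct_increase(65, 98),
    "Data Freshness (avg)": pct_reduction(24, 4),
}
for metric, value in checks.items():
    print(f"{metric}: {round(value)}%")
```

Note that the "faster" rows are reductions relative to the centralized baseline, while the satisfaction and compliance rows are relative increases.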
Best Practices
- Start with clear domain boundaries: Use Event Storming or Domain-Driven Design to identify bounded contexts before assigning data ownership. Each domain should have clear business capabilities and minimal coupling to other domains.
- Define data product contracts upfront: Every data product must have a published contract specifying schema, SLA (freshness, availability), quality thresholds, and access policies. Use PySpark validation functions to enforce these contracts programmatically.
- Build a self-serve platform first: Invest in platform abstractions (shared Spark configs, cluster templates, deployment pipelines, catalog integration) before scaling domain teams. The platform should reduce the cognitive load for domain teams.
- Implement federated governance as code: Replace manual governance processes with automated PySpark validation functions, Delta Lake constraints, and Unity Catalog policies. Governance should be enforced at publish time, not reviewed manually.
- Enable discoverability through a data catalog: Use Unity Catalog, DataHub, or Amundsen to register data products with metadata, lineage, and quality metrics. Domain teams should be able to discover and request access to other domains' products.
- Track data product usage and value: Monitor query patterns, consumer count, and business impact for each data product. Use this data to prioritize improvements and justify domain team investments.
- Implement cross-domain composition carefully: When composing data products from multiple domains, document dependencies, validate that all source products meet their SLAs, and handle partial failures gracefully.
- Maintain backward compatibility: Version data products using semantic versioning (v1.0, v1.1, v2.0). Breaking changes require a new major version and deprecation period for existing consumers.
- Invest in data observability: Deploy monitoring for schema drift, data quality degradation, freshness violations, and cost anomalies. Alert domain teams proactively rather than waiting for consumers to report issues.
- Foster a data product culture: Train domain teams on data engineering best practices, provide templates and examples, and create communities of practice across domains for knowledge sharing.
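The backward-compatibility practice above can be partly automated by comparing the old and new schema of a data product and suggesting the version bump. The sketch below is one simple policy, not a standard: it treats a removed or retyped column as breaking and an added column as a minor change. Schemas are represented as hypothetical `{column: type}` dictionaries.

```python
def is_backward_compatible(old_schema, new_schema):
    """Compatible if every old column survives with the same type."""
    return all(new_schema.get(name) == dtype for name, dtype in old_schema.items())


def next_version(current, old_schema, new_schema):
    """Suggest the next semantic version for a schema change."""
    major, minor, patch = (int(part) for part in current.split("."))
    if not is_backward_compatible(old_schema, new_schema):
        return f"{major + 1}.0.0"          # breaking change: new major version
    if new_schema != old_schema:
        return f"{major}.{minor + 1}.0"    # additive change: minor bump
    return f"{major}.{minor}.{patch + 1}"  # no schema change: patch bump


v1 = {"customer_id": "string", "ltv_score": "double"}
added = {**v1, "segment": "string"}
retyped = {"customer_id": "string", "ltv_score": "decimal(18,2)"}

print(next_version("1.2.0", v1, added))    # additive change
print(next_version("1.2.0", v1, retyped))  # breaking change
```

A check like this could run at publish time so that a breaking schema change cannot be released under a minor version by accident.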
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)