Data Platform Design: Building Enterprise Data Infrastructure
Difficulty: Staff Level | Companies: LinkedIn, Uber, Netflix, Airbnb, Stripe
1. Data Platform Architecture
Architecture Diagram
Enterprise Data Platform
┌────────────────────────────────────────────────────────┐
│ Application Layer                                      │
│   BI Tools │ ML Platform │ Data Apps │ Analytics       │
├────────────────────────────────────────────────────────┤
│ Serving Layer                                          │
│   Data Warehouse │ Feature Store │ Data Catalog        │
├────────────────────────────────────────────────────────┤
│ Processing Layer                                       │
│   Batch (Spark) │ Stream (Flink) │ ML Training         │
├────────────────────────────────────────────────────────┤
│ Storage Layer                                          │
│   Data Lake │ Data Lakehouse │ Object Storage          │
├────────────────────────────────────────────────────────┤
│ Ingestion Layer                                        │
│   CDC │ Batch Ingest │ Streaming │ APIs                │
├────────────────────────────────────────────────────────┤
│ Governance Layer                                       │
│   Metadata │ Lineage │ Quality │ Access Control        │
└────────────────────────────────────────────────────────┘
2. Data Mesh Architecture
Four Principles
Architecture Diagram
Data Mesh Principles:
├── Domain Ownership
│   └── Each business domain owns its data end-to-end
├── Data as a Product
│   └── Data must be discoverable, addressable, and trustworthy
├── Self-Serve Platform
│   └── Platform team provides infrastructure as a service
└── Federated Computational Governance
    └── Policies are code, executed automatically
Domain-Based Organization
from dataclasses import dataclass
from typing import Any, Dict, List
@dataclass
class DataDomain:
    """A business domain that owns a set of data products.

    Besides ownership, each domain records its position in the dependency
    graph: which domains it reads from and who consumes its data.
    """

    name: str
    owner_team: str
    data_products: List[str]
    upstream_dependencies: List[str]
    downstream_consumers: List[str]

    def schema(self):
        """Return a plain-dict summary of ownership, products and lineage.

        The lists are returned as-is (not copied).
        """
        summary = dict(
            domain=self.name,
            team=self.owner_team,
            products=self.data_products,
        )
        summary["upstream"] = self.upstream_dependencies
        summary["downstream"] = self.downstream_consumers
        return summary
# Define domains.
# Lineage is declared on both ends of every edge between the domains listed
# here: whenever domain A names domain B as an upstream dependency, B names A
# as a downstream consumer. "orders" and "inventory" each list the other as
# upstream, so each also appears in the other's downstream list. Consumers
# such as "analytics" or "ml" are not modeled as domains in this list.
domains = [
    DataDomain(
        name="orders",
        owner_team="checkout-team",
        data_products=["orders_fact", "payments_fact", "cart_events"],
        upstream_dependencies=["inventory", "users"],
        downstream_consumers=["analytics", "ml", "finance", "inventory"],
    ),
    DataDomain(
        name="users",
        owner_team="growth-team",
        data_products=["user_profiles", "user_segments", "user_events"],
        upstream_dependencies=[],
        downstream_consumers=["orders", "marketing", "ml"],
    ),
    DataDomain(
        name="inventory",
        owner_team="supply-chain-team",
        data_products=["stock_levels", "supplier_data", "warehouse_events"],
        upstream_dependencies=["orders"],
        downstream_consumers=["analytics", "operations", "orders"],
    ),
]
Data Product Definition
@dataclass
class DataProduct:
    """A domain-owned data product plus the metadata needed for its contract.

    Attributes:
        name: product identifier (e.g. a table name such as "orders_fact").
        domain: name of the owning domain.
        description: human-readable summary.
        schema: column name -> declared type string.
        sla: service-level targets. to_data_contract() reads the keys
            ``freshness``, ``availability`` and ``completeness`` and falls
            back to defaults for any that are missing.
        access_patterns: supported access modes (e.g. "batch", "streaming").
        quality_checks: quality rule identifiers, passed through unchanged.
        owner: owning team.
    """

    name: str
    domain: str
    description: str
    schema: Dict[str, str]
    # typing.Any -- the builtin any() is a function, not a type.
    sla: Dict[str, Any]
    access_patterns: List[str]
    quality_checks: List[str]
    owner: str

    def to_data_contract(self):
        """Return the consumer-facing data contract as a plain dict.

        SLA defaults when a key is missing from ``sla``: freshness 60
        (minutes), availability 99.9 (percent), completeness 99.5 (percent).
        """
        return {
            "product": self.name,
            "domain": self.domain,
            "schema": self.schema,
            "sla": {
                "freshness_minutes": self.sla.get("freshness", 60),
                "availability_percent": self.sla.get("availability", 99.9),
                "completeness_percent": self.sla.get("completeness", 99.5),
            },
            "quality": self.quality_checks,
            "access": self.access_patterns,
        }
# Example data product: the orders fact table, described with enough
# metadata (schema, SLA, quality checks, owner) to build a data contract.
orders_product = DataProduct(
    owner="checkout-team",
    domain="orders",
    name="orders_fact",
    description="Fact table of all customer orders",
    # Column name -> declared type / key role.
    schema={
        "order_id": "string (PK)",
        "user_id": "string (FK)",
        "order_total": "decimal(12,2)",
        "order_status": "enum(pending,completed,cancelled)",
        "created_at": "timestamp",
    },
    # Keys read by DataProduct.to_data_contract(): freshness (minutes),
    # availability and completeness (percent).
    sla={"freshness": 15, "availability": 99.95, "completeness": 99.9},
    access_patterns=["batch", "streaming"],
    quality_checks=["not_null(order_id)", "unique(order_id)", "valid_status"],
)
3. Self-Serve Platform Design
class SelfServeDataPlatform:
    """Platform capabilities that domain teams can self-serve.

    ``capabilities`` maps a capability name to a handler that takes a config
    dict and returns the resolved managed-resource spec as a dict.
    """

    def __init__(self):
        # Every capability must map to a handler defined on this class;
        # a missing method makes construction fail with AttributeError.
        self.capabilities = {
            "ingestion": self._ingestion_service,
            "transformation": self._transformation_service,
            "storage": self._storage_service,
            "serving": self._serving_service,
            "monitoring": self._monitoring_service,
        }

    def _ingestion_service(self, config: dict):
        """Domain teams create ingestion pipelines."""
        return {
            "type": "managed_ingestion",
            "sources": config.get("sources", []),
            "targets": config.get("targets", []),
            "schedule": config.get("schedule", "@daily"),
            "monitoring": "auto",
        }

    def _transformation_service(self, config: dict):
        """Domain teams define transformations in SQL/Python."""
        return {
            "type": "managed_transformation",
            "engine": config.get("engine", "spark"),
            "code_repo": config.get("repo"),
            "tests": config.get("tests", []),
            "lineage": "auto",
        }

    def _storage_service(self, config: dict):
        """Domain teams get managed storage."""
        return {
            "type": "managed_storage",
            "format": config.get("format", "delta"),
            "partitioning": config.get("partitioning"),
            "retention": config.get("retention_days", 365),
            "compression": "auto",
        }

    def _serving_service(self, config: dict):
        """Domain teams expose their data products to consumers."""
        return {
            "type": "managed_serving",
            "interfaces": config.get("interfaces", ["sql"]),
            "access_policy": config.get("access_policy", "domain-default"),
            "catalog_registration": "auto",
        }

    def _monitoring_service(self, config: dict):
        """Domain teams get managed monitoring and alerting."""
        return {
            "type": "managed_monitoring",
            "checks": config.get("checks", []),
            "alert_channel": config.get("alert_channel"),
            "dashboards": "auto",
        }

    def provision_domain(self, domain_name: str, account_id: str = "<account-id>"):
        """Provision all resources for a new domain.

        Args:
            domain_name: domain identifier, used as the prefix of every
                resource name.
            account_id: AWS account that owns the domain's IAM role. IAM role
                ARNs have the form ``arn:aws:iam::<account-id>:role/<name>``;
                the default is a placeholder to be replaced for real use.

        Returns:
            Dict of resource names/identifiers for the domain.
        """
        return {
            "storage_bucket": f"s3://{domain_name}-data-lake",
            "schema_registry": f"{domain_name}-schemas",
            "monitoring": f"{domain_name}-dashboards",
            "access_role": f"arn:aws:iam::{account_id}:role/{domain_name}-data-access",
        }
4. Data Catalog & Discovery
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class CatalogEntry:
    """One dataset as recorded in the data catalog.

    The first five fields are required. The list fields each get a fresh,
    empty list per instance. ``last_updated`` defaults to the local time
    (``datetime.now()``) at construction.
    """

    name: str
    domain: str
    description: str
    owner: str
    schema: Dict[str, str]  # column name -> declared type
    tags: List[str] = field(default_factory=list)
    quality_score: float = field(default=0.0)
    last_updated: datetime = field(default_factory=datetime.now)
    lineage_upstream: List[str] = field(default_factory=list)  # dataset names read from
    lineage_downstream: List[str] = field(default_factory=list)  # dataset names feeding from this one
class DataCatalog:
    """In-memory registry of catalog entries, keyed by dataset name."""

    def __init__(self):
        self.entries: Dict[str, "CatalogEntry"] = {}

    def register(self, entry: "CatalogEntry"):
        """Add ``entry``, replacing any existing entry with the same name."""
        self.entries[entry.name] = entry

    def search(self, query: str, tags: Optional[List[str]] = None) -> List["CatalogEntry"]:
        """Return entries whose name or description contains ``query``.

        Matching is a case-insensitive substring test. When ``tags`` is
        non-empty, an entry must also carry at least one of those tags
        (exact, case-sensitive). Results are ordered by ``quality_score``,
        highest first; ties keep registration order.
        """
        needle = query.lower()  # computed once instead of per entry
        results = [
            entry
            for entry in self.entries.values()
            if (needle in entry.description.lower() or needle in entry.name.lower())
            and (not tags or any(t in entry.tags for t in tags))
        ]
        return sorted(results, key=lambda e: e.quality_score, reverse=True)

    def impact_analysis(self, dataset_name: str) -> Dict:
        """Summarize the lineage around ``dataset_name``.

        Returns ``{}`` when the dataset is not registered. ``downstream`` and
        ``downstream_count`` cover direct consumers only. ``all_downstream``
        lists every dataset reachable by following ``lineage_downstream``
        edges through registered entries, in breadth-first order. Cycles are
        handled, and the dataset itself is excluded. The returned lists are
        copies, so callers cannot mutate the catalog through them.
        """
        entry = self.entries.get(dataset_name)
        if entry is None:
            return {}
        return {
            "dataset": dataset_name,
            "upstream": list(entry.lineage_upstream),
            "downstream": list(entry.lineage_downstream),
            "downstream_count": len(entry.lineage_downstream),
            "all_downstream": self._all_downstream(dataset_name),
            "quality_score": entry.quality_score,
        }

    def _all_downstream(self, dataset_name: str) -> List[str]:
        """Breadth-first walk of downstream lineage starting at ``dataset_name``."""
        visited = {dataset_name}
        ordered: List[str] = []
        frontier = [dataset_name]
        while frontier:
            next_frontier = []
            for name in frontier:
                node = self.entries.get(name)
                if node is None:
                    # Consumer not registered in this catalog: treat as a leaf.
                    continue
                for child in node.lineage_downstream:
                    if child not in visited:
                        visited.add(child)
                        ordered.append(child)
                        next_frontier.append(child)
            frontier = next_frontier
        return ordered
5. Federated Governance
class FederatedGovernance:
    """Governance policies as code, executed automatically."""

    PII_TYPES = {"email", "phone", "ssn", "credit_card", "address"}
    SENSITIVE_TAGS = {"pii", "financial", "health", "personal"}

    def __init__(self):
        # dataset name -> requesters explicitly approved for sensitive access
        self._approvals: Dict[str, set] = {}

    def grant_approval(self, dataset_name: str, requester: str) -> None:
        """Record an explicit approval for ``requester`` to read ``dataset_name``."""
        self._approvals.setdefault(dataset_name, set()).add(requester)

    def _check_approval(self, dataset_name: str, requester: str) -> bool:
        """Return True only if an approval was recorded (deny by default)."""
        return requester in self._approvals.get(dataset_name, ())

    def apply_access_policy(self, dataset: "CatalogEntry", requester: str) -> bool:
        """Automated access control.

        A dataset carrying any tag in SENSITIVE_TAGS requires an explicit
        approval recorded via grant_approval(); all other datasets are open.
        """
        if any(tag in self.SENSITIVE_TAGS for tag in dataset.tags):
            return self._check_approval(dataset.name, requester)
        return True

    def validate_quality(self, dataset: "CatalogEntry", data,
                         timestamp_column: str = "timestamp") -> Dict:
        """Automated quality validation.

        ``data`` is duck-typed. The calls used (``.columns``,
        ``.isnull().mean()``, ``.is_unique``, ``.max()``) match a pandas
        DataFrame.

        Returns a dict with:
            completeness: True when every column is less than 5% null.
            unique_<col>: one entry per schema column ending in ``_id``.
                A declared id column that is missing from ``data`` is
                reported as False (it cannot be verified).
            freshness_hours: hours since the newest value in
                ``timestamp_column``; only present if that column exists.
        """
        results = {}
        # Completeness
        null_ratios = {col: data[col].isnull().mean() for col in data.columns}
        results["completeness"] = all(r < 0.05 for r in null_ratios.values())
        # Uniqueness of declared id columns
        for col in dataset.schema:
            if col.endswith("_id"):
                results[f"unique_{col}"] = bool(data[col].is_unique) if col in data.columns else False
        # Freshness
        if timestamp_column in data.columns:
            max_ts = data[timestamp_column].max()
            tz = getattr(max_ts, "tzinfo", None)
            # Match "now" to the column's tz-awareness; subtracting naive and
            # aware datetimes raises TypeError.
            now = datetime.now(tz) if tz is not None else datetime.now()
            results["freshness_hours"] = (now - max_ts).total_seconds() / 3600
        return results

    def compute_quality_score(self, validation_results: Dict) -> float:
        """Compute a 0-100 quality score from validate_quality() output.

        - completeness (30): awarded when ``completeness`` is truthy.
        - freshness (30): full credit within 1 hour, half within 24 hours.
          A missing ``freshness_hours`` counts as stale.
        - uniqueness (20): awarded when at least one ``unique_*`` check is
          present and all of them passed.
        - accuracy (20): awarded when the caller supplies a truthy
          ``accuracy`` result; validate_quality() does not compute one.
        """
        weights = {"completeness": 30, "freshness": 30, "uniqueness": 20, "accuracy": 20}
        score = 0.0
        if validation_results.get("completeness"):
            score += weights["completeness"]
        freshness_hours = validation_results.get("freshness_hours", 999)
        if freshness_hours <= 1:
            score += weights["freshness"]
        elif freshness_hours <= 24:
            score += weights["freshness"] * 0.5
        unique_checks = [
            passed for key, passed in validation_results.items() if key.startswith("unique_")
        ]
        if unique_checks and all(unique_checks):
            score += weights["uniqueness"]
        if validation_results.get("accuracy"):
            score += weights["accuracy"]
        return score
ℹ️
Best Practice: Start with a centralized platform, then evolve toward data mesh as the organization grows. The key enabler is a self-serve platform that reduces the cognitive load on domain teams.
Follow-Up Questions
- How would you migrate from a centralized data platform to data mesh?
- Design a data platform for a 500-person engineering organization.
- How do you handle cross-domain data joins in a data mesh?
- Design a self-serve analytics platform for non-technical users.
- How would you measure the ROI of a data platform investment?