
Data Contracts: Schema Management & Governance




Difficulty: Senior Level | Companies: LinkedIn, Uber, Airbnb, Netflix, Spotify

1. What Are Data Contracts?

Data contracts are formal agreements between data producers and consumers that define:

  • Schema (structure, types, nullability)
  • Semantics (meaning, business rules)
  • SLAs (freshness, completeness, availability)
Diagram: Producer ↔ Data Contract ↔ Consumer, with Schema Registry + Quality Checks and Contract Validator + SLA Monitoring.

Key Insight: Data contracts shift data quality left, so problems are caught at the producer, not after downstream dashboards break.
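To make the three parts concrete, here is a minimal sketch of a contract held in code. `MiniContract`, its fields, and the sample records are hypothetical names chosen for illustration, not a library API: the schema check covers types and nullability, named business rules stand in for semantics, and a single freshness limit stands in for the SLA.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Dict, List


@dataclass
class MiniContract:
    """Hypothetical contract object bundling schema, semantics, and an SLA."""
    schema: Dict[str, type]                               # column -> expected Python type
    nullable: List[str] = field(default_factory=list)     # columns allowed to be None
    rules: Dict[str, Callable[[dict], bool]] = field(default_factory=dict)  # business rules
    ts_column: str = "timestamp"
    max_age: timedelta = timedelta(minutes=15)            # freshness SLA

    def check(self, records: List[dict], now: datetime) -> List[str]:
        problems: List[str] = []
        for i, rec in enumerate(records):
            row_ok = True
            # Schema: types and nullability
            for col, typ in self.schema.items():
                value = rec.get(col)
                if value is None:
                    if col not in self.nullable:
                        problems.append(f"row {i}: '{col}' is null")
                        row_ok = False
                elif not isinstance(value, typ):
                    problems.append(f"row {i}: '{col}' should be {typ.__name__}")
                    row_ok = False
            # Semantics: rules run only on rows whose shape is valid
            if row_ok:
                for name, rule in self.rules.items():
                    if not rule(rec):
                        problems.append(f"row {i}: rule '{name}' failed")
        # SLA: the newest record must be recent enough
        stamps = [r[self.ts_column] for r in records
                  if isinstance(r.get(self.ts_column), datetime)]
        if not stamps or now - max(stamps) > self.max_age:
            problems.append("freshness SLA missed")
        return problems


now = datetime(2024, 1, 1, 12, 0)
contract = MiniContract(
    schema={"event_id": str, "user_id": int, "page_url": str, "timestamp": datetime},
    nullable=["page_url"],
    rules={"positive_user_id": lambda r: r["user_id"] > 0},
)
records = [
    {"event_id": "e1", "user_id": 7, "page_url": None, "timestamp": now - timedelta(minutes=5)},
    {"event_id": "e2", "user_id": -1, "page_url": "/home", "timestamp": now - timedelta(minutes=20)},
]
print(contract.check(records, now))  # ["row 1: rule 'positive_user_id' failed"]
```

Running the semantic rules only on rows that already pass the schema check keeps a rule like `r["user_id"] > 0` from failing with a `TypeError` on a null value.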

2. Contract Schema Definition

Avro-Based Contract

{
  "type": "record",
  "name": "UserEvents",
  "namespace": "com.company.events",
  "fields": [
    {"name": "event_id", "type": "string", "doc": "Unique event identifier"},
    {"name": "user_id", "type": "long", "doc": "Internal user ID"},
    {"name": "event_type", "type": {"type": "enum", "name": "EventType", "symbols": ["CLICK", "VIEW", "PURCHASE", "SIGNUP"]}},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "properties", "type": {"type": "map", "values": "string"}, "default": {}},
    {"name": "page_url", "type": ["null", "string"], "default": null}
  ],
  "contract": {
    "version": "2.1.0",
    "owner": "platform-team",
    "sla": {
      "freshness_minutes": 15,
      "completeness_threshold": 0.995,
      "nullability_policy": "page_url may be null, all others required"
    }
  }
}
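Avro permits extra attributes such as `contract` as metadata, but they do not affect serialization, so enforcing the SLA is left to other tooling; the `fields` list is what a serializer actually checks. The sketch below is a simplified, hypothetical record validator that covers only the Avro types used in this schema (primitives, unions, enums, maps, and `timestamp-millis` on a `long`). A real pipeline would rely on its Avro library's own validation instead.

```python
from typing import Any, List

# Simplified mapping from Avro primitive names to Python types (an assumption for this sketch)
PRIMITIVES = {"null": type(None), "string": str, "long": int, "int": int,
              "boolean": bool, "double": float}


def matches(avro_type: Any, value: Any) -> bool:
    """True if value conforms to a (simplified) Avro type declaration."""
    if isinstance(avro_type, str):
        py_type = PRIMITIVES.get(avro_type)
        if py_type is int and isinstance(value, bool):
            return False  # bool subclasses int in Python; do not accept it as a long
        return py_type is not None and isinstance(value, py_type)
    if isinstance(avro_type, list):  # union: any branch may match
        return any(matches(branch, value) for branch in avro_type)
    kind = avro_type["type"]
    if kind == "enum":
        return value in avro_type["symbols"]
    if kind == "map":
        return isinstance(value, dict) and all(
            isinstance(k, str) and matches(avro_type["values"], v) for k, v in value.items())
    return matches(kind, value)  # logical types (e.g. timestamp-millis) wrap a primitive


def validate_record(schema: dict, record: dict) -> List[str]:
    errors = []
    for f in schema["fields"]:
        if f["name"] not in record:
            if "default" not in f:
                errors.append(f"missing field '{f['name']}'")
        elif not matches(f["type"], record[f["name"]]):
            errors.append(f"bad value for '{f['name']}'")
    return errors


schema = {"fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "long"},
    {"name": "event_type", "type": {"type": "enum", "name": "EventType",
                                    "symbols": ["CLICK", "VIEW", "PURCHASE", "SIGNUP"]}},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "properties", "type": {"type": "map", "values": "string"}, "default": {}},
    {"name": "page_url", "type": ["null", "string"], "default": None},
]}
good = {"event_id": "e1", "user_id": 42, "event_type": "CLICK",
        "timestamp": 1700000000000, "properties": {"ref": "email"}}
bad = {"event_id": "e2", "event_type": "LIKE", "timestamp": 1700000000000, "page_url": 3}
print(validate_record(schema, good))  # []
print(validate_record(schema, bad))
```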

Protobuf Contract

syntax = "proto3";
package company.events;

import "google/protobuf/timestamp.proto";

message UserEvent {
  string event_id = 1;
  int64 user_id = 2;
  EventType event_type = 3;
  google.protobuf.Timestamp timestamp = 4;
  map<string, string> properties = 5;
  optional string page_url = 6;
}

enum EventType {
  EVENT_TYPE_UNSPECIFIED = 0;
  CLICK = 1;
  VIEW = 2;
  PURCHASE = 3;
  SIGNUP = 4;
}
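In Protobuf, the binary encoding identifies each field by its number rather than its name, so the most important evolution rule is never to change or reuse a field number; deleted numbers should be declared `reserved`. The hypothetical checker below compares `{field_name: field_number}` maps for two versions of a message. How those maps are extracted (for example, from generated descriptors) is left as an assumption.

```python
from typing import Dict, FrozenSet, List


def check_proto_evolution(old: Dict[str, int], new: Dict[str, int],
                          reserved: FrozenSet[int] = frozenset()) -> List[str]:
    """Compare {field_name: field_number} maps from two versions of one message."""
    problems = []
    old_names_by_number = {num: name for name, num in old.items()}
    for name, num in new.items():
        if num in reserved:
            problems.append(f"field '{name}' uses reserved number {num}")
        if name in old and old[name] != num:
            problems.append(f"field '{name}' renumbered from {old[name]} to {num}")
        prior = old_names_by_number.get(num)
        if prior is not None and prior != name:
            problems.append(f"number {num} rebound from '{prior}' to '{name}'")
    in_use = set(new.values())
    for name, num in old.items():
        # A dropped field's number must be reserved so nobody reuses it later
        if name not in new and num not in reserved and num not in in_use:
            problems.append(f"field '{name}' removed without reserving number {num}")
    return problems


v1 = {"event_id": 1, "user_id": 2, "event_type": 3,
      "timestamp": 4, "properties": 5, "page_url": 6}
safe = {k: n for k, n in v1.items() if k != "page_url"}
safe["referrer"] = 7    # page_url dropped, new field gets a fresh number
unsafe = {k: n for k, n in v1.items() if k != "page_url"}
unsafe["referrer"] = 6  # page_url dropped and its number reused
print(check_proto_evolution(v1, safe, reserved=frozenset({6})))  # []
print(check_proto_evolution(v1, unsafe))  # ["number 6 rebound from 'page_url' to 'referrer'"]
```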

3. Contract Evolution Strategies

Backward vs Forward Compatibility

Compatibility Modes:
├── Backward Compatible: New schema reads old data
├── Forward Compatible: Old schema reads new data
├── Full Compatible: Both directions work
└── Breaking: Neither direction works

Rule of thumb:
✅ ADD new fields (with defaults)
✅ Make fields optional
❌ REMOVE fields (unless they have defaults)
❌ CHANGE field types
❌ RENAME fields
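Both modes come down to how a reader schema resolves a record that was written with a different schema. The sketch below models only the two Avro resolution rules these modes depend on: fields the reader does not know are ignored, and fields missing from the record take the reader's default. `resolve` is a hypothetical helper; real Avro resolution also handles type promotion, unions, and aliases.

```python
from typing import Any, Dict, List

_MISSING = object()


def resolve(record: Dict[str, Any], reader_fields: List[dict]) -> Dict[str, Any]:
    """Read a record written with one schema using a reader's field list.

    Unknown writer fields are dropped; missing fields take the reader's
    default; a missing field with no default makes the record unreadable.
    """
    out: Dict[str, Any] = {}
    for f in reader_fields:
        value = record.get(f["name"], _MISSING)
        if value is _MISSING:
            if "default" not in f:
                raise ValueError(f"'{f['name']}' is missing and has no default")
            value = f["default"]
        out[f["name"]] = value
    return out


v1 = [{"name": "event_id", "type": "string"}]
v2_bad = v1 + [{"name": "country", "type": "string"}]
v2_good = v1 + [{"name": "country", "type": ["null", "string"], "default": None}]

old_record = {"event_id": "e1"}                   # written with v1
new_record = {"event_id": "e2", "country": "DE"}  # written with v2

print(resolve(old_record, v2_good))  # backward: {'event_id': 'e1', 'country': None}
print(resolve(new_record, v1))       # forward:  {'event_id': 'e2'}
try:
    resolve(old_record, v2_bad)      # backward break: no default for 'country'
except ValueError as err:
    print(err)                       # 'country' is missing and has no default
```

Adding `country` with a default keeps both directions working; adding it without one breaks only the backward direction, because an old reader simply ignores the extra field.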

Schema Evolution Example

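from dataclasses import dataclass
from typing import List
from enum import Enum

class CompatibilityMode(Enum):
    BACKWARD = "backward"  # new schema must read data written with the old schema
    FORWARD = "forward"    # old schema must read data written with the new schema
    FULL = "full"          # both of the above
    NONE = "none"          # no compatibility checks

@dataclass
class SchemaEvolutionRule:
    field_name: str
    old_type: str
    new_type: str
    is_compatible: bool
    reason: str

class ContractEvolution:
    def __init__(self, mode: CompatibilityMode = CompatibilityMode.BACKWARD):
        self.mode = mode
        self.rules: List[SchemaEvolutionRule] = []
    
    def validate_evolution(self, old_schema: dict, new_schema: dict) -> List[str]:
        if self.mode is CompatibilityMode.NONE:
            return []
        violations = []
        old_fields = {f['name']: f for f in old_schema.get('fields', [])}
        new_fields = {f['name']: f for f in new_schema.get('fields', [])}
        check_backward = self.mode in (CompatibilityMode.BACKWARD, CompatibilityMode.FULL)
        check_forward = self.mode in (CompatibilityMode.FORWARD, CompatibilityMode.FULL)
        
        # Removed fields: old readers still expect them, so forward compatibility
        # holds only if the old schema declared a default they can fall back on
        for name, old_field in old_fields.items():
            if name not in new_fields and check_forward and 'default' not in old_field:
                violations.append(f"REMOVED field '{name}' without default: violates {self.mode.value} compatibility")
        
        # Type changes (simplified: Avro type promotions such as int -> long are not modeled)
        for name in old_fields:
            if name in new_fields:
                old_type = old_fields[name]['type']
                new_type = new_fields[name]['type']
                if old_type != new_type:
                    violations.append(f"CHANGED type of '{name}' from {old_type} to {new_type}")
        
        # Added fields: new readers must fill them in when reading old data,
        # so backward compatibility requires a default
        for name, new_field in new_fields.items():
            if name not in old_fields and check_backward and 'default' not in new_field:
                violations.append(f"ADDED field '{name}' without default: violates {self.mode.value} compatibility")
        
        return violations

# Usage (illustrative schemas)
old_schema = {"fields": [{"name": "event_id", "type": "string"},
                         {"name": "user_id", "type": "long"}]}
new_schema = {"fields": [{"name": "event_id", "type": "string"},
                         {"name": "user_id", "type": "long"},
                         {"name": "country", "type": "string"}]}  # added without a default

evolver = ContractEvolution(CompatibilityMode.BACKWARD)
violations = evolver.validate_evolution(old_schema, new_schema)
if violations:
    print(f"Contract evolution violated: {violations}")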

Common Interview Trap: Adding a required field without a default value breaks backward compatibility. Always provide defaults for new fields.

4. Contract Testing Framework

Producer-Side Validation

import pandera as pa
from pandera import Column, Check, DataFrameSchema

# Define contract as code
user_events_contract = DataFrameSchema({
    "event_id": Column(str, unique=True, nullable=False),
    "user_id": Column(int, checks=Check.gt(0), nullable=False),
    # Allowed values are expressed as a Check; Column has no isin= argument
    "event_type": Column(str, checks=Check.isin(["CLICK", "VIEW", "PURCHASE", "SIGNUP"]), nullable=False),
    "timestamp": Column("datetime64[ns]", nullable=False),
    "page_url": Column(str, nullable=True),
    # Dict-valued column: pandas stores these with object dtype
    "properties": Column(object, nullable=True),
}, coerce=True)

# Validate at write time
def validate_producer_output(df, contract):
    try:
        contract.validate(df)
        return {"status": "passed", "rows": len(df)}
    except pa.errors.SchemaError as e:
        return {"status": "failed", "errors": str(e)}

# Schema-level contract tests
def test_schema_compliance(spark_df):
    """Run before publishing to Kafka"""
    pandas_df = spark_df.limit(1000).toPandas()
    result = validate_producer_output(pandas_df, user_events_contract)
    assert result["status"] == "passed", f"Contract violation: {result['errors']}"
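`test_schema_compliance` fails the whole batch on any violation. One possible alternative, sketched here as an assumption rather than as part of the pandera setup above, is to validate record by record and divert failures to a quarantine (dead-letter) destination so that good records still flow. `gated_publish`, `publish`, and `quarantine` are hypothetical names; in practice `publish` would wrap a real Kafka producer.

```python
from typing import Callable, Dict, List


def gated_publish(records: List[dict],
                  validate: Callable[[dict], List[str]],
                  publish: Callable[[dict], None],
                  quarantine: Callable[[dict, List[str]], None]) -> Dict[str, int]:
    """Send contract-conforming records to publish(); divert the rest to quarantine()."""
    stats = {"published": 0, "quarantined": 0}
    for rec in records:
        errors = validate(rec)
        if errors:
            quarantine(rec, errors)
            stats["quarantined"] += 1
        else:
            publish(rec)
            stats["published"] += 1
    return stats


# Stand-ins for a real producer and a dead-letter topic (hypothetical)
published: List[dict] = []
dead_letters: List[tuple] = []


def require_positive_user_id(rec: dict) -> List[str]:
    ok = isinstance(rec.get("user_id"), int) and rec["user_id"] > 0
    return [] if ok else ["user_id must be > 0"]


stats = gated_publish(
    [{"user_id": 1}, {"user_id": 0}, {"user_id": 5}],
    validate=require_positive_user_id,
    publish=published.append,
    quarantine=lambda rec, errs: dead_letters.append((rec, errs)),
)
print(stats)  # {'published': 2, 'quarantined': 1}
```

Quarantining keeps one bad record from blocking a whole batch, at the cost of someone having to own and drain the dead-letter destination.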

Consumer-Side Validation

from datetime import datetime

class ContractValidator:
    def __init__(self, contract_schema: dict):
        self.schema = contract_schema
    
    def validate_batch(self, df, contract_version: str) -> dict:
        errors = []
        warnings = []
        
        # Check required columns exist
        required = self.schema.get('required_columns', [])
        for col in required:
            if col not in df.columns:
                errors.append(f"Missing required column: {col}")
        
        # Check data types
        for col, expected_type in self.schema.get('column_types', {}).items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                if actual_type != expected_type:
                    errors.append(f"Type mismatch: {col} expected {expected_type}, got {actual_type}")
        
        # Check freshness
        if 'timestamp_column' in self.schema:
            ts_col = self.schema['timestamp_column']
            if ts_col in df.columns:
                max_ts = df[ts_col].max()
                freshness_minutes = (datetime.now() - max_ts).total_seconds() / 60
                sla_minutes = self.schema.get('freshness_sla_minutes', 60)
                if freshness_minutes > sla_minutes:
                    warnings.append(f"Data is {freshness_minutes:.0f}min old, SLA is {sla_minutes}min")
        
        return {"errors": errors, "warnings": warnings, "valid": len(errors) == 0}
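The freshness check subtracts the newest timestamp from `datetime.now()`, which is naive local time. If event timestamps are stored in UTC, the computed age is off by the server's UTC offset, and if they are timezone-aware, subtracting them from a naive `datetime` raises `TypeError`. Here is a minimal sketch that normalizes both sides to UTC, assuming naive timestamps are already in UTC; `freshness_minutes` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def freshness_minutes(max_ts: datetime, now: Optional[datetime] = None) -> float:
    """Age in minutes of the newest event, with both sides compared in UTC."""
    def as_utc(ts: datetime) -> datetime:
        # Assumption: naive timestamps are already in UTC
        return ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else ts.astimezone(timezone.utc)

    now = as_utc(now) if now is not None else datetime.now(timezone.utc)
    return (now - as_utc(max_ts)).total_seconds() / 60


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(freshness_minutes(datetime(2024, 1, 1, 11, 30), now))  # 30.0
print(freshness_minutes(datetime(2024, 1, 1, 13, 0,
                                 tzinfo=timezone(timedelta(hours=2))), now))  # 60.0
```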

5. Contract Registry & Discovery

Central Contract Registry

from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field

@dataclass
class DataContract:
    contract_id: str
    name: str
    version: str
    owner: str
    schema: dict
    sla: dict
    created_at: datetime = field(default_factory=datetime.now)
    status: str = "active"
    consumers: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)

class ContractRegistry:
    def __init__(self):
        self.contracts: Dict[str, DataContract] = {}
        self.changelog: List[dict] = []
    
    def register(self, contract: DataContract) -> str:
        key = f"{contract.name}:{contract.version}"
        if key in self.contracts:
            raise ValueError(f"Contract {key} already registered")
        
        self.contracts[key] = contract
        self.changelog.append({
            "action": "REGISTER",
            "contract": key,
            "timestamp": datetime.now().isoformat()
        })
        return key
    
    def deprecate(self, name: str, version: str):
        key = f"{name}:{version}"
        self.contracts[key].status = "deprecated"
        self.changelog.append({
            "action": "DEPRECATE",
            "contract": key,
            "timestamp": datetime.now().isoformat()
        })
    
    def get_active_contract(self, name: str) -> Optional[DataContract]:
        for key, contract in self.contracts.items():
            if contract.name == name and contract.status == "active":
                return contract
        return None
    
    def impact_analysis(self, contract_name: str) -> List[str]:
        contract = self.get_active_contract(contract_name)
        return contract.consumers if contract else []
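`get_active_contract` returns the first active match in insertion order. When two versions are active during a migration window, the result therefore depends on registration order. Picking the newest version requires numeric comparison, because version strings compare lexicographically (`"10.0.0"` sorts before `"2.1.0"`). Here is a minimal sketch, assuming plain `MAJOR.MINOR.PATCH` versions with no pre-release tags; `latest_active_version` is a hypothetical helper that works on `(name, version, status)` tuples.

```python
from typing import Iterable, Optional, Tuple


def parse_semver(version: str) -> Tuple[int, int, int]:
    """'2.1.0' -> (2, 1, 0); assumes plain MAJOR.MINOR.PATCH with no pre-release tag."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def latest_active_version(entries: Iterable[Tuple[str, str, str]], name: str) -> Optional[str]:
    """entries are (name, version, status) triples; return the highest active version."""
    candidates = [v for n, v, s in entries if n == name and s == "active"]
    return max(candidates, key=parse_semver, default=None)


entries = [
    ("user_events", "2.1.0", "active"),
    ("user_events", "10.0.0", "active"),
    ("user_events", "9.3.1", "deprecated"),
    ("orders", "1.0.0", "active"),
]
print(max(["2.1.0", "10.0.0"]))                       # 2.1.0 (string comparison is wrong)
print(latest_active_version(entries, "user_events"))  # 10.0.0
```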

6. SLA Monitoring & Alerting

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable

@dataclass
class SLAConfig:
    freshness_minutes: int = 60
    completeness_threshold: float = 0.99
    uniqueness_threshold: float = 0.999
    null_ratio_threshold: float = 0.05

class SLAMonitor:
    def __init__(self, config: SLAConfig, alert_fn: Callable):
        self.config = config
        self.alert_fn = alert_fn
    
    def check(self, df, contract_name: str) -> dict:
        results = {}
        
        # Freshness check
        if 'event_timestamp' in df.columns:
            max_ts = df['event_timestamp'].max()
            age_minutes = (datetime.now() - max_ts).total_seconds() / 60
            results['freshness'] = {
                'age_minutes': age_minutes,
                'sla_minutes': self.config.freshness_minutes,
                'passed': age_minutes <= self.config.freshness_minutes
            }
        
        # Completeness check: share of rows with no nulls in any column (pandas DataFrame)
        total_rows = len(df)
        non_null_rows = len(df.dropna())
        completeness = non_null_rows / total_rows if total_rows > 0 else 0
        results['completeness'] = {
            'ratio': completeness,
            'threshold': self.config.completeness_threshold,
            'passed': completeness >= self.config.completeness_threshold
        }
        
        # Alert on failures
        for check, result in results.items():
            if not result['passed']:
                self.alert_fn(contract_name, check, result)
        
        return results
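`SLAConfig` declares `null_ratio_threshold` and `uniqueness_threshold`, but `check` never evaluates them. The row-level completeness metric also counts a row as incomplete when any column is null, including columns the contract allows to be null, such as `page_url`. One way to use the null-ratio threshold is a per-column check over the required columns. The sketch below is hypothetical and works on plain lists of dicts rather than a DataFrame.

```python
from typing import Dict, List


def null_ratio_check(rows: List[dict], columns: List[str],
                     threshold: float = 0.05) -> Dict[str, dict]:
    """Per-column share of nulls (None or absent) compared with a threshold."""
    total = len(rows)
    results = {}
    for col in columns:
        nulls = sum(1 for row in rows if row.get(col) is None)
        ratio = nulls / total if total else 1.0  # treat an empty batch as failing
        results[col] = {"ratio": ratio, "threshold": threshold, "passed": ratio <= threshold}
    return results


rows = [{"user_id": i, "page_url": None if i < 2 else "/p"} for i in range(20)]
# Check the required column; page_url is nullable by contract
print(null_ratio_check(rows, ["user_id"]))
# The same threshold applied to page_url would fail
print(null_ratio_check(rows, ["page_url"])["page_url"]["passed"])  # False: 2 of 20 rows are null
```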

7. Contract Governance Framework

Diagram: Data Contract Lifecycle: Draft → Review → Active → Deprecated → Retired (labels: "Approve + Version", "Monitor SLAs").

Best Practice: Treat data contracts like API contracts. Version them, enforce them in CI/CD, and deprecate old versions with migration periods.
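The lifecycle can be enforced as a small state machine so that, for example, a contract cannot become active without passing review. This is a minimal sketch: `Stage`, `TRANSITIONS`, and `advance` are illustrative names, and the REVIEW → DRAFT edge for rejected reviews is an assumption that the diagram does not show.

```python
from enum import Enum


class Stage(Enum):
    DRAFT = "draft"
    REVIEW = "review"
    ACTIVE = "active"
    DEPRECATED = "deprecated"
    RETIRED = "retired"


# Allowed moves; REVIEW -> DRAFT (a rejected review) is an assumption, not from the diagram
TRANSITIONS = {
    Stage.DRAFT: {Stage.REVIEW},
    Stage.REVIEW: {Stage.ACTIVE, Stage.DRAFT},
    Stage.ACTIVE: {Stage.DEPRECATED},
    Stage.DEPRECATED: {Stage.RETIRED},
    Stage.RETIRED: set(),
}


def advance(current: Stage, target: Stage) -> Stage:
    """Return target if the lifecycle allows current -> target, else raise."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target


stage = Stage.DRAFT
for nxt in (Stage.REVIEW, Stage.ACTIVE, Stage.DEPRECATED, Stage.RETIRED):
    stage = advance(stage, nxt)
print(stage)  # Stage.RETIRED

try:
    advance(Stage.DRAFT, Stage.ACTIVE)  # skipping review is rejected
except ValueError as err:
    print(err)  # illegal transition draft -> active
```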

Follow-Up Questions

  1. How would you implement data contracts for streaming pipelines?
  2. How do you handle contract violations in production?
  3. Design a contract registry that supports 10,000+ contracts.
  4. How do data contracts relate to data mesh architecture?
  5. How would you migrate from contract-less to contracted data?
