πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Schema Registry: Avro, Protobuf, Compatibility Modes

Apache KafkaSchema Registry⭐ Premium

Advertisement

Schema Registry: Avro, Protobuf, Compatibility Modes

Difficulty: Senior | Asked at: Confluent, LinkedIn, Netflix, Uber

ℹ️Interview Context

Schema Registry is essential for data governance in Kafka. Interviewers expect you to understand schema evolution, compatibility modes, and how to handle breaking changes gracefully.

The Question

How does Schema Registry work? Explain the difference between Avro, Protobuf, and JSON Schema. What are the compatibility modes and how do you handle schema evolution?

Schema Registry Architecture

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                Schema Registry Cluster                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  Registry 1 β”‚  β”‚  Registry 2 β”‚  β”‚  Registry 3 β”‚   β”‚
β”‚  β”‚  (Leader)   β”‚  β”‚  (Follower) β”‚  β”‚  (Follower) β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚         ↑               ↑               ↑               β”‚
β”‚         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜               β”‚
β”‚                         β”‚                               β”‚
β”‚                    β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”                          β”‚
β”‚                    β”‚ Kafka   β”‚                          β”‚
β”‚                    β”‚ Topic:  β”‚                          β”‚
β”‚                    β”‚ _schemasβ”‚                          β”‚
β”‚                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Producer/Consumer
     |
     +--> Schema Registry (HTTP API)
     |    GET/POST /subjects/{topic}/versions
     |
     +--> Kafka Topic
          Messages encoded with schema ID

Schema Formats

Avro Schema

{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "created_at", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

Protobuf Schema

syntax = "proto3";
package com.example;

message User {
  int64 id = 1;
  string name = 2;
  optional string email = 3;
  int64 created_at = 4;
}

JSON Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {
    "id": {"type": "integer"},
    "name": {"type": "string"},
    "email": {"type": ["string", "null"]},
    "created_at": {"type": "integer"}
  },
  "required": ["id", "name"]
}

Schema Format Comparison

FeatureAvroProtobufJSON Schema
Binary encodingYesYesNo (text)
Schema evolutionExcellentGoodFair
Default valuesYesYesYes
Code generationYesYesLimited
PerformanceFastestFastSlowest
Human readableNoNoYes

ℹ️Format Selection

  • Avro: Best for Kafka. Excellent schema evolution, compact binary format, strong compatibility checking.
  • Protobuf: Good for gRPC integration. Slightly less flexible schema evolution.
  • JSON Schema: Best for human readability. Larger payloads, slower parsing.

Compatibility Modes

BACKWARD Compatible

# BACKWARD: New schema can read old data
# Old consumers can read new data

# Old schema (v1)
old_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
    ]
}

# New schema (v2) - ADD field with default
new_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": null}  # NEW
    ]
}

# BACKWARD compatible:
# - New schema can read old data (email defaults to null)
# - Old consumers can't read new data (they don't know about email)

FORWARD Compatible

# FORWARD: Old schema can read new data
# New consumers can read old data

# Old schema (v1)
old_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": null}
    ]
}

# New schema (v2) - REMOVE field
new_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
        # email removed
    ]
}

# FORWARD compatible:
# - Old schema can read new data (ignores unknown fields)
# - New consumers can read old data (email field ignored)

FULL Compatible

# FULL: Both backward and forward compatible

# Allowed changes:
# 1. Add field with default value
# 2. Remove field with default value
# 3. Rename field (with aliases)

# Example:
v1 = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}

v2 = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "full_name", "type": "string", "aliases": ["name"]},  # Renamed
        {"name": "email_address", "type": "string", "aliases": ["email"]}  # Renamed
    ]
}

# FULL compatible:
# - Old data readable by new schema (uses aliases)
# - New data readable by old schema (uses original names)

Compatibility Mode Matrix

ModeAdd FieldRemove FieldRename FieldChange Type
BACKWARDYes (with default)NoNoNo
FORWARDNoYes (with default)NoNo
FULLYes (with default)Yes (with default)Yes (with aliases)No
NONEYesYesYesYes

⚠️Compatibility Warning

Setting compatibility mode to NONE disables all checks. This is dangerous in production. Always use BACKWARD, FORWARD, or FULL compatibility.

Schema Evolution Strategies

Adding Fields

# Strategy: Add field with default value
# This is always safe for BACKWARD compatibility

# v1
schema_v1 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string"}
    ]
}

# v2 - Add optional field
schema_v2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string"},
        {"name": "discount_code", "type": ["null", "string"], "default": null}
    ]
}

# v3 - Add required field with default
schema_v3 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string"},
        {"name": "discount_code", "type": ["null", "string"], "default": null},
        {"name": "priority", "type": "string", "default": "standard"}
    ]
}

Removing Fields

# Strategy: Deprecate then remove
# Phase 1: Stop producing the field (but still accept it)
# Phase 2: Remove from schema after retention period

# v1 - Field exists
schema_v1 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "deprecated_field", "type": "string"}
    ]
}

# v2 - Field deprecated (still in schema for backward compat)
schema_v2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "deprecated_field", "type": "string", "doc": "DEPRECATED: will be removed"}
    ]
}

# v3 - Field removed (after retention period)
schema_v3 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"}
    ]
}

Renaming Fields

# Strategy: Use aliases for backward compatibility
schema_v1 = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
    ]
}

schema_v2 = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "user_id", "type": "long", "aliases": ["id"]},
        {"name": "full_name", "type": "string", "aliases": ["name"]}
    ]
}

# Aliases allow old data to be read with new field names
# And new data to be read with old field names

Type Promotion

# Avro supports safe type promotions
# Int -> Long -> Float -> Double

schema_v1 = {
    "type": "record",
    "name": "Product",
    "fields": [
        {"name": "price", "type": "int"}  # $19.99 stored as 1999 cents
    ]
}

schema_v2 = {
    "type": "record",
    "name": "Product",
    "fields": [
        {"name": "price", "type": "long"}  # Promoted to long
    ]
}

# This is FULL compatible because:
# - Old int data can be read as long
# - New long data can be read as int (with truncation warning)

ℹ️Evolution Best Practice

  1. Always add fields with default values
  2. Never remove required fields without deprecation period
  3. Use aliases for renames
  4. Test compatibility before deploying schema changes

Schema Registry API

Register Schema

import requests
import json

class SchemaRegistryClient:
    def __init__(self, registry_url):
        self.registry_url = registry_url
    
    def register_schema(self, subject, schema, schema_type='AVRO'):
        """Register a new schema version"""
        url = f"{self.registry_url}/subjects/{subject}/versions"
        
        payload = {
            'schema': json.dumps(schema),
            'schemaType': schema_type
        }
        
        response = requests.post(url, json=payload)
        response.raise_for_status()
        
        return response.json()  # {'id': 1, 'subject': '...', 'version': 1}
    
    def get_schema(self, subject, version='latest'):
        """Get schema by version"""
        url = f"{self.registry_url}/subjects/{subject}/versions/{version}"
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    
    def get_schema_by_id(self, schema_id):
        """Get schema by ID"""
        url = f"{self.registry_url}/schemas/ids/{schema_id}"
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    
    def check_compatibility(self, subject, schema):
        """Check if schema is compatible"""
        url = f"{self.registry_url}/subjects/{subject}/compatibility"
        payload = {'schema': json.dumps(schema)}
        response = requests.post(url, json=payload)
        response.raise_for_status()
        return response.json()  # {'is_compatible': true}
    
    def set_compatibility_mode(self, subject, mode):
        """Set compatibility mode for subject"""
        url = f"{self.registry_url}/config/{subject}"
        payload = {'compatibility': mode}
        response = requests.put(url, json=payload)
        response.raise_for_status()

# Usage
client = SchemaRegistryClient('http://localhost:8081')

# Register schema
schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
    ]
}

result = client.register_schema('users-value', schema)
print(f"Registered schema with ID: {result['id']}")

# Check compatibility
is_compatible = client.check_compatibility('users-value', new_schema)
print(f"Compatible: {is_compatible['is_compatible']}")

Schema ID in Messages

# Messages encoded with schema ID prefix
# Format: [magic byte (0)] [4-byte schema ID] [avro data]

import struct

def encode_with_schema_id(schema_id, avro_data):
    """
    Encode message with schema ID prefix.
    
    Wire format:
    - Byte 0: Magic byte (0x00)
    - Bytes 1-4: Schema ID (big-endian int32)
    - Bytes 5+: Avro-encoded data
    """
    magic_byte = b'\x00'
    schema_id_bytes = struct.pack('>I', schema_id)
    
    return magic_byte + schema_id_bytes + avro_data

def decode_with_schema_id(encoded_message):
    """
    Decode message and extract schema ID.
    """
    magic_byte = encoded_message[0]
    schema_id = struct.unpack('>I', encoded_message[1:5])[0]
    avro_data = encoded_message[5:]
    
    return schema_id, avro_data

Schema Registry Metrics

# Key metrics to monitor
schema_metrics = {
    # Registration
    'schema-registrations': 'Total schema registrations',
    'schema-registration-rate': 'Registration rate per second',
    
    # Compatibility
    'compatibility-checks': 'Total compatibility checks',
    'compatibility-failures': 'Failed compatibility checks',
    
    # Performance
    'request-latency-avg': 'Average request latency',
    'request-rate': 'Request rate',
    'error-rate': 'Error rate',
    
    # Storage
    'schemas-count': 'Total registered schemas',
    'subjects-count': 'Total subjects'
}

⚠️Production Tip

Run Schema Registry in a cluster (3+ nodes) for high availability. It stores schemas in Kafka topic _schemas, so ensure that topic has sufficient replication factor.

Common Schema Evolution Patterns

# Pattern 1: Safe field addition
def add_optional_field(schema, field_name, field_type, default_value=None):
    """
    Add an optional field to schema.
    Always backward compatible.
    """
    new_field = {
        'name': field_name,
        'type': ['null', field_type] if default_value is None else field_type,
        'default': default_value
    }
    schema['fields'].append(new_field)
    return schema

# Pattern 2: Field deprecation
def deprecate_field(schema, field_name):
    """
    Mark field as deprecated.
    Field remains in schema for backward compatibility.
    """
    for field in schema['fields']:
        if field['name'] == field_name:
            field['doc'] = f'DEPRECATED: {field.get("doc", "")}'
            break
    return schema

# Pattern 3: Schema branching
def create_schema_branch(base_schema, branch_name):
    """
    Create a branch for experimental schema changes.
    """
    branched = base_schema.copy()
    branched['name'] = f"{base_schema['name']}_{branch_name}"
    return branched

ℹ️Key Insight

Schema Registry is not just for validation. It enables:

  1. Schema evolution without breaking consumers
  2. Data governance and documentation
  3. Cross-language compatibility (Avro/Protobuf)
  4. Schema versioning and lineage tracking

Advertisement