Schema Registry: Avro, Protobuf, Compatibility Modes
Difficulty: Senior | Asked at: Confluent, LinkedIn, Netflix, Uber
ℹ️ Interview Context
Schema Registry is essential for data governance in Kafka. Interviewers expect you to understand schema evolution, compatibility modes, and how to handle breaking changes gracefully.
The Question
How does Schema Registry work? Explain the difference between Avro, Protobuf, and JSON Schema. What are the compatibility modes and how do you handle schema evolution?
Schema Registry Architecture
┌─────────────────────────────────────────────────────────┐
│                 Schema Registry Cluster                 │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │ Registry 1  │  │ Registry 2  │  │ Registry 3  │      │
│  │  (Leader)   │  │ (Follower)  │  │ (Follower)  │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
│         │                │                │             │
│         └────────────────┼────────────────┘             │
│                          │                              │
│                     ┌────┴─────┐                        │
│                     │  Kafka   │                        │
│                     │  Topic:  │                        │
│                     │ _schemas │                        │
│                     └──────────┘                        │
└─────────────────────────────────────────────────────────┘
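Producer/Consumer
|
+--> Schema Registry (HTTP API)
|      POST /subjects/{subject}/versions   (producer registers a schema, gets its ID)
|      GET  /schemas/ids/{id}              (consumer fetches the writer schema by ID)
|
+--> Kafka Topic
Messages prefixed with the schema ID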
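To make the flow concrete, here is a minimal in-memory stand-in for the registry. It assumes the usual behavior: identical schemas share one global ID, each subject keeps its own version list, and re-registering an existing schema is a no-op. `InMemoryRegistry` is a hypothetical class, not a real client. It skips the HTTP layer, persistence, and compatibility checks.

```python
import json


class InMemoryRegistry:
    """Hypothetical toy model of a schema registry (no HTTP, no compatibility checks)."""

    def __init__(self):
        self._ids_by_schema = {}   # canonical schema text -> global ID
        self._schemas_by_id = {}   # global ID -> canonical schema text
        self._versions = {}        # subject -> list of IDs; position i is version i + 1

    @staticmethod
    def _canonical(schema):
        # Sort object keys so dicts that differ only in key order share an ID
        return json.dumps(schema, sort_keys=True, separators=(",", ":"))

    def register(self, subject, schema):
        """Producer side: return the schema's ID, assigning a new one if unseen."""
        text = self._canonical(schema)
        schema_id = self._ids_by_schema.get(text)
        if schema_id is None:
            schema_id = len(self._schemas_by_id) + 1
            self._ids_by_schema[text] = schema_id
            self._schemas_by_id[schema_id] = text
        versions = self._versions.setdefault(subject, [])
        if schema_id not in versions:      # re-registering is a no-op
            versions.append(schema_id)
        return schema_id

    def get_by_id(self, schema_id):
        """Consumer side: fetch the writer schema named by a message's ID prefix."""
        return json.loads(self._schemas_by_id[schema_id])

    def latest_version(self, subject):
        return len(self._versions[subject])


registry = InMemoryRegistry()
user_v1 = {"type": "record", "name": "User",
           "fields": [{"name": "id", "type": "long"}]}
first = registry.register("users-value", user_v1)
again = registry.register("users-value", user_v1)   # same schema, same ID, no new version
other = registry.register("audit-value", user_v1)   # same ID reused under another subject
```

Real client libraries cache these lookups, so the HTTP round trip happens once per schema rather than once per message.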
Schema Formats
Avro Schema
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
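Avro's compactness comes from writing no field names or tags. Values are laid out in schema order:

- `long`s as zigzag varints.
- Strings as a length followed by UTF-8 bytes.
- Unions as a branch index followed by the value.

The sketch below hand-encodes the `User` record above to show this. It is illustrative only; use a real Avro library in practice. `encode_long`, `encode_string`, and `encode_user` are hypothetical helpers.

```python
def encode_long(n):
    """Avro int/long: zigzag-map the signed value, then emit a base-128 varint."""
    z = (n << 1) ^ (n >> 63)           # zigzag: 0->0, -1->1, 1->2, -2->3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits with the continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s):
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_user(user_id, name, email, created_at_ms):
    """Fields in schema order: id, name, email (union null|string), created_at."""
    out = encode_long(user_id) + encode_string(name)
    if email is None:
        out += encode_long(0)                         # union branch 0 = "null", no payload
    else:
        out += encode_long(1) + encode_string(email)  # union branch 1 = "string"
    out += encode_long(created_at_ms)                 # timestamp-millis is a plain long
    return out


payload = encode_user(1, "Ann", None, 0)
# payload == b"\x02\x06Ann\x00\x00" (7 bytes, no field names)
```

Nothing in these bytes identifies the fields, so a reader needs the exact writer schema to decode them. That is why each Kafka message carries a schema ID.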
Protobuf Schema
syntax = "proto3";
package com.example;
message User {
int64 id = 1;
string name = 2;
optional string email = 3;
int64 created_at = 4;
}
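Protobuf, by contrast, prefixes each value with a key built from its field number and wire type: `(field_number << 3) | wire_type`. That is why field numbers, not names, are the compatibility contract. Renaming a field is safe on the binary wire format, but reusing or changing a number is not.

The sketch below hand-rolls the two field kinds used in `User`. It is illustrative only; `encode_int64_field` and `encode_string_field` are hypothetical helpers, not a protobuf library API.

```python
def encode_varint(value):
    """Base-128 varint; int64 negatives use their 64-bit two's-complement form."""
    value &= 0xFFFFFFFFFFFFFFFF        # plain int64 is NOT zigzag-encoded (sint64 is)
    out = bytearray()
    while value > 0x7F:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)
    return bytes(out)


def field_key(field_number, wire_type):
    return encode_varint((field_number << 3) | wire_type)


def encode_int64_field(field_number, value):
    return field_key(field_number, 0) + encode_varint(value)  # wire type 0 = VARINT


def encode_string_field(field_number, text):
    data = text.encode("utf-8")
    return field_key(field_number, 2) + encode_varint(len(data)) + data  # wire type 2 = LEN


# User{id: 150, name: "Ann"}; fields that are not set are omitted from the bytes
msg = encode_int64_field(1, 150) + encode_string_field(2, "Ann")
# msg == b"\x08\x96\x01\x12\x03Ann"
```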
JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": ["string", "null"]},
"created_at": {"type": "integer"}
},
"required": ["id", "name"]
}
Schema Format Comparison
| Feature | Avro | Protobuf | JSON Schema |
|---|---|---|---|
| Binary encoding | Yes | Yes | No (text) |
| Schema evolution | Excellent | Good | Fair |
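| Default values | Yes (declared per field) | Implicit zero values only (proto3) | Annotation only (validators don't fill it in) |
| Code generation | Yes | Yes | Limited |
| Serialization speed | Fast | Fast | Slower (text parsing) |
| Human-readable payload | No | No | Yes |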
ℹ️ Format Selection
- Avro: The most common choice for Kafka. Excellent schema evolution, compact binary format, strong compatibility checking.
- Protobuf: Good for gRPC integration. Slightly less flexible schema evolution; compatibility depends on never reusing or renumbering field numbers.
- JSON Schema: Best for human readability. Larger payloads, slower parsing.
Compatibility Modes
BACKWARD Compatible
# BACKWARD: consumers using the new schema can read data written with the old schema
# Upgrade consumers first
# Old schema (v1)
old_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
# New schema (v2) - ADD field with default
new_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": None} # NEW
]
}
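# BACKWARD compatible:
# - New schema can read old data (email defaults to None)
# - BACKWARD makes no promise about old consumers reading new data; here they cope
#   anyway, because Avro readers skip fields they don't know (so this change is also FULL compatible)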
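At the data level, "the new schema can read old data" can be sketched as Avro-style schema resolution on an already-decoded record:

- Fields the writer supplied are copied.
- Fields the writer lacks are filled from the reader's default.
- Writer-only fields are dropped.

This is a simplified model using dicts instead of bytes, with no type checks and no aliases. `resolve_record` is a hypothetical helper.

```python
def resolve_record(datum, reader_schema):
    """Project a decoded record onto the reader schema's fields (simplified)."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in datum:
            resolved[name] = datum[name]          # writer supplied a value
        elif "default" in field:
            resolved[name] = field["default"]     # writer lacks it: use reader default
        else:
            raise ValueError(f"no value and no default for field '{name}'")
    return resolved                               # writer-only fields are dropped


new_schema = {"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": None}]}

old_record = {"id": 1, "name": "Ann"}             # written with the v1 schema
print(resolve_record(old_record, new_schema))     # {'id': 1, 'name': 'Ann', 'email': None}

# Had v2 added a field WITHOUT a default, old data could not be resolved
strict_schema = {"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "long"},
    {"name": "phone", "type": "string"}]}
try:
    resolve_record(old_record, strict_schema)
except ValueError as err:
    print(err)                                    # no value and no default for field 'phone'
```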
FORWARD Compatible
# FORWARD: consumers using the old schema can read data written with the new schema
# Upgrade producers first
# Old schema (v1)
old_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": None}
]
}
# New schema (v2) - REMOVE field
new_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
# email removed
]
}
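# FORWARD compatible:
# - Old schema can read new data: email is missing, so the old reader uses its default (None)
# - This works only because email had a default in v1; removing a field without one breaks old readers
# - New consumers also cope with old data (they skip email), so this change is FULL compatible too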
FULL Compatible
# FULL: Both BACKWARD and FORWARD compatible
# Allowed changes:
# 1. Add a field that has a default value
# 2. Remove a field that has a default value
# Renaming (even with aliases) is NOT FULL compatible: aliases are consulted only
# by the schema that declares them, so old readers still cannot find the renamed field.
# Example:
v1 = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": None}
]
}
v2 = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
# email removed (it had a default)
{"name": "phone", "type": ["null", "string"], "default": None} # Added with a default
]
}
# FULL compatible:
# - v2 readers on v1 data: skip email, fill phone with its default
# - v1 readers on v2 data: skip phone, fill email with its default
Compatibility Mode Matrix
| Mode | Add Field | Remove Field | Rename Field | Change Type |
|---|---|---|---|---|
| BACKWARD | Yes (with default) | Yes | Yes (with aliases) | Promotions only (e.g., int -> long) |
| FORWARD | Yes | Yes (if the field had a default) | No | Reverse promotions only (e.g., long -> int) |
| FULL | Yes (with default) | Yes (with default) | No | Only string <-> bytes |
| NONE | Yes | Yes | Yes | Yes |
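Each mode applies one reader/writer rule in the direction(s) it requires. A reader can consume a writer's data if every reader field either exists in the writer (by name or by one of the reader field's aliases) or has a default.

The sketch below checks only names, defaults, and aliases, with no type resolution. `can_read` and `is_compatible` are hypothetical helpers, not the registry's implementation. The registry also offers `_TRANSITIVE` variants of each mode, which run the same check against every earlier version rather than only the latest; this sketch covers only the plain modes.

```python
def can_read(reader, writer):
    """Simplified Avro rule: each reader field is in the writer (name or alias) or has a default."""
    writer_names = {f["name"] for f in writer["fields"]}
    for field in reader["fields"]:
        candidates = {field["name"], *field.get("aliases", [])}
        if not (candidates & writer_names) and "default" not in field:
            return False
    return True


def is_compatible(mode, new, latest):
    """Check a proposed schema against the latest registered version."""
    if mode == "NONE":
        return True
    backward = can_read(new, latest)   # new consumers read data written with latest
    forward = can_read(latest, new)    # consumers on latest read data written with new
    return {"BACKWARD": backward, "FORWARD": forward,
            "FULL": backward and forward}[mode]


def user(*fields):
    return {"type": "record", "name": "User", "fields": list(fields)}


ID = {"name": "id", "type": "long"}
NAME = {"name": "name", "type": "string"}
v1 = user(ID, NAME)

added_with_default = user(ID, NAME, {"name": "email", "type": ["null", "string"], "default": None})
added_without_default = user(ID, NAME, {"name": "age", "type": "int"})
removed_without_default = user(ID)
renamed_with_alias = user(ID, {"name": "full_name", "type": "string", "aliases": ["name"]})

for label, candidate in [("add w/ default", added_with_default),
                         ("add w/o default", added_without_default),
                         ("remove w/o default", removed_without_default),
                         ("rename via alias", renamed_with_alias)]:
    print(label, {m: is_compatible(m, candidate, v1) for m in ("BACKWARD", "FORWARD", "FULL")})
```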
⚠️ Compatibility Warning
Setting the compatibility mode to NONE disables all checks, which is dangerous in production. Use BACKWARD (the registry's default), FORWARD, or FULL instead.
Schema Evolution Strategies
Adding Fields
# Strategy: Add field with default value
# This is always safe for BACKWARD compatibility
# v1
schema_v1 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"}
]
}
# v2 - Add optional field
schema_v2 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"},
{"name": "discount_code", "type": ["null", "string"], "default": None}
]
}
# v3 - Add a non-nullable field with a default
schema_v3 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"},
{"name": "discount_code", "type": ["null", "string"], "default": None},
{"name": "priority", "type": "string", "default": "standard"}
]
}
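A cheap pre-merge guard for this strategy is to diff two versions and flag any added field that lacks a default. The sketch below uses a hypothetical `added_fields_without_default` helper that compares names only. It runs against compact copies of the `Order` versions above, plus a deliberately bad variant.

```python
def added_fields_without_default(old, new):
    """Names of fields present in `new` but not in `old` that declare no default."""
    old_names = {f["name"] for f in old["fields"]}
    return [f["name"] for f in new["fields"]
            if f["name"] not in old_names and "default" not in f]


base = [{"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string"}]
v1 = {"type": "record", "name": "Order", "fields": base}
v3 = {"type": "record", "name": "Order", "fields": base + [
    {"name": "discount_code", "type": ["null", "string"], "default": None},
    {"name": "priority", "type": "string", "default": "standard"}]}
bad = {"type": "record", "name": "Order", "fields": base + [
    {"name": "region", "type": "string"}]}   # hypothetical field added without a default

print(added_fields_without_default(v1, v3))   # []
print(added_fields_without_default(v1, bad))  # ['region']
```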
Removing Fields
# Strategy: Deprecate then remove
# Phase 1: Give the field a default, stop populating it, and roll that schema out to all consumers
# Phase 2: Remove it after the retention period, once every consumer runs a schema that has the default
# v1 - Field exists
schema_v1 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "deprecated_field", "type": "string"}
]
}
# v2 - Field deprecated: still in the schema, but now with a default,
# so readers on v2 can handle data that no longer contains it
schema_v2 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "deprecated_field", "type": "string", "default": "", "doc": "DEPRECATED: will be removed"}
]
}
# v3 - Field removed (after retention period)
schema_v3 = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
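Removing a field is safe for a given reader only if that reader's schema gives the field a default. Otherwise, data written without the field cannot be decoded by it.

The sketch below compares a reader version with and without a default. `unsafe_removals` and `order` are hypothetical helpers, and the schemas are compact copies of the `Order` shapes above.

```python
def unsafe_removals(reader, new_writer):
    """Reader fields that the new writer omits and the reader has no default for."""
    writer_names = {f["name"] for f in new_writer["fields"]}
    return [f["name"] for f in reader["fields"]
            if f["name"] not in writer_names and "default" not in f]


def order(*extra_fields):
    """Build a compact Order schema with the shared fields plus any extras."""
    return {"type": "record", "name": "Order", "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        *extra_fields]}


reader_without_default = order({"name": "deprecated_field", "type": "string"})
reader_with_default = order({"name": "deprecated_field", "type": "string", "default": ""})
writer_after_removal = order()

print(unsafe_removals(reader_without_default, writer_after_removal))  # ['deprecated_field']
print(unsafe_removals(reader_with_default, writer_after_removal))     # []
```

This is why deprecating a field should give it a default and reach every consumer before the field is dropped.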
Renaming Fields
# Strategy: Use aliases for backward compatibility
schema_v1 = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
schema_v2 = {
"type": "record",
"name": "User",
"fields": [
{"name": "user_id", "type": "long", "aliases": ["id"]},
{"name": "full_name", "type": "string", "aliases": ["name"]}
]
}
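# Aliases let a reader using v2 resolve old data written with 'id'/'name' (BACKWARD)
# They do not help v1 readers: v2 data has no 'name' field and v1 declares no default,
# so this rename is not FORWARD (or FULL) compatible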
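Aliases are a reader-side feature. When the reader's field name is not found in the writer schema, the reader's aliases for that field are tried instead. The sketch below, a simplification of Avro's resolution rules with a hypothetical `match_writer_field` helper, shows the asymmetry: the v2 reader binds to v1's fields, but a v1 reader finds nothing in v2 data.

```python
def match_writer_field(reader_field, writer_schema):
    """Return the writer field name a reader field binds to, or None."""
    writer_names = [f["name"] for f in writer_schema["fields"]]
    if reader_field["name"] in writer_names:
        return reader_field["name"]
    for alias in reader_field.get("aliases", []):
        if alias in writer_names:
            return alias
    return None


v1 = {"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}]}
v2 = {"type": "record", "name": "User", "fields": [
    {"name": "user_id", "type": "long", "aliases": ["id"]},
    {"name": "full_name", "type": "string", "aliases": ["name"]}]}

# v2 reader, v1 data: aliases bind the new names to the old ones
print([match_writer_field(f, v1) for f in v2["fields"]])   # ['id', 'name']
# v1 reader, v2 data: nothing matches and v1 has no defaults, so decoding fails
print([match_writer_field(f, v2) for f in v1["fields"]])   # [None, None]
```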
Type Promotion
# Avro supports safe type promotions
# Int -> Long -> Float -> Double
schema_v1 = {
"type": "record",
"name": "Product",
"fields": [
{"name": "price", "type": "int"} # $19.99 stored as 1999 cents
]
}
schema_v2 = {
"type": "record",
"name": "Product",
"fields": [
{"name": "price", "type": "long"} # Promoted to long
]
}
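# This is BACKWARD compatible only:
# - Old int data can be read as long by new consumers
# - New long data cannot be read by old int consumers (Avro has no long -> int
#   conversion), so the change is neither FORWARD nor FULL compatible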
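The Avro specification lists the promotions a reader may apply to a writer's value:

- `int` to `long`, `float`, or `double`
- `long` to `float` or `double`
- `float` to `double`
- `string` and `bytes` to each other

Every promotion except `string`/`bytes` works in one direction only, so a type promotion is normally a BACKWARD-only change. The sketch below is a small lookup table; `reader_accepts` and `change_modes` are hypothetical helpers.

```python
# Writer type -> reader types it may be promoted to (Avro schema resolution rules)
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def reader_accepts(writer_type, reader_type):
    """Can a reader declaring `reader_type` decode a value written as `writer_type`?"""
    return writer_type == reader_type or reader_type in PROMOTIONS.get(writer_type, set())


def change_modes(old_type, new_type):
    """Which directions a primitive type change satisfies."""
    return {
        "BACKWARD": reader_accepts(old_type, new_type),  # new reader, old data
        "FORWARD": reader_accepts(new_type, old_type),   # old reader, new data
    }


print(change_modes("int", "long"))   # {'BACKWARD': True, 'FORWARD': False}
```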
ℹ️ Evolution Best Practice
- Always add fields with default values
- Never remove required fields without a deprecation period
- Use aliases for renames (BACKWARD compatible only)
- Test compatibility before deploying schema changes
Schema Registry API
Register Schema
import json

import requests

class SchemaRegistryClient:
    def __init__(self, registry_url):
        self.registry_url = registry_url.rstrip('/')

    @staticmethod
    def _schema_text(schema):
        # Avro and JSON Schema definitions are JSON; Protobuf schemas are passed as .proto text
        return schema if isinstance(schema, str) else json.dumps(schema)

    def register_schema(self, subject, schema, schema_type='AVRO'):
        """Register a new schema version (returns the existing ID if already registered)"""
        url = f"{self.registry_url}/subjects/{subject}/versions"
        payload = {
            'schema': self._schema_text(schema),
            'schemaType': schema_type
        }
        response = requests.post(url, json=payload)
        response.raise_for_status()
        return response.json()  # {'id': 1}

    def get_schema(self, subject, version='latest'):
        """Get schema by version"""
        url = f"{self.registry_url}/subjects/{subject}/versions/{version}"
        response = requests.get(url)
        response.raise_for_status()
        return response.json()  # {'subject': ..., 'version': ..., 'id': ..., 'schema': ...}

    def get_schema_by_id(self, schema_id):
        """Get schema by its global ID (what consumers do after reading the message prefix)"""
        url = f"{self.registry_url}/schemas/ids/{schema_id}"
        response = requests.get(url)
        response.raise_for_status()
        return response.json()  # {'schema': '...'}

    def check_compatibility(self, subject, schema, version='latest', schema_type='AVRO'):
        """Check a candidate schema against a registered version without registering it"""
        url = f"{self.registry_url}/compatibility/subjects/{subject}/versions/{version}"
        payload = {
            'schema': self._schema_text(schema),
            'schemaType': schema_type
        }
        response = requests.post(url, json=payload)
        response.raise_for_status()
        return response.json()  # {'is_compatible': True}

    def set_compatibility_mode(self, subject, mode):
        """Set compatibility mode for subject (e.g. 'BACKWARD', 'FORWARD', 'FULL')"""
        url = f"{self.registry_url}/config/{subject}"
        response = requests.put(url, json={'compatibility': mode})
        response.raise_for_status()
        return response.json()  # {'compatibility': 'FULL'}

# Usage
client = SchemaRegistryClient('http://localhost:8081')

# Register schema
schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
result = client.register_schema('users-value', schema)
print(f"Registered schema with ID: {result['id']}")

# Check a candidate v2 (adds an optional field) before registering it
new_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": None}
]
}
is_compatible = client.check_compatibility('users-value', new_schema)
print(f"Compatible: {is_compatible['is_compatible']}")
Schema ID in Messages
# Messages encoded with schema ID prefix
# Format: [magic byte (0)] [4-byte schema ID] [Avro-encoded data]
# (Confluent's Protobuf serializer also writes message indexes between the ID and the payload)
import struct

def encode_with_schema_id(schema_id, avro_data):
    """
    Encode message with schema ID prefix.
    Wire format:
    - Byte 0: Magic byte (0x00)
    - Bytes 1-4: Schema ID (big-endian int32)
    - Bytes 5+: Avro-encoded data
    """
    magic_byte = b'\x00'
    schema_id_bytes = struct.pack('>I', schema_id)
    return magic_byte + schema_id_bytes + avro_data

def decode_with_schema_id(encoded_message):
    """
    Decode message and extract schema ID.
    Raises ValueError if the message is not in the framed format.
    """
    if len(encoded_message) < 5 or encoded_message[0] != 0:
        raise ValueError("Not a Schema Registry framed message (missing magic byte 0)")
    schema_id = struct.unpack('>I', encoded_message[1:5])[0]
    avro_data = encoded_message[5:]
    return schema_id, avro_data
Schema Registry Metrics
# Key metrics to monitor (descriptive labels; actual JMX metric names vary by Schema Registry version)
schema_metrics = {
# Registration
'schema-registrations': 'Total schema registrations',
'schema-registration-rate': 'Registration rate per second',
# Compatibility
'compatibility-checks': 'Total compatibility checks',
'compatibility-failures': 'Failed compatibility checks',
# Performance
'request-latency-avg': 'Average request latency',
'request-rate': 'Request rate',
'error-rate': 'Error rate',
# Storage
'schemas-count': 'Total registered schemas',
'subjects-count': 'Total subjects'
}
⚠️ Production Tip
Run Schema Registry in a cluster (3+ nodes) for high availability. It stores schemas in the Kafka topic _schemas, so ensure that topic has a sufficient replication factor.
Common Schema Evolution Patterns
import copy

# Pattern 1: Safe field addition
def add_optional_field(schema, field_name, field_type, default_value=None):
    """
    Add an optional field to schema (modifies and returns `schema`).
    Backward (and forward) compatible, because the new field has a default.
    """
    new_field = {
        'name': field_name,
        # With a None default, the type must be a union whose FIRST branch is "null"
        'type': ['null', field_type] if default_value is None else field_type,
        'default': default_value
    }
    schema['fields'].append(new_field)
    return schema

# Pattern 2: Field deprecation
def deprecate_field(schema, field_name):
    """
    Mark field as deprecated in its doc string.
    The field stays in the schema, so existing readers are unaffected.
    """
    for field in schema['fields']:
        if field['name'] == field_name:
            field['doc'] = f'DEPRECATED: {field.get("doc", "")}'
            break
    return schema

# Pattern 3: Schema branching
def create_schema_branch(base_schema, branch_name):
    """
    Create a branch for experimental schema changes.
    The record gets a new name, so register it under its own subject:
    Avro will not resolve it against the base record's name.
    """
    branched = copy.deepcopy(base_schema)  # deep copy so the fields list is not shared
    branched['name'] = f"{base_schema['name']}_{branch_name}"
    return branched
ℹ️ Key Insight
Schema Registry is not just for validation. It enables:
- Schema evolution without breaking consumers
- Data governance and documentation
- Cross-language compatibility (Avro/Protobuf)
- Schema versioning and lineage tracking