Schema Registry & Evolution
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Schema Registry provides a centralized repository for managing schemas and enabling data compatibility across producers and consumers.
Architecture
+--------------------------------------------------+
|             Schema Registry Cluster              |
|    +---------------+        +---------------+    |
|    |    Primary    |        |   Secondary   |    |
|    |   (Leader)    |        |  (Follower)   |    |
|    +---------------+        +---------------+    |
+--------------------------------------------------+
                          |
                          v
+--------------------------------------------------+
|                  Kafka Cluster                   |
|            _schemas topic (internal)             |
+--------------------------------------------------+
Flow:
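1. Producer registers its schema with Schema Registry under a subject (for example, orders-value)
2. Schema Registry assigns a globally unique schema ID and a version number within the subject
3. Producer sends each message with a 5-byte header (1 magic byte + 4-byte schema ID) followed by the serialized data
4. Consumer reads the schema ID from the header and fetches the schema from the Registry (clients cache it after the first lookup)
5. Consumer deserializes the data using that schema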
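The flow above can be sketched with a toy in-memory registry. This is only an illustration of the bookkeeping (global IDs, per-subject versions, idempotent registration); `ToyRegistry` and its methods are hypothetical names, not the real client API, and the real service stores schemas in Kafka and canonicalizes them differently.

```python
import json
from typing import Dict, List, Tuple


class ToyRegistry:
    """Hypothetical in-memory stand-in for Schema Registry (illustration only)."""

    def __init__(self) -> None:
        self._next_id = 1
        self._ids: Dict[str, int] = {}             # schema text -> global schema ID
        self._schemas: Dict[int, str] = {}         # global schema ID -> schema text
        self._subjects: Dict[str, List[int]] = {}  # subject -> schema IDs in version order

    def register(self, subject: str, schema: dict) -> Tuple[int, int]:
        """Return (schema_id, version); registering the same schema twice is a no-op."""
        text = json.dumps(schema, sort_keys=True)
        schema_id = self._ids.get(text)
        if schema_id is None:
            schema_id = self._next_id
            self._next_id += 1
            self._ids[text] = schema_id
            self._schemas[schema_id] = text
        versions = self._subjects.setdefault(subject, [])
        if schema_id not in versions:
            versions.append(schema_id)
        return schema_id, versions.index(schema_id) + 1

    def get_by_id(self, schema_id: int) -> dict:
        """What a consumer does with the ID it finds in the message header."""
        return json.loads(self._schemas[schema_id])


registry = ToyRegistry()
v1 = {"type": "record", "name": "Order",
      "fields": [{"name": "order_id", "type": "string"}]}
v2 = {"type": "record", "name": "Order",
      "fields": [{"name": "order_id", "type": "string"},
                 {"name": "currency", "type": "string", "default": "USD"}]}

first = registry.register("orders-value", v1)
again = registry.register("orders-value", v1)   # same schema, same ID and version
second = registry.register("orders-value", v2)  # new schema, next ID and version
```

The schema ID, not the version number, is what travels with each message, which is why a consumer can resolve it without knowing the subject.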
Wire Format
Avro Wire Format:
+---------------------+---------------------+-----------+
| Magic Byte (1 byte) | Schema ID (4 bytes) | Avro Data |
| 0x00                | 0x00000001          | ...       |
+---------------------+---------------------+-----------+
Total overhead: 5 bytes per message
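A minimal sketch of that framing, using only the standard library. The helper names `frame` and `unframe` are made up for illustration, and the payload is treated as opaque bytes that some Avro encoder has already produced; the schema ID is packed as a big-endian 4-byte integer.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0    # first byte of every framed message
HEADER = ">bI"    # big-endian: 1-byte magic + 4-byte unsigned schema ID
HEADER_SIZE = struct.calcsize(HEADER)


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialized payload with the 5-byte header."""
    return struct.pack(HEADER, MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = struct.unpack(HEADER, message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER_SIZE:]


framed = frame(1, b"avro-bytes")
schema_id, payload = unframe(framed)
```

A consumer that sees a non-zero first byte is most likely reading a message that was not produced through a Schema Registry serializer, so failing fast there is usually the safer choice.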
Avro Schema Definition
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "timestamp",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED"]
      }
    },
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}
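The amount field uses the decimal logical type, which Avro stores as the two's-complement, big-endian bytes of the unscaled integer. The sketch below shows that encoding for scale 2; `encode_decimal` and `decode_decimal` are illustrative helper names, and a real Avro library would normally do this conversion for you.

```python
from decimal import Decimal


def encode_decimal(value: Decimal, scale: int) -> bytes:
    """Encode a Decimal as Avro 'decimal' bytes for the given scale."""
    scaled = value.scaleb(scale)  # 99.99 with scale 2 -> 9999
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{value} has more than {scale} fractional digits")
    unscaled = int(scaled)
    # One spare bit for the sign, rounded up to whole bytes.
    length = (unscaled.bit_length() + 8) // 8
    return unscaled.to_bytes(length, byteorder="big", signed=True)


def decode_decimal(data: bytes, scale: int) -> Decimal:
    """Inverse of encode_decimal."""
    unscaled = int.from_bytes(data, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


encoded = encode_decimal(Decimal("99.99"), scale=2)
decoded = decode_decimal(encoded, scale=2)
```

Because the scale lives in the schema rather than in the bytes, changing scale in a later schema version silently changes the meaning of existing data, so it is worth treating as a breaking change.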
Java Producer with Schema Registry
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
// OrderProducer is an illustrative class name
public class OrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaAvroSerializer registers the schema and writes the 5-byte header
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://schema-registry:8081");
        // Parse the Avro schema for the record value
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
        );
        // Create record using schema
        GenericRecord order = new GenericData.Record(schema);
        order.put("order_id", "12345");
        order.put("amount", 99.99);
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(
            "orders",
            "order-12345",
            order
        );
        // try-with-resources closes the producer, which flushes buffered records
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(record);
        }
    }
}
Python Producer with Schema Registry
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Note: AvroProducer is the legacy Avro API in confluent-kafka;
# newer releases document AvroSerializer as its replacement.
# Define value schema
order_schema = avro.loads('''
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
''')
# AvroProducer also Avro-encodes keys, so a non-null key needs its own schema
key_schema = avro.loads('{"type": "string"}')
producer = AvroProducer({
    'bootstrap.servers': 'kafka1:9092',
    'schema.registry.url': 'http://schema-registry:8081'
})
# Create record
order = {
    'order_id': '12345',
    'user_id': 'user-42',
    'amount': 99.99,
    'timestamp': 1625097600000
}
producer.produce(
    topic='orders',
    key='order-12345',
    key_schema=key_schema,
    value=order,
    value_schema=order_schema
)
# Block until all buffered messages are delivered
producer.flush()
Schema Evolution Strategies
Backward Compatible (Default)
// Original schema (v1)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
// New schema (v2) - adds a field with a default, so v2 readers can fill it in when reading v1 data
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"}
]
}
Forward Compatible
// Original schema (v1)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
// New schema (v2) - adds a field; v1 readers simply ignore it when reading v2 data
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "status", "type": "string", "default": "UNKNOWN"}
]
}
Full Compatible
// Combines backward and forward compatibility
// Old and new schemas can each read data written with the other
// Requires: any field that is added or removed must have a default
Key Insight: Backward compatibility ensures consumers using the new schema can read data written with the old schema, so consumers are upgraded first. Forward compatibility ensures data written with the new schema can be read by consumers still using the old schema, so producers are upgraded first.
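Both directions reduce to one question: can a given reader schema decode data written with a given writer schema? The sketch below answers it for flat record schemas only, under simplifying assumptions (no type promotion, unions, aliases, or nested records). The function names are hypothetical and this is not how Schema Registry implements its checks.

```python
from typing import Dict


def fields_by_name(schema: dict) -> Dict[str, dict]:
    return {f["name"]: f for f in schema["fields"]}


def can_read(reader: dict, writer: dict) -> bool:
    """True if `reader` can decode data written with `writer` (simplified)."""
    written = fields_by_name(writer)
    for name, field in fields_by_name(reader).items():
        if name in written:
            if field["type"] != written[name]["type"]:
                return False  # simplification: ignores Avro type promotion
        elif "default" not in field:
            return False      # reader needs a value the writer never wrote
    return True


def is_backward(new: dict, old: dict) -> bool:
    return can_read(reader=new, writer=old)  # new consumers, old data


def is_forward(new: dict, old: dict) -> bool:
    return can_read(reader=old, writer=new)  # old consumers, new data


def order(*fields: dict) -> dict:
    return {"type": "record", "name": "Order", "fields": list(fields)}


order_id = {"name": "order_id", "type": "string"}
amount = {"name": "amount", "type": "double"}

v1 = order(order_id, amount)
with_default = order(order_id, amount, {"name": "currency", "type": "string", "default": "USD"})
without_default = order(order_id, amount, {"name": "currency", "type": "string"})
amount_removed = order(order_id)

results = {
    "add field with default": (is_backward(with_default, v1), is_forward(with_default, v1)),
    "add field without default": (is_backward(without_default, v1), is_forward(without_default, v1)),
    "remove field without default": (is_backward(amount_removed, v1), is_forward(amount_removed, v1)),
}
```

Each tuple in `results` is (backward, forward). Only the first change is compatible in both directions, which matches the FULL rule that added or removed fields need defaults.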
Compatibility Configuration
# Set compatibility level for subject
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/json" \
-d '{"compatibility": "BACKWARD"}'
# Compatibility levels:
# NONE - No compatibility checking
# BACKWARD - New schema can read data written with the latest registered schema (default)
# FORWARD - Latest registered schema can read data written with the new schema
# FULL - Both backward and forward compatible with the latest registered schema
# BACKWARD_TRANSITIVE - Backward compatible with all registered versions
# FORWARD_TRANSITIVE - Forward compatible with all registered versions
# FULL_TRANSITIVE - Full compatible with all registered versions
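The difference between BACKWARD and BACKWARD_TRANSITIVE only shows up with three or more versions. Here is a small sketch, assuming a simplified rule (a reader field must exist in the writer or carry a default) and hypothetical helper names; it models the idea, not the registry's actual checker.

```python
from typing import List


def can_read(reader: dict, writer: dict) -> bool:
    """Simplified rule: every reader field exists in the writer or has a default."""
    written = {f["name"] for f in writer["fields"]}
    return all(f["name"] in written or "default" in f for f in reader["fields"])


def backward_ok(new: dict, history: List[dict], transitive: bool) -> bool:
    """Check `new` against the latest version only, or against every version."""
    targets = history if transitive else history[-1:]
    return all(can_read(reader=new, writer=old) for old in targets)


def order(*fields: dict) -> dict:
    return {"type": "record", "name": "Order", "fields": list(fields)}


order_id = {"name": "order_id", "type": "string"}
v1 = order(order_id)
v2 = order(order_id, {"name": "currency", "type": "string", "default": "USD"})
v3 = order(order_id, {"name": "currency", "type": "string"})  # default dropped

latest_only = backward_ok(v3, [v1, v2], transitive=False)
all_versions = backward_ok(v3, [v1, v2], transitive=True)
```

Here v3 passes against v2 but fails against v1, because v1 data has no currency value and v3 no longer supplies a default. If consumers may replay a topic from the beginning, the transitive levels are usually the safer setting.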
Protobuf Schema Example
syntax = "proto3";
message Order {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
  int64 timestamp = 4;
  OrderStatus status = 5;
  optional string currency = 6; // New field, optional
}
enum OrderStatus {
  UNKNOWN = 0;
  PENDING = 1;
  CONFIRMED = 2;
  SHIPPED = 3;
  DELIVERED = 4;
}
JSON Schema Example
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "properties": {
    "order_id": {"type": "string"},
    "amount": {"type": "number"},
    "currency": {"type": "string", "default": "USD"}
  },
  "required": ["order_id", "amount"]
}
Schema Registry Administration
# List subjects
curl http://schema-registry:8081/subjects
# Get schema versions
curl http://schema-registry:8081/subjects/orders-value/versions
# Get specific version
curl http://schema-registry:8081/subjects/orders-value/versions/1
# Delete subject (soft delete; repeat the call with ?permanent=true for a hard delete)
curl -X DELETE http://schema-registry:8081/subjects/orders-value
# Check compatibility
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\",\"default\":\"USD\"}]}"}'
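Hand-escaping the schema inside the JSON body, as in the curl call above, is easy to get wrong. A possible alternative is to build the body in code: the schema is serialized to a string first, then embedded in the outer JSON object. The helper name `compatibility_request` is an assumption for illustration; the endpoint path and host are the ones used above, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def compatibility_request(base_url: str, subject: str, schema: dict,
                          version: str = "latest") -> urllib.request.Request:
    """Build (but do not send) a compatibility-check request."""
    # Inner dumps turns the schema into a string; outer dumps escapes it.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/compatibility/subjects/{subject}/versions/{version}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


candidate = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string", "default": "USD"},
    ],
}

request = compatibility_request("http://schema-registry:8081", "orders-value", candidate)
# To send it against a running registry: urllib.request.urlopen(request)
```

Running this check in CI before deploying a producer is one way to catch an incompatible schema before registration is rejected at runtime.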
Follow-Up Questions
- What is the difference between backward and forward compatibility?
- How does Schema Registry handle schema evolution for Avro vs Protobuf?
- Explain the wire format for messages using Schema Registry.
- What happens when a producer uses a schema that is not compatible?
- How would you migrate from one schema version to another in production?