Apache Kafka Schema Registry: Avro Schemas, Compatibility, and Evolution
Architecture Diagram: Schema Registry Components
- Client layer: Producer, Consumer, Kafka Streams, and Kafka Connect (Avro/JSON), plus the REST Proxy (HTTP). Each embeds a Schema Registry client.
- Clients reach the registry over its HTTP REST API (GET/POST/PUT/DELETE).
- Schema Registry cluster: three nodes listening on port 8081. SR Node 1 is the leader and SR Nodes 2 and 3 are followers. Each node holds a schema cache and a compatibility checker.
- Storage backend: the Kafka topic _schemas, which records each subject, version, schema ID, and schema definition. Leader election uses Kafka's group membership protocol (older releases could use ZooKeeper).
Architecture Diagram: Schema Evolution Compatibility Matrix
The examples below use Schema v1 = { "name": "string" } and Schema v2 = v1 plus an "age" field of type int.
- BACKWARD: consumers using the new schema can read data written with the previous schema.
  - Adding age in v2 is backward compatible only if age has a default, which the new reader fills in for old records.
  - Use case: upgrade consumers first, then producers.
- FORWARD: consumers using the previous schema can read data written with the new schema.
  - Adding age is forward compatible with or without a default, because old readers ignore the unknown field.
  - Use case: upgrade producers first, then consumers.
- FULL: both BACKWARD and FORWARD.
  - Adding age with a default is fully compatible.
  - Use case: mixed consumer/producer versions and zero-downtime deployments, since either side can be upgraded first.
- NONE: no compatibility checks.
  - Replacing v1 with { "email": "string" } is accepted. There are no guarantees, and breaking changes are allowed.
  - Use case: development only, never in production.
Architecture Diagram: Schema Serialization Flow
- Producer side (serialization):
  - Avro record (a user object such as {"name": "John"})
  - → schema lookup (cached; the registry is asked for the schema ID only on a cache miss)
  - → Avro binary encoding of the record
  - → wire-format bytes (magic byte and schema ID prepended to the payload)
  - → Kafka send
  - → broker stores the record in a log segment.
- Wire format:
  - Byte 0: magic byte (0x0).
  - Bytes 1-4: schema ID (4-byte integer, big-endian).
  - Bytes 5-N: Avro binary encoded payload.
- Consumer side (deserialization):
  - broker fetch (read from the log)
  - → Kafka record (binary message)
  - → wire parse (extract the schema ID)
  - → schema lookup (cached; fetched from the registry on a miss)
  - → Avro binary decode with that schema
  - → Avro record ({"name": "John"}).
Formal Definitions
Schema Compatibility
Schema compatibility is a set of rules that govern how schemas can evolve over time without breaking existing producers or consumers. The Schema Registry enforces compatibility checks before registering a new schema version. The compatibility modes are:
- BACKWARD: consumers using the new schema can read data written with the previous schema.
- FORWARD: consumers using the previous schema can read data written with the new schema.
- FULL: both.
- NONE: no checks.
Wire Format
The wire format is the byte layout that the Confluent serializers write into each Kafka message. It starts with a 5-byte header: 1 magic byte (0x0) followed by a 4-byte big-endian schema ID. The serialized payload follows: Avro binary, JSON text for JSON Schema, or Protobuf. Consumers use the embedded ID to look up the writer's schema in the registry for deserialization. This decouples producers and consumers and lets their schemas evolve independently.
Subject Name Strategy
A subject name strategy determines how schemas are organized and versioned in the registry.
- TopicNameStrategy (default) uses the topic name plus a -key or -value suffix as the subject, so all records in a topic share one schema lineage.
- TopicRecordNameStrategy combines the topic name and the fully qualified record name, allowing multiple record types per topic.
- RecordNameStrategy uses only the fully qualified record name, enabling schema sharing across topics.
Key Formulas
Schema Storage Cost
$$\text{Storage} \approx N_{s} \times \bar{V} \times \bar{S}$$
Here,
- $N_{s}$ = number of registered subjects
- $\bar{V}$ = average schema versions per subject
- $\bar{S}$ = average schema size in bytes
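A worked instance of the storage estimate (subjects × average versions × average schema size) can be written as a small Java sketch. The inputs below (500 subjects, 4 versions each, 2 KiB per schema) are hypothetical. The helper name is illustrative, and the result ignores per-record overhead in _schemas (keys, metadata) and the effect of compaction.

```java
/** Worked example of Storage ≈ N_s × V̄ × S̄ for the _schemas topic (hypothetical inputs). */
final class SchemaStorageEstimate {
    /** Estimated bytes: subjects × average versions per subject × average schema size. */
    static long estimateBytes(long subjects, double avgVersionsPerSubject, long avgSchemaBytes) {
        return Math.round(subjects * avgVersionsPerSubject * avgSchemaBytes);
    }

    public static void main(String[] args) {
        // Hypothetical registry: 500 subjects, 4 versions each on average, 2 KiB per schema
        long bytes = estimateBytes(500, 4.0, 2048);
        System.out.printf("~%d bytes (~%.1f MiB)%n", bytes, bytes / (1024.0 * 1024.0));
        // prints "~4096000 bytes (~3.9 MiB)"
    }
}
```

Even with these generous inputs the estimate stays in the low megabytes, which is small for a single compacted topic.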
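Schema Registry stores all schemas in the _schemas topic, a single-partition, compacted topic replicated for high availability. Only the leader handles writes (registration); all nodes serve reads (retrieval). Each client caches schemas locally (ID-to-schema and schema-to-ID mappings) to minimize round trips. In steady state almost every lookup is therefore a cache hit, because a given producer or consumer typically sees only a handful of schema IDs and fetches each one once.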
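The client-side cache can be sketched as two maps, ID-to-schema and schema-to-ID, where the registry is contacted only on a miss. This is a minimal sketch under stated assumptions. The fetcher is a hypothetical stand-in for the registry's GET /schemas/ids/{id} call, schemas are plain strings rather than parsed objects, and the class name is illustrative. A real client such as CachedSchemaRegistryClient also bounds its cache size.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Minimal sketch of a client schema cache (hypothetical names; schemas kept as strings). */
final class SchemaCacheSketch {
    private final Map<Integer, String> idToSchema = new ConcurrentHashMap<>();
    private final Map<String, Integer> schemaToId = new ConcurrentHashMap<>();
    private final IntFunction<String> fetcher; // stand-in for the HTTP call to the registry
    final AtomicInteger remoteCalls = new AtomicInteger();

    SchemaCacheSketch(IntFunction<String> fetcher) {
        this.fetcher = fetcher;
    }

    /** Returns the schema for an ID, calling the registry only on a cache miss. */
    String schemaById(int id) {
        return idToSchema.computeIfAbsent(id, key -> {
            remoteCalls.incrementAndGet();
            String schema = fetcher.apply(key);
            schemaToId.putIfAbsent(schema, key); // keep the reverse mapping in sync
            return schema;
        });
    }

    /** Reverse lookup used on the serializer side; null if the schema has not been seen. */
    Integer idForSchema(String schema) {
        return schemaToId.get(schema);
    }

    public static void main(String[] args) {
        SchemaCacheSketch cache = new SchemaCacheSketch(id -> "{\"type\":\"string\"}");
        for (int i = 0; i < 1_000; i++) {
            cache.schemaById(7); // the same ID on every record, as in a steady-state consumer
        }
        System.out.println("remote calls: " + cache.remoteCalls.get()); // prints "remote calls: 1"
    }
}
```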
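For production:
- Disable auto-registration (auto.register.schemas=false) so that producers cannot register schemas by accident, and register schemas explicitly instead.
- With auto-registration off, set use.latest.version=true on the serializer so that it writes with the latest registered version of the subject.
- Test every schema change against the registry's compatibility endpoint before deploying it.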
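The same guidance can be expressed as a serializer configuration map to merge into producer properties. The keys are standard Confluent serializer settings. The helper name, hostnames, and credentials are hypothetical placeholders.

```java
import java.util.Map;

/** Sketch of production serializer settings (hostnames and credentials are placeholders). */
final class ProductionSerdeConfig {
    static Map<String, Object> producerSerializerConfig(String registryUrls, String userInfo) {
        return Map.of(
            // Comma-separated list of registry endpoints
            "schema.registry.url", registryUrls,
            "basic.auth.credentials.source", "USER_INFO",
            "basic.auth.user.info", userInfo,
            // Do not register schemas from application code
            "auto.register.schemas", false,
            // Serialize with the latest registered version of the subject
            "use.latest.version", true);
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = producerSerializerConfig(
            "https://sr-1.example.com:8081,https://sr-2.example.com:8081", "svc-orders:CHANGE_ME");
        cfg.forEach((key, value) -> System.out.println(key + "=" + value)); // order is unspecified
    }
}
```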
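Schema Evolution Invariants
The following invariants must hold for safe schema evolution:
- Additive fields (new fields with defaults) are backward compatible: a consumer on the new schema fills in the default when it reads old data that lacks the field. Old consumers ignore the unknown field, so the addition is forward compatible as well.
- Removing a field is forward compatible only if the removed field has a default value; otherwise old consumers fail when reading new data that lacks it.
- Type changes are compatible only where Avro type promotion applies. For example, int to long is backward compatible (a new long reader can read old int data) but not forward compatible; string to int is incompatible in both directions.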
- BACKWARD compatibility requires: for every field in the new schema that did not exist in the old schema, the new field must have a default value.
- FORWARD compatibility requires: for every field in the old schema that is removed in the new schema, the old field must have had a default value.
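These field-level invariants can be checked mechanically. The sketch below models a flat Avro record as a list of (name, type, hasDefault) fields. That is a simplifying assumption that ignores unions, aliases, enums, and nested records. It applies Avro's resolution rules:
- A reader field missing from the written data needs a default.
- Writer-only fields are skipped.
- Types must match or be promotable.

The class and method names are hypothetical, not Schema Registry APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy checker for flat records: BACKWARD and FORWARD per Avro schema-resolution rules. */
final class CompatibilitySketch {
    record Field(String name, String type, boolean hasDefault) {}

    // Avro type promotions: data written as the key type can be read as any listed type.
    private static final Map<String, Set<String>> PROMOTIONS = Map.of(
        "int", Set.of("long", "float", "double"),
        "long", Set.of("float", "double"),
        "float", Set.of("double"),
        "string", Set.of("bytes"),
        "bytes", Set.of("string"));

    static boolean typeReadable(String writerType, String readerType) {
        return writerType.equals(readerType)
            || PROMOTIONS.getOrDefault(writerType, Set.of()).contains(readerType);
    }

    /** Can a reader using readerFields decode data written with writerFields? */
    static boolean canRead(List<Field> readerFields, List<Field> writerFields) {
        for (Field r : readerFields) {
            Field w = writerFields.stream()
                .filter(f -> f.name().equals(r.name())).findFirst().orElse(null);
            if (w == null) {
                if (!r.hasDefault()) return false; // absent from the data, no default to fill in
            } else if (!typeReadable(w.type(), r.type())) {
                return false;                       // incompatible type change
            }
        }
        return true; // fields only the writer has are skipped by the reader
    }

    /** BACKWARD: the new schema reads data written with the old schema. */
    static boolean backward(List<Field> oldSchema, List<Field> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    /** FORWARD: the old schema reads data written with the new schema. */
    static boolean forward(List<Field> oldSchema, List<Field> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    public static void main(String[] args) {
        List<Field> v1 = List.of(new Field("name", "string", false));
        List<Field> v2NoDefault = List.of(new Field("name", "string", false), new Field("age", "int", false));
        List<Field> v2WithDefault = List.of(new Field("name", "string", false), new Field("age", "int", true));
        System.out.println(backward(v1, v2NoDefault) + " " + forward(v1, v2NoDefault));     // false true
        System.out.println(backward(v1, v2WithDefault) + " " + forward(v1, v2WithDefault)); // true true
    }
}
```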
Detailed Explanation
Confluent Schema Registry is a centralized schema management service for Kafka that enables schema evolution while maintaining data quality and compatibility. It stores schemas in a _schemas topic (replicated across the cluster for high availability) and provides a REST API for schema registration, retrieval, and compatibility checking. The registry acts as a schema repository that decouples producers and consumers through schema IDs embedded in message payloads, enabling independent evolution of data schemas.
Schema Registration and Storage: Registering a schema proceeds in these steps:
- The registry checks the schema against the subject's existing versions according to the configured compatibility mode.
- It then assigns a globally unique integer ID and stores the schema definition (Avro, JSON Schema, or Protobuf) under the subject name.
- Registering an identical schema again under the same subject returns the existing ID without creating a new version. Registering it under another subject reuses the same ID.
- The schema is written to the _schemas topic and cached in each registry node's local cache.

The subject name can follow different strategies: TopicNameStrategy (default), TopicRecordNameStrategy, or RecordNameStrategy.
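The registration steps can be illustrated with an in-memory toy registry. It checks compatibility against the latest version, reuses a global ID for identical schemas, and appends a new version otherwise. This is a sketch under stated assumptions:
- Schemas are compared as raw strings, whereas the real service parses and normalizes them.
- The compatibility rule is a pluggable predicate.
- Nothing is persisted to _schemas.
- All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** In-memory sketch of registration: compatibility check, global ID reuse, and versioning. */
final class RegistrySketch {
    private final Map<String, Integer> idsBySchema = new HashMap<>();       // identical schema -> same ID
    private final Map<String, List<String>> versionsBySubject = new HashMap<>();
    private final BiPredicate<String, String> isCompatible;                 // (latest, candidate) -> ok?
    private int nextId = 1;

    RegistrySketch(BiPredicate<String, String> isCompatible) {
        this.isCompatible = isCompatible;
    }

    /** Returns the schema ID, registering a new version under the subject if needed. */
    synchronized int register(String subject, String schema) {
        List<String> versions = versionsBySubject.computeIfAbsent(subject, s -> new ArrayList<>());
        if (versions.contains(schema)) {
            return idsBySchema.get(schema); // already registered here: no new version
        }
        if (!versions.isEmpty() && !isCompatible.test(versions.get(versions.size() - 1), schema)) {
            throw new IllegalStateException("incompatible with latest version of " + subject);
        }
        int id = idsBySchema.computeIfAbsent(schema, s -> nextId++); // reuse ID across subjects
        versions.add(schema); // version number = position in the list + 1
        return id;
    }

    int latestVersion(String subject) {
        return versionsBySubject.getOrDefault(subject, List.of()).size();
    }

    public static void main(String[] args) {
        // Hypothetical rule for the demo: anything starting with "BREAK" is incompatible
        RegistrySketch registry = new RegistrySketch((latest, candidate) -> !candidate.startsWith("BREAK"));
        int a = registry.register("orders-value", "schema-A");
        int b = registry.register("payments-value", "schema-A");
        System.out.println(a == b);                                             // true
        System.out.println(registry.register("orders-value", "schema-A") == a); // true
        System.out.println(registry.latestVersion("orders-value"));             // 1
    }
}
```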
Compatibility Modes: The registry supports several compatibility modes to control how schemas can evolve; BACKWARD is the default.
- BACKWARD ensures that consumers using the new schema can read data written with the previous schema. You may delete fields or add fields with defaults. Upgrade consumers first.
- FORWARD ensures that consumers using the previous schema can read data written with the new schema. You may add fields or delete fields that have defaults. Upgrade producers first.
- FULL ensures both.
- NONE disables compatibility checking (not recommended for production).
- The transitive variants (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, and FULL_TRANSITIVE) check the new schema against all previously registered versions rather than only the latest one.
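The difference between the plain and transitive levels is which (reader, writer) version pairs get compared when a candidate is registered. The sketch below enumerates those pairs from the mode definitions. It does not perform the comparison itself; that would be a schema-resolution test such as Avro's. Check and checksFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: the (reader, writer) version pairs each compatibility level compares for a candidate. */
final class CompatibilityPairs {
    record Check(int readerVersion, int writerVersion) {}

    private static final List<String> BASE_LEVELS = List.of("BACKWARD", "FORWARD", "FULL");

    /** existing = number of registered versions (1..existing); the candidate is existing + 1. */
    static List<Check> checksFor(String level, int existing) {
        List<Check> checks = new ArrayList<>();
        if (level.equals("NONE")) {
            return checks; // compatibility checking disabled
        }
        boolean transitive = level.endsWith("_TRANSITIVE");
        String base = transitive ? level.substring(0, level.length() - "_TRANSITIVE".length()) : level;
        if (!BASE_LEVELS.contains(base)) {
            throw new IllegalArgumentException("unknown compatibility level: " + level);
        }
        int candidate = existing + 1;
        int oldest = transitive ? 1 : existing; // transitive: all prior versions; else the latest only
        for (int v = Math.max(oldest, 1); v <= existing; v++) {
            if (!base.equals("FORWARD")) {
                checks.add(new Check(candidate, v)); // BACKWARD/FULL: new schema reads old data
            }
            if (!base.equals("BACKWARD")) {
                checks.add(new Check(v, candidate)); // FORWARD/FULL: old schema reads new data
            }
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println(checksFor("BACKWARD", 3)); // [Check[readerVersion=4, writerVersion=3]]
        System.out.println(checksFor("BACKWARD_TRANSITIVE", 3).size()); // 3
        System.out.println(checksFor("FULL_TRANSITIVE", 3).size());     // 6
    }
}
```

The first registration under a subject (existing = 0) yields no checks at any level, matching the rule that there is nothing to be compatible with yet.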
Wire Format and Schema IDs: Messages produced by the Confluent serializers start with a 5-byte header: a magic byte (0x0) followed by a 4-byte schema ID (big-endian integer). Consumers deserialize a message by looking up the schema from the registry using the embedded ID. The registry client caches schemas locally, reducing latency and network overhead. The same framing is used for Avro, JSON Schema, and Protobuf payloads. The Protobuf serializer also writes a list of message indexes after the schema ID to identify which message type in the .proto schema was used.
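The 5-byte framing is easy to reproduce with java.nio.ByteBuffer, which is big-endian by default. This sketch covers only the prefix described above. It treats the payload as opaque bytes, so Avro encoding and the extra Protobuf message indexes are out of scope. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of the wire-format prefix: magic byte 0x0, 4-byte big-endian schema ID, then payload. */
final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES; // 5 bytes

    /** Prepends the 5-byte header to an already-encoded payload. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length) // big-endian by default
            .put(MAGIC_BYTE)
            .putInt(schemaId)
            .put(payload)
            .array();
    }

    /** Extracts the schema ID a deserializer would look up in its cache or the registry. */
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (message.length < HEADER_SIZE || buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("not in Schema Registry wire format");
        }
        return buf.getInt();
    }

    /** Returns the bytes after the header, i.e. the encoded payload. */
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] msg = frame(42, new byte[] {0x0A, 0x4A, 0x6F}); // arbitrary payload bytes
        System.out.println(Arrays.toString(Arrays.copyOf(msg, HEADER_SIZE))); // [0, 0, 0, 0, 42]
        System.out.println(schemaId(msg)); // 42
    }
}
```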
Schema Evolution Strategies: There are several patterns for evolving schemas in production.
- Additive Evolution adds new fields with default values (backward compatible).
- Field Deprecation keeps a field in the schema, documented as deprecated, instead of removing it, so existing readers and writers are unaffected.
- Type Evolution changes field types. It is safe only within Avro's type promotions (for example, int to long), so it requires careful planning.
- Schema Branching uses TopicRecordNameStrategy to allow multiple schemas per topic.

Each strategy has different compatibility implications and should be chosen based on the deployment model and consumer rollout strategy.
Schema Registry Client Configuration: Clients connect to the registry via its HTTP REST API. The client maintains a local cache of schemas (both ID-to-schema and schema-to-ID mappings) to minimize round trips. The schema.registry.url configuration specifies the registry endpoints (comma-separated for high availability). The client supports basic authentication, SSL, and integration with Confluent Cloud. Kafka Connect integrates with the registry via converters (AvroConverter, JsonSchemaConverter, ProtobufConverter). Kafka Streams applications use Schema Registry-aware Serdes such as GenericAvroSerde and SpecificAvroSerde.
Subject Name Strategies: The subject name determines how schemas are organized in the registry.
- TopicNameStrategy uses the topic name as the subject (default). All records in a topic are versioned under one subject and checked against one another.
- TopicRecordNameStrategy combines the topic name with the record name, allowing multiple record types per topic.
- RecordNameStrategy uses only the record name, enabling schema sharing across topics.

The choice affects compatibility checking and schema organization.
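The subject names produced by the three built-in strategies can be reproduced with plain string functions. The real strategies are classes in io.confluent.kafka.serializers.subject, selected through key.subject.name.strategy and value.subject.name.strategy. This sketch mirrors only their naming, with hypothetical method names, and assumes recordFullName is the namespace-qualified record name.

```java
/** Sketch of the subject names generated by the built-in strategies (naming only). */
final class SubjectNames {
    /** TopicNameStrategy: "<topic>-key" or "<topic>-value". */
    static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /** RecordNameStrategy: the fully qualified record name. */
    static String recordNameStrategy(String recordFullName) {
        return recordFullName;
    }

    /** TopicRecordNameStrategy: "<topic>-<fully qualified record name>". */
    static String topicRecordNameStrategy(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        System.out.println(topicNameStrategy("order-events", false));                     // order-events-value
        System.out.println(recordNameStrategy("com.example.Order"));                      // com.example.Order
        System.out.println(topicRecordNameStrategy("order-events", "com.example.Order")); // order-events-com.example.Order
    }
}
```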
Schema Registry High Availability: The registry runs as a cluster with a single elected leader.
- Current releases elect the leader through Kafka's group membership protocol; older releases could use ZooKeeper.
- Only the leader handles writes (schema registration). Follower nodes forward write requests to it, while all nodes serve reads (schema retrieval).
- Every node consumes the _schemas topic to keep its local store synchronized.
- If the leader fails, a new leader is elected automatically from the leader-eligible nodes.
- The registry scales horizontally for read-heavy workloads, but write scalability is limited by the single-leader design.
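The single-writer design can be illustrated with a toy model. The leader appends to a shared log standing in for _schemas, followers forward writes to the leader, and every node builds its read view by replaying the log. This is a deliberately simplified sketch:
- Election, networking, and failover are omitted.
- IDs here are simply log positions, unlike the real service.
- All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of single-leader writes over a shared log (stand-in for the _schemas topic). */
final class LeaderFollowerSketch {
    static final class Node {
        private final List<String> schemasLog;               // shared log, appended only by the leader
        private final List<String> view = new ArrayList<>(); // this node's local read view
        Node leader;                                         // null when this node is the leader

        Node(List<String> schemasLog) {
            this.schemasLog = schemasLog;
        }

        /** Writes go only through the leader; a follower forwards them. */
        int register(String schema) {
            if (leader != null) {
                return leader.register(schema);
            }
            schemasLog.add(schema);
            return schemasLog.size(); // in this sketch, ID = position in the log
        }

        /** Reads are served locally after replaying any new log entries. */
        String schemaById(int id) {
            catchUp();
            return view.get(id - 1);
        }

        private void catchUp() {
            for (int i = view.size(); i < schemasLog.size(); i++) {
                view.add(schemasLog.get(i));
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Node leader = new Node(log);
        Node follower = new Node(log);
        follower.leader = leader;
        int id = follower.register("schema-A"); // forwarded to the leader
        System.out.println(id + " " + follower.schemaById(id)); // prints "1 schema-A"
    }
}
```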
Integration with Confluent Platform: Schema Registry integrates deeply with Confluent Platform components.
- Kafka Connect uses converters that automatically register and retrieve schemas.
- Kafka Streams supports schema evolution through Schema Registry-aware Serdes.
- Confluent Control Center provides a UI for managing schemas and monitoring compatibility.
- Confluent Cloud offers a managed Schema Registry with additional features such as schema linking and schema import/export.
Key Concepts Table
| Component | Description | Key Configurations | Use Cases |
|---|---|---|---|
| Subject | Schema namespace (topic or record name) | key.subject.name.strategy, value.subject.name.strategy | Schema organization |
| Schema ID | Globally unique integer identifier for a schema | Auto-assigned | Wire format, caching |
| Compatibility Mode | Rules for schema evolution | schema.compatibility.level (server default), PUT /config/{subject} | Data quality assurance |
| Wire Format | 5-byte header (magic byte + schema ID) | Automatic | Message serialization |
| Schema Cache | Client-side cache of schemas and IDs | max.schemas.per.subject | Performance optimization |
| Subject Name Strategy | How subjects are named | key.subject.name.strategy, value.subject.name.strategy | Schema organization |
| Compatibility Checker | Validates schema evolution | schema.compatibility.level | Prevent breaking changes |
| Schema Reference | Dependencies between schemas | references | Complex schema hierarchies |
| Mode | READWRITE, READONLY, or IMPORT | PUT /mode | Schema management control |
| Schema Linking | Sync schemas across clusters | Schema exporters (/exporters API) | Multi-cluster, migration |
Code Examples
Schema Registration with Avro
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import java.util.HashMap;
import java.util.Map;

public class SchemaRegistrationExample {
    public static void main(String[] args) throws Exception {
        // Define the Avro schema; fields with defaults may be omitted by older writers
        Schema schema = SchemaBuilder.record("Order")
            .namespace("com.example.orders")
            .fields()
            .name("orderId").type().stringType().noDefault()
            .name("userId").type().stringType().noDefault()
            .name("amount").type().doubleType().noDefault()
            .name("currency").type().stringType().stringDefault("USD")
            .name("status").type().stringType().stringDefault("PENDING")
            .name("createdAt").type().longType().noDefault()
            // optional() builds the union ["null", {"type":"map","values":"string"}] with a null default
            .name("metadata").type().optional().map().values().stringType()
            .endRecord();

        // Create a caching Schema Registry client (RestService is only the low-level HTTP wrapper)
        Map<String, Object> config = new HashMap<>();
        config.put("basic.auth.credentials.source", "USER_INFO");
        config.put("basic.auth.user.info", "admin:secret");
        SchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://localhost:8081", 100, config);

        String subject = "order-events-value";
        AvroSchema avroSchema = new AvroSchema(schema);

        // Register the schema; re-registering an identical schema returns the same ID
        int schemaId = client.register(subject, avroSchema);
        System.out.println("Registered schema with ID: " + schemaId);

        // Get metadata for the latest version of the subject
        SchemaMetadata metadata = client.getLatestSchemaMetadata(subject);
        System.out.println("Schema version: " + metadata.getVersion());
        System.out.println("Schema ID: " + metadata.getId());

        // Check compatibility; in practice, test a candidate schema like this before registering it
        boolean isCompatible = client.testCompatibility(subject, avroSchema);
        System.out.println("Schema is compatible: " + isCompatible);
    }
}
Avro Producer with Schema Registry
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class AvroProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        // Schema Registry configuration
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "admin:secret");
        // Avro serializer configuration (auto-registration is on by default; disable it in production)
        props.put("avro.use.logical.type.converters", true);

        // Define the writer schema inline
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\"," +
            "\"namespace\":\"com.example\"," +
            "\"fields\":[" +
            "{\"name\":\"orderId\",\"type\":\"string\"}," +
            "{\"name\":\"userId\",\"type\":\"string\"}," +
            "{\"name\":\"amount\",\"type\":\"double\"}," +
            "{\"name\":\"status\",\"type\":\"string\",\"default\":\"PENDING\"}" +
            "]}");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // Create an Avro record that conforms to the writer schema
                GenericRecord order = new GenericData.Record(schema);
                order.put("orderId", "order-" + i);
                order.put("userId", "user-" + (i % 10));
                order.put("amount", Math.random() * 1000);
                order.put("status", "PENDING");

                ProducerRecord<String, GenericRecord> record =
                    new ProducerRecord<>("order-events", "order-" + i, order);
                // Informational header only; deserializers rely on the schema ID in the payload
                record.headers().add("content-type", "avro".getBytes(StandardCharsets.UTF_8));

                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Error: " + exception.getMessage());
                    } else {
                        System.out.printf("Sent to partition %d, offset %d%n",
                            metadata.partition(), metadata.offset());
                    }
                });
            }
            producer.flush();
        }
    }
}
Avro Consumer with Schema Evolution
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class AvroConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        // Schema Registry configuration
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "admin:secret");
        // Avro deserializer configuration: return GenericRecords decoded with the writer's schema
        // (auto.register.schemas only affects serializers, so it is not set here)
        props.put("specific.avro.reader", false);
        props.put("avro.use.logical.type.converters", true);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("order-events"));
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord order = record.value();
                    // Required fields (Avro strings arrive as Utf8, so call toString())
                    String orderId = order.get("orderId").toString();
                    String userId = order.get("userId").toString();
                    double amount = (Double) order.get("amount");
                    // Fields added in later versions may be absent from older writer schemas
                    String status = hasField(order, "status") ? order.get("status").toString() : "PENDING";
                    // Optional map field: null when unset; keys and values are CharSequence (often Utf8)
                    String metadata = "";
                    if (hasField(order, "metadata") && order.get("metadata") != null) {
                        Map<?, ?> metaMap = (Map<?, ?>) order.get("metadata");
                        metadata = metaMap.toString();
                    }
                    System.out.printf("Order: %s, User: %s, Amount: %.2f, Status: %s, Metadata: %s%n",
                        orderId, userId, amount, status, metadata);
                }
                consumer.commitSync();
            }
        }
    }

    /** True if the record's writer schema defines the field (safe across Avro versions). */
    private static boolean hasField(GenericRecord record, String name) {
        return record.getSchema().getField(name) != null;
    }
}
Schema Evolution Script
#!/bin/bash
# Schema Registry management script (requires curl and jq)
set -euo pipefail

SCHEMA_REGISTRY_URL="http://localhost:8081"
AUTH="admin:secret"
SUBJECT="order-events-value"
CT="Content-Type: application/vnd.schemaregistry.v1+json"

echo "=== SCHEMA REGISTRY MANAGEMENT ==="
echo "Timestamp: $(date)"
echo ""

# List all subjects
echo "--- All Subjects ---"
curl -s -u "$AUTH" "$SCHEMA_REGISTRY_URL/subjects" | jq .
echo ""

echo "--- Get Subject Schema ---"
echo "Subject: $SUBJECT"
curl -s -u "$AUTH" "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/latest" | jq .
echo ""

echo "--- Compatibility Check ---"
NEW_SCHEMA='{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "userId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string", "default": "PENDING"},
    {"name": "priority", "type": "string", "default": "NORMAL"}
  ]
}'
# The REST API expects the schema as a JSON string: {"schema": "<escaped schema>"}
PAYLOAD=$(jq -n --arg schema "$NEW_SCHEMA" '{schema: $schema}')
COMPATIBILITY=$(curl -s -u "$AUTH" -X POST -H "$CT" -d "$PAYLOAD" \
  "$SCHEMA_REGISTRY_URL/compatibility/subjects/$SUBJECT/versions/latest")
echo "Compatibility result:"
echo "$COMPATIBILITY" | jq .
echo ""

echo "--- Register New Schema ---"
if [ "$(echo "$COMPATIBILITY" | jq -r '.is_compatible')" = "true" ]; then
  REGISTRATION=$(curl -s -u "$AUTH" -X POST -H "$CT" -d "$PAYLOAD" \
    "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions")
  echo "Registration result:"
  echo "$REGISTRATION" | jq .
else
  echo "Schema is NOT compatible - cannot register"
fi
echo ""

echo "--- Global Compatibility Level ---"
curl -s -u "$AUTH" "$SCHEMA_REGISTRY_URL/config" | jq .
echo ""

echo "--- Set Subject Compatibility Level ---"
curl -s -u "$AUTH" -X PUT -H "$CT" -d '{"compatibility": "BACKWARD"}' \
  "$SCHEMA_REGISTRY_URL/config/$SUBJECT" | jq .
echo ""

echo "--- List Schema Versions ---"
curl -s -u "$AUTH" "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions" | jq .
echo ""

echo "--- Get Specific Version ---"
VERSION=1
curl -s -u "$AUTH" "$SCHEMA_REGISTRY_URL/subjects/$SUBJECT/versions/$VERSION" | jq .
Kafka Streams with Schema Registry
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
public class StreamsSchemaRegistryExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "schema-registry-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Schema Registry configuration
props.put("schema.registry.url", "http://localhost:8081");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "admin:secret");
// Use GenericAvroSerde for Avro values
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
// Configure Avro serde
Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put("schema.registry.url", "http://localhost:8081");
serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
serdeConfig.put("basic.auth.user.info", "admin:secret");
GenericAvroSerde avroSerde = new GenericAvroSerde();
avroSerde.configure(serdeConfig, false);
// Source stream with Avro deserialization
KStream<String, GenericRecord> orders = builder.stream("order-events",
Consumed.with(Serdes.String(), avroSerde));
// Transform and filter
KStream<String, GenericRecord> highValueOrders = orders
.filter((key, order) -> {
double amount = (double) order.get("amount");
return amount > 1000;
})
.mapValues(order -> {
// Create new Avro record with evolved schema
Schema newSchema = SchemaBuilder.builder("com.example")
.record("HighValueOrder")
.fields()
.name("orderId")
.type(order.getSchema().getField("orderId").schema())
.noDefault()
.name("userId")
.type(order.getSchema().getField("userId").schema())
.noDefault()
.name("amount")
.type(order.getSchema().getField("amount").schema())
.noDefault()
.name("verified")
.type()
.booleanType()
.booleanDefault(false)
.endRecord();
GenericRecord highValueOrder = new GenericData.Record(newSchema);
highValueOrder.put("orderId", order.get("orderId"));
highValueOrder.put("userId", order.get("userId"));
highValueOrder.put("amount", order.get("amount"));
highValueOrder.put("verified", false);
return highValueOrder;
});
// Sink with Avro serialization
highValueOrders.to("high-value-orders",
Produced.with(Serdes.String(), avroSerde));
// Aggregation with windowing
KTable<Windowed<String>, Long> orderCounts = orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"order-count-store")
.withValueSerde(Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
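The windowed count above uses TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)), which creates tumbling (non-overlapping) one-hour windows aligned to the Unix epoch. As a rough illustration of how a record timestamp lands in its window, here is a stdlib-only sketch. TumblingWindowSketch and its methods are hypothetical names, and the sketch ignores grace periods, late-record dropping, and state stores.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating tumbling-window assignment (not a Kafka Streams API).
final class TumblingWindowSketch {

    // Inclusive start (epoch millis) of the tumbling window containing timestampMs.
    // Windows are epoch-aligned: [0, size), [size, 2*size), ...
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        if (timestampMs < 0) {
            throw new IllegalArgumentException("timestamp must be non-negative");
        }
        return (timestampMs / sizeMs) * sizeMs;
    }

    // Exclusive end of the same window.
    static long windowEnd(long timestampMs, Duration size) {
        return windowStart(timestampMs, size) + size.toMillis();
    }

    public static void main(String[] args) {
        Duration hour = Duration.ofHours(1);
        long ts = Instant.parse("2024-05-01T10:42:17Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts, hour))); // 2024-05-01T10:00:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, hour)));   // 2024-05-01T11:00:00Z
    }
}
```

Every order whose timestamp falls between 10:00:00 and 10:59:59.999 UTC is counted in the same window, keyed by the window start.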
Performance Metrics
| Operation | Latency (p99) | Throughput | Cache Hit Rate | Memory Usage |
|---|---|---|---|---|
| Schema Registration | 100ms | 100/sec | N/A | 100MB |
| Schema Retrieval (cache hit) | 1ms | 1M/sec | 99.9% | 100MB |
| Schema Retrieval (cache miss) | 50ms | 10K/sec | N/A (0.1% of lookups) | 100MB |
| Compatibility Check | 10ms | 10K/sec | N/A | 100MB |
| Avro Serialization | 0.1ms | 10M/sec | N/A | 64MB |
| Avro Deserialization | 0.1ms | 10M/sec | N/A | 64MB |
| Registry Cluster (3 nodes) | 5ms | 300K/sec | 99.9% | 300MB |
| Registry Cluster (5 nodes) | 5ms | 500K/sec | 99.9% | 500MB |
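The gap between the cache-hit and cache-miss rows comes from skipping a network round trip. Because a schema ID always refers to the same schema, a client can cache ID-to-schema lookups indefinitely. The sketch below is a simplified model of that idea, not the Confluent client API: SchemaIdCache and its fetcher are hypothetical, and the fetcher stands in for a call to the registry's GET /schemas/ids/{id} endpoint.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;

// Hypothetical ID -> schema cache (illustration only, not the Confluent client).
final class SchemaIdCache {
    private final Map<Integer, String> byId = new ConcurrentHashMap<>();
    private final IntFunction<String> fetcher; // stands in for GET /schemas/ids/{id}
    private final AtomicLong hits = new AtomicLong();
    private final AtomicLong misses = new AtomicLong();

    SchemaIdCache(IntFunction<String> fetcher) {
        this.fetcher = fetcher;
    }

    String get(int schemaId) {
        String cached = byId.get(schemaId);
        if (cached != null) {
            hits.incrementAndGet();
            return cached;
        }
        misses.incrementAndGet();
        // computeIfAbsent fetches at most once per ID even if several threads miss together
        // (the miss counter may over-count in that case).
        return byId.computeIfAbsent(schemaId, id -> fetcher.apply(id));
    }

    double hitRatio() {
        long h = hits.get();
        long total = h + misses.get();
        return total == 0 ? 0.0 : (double) h / total;
    }

    public static void main(String[] args) {
        int[] fetches = {0};
        SchemaIdCache cache = new SchemaIdCache(id -> {
            fetches[0]++; // pretend this is an HTTP round trip to the registry
            return "{\"type\":\"string\"}";
        });
        for (int i = 0; i < 1000; i++) {
            cache.get(7);
        }
        System.out.println(fetches[0]);       // 1
        System.out.println(cache.hitRatio()); // 0.999
    }
}
```

With one distinct ID, 1,000 lookups cost a single fetch, which is the same 99.9% hit ratio that appears in the table.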
Best Practices
- Compatibility Mode Selection: Use BACKWARD compatibility (the Schema Registry default) for most use cases: consumers using the new schema can read data written with the previous schema, so upgrade consumers before producers. Use FORWARD when producers are upgraded first, and FULL when producers and consumers must be upgradeable in any order. Avoid NONE in production.
- Subject Name Strategy: Use TopicNameStrategy (the default; subjects are <topic>-key and <topic>-value) for simple topics that carry one record type. Use TopicRecordNameStrategy for topics with multiple record types. Use RecordNameStrategy to share one schema subject across topics.
- Schema Evolution Pattern: Add new fields only with default values. Remove only fields that have defaults; under FORWARD and FULL, removing a field without a default breaks consumers still on the old schema. Change field types only along Avro's permitted promotions (int to long, float, or double; long to float or double; float to double; string to bytes and back). Rename or deprecate a field by adding a new field (or an Avro alias) instead of editing the old one in place.
- Wire Format Efficiency: Use Avro for compact binary encoding (field names are not stored in the payload). Use JSON Schema when human-readable messages help debugging. Use Protobuf when teams already standardize on .proto definitions. Because each message carries only a 5-byte header (magic byte plus schema ID) rather than the schema itself, schema size affects registry storage and first-lookup cost, not per-message bandwidth.
- Registry High Availability: Run Schema Registry in a cluster (3+ nodes) for production. Use a load balancer in front of the cluster. Monitor leader election and failover. Test failover scenarios regularly.
- Client Configuration: The serializers cache schemas and IDs by default; size the local cache with max.schemas.per.subject if needed. Use auto.register.schemas=false in production to prevent accidental schema registration, and pair it with use.latest.version=true so producers serialize with the latest schema registered for the subject.
- Monitoring: Track schema-registry-request-rate, schema-registry-error-rate, and schema-registry-cache-hit-ratio. Monitor schema compatibility failures. Alert on schema registration errors.
- Testing: Test schema evolution with realistic data. Verify backward and forward compatibility. Use MockSchemaRegistryClient or a mock:// schema.registry.url in unit tests. Test with multiple consumer versions.
- Security: Enable authentication for Schema Registry access. Use SSL for encrypted communication. Implement ACLs for schema modification. Audit schema registration and compatibility checks.
- Migration: Use schema linking for multi-cluster migrations. Plan schema evolution in advance. Document schema changes. Coordinate with all consumer teams before schema changes.
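The evolution rules in the list above follow from Avro schema resolution: a reader schema can decode writer data if every reader field either exists in the writer with the same (or a promotable) type or has a default. The toy checker below, a hypothetical CompatSketch class restricted to flat records of primitive fields, encodes only those rules to show why BACKWARD, FORWARD, and FULL differ. It ignores unions, enums, nested records, and aliases, and it compares just two versions (transitive modes such as BACKWARD_TRANSITIVE check against all earlier versions).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model of Avro schema resolution for flat records of primitive fields (hypothetical).
final class CompatSketch {

    static final class Field {
        final String type;        // Avro primitive name, e.g. "int", "string"
        final boolean hasDefault;

        Field(String type, boolean hasDefault) {
            this.type = type;
            this.hasDefault = hasDefault;
        }
    }

    // Writer type -> reader types it may be promoted to (Avro spec, "Schema Resolution").
    private static final Map<String, Set<String>> PROMOTIONS = Map.of(
        "int", Set.of("long", "float", "double"),
        "long", Set.of("float", "double"),
        "float", Set.of("double"),
        "string", Set.of("bytes"),
        "bytes", Set.of("string"));

    static boolean promotable(String writer, String reader) {
        return writer.equals(reader) || PROMOTIONS.getOrDefault(writer, Set.of()).contains(reader);
    }

    // Can data written with `writer` be decoded using `reader`?
    static boolean canRead(Map<String, Field> reader, Map<String, Field> writer) {
        for (Map.Entry<String, Field> e : reader.entrySet()) {
            Field w = writer.get(e.getKey());
            if (w == null) {
                if (!e.getValue().hasDefault) {
                    return false; // reader needs a default for a field the writer lacks
                }
            } else if (!promotable(w.type, e.getValue().type)) {
                return false;
            }
        }
        return true; // writer-only fields are simply skipped by the reader
    }

    static boolean backward(Map<String, Field> newer, Map<String, Field> older) {
        return canRead(newer, older); // new reader, old data
    }

    static boolean forward(Map<String, Field> newer, Map<String, Field> older) {
        return canRead(older, newer); // old reader, new data
    }

    static boolean full(Map<String, Field> newer, Map<String, Field> older) {
        return backward(newer, older) && forward(newer, older);
    }

    // Builds a schema from "name:type" specs; a trailing '?' marks a field with a default.
    static Map<String, Field> schema(String... specs) {
        Map<String, Field> fields = new LinkedHashMap<>();
        for (String spec : specs) {
            boolean hasDefault = spec.endsWith("?");
            String body = hasDefault ? spec.substring(0, spec.length() - 1) : spec;
            String[] parts = body.split(":", 2);
            fields.put(parts[0], new Field(parts[1], hasDefault));
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, Field> v1 = schema("orderId:string", "amount:float");
        Map<String, Field> v2 = schema("orderId:string", "amount:double", "currency:string?");
        System.out.println(backward(v2, v1)); // true: float promotes to double, currency has a default
        System.out.println(forward(v2, v1));  // false: a v1 reader cannot narrow double to float
    }
}
```

For real checks, Avro's SchemaCompatibility class or the registry's compatibility REST endpoint apply the complete rules.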
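To make the subject name strategies concrete, the sketch below reproduces the subject strings each strategy produces, reusing the topic and record names from the Streams example. SubjectNames is a hypothetical helper; the real strategies are classes in io.confluent.kafka.serializers.subject, selected with the key.subject.name.strategy and value.subject.name.strategy settings.

```java
// Hypothetical helper mirroring the subject names produced by the three Confluent strategies.
final class SubjectNames {

    // TopicNameStrategy (default): one subject per topic and key/value side.
    static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the record's fully qualified name, shared by every topic using it.
    static String recordNameStrategy(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: one subject per (topic, record type) pair.
    static String topicRecordNameStrategy(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        String record = "com.example.HighValueOrder";
        System.out.println(topicNameStrategy("high-value-orders", false));        // high-value-orders-value
        System.out.println(recordNameStrategy(record));                           // com.example.HighValueOrder
        System.out.println(topicRecordNameStrategy("high-value-orders", record)); // high-value-orders-com.example.HighValueOrder
    }
}
```

The difference matters for evolution: compatibility is checked per subject, so RecordNameStrategy makes one schema history govern every topic that carries the record.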
Key Takeaways:
- Wire format = magic byte (0x0) + 4-byte schema ID + binary payload; enables schema lookup by ID
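- BACKWARD compatibility: consumers on the new schema can read data written with the previous schema; FORWARD: consumers on the previous schema can read data written with the new schema; FULL: both
- Additive evolution (new fields with defaults) is safe in every mode; removing fields requires defaults under FORWARD and FULL; type changes are limited to Avro's type promotions (e.g., int to long)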
- Client-side caching reaches a ~99.9% hit rate in the metrics above; p99 schema retrieval is ~1ms on a cache hit and ~50ms on a miss
- auto.register.schemas=false prevents accidental registration in production
- Schema Registry cluster (3+ nodes) provides HA; only leader handles writes, all nodes serve reads
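The wire format in the first takeaway can be sketched with java.nio.ByteBuffer. WireFormat and its methods are hypothetical names; the layout (magic byte 0x0, a 4-byte schema ID in big-endian order as Confluent documents it, then the serialized payload) is what the sketch encodes. The sample payload {0x02, 0x61} is the Avro binary encoding of the string "a" (zig-zag length 1, then the byte 'a').

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for the Schema Registry wire format: [0x0][4-byte schema ID][payload].
final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES; // 5 bytes

    static byte[] encode(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId) // ByteBuffer writes big-endian by default
            .put(payload)
            .array();
    }

    // Reads the schema ID after validating the magic byte.
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (message.length < HEADER_SIZE || buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("not in Schema Registry wire format");
        }
        return buf.getInt();
    }

    // Returns the serialized payload that follows the 5-byte header.
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] msg = encode(42, new byte[] {0x02, 0x61});
        System.out.println(Arrays.toString(msg)); // [0, 0, 0, 0, 42, 2, 97]
        System.out.println(schemaId(msg));        // 42
    }
}
```

A consumer reads the ID from bytes 1 to 4, looks up (or cache-hits) the writer schema, and only then decodes the payload.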
See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming)