Replication & ISR Deep Dive
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
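Replication gives Kafka durability and availability: every partition is copied to several brokers, so its data survives the loss of one of them. The in-sync replica (ISR) set, meaning the replicas that are currently caught up with the leader, is central to understanding how Kafka maintains consistency.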
Replication Architecture
Topic: orders (replication-factor=3)
Partition 0:
┌─────────────────────────────────────────────────┐
│ Broker 1 (Leader)                               │
│ └── Log Segment: 00000000000000000000.log       │
│                                                 │
│ Broker 2 (Follower - ISR)                       │
│ └── Log Segment: 00000000000000000000.log       │
│                                                 │
│ Broker 3 (Follower - ISR)                       │
│ └── Log Segment: 00000000000000000000.log       │
└─────────────────────────────────────────────────┘
ISR Protocol Flow
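1. Producer sends a record to the partition leader (Broker 1)
2. Leader appends the record to its local log
3. Followers send Fetch requests to the leader (replication is pull-based; the leader never pushes)
4. Leader returns the new records and the followers append them to their own logs
5. Each follower's next Fetch request carries its new offset, which tells the leader how far that follower has replicated
6. Leader uses those offsets to track which followers are "in-sync" and to advance the high watermark
7. With acks=all, the producer gets its ACK once every replica in the current ISR has the record; min.insync.replicas only sets the minimum ISR size at which such writes are accepted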
Timeline:
Producer → Leader (B1) → Fetch → Follower (B2) → Append → Fetch Request
                       → Fetch → Follower (B3) → Append → Fetch Request
         → B2 offset update
         → B3 offset update
         → Producer ACK (acks=all: once every ISR member has the record)
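The flow above can be sketched as a toy model in plain Python. It is not Kafka code: the `Partition` class and its method names are hypothetical, and it collapses "follower fetches, then reports its offset on the next fetch" into a single `fetch` call. What it shows is that the high watermark is the smallest log end offset across the ISR, so an acks=all write is only acknowledged once the slowest in-sync replica has it.

```python
from dataclasses import dataclass, field


@dataclass
class Partition:
    """Toy model of one partition: a log end offset (LEO) per replica."""
    leader: str
    isr: set
    leo: dict = field(default_factory=dict)  # broker id -> log end offset

    def append(self, count=1):
        """Leader appends `count` records to its local log."""
        self.leo[self.leader] = self.leo.get(self.leader, 0) + count

    def fetch(self, follower):
        """Follower pulls everything the leader has (simplified to one step)."""
        self.leo[follower] = self.leo[self.leader]

    def high_watermark(self):
        """Highest offset that every ISR member has replicated."""
        return min(self.leo.get(broker, 0) for broker in self.isr)


p = Partition(leader="B1", isr={"B1", "B2", "B3"},
              leo={"B1": 0, "B2": 0, "B3": 0})

p.append()                           # producer record lands on the leader
hw_after_append = p.high_watermark() # no follower has it yet
p.fetch("B2")
hw_after_b2 = p.high_watermark()     # B3 still holds the watermark back
p.fetch("B3")
hw_after_b3 = p.high_watermark()     # all ISR members have it: ACK can be sent
```

The watermark only moves after the last in-sync follower catches up, which is why one slow replica in the ISR raises acks=all latency for the whole partition.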
Critical Configuration Parameters
# Default replication factor for automatically created topics (broker default: 1, recommended: 3)
default.replication.factor=3
# Minimum ISR size (leader included) required to accept a write sent with acks=all
min.insync.replicas=2
# Maximum time a follower may go without catching up to the leader before it is removed from the ISR
# (the default is 30000 since Kafka 2.5 and was 10000 before that)
replica.lag.time.max.ms=10000
# Whether a replica outside the ISR may be elected leader (false favors consistency over availability)
unclean.leader.election.enable=false
⚠️ Critical Setting: min.insync.replicas=2 with acks=all means the ISR must contain at least 2 replicas (the leader counts as one) for a write to succeed. If only 1 replica is in-sync, writes fail with NotEnoughReplicasException. The setting has no effect on producers that use acks=1 or acks=0.
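The rule in the warning can be written down as a small decision function. This is a sketch of the rule only, not broker code; `write_outcome` and `tolerated_failures` are hypothetical helper names.

```python
def write_outcome(acks, isr_size, min_insync_replicas):
    """Return 'accepted' or the error a producer would see (simplified).

    min.insync.replicas is only enforced for acks=all; with acks=1 or
    acks=0 the leader accepts the write regardless of ISR size.
    """
    if acks == "all" and isr_size < min_insync_replicas:
        return "NotEnoughReplicasException"
    return "accepted"


def tolerated_failures(replication_factor, min_insync_replicas):
    """How many replicas can drop out of the ISR before acks=all writes fail."""
    return replication_factor - min_insync_replicas


healthy = write_outcome("all", isr_size=3, min_insync_replicas=2)
one_down = write_outcome("all", isr_size=2, min_insync_replicas=2)
two_down = write_outcome("all", isr_size=1, min_insync_replicas=2)
leader_only_ack = write_outcome("1", isr_size=1, min_insync_replicas=2)
budget = tolerated_failures(replication_factor=3, min_insync_replicas=2)
```

With replication factor 3 and min.insync.replicas=2, one replica can fall out of the ISR without blocking writes; losing a second one makes acks=all producers fail while acks=1 producers keep writing with weaker durability.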
Java Producer with Replication Awareness
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Critical replication settings
props.put("acks", "all"); // Wait for every replica in the current ISR
// min.insync.replicas is a broker/topic setting, not a producer property;
// configure it on the topic or the broker instead of here.
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 1); // Preserve ordering across retries

Producer<String, String> producer = new KafkaProducer<>(props);

// Send with acknowledgment
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    "order-123",
    "{\"amount\": 99.99}"
);
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.printf("Sent to partition %d, offset %d%n",
            metadata.partition(), metadata.offset());
    } else {
        System.err.println("Failed: " + exception.getMessage());
    }
});
producer.close(); // Flushes pending records, then releases resources
Python Producer Configuration
from kafka import KafkaProducer  # kafka-python
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # Replication-aware settings
    acks='all',
    # min.insync.replicas is a broker/topic setting, not a producer option
    retries=2147483647,
    max_in_flight_requests_per_connection=1,
)

# Send with delivery callbacks
future = producer.send('orders', key='order-123', value={'amount': 99.99})
future.add_callback(lambda m: print(f"Sent to partition {m.partition}, offset {m.offset}"))
future.add_errback(lambda e: print(f"Failed: {e}"))
producer.flush()
Monitoring ISR Health
Broker Metrics
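import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;

// JMX metrics to monitor
// kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
// kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
// kafka.server:type=ReplicaManager,name=IsrExpandsPerSec

// Get ISR status via AdminClient
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "kafka1:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    DescribeTopicsResult result = admin.describeTopics(
        Collections.singleton("orders")
    );
    // allTopicNames() needs kafka-clients 3.1+; older clients use all()
    result.allTopicNames().get().forEach((topic, description) -> {
        System.out.println("Topic: " + topic);
        description.partitions().forEach(p -> {
            // leader() is null while the partition has no leader
            String leader = p.leader() == null ? "none" : String.valueOf(p.leader().id());
            System.out.printf("  Partition %d: Leader=%s, ISR=%s%n",
                p.partition(), leader, p.isr());
        });
    });
}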
Python ISR Monitoring
from kafka import KafkaAdminClient  # kafka-python

admin = KafkaAdminClient(bootstrap_servers=['kafka1:9092'])

# Get partition details
topics = admin.describe_topics(['orders'])
for topic in topics:
    for partition in topic['partitions']:
        leader = partition['leader']
        replicas = partition['replicas']  # all assigned replicas
        isr = partition['isr']            # replicas currently in sync
        print(f"Partition {partition['partition']}: Leader={leader}, ISR={isr}")
        # Under-replicated: fewer in-sync replicas than assigned replicas
        if len(isr) < len(replicas):
            print("  WARNING: Under-replicated partition!")
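The under-replication check can also be pulled out into a pure function, which makes it testable without a cluster. The sketch below assumes the metadata has the shape used above (a list of topic dicts, each with `partitions` entries carrying `partition`, `replicas` and `isr`); `under_replicated` and the `sample` data are made up for illustration.

```python
def under_replicated(topics):
    """Yield (topic, partition, missing_broker_ids) for every partition
    whose ISR is smaller than its assigned replica set."""
    for topic in topics:
        for p in topic["partitions"]:
            missing = sorted(set(p["replicas"]) - set(p["isr"]))
            if missing:
                yield topic["topic"], p["partition"], missing


# Hypothetical metadata: partition 1 has lost broker 1 from its ISR.
sample = [{
    "topic": "orders",
    "partitions": [
        {"partition": 0, "leader": 1, "replicas": [1, 2, 3], "isr": [1, 2, 3]},
        {"partition": 1, "leader": 2, "replicas": [2, 3, 1], "isr": [2, 3]},
    ],
}]

report = list(under_replicated(sample))
for topic, partition, missing in report:
    print(f"{topic}-{partition} is missing brokers {missing} from its ISR")
```

Reporting which brokers are missing, rather than only a count, points straight at the broker to investigate when several partitions shrink at once.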
ISR Management Scenarios
1. Normal Operation
Partition 0 ISR: [B1, B2, B3]
All replicas in-sync, accepting writes
2. Broker Failure
Partition 0 ISR: [B1, B2] (B3 failed)
- If min.insync.replicas=2: acks=all writes continue
- If min.insync.replicas=3: acks=all writes fail with NotEnoughReplicasException
- B3 is removed from the ISR once it has gone replica.lag.time.max.ms without catching up to the leader
3. Network Partition
Partition 0 ISR: [B1, B2] (B3 partitioned)
- B3 cannot fetch from the leader
- After replica.lag.time.max.ms, the leader removes B3 from the ISR
- When connectivity returns, B3 resumes fetching from the leader and rejoins the ISR once it has caught up
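The time-based removal in scenarios 2 and 3 can be sketched as a filter over the ISR. This is an illustration of the rule, not broker code: `shrink_isr` is a hypothetical helper, the timestamps are invented, and the 10-second limit mirrors the replica.lag.time.max.ms value used earlier in this article.

```python
REPLICA_LAG_TIME_MAX_MS = 10_000  # matches the example configuration above


def shrink_isr(isr, leader, last_caught_up_ms, now_ms,
               max_lag_ms=REPLICA_LAG_TIME_MAX_MS):
    """Return the ISR after dropping followers that have not been caught up
    with the leader for longer than max_lag_ms. The leader always stays."""
    return [
        broker for broker in isr
        if broker == leader or now_ms - last_caught_up_ms[broker] <= max_lag_ms
    ]


isr = ["B1", "B2", "B3"]
# Invented timestamps: B2 caught up 2 s ago, B3 last caught up 15 s ago.
last_caught_up = {"B2": 58_000, "B3": 45_000}

new_isr = shrink_isr(isr, leader="B1",
                     last_caught_up_ms=last_caught_up, now_ms=60_000)
print(new_isr)
```

The check is based on time since the follower was last fully caught up, not on how many messages it is behind, so a short burst of traffic does not evict a healthy follower.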
4. Unclean Leader Election
Scenario: B1 (leader) and B2 (follower) both fail
B3 is not in ISR (fell behind)
Option A (safe): No leader elected, partition unavailable
Option B (unsafe): B3 becomes leader, data loss possible
Configuration: unclean.leader.election.enable=false (recommended)
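The two options can be compared with a small election sketch. It assumes the controller prefers the first live ISR member in replica-assignment order and only falls back to a live out-of-sync replica when unclean election is enabled; `elect_leader`, `records_lost` and the offsets used are hypothetical.

```python
def elect_leader(replicas, isr, alive, unclean_enabled=False):
    """Return (new_leader, was_unclean). new_leader is None when no
    eligible replica exists and the partition stays offline."""
    for broker in replicas:            # clean election: live ISR members only
        if broker in isr and broker in alive:
            return broker, False
    if unclean_enabled:                # unclean election: any live replica
        for broker in replicas:
            if broker in alive:
                return broker, True
    return None, False


def records_lost(old_leader_leo, new_leader_leo):
    """Records the new leader never replicated; they are gone after an
    unclean election because other replicas truncate to the new leader."""
    return max(0, old_leader_leo - new_leader_leo)


replicas = ["B1", "B2", "B3"]
isr = {"B1", "B2"}   # B3 fell behind earlier
alive = {"B3"}       # B1 and B2 have both failed

option_a = elect_leader(replicas, isr, alive, unclean_enabled=False)
option_b = elect_leader(replicas, isr, alive, unclean_enabled=True)
# Invented offsets: old leader was at 100, B3 had only replicated up to 80.
lost = records_lost(old_leader_leo=100, new_leader_leo=80)
```

Option A trades availability for safety: the partition stays offline until B1 or B2 returns. Option B restores availability immediately at the cost of every record B3 had not yet fetched.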
ISR Shrinking and Expanding
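import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

// Track log growth alongside ISR changes
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // Latest offset of orders-0; ISR membership itself comes from describeTopics()
    TopicPartition tp = new TopicPartition("orders", 0);
    ListOffsetsResult result = admin.listOffsets(
        Collections.singletonMap(tp, OffsetSpec.latest())
    );
    result.all().get().forEach((partition, info) -> {
        System.out.printf("Partition %s: Latest offset = %d%n",
            partition, info.offset());
    });
}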
ISR Shrink Detection
# Detect ISR shrinking
import time
from kafka import KafkaAdminClient  # kafka-python
from prometheus_client import Counter, Gauge, start_http_server

isr_shrink_counter = Counter(
    'kafka_isr_shrink_total',
    'Total ISR shrink events',
    ['topic', 'partition']
)
isr_size_gauge = Gauge(
    'kafka_isr_size',
    'Current ISR size',
    ['topic', 'partition']
)

start_http_server(8000)  # expose /metrics for scraping; the port is an arbitrary choice

# Use the admin client to poll ISR membership
admin = KafkaAdminClient(bootstrap_servers=['kafka1:9092'])
previous_isr_size = {}  # (topic, partition) -> ISR size seen on the last poll

while True:
    for topic in admin.describe_topics(['orders']):
        for partition in topic['partitions']:
            key = (topic['topic'], partition['partition'])
            isr_size = len(partition['isr'])
            isr_size_gauge.labels(topic=key[0], partition=key[1]).set(isr_size)
            # Count a shrink only when the ISR got smaller since the last poll,
            # not on every poll during which it is below the replication factor
            if isr_size < previous_isr_size.get(key, isr_size):
                isr_shrink_counter.labels(topic=key[0], partition=key[1]).inc()
            previous_isr_size[key] = isr_size
    time.sleep(10)
Follow-Up Questions
- What happens when min.insync.replicas=2 and only 1 replica is in-sync?
- How does Kafka detect that a replica has fallen out of sync?
- Explain the trade-offs between acks=all and acks=1 with replication.
- What is the impact of unclean.leader.election.enable=true on data consistency?
- How would you recover from a situation where all replicas for a partition are down?