πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Replication & ISR Deep Dive

Apache KafkaReplication⭐ Premium

Advertisement

Replication & ISR Deep Dive

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

Replication ensures data durability and availability in Kafka. The ISR (In-Sync Replicas) mechanism is central to understanding how Kafka maintains consistency.

Replication Architecture

Architecture Diagram
Topic: orders (replication-factor=3)

Partition 0:
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Broker 1 (Leader)                             β”‚
β”‚  └── Log Segment: 00000000000000000000.log     β”‚
β”‚                                                 β”‚
β”‚  Broker 2 (Follower - ISR)                     β”‚
β”‚  └── Log Segment: 00000000000000000000.log     β”‚
β”‚                                                 β”‚
β”‚  Broker 3 (Follower - ISR)                     β”‚
β”‚  └── Log Segment: 00000000000000000000.log     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

ISR Protocol Flow

Architecture Diagram
1. Producer sends message to Leader (Broker 1)
2. Leader appends to local log
3. Leader sends to all followers
4. Followers fetch and append to their logs
5. Followers send Fetch request with offset
6. Leader tracks which followers are "in-sync"
7. After min.insync.replicas acknowledge, producer gets ACK

Timeline:
Producer β†’ Leader (B1) β†’ Fetch β†’ Follower (B2) β†’ Append β†’ Fetch Request
                              β†’ Follower (B3) β†’ Append β†’ Fetch Request
                              ← B2 offset update
                              ← B3 offset update
                              ← Producer ACK (if min.insync.replicas=2)

Critical Configuration Parameters

# Replication factor (default: 1, recommended: 3)
replication.factor=3

# Minimum replicas that must be in-sync for ACKs
min.insync.replicas=2

# Maximum time a replica can be behind before removal from ISR
replica.lag.time.max.ms=10000

# How often to check for under-replicated partitions
replica.lag.time.max.ms=10000

# Leader election strategy
unclean.leader.election.enable=false

⚠️

Critical Setting: min.insync.replicas=2 with acks=all means at least 2 replicas must acknowledge before producer gets success. If only 1 replica is in-sync, writes will fail with NotEnoughReplicasException.

Java Producer with Replication Awareness

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Critical replication settings
props.put("acks", "all");  // Wait for all in-sync replicas
props.put("min.insync.replicas", "2");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 1);  // Ensure ordering

Producer<String, String> producer = new KafkaProducer<>(props);

// Send with acknowledgment
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    "order-123",
    "{\"amount\": 99.99}"
);

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.printf("Sent to partition %d, offset %d%n", 
            metadata.partition(), metadata.offset());
    } else {
        System.err.println("Failed: " + exception.getMessage());
    }
});

Python Producer Configuration

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    
    # Replication-aware settings
    acks='all',
    min_in_sync_replicas=2,
    retries=2147483647,
    max_in_flight_requests_per_connection=1
)

# Send with delivery callback
producer.send(
    topic='orders',
    key='order-123',
    value={'amount': 99.99}
).add_callback(lambda m: print(f"Sent to partition {m.partition}"))
 .add_err_callback(lambda e: print(f"Failed: {e}"))

producer.flush()

Monitoring ISR Health

Broker Metrics

// JMX metrics to monitor
// kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
// kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
// kafka.server:type=ReplicaManager,name=IsrExpandsPerSec

// Get ISR status via AdminClient
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "kafka1:9092");

AdminClient admin = AdminClient.create(adminProps);

DescribeTopicsResult result = admin.describeTopics(
    Collections.singleton("orders")
);

result.all().get().forEach((topic, description) -> {
    System.out.println("Topic: " + topic);
    description.partitions().forEach(p -> {
        System.out.printf("  Partition %d: Leader=%d, ISR=%s%n",
            p.partition(), p.leader().id(), p.isr());
    });
});

Python ISR Monitoring

from kafka import KafkaAdminClient
from kafka.admin import TopicPartition

admin = KafkaAdminClient(bootstrap_servers=['kafka1:9092'])

# Get partition details
partitions = admin.describe_topics(['orders'])

for topic in partitions:
    for partition in topic['partitions']:
        leader = partition['leader']
        isr = partition['replicas']
        print(f"Partition {partition['partition']}: Leader={leader}, ISR={isr}")
        
        # Check for under-replicated partitions
        if len(isr) < topic['replication_factor']:
            print(f"  WARNING: Under-replicated partition!")

ISR Management Scenarios

1. Normal Operation

Architecture Diagram
Partition 0 ISR: [B1, B2, B3]
All replicas in-sync, accepting writes

2. Broker Failure

Architecture Diagram
Partition 0 ISR: [B1, B2]  (B3 failed)
- If min.insync.replicas=2: writes continue
- If min.insync.replicas=3: writes fail
- B3 marked as out-of-sync after replica.lag.time.max.ms

3. Network Partition

Architecture Diagram
Partition 0 ISR: [B1, B2]  (B3 partitioned)
- B3 cannot fetch from leader
- After lag.time.max.ms, B3 removed from ISR
- B3 resumes: fetches from leader, rejoins ISR

4. Unclean Leader Election

Architecture Diagram
Scenario: B1 (leader) and B2 (follower) both fail
B3 is not in ISR (fell behind)

Option A (safe): No leader elected, partition unavailable
Option B (unsafe): B3 becomes leader, data loss possible

Configuration: unclean.leader.election.enable=false (recommended)

ISR Shrinking and Expanding

// Monitor ISR changes
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");

AdminClient admin = AdminClient.create(props);

// Get ISR size for all partitions
ListOffsetsResult result = admin.listOffsets(
    TopicPartition.partition("orders", 0),
    ListOffsetsSpec.latest()
);

result.all().get().forEach((tp, offset) -> {
    System.out.printf("Partition %s: Latest offset = %d%n", 
        tp, offset.offset());
});

ISR Shrink Detection

# Detect ISR shrinking
import time
from kafka import KafkaConsumer
from prometheus_client import Counter, Gauge

isr_shrink_counter = Counter(
    'kafka_isr_shrink_total',
    'Total ISR shrink events',
    ['topic', 'partition']
)

isr_size_gauge = Gauge(
    'kafka_isr_size',
    'Current ISR size',
    ['topic', 'partition']
)

# Monitor ISR changes
consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092'],
    group_id='isr-monitor'
)

# Use AdminClient to poll ISR
admin = KafkaAdminClient(bootstrap_servers=['kafka1:9092'])

while True:
    topics = admin.describe_topics(['orders'])
    for topic in topics:
        for partition in topic['partitions']:
            isr_size = len(partition['isr'])
            isr_size_gauge.labels(
                topic=topic['topic'],
                partition=partition['partition']
            ).set(isr_size)
            
            if isr_size < 3:  # Expected replication factor
                isr_shrink_counter.labels(
                    topic=topic['topic'],
                    partition=partition['partition']
                ).inc()
    
    time.sleep(10)

Follow-Up Questions

  1. What happens when min.insync.replicas=2 and only 1 replica is in-sync?
  2. How does Kafka detect that a replica has fallen out of sync?
  3. Explain the trade-offs between acks=all and acks=1 with replication.
  4. What is the impact of unclean.leader.election.enable=true on data consistency?
  5. How would you recover from a situation where all replicas for a partition are down?

Advertisement