Kafka Internals: Partitions, Replicas, ISR, Controller
Difficulty: Expert | Asked at: Google, LinkedIn, Confluent, Uber
Interview Context
This question tests your understanding of Kafka's core architecture. Senior and Staff-level candidates are expected to explain partition leadership, the ISR mechanism, and controller failover with confidence.
The Question
Explain the internal architecture of Apache Kafka. How do partitions, replicas, the ISR (In-Sync Replicas) set, and the controller interact? Describe what happens when a broker fails and how the system maintains data consistency.
Core Architecture Overview
Kafka's architecture is built on a distributed log abstraction. At the highest level:
```
Producer → Broker Cluster → Consumer Group
                 ↓
        Controller (one broker)
                 ↓
    Partition Leaders + Followers
```
Partition Model
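Each topic is divided into $P$ partitions. A record with a key is assigned to a partition by hashing the key:

$$\text{partition}(k) = \text{hash}(k) \bmod P$$

The Java client's default partitioner uses murmur2 as the hash. Records without a key are spread by a sticky partitioner since Kafka 2.4.

For a topic with replication factor $R$, each partition has $R$ replicas spread across different brokers. One replica is the leader and the other $R-1$ are followers.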
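The sketch below shows key-based partition selection. It uses `zlib.crc32` as a stand-in hash. Kafka's Java client uses murmur2, so these partition numbers will not match a real cluster. The sketch only shows that the same key always maps to the same partition. `choose_partition` is a hypothetical helper.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a record key to a partition as hash(key) mod P.

    zlib.crc32 is a stand-in hash (assumption): Kafka's Java client uses murmur2,
    so these partition numbers will not match a real cluster.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # In Python 3, crc32 returns an unsigned 32-bit value, so the result is never negative
    return zlib.crc32(key) % num_partitions


if __name__ == "__main__":
    for key in (b"user-1", b"user-2", b"user-1"):
        print(key.decode(), "->", choose_partition(key, num_partitions=3))
```

The mapping depends on $P$. Adding partitions to an existing topic therefore changes where keys land, and per-key ordering is not preserved across that change.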
```
Topic: orders (P=3, R=3)    L = leader, F = follower
┌───────────────────────────────────────────┐
│  Partition 0   Partition 1   Partition 2  │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐  │
│  │ B1 (L)  │   │ B2 (L)  │   │ B3 (L)  │  │
│  │ B2 (F)  │   │ B3 (F)  │   │ B1 (F)  │  │
│  │ B3 (F)  │   │ B1 (F)  │   │ B2 (F)  │  │
│  └─────────┘   └─────────┘   └─────────┘  │
└───────────────────────────────────────────┘
```
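The layout above is what a simple round-robin placement produces. Replica j of partition p lands on broker (p + j) mod N, and the first replica in each list is the preferred leader, so leadership is spread evenly. This is a simplified sketch. Kafka's real assignment also randomizes the starting broker and shifts follower positions. `assign_replicas` is a hypothetical helper.

```python
def assign_replicas(num_partitions: int, brokers: list[int],
                    replication_factor: int) -> dict[int, list[int]]:
    """Round-robin replica placement (simplified; not Kafka's exact algorithm).

    Replica j of partition p goes to brokers[(p + j) % N]; the first broker in
    each list is the preferred leader.
    """
    n = len(brokers)
    if replication_factor > n:
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [brokers[(p + j) % n] for j in range(replication_factor)]
        for p in range(num_partitions)
    }


if __name__ == "__main__":
    for p, replicas in assign_replicas(3, [1, 2, 3], 3).items():
        followers = ", ".join(f"B{b}" for b in replicas[1:])
        print(f"Partition {p}: leader B{replicas[0]}, followers {followers}")
```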
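Key Insight
By default, all reads and writes go through the partition leader, and followers exist for fault tolerance. Since Kafka 2.4 (KIP-392), consumers can optionally fetch from a nearby follower for read scaling. This requires the broker to set replica.selector.class and the consumer to set client.rack.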
In-Sync Replicas (ISR)
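The ISR is the set of replicas that are fully caught up with the leader. The leader itself is always a member. Formally:

$$\text{ISR} = \{\, r \in \text{Replicas} : t_{\text{now}} - t_{\text{caught-up}}(r) \le \texttt{replica.lag.time.max.ms} \,\}$$

Here $t_{\text{caught-up}}(r)$ is the last time replica $r$ had fetched up to the leader's log end offset. Lag is therefore measured in time, not in number of messages.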
Key properties of ISR:
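- When a partition is created, every assigned replica starts in the ISR
- A follower is removed if it has not caught up with the leader for longer than replica.lag.time.max.ms (default 30s)
- A removed follower is re-added once it catches up
- With acks=all, the leader acknowledges a write only after every current ISR member has replicated it. If the ISR is smaller than min.insync.replicas, the write is rejected instead.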
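The sketch below shows how a leader might decide whether to accept an acks=all write. It assumes the min.insync.replicas setting is the only extra check. `can_accept_acks_all`, `append_with_acks_all` and this `NotEnoughReplicasError` class are hypothetical names. A real broker returns a NOT_ENOUGH_REPLICAS error code to the producer.

```python
class NotEnoughReplicasError(Exception):
    """Hypothetical stand-in for the broker's NOT_ENOUGH_REPLICAS error."""


def can_accept_acks_all(isr: set[int], min_insync_replicas: int) -> bool:
    # The ISR always includes the leader, so len(isr) counts the leader too
    return len(isr) >= min_insync_replicas


def append_with_acks_all(isr: set[int], min_insync_replicas: int, record: str) -> str:
    if not can_accept_acks_all(isr, min_insync_replicas):
        # Reject up front: the requested durability cannot be guaranteed
        raise NotEnoughReplicasError(
            f"ISR size {len(isr)} < min.insync.replicas={min_insync_replicas}")
    # A real leader appends here, then acks once every ISR member has fetched the record
    return f"accepted {record!r}; ack after replication to brokers {sorted(isr)}"


if __name__ == "__main__":
    print(append_with_acks_all({1, 2, 3}, 2, "order-42"))
    try:
        append_with_acks_all({1}, 2, "order-43")  # both followers fell out of the ISR
    except NotEnoughReplicasError as e:
        print("rejected:", e)
```

The broker default is min.insync.replicas=1. Under that default, an acks=all write can succeed with only the leader in the ISR. This is why replication factor 3 is commonly paired with min.insync.replicas=2.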
ISR Shrinking and Expansion Events
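```python
# Simplified sketch of leader-side ISR management (not Kafka's actual code).
# Real brokers persist ISR changes through the controller.
import logging
from dataclasses import dataclass

log = logging.getLogger("isr")


@dataclass(eq=False)  # identity-based hashing, so replicas can live in sets
class Replica:
    broker_id: int
    last_caught_up_time_ms: int = 0  # last time this replica reached the leader's LEO


class ISRManager:
    def __init__(self, leader, replicas, lag_threshold_ms=30000):
        self.leader = leader
        self.replicas = replicas
        self.isr = set(replicas)               # all assigned replicas start in the ISR
        self.lag_threshold = lag_threshold_ms  # replica.lag.time.max.ms

    def check_isr(self, current_time_ms):
        """Called periodically by the leader."""
        new_isr = {self.leader}  # the leader never drops out of its own ISR
        for replica in self.replicas:
            if current_time_ms - replica.last_caught_up_time_ms <= self.lag_threshold:
                new_isr.add(replica)
        removed = self.isr - new_isr
        added = new_isr - self.isr
        if removed:
            log.warning("ISR shrinking: removed %s", removed)
        if added:
            log.info("ISR expanding: added %s", added)
        self.isr = new_isr
        return self.isr
```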
Controller Election
The controller is a single broker responsible for:
- Partition leader election
- Topic creation/deletion
- Broker registration/deregistration
- ISR management coordination
Controller Election Protocol (ZooKeeper mode)
```
1. All brokers watch the /controller znode in ZooKeeper
2. When the controller dies:
   a. Its ZooKeeper session expires and the ephemeral /controller znode is deleted
   b. All brokers receive the watch notification
   c. Each broker tries to create /controller as an ephemeral znode
   d. ZooKeeper lets exactly one create succeed; that broker becomes
      the controller and increments /controller_epoch
3. The new controller:
   a. Reads all partition metadata
   b. For each partition whose leader was on a dead broker:
      - Selects a new leader from the ISR
      - Updates /brokers/topics/<topic>/partitions/<p>/state
   c. Sends LeaderAndIsr requests to the affected brokers and
      UpdateMetadata requests to all brokers
```
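The race in step 2 works because ZooKeeper rejects a create for a znode that already exists, so exactly one broker wins. The sketch below simulates that with an in-memory, lock-protected store. `FakeZooKeeper`, `NodeExistsError` and `try_become_controller` are hypothetical stand-ins, not the ZooKeeper client API. The epoch bump shows how brokers can fence off requests from a stale controller.

```python
import threading


class NodeExistsError(Exception):
    pass


class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper's atomic create-if-absent semantics."""

    def __init__(self):
        self._lock = threading.Lock()
        self._nodes = {"/controller_epoch": 0}

    def create(self, path: str, value) -> None:
        with self._lock:
            if path in self._nodes:
                raise NodeExistsError(path)
            self._nodes[path] = value

    def increment(self, path: str) -> int:
        with self._lock:
            self._nodes[path] += 1
            return self._nodes[path]

    def get(self, path: str):
        with self._lock:
            return self._nodes.get(path)


def try_become_controller(zk: FakeZooKeeper, broker_id: int, winners: list) -> None:
    try:
        zk.create("/controller", broker_id)  # an ephemeral znode in real ZooKeeper
    except NodeExistsError:
        return  # another broker won; keep watching /controller
    # Bump the epoch so brokers ignore requests from an older ("zombie") controller
    winners.append((broker_id, zk.increment("/controller_epoch")))


if __name__ == "__main__":
    zk = FakeZooKeeper()
    winners: list = []
    threads = [threading.Thread(target=try_become_controller, args=(zk, b, winners))
               for b in range(1, 6)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("controller:", zk.get("/controller"), "epoch:", zk.get("/controller_epoch"))
```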
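ZooKeeper vs KRaft
In KRaft mode, a quorum of controller nodes replaces ZooKeeper entirely. KRaft has been production-ready since Kafka 3.3 and is the only supported mode since Kafka 4.0. The metadata log is replicated across the controllers using Raft consensus, so Kafka no longer depends on a separate ZooKeeper ensemble to store metadata or elect the controller.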
Leader Election Process
When a partition leader fails, the controller initiates election:
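```python
class NoAvailableLeader(Exception):
    """No eligible replica is alive to take leadership."""


class LeaderElection:
    def __init__(self, partition, isr_list, preferred_replicas):
        self.partition = partition           # exposes .replicas in assignment order
        self.isr = isr_list
        self.preferred = preferred_replicas  # assignment order; first entry = preferred leader

    def elect_leader(self):
        """Clean leader election for an offline partition: ISR members only."""
        # Priority: first live ISR member in preferred-replica order
        for replica in self.preferred:
            if replica in self.isr and replica.is_alive():
                return replica
        # Fallback: any live ISR member
        for replica in self.isr:
            if replica.is_alive():
                return replica
        raise NoAvailableLeader(self.partition)

    def elect_with_unclean_leader(self):
        """Unclean leader election (unclean.leader.election.enable=true): dangerous."""
        # Still prefer a clean choice when one exists
        try:
            return self.elect_leader()
        except NoAvailableLeader:
            pass
        # Otherwise let an out-of-sync replica lead.
        # Risk: committed messages it never replicated are lost.
        for replica in self.partition.replicas:
            if replica.is_alive():
                return replica
        raise NoAvailableLeader(self.partition)
```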
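Leader Election Timeline (ZooKeeper mode)
```
T0                Leader broker (B1) crashes
T0 to T0+18s      B1's ZooKeeper session expires (zookeeper.session.timeout.ms)
Td                Controller notified via its watch on /brokers/ids
Td+0-50ms         Controller reads partition state from ZK
Td+50-100ms       Controller selects new leader from ISR
Td+100-150ms      Controller writes new state to ZK
Td+150-200ms      New leader begins serving requests
Td+200ms          Producers/consumers switch after a metadata refresh
```
Td is the moment the failure is detected. For a hard crash, detection is bounded by the ZooKeeper session timeout, which defaults to 18 s since Kafka 2.5. That delay dominates the total. The post-detection offsets are illustrative.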
Replication Protocol
Followers pull data from the leader using a fetch-based protocol:
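```python
import time
from dataclasses import dataclass


@dataclass
class FetchRequest:
    topic: str
    partition: int
    offset: int     # follower's log end offset; also tells the leader how far it has replicated
    max_bytes: int


class ReplicationFetcher:
    def __init__(self, partition, leader, follower, max_fetch_bytes=1048576):
        self.partition = partition              # exposes .topic and .id
        self.leader = leader
        self.follower = follower
        self.max_fetch_bytes = max_fetch_bytes  # cf. replica.fetch.max.bytes (1 MiB default)
        self.fetch_offset = follower.log_end_offset

    def fetch_loop(self):
        while self.follower.is_running():
            # Request records starting at the follower's log end offset
            response = self.leader.fetch(FetchRequest(
                topic=self.partition.topic,
                partition=self.partition.id,
                offset=self.fetch_offset,
                max_bytes=self.max_fetch_bytes,
            ))
            if response.has_data():
                # Append to the local log, then continue from the new log end offset
                self.follower.append(response.records, response.offset)
                self.fetch_offset = self.follower.log_end_offset
            # Follower HW = min(leader HW, local log end offset), and never moves backwards
            new_hw = min(response.high_watermark, self.follower.log_end_offset)
            if new_hw > self.follower.high_watermark:
                self.follower.high_watermark = new_hw
            if not response.has_data():
                # Back off when idle (real brokers long-poll via replica.fetch.wait.max.ms)
                time.sleep(0.1)
```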
High Watermark and Data Consistency
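The high watermark (HW) is the smallest log end offset (LEO) across the ISR. A replica's LEO is the offset of the *next* message it will write. Every message with an offset strictly below the HW has been replicated to all ISR members and is committed:

$$\text{HW} = \min_{r \in \text{ISR}} \text{LEO}(r)$$

```
Leader log:   [0] [1] [2] [3] [4] [5]    LEO = 6
Follower A:   [0] [1] [2] [3] [4] [5]    LEO = 6 (caught up)
Follower B:   [0] [1] [2] [3]            LEO = 4 (behind)
                              ^
                              HW = 4
```

HW = min(6, 6, 4) = 4. Offsets 0 to 3 are committed. Offsets 4 and 5 exist only on the leader and Follower A, so they are not yet visible to consumers.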
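Here is the same calculation as a sketch. The leader learns each follower's LEO from the offset in its latest fetch request, then takes the minimum over the ISR. The function names are hypothetical, and the sketch assumes the log starts at offset 0.

```python
def high_watermark(isr_log_end_offsets: dict[str, int]) -> int:
    """HW = min LEO over ISR members (LEO = offset of the next message to write)."""
    if not isr_log_end_offsets:
        raise ValueError("the ISR always contains at least the leader")
    return min(isr_log_end_offsets.values())


def committed_offsets(hw: int) -> range:
    # Offsets strictly below the HW are committed (assumes the log starts at 0)
    return range(0, hw)


if __name__ == "__main__":
    leos = {"leader": 6, "follower_a": 6, "follower_b": 4}
    hw = high_watermark(leos)
    print("HW =", hw, "committed:", list(committed_offsets(hw)))
```

If Follower B drops out of the ISR, the minimum is taken over the leader and Follower A only. The HW then advances to 6.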
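Read Visibility
Consumers never receive messages at or beyond the high watermark, whatever their settings, so they cannot read data that is only partially replicated. With isolation.level=read_committed, they are further limited to the last stable offset (LSO). The LSO stops before any still-open transaction, and aborted transactional records are filtered out.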
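The sketch below shows the two visibility bounds in simplified form. It assumes we know the first offset of each still-open transaction on the partition. Real brokers also hide aborted records using an aborted-transaction index, which the sketch omits. `fetch_upper_bound` is a hypothetical name.

```python
def fetch_upper_bound(high_watermark: int, open_txn_first_offsets: list[int],
                      isolation_level: str) -> int:
    """Exclusive upper offset a consumer may read up to (simplified)."""
    if isolation_level == "read_uncommitted":
        # Everything below the HW is committed by replication, so it is readable
        return high_watermark
    if isolation_level == "read_committed":
        # Last stable offset: first offset of the earliest open transaction, capped at the HW
        return min([high_watermark, *open_txn_first_offsets])
    raise ValueError(f"unknown isolation level: {isolation_level}")


if __name__ == "__main__":
    # Hypothetical partition: HW = 10, one transaction still open since offset 7
    print(fetch_upper_bound(10, [7], "read_uncommitted"))
    print(fetch_upper_bound(10, [7], "read_committed"))
```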
Complete Example: Producer-Acks-Broker Flow
```python
from kafka import KafkaProducer
from kafka.errors import (
    KafkaError,
    NotEnoughReplicasError,             # ISR smaller than min.insync.replicas; write rejected
    NotEnoughReplicasAfterAppendError,  # ISR shrank after the leader appended the write
)


def produce_with_acks():
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        acks='all',                               # Leader waits for every ISR member
        retries=5,
        max_in_flight_requests_per_connection=1,  # Preserve ordering across retries
        enable_idempotence=True,                  # No duplicates from retries (client version must support it)
        linger_ms=5,                              # Wait up to 5 ms to fill a batch
        batch_size=16384,                         # 16 KB batches
        compression_type='snappy',                # Requires the python-snappy package
        request_timeout_ms=30000,
    )

    # Send asynchronously so batching can take effect, then check each result
    futures = []
    for i in range(1000):
        futures.append(producer.send(
            topic='user-events',
            key=f"user-{i % 100}".encode(),
            value=f"event-{i}".encode(),
            headers=[('source', b'web-app')],
        ))
    producer.flush()

    for future in futures:
        try:
            record_metadata = future.get(timeout=30)
            print(f"Sent to partition {record_metadata.partition} "
                  f"offset {record_metadata.offset}")
        except NotEnoughReplicasError:
            print("Not enough in-sync replicas (ISR < min.insync.replicas)")
        except NotEnoughReplicasAfterAppendError:
            print("ISR shrank below min.insync.replicas after append")
        except KafkaError as e:
            print(f"Produce failed: {e}")

    producer.close()
```
Broker Failure Scenarios Matrix
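| Scenario | Impact | Recovery Time | Data Loss Risk |
|---|---|---|---|
| Follower failure | Follower leaves the ISR after replica.lag.time.max.ms; writes continue | Until the follower restarts and catches up (often minutes) | None |
| Leader failure | Partition unavailable until a new leader is elected | Seconds (failure detection + controller election) | None for acks=all writes if unclean.leader.election.enable=false; unreplicated acks=1 writes can be lost |
| Controller failure | Metadata changes (leader elections, topic changes) pause; produce/consume on existing leaders continues | Seconds (new controller election) | None |
| Multiple ISR failures | acks=all writes rejected once the ISR is smaller than min.insync.replicas | Until enough replicas rejoin | None if unclean election is disabled; possible if unclean.leader.election.enable=true |
| Rack failure | All brokers in the rack fail at once | Same as leader failure if replicas span racks; otherwise until the rack recovers | None if every partition keeps an in-sync replica outside the failed rack |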
Performance Formula
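The effective producer write throughput is bounded by:

$$T_{\text{write}} \le \frac{B_{\text{cluster}}}{R}$$

$B_{\text{cluster}}$ is the aggregate write bandwidth (disk and network) of all brokers, and $R$ is the replication factor. The bound follows because each produced byte is written $R$ times: once by the leader and once by each of the $R-1$ followers.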
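Here is a worked example with hypothetical numbers. Six brokers that each sustain about 200 MB/s of writes give $B_{\text{cluster}} = 1200$ MB/s. With $R = 3$, producers can push at most about 400 MB/s. The helper below (a hypothetical name) just encodes that division. Real clusters usually hit other limits first, such as consumer reads sharing the same network, so treat the result as a ceiling.

```python
def max_producer_throughput(num_brokers: int, per_broker_write_mb_s: float,
                            replication_factor: int) -> float:
    """Upper bound on producer ingress: aggregate write bandwidth / R."""
    if replication_factor < 1:
        raise ValueError("replication factor must be >= 1")
    cluster_write_mb_s = num_brokers * per_broker_write_mb_s
    # Every produced byte is written R times (leader + R-1 followers)
    return cluster_write_mb_s / replication_factor


if __name__ == "__main__":
    print(max_producer_throughput(6, 200.0, 3))  # → 400.0
```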
KRaft Mode Architecture
```
┌─────────────────────────────────────────────┐
│         Controller Quorum (3 nodes)         │
│   ┌─────┐   ┌─────┐   ┌─────┐               │
│   │ C1  │   │ C2  │   │ C3  │  Raft log     │
│   │ (L) │   │ (F) │   │ (F) │               │
│   └─────┘   └─────┘   └─────┘               │
│          ↓ metadata changes ↓               │
├─────────────────────────────────────────────┤
│               Brokers (N nodes)             │
│   ┌─────┐   ┌─────┐   ┌─────┐   ┌─────┐     │
│   │ B1  │   │ B2  │   │ B3  │   │ B4  │     │
│   └─────┘   └─────┘   └─────┘   └─────┘     │
└─────────────────────────────────────────────┘
```
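Production Tip
In KRaft mode, metadata changes are committed to a Raft-replicated metadata log that standby controllers already hold in memory. A new controller can therefore take over without first reloading cluster state from ZooKeeper. This shortens controller failover and the partition leader elections that follow, and it lets a cluster scale to far more partitions.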
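Raft commits a metadata record once a majority of the controller quorum has it, which is why controller quorums are sized at three or five nodes. The sketch below covers the standard Raft majority arithmetic, not Kafka-specific code. `raft_quorum` is a hypothetical helper.

```python
def raft_quorum(voters: int) -> tuple[int, int]:
    """Return (votes needed to commit, voter failures tolerated) for a Raft quorum."""
    if voters < 1:
        raise ValueError("need at least one voter")
    majority = voters // 2 + 1
    tolerated_failures = voters - majority  # equals (voters - 1) // 2
    return majority, tolerated_failures


if __name__ == "__main__":
    for n in (1, 3, 4, 5):
        majority, f = raft_quorum(n)
        print(f"{n} controllers: commit needs {majority}, tolerates {f} failure(s)")
```

A fourth controller raises the majority to 3 without tolerating any extra failure, so even-sized quorums buy nothing over the next smaller odd size.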
Key Metrics to Monitor
```python
# Critical Kafka metrics for architecture health
metrics_to_watch = {
    # ISR metrics
    "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec": "ISR shrink rate",
    "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec": "ISR expand rate",
    "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions": "Under-replicated count",
    # Controller metrics
    "kafka.controller:type=KafkaController,name=ActiveControllerCount": "Controller active",
    "kafka.controller:type=KafkaController,name=OfflinePartitionsCount": "Offline partitions",
    "kafka.controller:type=ControllerChannelManager,name=QueueSize": "Controller queue",
    # Partition metrics
    "kafka.server:type=ReplicaManager,name=PartitionCount": "Total partitions",
    "kafka.log:type=Log,name=Size": "Log size per partition",
}
```
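The sketch below turns a few of these metrics into alerts. It assumes the values have already been scraped per broker (for example via JMX) into dicts keyed by the MBean names above. The thresholds reflect the usual healthy state: exactly one active controller across the cluster, and zero offline or under-replicated partitions. `check_cluster_health` is a hypothetical helper.

```python
ACTIVE = "kafka.controller:type=KafkaController,name=ActiveControllerCount"
OFFLINE = "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"
URP = "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"


def check_cluster_health(per_broker: dict[int, dict[str, float]]) -> list[str]:
    """Return alert messages; an empty list means healthy (simplified)."""
    alerts = []
    # ActiveControllerCount is 1 on the controller and 0 elsewhere, so the sum must be 1
    active = sum(m.get(ACTIVE, 0) for m in per_broker.values())
    if active != 1:
        alerts.append(f"expected 1 active controller, found {active:g}")
    # Offline partitions are reported by the active controller; take the max to be safe
    offline = max((m.get(OFFLINE, 0) for m in per_broker.values()), default=0)
    if offline > 0:
        alerts.append(f"{offline:g} offline partition(s)")
    for broker_id, m in sorted(per_broker.items()):
        if m.get(URP, 0) > 0:
            alerts.append(f"broker {broker_id}: {m[URP]:g} under-replicated partition(s)")
    return alerts


if __name__ == "__main__":
    scraped = {1: {ACTIVE: 1, OFFLINE: 0, URP: 0}, 2: {ACTIVE: 0, URP: 2}}
    print(check_cluster_health(scraped) or "healthy")
```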
Common Follow-Up Questions
- Why does Kafka use pull-based replication instead of push?
  - Pull lets each follower control its own fetch rate
  - Slow followers are not overwhelmed
  - Backpressure comes naturally
  - The leader stays simple: the offset in each fetch request tells it how far that follower has replicated, which it uses to advance the high watermark
- What happens during a controlled shutdown?
  - The broker first asks the controller to move leadership of its partitions to other ISR members
  - It then shuts down cleanly
  - No data loss, and only a minimal unavailability window
- How does rack awareness work?
  - Each broker is configured with the broker.rack property
  - Replica assignment spreads each partition's replicas across different racks
  - This protects against rack-level failures
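Here is a simplified sketch of rack-aware placement. It interleaves brokers so that consecutive entries alternate racks, then applies round-robin placement over that list. With evenly sized racks and $R$ no larger than the number of racks, every partition's replicas land on distinct racks. This only illustrates the idea; Kafka's actual algorithm also randomizes the start and shifts followers. Broker IDs, rack names and both function names are hypothetical.

```python
from itertools import zip_longest


def rack_alternating(brokers_by_rack: dict[str, list[int]]) -> list[int]:
    """Interleave brokers: first broker of each rack, then the second of each, ..."""
    racks = [brokers_by_rack[r] for r in sorted(brokers_by_rack)]
    return [b for group in zip_longest(*racks) for b in group if b is not None]


def rack_aware_assignment(num_partitions: int, brokers_by_rack: dict[str, list[int]],
                          replication_factor: int) -> dict[int, list[int]]:
    ordered = rack_alternating(brokers_by_rack)
    n = len(ordered)
    # Round-robin over the rack-alternating list: neighbours sit on different racks
    return {p: [ordered[(p + j) % n] for j in range(replication_factor)]
            for p in range(num_partitions)}


if __name__ == "__main__":
    racks = {"rack-a": [1, 2], "rack-b": [3, 4], "rack-c": [5, 6]}
    for p, replicas in rack_aware_assignment(6, racks, 3).items():
        print(f"Partition {p}: {replicas}")
```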
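Critical Concept
The ISR mechanism is what makes Kafka's acks=all both safe and performant. If acks=all had to wait for every assigned replica, one slow or dead broker would stall all writes, just as with fully synchronous replication. Instead, replication is synchronous only with respect to the current ISR. A lagging replica is dropped from the ISR, stops gating acknowledgements, and rejoins once it catches up. min.insync.replicas sets how small the ISR may shrink before acks=all writes are rejected.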