Kafka: Partitions, Consumer Groups, Exactly-Once
Deep dive into Apache Kafka internals and best practices
Interview Question
"You have a Kafka topic 'orders' with 12 partitions. A consumer group 'order-processor' has 8 consumers. Explain: (1) How are messages distributed? (2) What happens if you add 4 more consumers? (3) What happens if a consumer crashes? (4) How do you achieve exactly-once processing?"
Difficulty: Hard | Frequently asked at LinkedIn, Uber, Netflix, Stripe
Theoretical Foundation
Kafka Architecture
Partitions
A partition is an ordered, immutable sequence of records. Each record has a sequential id called an offset.
Key properties:
- Ordering: Guaranteed within a partition (not across partitions)
- Immutability: Records cannot be modified after writing
- Append-only: New records are added to the end
- Retention: Records are retained for a configurable time
Partition assignment:
For the orders topic with 12 partitions:
- Message with key "user_123" → partition hash("user_123") % 12
- Same key always goes to same partition (ordering guarantee)
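The key-to-partition rule above can be sketched in a few lines. This is an illustration rather than Kafka's implementation: Kafka's default partitioner hashes the serialized key with murmur2, while the sketch swaps in `zlib.crc32` as a stand-in stable hash. `partition_for` is a hypothetical helper name.

```python
import zlib

NUM_PARTITIONS = 12  # the 'orders' topic from the example


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a record key to a partition index (crc32 stands in for Kafka's murmur2)."""
    key_bytes = key.encode("utf-8")
    # crc32 is stable across processes, unlike Python's built-in hash() for str
    return zlib.crc32(key_bytes) % num_partitions


if __name__ == "__main__":
    for key in ["user_123", "user_123", "user_456"]:
        print(key, "->", partition_for(key))
```

Because the result depends on the partition count, changing the number of partitions later changes where existing keys land, so per-key ordering only holds while the count stays fixed.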
Consumer Groups
A consumer group is a set of consumers that jointly consume a topic. Each partition is assigned to exactly one consumer in the group.
Assignment algorithm (range-style, single topic):
- Sort partitions numerically
- Sort consumers lexicographically
- Give each consumer a contiguous block of partitions; when the count does not divide evenly, the first consumers get one extra
For 12 partitions, 8 consumers (12 / 8 = 1 remainder 4):
- Consumers 0-3: 2 partitions each (P0-P7)
- Consumers 4-7: 1 partition each (P8-P11)
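The contiguous-block distribution listed above (two partitions for the first four consumers, one for the rest) can be reproduced with a short sketch. `range_assign` and the consumer names are hypothetical; the real assignor also handles multiple topics and differing subscriptions.

```python
def range_assign(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Give each consumer a contiguous block; the first (num_partitions % n) get one extra."""
    ordered = sorted(consumers)
    base, extra = divmod(num_partitions, len(ordered))
    assignment: dict[str, list[int]] = {}
    start = 0
    for index, consumer in enumerate(ordered):
        count = base + (1 if index < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment


if __name__ == "__main__":
    consumers = [f"consumer-{i}" for i in range(8)]
    for consumer, partitions in range_assign(12, consumers).items():
        print(consumer, partitions)
```

With more consumers than partitions, `base` becomes 0 and the trailing consumers receive an empty list, which is the idle-consumer case.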
Consumer Group Coordination
Exactly-Once Semantics (EOS)
Exactly-once processing ensures each message is processed exactly once, despite failures.
Three components:
- Producer Idempotency:
# Producer config
enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
- Transactional Producer:
producer.initTransactions();
producer.beginTransaction();
try {
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
- Transactional Consumer:
# Consumer config
isolation.level=read_committed
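To make the idempotency component concrete, here is a toy model of how a partition can drop a retried write by tracking a sequence number per producer. It is a deliberately simplified sketch: the class and method names are hypothetical, and a real broker also rejects out-of-order sequence gaps and tracks sequences per batch.

```python
class PartitionLog:
    """Toy model of one partition that drops retried writes by (producer_id, sequence)."""

    def __init__(self) -> None:
        self.records: list[str] = []
        self.last_sequence: dict[int, int] = {}  # producer_id -> last accepted sequence

    def append(self, producer_id: int, sequence: int, value: str) -> bool:
        """Append value unless this sequence was already accepted; return True if written."""
        if sequence <= self.last_sequence.get(producer_id, -1):
            return False  # duplicate caused by a retry: acknowledge, do not write again
        self.records.append(value)
        self.last_sequence[producer_id] = sequence
        return True


if __name__ == "__main__":
    log = PartitionLog()
    log.append(producer_id=7, sequence=0, value="order_0")
    log.append(producer_id=7, sequence=1, value="order_1")
    log.append(producer_id=7, sequence=1, value="order_1")  # retry after a lost ack
    print(log.records)
```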
Transaction Flow:
Offset Management
Consumer offsets track progress per partition: the committed offset is the position of the next record the group will read (last processed offset + 1).
Offset commit strategies:
- Auto-commit: Kafka commits offsets automatically
- Manual commit: Application commits after processing
- Exactly-once commit: Transactional commit with processing
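The practical difference between these strategies is where the commit sits relative to processing. The simulation below is a sketch with hypothetical names: it runs one consumer that crashes once between its two steps and then restarts from the committed offset.

```python
def run_consumer(messages: list[str], commit_first: bool, crash_at: int) -> list[str]:
    """Return the messages whose processing completed, including any reprocessing."""
    processed: list[str] = []
    committed = 0  # offset to resume from after a restart
    offset = 0
    crashed = False
    while offset < len(messages):
        if commit_first:
            committed = offset + 1              # step A: commit
        else:
            processed.append(messages[offset])  # step A: process
        if offset == crash_at and not crashed:
            crashed = True
            offset = committed                  # restart from the last committed offset
            continue
        if commit_first:
            processed.append(messages[offset])  # step B: process
        else:
            committed = offset + 1              # step B: commit
        offset += 1
    return processed


if __name__ == "__main__":
    msgs = ["m0", "m1", "m2"]
    print("commit first :", run_consumer(msgs, commit_first=True, crash_at=1))
    print("process first:", run_consumer(msgs, commit_first=False, crash_at=1))
```

Committing first loses `m1` (at-most-once); processing first handles `m1` twice (at-least-once). Transactional commits close this gap by making the output and the offset commit atomic.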
Rebalancing
When consumers join or leave, partitions are reassigned.
Before Rebalance (4 consumers, 12 partitions):
Consumer 0: P0, P1, P2
Consumer 1: P3, P4, P5
Consumer 2: P6, P7, P8
Consumer 3: P9, P10, P11
After Consumers 4 and 5 join (6 consumers, 12 partitions):
Consumer 0: P0, P1
Consumer 1: P2, P3
Consumer 2: P4, P5
Consumer 3: P6, P7
Consumer 4: P8, P9
Consumer 5: P10, P11
Rebalance triggers:
- Consumer joins group
- Consumer leaves group (graceful or crash)
- Consumer heartbeat timeout (no heartbeat within session.timeout.ms; the default is 45 seconds since Kafka 3.0)
- Topic partitions change (admin operation)
- New topic added to the group's subscription
Rebalance protocols:
- Eager: Stop-the-world rebalance (all consumers stop)
- Cooperative (Incremental): Only affected partitions are reassigned
- Static group membership: Consumers with a fixed group.instance.id keep their partitions across restarts that complete within the session timeout
⚠️ Common Interview Trap: Many candidates don't realize that rebalancing pauses processing. With the eager protocol, every consumer in the group stops processing until the rebalance completes. For low-latency applications, use cooperative rebalancing or static group membership.
Partition Reassignment Strategies
RangeAssignor (default):
Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Consumers: [C0, C1, C2, C3]
C0: [0, 1, 2] (3 partitions)
C1: [3, 4, 5] (3 partitions)
C2: [6, 7, 8] (3 partitions)
C3: [9, 10, 11] (3 partitions)
RoundRobinAssignor:
Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Consumers: [C0, C1, C2, C3]
C0: [0, 4, 8] (3 partitions)
C1: [1, 5, 9] (3 partitions)
C2: [2, 6, 10] (3 partitions)
C3: [3, 7, 11] (3 partitions)
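The round-robin layout above follows from dealing partitions out one at a time. A minimal single-topic sketch, with `round_robin_assign` as a hypothetical name (the real assignor deals across all subscribed topics):

```python
def round_robin_assign(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Deal partitions to the sorted consumers one at a time, wrapping around."""
    ordered = sorted(consumers)
    assignment: dict[str, list[int]] = {consumer: [] for consumer in ordered}
    for partition in range(num_partitions):
        owner = ordered[partition % len(ordered)]
        assignment[owner].append(partition)
    return assignment


if __name__ == "__main__":
    for consumer, partitions in round_robin_assign(12, ["C0", "C1", "C2", "C3"]).items():
        print(consumer, partitions)
```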
StickyAssignor:
- Minimizes partition movement during rebalance
- Preserves existing assignments when possible
CooperativeStickyAssignor:
- Incremental rebalance
- Only moves partitions that need to move
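To illustrate what "only moves partitions that need to move" means, the sketch below rebalances the four-consumer layout from the RangeAssignor example after a fifth consumer joins, keeping existing ownership wherever quotas allow. It is a simplified illustration with hypothetical function names, not Kafka's sticky algorithm.

```python
def sticky_reassign(current: dict[str, list[int]], members: list[str]) -> dict[str, list[int]]:
    """Rebalance onto `members`, letting each surviving member keep partitions up to its quota."""
    total = sum(len(parts) for parts in current.values())
    base, extra = divmod(total, len(members))
    # Members that already hold the most partitions receive the larger quotas.
    by_load = sorted(members, key=lambda m: (-len(current.get(m, [])), m))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(by_load)}

    new_assignment: dict[str, list[int]] = {}
    pool: list[int] = []  # partitions that must change owner
    for member, parts in current.items():
        if member in quota:
            new_assignment[member] = list(parts[:quota[member]])
            pool.extend(parts[quota[member]:])
        else:
            pool.extend(parts)  # member left the group
    for member in by_load:
        owned = new_assignment.setdefault(member, [])
        while len(owned) < quota[member]:
            owned.append(pool.pop(0))
    return new_assignment


def moved_partitions(before: dict[str, list[int]], after: dict[str, list[int]]) -> list[int]:
    """List partitions whose owner differs between the two assignments."""
    owner_before = {p: m for m, parts in before.items() for p in parts}
    return sorted(p for m, parts in after.items() for p in parts if owner_before.get(p) != m)


if __name__ == "__main__":
    before = {"C0": [0, 1, 2], "C1": [3, 4, 5], "C2": [6, 7, 8], "C3": [9, 10, 11]}
    after = sticky_reassign(before, ["C0", "C1", "C2", "C3", "C4"])
    print(after)
    print("moved:", moved_partitions(before, after))
```

In this run only two partitions change owner, whereas an eager rebalance would revoke all twelve before reassigning them.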
Code Implementation
Kafka Producer Configuration
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
import json
import time

# Create topic
admin_client = KafkaAdminClient(bootstrap_servers='kafka:9092')
topics = [
    NewTopic(
        name='orders',
        num_partitions=12,
        replication_factor=3,
        # kafka-python names this argument topic_configs
        topic_configs={
            'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
            'segment.ms': str(24 * 60 * 60 * 1000),  # 1 day
            'cleanup.policy': 'delete'
        }
    )
]
try:
    admin_client.create_topics(new_topics=topics)
except TopicAlreadyExistsError:
    pass  # Topic was created by an earlier run

# Producer with idempotency (enable_idempotence assumes kafka-python 2.1 or newer)
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    enable_idempotence=True,
    retries=2147483647,
    max_in_flight_requests_per_connection=5,
    linger_ms=10,
    batch_size=32768,
    compression_type='snappy'
)

# Send messages
for i in range(1000000):
    order = {
        'order_id': f'order_{i}',
        'user_id': f'user_{i % 1000}',
        'amount': float(i * 10),
        'timestamp': time.time()
    }
    # Partition by user_id for ordering guarantee
    producer.send('orders', key=order['user_id'], value=order)
    if i % 10000 == 0:
        producer.flush()
        print(f"Sent {i} messages")
producer.flush()
Kafka Consumer Configuration
from kafka import KafkaConsumer
import json

# Consumer with manual commits. On its own this gives at-least-once delivery;
# read_committed only hides records from aborted transactions.
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='kafka:9092',
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    isolation_level='read_committed',
    max_poll_records=500,
    max_poll_interval_ms=300000,
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000
)

def process_order(order):
    """Process a single order"""
    # Business logic here
    print(f"Processing order: {order['order_id']}")

# Process messages with manual offset commit
def process_messages(consumer):
    try:
        while True:
            # Poll for messages
            messages = consumer.poll(timeout_ms=1000)
            for tp, msgs in messages.items():
                for msg in msgs:
                    # Process message
                    order = json.loads(msg.value.decode('utf-8'))
                    process_order(order)
            # Commit offsets once every record in the polled batch is processed
            if messages:
                consumer.commit()
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
Transactional Producer
from kafka import KafkaProducer
from kafka.errors import KafkaError
from kafka.structs import OffsetAndMetadata
import json
import time

# Transactional producer (assumes kafka-python 2.1 or newer, which added transactions)
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    transactional_id='order-processor-tx-1'  # Unique per producer instance
)
# Initialize transactions
producer.init_transactions()

def enrich_order(order):
    """Enrich order with additional data"""
    # Add user info, product details, etc.
    return {**order, 'enriched': True}

def process_order_with_transaction(order, consumer):
    """Process order with transactional guarantees"""
    try:
        # Begin transaction
        producer.begin_transaction()
        # Process order (business logic)
        enriched_order = enrich_order(order)
        # Send to output topic
        producer.send('enriched-orders', key=order['user_id'], value=enriched_order)
        # Send to audit topic
        producer.send('order-audit', key=order['order_id'], value={
            'order_id': order['order_id'],
            'action': 'processed',
            'timestamp': time.time()
        })
        # Commit consumer offsets in the same transaction.
        # Assumption: send_offsets_to_transaction takes a
        # {TopicPartition: OffsetAndMetadata} dict and the consumer group id,
        # and OffsetAndMetadata is (offset, metadata, leader_epoch).
        offsets = {
            tp: OffsetAndMetadata(consumer.position(tp), '', -1)
            for tp in consumer.assignment()
        }
        producer.send_offsets_to_transaction(offsets, 'order-processor')
        # Commit transaction
        producer.commit_transaction()
    except KafkaError:
        # Abort transaction on error
        producer.abort_transaction()
        raise
Consumer Group with Cooperative Rebalancing
from confluent_kafka import Consumer

# Consumer with cooperative rebalancing.
# This block uses confluent-kafka, whose 'cooperative-sticky' strategy enables
# incremental rebalancing; kafka-python's bundled assignors are range,
# round-robin, and sticky.
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processor',
    'partition.assignment.strategy': 'cooperative-sticky',
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 15000,
    'max.poll.interval.ms': 300000
})

# Track partition assignments
def on_partitions_assigned(consumer, partitions):
    print(f"Partitions assigned: {partitions}")

def on_partitions_revoked(consumer, partitions):
    print(f"Partitions revoked: {partitions}")

# Rebalance callbacks are registered when subscribing
consumer.subscribe(
    ['orders'],
    on_assign=on_partitions_assigned,
    on_revoke=on_partitions_revoked
)
Monitoring and Metrics
from kafka import KafkaConsumer
from prometheus_client import start_http_server, Gauge
import time

# Prometheus metrics
CONSUMER_LAG = Gauge('kafka_consumer_lag', 'Consumer lag', ['topic', 'partition'])
CONSUMER_RATE = Gauge('kafka_consumer_rate', 'Messages per second', ['topic'])
CONSUMER_ERRORS = Gauge('kafka_consumer_errors', 'Processing errors', ['topic'])

# Start Prometheus metrics server
start_http_server(8000)

def monitor_consumer(consumer):
    """Monitor consumer health and lag"""
    while True:
        total_lag = 0
        # Get partition assignments
        assignments = consumer.assignment()
        for tp in assignments:
            # Get current position (offset of the next record to fetch)
            position = consumer.position(tp)
            # Get end offset (latest offset in partition)
            end_offsets = consumer.end_offsets([tp])
            end_offset = end_offsets[tp]
            # Calculate lag
            lag = end_offset - position
            total_lag += lag
            # Update Prometheus metrics
            CONSUMER_LAG.labels(topic=tp.topic, partition=tp.partition).set(lag)
        # Log lag if too high (summed locally; Gauge children are not plain numbers)
        if total_lag > 10000:
            print(f"WARNING: High consumer lag: {total_lag}")
        time.sleep(10)
Performance Tuning
# ============================================================
# PERFORMANCE TUNING
# ============================================================
# Producer tuning
producer_config = {
# Batching
'linger.ms': 100, # Wait up to 100ms to batch messages
'batch.size': 65536, # 64KB batch size
'buffer.memory': 33554432, # 32MB buffer
# Compression
'compression.type': 'snappy', # or lz4, zstd
# Acknowledgments
'acks': 'all', # Wait for all replicas
'retries': 2147483647,
# Idempotency
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
# Network
'send.buffer.bytes': 131072, # 128KB
'receive.buffer.bytes': 131072, # 128KB
}
# Consumer tuning
consumer_config = {
# Polling
'max.poll.records': 1000,
'max.poll.interval.ms': 300000,
# Fetching
'fetch.min.bytes': 1,
'fetch.max.wait.ms': 500,
'max.partition.fetch.bytes': 1048576, # 1MB
# Session
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000,
# Offsets
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
💡 Production Tip: Always monitor these Kafka metrics:
- Consumer lag: How far behind consumers are
- Under-replicated partitions: Replication health
- Request latency: Producer/consumer latency
- Disk usage: Broker storage health
- Network throughput: Bandwidth utilization
Common Follow-Up Questions
Q1: What happens when you have more consumers than partitions?
Extra consumers are idle. For 12 partitions and 16 consumers:
- 12 consumers get 1 partition each
- 4 consumers are idle (no partitions assigned)
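Idle consumers add no throughput, although they can act as standbys that take over when an active consumer fails. To put them to work, increase the partition count; otherwise reduce the consumer count.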
Q2: How do you handle ordering across partitions?
Kafka only guarantees ordering within a partition. For cross-partition ordering:
- Use a single partition (loses parallelism)
- Use a global ordering service (complex)
- Accept eventual consistency (most common)
Q3: What's the difference between at-least-once, at-most-once, and exactly-once?
| Semantics | Duplicates | Data Loss | Complexity |
|---|---|---|---|
| At-most-once | No | Yes | Low |
| At-least-once | Yes | No | Medium |
| Exactly-once | No | No | High |
Q4: How do you handle schema evolution in Kafka?
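One common approach is to add new fields with a default, so that a reader on the newer schema can still decode records written with the older one. The sketch below illustrates that resolution step on plain dictionaries. It is an illustration only: Avro libraries perform this resolution themselves, and `read_with_schema` is a hypothetical helper.

```python
READER_SCHEMA_V2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "user_id", "type": "string"},
        {"name": "currency", "type": "string", "default": "USD"},
    ],
}


def read_with_schema(record: dict, reader_schema: dict) -> dict:
    """Project a decoded record onto the reader schema, filling defaults for missing fields."""
    result = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            result[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return result


if __name__ == "__main__":
    old_record = {"order_id": "order_1", "amount": 10.0, "user_id": "user_1"}
    print(read_with_schema(old_record, READER_SCHEMA_V2))
```

A new field without a default would raise here, which mirrors why such a change is not backward compatible.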
# Using Avro with Schema Registry
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Define schema
schema_str = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "user_id", "type": "string"}
]
}
"""
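# Producer with schema (parse the schema and pass it as the default value schema)
value_schema = avro.loads(schema_str)
avroProducer = AvroProducer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': 'http://schema-registry:8081'
}, default_value_schema=value_schema)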
# Schema evolution: Add new field with default
schema_v2 = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "user_id", "type": "string"},
{"name": "currency", "type": "string", "default": "USD"}
]
}
"""
⚠️ Critical Consideration: Kafka's exactly-once semantics only works within the Kafka ecosystem (Kafka Streams, Transactional Producers). For external systems, you need idempotent operations and transactional outbox patterns.
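As a minimal sketch of an idempotent operation against an external system, the example below writes each order to a database keyed by `order_id`, so a redelivered message becomes a no-op. SQLite stands in for the external store, and the table and function names are hypothetical.

```python
import sqlite3


def create_store() -> sqlite3.Connection:
    """Open an in-memory database with a table keyed by order_id."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed_orders (order_id TEXT PRIMARY KEY, amount REAL)"
    )
    return conn


def apply_order_once(conn: sqlite3.Connection, order: dict) -> bool:
    """Insert the order; return False if this order_id was already applied."""
    cursor = conn.execute(
        "INSERT OR IGNORE INTO processed_orders (order_id, amount) VALUES (?, ?)",
        (order["order_id"], order["amount"]),
    )
    conn.commit()
    return cursor.rowcount == 1  # 0 rows changed means the primary key already existed


if __name__ == "__main__":
    conn = create_store()
    order = {"order_id": "order_1", "amount": 10.0}
    print(apply_order_once(conn, order))  # first delivery
    print(apply_order_once(conn, order))  # redelivery after a consumer restart
```

Paired with at-least-once consumption, this keeps the external state correct even though the consumer may see the same record more than once.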
Company-Specific Tips
LinkedIn Interview Tips
- Discuss Kafka's origin at LinkedIn
- Explain consumer group coordination
- Mention schema evolution with Schema Registry
- Talk about Kafka Connect for data integration
Uber Interview Tips
- Focus on real-time ride matching with Kafka
- Discuss exactly-once for payment processing
- Mention Kafka Streams for real-time aggregations
- Talk about dead letter queues for error handling
Netflix Interview Tips
- Discuss content delivery with Kafka
- Explain A/B testing event streaming
- Mention viewing history real-time processing
- Talk about multi-region Kafka deployment
ℹ️ Final Takeaway: Kafka is more than just a message queue; it's a distributed streaming platform. Master partitions, consumer groups, and exactly-once semantics to design robust real-time systems. Always consider: (1) partition count, (2) consumer count, (3) rebalancing strategy, and (4) offset management.