Consumer Groups & Rebalancing
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Consumer groups enable parallel consumption of Kafka topics. Understanding rebalancing is crucial for building resilient consumer applications.
Consumer Group Architecture
Consumer Group: "order-processor"
Topic: orders (4 partitions)
┌─────────────────────────────────────────────────┐
│   Consumer 1                 Consumer 2         │
│   ┌─────────────┐            ┌─────────────┐    │
│   │ Partition 0 │            │ Partition 2 │    │
│   │ Partition 1 │            │ Partition 3 │    │
│   └─────────────┘            └─────────────┘    │
└─────────────────────────────────────────────────┘
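Rules:
- Each partition is consumed by exactly ONE consumer in the group at a time
- A consumer can own multiple partitions; consumers beyond the partition count sit idle
- Group state (membership, assignment, committed offsets) is managed by a broker acting as the group coordinator and persisted in the internal __consumer_offsets topic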
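These rules can be checked mechanically. The sketch below is minimal and assumes an assignment is modeled as a plain dict from a consumer name to its partition numbers. That representation is hypothetical, not a Kafka API type.

```python
def check_assignment(assignment, num_partitions):
    """Return (ok, idle_members) for a {member: [partitions]} mapping."""
    owners = {}
    for member, partitions in assignment.items():
        for p in partitions:
            if p in owners:
                # Rule violated: two members of one group share a partition
                return False, []
            owners[p] = member
    # Every partition of the topic must have exactly one owner
    if set(owners) != set(range(num_partitions)):
        return False, []
    # Members with no partitions are legal but idle
    idle = sorted(m for m, ps in assignment.items() if not ps)
    return True, idle


print(check_assignment({"consumer-1": [0, 1], "consumer-2": [2, 3]}, 4))
# (True, [])
print(check_assignment({"c1": [0, 1], "c2": [2, 3], "c3": []}, 4))
# (True, ['c3'])
```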
Rebalancing Flow
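Rebalance Triggers:
1. A consumer joins the group
2. A consumer leaves the group (clean shutdown or crash)
3. Subscribed topic metadata changes (e.g. partitions are added, or a new topic matches a subscription pattern)
4. A consumer sends no heartbeat within session.timeout.ms, or does not call poll() within max.poll.interval.ms
Rebalance Protocol:
1. Each consumer sends a JoinGroup request to the group coordinator (under the eager protocol it first revokes all of its partitions)
2. The coordinator waits for members to join, elects one as group leader, and sends the member list to the leader in its JoinGroup response
3. The leader computes the assignment with the configured assignor; every member then sends a SyncGroup request, and only the leader's request carries the assignment
4. The coordinator returns each member's partitions in its SyncGroup response, and consumers resume fetching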
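The division of labor in these steps can be shown with an in-memory model: the coordinator relays, and the leader decides. This is a simplified sketch. `Coordinator` and `leader_assign` are hypothetical names, and generations, timeouts, and the wire format are ignored. The leader is called first because the real coordinator holds the other members' SyncGroup responses until the leader's assignment arrives.

```python
class Coordinator:
    """Hypothetical in-memory stand-in for the broker-side group coordinator."""

    def __init__(self):
        self.members = []      # member IDs in join order
        self.assignment = {}   # member ID -> partitions, supplied by the leader

    def join_group(self, member_id):
        # Step 1: a member sends JoinGroup
        self.members.append(member_id)

    def complete_join(self):
        # Step 2: pick a leader (here: the first member to join) and return the
        # member list, which in Kafka only the leader receives
        return self.members[0], list(self.members)

    def sync_group(self, member_id, plan=None):
        # Step 3: every member sends SyncGroup; only the leader's carries a plan
        if plan is not None:
            self.assignment = plan
        # Step 4: each member receives just its own partitions
        return self.assignment.get(member_id, [])


def leader_assign(members, num_partitions):
    """Runs on the leader (client side), not the broker: deal partitions in turn."""
    return {m: list(range(i, num_partitions, len(members)))
            for i, m in enumerate(members)}


coord = Coordinator()
for member in ["consumer-1", "consumer-2"]:
    coord.join_group(member)
leader, members = coord.complete_join()
plan = leader_assign(members, 4)
print(coord.sync_group(leader, plan))   # [0, 2]
print(coord.sync_group("consumer-2"))   # [1, 3]
```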
Java Consumer Implementation
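import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "order-processor");
props.put("enable.auto.commit", "false");  // commit manually after processing
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Assignment strategy: incremental cooperative rebalancing
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Next offset to read for each partition whose records have been processed
Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();
// Custom rebalance listener
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked: " + partitions);
        // Commit progress before another consumer takes these partitions over
        consumer.commitSync(processed);
        processed.keySet().removeAll(partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned: " + partitions);
        // Initialize state for new partitions (application-defined helper)
        initializePartitions(partitions);
    }
    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // Ownership is already gone (e.g. session expired): do not commit, just drop state
        processed.keySet().removeAll(partitions);
    }
});
// Consumption loop
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);  // application-defined helper
            processed.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        }
        // Commit after processing the whole batch (at-least-once delivery)
        consumer.commitSync();
    }
} finally {
    consumer.close();
}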
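The loop commits only after `processRecord` has run. A crash between processing and `commitSync()` therefore causes the same records to be delivered again to whichever consumer takes over the partition, which is at-least-once delivery. Below is a toy single-partition simulation. It assumes a hypothetical in-memory log and an injected crash, and, like Kafka, treats the committed offset as the offset of the next record to read.

```python
def consume(log, committed, crash_after=None):
    """Process records from `committed` onward; return (processed, new_committed).

    Commits once, after the whole batch, mirroring the loop above.
    If crash_after is set, stop after that many records without committing.
    """
    processed = []
    for offset in range(committed, len(log)):
        processed.append(log[offset])
        if crash_after is not None and len(processed) == crash_after:
            return processed, committed   # crashed: the commit never happened
    return processed, len(log)            # commit = next offset to read


log = ["o1", "o2", "o3"]
first, committed = consume(log, 0, crash_after=2)   # processes o1, o2, then crashes
second, committed = consume(log, committed)          # new owner restarts from offset 0
print(first, second, committed)
# ['o1', 'o2'] ['o1', 'o2', 'o3'] 3
```

Records o1 and o2 are processed twice. Processing should therefore be idempotent, or deduplicated, wherever duplicates matter.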
Python Consumer Implementation
from kafka import KafkaConsumer, ConsumerRebalanceListener
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

class OrderProcessorListener(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        """Called before partitions are revoked"""
        print(f"Partitions revoked: {revoked}")
        # Commit final offsets while this consumer still owns the partitions
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        """Called after partitions are assigned"""
        print(f"Partitions assigned: {assigned}")
        # Initialize processing state (application-defined helper)
        initialize_state(assigned)

consumer = KafkaConsumer(
    # No topic here: subscribe() below registers the topic together with the listener
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    group_id='order-processor',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    # Assignment strategy: kafka-python takes assignor classes, not Java class names
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    # Session and heartbeat timeouts
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000,
    max_poll_interval_ms=300000,
)

# Set rebalance listener
consumer.subscribe(['orders'], listener=OrderProcessorListener(consumer))

# Process messages
try:
    while True:
        records = consumer.poll(timeout_ms=1000)
        for tp, messages in records.items():
            for message in messages:
                process_message(message)  # application-defined helper
        # Commit offsets after the batch is processed
        consumer.commit()
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
Partition Assignment Strategies
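RangeAssignor (Default before Kafka 3.0)
// Kafka 3.0+ default: [RangeAssignor, CooperativeStickyAssignor]
// Assigns each topic's partitions to consumers in contiguous ranges
// 4 partitions, 2 consumers: Consumer 1: [0, 1], Consumer 2: [2, 3]
// 4 partitions, 3 consumers: Consumer 1: [0, 1], Consumer 2: [2], Consumer 3: [3]
// More consumers than partitions: the extra consumers get [] and sit idle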
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor");
RoundRobinAssignor
// Distributes partitions evenly
// Consumer 1: [0, 2]
// Consumer 2: [1, 3]
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
CooperativeStickyAssignor (Kafka 2.4+)
// Minimizes partition movement during rebalance
// Only moves partitions that must change
// Reduces rebalance impact
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Comparison
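Strategy                  | Protocol    | Rebalance impact                     | Distribution
--------------------------|-------------|--------------------------------------|----------------------------------------------------
RangeAssignor             | Eager       | High: all partitions revoked         | Contiguous ranges per topic; can skew across topics
RoundRobinAssignor        | Eager       | High: all partitions revoked         | Even across all subscribed topic-partitions
CooperativeStickyAssignor | Cooperative | Low: only moving partitions revoked  | Even, preserving existing assignments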
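The Distribution column can be made concrete with two small re-implementations. These are illustrative sketches, not the Java assignors. They assume every consumer subscribes to the same topics and order consumers by name. The second topic, `payments`, is hypothetical.

```python
from itertools import cycle


def range_assign(consumers, topic_partitions):
    """Per topic: contiguous ranges; the first (count % consumers) get one extra."""
    consumers = sorted(consumers)
    result = {c: [] for c in consumers}
    for topic, count in sorted(topic_partitions.items()):
        per_consumer, extra = divmod(count, len(consumers))
        start = 0
        for i, c in enumerate(consumers):
            n = per_consumer + (1 if i < extra else 0)
            result[c].extend((topic, p) for p in range(start, start + n))
            start += n
    return result


def round_robin_assign(consumers, topic_partitions):
    """All topic-partitions in one sorted list, dealt out to consumers in turn."""
    consumers = sorted(consumers)
    result = {c: [] for c in consumers}
    all_tps = [(t, p) for t, n in sorted(topic_partitions.items()) for p in range(n)]
    for tp, c in zip(all_tps, cycle(consumers)):
        result[c].append(tp)
    return result


topics = {"orders": 4, "payments": 4}
members = ["c1", "c2", "c3"]
for assign in (range_assign, round_robin_assign):
    counts = {c: len(tps) for c, tps in assign(members, topics).items()}
    print(assign.__name__, counts)
# range_assign {'c1': 4, 'c2': 2, 'c3': 2}
# round_robin_assign {'c1': 3, 'c2': 3, 'c3': 2}
```

With two 4-partition topics and three consumers, the range approach gives c1 twice as many partitions as the others. This happens because the per-topic remainder always lands on the first consumer. Round-robin keeps the per-consumer counts within one of each other.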
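Key Insight: CooperativeStickyAssignor uses incremental cooperative rebalancing. Consumers keep processing the partitions they retain, and only the partitions that must move are revoked and then reassigned in a follow-up rebalance. This avoids the stop-the-world pause of the eager protocol, in which every consumer revokes all of its partitions at the start of each rebalance.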
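The "move only what must move" goal can be sketched for a single topic in three steps. Give each consumer a fair quota, let current owners keep partitions up to that quota, and reassign only the rest. This is a simplified model of stickiness under those assumptions, not the actual CooperativeStickyAssignor algorithm, which also handles multiple topics and uneven subscriptions.

```python
def sticky_assign(previous, consumers, num_partitions):
    """previous: {consumer: [partitions]} from the last generation (single topic)."""
    consumers = sorted(consumers)
    base, extra = divmod(num_partitions, len(consumers))
    # Give the "+1" slots to consumers that already hold the most partitions
    by_load = sorted(consumers, key=lambda c: -len(previous.get(c, [])))
    quota = {c: base + (1 if i < extra else 0) for i, c in enumerate(by_load)}
    result = {c: [] for c in consumers}
    to_move = []
    for owner, parts in previous.items():
        for p in sorted(parts):
            if owner in result and len(result[owner]) < quota[owner]:
                result[owner].append(p)   # stays with its current owner
            else:
                to_move.append(p)         # owner left, or owner is over quota
    # Partitions nobody owned before also need an owner
    kept = {p for ps in result.values() for p in ps}
    to_move += [p for p in range(num_partitions) if p not in kept and p not in to_move]
    for p in sorted(to_move):
        target = next(c for c in consumers if len(result[c]) < quota[c])
        result[target].append(p)
    return {c: sorted(ps) for c, ps in result.items()}


# c1 leaves a 3-member group
print(sticky_assign({"c1": [0, 1], "c2": [2], "c3": [3]}, ["c2", "c3"], 4))
# {'c2': [0, 2], 'c3': [1, 3]}
```

Only c1's partitions 0 and 1 move. A fresh range assignment for the same two consumers would produce c2: [0, 1] and c3: [2, 3], moving partition 2 as well.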
Rebalancing Best Practices
1. Session Timeouts
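// Liveness: a background thread sends heartbeats to the group coordinator
props.put("session.timeout.ms", "30000");     // 30 s without a heartbeat => consumer declared dead (default 45 s since Kafka 3.0)
props.put("heartbeat.interval.ms", "10000");  // 10 s: must be below session.timeout.ms, typically at most 1/3 of it
// Progress: maximum time between poll() calls before the consumer leaves the group
props.put("max.poll.interval.ms", "300000");  // 5 minutes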
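These settings interact, so a quick arithmetic check helps. The sketch below estimates the worst-case time between `poll()` calls from `max.poll.records`, the Java client setting that caps records per poll (default 500), and a per-record processing time that the application would have to measure (hypothetical here). The one-third heartbeat ratio is the documented guideline, not a hard limit.

```python
def check_timeouts(session_ms, heartbeat_ms, max_poll_interval_ms,
                   max_poll_records, per_record_ms):
    """Return a list of warnings for a consumer timeout configuration."""
    warnings = []
    if heartbeat_ms >= session_ms:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat_ms > session_ms / 3:
        warnings.append("heartbeat.interval.ms above 1/3 of session.timeout.ms")
    # Worst case: a full batch must be processed before the next poll()
    batch_ms = max_poll_records * per_record_ms
    if batch_ms >= max_poll_interval_ms:
        warnings.append(f"batch takes ~{batch_ms} ms, exceeding max.poll.interval.ms; "
                        "the consumer would leave the group and trigger a rebalance")
    return warnings


# Settings above; 500 records at a hypothetical 20 ms each = 10 s per batch
print(check_timeouts(30000, 10000, 300000, 500, 20))   # []
```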
2. Static Group Membership (Kafka 2.3+)
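// Consumers keep their member identity across restarts
props.put("group.instance.id", "consumer-1"); // static ID, unique per consumer instance in the group
// Reduces unnecessary rebalances, e.g. during rolling restarts
// Crash and restart within session.timeout.ms: no rebalance; the member gets its previous partitions back
// Trade-off: a static member that never returns is only removed after session.timeout.ms expires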
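A toy model of the coordinator's bookkeeping shows why a static ID avoids the rebalance. It assumes a hypothetical `GroupModel` class that tracks only member keys and heartbeat times. A dynamic member receives a new broker-generated member ID when it restarts, so it looks like a brand-new member (`member-a` and `member-b` are made-up placeholders). A static member that rejoins under the same `group.instance.id` within the session timeout is recognized instead.

```python
class GroupModel:
    """Toy coordinator bookkeeping: which member keys are still considered alive."""

    def __init__(self, session_timeout_ms):
        self.session_timeout_ms = session_timeout_ms
        self.last_heartbeat = {}   # member key -> time of last heartbeat (ms)

    def heartbeat(self, key, now_ms):
        self.last_heartbeat[key] = now_ms

    def join(self, key, now_ms):
        """Return True if this (re)join needs a rebalance in the model."""
        # Forget members whose session has already expired
        self.last_heartbeat = {k: t for k, t in self.last_heartbeat.items()
                               if now_ms - t <= self.session_timeout_ms}
        is_known = key in self.last_heartbeat
        self.last_heartbeat[key] = now_ms
        return not is_known


static = GroupModel(session_timeout_ms=30000)
static.heartbeat("consumer-1", 0)          # last heartbeat, then the process crashes
print(static.join("consumer-1", 20000))    # False: same instance ID, back within 30 s

dynamic = GroupModel(session_timeout_ms=30000)
dynamic.heartbeat("member-a", 0)
print(dynamic.join("member-b", 20000))     # True: the restart got a new member ID
```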
3. Incremental Cooperative Rebalancing
// Enable cooperative rebalancing
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
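// Benefits:
// - No group-wide stop-the-world pause: consumers keep processing the partitions they retain
// - Only partitions that move to another consumer are revoked
// - Shorter processing pauses when scaling or during rolling restarts
// Migrating a live group from an eager assignor takes two rolling bounces:
// first list CooperativeStickyAssignor ahead of the old assignor, then remove the old one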
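The difference in what gets revoked can be computed directly from the old and new assignments. The sketch assumes assignments are plain dicts of consumer name to partitions. Under the eager protocol every consumer revokes everything it owns. Under the cooperative protocol only partitions that change owner are revoked, and they are assigned to their new owner in a follow-up rebalance.

```python
def revocations(old, new, protocol):
    """Partitions each consumer must stop processing during the rebalance."""
    if protocol == "eager":
        # Stop-the-world: everything currently owned is revoked
        return {c: sorted(ps) for c, ps in old.items() if ps}
    if protocol == "cooperative":
        # Only partitions whose owner changes are revoked
        revoked = {}
        for c, ps in old.items():
            moving = sorted(set(ps) - set(new.get(c, [])))
            if moving:
                revoked[c] = moving
        return revoked
    raise ValueError(f"unknown protocol: {protocol}")


old = {"c1": [0, 1], "c2": [2, 3]}
new = {"c1": [0, 1], "c2": [2], "c3": [3]}   # c3 joined
print(revocations(old, new, "eager"))        # {'c1': [0, 1], 'c2': [2, 3]}
print(revocations(old, new, "cooperative"))  # {'c2': [3]}
```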
Monitoring Consumer Groups
# List consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --list
# Describe group status
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
--describe --group order-processor
# Output (abridged; the real output also has CONSUMER-ID, HOST and CLIENT-ID columns):
GROUP            TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor  orders  0          12345           12350           5
order-processor  orders  1          12340           12340           0
order-processor  orders  2          12342           12345           3
order-processor  orders  3          12338           12341           3
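LAG is LOG-END-OFFSET minus CURRENT-OFFSET for each partition. Below is a small parser for output in the abridged form above, assuming the column names shown. The real tool prints extra columns and shows `-` for partitions without a committed offset; this sketch skips such rows.

```python
def total_lag(output):
    """Sum LOG-END-OFFSET - CURRENT-OFFSET over rows of kafka-consumer-groups output."""
    lines = output.strip().splitlines()
    header = lines[0].split()
    cur = header.index("CURRENT-OFFSET")
    end = header.index("LOG-END-OFFSET")
    total = 0
    for row in lines[1:]:
        cols = row.split()
        if cols[cur] == "-":
            continue   # no committed offset for this partition
        total += int(cols[end]) - int(cols[cur])
    return total


sample = """
GROUP            TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor  orders  0          12345           12350           5
order-processor  orders  1          12340           12340           0
order-processor  orders  2          12342           12345           3
order-processor  orders  3          12338           12341           3
"""
print(total_lag(sample))   # 11
```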
Lag Monitoring
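import time

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from prometheus_client import Gauge, start_http_server

lag_gauge = Gauge(
    'kafka_consumer_lag',
    'Consumer lag per partition',
    ['topic', 'partition', 'consumer_group']
)

def monitor_lag(bootstrap_servers, topic, group_id):
    # Read-only client: it never subscribes, so it does not join the group or commit
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False,
    )
    while True:
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in partitions]
        # Log-end offsets for all partitions in one call (no assignment needed)
        end_offsets = consumer.end_offsets(tps)
        for tp in tps:
            # Last offset committed by the group, or None if nothing committed yet
            committed = consumer.committed(tp)
            # "is not None": a committed offset of 0 is valid
            lag = end_offsets[tp] - committed if committed is not None else end_offsets[tp]
            lag_gauge.labels(topic, str(tp.partition), group_id).set(lag)
        time.sleep(10)

if __name__ == '__main__':
    start_http_server(8000)  # Prometheus scrape endpoint; the port is an example
    monitor_lag(['kafka1:9092', 'kafka2:9092'], 'orders', 'order-processor')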
Follow-Up Questions
- What happens when a consumer crashes during a rebalance?
- How does static group membership affect rebalancing behavior?
- Explain the difference between RangeAssignor and CooperativeStickyAssignor.
- What is the impact of setting max.poll.interval.ms too low?
- How would you handle a consumer that is consistently too slow?