Consumer Groups & Rebalancing

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

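A consumer group lets several consumers divide a topic's partitions among themselves and read them in parallel. Rebalancing is how the group redistributes those partitions when membership or topic metadata changes, and understanding it is essential for building resilient consumer applications.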

Consumer Group Architecture

Architecture Diagram
Consumer Group: "order-processor"

Topic: orders (4 partitions)

┌──────────────────────────────────────────────────┐
│  Consumer 1        Consumer 2                    │
│  ┌─────────────┐   ┌─────────────┐               │
│  │ Partition 0 │   │ Partition 2 │               │
│  │ Partition 1 │   │ Partition 3 │               │
│  └─────────────┘   └─────────────┘               │
└──────────────────────────────────────────────────┘

Rules:
- Each partition is consumed by exactly one consumer in the group at a time
- A consumer can be assigned multiple partitions
- Group membership is managed by the group coordinator (a broker); committed offsets are stored in the __consumer_offsets topic

Rebalancing Flow

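Rebalance Triggers:
1. A consumer joins the group
2. A consumer leaves the group (clean shutdown or crash)
3. Topic metadata changes (for example, partitions are added to a subscribed topic)
4. A consumer misses heartbeats for session.timeout.ms, or goes longer than max.poll.interval.ms between poll() calls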

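Rebalance Protocol:
1. Each consumer sends a JoinGroup request to the group coordinator
2. The coordinator collects all members and selects one of them as group leader
3. The leader computes the partition assignment and sends it in its SyncGroup request; the coordinator returns each member's assignment in the SyncGroup response
4. Consumers receive their assignments and resume consuming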
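
The flow above can be sketched as a toy simulation. In the classic protocol the assignment is computed by one consumer acting as group leader, and the coordinator only relays it. Everything below is a simplified model for illustration: `ToyCoordinator`, `leader_assign`, and `run_rebalance` are hypothetical names, the leader is chosen as the lowest member id (an assumption made for determinism), and no real Kafka requests are sent.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCoordinator:
    """Toy stand-in for the broker-side group coordinator (not the real protocol)."""
    generation: int = 0
    members: dict = field(default_factory=dict)  # member_id -> subscribed topics

    def join_group(self, member_id, topics):
        # Step 1: each consumer sends JoinGroup with its subscription.
        self.members[member_id] = list(topics)

    def complete_join(self):
        # Step 2: once everyone has joined, start a new generation and pick a
        # leader. Assumption: lowest member id, purely to keep this deterministic.
        self.generation += 1
        leader = min(self.members)
        return leader, dict(self.members)

    def sync_group(self, assignment):
        # Step 3: the leader's SyncGroup carries the full assignment; every
        # member gets back only its own slice.
        return {m: assignment.get(m, []) for m in self.members}


def leader_assign(members, partitions):
    """Assignment computed on the leader (simple round-robin for this sketch)."""
    ordered = sorted(members)
    assignment = {m: [] for m in ordered}
    for i, p in enumerate(partitions):
        assignment[ordered[i % len(ordered)]].append(p)
    return assignment


def run_rebalance(coordinator, joiners, partitions):
    """Run one JoinGroup/SyncGroup round for the given new members."""
    for member_id in joiners:
        coordinator.join_group(member_id, ["orders"])
    leader, members = coordinator.complete_join()
    # Step 4: each consumer resumes with the slice returned here.
    return leader, coordinator.sync_group(leader_assign(members, partitions))
```

Calling `run_rebalance(ToyCoordinator(), ["c1", "c2"], [0, 1, 2, 3])` yields leader `c1` and two partitions per member; running it again on the same coordinator with `["c3"]` models a new member joining and produces a second generation.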

Java Consumer Implementation

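import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();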
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "order-processor");
props.put("enable.auto.commit", false);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Partition assignment strategy
props.put("partition.assignment.strategy", 
          "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Custom rebalance listener
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked: " + partitions);
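        // Commit offsets and release resources while this consumer still owns
        // the partitions (commitCurrentOffsets() is an application-defined helper)
        commitCurrentOffsets();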
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned: " + partitions);
        // Initialize state for new partitions
        // (initializePartitions() is an application-defined helper)
        initializePartitions(partitions);
    }
});

// Consumption loop
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    
    // Commit after processing
    consumer.commitSync();
}

Python Consumer Implementation

from kafka import ConsumerRebalanceListener, KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor


class OrderProcessorListener(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        """Called before partitions are revoked"""
        print(f"Partitions revoked: {revoked}")
        # Commit final offsets while this consumer still owns the partitions
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        """Called after partitions are assigned"""
        print(f"Partitions assigned: {assigned}")
        # Initialize processing state (initialize_state is an application-defined helper)
        initialize_state(assigned)


consumer = KafkaConsumer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    group_id='order-processor',
    enable_auto_commit=False,
    auto_offset_reset='earliest',

    # Assignment strategy: kafka-python expects assignor classes rather than
    # Java class names. Its bundled assignors (range, round-robin, sticky) use
    # the eager protocol, so StickyPartitionAssignor is the closest match here.
    partition_assignment_strategy=[StickyPartitionAssignor],

    # Session and heartbeat timeouts
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000,
    max_poll_interval_ms=300000
)

# Subscribe once, with the rebalance listener attached
consumer.subscribe(['orders'], listener=OrderProcessorListener(consumer))

# Process messages
try:
    while True:
        records = consumer.poll(timeout_ms=1000)

        for tp, messages in records.items():
            for message in messages:
                process_message(message)  # application-defined helper

        # Commit offsets only after the batch has been processed
        if records:
            consumer.commit()

except KeyboardInterrupt:
    pass
finally:
    # Always leave the group cleanly so the coordinator can rebalance promptly
    consumer.close()
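
Both consumers above commit after processing and again when partitions are revoked. The value Kafka expects in a commit is the offset of the next record to read, that is, the last processed offset plus one. The sketch below shows one way to keep that bookkeeping per partition; `OffsetTracker` and the local `TopicPartition` tuple are hypothetical stand-ins, not part of any Kafka client.

```python
from collections import namedtuple

# Local stand-in so the sketch runs without a Kafka client installed.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class OffsetTracker:
    """Tracks, per partition, the next offset that should be committed."""

    def __init__(self):
        self._next_offsets = {}

    def mark_processed(self, tp, offset):
        # A committed offset means "resume from here", hence offset + 1.
        # max() keeps the tracker from moving backwards on out-of-order calls.
        self._next_offsets[tp] = max(self._next_offsets.get(tp, 0), offset + 1)

    def offsets_to_commit(self, partitions=None):
        # With no argument, return everything; otherwise only the given
        # partitions (useful inside a revocation callback).
        if partitions is None:
            return dict(self._next_offsets)
        wanted = set(partitions)
        return {tp: off for tp, off in self._next_offsets.items() if tp in wanted}

    def forget(self, partitions):
        # Drop state for partitions this consumer no longer owns.
        for tp in partitions:
            self._next_offsets.pop(tp, None)
```

In a revocation callback, the idea would be to commit `offsets_to_commit(revoked)` and then call `forget(revoked)`, so that a later rebalance never commits stale positions for partitions owned by another consumer.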

Partition Assignment Strategies

RangeAssignor (Default)

// Assigns a contiguous range of each topic's partitions to each consumer
// With 4 partitions and 2 consumers:
// Consumer 1: [0, 1]
// Consumer 2: [2, 3]
// A consumer is left with [] only when the group has more consumers than partitions

props.put("partition.assignment.strategy", 
          "org.apache.kafka.clients.consumer.RangeAssignor");

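To make the range behaviour concrete, here is a minimal sketch of range-style assignment in plain Python. It is an illustration of the idea rather than the client's actual implementation; `range_assign` is a hypothetical helper, and the `payments` topic in the usage note is invented for the example.

```python
def range_assign(consumers, topics):
    """Range-style assignment.

    Each topic is handled separately: consumers are sorted, every consumer
    gets a contiguous block, and the first consumers get one extra partition
    when the count does not divide evenly.

    topics maps topic name -> partition count.
    """
    ordered = sorted(consumers)
    assignment = {c: [] for c in ordered}
    for topic in sorted(topics):
        base, extra = divmod(topics[topic], len(ordered))
        start = 0
        for i, consumer in enumerate(ordered):
            count = base + (1 if i < extra else 0)
            assignment[consumer] += [(topic, p) for p in range(start, start + count)]
            start += count
    return assignment
```

With `{"orders": 4}` and two consumers this reproduces the split shown above. With two 3-partition topics, `{"orders": 3, "payments": 3}`, the first consumer ends up with four partitions and the second with two, because the extra partition of every topic goes to the same consumer.
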
RoundRobinAssignor

// Deals partitions out one at a time across consumers for an even spread
// Consumer 1: [0, 2]
// Consumer 2: [1, 3]

props.put("partition.assignment.strategy", 
          "org.apache.kafka.clients.consumer.RoundRobinAssignor");

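A round-robin assignment can be sketched the same way: list every partition of every subscribed topic, then deal them out in turn. This is a simplified illustration that assumes all consumers subscribe to the same topics; `round_robin_assign` is a hypothetical helper, and the `payments` topic in the usage note is invented for the example.

```python
def round_robin_assign(consumers, topics):
    """Round-robin-style assignment.

    All partitions of all topics are put in one sorted list and dealt out to
    the sorted consumers one at a time.

    topics maps topic name -> partition count.
    """
    ordered = sorted(consumers)
    all_partitions = [(t, p) for t in sorted(topics) for p in range(topics[t])]
    assignment = {c: [] for c in ordered}
    for i, tp in enumerate(all_partitions):
        assignment[ordered[i % len(ordered)]].append(tp)
    return assignment
```

For `{"orders": 4}` and two consumers this gives partitions 0 and 2 to the first consumer and 1 and 3 to the second, matching the comment above. For two 3-partition topics such as `{"orders": 3, "payments": 3}`, each consumer receives three partitions, because the dealing continues across topic boundaries.
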
CooperativeStickyAssignor (Kafka 2.4+)

// Minimizes partition movement during rebalance
// Only moves partitions that must change
// Reduces rebalance impact

props.put("partition.assignment.strategy", 
          "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Comparison

Strategy                  | Rebalance Impact | Distribution
--------------------------|------------------|--------------
RangeAssignor             | High             | Sequential
RoundRobinAssignor        | High             | Even
CooperativeStickyAssignor | Low              | Even (sticky)

ℹ️

Key Insight: CooperativeStickyAssignor performs cooperative rebalancing - only the affected partitions are revoked and reassigned. This minimizes stop-the-world rebalancing.
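
The difference can be illustrated by comparing what each protocol revokes for the same before and after assignment. This is a simplified model under stated assumptions: `eager_revoked` and `cooperative_revoked` are hypothetical helpers, assignments are plain dictionaries of partition numbers, and the real cooperative protocol completes the move in a follow-up rebalance round that is not modelled here.

```python
def eager_revoked(old, new):
    """Eager protocol: every member gives up all of its partitions before
    rejoining, regardless of what the new assignment looks like."""
    return {member: sorted(parts) for member, parts in old.items()}


def cooperative_revoked(old, new):
    """Cooperative protocol: a member gives up only the partitions it owned
    before and does not own in the new assignment."""
    return {
        member: sorted(set(parts) - set(new.get(member, [])))
        for member, parts in old.items()
    }


# A third consumer joins a group of two that owns four partitions.
before = {"consumer-1": [0, 1], "consumer-2": [2, 3]}
after = {"consumer-1": [0, 1], "consumer-2": [2], "consumer-3": [3]}
```

Here `eager_revoked(before, after)` pauses all four partitions, while `cooperative_revoked(before, after)` revokes only partition 3 from `consumer-2`; the other three partitions keep being processed throughout.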

Rebalancing Best Practices

1. Session Timeouts

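// Liveness: the consumer is removed from the group if the coordinator
// receives no heartbeat from it for session.timeout.ms
props.put("session.timeout.ms", "30000");  // 30 seconds
props.put("heartbeat.interval.ms", "10000");  // 10 seconds (1/3 of session timeout)

// Progress: maximum time allowed between poll() calls before the
// consumer is considered failed and leaves the group
props.put("max.poll.interval.ms", "300000");  // 5 minutes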
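
These relationships are easy to get wrong when tuning, so a small sanity check can help. The sketch below encodes two rules of thumb: keep the heartbeat interval at or below one third of the session timeout, and keep the worst-case time to process one batch below `max.poll.interval.ms`. `check_timeouts` and its `worst_case_batch_ms` parameter are hypothetical; the latter is a number you would have to measure for your own workload.

```python
def check_timeouts(session_timeout_ms, heartbeat_interval_ms,
                   max_poll_interval_ms, worst_case_batch_ms):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []

    # Several heartbeats should fit inside one session timeout so that a
    # single delayed heartbeat does not get the consumer evicted.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append(
            "heartbeat.interval.ms should be at most 1/3 of session.timeout.ms"
        )

    # If processing one batch takes longer than max.poll.interval.ms, the
    # consumer is treated as failed and its partitions are reassigned.
    if worst_case_batch_ms >= max_poll_interval_ms:
        problems.append(
            "batch processing time must stay below max.poll.interval.ms"
        )

    return problems
```

For the values above and an assumed worst-case batch time of one minute, `check_timeouts(30000, 10000, 300000, 60000)` returns an empty list. If batches can exceed the limit, the usual options are to raise `max.poll.interval.ms` or to fetch fewer records per poll.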

2. Static Group Membership (Kafka 2.3+)

// Consumers maintain membership across restarts
props.put("group.instance.id", "consumer-1");  // Static ID

// Reduces unnecessary rebalances
// Consumer crash and restart: no rebalance if within session timeout
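
As a rough mental model, the effect of a static ID on a single restart can be written down as a tiny function. This is a deliberately simplified sketch, not coordinator logic: `rebalances_caused_by_restart` is a hypothetical name, and it ignores everything else that may be happening in the group.

```python
def rebalances_caused_by_restart(downtime_ms, session_timeout_ms, static_member):
    """Count the rebalances one consumer restart causes in this simplified model."""
    if static_member and downtime_ms < session_timeout_ms:
        # The coordinator keeps the member's slot open; on return, the same
        # group.instance.id gets its previous assignment back.
        return 0
    # Otherwise the departure (LeaveGroup or session expiry) triggers one
    # rebalance and the rejoin under a new member id triggers another.
    return 2
```

Under this model a 5-second restart with a 30-second session timeout costs nothing for a static member and two rebalances for a dynamic one. The trade-off is that a static member that really has died is only detected after the full session timeout, so its partitions sit unprocessed for that long.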

3. Incremental Cooperative Rebalancing

// Enable cooperative rebalancing
props.put("partition.assignment.strategy", 
          "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

// Benefits:
// - No stop-the-world rebalances
// - Only affected partitions are revoked
// - Reduces processing pause

Monitoring Consumer Groups

# List consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --list

# Describe group status
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
    --describe --group order-processor

# Output:
GROUP           TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor orders   0          12345           12350           5
order-processor orders   1          12340           12340           0
order-processor orders   2          12342           12345           3
order-processor orders   3          12338           12341           3
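
For quick scripting, the tabular output can be parsed into per-partition lag. The sketch below works on the sample shown above and assumes whitespace-separated columns with a header row that includes TOPIC, PARTITION, and LAG; `parse_lag` and `total_lag` are hypothetical helpers, and real output may contain additional columns, which this parser simply ignores.

```python
SAMPLE_OUTPUT = """\
GROUP           TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor orders   0          12345           12350           5
order-processor orders   1          12340           12340           0
order-processor orders   2          12342           12345           3
order-processor orders   3          12338           12341           3
"""


def parse_lag(describe_output):
    """Map (topic, partition) -> lag from --describe style output."""
    lines = [line for line in describe_output.splitlines() if line.strip()]
    header = lines[0].split()
    lag_by_partition = {}
    for line in lines[1:]:
        row = dict(zip(header, line.split()))
        # Skip rows whose lag is not a number (for example a "-" placeholder).
        if not row.get("LAG", "").isdigit():
            continue
        lag_by_partition[(row["TOPIC"], int(row["PARTITION"]))] = int(row["LAG"])
    return lag_by_partition


def total_lag(describe_output):
    """Sum of lag across all partitions in the output."""
    return sum(parse_lag(describe_output).values())
```

On the sample, partition 0 carries most of the lag. A single lagging partition usually points at a hot key or a slow consumer instance, whereas evenly growing lag suggests the whole group is under-provisioned.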

Lag Monitoring

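import time

from kafka import KafkaConsumer, TopicPartition
from prometheus_client import Gauge, start_http_server

lag_gauge = Gauge(
    'kafka_consumer_lag',
    'Consumer lag per partition',
    ['topic', 'partition', 'consumer_group']
)

def monitor_lag(bootstrap_servers, topic, group_id):
    # This consumer never subscribes, so it reads the group's committed
    # offsets without joining the group or triggering a rebalance
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False
    )

    while True:
        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            # Topic metadata not available yet; try again later
            time.sleep(10)
            continue

        tps = [TopicPartition(topic, p) for p in sorted(partitions)]

        # Get end offsets for all partitions in one call
        # (seek_to_end would require the partitions to be assigned)
        end_offsets = consumer.end_offsets(tps)

        for tp in tps:
            # Get committed offset (None if the group has never committed)
            committed = consumer.committed(tp)

            # Compare against None: a committed offset of 0 is valid
            lag = end_offsets[tp] - committed if committed is not None else end_offsets[tp]
            lag_gauge.labels(topic, tp.partition, group_id).set(lag)

        time.sleep(10)

if __name__ == '__main__':
    start_http_server(8000)  # expose /metrics; port 8000 is an arbitrary example
    monitor_lag(['kafka1:9092'], 'orders', 'order-processor')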

Follow-Up Questions

  1. What happens when a consumer crashes during a rebalance?
  2. How does static group membership affect rebalancing behavior?
  3. Explain the difference between RangeAssignor and CooperativeStickyAssignor.
  4. What is the impact of setting max.poll.interval.ms too low?
  5. How would you handle a consumer that is consistently too slow?
