
Consumer Groups: Rebalancing, Static Groups, Cooperative


Difficulty: Senior | Asked at: LinkedIn, Uber, Airbnb, Spotify

ℹ️ Interview Context

Consumer group rebalancing is one of the most misunderstood Kafka concepts. Interviewers want to see you understand the different rebalance protocols and when to use each one.

The Question

Explain how Kafka consumer group rebalancing works. What are the different rebalance protocols? How do static group membership and cooperative rebalancing improve system stability?

Consumer Group Architecture

Architecture Diagram
                    ┌───────────────────────────┐
                    │     Consumer Group        │
                    │     group.id: "orders"    │
                    └───────────────────────────┘
                              │
            ┌─────────────────┼─────────────────┐
            ↓                 ↓                 ↓
     ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
     │ Consumer 1  │  │ Consumer 2  │  │ Consumer 3  │
     │             │  │             │  │             │
     │ P0, P1      │  │ P2, P3      │  │ P4, P5      │
     └─────────────┘  └─────────────┘  └─────────────┘
     
     Topic: orders (6 partitions)
     Partition assignment: RangeAssignor or RoundRobinAssignor

Rebalance Trigger Events

import logging
import time

log = logging.getLogger(__name__)

# Rebalance is triggered by:
triggers = {
    "consumer_join": "New consumer joins the group",
    "consumer_leave": "Consumer crashes or sends LeaveGroup",
    "heartbeat_timeout": "Consumer fails to heartbeat within session.timeout.ms",
    "subscribe_change": "Consumer changes topic subscription",
    "partition_change": "Partitions added to a subscribed topic",
    "group_coordinator_change": "Coordinator broker changes"
}

# Heartbeat mechanism (simplified model of a client's heartbeat loop)
class ConsumerHeartbeat:
    def __init__(self, session_timeout_ms=30000, heartbeat_interval_ms=3000):
        self.session_timeout = session_timeout_ms
        self.heartbeat_interval = heartbeat_interval_ms
        self.last_heartbeat = time.time()
        self.running = True  # Set to False to stop the loop
    
    def send_heartbeat(self):
        """Placeholder: a real client sends a Heartbeat request to the coordinator."""
    
    def start_heartbeat_loop(self):
        while self.running:
            try:
                self.send_heartbeat()
                self.last_heartbeat = time.time()
            except Exception as e:
                log.error(f"Heartbeat failed: {e}")
            # Intervals are configured in milliseconds; sleep() takes seconds
            time.sleep(self.heartbeat_interval / 1000)
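
The coordinator's side of the same mechanism can be sketched as a plain expiry check. This is an illustration under stated assumptions, not broker code: `expired_members` and the `heartbeats` mapping are hypothetical names, and timestamps are plain integers in milliseconds.

```python
def expired_members(last_heartbeat_ms, now_ms, session_timeout_ms=30000):
    """Return member IDs whose last heartbeat is older than the session timeout.

    last_heartbeat_ms: dict mapping member ID -> timestamp (ms) of last heartbeat.
    """
    return sorted(
        member
        for member, seen in last_heartbeat_ms.items()
        if now_ms - seen > session_timeout_ms
    )


# Hypothetical group: consumer-2 last sent a heartbeat 31 s ago
heartbeats = {"consumer-1": 98_000, "consumer-2": 69_000, "consumer-3": 99_500}
print(expired_members(heartbeats, now_ms=100_000))  # ['consumer-2']
```

Any member returned by such a check would be removed from the group, which is what triggers the rebalance.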

⚠️Rebalance Cost

During a rebalance, ALL consumers in the group stop processing. With N consumers and P partitions, the entire group halts until rebalancing completes. This is why choosing the right protocol is critical.

Rebalance Protocols

1. Eager Rebalance (Default)

from kafka import KafkaConsumer
from kafka.coordinator.assignors.range import RangePartitionAssignor

# Eager rebalance - default behavior
consumer = KafkaConsumer(
    'orders',
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    
    # Eager rebalance settings
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000,  # Upper bound: one third of session_timeout_ms
    max_poll_interval_ms=300000,
    
    # Partition assignment strategy
    # (kafka-python takes assignor classes, not Java class names)
    partition_assignment_strategy=[RangePartitionAssignor]
)

How Eager Rebalance Works:

Architecture Diagram
Step 1: Trigger event (e.g., new consumer joins)
Step 2: ALL consumers stop processing
Step 3: ALL consumers revoke their partitions
Step 4: ALL consumers rejoin; coordinator collects their subscriptions (JoinGroup)
Step 5: Group leader (one of the consumers) computes the new partition assignment
Step 6: Coordinator delivers the new assignments to all consumers (SyncGroup)
Step 7: All consumers resume processing

Timeline (illustrative, in seconds):
T+0:     Event triggers rebalance
T+0-5:   All consumers stop, revoke partitions
T+5-10:  New assignment computed
T+10-15: New assignments sent to consumers
T+15:    Consumers resume processing

Total downtime: 10-30 seconds typically (it depends on how quickly the slowest member rejoins)

2. Cooperative Rebalance (Incremental)

from confluent_kafka import Consumer

# Cooperative rebalancing needs a client that implements the incremental
# protocol; confluent-kafka (librdkafka) selects it by strategy name
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    
    # Cooperative rebalance
    'partition.assignment.strategy': 'cooperative-sticky',  # Must be set
    
    # Session management
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 3000
})
consumer.subscribe(['orders'])

How Cooperative Rebalance Works:

Architecture Diagram
Step 1: Trigger event (e.g., new consumer joins)
Step 2: All consumers rejoin but KEEP their partitions and keep processing
Step 3: Group leader computes the new assignment; only partitions being MOVED are marked for revocation
Step 4: Affected consumers revoke the moved partitions and trigger a short follow-up rebalance
Step 5: The moved partitions are assigned to their new owners
Step 6: New owners start processing the moved partitions

Timeline (illustrative, in seconds):
T+0:    Event triggers rebalance
T+0-2:  Only affected consumers revoke the moved partitions
T+2-5:  Follow-up round computes the delta assignment
T+5-7:  New owners receive the moved partitions
T+7:    Moved partitions resume processing

Total downtime: 2-5 seconds, for the moved partitions only
Other partitions: CONTINUE PROCESSING throughout
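
The delta idea can be sketched by comparing the old and new assignments. This is a simplified illustration, not the real assignor: `cooperative_delta` and the member names are hypothetical, and the new assignment is supplied by hand rather than computed.

```python
def cooperative_delta(old, new):
    """Compare two assignments (dict: member -> set of partitions).

    Returns (revoked, added): per-member partitions to give up in the first
    round and to pick up in the follow-up round. Unaffected members are omitted.
    """
    members = set(old) | set(new)
    revoked = {m: old.get(m, set()) - new.get(m, set()) for m in members}
    added = {m: new.get(m, set()) - old.get(m, set()) for m in members}
    # Keep only members that actually have something to revoke or add
    revoked = {m: p for m, p in revoked.items() if p}
    added = {m: p for m, p in added.items() if p}
    return revoked, added


old = {"C1": {0, 1}, "C2": {2, 3}, "C3": {4, 5}}
new = {"C1": {0, 1}, "C2": {2, 3}, "C3": {4}, "C4": {5}}
revoked, added = cooperative_delta(old, new)
print(revoked)  # {'C3': {5}}
print(added)    # {'C4': {5}}
```

Every partition that appears in neither result keeps its owner and is never paused.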

ℹ️ Cooperative Advantage

With cooperative rebalancing, if Consumer 3 joins and only needs partitions from Consumer 2, Consumer 1 continues processing uninterrupted. Consumer 2 stops consuming only the partitions being transferred and keeps processing the rest.

3. Static Group Membership

from confluent_kafka import Consumer

# Static membership - consumer keeps assignment across restarts
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    
    # Static membership
    'group.instance.id': 'consumer-1-fixed',  # Unique per instance, stable across restarts
    
    # Longer session timeout for stability
    'session.timeout.ms': 300000,  # 5 minutes
    
    # Cooperative rebalance
    'partition.assignment.strategy': 'cooperative-sticky'
})
consumer.subscribe(['orders'])

Static Membership Benefits:

Architecture Diagram
Scenario: Rolling deployment of 10 consumers

Without static membership:
1. Deploy consumer-1 with new version
2. Consumer-1 shuts down gracefully and sends LeaveGroup
3. Rebalance triggered: all 10 consumers paused
4. Consumer-1 rejoins with new version (as a new member)
5. Another rebalance triggered: all 10 consumers paused again
6. Repeat for each consumer...

Total rebalances: 20 (2 per consumer)
Total pause time: 20 × rebalance_time

With static membership:
1. Deploy consumer-1 with new version
2. Consumer-1 shuts down without sending LeaveGroup
3. Coordinator keeps consumer-1's assignment until session.timeout.ms expires
4. No rebalance triggered (session timeout not exceeded)
5. Consumer-1 rejoins with same group.instance.id
6. Coordinator hands the same partitions back to consumer-1

Total rebalances: 0 (if within session timeout)
Total pause time: 0 for the group (consumer-1's own partitions are idle while it restarts)
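
The arithmetic behind this scenario can be written out as a small estimator. It is a deliberately simplified model with hypothetical names: a dynamic member is assumed to cause one rebalance when it leaves and one when it rejoins, and a static member is assumed to cause none as long as it is back before its session expires.

```python
def rolling_restart_rebalances(num_consumers, static_membership,
                               restart_ms=0, session_timeout_ms=0):
    """Estimate rebalances caused by restarting every consumer once.

    Simplified model: leave + rejoin = 2 rebalances per consumer, unless the
    consumer is a static member that returns within its session timeout.
    """
    if static_membership and restart_ms < session_timeout_ms:
        return 0
    return 2 * num_consumers


print(rolling_restart_rebalances(10, static_membership=False))  # 20
print(rolling_restart_rebalances(10, True, restart_ms=60_000,
                                 session_timeout_ms=300_000))   # 0
print(rolling_restart_rebalances(10, True, restart_ms=400_000,
                                 session_timeout_ms=300_000))   # 20
```

The last call shows the failure mode: if a restart takes longer than the session timeout, static membership stops helping.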

Session Timeout Calculation

$$\text{Rebalance triggered if: } \frac{\text{time since last heartbeat}}{\text{session.timeout.ms}} > 1$$

$$\text{Recommended: } \text{session.timeout.ms} \geq 3 \times \text{heartbeat.interval.ms}$$

def calculate_session_timeout(
    processing_time_ms: int,
    num_partitions: int,
    safety_margin: float = 2.0
) -> int:
    """
    Calculate appropriate session timeout.
    
    Args:
        processing_time_ms: Max time to process one poll() batch
        num_partitions: Partitions assigned to this consumer
        safety_margin: Multiplier for safety
    
    Returns:
        Recommended session timeout in ms
    """
    # Base processing time
    base_time = processing_time_ms
    
    # Scale with partitions (more partitions = more work)
    scaled_time = base_time * (1 + num_partitions * 0.1)
    
    # Apply safety margin
    timeout = int(scaled_time * safety_margin)
    
    # Minimum 30 seconds
    return max(timeout, 30000)

# Example
timeout = calculate_session_timeout(
    processing_time_ms=5000,  # 5 seconds per poll
    num_partitions=10,
    safety_margin=2.0
)
print(f"Recommended session.timeout.ms: {timeout}")
# Computes 20000 ms (20 seconds), so the 30000 ms minimum applies

⚠️Common Mistake

Setting session.timeout.ms too low causes frequent rebalances during slow processing. Setting it too high delays failure detection. Rule of thumb: 3x your max processing time per poll() batch.
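
The recommended ratio between the two heartbeat settings is easy to check before a consumer starts. A minimal sketch, assuming a hypothetical helper named `check_heartbeat_settings` that only inspects the two values:

```python
def check_heartbeat_settings(session_timeout_ms, heartbeat_interval_ms):
    """Return a list of warnings for a session/heartbeat pair (empty if none)."""
    warnings = []
    if heartbeat_interval_ms >= session_timeout_ms:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif session_timeout_ms < 3 * heartbeat_interval_ms:
        warnings.append("session.timeout.ms is below 3 x heartbeat.interval.ms")
    return warnings


print(check_heartbeat_settings(30000, 3000))  # []
print(check_heartbeat_settings(10000, 5000))
# ['session.timeout.ms is below 3 x heartbeat.interval.ms']
```

A 3x margin means a consumer has to miss roughly three consecutive heartbeats before it is considered dead.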

Partition Assignment Strategies

Range Assignor

# RangeAssignor: Assigns contiguous partitions to each consumer
# Good for: Topics with similar partition counts

# Example: 6 partitions, 3 consumers
# Consumer 1: P0, P1 (range 0-1)
# Consumer 2: P2, P3 (range 2-3)
# Consumer 3: P4, P5 (range 4-5)

class RangeAssignor:
    def assign(self, partitions, num_consumers):
        """
        Assign contiguous ranges of partitions.
        P0-P1 → C1, P2-P3 → C2, P4-P5 → C3
        """
        assignment = {}
        partitions_per_consumer = len(partitions) // num_consumers
        remainder = len(partitions) % num_consumers
        
        start = 0
        for consumer_id in range(num_consumers):
            # Extra partition for first 'remainder' consumers
            count = partitions_per_consumer + (1 if consumer_id < remainder else 0)
            end = start + count
            
            assignment[consumer_id] = partitions[start:end]
            start = end
        
        return assignment

RoundRobin Assignor

# RoundRobinAssignor: Distributes partitions evenly
# Good for: Balancing across topics when all consumers share the same subscriptions

# Example: 2 topics × 3 partitions, 2 consumers
# Topic A: P0, P1, P2
# Topic B: P0, P1, P2
# Consumer 1 subscribes to both
# Consumer 2 subscribes to both

# RoundRobin:
# Consumer 1: A-P0, A-P2, B-P1
# Consumer 2: A-P1, B-P0, B-P2

class RoundRobinAssignor:
    def assign(self, topic_partitions, num_consumers):
        """
        Round-robin assignment across all partitions.
        """
        all_partitions = []
        for topic, partitions in topic_partitions.items():
            for p in partitions:
                all_partitions.append((topic, p))
        
        # Sort for determinism
        all_partitions.sort()
        
        assignment = {i: [] for i in range(num_consumers)}
        for idx, partition in enumerate(all_partitions):
            consumer_id = idx % num_consumers
            assignment[consumer_id].append(partition)
        
        return assignment
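
The practical difference between the two strategies shows up with several topics. The sketch below re-implements both ideas in a self-contained form (function names are hypothetical, consumers are numbered from 0) and compares the per-consumer load for the two-topic example above.

```python
def range_assign(topic_partitions, num_consumers):
    """Range-style: split each topic's partitions into contiguous chunks."""
    assignment = {c: [] for c in range(num_consumers)}
    for topic in sorted(topic_partitions):
        parts = sorted(topic_partitions[topic])
        base, extra = divmod(len(parts), num_consumers)
        start = 0
        for c in range(num_consumers):
            # The first 'extra' consumers get one more partition, per topic
            count = base + (1 if c < extra else 0)
            assignment[c] += [(topic, p) for p in parts[start:start + count]]
            start += count
    return assignment


def round_robin_assign(topic_partitions, num_consumers):
    """Round-robin: deal all (topic, partition) pairs out one at a time."""
    pairs = sorted((t, p) for t, ps in topic_partitions.items() for p in ps)
    assignment = {c: [] for c in range(num_consumers)}
    for idx, pair in enumerate(pairs):
        assignment[idx % num_consumers].append(pair)
    return assignment


topics = {"A": [0, 1, 2], "B": [0, 1, 2]}
for name, fn in [("range", range_assign), ("round-robin", round_robin_assign)]:
    loads = {c: len(ps) for c, ps in fn(topics, 2).items()}
    print(name, loads)
# range {0: 4, 1: 2}
# round-robin {0: 3, 1: 3}
```

Because the range strategy hands out the remainder separately for each topic, the first consumer collects the extra partition from every topic, while round-robin spreads the six partitions evenly.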

Sticky Assignor

# StickyAssignor: Minimizes partition movement
# Good for: Reducing rebalance impact

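class StickyAssignor:
    def assign(self, current_assignment, partitions, num_consumers):
        """
        Keep partitions on same consumer when possible.
        Only move partitions when needed to restore balance.
        Consumers are identified by integer IDs 0..num_consumers-1.
        """
        valid = set(partitions)
        
        # Step 1: Try to keep existing assignments; consumers that just
        # joined start with an empty list
        new_assignment = {
            c: [p for p in current_assignment.get(c, []) if p in valid]
            for c in range(num_consumers)
        }
        
        # Step 2: Find unassigned partitions
        all_assigned = set()
        for assigned in new_assignment.values():
            all_assigned.update(assigned)
        
        unassigned = [p for p in partitions if p not in all_assigned]
        
        # Step 3: Distribute unassigned partitions
        # to least loaded consumers
        for partition in unassigned:
            min_consumer = min(
                new_assignment.keys(),
                key=lambda c: len(new_assignment[c])
            )
            new_assignment[min_consumer].append(partition)
        
        # Step 4: Move partitions from the most loaded to the least loaded
        # consumer until their loads differ by at most one
        while True:
            most = max(new_assignment, key=lambda c: len(new_assignment[c]))
            least = min(new_assignment, key=lambda c: len(new_assignment[c]))
            if len(new_assignment[most]) - len(new_assignment[least]) <= 1:
                break
            new_assignment[least].append(new_assignment[most].pop())
        
        return new_assignment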

ℹ️ Assignor Selection

  • RangeAssignor: Best for single-topic subscriptions with similar partition counts
  • RoundRobinAssignor: Best for multi-topic subscriptions
  • StickyAssignor: Best for minimizing rebalance impact
  • CooperativeStickyAssignor: Best for incremental rebalancing (recommended)

CooperativeStickyAssignor

# CooperativeStickyAssignor: Combines cooperative rebalancing with sticky assignment
# This is the recommended strategy for production use

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    
    # Use cooperative sticky assignment; this setting also selects
    # the cooperative (incremental) rebalance protocol
    'partition.assignment.strategy': 'cooperative-sticky',
    
    # Session management
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 3000,
    
    # Static membership for rolling deployments
    'group.instance.id': 'consumer-instance-1'
})
consumer.subscribe(['topic-a', 'topic-b', 'topic-c'])

# The cooperative sticky assignor will:
# 1. Compute current assignment
# 2. Compute desired assignment
# 3. Minimize partition movements
# 4. Only revoke partitions that MUST move
# 5. Keep other partitions assigned to their current consumer

Assignment Algorithm Comparison

Architecture Diagram
Scenario: 6 partitions, 3 consumers, Consumer 4 joins

Eager Rebalance:
  Step 1: All consumers revoke ALL partitions
  Step 2: Compute new assignment
  Step 3: Assign new partitions
  Result: ALL consumers pause for 10-30 seconds

Cooperative Sticky:
  Step 1: Compute delta (which partitions need to move)
  Step 2: Only move P5 from C3 to C4 (target sizes: 2, 2, 1, 1)
  Step 3: C3 revokes only P5
  Result: Only P5 pauses for 2-5 seconds; every other partition keeps processing

Sticky (non-cooperative):
  Step 1: All consumers revoke ALL partitions
  Step 2: Compute assignment minimizing movement
  Step 3: Assign new partitions
  Result: ALL consumers pause, but assignment is sticky

⚠️Migration Warning

When migrating from eager to cooperative rebalancing, you must do a rolling restart. All consumers in the group must use the same assignment strategy. Mixing eager and cooperative strategies causes unpredictable behavior.

Consumer Offset Management

Offset Commit Strategies

from kafka import KafkaConsumer
import atexit

consumer = KafkaConsumer(
    'orders',
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    
    # Auto commit settings
    enable_auto_commit=False,  # Manual commit for control
    auto_commit_interval_ms=5000,
    
    # Isolation level
    isolation_level='read_committed'  # For exactly-once with transactions
)

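def process_record(record):
    """Placeholder for application logic."""
    print(record.topic, record.partition, record.offset)

# Manual offset commit
def process_messages():
    while True:
        messages = consumer.poll(timeout_ms=1000)
        
        for topic_partition, records in messages.items():
            for record in records:
                # Process message
                process_record(record)
        
        # Commit only after the whole poll() batch is processed: commit()
        # with no arguments commits the position of every assigned partition
        if messages:
            consumer.commit()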
        
        # Or commit specific offset
        # consumer.commit({
        #     topic_partition: OffsetAndMetadata(
        #         offset=last_offset + 1,
        #         metadata='optional'
        #     )
        # })

# Commit on shutdown
def shutdown_hook():
    consumer.commit()
    consumer.close()

atexit.register(shutdown_hook)

Offset Storage

Architecture Diagram
Consumer offsets stored in __consumer_offsets topic:

Key:   {group_id, topic, partition}
Value: {offset, metadata, timestamp}

Example:
Key:   {"group": "orders", "topic": "events", "partition": 0}
Value: {"offset": 12345, "metadata": "", "timestamp": 1690000000000}

Consumer reads its last committed offset on startup
to know where to resume processing.
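
The key/value layout above behaves like a map in which the latest commit for a key wins. A minimal in-memory sketch of that behavior, with a hypothetical `OffsetStore` class standing in for the compacted topic:

```python
class OffsetStore:
    """In-memory stand-in for the compacted __consumer_offsets topic."""

    def __init__(self):
        self._latest = {}  # (group, topic, partition) -> offset

    def commit(self, group, topic, partition, offset):
        # A later commit for the same key replaces the earlier one
        self._latest[(group, topic, partition)] = offset

    def fetch(self, group, topic, partition):
        # None means the group has no committed offset for this partition
        return self._latest.get((group, topic, partition))


store = OffsetStore()
store.commit("orders", "events", 0, 12000)
store.commit("orders", "events", 0, 12345)
print(store.fetch("orders", "events", 0))  # 12345
print(store.fetch("orders", "events", 1))  # None
```

When no committed offset exists, a real consumer falls back to its auto.offset.reset policy instead of resuming.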

Offset Lag Monitoring

def calculate_offset_lag(consumer, topic):
    """
    Calculate consumer lag for monitoring.
    """
    from kafka import TopicPartition
    
    lag_info = {}
    
    # partitions_for_topic() returns a set of partition IDs (or None)
    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    
    # Get end offsets (latest); unlike seek_to_end(), end_offsets()
    # does not move the consumer's read position
    end_offsets = consumer.end_offsets(tps)
    
    for tp in tps:
        # Get committed offset (None if the group never committed)
        committed = consumer.committed(tp)
        end_offset = end_offsets[tp]
        
        # Calculate lag
        lag = end_offset - (committed or 0)
        
        lag_info[tp.partition] = {
            'committed': committed,
            'end_offset': end_offset,
            'lag': lag,
            'lag_percentage': (lag / end_offset * 100) if end_offset > 0 else 0
        }
    
    return lag_info

# Monitoring example
lag = calculate_offset_lag(consumer, 'orders')
for partition, info in lag.items():
    print(f"Partition {partition}: lag={info['lag']} ({info['lag_percentage']:.1f}%)")

ℹ️ Lag Threshold

Set alerts when lag exceeds 10,000 messages or 1% of total messages. High lag indicates the consumer is falling behind, potentially causing stale data for downstream systems.
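
The two thresholds can be combined into one check. A small sketch, assuming a hypothetical `lag_alert` helper that treats the partition's end offset as the "total messages" figure:

```python
def lag_alert(lag, end_offset, abs_threshold=10_000, pct_threshold=1.0):
    """Return True when lag breaches the absolute or the percentage threshold."""
    pct = (lag / end_offset * 100) if end_offset > 0 else 0.0
    return lag > abs_threshold or pct > pct_threshold


print(lag_alert(500, 1_000_000))      # False: 0.05% and under 10,000
print(lag_alert(15_000, 10_000_000))  # True: over 10,000 messages
print(lag_alert(200, 10_000))         # True: 2% of total
```

Using both conditions keeps small topics from hiding behind the absolute threshold and large topics from hiding behind the percentage.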

Consumer Performance Formula

$$\text{ConsumerThroughput} = \frac{\text{messages\_processed}}{\text{time\_per\_poll} + \text{processing\_time}}$$

$$\text{MaxThroughput} = \frac{\text{max.poll.records} \times 1000}{\text{max.poll.interval.ms}} \text{ msgs/sec per consumer}$$

def estimate_consumer_throughput(
    max_poll_records: int,
    processing_time_per_record_ms: float,
    num_consumers: int,
    num_partitions: int
) -> dict:
    """
    Estimate consumer throughput capacity.
    """
    # Time per poll batch
    batch_time = processing_time_per_record_ms * max_poll_records
    
    # Records per second per consumer
    records_per_sec = (max_poll_records * 1000) / max(batch_time, 1)
    
    # Bottleneck: partitions (consumers beyond the partition count sit idle)
    active_consumers = min(num_consumers, num_partitions)
    partitions_per_consumer = num_partitions / num_consumers
    
    # Total capacity comes from active consumers only
    total_records_per_sec = records_per_sec * active_consumers
    
    return {
        'records_per_sec_per_consumer': records_per_sec,
        'total_records_per_sec': total_records_per_sec,
        'partitions_per_consumer': partitions_per_consumer,
        'recommended_consumers': active_consumers
    }

# Example
result = estimate_consumer_throughput(
    max_poll_records=500,
    processing_time_per_record_ms=1.0,
    num_consumers=6,
    num_partitions=12
)
print(f"Per consumer: {result['records_per_sec_per_consumer']:.0f} records/sec")
print(f"Total: {result['total_records_per_sec']:.0f} records/sec")

Common Rebalance Scenarios

Architecture Diagram
Scenario 1: Consumer crash
  Trigger: Heartbeat timeout (session.timeout.ms exceeded)
  Eager: all consumers revoke their partitions and rebalance
  Cooperative: only the crashed consumer's partitions are reassigned
  
Scenario 2: Rolling deployment
  Trigger: Consumer shuts down (dynamic members send LeaveGroup)
  Without static membership: rebalance on each leave and each rejoin
  With static membership: no rebalance (within session timeout)
  
Scenario 3: Partition expansion
  Trigger: New partitions added to topic
  Eager: full rebalance to distribute new partitions
  Cooperative: only the new partitions are assigned
  
Scenario 4: Consumer processing too slow
  Trigger: max.poll.interval.ms exceeded
  Behavior: Consumer leaves the group
  Behavior: Rebalance triggered

⚠️Production Recommendation

Use CooperativeStickyAssignor with static group membership for production workloads. Set session.timeout.ms to 5-10 minutes and heartbeat.interval.ms to 3 seconds. This minimizes rebalance impact while detecting real failures quickly.

Consumer Group State Machine

Architecture Diagram
┌─────────────────────────────────────────────────────────┐
│                  Consumer Group States                  │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  EMPTY → PREPARING_REBALANCE → COMPLETING_REBALANCE     │
│    ↑              ↑                    │                │
│    │              │                    ↓                │
│    │              └────────────── STABLE                │
│    │                                                    │
│    └────────────────────────────────────────────────────┘
│                                                         │
│  States:                                                │
│  - EMPTY: No members                                    │
│  - PREPARING_REBALANCE: Rebalance in progress           │
│  - COMPLETING_REBALANCE: Assignment computed            │
│  - STABLE: All members processing                       │
│                                                         │
│  Transitions:                                           │
│  - Consumer joins: PREPARING → COMPLETING → STABLE      │
│  - Consumer leaves: STABLE → PREPARING → COMPLETING     │
│  - All consumers leave: → EMPTY                         │
└─────────────────────────────────────────────────────────┘
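
The states and transitions in the diagram can be expressed as a small lookup table. This is a sketch of the diagram only, not the coordinator's implementation; the event names are hypothetical labels chosen for illustration.

```python
from enum import Enum


class GroupState(Enum):
    EMPTY = "EMPTY"
    PREPARING_REBALANCE = "PREPARING_REBALANCE"
    COMPLETING_REBALANCE = "COMPLETING_REBALANCE"
    STABLE = "STABLE"


# (current state, event) -> next state; event names are illustrative
TRANSITIONS = {
    (GroupState.EMPTY, "member_joined"): GroupState.PREPARING_REBALANCE,
    (GroupState.STABLE, "member_joined"): GroupState.PREPARING_REBALANCE,
    (GroupState.STABLE, "member_left"): GroupState.PREPARING_REBALANCE,
    (GroupState.PREPARING_REBALANCE, "all_members_joined"): GroupState.COMPLETING_REBALANCE,
    (GroupState.COMPLETING_REBALANCE, "assignment_delivered"): GroupState.STABLE,
}


def next_state(state, event):
    """Return the next group state, or raise ValueError for an invalid move."""
    if event == "all_members_left":
        return GroupState.EMPTY  # Reachable from any state
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"{event!r} is not valid in state {state.name}") from None


state = GroupState.EMPTY
for event in ["member_joined", "all_members_joined", "assignment_delivered"]:
    state = next_state(state, event)
print(state.name)  # STABLE
```

A table like this makes it easy to see that every membership change routes through PREPARING_REBALANCE before the group can be STABLE again.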

ℹ️ Monitoring Tip

Track the rebalance-rate-per-hour metric. If it is consistently high, consider static membership or cooperative rebalancing. A healthy system should have < 1 rebalance per hour under normal operation.
