πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Multi-Datacenter Kafka

Apache KafkaMulti-DC⭐ Premium

Advertisement

Multi-Datacenter Kafka

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

Multi-datacenter Kafka enables disaster recovery, geo-redundancy, and low-latency access across regions. This is critical for global applications.

Architecture Patterns

Architecture Diagram
Pattern 1: Active-Passive
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Active DC (US-East)                            β”‚
β”‚  └── Kafka Cluster (Primary)                    β”‚
β”‚       └── Producers & Consumers                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    β”‚ MirrorMaker 2
                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Passive DC (US-West)                           β”‚
β”‚  └── Kafka Cluster (Backup)                     β”‚
β”‚       └── No Producers (Standby)                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Pattern 2: Active-Active
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  DC 1 (US-East)              DC 2 (EU-West)    β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚ Kafka       │◄──────────►│ Kafka       β”‚    β”‚
β”‚  β”‚ Cluster     β”‚ MirrorMakerβ”‚ Cluster     β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚       β–²                          β–²             β”‚
β”‚       β”‚                          β”‚             β”‚
β”‚  Producers                  Producers          β”‚
β”‚  Consumers                  Consumers          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

MirrorMaker 2 Configuration

MM2 Properties

# mm2.properties
clusters = us-east, us-west

# Connection settings
us-east.bootstrap.servers = kafka-us-east1:9092,kafka-us-east2:9092
us-west.bootstrap.servers = kafka-us-west1:9092,kafka-us-west2:9092

# Replication flows
us-east->us-west.enabled = true
us-east->us-west.topics = orders, payments, users

us-west->us-east.enabled = true
us-west->us-east.topics = orders, payments, users

# Replication settings
replication.factor = 3
sync.topic.configs.enabled = true
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
emit.checkpoints.enabled = true
sync.group.offsets.enabled = true
offset.lag.max = 1000

# Checkpoint settings
checkpoint.group.consistency.level = 2
checkpoint.topic = __consumer_offsets

MM2 Configuration File

# mm2.properties
clusters = primary, secondary

primary.bootstrap.servers = kafka-primary1:9092,kafka-primary2:9092,kafka-primary3:9092
secondary.bootstrap.servers = kafka-secondary1:9092,kafka-secondary2:9092,kafka-secondary3:9092

primary->secondary.enabled = true
primary->secondary.topics = .*

secondary->primary.enabled = false  # Active-Passive mode

replication.factor = 3
sync.topic.configs.enabled = true
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
emit.checkpoints.enabled = true
sync.group.offsets.enabled = true

Java Admin Client for Multi-DC

import org.apache.kafka.clients.admin.*;

public class MultiDCAdmin {
    
    private final AdminClient primaryAdmin;
    private final AdminClient secondaryAdmin;
    
    public MultiDCAdmin() {
        Properties primaryProps = new Properties();
        primaryProps.put("bootstrap.servers", "kafka-primary:9092");
        primaryAdmin = AdminClient.create(primaryProps);
        
        Properties secondaryProps = new Properties();
        secondaryProps.put("bootstrap.servers", "kafka-secondary:9092");
        secondaryAdmin = AdminClient.create(secondaryProps);
    }
    
    public void createTopicInBothDCs(String topic, int partitions) {
        NewTopic newTopic = new NewTopic(topic, partitions, (short) 3);
        
        // Create in primary
        primaryAdmin.createTopics(Collections.singleton(newTopic));
        
        // Create in secondary
        secondaryAdmin.createTopics(Collections.singleton(newTopic));
        
        System.out.println("Topic created in both DCs");
    }
    
    public void verifyReplication(String topic) throws Exception {
        // Get topic details from primary
        DescribeTopicsResult primaryResult = primaryAdmin.describeTopics(
            Collections.singleton(topic)
        );
        
        // Get topic details from secondary
        DescribeTopicsResult secondaryResult = secondaryAdmin.describeTopics(
            Collections.singleton(topic)
        );
        
        // Compare
        TopicDescription primaryDesc = primaryResult.all().get().get(topic);
        TopicDescription secondaryDesc = secondaryResult.all().get().get(topic);
        
        System.out.println("Primary partitions: " + primaryDesc.partitions().size());
        System.out.println("Secondary partitions: " + secondaryDesc.partitions().size());
    }
}

Python Multi-DC Operations

from kafka import KafkaAdminClient
from kafka.admin import TopicPartition, NewTopic
import time

class MultiDCManager:
    def __init__(self, primary_servers, secondary_servers):
        self.primary_admin = KafkaAdminClient(
            bootstrap_servers=primary_servers,
            client_id='primary-admin'
        )
        self.secondary_admin = KafkaAdminClient(
            bootstrap_servers=secondary_servers,
            client_id='secondary-admin'
        )
    
    def create_topic(self, topic, num_partitions, replication_factor=3):
        """Create topic in both DCs"""
        topic_config = NewTopic(
            name=topic,
            num_partitions=num_partitions,
            replication_factor=replication_factor
        )
        
        # Create in primary
        self.primary_admin.create_topics([topic_config])
        print(f"Created topic {topic} in primary DC")
        
        # Create in secondary
        self.secondary_admin.create_topics([topic_config])
        print(f"Created topic {topic} in secondary DC")
    
    def verify_replication(self, topic):
        """Verify topic exists in both DCs"""
        primary_topics = self.primary_admin.list_topics()
        secondary_topics = self.secondary_admin.list_topics()
        
        if topic in primary_topics and topic in secondary_topics:
            print(f"Topic {topic} exists in both DCs")
            return True
        else:
            print(f"Topic {topic} missing in one or both DCs")
            return False
    
    def monitor_lag(self, topic):
        """Monitor replication lag between DCs"""
        while True:
            # Get offsets from primary
            primary_offsets = self.get_offsets(self.primary_admin, topic)
            
            # Get offsets from secondary
            secondary_offsets = self.get_offsets(self.secondary_admin, topic)
            
            # Calculate lag
            for partition in primary_offsets:
                primary_offset = primary_offsets[partition]
                secondary_offset = secondary_offsets.get(partition, 0)
                lag = primary_offset - secondary_offset
                
                print(f"Partition {partition}: Primary={primary_offset}, "
                      f"Secondary={secondary_offset}, Lag={lag}")
            
            time.sleep(10)
    
    def get_offsets(self, admin, topic):
        """Get latest offsets for topic"""
        # This is a simplified example
        # In production, use list_offsets method
        return {}

Disaster Recovery Procedures

Failover Process

#!/bin/bash
# failover.sh - Manual failover to secondary DC

PRIMARY_DC="us-east"
SECONDARY_DC="us-west"
TOPIC="orders"

echo "Starting failover from $PRIMARY_DC to $SECONDARY_DC"

# 1. Stop producers in primary DC
echo "Stopping producers..."
# Send shutdown signal to producer applications

# 2. Wait for replication to catch up
echo "Waiting for replication lag to reach 0..."
while [ $(get_lag $TOPIC) -gt 0 ]; do
    sleep 5
done

# 3. Verify secondary has all data
echo "Verifying secondary DC data..."
verify_secondary_data $TOPIC

# 4. Update DNS/load balancers to point to secondary
echo "Updating DNS records..."
update_dns $SECONDARY_DC

# 5. Start consumers in secondary DC
echo "Starting consumers in secondary DC..."
start_consumers $SECONDARY_DC

echo "Failover complete. Secondary DC is now active."

Failback Process

#!/bin/bash
# failback.sh - Return to primary DC

echo "Starting failback to primary DC"

# 1. Stop producers in secondary DC
echo "Stopping producers in secondary DC..."

# 2. Enable replication from secondary to primary
echo "Enabling reverse replication..."
enable_reverse_replication

# 3. Wait for primary to catch up
echo "Waiting for primary to catch up..."
while [ $(get_reverse_lag) -gt 0 ]; do
    sleep 5
done

# 4. Update DNS to point to primary
echo "Updating DNS records..."
update_dns "us-east"

# 5. Start producers in primary DC
echo "Starting producers in primary DC..."

echo "Failback complete. Primary DC is now active."

Geo-Partitioning Strategy

// Geo-aware partitioning
public class GeoPartitioner implements Partitioner {
    
    private Map<String, Integer> regionPartitions;
    
    @Override
    public void configure(Map<String, ?> configs) {
        regionPartitions = new HashMap<>();
        regionPartitions.put("us-east", 0);
        regionPartitions.put("us-west", 1);
        regionPartitions.put("eu-west", 2);
        regionPartitions.put("ap-south", 3);
    }
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Extract region from value
        String region = extractRegion(value.toString());
        
        // Route to region-specific partition
        int basePartition = regionPartitions.getOrDefault(region, 0);
        
        // Add some distribution within region
        int hash = Math.abs(Utils.murmur2(keyBytes));
        return basePartition + (hash % 2);  # 2 partitions per region
    }
}

Follow-Up Questions

  1. What is the difference between MirrorMaker 1 and MirrorMaker 2?
  2. How does Kafka handle exactly-once semantics in multi-DC replication?
  3. Explain the trade-offs between active-active and active-passive multi-DC setups.
  4. How would you handle schema evolution across multiple datacenters?
  5. What are the network requirements for multi-DC Kafka deployments?

Advertisement