Kafka Multi-Datacenter Replication
Overview
Multi-datacenter Kafka replication enables disaster recovery, geo-redundancy, and data locality across distributed infrastructure. This guide covers MirrorMaker 2 replication, failover strategies, and active-active architectures.
Use Cases
- Disaster Recovery: Failover to secondary DC during outages
- Geo-Redundancy: Serve consumers from nearest datacenter
- Compliance: Meet data residency requirements
- Performance: Reduce latency for geographically distributed users
MirrorMaker 2 Architecture
Core Components
| Component | Function |
|---|---|
| Source Connector | Reads from source cluster |
| Sink Connector | Writes to target cluster |
| Checkpoint Connector | Syncs consumer offsets |
| Heartbeat Connector | Monitors cluster health |
MirrorMaker 2 Configuration
# mm2.properties
clusters = dc1, dc2
# DC1 connection
dc1.bootstrap.servers = kafka-dc1:9092
dc1.security.protocol = SSL
# DC2 connection
dc2.bootstrap.servers = kafka-dc2:9092
dc2.security.protocol = SSL
# Replication flow
dc1->dc2.enabled = true
dc1->dc2.topics = orders.*, payments.*
# Topic naming: the replication policy decides the name of the remote topic.
# DefaultReplicationPolicy prefixes it with the source cluster alias and the
# separator (orders on dc1 becomes dc1.orders on dc2).
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .
# Settings for the replicated topics
replication.factor = 3
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 60
Kafka Connect Configuration
{
  "name": "mirror-maker-2",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": "5",
    "source.cluster.alias": "dc1",
    "target.cluster.alias": "dc2",
    "source.cluster.bootstrap.servers": "kafka-dc1-1:9092,kafka-dc1-2:9092,kafka-dc1-3:9092",
    "target.cluster.bootstrap.servers": "kafka-dc2-1:9092,kafka-dc2-2:9092,kafka-dc2-3:9092",
    "topics": "orders,payments,users",
    "replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "sync.topic.configs.enabled": "true",
    "replication.factor": "3",
    "emit.heartbeats.enabled": "true",
    "emit.checkpoints.enabled": "true"
  }
}
Replication Policies
Topic Translation
# Naming convention for replicated topics:
#   topic on the source cluster (DC1):  orders
#   replica on the target cluster (DC2): dc1.orders
TOPIC_TRANSLATION = dict(
    source_pattern='{topic}',
    target_pattern='{source_cluster}.{topic}',
    # Each example maps a source topic to its dc1-prefixed replica name.
    examples={name: f'dc1.{name}' for name in ('orders', 'payments', 'users')},
)
Replication Factor
# Check replication status: describe the replicated topic dc1.orders on the
# DC2 broker to see each partition's leader, replica list and in-sync replicas
kafka-topics.sh --bootstrap-server kafka-dc2:9092 \
--describe --topic dc1.orders
# Output:
# Topic: dc1.orders Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
# Topic: dc1.orders Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
# In this sample every partition lists three replicas and the same three
# brokers under Isr, i.e. all replicas are in sync.
Filter Policies
# Replicate only specific topics (comma-separated topic names and/or regexes)
dc1->dc2.topics = orders.*, payments.*
# Exclude certain topics. The backslash is doubled because a .properties file
# treats a single backslash as an escape character and would drop it.
dc1->dc2.topics.exclude = .*\\.internal, .*\\.temp
# Replicate by pattern: `topics` itself accepts regular expressions, there is
# no separate `topics.regex` property. Alternative to the list above:
# dc1->dc2.topics = orders|payments|users
Offset Synchronization
Checkpoint Connector
# Offset sync configuration: periodically emit checkpoints that carry the
# translated consumer group offsets
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
# Consumer groups whose offsets are translated
groups = order-processor, payment-processor
# Also write the translated offsets to the target cluster's __consumer_offsets
# so these groups can resume there after a failover (Kafka 2.7+).
# There is no `groups.enabled` property.
sync.group.offsets.enabled = true
Offset Sync Example
# DC1 Consumer Group Offsets
{
"group": "order-processor",
"topic": "orders",
"partition": 0,
"offset": 152345,
"timestamp": 1640995200000
}
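# Synced to DC2 (checkpoint translated by MirrorMaker 2)
{
"group": "order-processor", # Group ID is not renamed; only the topic is prefixed
"topic": "dc1.orders",
"partition": 0,
"offset": 152345, # Translated downstream offset; not guaranteed to equal the DC1 offset
"timestamp": 1640995200000
}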
Failover with Offset Reset
from kafka import KafkaConsumer, TopicPartition

# After failover, resume on DC2 from the synced offsets.
# No topic is passed to the constructor: seek() only works on partitions that
# are already assigned, and a topic subscription is not assigned until the
# first poll(). The partitions are therefore assigned manually below.
consumer = KafkaConsumer(
    bootstrap_servers=['kafka-dc2:9092'],
    group_id='order-processor-failover',
    auto_offset_reset='earliest',
    enable_auto_commit=False
)

# get_synced_offsets is defined elsewhere. Presumably it maps each partition
# (a partition number, or a TopicPartition of the replicated topic) to the
# offset to resume from on DC2 - TODO confirm against its implementation.
offsets = get_synced_offsets('order-processor', 'orders')
positions = {
    (p if isinstance(p, TopicPartition) else TopicPartition('dc1.orders', p)): offset
    for p, offset in offsets.items()
}

# Manually assign the replicated partitions, then seek each to its synced offset
consumer.assign(list(positions))
for topic_partition, offset in positions.items():
    consumer.seek(topic_partition, offset)
Failover Strategies
Active-Passive Failover
class KafkaFailoverManager:
    """Track which of two Kafka clusters is active and switch between them.

    ``primary_cluster`` and ``secondary_cluster`` are stored as given and
    passed unchanged as ``bootstrap_servers`` when a producer is created.

    ``reset_consumer_offsets`` and ``wait_for_replication_complete`` are
    hooks: override them in a subclass (or assign them on the instance)
    before calling ``failover`` / ``failback``.
    """

    def __init__(self, primary_cluster, secondary_cluster):
        self.primary = primary_cluster
        self.secondary = secondary_cluster
        # Either 'primary' or 'secondary'; starts on the primary cluster.
        self.active = 'primary'

    def get_producer(self):
        """Return a new ``KafkaProducer`` for the currently active cluster.

        A fresh producer is created on every call, so the caller owns it and
        is responsible for closing it.
        """
        # Imported here so the class does not depend on a module-level import.
        from kafka import KafkaProducer

        servers = self.primary if self.active == 'primary' else self.secondary
        return KafkaProducer(bootstrap_servers=servers)

    def failover(self):
        """Switch to the secondary cluster, then reset consumer offsets."""
        self.active = 'secondary'
        # Reset consumer offsets to synced position
        self.reset_consumer_offsets()

    def failback(self):
        """Switch back to the primary once replication has caught up.

        ``active`` stays 'secondary' if waiting for replication raises.
        """
        # Ensure all data is replicated back
        self.wait_for_replication_complete()
        self.active = 'primary'

    def reset_consumer_offsets(self):
        """Hook called by ``failover``; must be provided by the deployment."""
        raise NotImplementedError(
            "reset_consumer_offsets must be implemented for this deployment"
        )

    def wait_for_replication_complete(self):
        """Hook called by ``failback``; must be provided by the deployment."""
        raise NotImplementedError(
            "wait_for_replication_complete must be implemented for this deployment"
        )
Health Check Configuration
import requests
from kafka import KafkaConsumer
def check_cluster_health(bootstrap_servers):
    """Check if Kafka cluster is healthy.

    Opens a short-lived consumer against ``bootstrap_servers`` and asks it
    for the topic list.

    Returns:
        True when the topic list could be fetched; False when any step
        raised (the error is printed, not re-raised).
    """
    try:
        consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            request_timeout_ms=5000
        )
        try:
            # Check if we can list topics; only success matters, not the result
            consumer.topics()
        finally:
            # Always release the connection, including when topics() fails
            consumer.close()
        return True
    except Exception as e:
        # Health-check boundary: every failure means "unhealthy"
        print(f"Cluster health check failed: {e}")
        return False
# Monitor and failover
def monitor_and_failover(primary, secondary, manager=None, interval_seconds=10):
    """Poll the primary cluster and trigger a failover when it is unhealthy.

    Blocks until the first failed health check, calls ``failover()`` once,
    then returns.

    Args:
        primary: Bootstrap servers passed to ``check_cluster_health``.
        secondary: Accepted for symmetry; not used by this function.
        manager: Object whose ``failover()`` is called. When omitted, the
            module-level ``failover_manager`` is used.
        interval_seconds: Pause between health checks.
    """
    import time

    while True:
        if not check_cluster_health(primary):
            print("Primary cluster failed, initiating failover")
            # The module-level fallback is resolved only when actually needed
            active_manager = manager if manager is not None else failover_manager
            active_manager.failover()
            break
        time.sleep(interval_seconds)
Automatic Failover with ZooKeeper
from kazoo.client import KazooClient
class ZooKeeperFailover:
    """Report cluster up/down events based on a watched ZooKeeper node."""

    def __init__(self, zk_hosts):
        """Create a Kazoo client for ``zk_hosts`` and start it."""
        self.zk = client = KazooClient(hosts=zk_hosts)
        client.start()

    def watch_cluster(self, cluster_path, callback):
        """Register a data watch on ``cluster_path``.

        ``callback`` receives 'cluster_down' when the watch delivers no data
        and 'cluster_up' otherwise.
        """
        def _notify(data, stat):
            state = 'cluster_down' if data is None else 'cluster_up'
            callback(state)

        # Same registration as using DataWatch(cluster_path) as a decorator
        self.zk.DataWatch(cluster_path)(_notify)
Active-Active Architecture
Bi-directional Replication
# Both directions enabled
dc1->dc2.enabled = true
dc1->dc2.topics = orders.*, payments.*
dc2->dc1.enabled = true
dc2->dc1.topics = orders.*, payments.*
# Prevent infinite loops: DefaultReplicationPolicy prefixes replicated topics
# with the source cluster alias (dc1.orders, dc2.orders), which lets
# MirrorMaker 2 recognise them and not mirror them back to their origin.
# IdentityReplicationPolicy keeps topic names unchanged and cannot prevent
# cycles, so it must not be used with bi-directional replication.
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
Conflict Resolution
# Use unique keys to prevent conflicts
def produce_with_unique_key(producer, topic, message, datacenter_id=None):
    """Send ``message`` keyed by its id plus a datacenter identifier.

    Args:
        producer: Object with a ``send(topic, key=..., value=...)`` method.
        topic: Topic to send to.
        message: Mapping with an ``'id'`` entry; it is serialized as JSON.
        datacenter_id: Identifier appended to the key. When omitted it is
            obtained from ``get_datacenter_id()``.

    Returns:
        Whatever ``producer.send`` returns, so the caller can wait on it.
    """
    import json

    if datacenter_id is None:
        datacenter_id = get_datacenter_id()
    # Add datacenter identifier to key
    key = f"{message['id']}-{datacenter_id}"
    return producer.send(topic, key=key.encode(), value=json.dumps(message).encode())
# Merge strategy for conflicts
class ConflictResolver:
    """Choose between two versions of a message using last-write-wins."""

    def resolve(self, dc1_message, dc2_message):
        """Return the version with the greater ``'timestamp'``.

        A version that is ``None`` loses to the other one. On equal
        timestamps the DC2 version is returned.
        """
        # A message that exists in only one datacenter has nothing to compete with
        if dc1_message is None:
            return dc2_message
        if dc2_message is None:
            return dc1_message
        # Last-write-wins based on timestamp
        if dc1_message['timestamp'] > dc2_message['timestamp']:
            return dc1_message
        return dc2_message
Monitoring Multi-DC Replication
Key Metrics
# Prometheus metrics to monitor.
# Every entry uses the same keys: name, description, threshold.
metrics:
  - name: mirrormaker_replication_latency
    description: "Replication latency between DCs"
    threshold: "> 1000ms"
  - name: mirrormaker_replication_lag
    description: "Messages pending replication"
    threshold: "> 10000"
  - name: mirrormaker_replication_rate
    description: "Messages replicated per second"
    threshold: "< expected_rate * 0.5"
Monitoring Dashboard
{
  "panels": [
    {
      "title": "DC1 to DC2 Replication Lag",
      "type": "timeseries",
      "targets": [
        {
          "expr": "sum by (topic) (mirrormaker_replication_lag{source='dc1', target='dc2'})",
          "legendFormat": "{{topic}}"
        }
      ]
    },
    {
      "title": "Replication Latency",
      "type": "timeseries",
      "targets": [
        {
          "expr": "histogram_quantile(0.99, sum by (le) (rate(mirrormaker_replication_latency_bucket[5m])))",
          "legendFormat": "p99 latency"
        }
      ]
    }
  ]
}
Disaster Recovery Testing
Regular DR Drills
#!/bin/bash
# dr_drill.sh
# Runs a disaster-recovery drill: checks replication, takes the primary down,
# verifies the secondary, then restores the primary.
# Stop at the first failing step instead of carrying on with the drill.
set -euo pipefail

# Scale the primary cluster back up to its normal size.
restore_primary() {
  kubectl scale statefulset kafka-dc1 --replicas=3
}

echo "Starting DR drill..."
# 1. Verify replication status (a failure here aborts before the primary is touched)
# NOTE(review): the stock kafka-mirror-maker.sh is the legacy MirrorMaker
# launcher and does not document --describe / --mm2-cluster; presumably this
# is a site-specific wrapper - confirm.
kafka-mirror-maker.sh --describe --mm2-cluster dc1-to-dc2
# 2. Simulate primary failure; from here on the primary is restored even if a
#    later step fails
trap restore_primary EXIT
kubectl scale statefulset kafka-dc1 --replicas=0
# 3. Verify failover
sleep 30
check_consumer_lags --cluster dc2
# 4. Failback
restore_primary
trap - EXIT
# 5. Verify replication restored
verify_replication_complete --mm2-cluster dc1-to-dc2
Summary
Multi-datacenter Kafka replication with MirrorMaker 2 provides robust disaster recovery and geo-redundancy. Implement proper offset synchronization, failover strategies, and active-active configurations to ensure high availability across datacenters.