Multi-Datacenter Kafka: MirrorMaker 2, Replication
Difficulty: Staff | Asked at: Netflix, Uber, LinkedIn, AWS
ℹ️ Interview Context
Multi-datacenter Kafka is a complex topic. Interviewers expect you to understand replication strategies, failover mechanisms, and the trade-offs between consistency and availability.
The Question
How do you replicate Kafka topics across datacenters? Explain MirrorMaker 2 architecture and how it handles failover. What are the trade-offs between active-active and active-passive setups?
Multi-DC Architecture Patterns
Active-Passive (Cold Standby)
```
Active-Passive Architecture

Primary DC
  +----------+   +----------+   +----------+
  | Broker 1 |   | Broker 2 |   | Broker 3 |
  +----------+   +----------+   +----------+
       |              |              |
       +--------------+--------------+
                      |
             +-----------------+
             |  MirrorMaker 2  |
             +-----------------+
                      |
                 Replication
                      v
DR DC
  +----------+   +----------+   +----------+
  | Broker 4 |   | Broker 5 |   | Broker 6 |
  +----------+   +----------+   +----------+

Passive: ready for failover
```
Active-Active
```
Active-Active Architecture

DC 1
  +----------+   +----------+   +----------+
  | Broker 1 |   | Broker 2 |   | Broker 3 |
  +----------+   +----------+   +----------+
       |              |              |
       +--------------+--------------+
                      |
             +-----------------+
             |  MirrorMaker 2  |  DC 1 -> DC 2
             +-----------------+
                      ^
                      |   Bidirectional
                      |   replication
                      v
DC 2
             +-----------------+
             |  MirrorMaker 2  |  DC 2 -> DC 1
             +-----------------+
                      |
       +--------------+--------------+
       |              |              |
  +----------+   +----------+   +----------+
  | Broker 4 |   | Broker 5 |   | Broker 6 |
  +----------+   +----------+   +----------+

Both DCs active, bidirectional replication
```
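In an active-active setup with MM2's default naming, each DC holds two kinds of topics: its own local topic and prefixed copies of the other DCs' topics. A consumer that needs the global view therefore reads both. The sketch below assumes DC aliases `dc1`/`dc2` and `<alias>.<topic>` naming. `topics_for_global_view` and `subscription_pattern` are hypothetical helpers, not part of any Kafka client.

```python
import re


def topics_for_global_view(topic: str, local_dc: str, all_dcs: list[str]) -> list[str]:
    """Topics a consumer in local_dc reads to see writes from every DC.

    Assumes DefaultReplicationPolicy-style names: <source-alias>.<topic>.
    """
    remote = [f"{dc}.{topic}" for dc in all_dcs if dc != local_dc]
    return [topic] + remote


def subscription_pattern(topic: str, local_dc: str, all_dcs: list[str]) -> str:
    """Equivalent regex for a pattern-based subscription."""
    remote = "|".join(re.escape(dc) for dc in all_dcs if dc != local_dc)
    return rf"^(?:(?:{remote})\.)?{re.escape(topic)}$"


topics = topics_for_global_view("orders", local_dc="dc1", all_dcs=["dc1", "dc2"])
print(topics)  # ['orders', 'dc2.orders']

pattern = subscription_pattern("orders", local_dc="dc1", all_dcs=["dc1", "dc2"])
print(bool(re.match(pattern, "dc2.orders")))  # True
print(bool(re.match(pattern, "dc1.orders")))  # False: dc1.orders lives in DC 2, not here
```

Because the prefix records where a write originated, consumers can still tell local writes from replicated ones.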
Pattern Comparison
| Pattern | RPO (data-loss window) | RTO (time to restore service) | Complexity | Use Case |
|---|---|---|---|---|
| Active-Passive | Minutes | Minutes-Hours | Low | DR, compliance |
| Active-Active | Seconds | Seconds | High | Global availability |
| Multi-Active | Sub-second | Seconds | Very High | Ultra-low latency |
ℹ️ Pattern Selection
- Active-Passive: When you need DR for compliance but don't need immediate failover
- Active-Active: When you need global availability and can handle conflict resolution
- Multi-Active: When you need sub-second failover and have the infrastructure budget
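The selection guidance above can be encoded as ordered rules, checked from most to least demanding. This is a hypothetical sketch. The boolean inputs map onto the three bullets. Treating Multi-Active as also requiring conflict resolution is an assumption, since it accepts writes everywhere. Any requirement that cannot be met falls back to Active-Passive.

```python
def choose_pattern(*, immediate_failover: bool, can_resolve_conflicts: bool,
                   subsecond_failover: bool, infra_budget: bool) -> str:
    """Encode the pattern-selection guidance as ordered rules."""
    # Most demanding first: sub-second failover plus the budget to run it
    if subsecond_failover and infra_budget and can_resolve_conflicts:
        return "Multi-Active"
    # Global availability means accepting writes in several DCs
    if immediate_failover and can_resolve_conflicts:
        return "Active-Active"
    # Default: DR/compliance copy without immediate failover
    return "Active-Passive"


print(choose_pattern(immediate_failover=False, can_resolve_conflicts=False,
                     subsecond_failover=False, infra_budget=False))  # Active-Passive
print(choose_pattern(immediate_failover=True, can_resolve_conflicts=True,
                     subsecond_failover=False, infra_budget=False))  # Active-Active
```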
MirrorMaker 2 Architecture
How MM2 Works
```
MirrorMaker 2 Architecture

Source Cluster
  Topics: orders, users, events
                |
                | consume
                v
MirrorMaker 2 (a set of Kafka Connect connectors)
  MirrorSourceConnector
    - Reads from source topics
    - Preserves partitioning
    - Renames topics via the replication policy
    - Records offset syncs (source offset -> target offset)
  MirrorCheckpointConnector
    - Translates consumer group offsets
  MirrorHeartbeatConnector
    - Emits heartbeats to monitor replication health
                |
                | produce
                v
Target Cluster
  Topics: DC1.orders, DC1.users, DC1.events
```
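The source-connector path can be modeled in a few lines. Each record is copied, in order, to the same partition number of the renamed target topic. An offset sync records which source offset landed at which target offset. This is a simplified in-memory sketch: partitions are Python lists, naming is DefaultReplicationPolicy-style, and `sync_every` is a hypothetical stand-in for the fact that MM2 emits offset syncs only periodically. The real connector runs on Kafka Connect.

```python
def remote_topic(source_alias: str, topic: str) -> str:
    """DefaultReplicationPolicy-style target name: <source-alias>.<topic>."""
    return f"{source_alias}.{topic}"


def replicate_partition(source_records, target_log, sync_every=2):
    """Append (offset, value) source records to target_log, in order.

    Returns offset syncs as (source_offset, target_offset) pairs, recorded
    every `sync_every` records rather than for every record.
    """
    syncs = []
    for i, (src_offset, value) in enumerate(source_records):
        target_offset = len(target_log)  # next offset in the target partition
        target_log.append(value)
        if i % sync_every == 0:
            syncs.append((src_offset, target_offset))
    return syncs


# Partition 0 of "orders" on the source; offsets 0-99 were already deleted by retention
source = {("orders", 0): [(100 + i, f"order-{i}") for i in range(5)]}
target = {}
all_syncs = {}

for (topic, partition), records in source.items():
    key = (remote_topic("dc1", topic), partition)  # same partition number, renamed topic
    all_syncs[key] = replicate_partition(records, target.setdefault(key, []))

print(all_syncs)
# {('dc1.orders', 0): [(100, 0), (102, 2), (104, 4)]}
```

Source offset 100 lands at target offset 0. This is why consumer group offsets cannot be copied verbatim and must be translated through the syncs.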
MM2 Configuration
```properties
# mm2.properties
clusters = source, target
source.bootstrap.servers = kafka-source:9092
target.bootstrap.servers = kafka-target:9092

# Replication flows (one-way: source -> target)
# Properties files have no inline comments, so comments go on their own lines
source->target.enabled = true
target->source.enabled = false

# Topic naming: IdentityReplicationPolicy keeps topic names unchanged
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
# Or custom: com.example.CustomReplicationPolicy

# Topics to replicate (comma-separated regexes)
topics = orders.*, users.*, events.*
# Topics to exclude
topics.exclude = .*\\.internal, __consumer_offsets

# Sync translated consumer group offsets to the target cluster
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 60
# Consumer groups whose offsets are translated
groups = my-consumer-group, another-group

# Checkpoints and heartbeats. Their topic names come from the replication
# policy (e.g. source.checkpoints.internal and heartbeats), not from config keys.
emit.checkpoints.enabled = true
emit.heartbeats.enabled = true
```
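The `topics` and `topics.exclude` values are comma-separated regular expressions. The sketch below shows how such a filter selects topics. It assumes each pattern must match the whole topic name and that exclusions override inclusions. `should_replicate` is a hypothetical helper, not MM2's API.

```python
import re


def parse_patterns(value: str) -> list[re.Pattern]:
    """Split a comma-separated property value into compiled regexes."""
    return [re.compile(p.strip()) for p in value.split(",") if p.strip()]


def should_replicate(topic: str, include: str, exclude: str) -> bool:
    """Include if any include pattern fully matches and no exclude pattern does."""
    included = any(p.fullmatch(topic) for p in parse_patterns(include))
    excluded = any(p.fullmatch(topic) for p in parse_patterns(exclude))
    return included and not excluded


# Values as MM2 sees them after properties-file unescaping (\\. becomes \.)
INCLUDE = r"orders.*, users.*, events.*"
EXCLUDE = r".*\.internal, __consumer_offsets"

for t in ["orders", "orders.internal", "users-v2", "payments"]:
    print(t, should_replicate(t, INCLUDE, EXCLUDE))
# orders True
# orders.internal False
# users-v2 True
# payments False
```

In a regex, `orders.*` means "orders followed by anything", so it behaves as a prefix match. It also selects `orders_archive`.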
MM2 Topic Naming
```python
# IdentityReplicationPolicy (opt-in)
#   Source topic: orders
#   Target topic: orders (same name)
#
# DefaultReplicationPolicy (the default)
#   Source topic: orders
#   Target topic: source.orders  (<source-cluster-alias>.<topic>)


class CustomReplicationPolicy:
    """
    Custom topic naming for cross-DC replication.

    Python model for illustration only: real MM2 policies are Java classes
    implementing ReplicationPolicy, configured via replication.policy.class.
    """

    def __init__(self, prefix='dc2'):
        self.prefix = prefix

    def source_topic(self, target_topic):
        """Map a target topic back to its source topic (None if not replicated)."""
        if target_topic.startswith(self.prefix + '.'):
            return target_topic[len(self.prefix) + 1:]
        return None

    def target_topic(self, source_topic):
        """Map a source topic to its target topic name."""
        return f"{self.prefix}.{source_topic}"


# Usage
policy = CustomReplicationPolicy(prefix='dc2')
print(policy.target_topic('orders'))      # dc2.orders
print(policy.source_topic('dc2.orders'))  # orders
```
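Prefixed names also let MM2 replicate bidirectionally without loops. When replicating into a cluster, a topic whose name shows it originated in that cluster is skipped. The sketch below models that check on the `<alias>.<topic>` convention. `is_cycle` and its helpers are hypothetical. Treating only known aliases as prefixes is an assumption, chosen so that dotted local names like `orders.v2` are not misread. IdentityReplicationPolicy names carry no origin, so this check cannot work for them. That is why identity naming suits one-way flows.

```python
from typing import Optional


def topic_source(topic: str, aliases: set[str]) -> Optional[str]:
    """Alias a topic was replicated from, or None for a local topic."""
    prefix, sep, _ = topic.partition(".")
    return prefix if sep and prefix in aliases else None


def upstream_topic(topic: str, aliases: set[str]) -> Optional[str]:
    """Topic name one hop upstream (prefix stripped), or None for a local topic."""
    if topic_source(topic, aliases) is None:
        return None
    return topic.partition(".")[2]


def is_cycle(topic: str, target_alias: str, aliases: set[str]) -> bool:
    """True if the topic, at any hop, originated in the target cluster."""
    current: Optional[str] = topic
    while current is not None:
        if topic_source(current, aliases) == target_alias:
            return True
        current = upstream_topic(current, aliases)
    return False


aliases = {"dc1", "dc2"}
# Flow dc2 -> dc1: which topics found in dc2 should be copied into dc1?
print(is_cycle("orders", "dc1", aliases))      # False: dc2-local, copy as dc2.orders
print(is_cycle("dc1.orders", "dc1", aliases))  # True: came from dc1, skip
print(is_cycle("orders.v2", "dc1", aliases))   # False: "orders" is not an alias
```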
MM2 Checkpointing
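```python
import time


class CheckpointManager:
    """
    Simplified, in-memory model of MM2 offset tracking.

    In MM2 itself, MirrorSourceConnector records offset syncs (source offset ->
    target offset), and MirrorCheckpointConnector uses them to write translated
    consumer group offsets to the <source>.checkpoints.internal topic on the
    target cluster. This class only tracks the last replicated source offset
    per partition so that replication lag can be estimated.
    """

    def __init__(self):
        self.checkpoints = {}

    def record_checkpoint(self, source_cluster, topic, partition, offset):
        """Record the last source offset replicated for a partition."""
        key = f"{source_cluster}.{topic}.{partition}"
        self.checkpoints[key] = {
            'source_cluster': source_cluster,
            'topic': topic,
            'partition': partition,
            'offset': offset,
            'timestamp': int(time.time() * 1000),  # epoch millis
        }

    def get_checkpoint(self, source_cluster, topic, partition):
        """Get the last checkpoint for a partition, or None."""
        key = f"{source_cluster}.{topic}.{partition}"
        return self.checkpoints.get(key)

    def calculate_lag(self, source_cluster, topic, partition, current_offset):
        """
        Calculate replication lag in records.

        Lag = Current Source Offset - Checkpointed Offset
        """
        checkpoint = self.get_checkpoint(source_cluster, topic, partition)
        if checkpoint is None:
            return current_offset  # Nothing replicated yet: full replication needed
        return current_offset - checkpoint['offset']


# Usage
mgr = CheckpointManager()
mgr.record_checkpoint('dc1', 'orders', 0, offset=1200)
print(mgr.calculate_lag('dc1', 'orders', 0, current_offset=1500))  # 300
```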
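⚠️ MM2 Limitation
By default, MM2 delivers at least once. Messages may be duplicated during failover. Translated consumer offsets can also trail the source, so consumers that fail over may reprocess some records; design them to be idempotent. For exactly-once replication, you need custom solutions or Confluent Replicator.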
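One source of these duplicates is offset translation. A group's committed source offset is mapped to a target offset through the nearest earlier offset sync. When the committed offset falls between two syncs, the translation errs toward re-reading rather than skipping. The sketch below is simplified and assumes sorted (source_offset, target_offset) pairs. `translate_offset` is hypothetical and only loosely modeled on MM2's behavior, which differs in detail across Kafka versions.

```python
import bisect
from typing import Optional


def translate_offset(committed: int, syncs: list[tuple[int, int]]) -> Optional[int]:
    """Translate a committed source offset to a target offset.

    syncs: (source_offset, target_offset) pairs sorted by source_offset.
    Returns None when no sync precedes the committed offset.
    """
    sources = [s for s, _ in syncs]
    i = bisect.bisect_right(sources, committed) - 1  # last sync at or before committed
    if i < 0:
        return None
    src, tgt = syncs[i]
    # Exact hit: positions line up. Otherwise resume just after the sync point,
    # which can re-deliver records but never skips any.
    return tgt if committed == src else tgt + 1


syncs = [(1000, 0), (1100, 100)]  # one sync every 100 records
print(translate_offset(1100, syncs))  # 100: exact hit
print(translate_offset(1050, syncs))  # 1: exact position is 50, so offsets 1-49 are re-read
print(translate_offset(900, syncs))   # None: no sync before this offset
```

Sparser syncs mean larger possible re-reads after failover, which is the duplicate window the warning describes.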
Failover Strategies
Failover Decision Matrix
```python
def evaluate_failover_conditions(dc1_metrics, dc2_metrics):
    """
    Evaluate conditions for failing over from DC1 (primary) to DC2 (DR).

    Failover triggers:
    1. DC completely unreachable
    2. Broker cluster down
    3. Replication lag too high
    4. Manual failover for maintenance (operator-initiated; not checked here)

    Note: dc2_metrics is not inspected; a production check should also
    confirm that DC2 is healthy before recommending failover.
    """
    failover_needed = False
    reasons = []

    # Check DC reachability
    if not dc1_metrics.get('reachable', True):
        failover_needed = True
        reasons.append("DC1 unreachable")

    # Check broker health
    if dc1_metrics.get('active_brokers', 0) == 0:
        failover_needed = True
        reasons.append("No active brokers in DC1")

    # Check replication lag (seconds)
    if dc1_metrics.get('replication_lag_seconds', 0) > 300:
        failover_needed = True
        reasons.append(f"Replication lag: {dc1_metrics['replication_lag_seconds']}s")

    # Check partition availability
    if dc1_metrics.get('offline_partitions', 0) > 0:
        failover_needed = True
        reasons.append(f"Offline partitions: {dc1_metrics['offline_partitions']}")

    return {
        'failover_needed': failover_needed,
        'reasons': reasons,
        'recommended_action': 'FAILOVER' if failover_needed else 'MONITOR'
    }
```
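`evaluate_failover_conditions` accepts `dc2_metrics` but never reads it. Failing over into an unhealthy DR site only moves the outage, so a decision matrix can cross primary health with DR health. The sketch below works under that assumption. `is_healthy` reuses the reachability, broker, and offline-partition checks above. Unlike the original, it treats a missing reachability or broker metric as unhealthy. The `REPAIR_DR` and `ESCALATE` actions are hypothetical labels.

```python
def is_healthy(metrics: dict) -> bool:
    """Reachable, at least one broker, and no offline partitions."""
    return bool(
        metrics.get("reachable", False)
        and metrics.get("active_brokers", 0) > 0
        and metrics.get("offline_partitions", 0) == 0
    )


# (primary healthy, DR healthy) -> action
DECISION_MATRIX = {
    (True, True): "MONITOR",     # normal operation
    (True, False): "REPAIR_DR",  # primary fine; fix DR before it is needed
    (False, True): "FAILOVER",   # primary down and DR can take traffic
    (False, False): "ESCALATE",  # no safe target: page a human
}


def failover_decision(dc1_metrics: dict, dc2_metrics: dict) -> str:
    return DECISION_MATRIX[(is_healthy(dc1_metrics), is_healthy(dc2_metrics))]


dc1 = {"reachable": False, "active_brokers": 0}
dc2 = {"reachable": True, "active_brokers": 3, "offline_partitions": 0}
print(failover_decision(dc1, dc2))                   # FAILOVER
print(failover_decision(dc1, {"reachable": False}))  # ESCALATE
```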
Automated Failover
```python
import logging
import time

log = logging.getLogger(__name__)


class FailoverController:
    """
    Automated failover controller for multi-DC Kafka.

    Environment-specific hooks (ping_dc, count_active_brokers,
    get_offline_partitions, get_replication_lag, stop_producers,
    wait_for_replication_catchup, redirect_consumers, update_dns,
    verify_dr_healthy, rollback_failover) are not shown; implement them
    for your infrastructure.
    """

    def __init__(self, primary_dc, dr_dc):
        self.primary = primary_dc
        self.dr = dr_dc
        self.failover_in_progress = False

    def monitor_and_failover(self):
        """Monitor the primary DC and fail over if it stays unhealthy."""
        while True:
            # Check primary DC health
            health = self.check_dc_health(self.primary)
            if not health['healthy']:
                log.warning("Primary DC unhealthy: %s", health['metrics'])
                # Wait for a confirmation period to avoid flapping
                time.sleep(30)
                # Re-check before acting
                health = self.check_dc_health(self.primary)
                if not health['healthy'] and self.execute_failover():
                    return  # Stop monitoring once traffic is on the DR DC
            time.sleep(10)

    def execute_failover(self):
        """
        Execute failover to the DR DC. Returns True on success.

        Steps:
        1. Stop producers to primary
        2. Wait for replication to catch up
        3. Redirect consumers to DR
        4. Update DNS/load balancers
        5. Verify DR is serving traffic
        """
        if self.failover_in_progress:
            return False
        self.failover_in_progress = True
        try:
            # Steps 1-2 need the primary to be reachable; in a full DC outage
            # they cannot complete, and unreplicated data is lost (the RPO).
            # Step 1: Stop producers
            self.stop_producers(self.primary)
            # Step 2: Wait for replication
            self.wait_for_replication_catchup()
            # Step 3: Redirect consumers
            self.redirect_consumers(self.dr)
            # Step 4: Update DNS
            self.update_dns(self.dr)
            # Step 5: Verify
            if self.verify_dr_healthy():
                log.info("Failover completed successfully")
                return True
            log.error("Failover verification failed")
            return False
        except Exception as e:
            log.error("Failover failed: %s", e)
            # Attempt rollback
            self.rollback_failover()
            return False
        finally:
            self.failover_in_progress = False

    def check_dc_health(self, dc):
        """Check datacenter health."""
        metrics = {
            'reachable': self.ping_dc(dc),
            'active_brokers': self.count_active_brokers(dc),
            'offline_partitions': self.get_offline_partitions(dc),
            'replication_lag': self.get_replication_lag(dc),  # seconds
        }
        healthy = (
            metrics['reachable']
            and metrics['active_brokers'] > 0
            and metrics['offline_partitions'] == 0
            and metrics['replication_lag'] < 60
        )
        return {'healthy': healthy, 'metrics': metrics}
```
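`monitor_and_failover` confirms a failure with a single re-check after 30 seconds. The timing estimate in the next section is built around `failure_threshold` consecutive failed checks instead, which is easier to tune. Below is a minimal sketch of such a detector. The health check is injected as a function so the logic can be exercised without sleeping. `FailureDetector` is a hypothetical name.

```python
from typing import Callable


class FailureDetector:
    """Declare a DC down only after `threshold` consecutive failed checks."""

    def __init__(self, check: Callable[[], bool], threshold: int = 3):
        self.check = check  # returns True when the DC is healthy
        self.threshold = threshold
        self.consecutive_failures = 0

    def poll(self) -> bool:
        """Run one health check; return True once the DC should be failed over."""
        if self.check():
            self.consecutive_failures = 0  # any success resets the count
        else:
            self.consecutive_failures += 1
        return self.consecutive_failures >= self.threshold


# Simulated check results: a transient blip, then a sustained outage
results = iter([True, False, True, False, False, False])
detector = FailureDetector(lambda: next(results), threshold=3)
decisions = [detector.poll() for _ in range(6)]
print(decisions)  # [False, False, False, False, False, True]
```

Called once per health-check interval, this takes `interval x threshold` seconds to detect an outage, the detection term in the estimate below. The single blip does not trigger failover.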
Failover Timing
```python
def estimate_failover_time(
    health_check_interval_sec: int,
    failure_threshold: int,
    decision_delay_sec: int,
    dns_propagation_sec: int
) -> dict:
    """
    Estimate total failover time.
    """
    detection_time = health_check_interval_sec * failure_threshold
    decision_time = decision_delay_sec
    execution_time = dns_propagation_sec
    total_time = detection_time + decision_time + execution_time

    return {
        'detection_time_sec': detection_time,
        'decision_time_sec': decision_time,
        'execution_time_sec': execution_time,
        'total_failover_time_sec': total_time,
        'total_failover_time_min': total_time / 60
    }


# Example
failover_estimate = estimate_failover_time(
    health_check_interval_sec=10,
    failure_threshold=3,
    decision_delay_sec=30,
    dns_propagation_sec=60
)
print(f"Total failover time: {failover_estimate['total_failover_time_min']:.1f} minutes")
# Total failover time: 2.0 minutes
```
ℹ️ Failover Best Practice
Test failover regularly. Document the process. Use runbooks for manual failover. Automate where possible but keep manual override capability.
Latency Considerations
Replication Latency
```python
def calculate_replication_latency(
    network_latency_ms: float,
    batch_size_bytes: int,
    throughput_mbps: float
) -> dict:
    """
    Calculate replication latency components.

    Simplified model: one-way network latency plus transfer time for a single
    batch. It ignores MM2 poll intervals, producer linger.ms, and any backlog,
    so rpo_seconds is a best-case, steady-state estimate.
    """
    # Network transfer time: bits / (bits per second), converted to ms
    transfer_time_ms = (batch_size_bytes * 8) / (throughput_mbps * 1000000) * 1000

    # Total replication latency
    total_latency_ms = network_latency_ms + transfer_time_ms

    return {
        'network_latency_ms': network_latency_ms,
        'transfer_time_ms': transfer_time_ms,
        'total_latency_ms': total_latency_ms,
        'rpo_seconds': total_latency_ms / 1000
    }


# Example
latency = calculate_replication_latency(
    network_latency_ms=50,   # 50 ms cross-DC
    batch_size_bytes=16384,  # 16 KB batch
    throughput_mbps=1000     # 1 Gbps
)
print(f"Replication latency: {latency['total_latency_ms']:.1f} ms")  # 50.1 ms
print(f"RPO: {latency['rpo_seconds']:.2f} seconds")                  # 0.05 seconds
```
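The estimate above assumes MM2 keeps up. When the source produces faster than MM2 replicates, a backlog builds and the data at risk grows with it. The rough sketch below assumes you can read MM2's record lag (for example, as computed by `calculate_lag`) and the topic's produce rate. `rpo_from_backlog` is a hypothetical helper.

```python
def rpo_from_backlog(lag_records: int, produce_rate_per_sec: float,
                     pipeline_latency_ms: float) -> float:
    """Approximate seconds of writes lost if the source DC failed right now.

    lag_records / produce rate: seconds of writes not yet replicated.
    pipeline_latency_ms: the in-flight batch, as in the steady-state estimate.
    """
    if produce_rate_per_sec <= 0:
        raise ValueError("produce rate must be positive")
    return lag_records / produce_rate_per_sec + pipeline_latency_ms / 1000


print(f"{rpo_from_backlog(0, 5_000, 50.1):.2f} s")        # 0.05 s (MM2 keeping up)
print(f"{rpo_from_backlog(150_000, 5_000, 50.1):.2f} s")  # 30.05 s (30 s backlog)
```

Under load, the backlog term quickly dominates the network term.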
Latency vs Throughput Trade-off
```python
def optimize_batch_for_latency(
    target_latency_ms: float,
    network_rtt_ms: float,
    throughput_mbps: float
) -> int:
    """
    Calculate the largest batch size (bytes) that still meets the target latency.

    The time left after the network RTT is the budget for transmitting one batch.
    """
    # Available time for batch transfer
    available_time_ms = target_latency_ms - network_rtt_ms
    if available_time_ms <= 0:
        return 0  # Cannot achieve target latency

    # bits/s * ms / 8 bits-per-byte / 1000 ms-per-s = bytes
    batch_size_bytes = throughput_mbps * 1000000 * available_time_ms / 8 / 1000
    return int(batch_size_bytes)


# Example
optimal_batch = optimize_batch_for_latency(
    target_latency_ms=100,
    network_rtt_ms=50,
    throughput_mbps=1000
)
print(f"Optimal batch size: {optimal_batch} bytes")
# Optimal batch size: 6250000 bytes
```
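The 6,250,000-byte result exceeds what a single producer request allows by default. The Kafka producer's `max.request.size` defaults to 1,048,576 bytes, and `batch.size` defaults to 16,384 bytes. The sketch below clamps the latency-derived size to a request limit and recomputes the latency with the same model. `clamp_batch_size` and `batch_latency_ms` are hypothetical helpers. The right limit depends on your MM2 producer settings and the broker's `message.max.bytes`.

```python
DEFAULT_MAX_REQUEST_SIZE = 1_048_576  # Kafka producer default for max.request.size (bytes)


def clamp_batch_size(latency_budget_bytes: int,
                     max_request_size: int = DEFAULT_MAX_REQUEST_SIZE) -> int:
    """Largest batch that fits both the latency budget and one producer request."""
    return max(0, min(latency_budget_bytes, max_request_size))


def batch_latency_ms(batch_bytes: int, network_rtt_ms: float, throughput_mbps: float) -> float:
    """Fixed network cost plus transfer time for one batch."""
    return network_rtt_ms + batch_bytes * 8 / (throughput_mbps * 1_000_000) * 1000


size = clamp_batch_size(6_250_000)
print(size, f"{batch_latency_ms(size, 50, 1000):.1f} ms")  # 1048576 58.4 ms
```

Here the request limit, not the latency target, binds. The resulting batch still lands well inside the 100 ms budget.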
⚠️ Latency Warning
Cross-DC replication always adds latency. For active-active setups, ensure your application can handle eventual consistency. Consider conflict resolution strategies for simultaneous writes to the same key.
Conflict Resolution
```python
class ConflictResolver:
    """
    Resolve conflicts in active-active replication.

    Kafka does not resolve conflicts itself: with bidirectional MM2, each DC
    holds both local and replicated records, so this logic runs in the
    consuming application (for example, when building a state store).
    """

    def __init__(self, strategy='last-write-wins'):
        self.strategy = strategy

    def resolve(self, key, dc1_value, dc2_value, dc1_timestamp, dc2_timestamp):
        """
        Resolve a conflict between two DCs.

        Strategies:
        - last-write-wins: most recent timestamp wins (ties go to DC2)
        - source-priority: primary DC (DC1) always wins
        - merge: field-level merge logic
        - custom: application-specific resolution
        """
        if self.strategy == 'last-write-wins':
            return dc1_value if dc1_timestamp > dc2_timestamp else dc2_value
        elif self.strategy == 'source-priority':
            return dc1_value  # Primary DC always wins
        elif self.strategy == 'merge':
            return self.merge_values(dc1_value, dc2_value)
        elif self.strategy == 'custom':
            return self.custom_resolve(key, dc1_value, dc2_value)
        else:
            raise ValueError(f"Unknown strategy: {self.strategy}")

    def merge_values(self, dc1_value, dc2_value):
        """Merge two dict values field by field (application-specific)."""
        # Example: merge user profiles
        merged = dc1_value.copy()
        for field, value in dc2_value.items():
            if field not in merged:
                merged[field] = value
            elif merged[field] != value:
                # Conflict on this field: apply field-level merge logic
                merged[field] = self.merge_field(merged[field], value)
        return merged

    def merge_field(self, dc1_field, dc2_field):
        """Placeholder field-level policy: prefer DC1's value. Override as needed."""
        return dc1_field

    def custom_resolve(self, key, dc1_value, dc2_value):
        """Hook for application-specific resolution; override in a subclass."""
        raise NotImplementedError
```
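Last-write-wins has a subtle problem on timestamp ties: `resolve` returns whichever value was passed as `dc2_value`. If each DC passes its own value first, the two sides pick different winners and diverge permanently. The sketch below is a deterministic variant that breaks ties by DC ID. It assumes each record carries a timestamp and an origin DC ID (hypothetical fields). It remains exposed to clock skew between DCs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Versioned:
    value: object
    timestamp_ms: int
    dc_id: str  # origin datacenter, used only to break ties


def lww(a: Versioned, b: Versioned) -> Versioned:
    """Last-write-wins with a deterministic tie-breaker.

    Comparing (timestamp, dc_id) tuples yields the same winner regardless of
    argument order, so every DC converges on the same value.
    """
    return a if (a.timestamp_ms, a.dc_id) > (b.timestamp_ms, b.dc_id) else b


x = Versioned("alice@old.example", 1_700_000_000_000, "dc1")
y = Versioned("alice@new.example", 1_700_000_000_000, "dc2")
print(lww(x, y).value)  # alice@new.example
print(lww(y, x).value)  # alice@new.example (same winner either way)
```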
ℹ️ Key Insight
Multi-datacenter Kafka is about trade-offs:
- Consistency vs Availability: Active-active provides availability at the cost of eventual consistency
- Latency vs Durability: Synchronous replication is durable but slow
- Complexity vs Features: More complex setups provide better failover