Compacted Topics, Log Deduplication, Changelog Topics
Difficulty: Senior | Asked at: Confluent, LinkedIn, Uber
ℹ️Interview Context
Compacted topics are fundamental to Kafka's state management. Interviewers expect you to understand how compaction works, when to use it, and how changelog topics make Kafka Streams state fault-tolerant (and, together with transactions, support exactly-once processing).
The Question
How do Kafka compacted topics work? Explain the log compaction process and how changelog topics enable state management in Kafka Streams.
Compacted Topic Basics
How Compaction Works
Before Compaction (each record shown as key=value):
Offset: 0    1    2    3    4    5    6    7
Record: A=1  B=2  A=3  C=1  B=4  A=5  C=2  B=6
Latest value per key: A=5 (offset 5), C=2 (offset 6), B=6 (offset 7)
After Compaction:
Offset: 5    6    7
Record: A=5  C=2  B=6
(superseded records at offsets 0-4 are removed; the surviving records keep their original offsets, so offsets 0-4 become gaps)
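Compaction guarantees:
1. The latest value for each key is always retained (a tombstone is retained for at least delete.retention.ms before it can be removed).
2. Compaction only removes records; it never reorders them, and a record's offset never changes.
3. The head of the log is not compacted: the active segment and records newer than min.compaction.lag.ms are left intact, so a consumer that keeps up with the head sees every write.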
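The before/after example can be reproduced with a small in-memory model. This is a sketch, not Kafka's log cleaner: it ignores segments, the active segment, and tombstones, and the `compact` function and the `(offset, key, value)` tuples are hypothetical.

```python
from typing import List, Tuple

# A record is (offset, key, value); the list models one partition's log in memory
Record = Tuple[int, str, str]

def compact(log: List[Record]) -> List[Record]:
    """Keep only the last record for each key, preserving order and original offsets."""
    # First pass: remember the highest offset seen for every key (like the cleaner's offset map)
    latest_offset = {}
    for offset, key, _ in log:
        latest_offset[key] = offset
    # Second pass: copy forward only records that are still the latest for their key
    return [rec for rec in log if latest_offset[rec[1]] == rec[0]]

log = [(0, "A", "1"), (1, "B", "2"), (2, "A", "3"), (3, "C", "1"),
       (4, "B", "4"), (5, "A", "5"), (6, "C", "2"), (7, "B", "6")]
print(compact(log))  # [(5, 'A', '5'), (6, 'C', '2'), (7, 'B', '6')]
```

The two passes mirror the idea of building a key-to-latest-offset map and then recopying the log, which is why offsets survive unchanged.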
Compaction Configuration
# Topic-level compaction configuration (kafka-python admin client)
from kafka.admin import KafkaAdminClient, NewTopic
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
# Create compacted topic; topic config values are sent to the broker as strings
topic = NewTopic(
    name='user-sessions',
    num_partitions=6,
    replication_factor=3,
    topic_configs={
        'cleanup.policy': 'compact',            # Enable compaction
        'min.cleanable.dirty.ratio': '0.3',     # Clean once >= 30% of the log is dirty
        'segment.ms': '604800000',              # Roll segments after 7 days (the active segment is never compacted)
        'delete.retention.ms': '86400000',      # Keep tombstones for 24 hours
        'min.compaction.lag.ms': '0',           # No minimum age before a record can be compacted
        'max.compaction.lag.ms': '604800000'    # Force compaction of records older than 7 days
    }
)
admin.create_topics(new_topics=[topic])
Compaction Properties
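compaction_properties = {
    'cleanup.policy': {
        'delete': 'Default. Delete old segments once retention.ms / retention.bytes is exceeded.',
        'compact': 'Enable log compaction (keep the latest value per key).',
        'compact,delete': 'Compact, and also delete segments past the retention limits.'
    },
    'min.cleanable.dirty.ratio': {
        'description': 'Minimum ratio of dirty (uncompacted) bytes to total log bytes before the cleaner compacts the log.',
        'default': 0.5,
        'range': '0.0 to 1.0',
        'recommendation': 'Lower (e.g. 0.3) for more frequent compaction, at the cost of more cleaner I/O'
    },
    'segment.ms': {
        'description': 'Time after which the active segment is rolled even if not full; only closed segments can be compacted.',
        'default': 604800000,  # 7 days
        'recommendation': 'Lower for faster compaction (at the cost of more, smaller segment files)'
    },
    'delete.retention.ms': {
        'description': 'How long tombstone markers are retained in a compacted topic.',
        'default': 86400000,  # 24 hours
        'recommendation': 'Longer than the longest time a consumer may fall behind, so it still sees deletions'
    }
}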
⚠️Compaction Warning
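Compaction is NOT real-time. A record is only compacted after its segment has been closed and the cleaner has selected the log, so duplicates for a key can stay readable for a long time. Don't rely on immediate deduplication. Lowering min.cleanable.dirty.ratio (e.g. to 0.1) makes cleaning more frequent at the cost of extra I/O; max.compaction.lag.ms sets a best-effort upper bound on how long a record can stay uncompacted.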
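The eligibility rule behind this warning can be sketched as a function. It is a simplified model, assuming the caller has already measured clean and dirty bytes; `is_cleanable` and its parameters are hypothetical, and the broker's real selection logic handles more cases.

```python
def is_cleanable(clean_bytes: int, dirty_bytes: int,
                 min_cleanable_dirty_ratio: float,
                 oldest_dirty_age_ms: int, max_compaction_lag_ms: int) -> bool:
    """Simplified model of when a compacted log becomes eligible for cleaning.

    dirty_bytes should only count closed segments whose records are older than
    min.compaction.lag.ms; the active segment is never cleaned.
    """
    if dirty_bytes == 0:
        return False
    # Records older than max.compaction.lag.ms force cleaning regardless of the ratio
    if oldest_dirty_age_ms > max_compaction_lag_ms:
        return True
    # Otherwise the dirty share of the log must reach the configured ratio
    return dirty_bytes / (clean_bytes + dirty_bytes) >= min_cleanable_dirty_ratio

WEEK_MS = 7 * 24 * 3600 * 1000
print(is_cleanable(800, 200, 0.5, 60_000, WEEK_MS))       # 20% dirty, below the 0.5 default
print(is_cleanable(800, 200, 0.1, 60_000, WEEK_MS))       # same log, lower ratio
print(is_cleanable(800, 200, 0.5, 2 * WEEK_MS, WEEK_MS))  # forced by max.compaction.lag.ms
```

The same 20%-dirty log is skipped at the default ratio but cleaned once the ratio is lowered or the oldest dirty record exceeds the maximum lag.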
Tombstone Records
What are Tombstones
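# Tombstone = record with a key and a null value
# Signals deletion of that key
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Writing a tombstone
producer.send('user-sessions', key=b'user123', value=None)
producer.flush()
# Tombstone behavior:
# 1. The tombstone is appended to the topic like any other record
# 2. During compaction, older records with the same key are removed; the tombstone itself is kept
# 3. In a later cleaning pass, after delete.retention.ms, the tombstone is removed too
# Tombstone retention
tombstone_retention = {
    'delete.retention.ms': 86400000,  # 24 hours
    'purpose': 'Give consumers time to see the deletion',
    'cleanup': 'Tombstone removed after retention period'
}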
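The tombstone lifecycle can be simulated by extending a latest-value-per-key compaction with a retention check. This is a sketch with hypothetical names: it measures tombstone age from the record timestamp, whereas Kafka computes the deadline from segment and cleaning metadata.

```python
from typing import List, Optional, Tuple

# (offset, key, value, timestamp_ms); value None marks a tombstone
Record = Tuple[int, str, Optional[str], int]

def compact_with_tombstones(log: List[Record], now_ms: int,
                            delete_retention_ms: int) -> List[Record]:
    """Keep the latest record per key; drop tombstones older than delete_retention_ms."""
    latest = {}
    for offset, key, _value, _ts in log:
        latest[key] = offset
    kept = []
    for offset, key, value, ts in log:
        if latest[key] != offset:
            continue  # superseded by a newer record for the same key
        if value is None and now_ms - ts >= delete_retention_ms:
            continue  # tombstone has outlived delete.retention.ms
        kept.append((offset, key, value, ts))
    return kept

DAY_MS = 86_400_000
log = [(0, "user123", "session-a", 1_000),
       (1, "user456", "session-b", 2_000),
       (2, "user123", None, 3_000)]
# Shortly after the delete: the old value is gone, the tombstone is still visible
print(compact_with_tombstones(log, now_ms=10_000, delete_retention_ms=DAY_MS))
# A day later: the tombstone itself has been purged
print(compact_with_tombstones(log, now_ms=3_000 + DAY_MS, delete_retention_ms=DAY_MS))
```

The second result shows the risk delete.retention.ms guards against: a consumer that read offset 0 and then stayed offline past the retention window resumes after the tombstone is gone and never learns that user123 was deleted.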
Tombstone Handling
import logging
log = logging.getLogger(__name__)
class TombstoneHandler:
    """
    Handle tombstone records in compacted topics.
    """
    def __init__(self):
        self.active_keys = {}
    def process_message(self, key, value):
        """
        Process message from compacted topic.
        If value is None, it's a tombstone (deletion).
        """
        if value is None:
            # Tombstone - key should be deleted
            if key in self.active_keys:
                del self.active_keys[key]
                log.info(f"Key {key} deleted via tombstone")
        else:
            # Regular message - update state
            self.active_keys[key] = value
    def get_state(self):
        """Get current state (all non-deleted keys)"""
        return self.active_keys.copy()
Changelog Topics
How Changelog Topics Work
Kafka Streams Changelog:
1. Processor maintains local state store
2. State changes written to changelog topic
3. Changelog topic is compacted
4. On failure, state rebuilt from changelog
Processor Flow:
Input Event → Update State → Write to Changelog Topic
State Recovery:
Read Changelog Topic → Rebuild Local State → Resume Processing
Example:
Event: {user: "alice", action: "login"}
State Update: {alice: {login_count: 1}}
Changelog: alice → {login_count: 1}
Event: {user: "alice", action: "purchase"}
State Update: {alice: {login_count: 1, purchases: 1}}
Changelog: alice → {login_count: 1, purchases: 1}
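The processor flow and recovery steps above can be simulated without a broker. This sketch uses a Python list in place of the changelog topic; `ChangelogBackedStore` and `process` are hypothetical names, and real Kafka Streams stores (RocksDB plus a record cache) batch and serialize differently.

```python
import json
from typing import Dict, List, Optional, Tuple

class ChangelogBackedStore:
    """In-memory stand-in for a state store plus its changelog (a plain list here)."""

    def __init__(self, changelog: List[Tuple[str, Optional[str]]]):
        self.changelog = changelog
        self.state: Dict[str, dict] = {}

    def put(self, key: str, value: dict) -> None:
        # Update local state, then log the full new value to the changelog
        self.state[key] = value
        self.changelog.append((key, json.dumps(value)))

    def delete(self, key: str) -> None:
        self.state.pop(key, None)
        self.changelog.append((key, None))  # tombstone

    @classmethod
    def restore(cls, changelog: List[Tuple[str, Optional[str]]]) -> "ChangelogBackedStore":
        # Replay the changelog in order; later entries overwrite earlier ones
        store = cls(changelog)
        for key, raw in changelog:
            if raw is None:
                store.state.pop(key, None)
            else:
                store.state[key] = json.loads(raw)
        return store

def process(store: ChangelogBackedStore, event: dict) -> None:
    """Input event -> update state -> write to changelog."""
    current = dict(store.state.get(event["user"], {}))
    if event["action"] == "login":
        current["login_count"] = current.get("login_count", 0) + 1
    elif event["action"] == "purchase":
        current["purchases"] = current.get("purchases", 0) + 1
    store.put(event["user"], current)

changelog: List[Tuple[str, Optional[str]]] = []
store = ChangelogBackedStore(changelog)
process(store, {"user": "alice", "action": "login"})
process(store, {"user": "alice", "action": "purchase"})
# Simulate a crash: discard the store and rebuild it from the changelog
recovered = ChangelogBackedStore.restore(changelog)
print(recovered.state)  # {'alice': {'login_count': 1, 'purchases': 1}}
```

Because every write logs the full value for the key, replaying only the latest entry per key is enough, which is exactly what compaction of the changelog preserves.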
Changelog Topic Configuration
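// Kafka Streams is a JVM library (there is no kafka.streams Python package), so this uses its Java API
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
public class UserCountsApp {
    public static void main(String[] args) {
        // replication.factor applies to internal topics (changelogs, repartition topics).
        // Changelog partition count always matches the input partitions and cannot be set separately.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        // Extra topic-level configs for the changelog (cleanup.policy=compact is the default for key-value stores)
        Map<String, String> changelogConfig = new HashMap<>();
        changelogConfig.put("min.insync.replicas", "2");
        changelogConfig.put("segment.ms", "3600000"); // roll hourly so segments become compactable sooner
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        // Count events per key (a JSON {count, last_value} aggregate would need a custom serde)
        KTable<String, Long> counts = stream
            .groupByKey()
            .aggregate(
                () -> 0L,                              // initializer
                (key, value, count) -> count + 1,      // aggregator
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long())
                    .withLoggingEnabled(changelogConfig)); // changelog on (the default) with extra configs
        new KafkaStreams(builder.build(), props).start();
    }
}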
Changelog Topic Naming
# Changelog topic naming convention
# {application.id}-{store.name}-changelog
# Example:
# application.id: user-analytics
# store.name: user-counts
# changelog topic: user-analytics-user-counts-changelog
# Internal topics:
# - {application.id}-{store.name}-changelog (state store backup)
# - {application.id}-{operator-or-store-name}-repartition (repartitioning before a key-changing aggregation or join)
ℹ️Changelog Purpose
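Changelog topics serve as durable backups for Kafka Streams state stores. If an instance crashes, its tasks' state is rebuilt (on the same or another instance) by replaying the changelog. Exactly-once semantics additionally require processing.guarantee=exactly_once_v2, which commits changelog writes, output records, and input offsets in a single transaction so that restored state always matches what was processed.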
Log Deduplication
Deduplication Strategies
import time
# Strategy 1: Compacted topic with latest value
# Producer sends multiple values for same key
# Compaction eventually keeps only the latest
# Strategy 2: Deduplication window
class DeduplicationWindow:
    """
    Deduplicate messages within a time window.
    """
    def __init__(self, window_size_ms=60000):
        self.window_size = window_size_ms
        self.seen = {}  # message_id -> first-seen timestamp (ms)
    def is_duplicate(self, message_id):
        """Check if message_id was already seen within the window"""
        current_time = int(time.time() * 1000)
        # Drop entries older than the window (rebuilds the dict: O(n) per call)
        self.seen = {
            k: v for k, v in self.seen.items()
            if current_time - v < self.window_size
        }
        # Check if seen
        if message_id in self.seen:
            return True
        # Mark as seen
        self.seen[message_id] = current_time
        return False
# Strategy 3: Bloom filter for probabilistic dedup
class BloomFilterDedup:
    """
    Probabilistic deduplication using Bloom filter.
    Lower memory, but false positives possible (a new message may be dropped).
    Entries never expire, so size the filter for the total number of IDs.
    """
    def __init__(self, expected_items=1000000, false_positive_rate=0.01):
        from pybloom_live import BloomFilter
        self.bloom = BloomFilter(
            capacity=expected_items,
            error_rate=false_positive_rate
        )
    def is_duplicate(self, message_id):
        """Check if message might be duplicate"""
        if message_id in self.bloom:
            return True  # Might be duplicate (false positive possible)
        self.bloom.add(message_id)
        return False
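Strategy 2 rebuilds the whole `seen` dict on every call, which costs O(n) per message. A common alternative, sketched here with the same first-seen semantics, keeps entries in insertion order so expired IDs are popped from the front. `ExpiringDedup` and its injectable `clock` are hypothetical; the clock defaults to `time.monotonic()` so that wall-clock jumps cannot break the ordering, and it can be replaced to make the window testable.

```python
import time
from collections import OrderedDict
from typing import Callable, Hashable, Optional

class ExpiringDedup:
    """Time-window deduplication with amortized O(1) expiry."""

    def __init__(self, window_size_ms: int = 60_000,
                 clock: Optional[Callable[[], int]] = None):
        self.window_size = window_size_ms
        self.clock = clock or (lambda: int(time.monotonic() * 1000))
        # message_id -> first-seen time (ms); insertion order == time order
        self.seen = OrderedDict()

    def is_duplicate(self, message_id: Hashable) -> bool:
        now = self.clock()
        # Evict expired entries from the oldest end only
        while self.seen:
            _oldest_id, first_seen = next(iter(self.seen.items()))
            if now - first_seen < self.window_size:
                break
            self.seen.popitem(last=False)
        if message_id in self.seen:
            return True
        self.seen[message_id] = now
        return False

# Usage with a fake clock: "m1" is a duplicate inside the window, new again after it
fake_now = [0]
dedup = ExpiringDedup(window_size_ms=1000, clock=lambda: fake_now[0])
print(dedup.is_duplicate("m1"), dedup.is_duplicate("m1"))
fake_now[0] = 1500
print(dedup.is_duplicate("m1"))
```

Timestamps are never refreshed on a hit, so the dict stays sorted by first-seen time and eviction never needs to scan past the first unexpired entry.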
Deduplication Formula
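The false-positive rate p of a Bloom filter, and the optimal size and number of hash functions for a target p, are:
$$p \approx \left(1 - e^{-kn/m}\right)^{k}, \qquad m = -\frac{n \ln p}{(\ln 2)^{2}}, \qquad k = \frac{m}{n}\ln 2$$
Where:
- m = filter size (bits)
- n = number of elements
- k = number of hash functions
- p = target false-positive rate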
import math
def calculate_bloom_filter_size(
    expected_items: int,
    false_positive_rate: float
) -> dict:
    """
    Calculate optimal Bloom filter parameters.
    """
    # Optimal filter size in bits: m = -n ln p / (ln 2)^2, rounded up
    m = math.ceil(-(expected_items * math.log(false_positive_rate)) / (math.log(2) ** 2))
    # Optimal number of hash functions: k = (m / n) ln 2, rounded to the nearest integer
    k = max(1, round((m / expected_items) * math.log(2)))
    # Memory in bytes
    memory_bytes = m / 8
    return {
        'filter_size_bits': m,
        'num_hash_functions': k,
        'memory_bytes': math.ceil(memory_bytes),
        'memory_mb': memory_bytes / (1024 * 1024)
    }
# Example: 1M items at 1% false positives -> about 9.6M bits (~1.1 MB) and 7 hash functions
bloom = calculate_bloom_filter_size(
    expected_items=1000000,
    false_positive_rate=0.01
)
print(f"Filter size: {bloom['memory_mb']:.1f} MB")
print(f"Hash functions: {bloom['num_hash_functions']}")
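To sanity-check the sizing, the false-positive formula can be evaluated for the parameters the example produces (about 9.59M bits, 1M items, 7 hash functions). `bloom_false_positive_rate` is a hypothetical helper; the formula is the standard approximation, which assumes independent, uniformly distributed hash functions.

```python
import math

def bloom_false_positive_rate(m_bits: int, n_items: int, k_hashes: int) -> float:
    """Expected false-positive rate p = (1 - e^(-k n / m))^k for a standard Bloom filter."""
    return (1.0 - math.exp(-k_hashes * n_items / m_bits)) ** k_hashes

p = bloom_false_positive_rate(m_bits=9_585_059, n_items=1_000_000, k_hashes=7)
print(f"Expected false-positive rate: {p:.4f}")
# Inserting more items than the filter was sized for raises the rate
print(f"At 2x capacity: {bloom_false_positive_rate(9_585_059, 2_000_000, 7):.4f}")
```

The first value lands close to the 1% target; the second shows why a dedup filter that never expires entries must be sized for its total lifetime volume.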
⚠️Deduplication Limitation
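Compaction is not a delivery or deduplication guarantee. Until the cleaner runs, consumers can read every duplicate, and compaction only collapses records that share a key (latest value wins), not repeated deliveries of the same event. For exactly-once, use idempotent producers (enable.idempotence=true), transactions, or application-level deduplication.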
Use Cases for Compacted Topics
Materialized View Pattern
import json
# Materialized view using compacted topic
class MaterializedView:
    """
    Maintain materialized view in compacted topic.
    Each update writes the complete current state for the key.
    Compaction eventually keeps only the latest state per key.
    The in-memory copy is lost on restart; rebuild it by replaying the topic.
    """
    def __init__(self, topic, producer):
        self.topic = topic
        self.producer = producer
        self.state = {}
    def update(self, key, value):
        """
        Merge a partial update (dict) into the view and write the full state.
        Old values are cleaned up by compaction.
        """
        # Merge into a per-key dict owned by the view (the caller's dict is never stored directly)
        self.state.setdefault(key, {}).update(value)
        # Write to compacted topic
        self.producer.send(
            self.topic,
            key=key.encode(),
            value=json.dumps(self.state[key]).encode()
        )
    def delete(self, key):
        """Delete key via tombstone"""
        self.state.pop(key, None)
        # Write tombstone
        self.producer.send(
            self.topic,
            key=key.encode(),
            value=None
        )
Event Sourcing Pattern
import json
# Event sourcing with Kafka: an append-only event topic plus a compacted state topic
class EventSourcing:
    """
    Event sourcing backed by two topics.
    The events topic is the source of truth, so it must NOT be compacted
    (use cleanup.policy=delete with long or unlimited retention, e.g. retention.ms=-1).
    The compacted state topic holds the current state derived from those events.
    """
    def __init__(self, events_topic, state_topic, producer):
        self.events_topic = events_topic
        self.state_topic = state_topic
        self.producer = producer
    def append_event(self, event):
        """
        Append event to event log.
        """
        self.producer.send(
            self.events_topic,
            key=event['entity_id'].encode(),
            value=json.dumps(event).encode()
        )
    def update_state(self, entity_id, state):
        """
        Update state in compacted topic.
        State is derived from events.
        Compaction keeps only latest state.
        """
        self.producer.send(
            self.state_topic,
            key=entity_id.encode(),
            value=json.dumps(state).encode()
        )
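The "state is derived from events" step is a fold over the ordered event log. This is a minimal sketch, assuming a hypothetical bank-account event schema (`entity_id`, `type`, `amount`); each resulting entry is what the class above would pass to `update_state`.

```python
from typing import Dict, Iterable

def derive_state(events: Iterable[dict]) -> Dict[str, dict]:
    """Fold an ordered event stream into the current state per entity.

    Assumes hypothetical events shaped like
    {"entity_id": "acct-1", "type": "deposited" | "withdrawn", "amount": int}.
    """
    state: Dict[str, dict] = {}
    for event in events:
        entity = state.setdefault(event["entity_id"], {"balance": 0, "version": 0})
        if event["type"] == "deposited":
            entity["balance"] += event["amount"]
        elif event["type"] == "withdrawn":
            entity["balance"] -= event["amount"]
        else:
            raise ValueError(f"unknown event type: {event['type']}")
        entity["version"] += 1  # number of events applied so far
    return state

events = [
    {"entity_id": "acct-1", "type": "deposited", "amount": 100},
    {"entity_id": "acct-1", "type": "withdrawn", "amount": 30},
    {"entity_id": "acct-2", "type": "deposited", "amount": 50},
]
print(derive_state(events))  # {'acct-1': {'balance': 70, 'version': 2}, 'acct-2': {'balance': 50, 'version': 1}}
```

Because the events topic keeps full history, the compacted state topic is disposable: replaying the events through the same fold rebuilds it.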
Configuration Store Pattern
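import json
from kafka import TopicPartition
# Configuration store using compacted topics
class ConfigurationStore:
    """
    Distributed configuration store using compacted topics.
    Configuration updates written to compacted topic.
    Consumers read latest configuration.
    The consumer must not be subscribed to topics (assign() and subscribe() cannot be mixed).
    """
    def __init__(self, config_topic, producer, consumer):
        self.config_topic = config_topic
        self.producer = producer
        self.consumer = consumer
        self.config = {}
    def update_config(self, key, value):
        """Update configuration"""
        self.producer.send(
            self.config_topic,
            key=key.encode(),
            value=json.dumps(value).encode()
        )
    def load_config(self, poll_timeout_ms=1000):
        """
        Load configuration from compacted topic.
        Read from the beginning up to the current end offsets, keeping only latest per key.
        """
        partitions = [
            TopicPartition(self.config_topic, p)
            for p in (self.consumer.partitions_for_topic(self.config_topic) or set())
        ]
        self.consumer.assign(partitions)
        self.consumer.seek_to_beginning(*partitions)
        end_offsets = self.consumer.end_offsets(partitions)
        # Iterating a KafkaConsumer never ends on its own, so stop once every partition is caught up
        while any(self.consumer.position(tp) < end_offsets[tp] for tp in partitions):
            batches = self.consumer.poll(timeout_ms=poll_timeout_ms)
            for records in batches.values():
                for message in records:
                    key = message.key.decode()
                    if message.value is None:
                        # Tombstone - delete config
                        self.config.pop(key, None)
                    else:
                        self.config[key] = json.loads(message.value.decode())
        return self.config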
ℹ️Key Insight
Compacted topics are the foundation for:
- Stream processing state: Changelog topics backup state stores
- Materialized views: Current state per key
- Configuration distribution: Latest config per component
- Event sourcing: Derived state from event logs
Compaction Performance
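def estimate_compaction_performance(
    topic_size_gb: float,
    dirty_ratio: float,
    compaction_throughput_mbps: float
) -> dict:
    """
    Rough estimate of one cleaning pass for a compacted log.
    compaction_throughput_mbps is the cleaner's throughput in MB/s.
    The cleaner reads the dirty section to build its offset map, then recopies
    the log from the beginning, so the work scales with total log size.
    Treat the result as a lower bound: a dedupe buffer too small for all dirty
    keys, or log.cleaner.io.max.bytes.per.second throttling, makes it slower.
    """
    # Data to compact
    dirty_data_gb = topic_size_gb * dirty_ratio
    # Data processed: offset-map pass over the dirty data + recopy of the whole log
    processed_gb = dirty_data_gb + topic_size_gb
    # Compaction time (GB -> MB, divided by MB/s)
    compaction_time_seconds = (processed_gb * 1024) / compaction_throughput_mbps
    return {
        'topic_size_gb': topic_size_gb,
        'dirty_data_gb': dirty_data_gb,
        'processed_gb': processed_gb,
        'compaction_time_seconds': compaction_time_seconds,
        'compaction_time_minutes': compaction_time_seconds / 60,
        'recommended_min_cleanable_dirty_ratio': 0.3
    }
# Example: (50 + 100) GB * 1024 / 100 MB/s = 1536 s, about 25.6 minutes
perf = estimate_compaction_performance(
    topic_size_gb=100,
    dirty_ratio=0.5,
    compaction_throughput_mbps=100
)
print(f"Compaction time: {perf['compaction_time_minutes']:.1f} minutes")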
⚠️Compaction Cost
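Compaction is CPU and I/O intensive. Monitor:
- kafka.log:type=LogCleanerManager,name=max-dirty-percent for the backlog of uncleaned data
- kafka.log:type=LogCleaner,name=cleaner-recopy-percent for compaction efficiency (share of bytes recopied rather than discarded)
- kafka.log:type=LogCleanerManager,name=time-since-last-run-ms to confirm the cleaner thread is still running
- BytesInPerSec (kafka.server:type=BrokerTopicMetrics) during compaction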