Compacted Topics, Log Deduplication, Changelog Topics

Difficulty: Senior | Asked at: Confluent, LinkedIn, Uber

ℹ️ Interview Context

Compacted topics are fundamental to Kafka's state management. Interviewers expect you to understand how compaction works, when to use it, and how changelog topics make Kafka Streams state fault-tolerant and support exactly-once processing.

The Question

How do Kafka compacted topics work? Explain the log compaction process and how changelog topics enable state management in Kafka Streams.

Compacted Topic Basics

How Compaction Works

Architecture Diagram
Before Compaction:
Offset: 0   1   2   3   4   5   6   7
Key:    A=1 B=2 A=3 C=1 B=4 A=5 C=2 B=6
                            ↑   ↑   ↑
                            A   C   B   (latest value per key)

After Compaction:
Offset: 5   6   7
Key:    A=5 C=2 B=6

Offsets 0-4 are removed because each was superseded by a later record
with the same key. Surviving records keep their original offsets.
(Illustrative: the cleaner only processes closed segments, never the active one.)

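Compaction guarantees:
1. For each key, at least the latest value is retained
2. The latest value for a key is never removed by compaction (tombstones are the exception: they are eventually removed along with the key)
3. Record order and offsets are preserved; compaction removes records but never reorders or renumbers them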
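
The guarantees above can be illustrated with a small in-memory model. This is a minimal sketch, not Kafka's cleaner: the `compact` function and the `(offset, key, value)` tuple layout are assumptions made for illustration, and the real cleaner works segment by segment and skips the active segment.

```python
from typing import Dict, List, Tuple

Record = Tuple[int, str, int]  # (offset, key, value), hypothetical layout


def compact(log: List[Record]) -> List[Record]:
    """Keep only the highest-offset record for each key, in offset order."""
    latest_offset: Dict[str, int] = {}
    for offset, key, _value in log:
        latest_offset[key] = offset  # later records overwrite earlier ones
    # Survivors keep their original offsets and relative order
    return [rec for rec in log if latest_offset[rec[1]] == rec[0]]


log = [
    (0, "A", 1), (1, "B", 2), (2, "A", 3), (3, "C", 1),
    (4, "B", 4), (5, "A", 5), (6, "C", 2), (7, "B", 6),
]
compacted = compact(log)
print(compacted)
```

For the eight-record log from the diagram, only offsets 5, 6, and 7 survive, one record per key.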

Compaction Configuration

# Topic-level compaction configuration
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Create compacted topic
topic = NewTopic(
    name='user-sessions',
    num_partitions=6,
    replication_factor=3,
    # Topic config values are strings in the Kafka protocol
    topic_configs={
        'cleanup.policy': 'compact',  # Enable compaction
        'min.cleanable.dirty.ratio': '0.3',  # Clean once 30% of the log is dirty
        'segment.ms': '604800000',  # Roll segments every 7 days
        'delete.retention.ms': '86400000',  # Keep tombstones for 24 hours
        'min.compaction.lag.ms': '0',  # No minimum lag
        'max.compaction.lag.ms': '604800000'  # Compact within 7 days at most
    }
)

admin.create_topics([topic])

Compaction Properties

compaction_properties = {
    'cleanup.policy': {
        'delete': 'Default. Delete segments after retention.',
        'compact': 'Enable log compaction.',
        'compact,delete': 'Both compaction and deletion.'
    },
    'min.cleanable.dirty.ratio': {
        'description': 'Minimum ratio of dirty (not yet compacted) log bytes to total log bytes before the cleaner runs.',
        'default': 0.5,
        'range': '0.0 to 1.0',
        'recommendation': '0.3 for more frequent compaction'
    },
    'segment.ms': {
        'description': 'Time after which the active segment is rolled. Only closed segments can be compacted.',
        'default': 604800000,  # 7 days
        'recommendation': 'Lower for faster compaction'
    },
    'delete.retention.ms': {
        'description': 'How long tombstone markers are retained.',
        'default': 86400000,  # 24 hours
        'recommendation': 'Match your delete expectations'
    }
}

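⚠️ Compaction Warning

Compaction is NOT real-time. The log cleaner runs in the background and never touches the active segment, so consumers can see several records for the same key long after they were written. Don't rely on immediate deduplication. Lowering min.cleanable.dirty.ratio (for example to 0.1) makes compaction more frequent at the cost of extra cleaner I/O, and max.compaction.lag.ms puts an upper bound on how long a record can stay uncompacted.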
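
To make the dirty-ratio threshold concrete, here is a minimal sketch of the eligibility check. The function names `dirty_ratio` and `is_cleanable` are hypothetical, and the model is simplified: it ignores min.compaction.lag.ms, max.compaction.lag.ms, and the active segment.

```python
def dirty_ratio(clean_bytes: int, dirty_bytes: int) -> float:
    """Fraction of the log (by bytes) that has not been compacted yet."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total else 0.0


def is_cleanable(
    clean_bytes: int,
    dirty_bytes: int,
    min_cleanable_dirty_ratio: float = 0.5,
) -> bool:
    """Simplified check: the log qualifies once the dirty ratio exceeds the threshold."""
    return dirty_ratio(clean_bytes, dirty_bytes) > min_cleanable_dirty_ratio


# 700 MB already compacted, 300 MB written since the last cleaning
clean, dirty = 700 * 1024**2, 300 * 1024**2
print(f"dirty ratio: {dirty_ratio(clean, dirty):.2f}")
print("cleanable at 0.5:", is_cleanable(clean, dirty, 0.5))
print("cleanable at 0.1:", is_cleanable(clean, dirty, 0.1))
```

With the default threshold of 0.5 this log would wait for more writes before being cleaned; at 0.1 it qualifies immediately, which is why a lower ratio means more frequent (and more expensive) compaction.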

Tombstone Records

What are Tombstones

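# Tombstone = record with a key and a null value
# Signals deletion of a key
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Writing a tombstone
producer.send('user-sessions', key=b'user123', value=None)
producer.flush()

# Tombstone behavior:
# 1. Tombstone is appended to the topic like any other record
# 2. During compaction, older records for the key are removed
# 3. The tombstone itself is removed once delete.retention.ms has passed,
#    so consumers that lag longer than that can miss the deletion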

# Tombstone retention
tombstone_retention = {
    'delete.retention.ms': 86400000,  # 24 hours
    'purpose': 'Allow consumers to see deletion',
    'cleanup': 'Tombstone removed after retention period'
}
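
The tombstone lifecycle can be sketched with a small model. This is an illustration under stated assumptions, not broker behavior: `compact_with_tombstones` and the record layout are hypothetical, and tombstone age is measured from the record timestamp here, whereas the broker measures it from the time the tombstone was first cleaned.

```python
from typing import Dict, List, Optional, Tuple

# (offset, key, value, timestamp_ms); value None marks a tombstone
Record = Tuple[int, str, Optional[str], int]


def compact_with_tombstones(
    log: List[Record], now_ms: int, delete_retention_ms: int
) -> List[Record]:
    """Keep the latest record per key; drop tombstones older than the retention."""
    latest: Dict[str, Record] = {}
    for rec in log:
        latest[rec[1]] = rec  # later records replace earlier ones
    survivors = []
    for rec in sorted(latest.values(), key=lambda r: r[0]):
        _offset, _key, value, ts = rec
        if value is None and now_ms - ts > delete_retention_ms:
            continue  # expired tombstone: the key disappears entirely
        survivors.append(rec)
    return survivors


DAY_MS = 86_400_000
log = [
    (0, "user123", "session-a", 0),
    (1, "user456", "session-b", 1_000),
    (2, "user123", None, 2_000),  # tombstone for user123
]
soon = compact_with_tombstones(log, now_ms=3_000, delete_retention_ms=DAY_MS)
later = compact_with_tombstones(log, now_ms=2_000 + DAY_MS + 1, delete_retention_ms=DAY_MS)
print(soon)
print(later)
```

Shortly after the delete, the tombstone is still visible so consumers can observe it; once the retention has elapsed, no trace of user123 remains.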

Tombstone Handling

import logging

log = logging.getLogger(__name__)

class TombstoneHandler:
    """
    Handle tombstone records in compacted topics.
    """
    
    def __init__(self):
        self.active_keys = {}
    
    def process_message(self, key, value):
        """
        Process message from compacted topic.
        
        If value is None, it's a tombstone (deletion).
        """
        if value is None:
            # Tombstone - key should be deleted
            if key in self.active_keys:
                del self.active_keys[key]
                log.info(f"Key {key} deleted via tombstone")
        else:
            # Regular message - update state
            self.active_keys[key] = value
    
    def get_state(self):
        """Get current state (all non-deleted keys)"""
        return self.active_keys.copy()

Changelog Topics

How Changelog Topics Work

Architecture Diagram
Kafka Streams Changelog:

1. Processor maintains local state store
2. State changes written to changelog topic
3. Changelog topic is compacted
4. On failure, state rebuilt from changelog

Processor Flow:
  Input Event → Update State → Write to Changelog Topic
  
State Recovery:
  Read Changelog Topic → Rebuild Local State → Resume Processing

Example:
  Event: {user: "alice", action: "login"}
  State Update: {alice: {login_count: 1}}
  Changelog: alice → {login_count: 1}
  
  Event: {user: "alice", action: "purchase"}
  State Update: {alice: {login_count: 1, purchases: 1}}
  Changelog: alice → {login_count: 1, purchases: 1}
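
The write-then-recover cycle can be sketched in plain Python. This is a toy model, not Kafka Streams: the `ChangelogBackedStore` class is hypothetical, and a Python list stands in for the compacted changelog topic.

```python
import json
from typing import List, Optional, Tuple

Changelog = List[Tuple[str, Optional[str]]]  # (key, serialized value or None)


class ChangelogBackedStore:
    """Local state store that mirrors every change to a changelog."""

    def __init__(self, changelog: Changelog):
        self.changelog = changelog
        self.state = {}

    def put(self, key: str, value: dict) -> None:
        self.state[key] = value
        self.changelog.append((key, json.dumps(value)))  # full value, not a delta

    def delete(self, key: str) -> None:
        self.state.pop(key, None)
        self.changelog.append((key, None))  # tombstone

    @classmethod
    def restore(cls, changelog: Changelog) -> "ChangelogBackedStore":
        """Rebuild local state by replaying the changelog from the start."""
        store = cls(changelog)
        for key, raw in changelog:
            if raw is None:
                store.state.pop(key, None)
            else:
                store.state[key] = json.loads(raw)
        return store


changelog: Changelog = []
store = ChangelogBackedStore(changelog)
store.put("alice", {"login_count": 1})
store.put("alice", {"login_count": 1, "purchases": 1})

# Simulate a crash: the local store is lost, the changelog survives
recovered = ChangelogBackedStore.restore(changelog)
print(recovered.state)
```

Because each changelog record carries the complete value for its key, replaying only the latest record per key yields the same state, which is why the changelog can safely be compacted.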

Changelog Topic Configuration

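// Kafka Streams is a JVM library with no official Python client,
// so this example uses the Java API.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Topic-level overrides for the changelog topic.
// cleanup.policy=compact is already the default for key-value store changelogs.
// Replication is set application-wide via StreamsConfig.REPLICATION_FACTOR_CONFIG,
// and the changelog's partition count follows the store's input partitions.
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.3");
changelogConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "604800000"); // 7 days

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> stream =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Count records per key in a state store backed by a changelog topic
KTable<String, Long> aggregated = stream
    .groupByKey()
    .aggregate(
        () -> 0L,                          // initializer
        (key, value, count) -> count + 1,  // aggregator
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long())
            .withLoggingEnabled(changelogConfig)  // enable changelog with overrides
    );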

Changelog Topic Naming

# Changelog topic naming convention
# {application.id}-{store.name}-changelog

# Example:
# application.id: user-analytics
# store.name: user-counts
# changelog topic: user-analytics-user-counts-changelog

# Internal topics created by Kafka Streams:
# - {app.id}-{store.name}-changelog (state store backup, compacted)
# - {app.id}-{operator.name}-repartition (re-keyed data before grouping or joining)
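
The changelog naming convention can be captured in a small helper, which is handy when pre-creating topics or writing ACLs. The function name `changelog_topic_name` is hypothetical; the pattern is the one stated above.

```python
def changelog_topic_name(application_id: str, store_name: str) -> str:
    """Build the changelog topic name for a Kafka Streams state store."""
    return f"{application_id}-{store_name}-changelog"


print(changelog_topic_name("user-analytics", "user-counts"))
```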

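ℹ️ Changelog Purpose

Changelog topics serve as durable backups for Kafka Streams state stores. If an instance crashes, its state is rebuilt by replaying the changelog. Changelogs provide fault tolerance on their own; exactly-once semantics additionally require processing.guarantee=exactly_once_v2, which commits changelog writes, output records, and input offsets in a single Kafka transaction.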

Log Deduplication

Deduplication Strategies

# Strategy 1: Compacted topic with latest value
# Producer sends multiple values for same key
# Compaction keeps only the latest

# Strategy 2: Deduplication window
import time

class DeduplicationWindow:
    """
    Deduplicate messages within a time window.
    """
    
    def __init__(self, window_size_ms=60000):
        self.window_size = window_size_ms
        self.seen = {}  # message_id -> first-seen timestamp (ms)
    
    def is_duplicate(self, key, message_id):
        """Check if message_id was already seen within the window.

        key is accepted for caller convenience but is not used;
        deduplication is by message_id only.
        """
        current_time = int(time.time() * 1000)
        
        # Drop entries that have aged out of the window
        self.seen = {
            k: v for k, v in self.seen.items()
            if current_time - v < self.window_size
        }
        
        # Check if seen
        if message_id in self.seen:
            return True
        
        # Mark as seen
        self.seen[message_id] = current_time
        return False

# Strategy 3: Bloom filter for probabilistic dedup
class BloomFilterDedup:
    """
    Probabilistic deduplication using Bloom filter.
    
    Lower memory, but false positives possible.
    """
    
    def __init__(self, expected_items=1000000, false_positive_rate=0.01):
        from pybloom_live import BloomFilter
        self.bloom = BloomFilter(
            capacity=expected_items,
            error_rate=false_positive_rate
        )
    
    def is_duplicate(self, message_id):
        """Check if message might be duplicate"""
        if message_id in self.bloom:
            return True  # Might be duplicate (false positive possible)
        self.bloom.add(message_id)
        return False

Deduplication Formula

$$\text{False Positive Rate} = \left(1 - e^{-kn/m}\right)^k$$

Where:

  • $m$ = filter size (bits)
  • $n$ = number of elements
  • $k$ = number of hash functions

import math

def calculate_bloom_filter_size(
    expected_items: int,
    false_positive_rate: float
) -> dict:
    """
    Calculate optimal Bloom filter parameters.
    """
    # Optimal filter size in bits: m = -n * ln(p) / (ln 2)^2, rounded up
    m = math.ceil(
        -(expected_items * math.log(false_positive_rate)) / (math.log(2) ** 2)
    )
    
    # Optimal number of hash functions: k = (m / n) * ln 2, rounded to nearest
    # (truncating would under-provision, e.g. 6.64 -> 6 instead of 7)
    k = max(1, round((m / expected_items) * math.log(2)))
    
    # Memory in bytes
    memory_bytes = math.ceil(m / 8)
    
    return {
        'filter_size_bits': m,
        'num_hash_functions': k,
        'memory_bytes': memory_bytes,
        'memory_mb': memory_bytes / (1024 * 1024)
    }

# Example
bloom = calculate_bloom_filter_size(
    expected_items=1000000,
    false_positive_rate=0.01
)
print(f"Filter size: {bloom['memory_mb']:.1f} MB")
print(f"Hash functions: {bloom['num_hash_functions']}")
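
As a sanity check, the false-positive formula can be evaluated directly for a given filter. This is a small sketch; the function name `bloom_false_positive_rate` is hypothetical, and the parameters used (about 9.59 million bits and 7 hash functions for one million items) are the rounded optimum for a 1% target.

```python
import math


def bloom_false_positive_rate(m_bits: int, n_items: int, k_hashes: int) -> float:
    """Evaluate (1 - e^(-k*n/m))^k for a Bloom filter."""
    return (1 - math.exp(-k_hashes * n_items / m_bits)) ** k_hashes


rate = bloom_false_positive_rate(m_bits=9_585_059, n_items=1_000_000, k_hashes=7)
print(f"False positive rate: {rate:.4f}")

# Halving the filter size with the same k raises the error rate sharply
half = bloom_false_positive_rate(m_bits=9_585_059 // 2, n_items=1_000_000, k_hashes=7)
print(f"With half the bits:  {half:.4f}")
```

The first result lands very close to the 1% target, which confirms that the sizing formulas and the error formula agree with each other.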

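⚠️ Deduplication Limitation

Compaction is not a delivery guarantee. Until the cleaner runs (and always within the active segment), consumers can read several records for the same key, and producer retries can still write duplicates. For strict exactly-once, use idempotent producers and transactions, or application-level deduplication.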
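
One common form of application-level deduplication is to remember the highest offset already applied per partition and skip anything at or below it. The sketch below is a minimal in-memory version; the class name `OffsetTrackingApplier` is hypothetical, and a real system would need to persist the state and the offsets together atomically.

```python
from typing import Dict, Optional


class OffsetTrackingApplier:
    """Apply each (partition, offset) at most once, even if it is redelivered."""

    def __init__(self) -> None:
        self.state: Dict[str, str] = {}
        self.last_applied: Dict[int, int] = {}  # partition -> highest applied offset

    def apply(self, partition: int, offset: int, key: str, value: Optional[str]) -> bool:
        if offset <= self.last_applied.get(partition, -1):
            return False  # already applied: redelivery after a retry or rebalance
        if value is None:
            self.state.pop(key, None)  # tombstone
        else:
            self.state[key] = value
        self.last_applied[partition] = offset
        return True


applier = OffsetTrackingApplier()
first = applier.apply(0, 0, "user123", "v1")
redelivered = applier.apply(0, 0, "user123", "v1")  # same record delivered again
second = applier.apply(0, 1, "user123", "v2")
print(first, redelivered, second, applier.state)
```

This guards against redelivery of the same record; it does not detect two distinct records that carry the same logical message, which still needs a message ID as in the strategies above.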

Use Cases for Compacted Topics

Materialized View Pattern

# Materialized view using compacted topic
import json

class MaterializedView:
    """
    Maintain materialized view in compacted topic.
    
    Each update writes the complete current state.
    Compaction ensures only latest state per key.
    """
    
    def __init__(self, topic, producer):
        self.topic = topic
        self.producer = producer
        self.state = {}
    
    def update(self, key, value):
        """
        Update materialized view.
        
        Writes complete state to compacted topic.
        Old values are cleaned up by compaction.
        """
        # Merge with existing state
        if key in self.state:
            self.state[key].update(value)
        else:
            self.state[key] = value
        
        # Write to compacted topic
        self.producer.send(
            self.topic,
            key=key.encode(),
            value=json.dumps(self.state[key]).encode()
        )
    
    def delete(self, key):
        """Delete key via tombstone"""
        if key in self.state:
            del self.state[key]
        
        # Write tombstone
        self.producer.send(
            self.topic,
            key=key.encode(),
            value=None
        )

Event Sourcing Pattern

# Event sourcing with compacted topics
import json

class EventSourcing:
    """
    Event sourcing using compacted topics.
    
    Events are the source of truth.
    Compacted topic maintains current state.
    """
    
    def __init__(self, events_topic, state_topic, producer):
        self.events_topic = events_topic
        self.state_topic = state_topic
        self.producer = producer
    
    def append_event(self, event):
        """
        Append event to event log.
        """
        self.producer.send(
            self.events_topic,
            key=event['entity_id'].encode(),
            value=json.dumps(event).encode()
        )
    
    def update_state(self, entity_id, state):
        """
        Update state in compacted topic.
        
        State is derived from events.
        Compaction keeps only latest state.
        """
        self.producer.send(
            self.state_topic,
            key=entity_id.encode(),
            value=json.dumps(state).encode()
        )

Configuration Store Pattern

# Configuration store using compacted topics
import json

class ConfigurationStore:
    """
    Distributed configuration store using compacted topics.
    
    Configuration updates written to compacted topic.
    Consumers read latest configuration.
    """
    
    def __init__(self, config_topic, producer, consumer):
        self.config_topic = config_topic
        self.producer = producer
        self.consumer = consumer
        self.config = {}
    
    def update_config(self, key, value):
        """Update configuration"""
        self.producer.send(
            self.config_topic,
            key=key.encode(),
            value=json.dumps(value).encode()
        )
    
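    def load_config(self):
        """
        Load configuration from compacted topic.
        
        Read from the beginning up to the current end offsets,
        keeping only the latest value per key. Iterating the consumer
        directly would block forever waiting for new records.
        """
        from kafka import TopicPartition
        
        partitions = [
            TopicPartition(self.config_topic, p)
            for p in (self.consumer.partitions_for_topic(self.config_topic) or ())
        ]
        self.consumer.assign(partitions)
        self.consumer.seek_to_beginning(*partitions)
        end_offsets = self.consumer.end_offsets(partitions)
        
        # Stop once every partition has been read up to its end offset
        while any(self.consumer.position(tp) < end_offsets[tp] for tp in partitions):
            for records in self.consumer.poll(timeout_ms=1000).values():
                for message in records:
                    key = message.key.decode()
                    if message.value is None:
                        # Tombstone - delete config
                        self.config.pop(key, None)
                    else:
                        self.config[key] = json.loads(message.value.decode())
        
        return self.config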

ℹ️ Key Insight

Compacted topics are the foundation for:

  1. Stream processing state: Changelog topics backup state stores
  2. Materialized views: Current state per key
  3. Configuration distribution: Latest config per component
  4. Event sourcing: Derived state from event logs

Compaction Performance

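def estimate_compaction_performance(
    topic_size_gb: int,
    dirty_ratio: float,
    compaction_throughput_mbps: float
) -> dict:
    """
    Estimate compaction time and resource usage.
    
    compaction_throughput_mbps is in megabytes per second (MB/s),
    not megabits. This is a rough estimate that only counts dirty data.
    """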
    # Data to compact
    dirty_data_gb = topic_size_gb * dirty_ratio
    
    # Compaction time
    compaction_time_seconds = (dirty_data_gb * 1024) / compaction_throughput_mbps
    
    return {
        'topic_size_gb': topic_size_gb,
        'dirty_data_gb': dirty_data_gb,
        'compaction_time_seconds': compaction_time_seconds,
        'compaction_time_minutes': compaction_time_seconds / 60,
        'recommended_min_cleanable_dirty_ratio': 0.3
    }

# Example
perf = estimate_compaction_performance(
    topic_size_gb=100,
    dirty_ratio=0.5,
    compaction_throughput_mbps=100
)
print(f"Compaction time: {perf['compaction_time_minutes']:.1f} minutes")

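⚠️ Compaction Cost

Compaction is CPU and I/O intensive. Monitor these broker JMX metrics:

  • kafka.log:type=LogCleanerManager,name=max-dirty-percent for the backlog of uncleaned data
  • kafka.log:type=LogCleaner,name=cleaner-recopy-percent for compaction efficiency
  • kafka.log:type=LogCleaner,name=max-clean-time-secs for how long cleaning runs take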
