Exactly-Once Semantics: Transactions, Idempotent Producer
Difficulty: Staff | Asked at: Google, Confluent, Uber, Stripe
ℹ️ Interview Context
Exactly-once semantics (EOS) is a cornerstone of modern data systems. This question tests your understanding of Kafka's transactional model and how to achieve true exactly-once delivery across producers and consumers.
The Question
How does Apache Kafka achieve exactly-once semantics? Explain the role of idempotent producers, the transactional API, and how end-to-end exactly-once delivery works. What are the limitations?
Exactly-Once Delivery Semantics
Definition
- At-Least-Once: Every message is delivered one or more times (no loss, but retries can create duplicates)
- At-Most-Once: Every message is delivered zero or one times (no duplicates, but messages can be lost)
- Exactly-Once: Every message is delivered exactly once (no duplicates, no loss)
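The three guarantees can be compared with a small simulation. This is a minimal sketch rather than Kafka code: `deliver`, its `mode` names, and the 30% loss rate are all assumptions made for illustration. A request or its acknowledgment may be lost, and the three modes differ only in whether the sender retries and whether the receiver deduplicates by sequence number.

```python
import random

def deliver(messages, mode, loss_rate=0.3, seed=7):
    """Simulate one producer writing to one log over a lossy network.

    mode is 'at_most_once' (never retry), 'at_least_once' (retry until
    acknowledged) or 'exactly_once' (retry plus receiver-side dedup).
    """
    rng = random.Random(seed)
    log = []
    seen = set()  # sequence numbers the receiver has already appended
    for seq, msg in enumerate(messages):
        while True:
            request_lost = rng.random() < loss_rate
            if not request_lost:
                if mode == 'exactly_once' and seq in seen:
                    pass  # duplicate retry: acknowledge without appending
                else:
                    log.append(msg)
                    seen.add(seq)
            ack_lost = rng.random() < loss_rate
            acked = not request_lost and not ack_lost
            if acked or mode == 'at_most_once':
                break  # at-most-once gives up after a single attempt
    return log

msgs = list(range(50))
for mode in ('at_most_once', 'at_least_once', 'exactly_once'):
    out = deliver(msgs, mode)
    print(mode, 'delivered:', len(out), 'distinct:', len(set(out)))
```

Only the exactly-once mode reproduces the input list unchanged; the other two trade loss against duplicates.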
Kafka's Three Levels of Delivery Guarantees
from kafka import KafkaProducer

# Level 1: At-Least-Once (default)
producer = KafkaProducer(
    acks='all',
    retries=3
    # No idempotence -> duplicates possible on retry
)
# Level 2: At-Most-Once
producer = KafkaProducer(
    acks=0,      # integer 0: do not wait for any broker acknowledgment
    retries=0
    # No retry -> messages may be lost
)
# Level 3: Exactly-Once (requires idempotence)
producer = KafkaProducer(
    enable_idempotence=True,
    acks='all',
    retries=2147483647
    # Duplicates prevented by sequence numbers
)
⚠️ Key Distinction
Kafka's exactly-once is NOT about exactly-once delivery to the consumer. It's about exactly-once processing within the Kafka ecosystem. The consumer still sees messages at-least-once unless using transactions with read_committed isolation.
Idempotent Producer
Sequence Number Protocol
Each producer gets a unique Producer ID (PID) and assigns monotonically increasing sequence numbers, tracked separately for each partition, to the messages it sends:
from kafka import KafkaProducer

class IdempotentProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            enable_idempotence=True,
            acks='all',
            retries=2147483647,
            max_in_flight_requests_per_connection=5,
            delivery_timeout_ms=120000
        )
        # No manual sequence bookkeeping is needed: the client tracks
        # sequence numbers per partition internally.

    def send(self, topic, key, value):
        """
        Send message with automatic deduplication.
        If network error occurs:
        1. Producer retries the message
        2. Broker receives duplicate (same PID + Partition + SeqNum)
        3. Broker detects duplicate, returns success without appending
        4. The partition log contains the message exactly once
        """
        future = self.producer.send(topic, key=key, value=value)
        # Duplicate retries are resolved between client and broker, so the
        # caller sees either success or a genuine failure (for example a
        # KafkaTimeoutError), which is left to propagate.
        return future.get(timeout=30)

    def send_batch(self, topic, messages):
        """
        Send multiple messages with idempotency.
        Sequence numbers are assigned per partition automatically.
        """
        # Queue every send first so the client can batch them
        futures = [
            self.producer.send(topic, key=key, value=value)
            for key, value in messages
        ]
        # Wait for all acknowledgments
        return [future.get(timeout=30) for future in futures]
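The broker side of the sequence number protocol can be sketched in a few lines. This is a simplified illustration, not the broker's actual implementation: `PartitionLog` and its return values are hypothetical, and a real broker tracks sequences per batch and only remembers a bounded window of recent batches.

```python
class PartitionLog:
    """Toy partition leader that deduplicates on (producer_id, sequence)."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last appended sequence number

    def append(self, producer_id, seq, value):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # Already appended: acknowledge again without writing
            return 'duplicate'
        if seq != last + 1:
            # A gap means an earlier message is missing: reject so the
            # producer can resend in order
            return 'out_of_order'
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return 'appended'

log = PartitionLog()
print(log.append(producer_id=1, seq=0, value='login'))   # first attempt
print(log.append(producer_id=1, seq=0, value='login'))   # retry after a lost ack
print(log.append(producer_id=2, seq=0, value='login'))   # different PID
print(log.records)
```

The last call shows why deduplication is scoped to one producer: the same payload arriving under a different PID is treated as a new message.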
Idempotent Producer Limitations
Limitations:
1. Per-partition ordering only
   - Sequences are per (PID, Partition) pair
   - Cross-partition ordering NOT guaranteed
2. Max 5 in-flight requests per connection (with idempotence)
   - Without idempotence: higher values are allowed, but retries can reorder messages when more than 1 request is in flight
   - With idempotence: at most 5, so the broker can keep batches in sequence order
3. Producer restart resets sequences
   - Without a transactional.id, a new PID is assigned on restart
   - Deduplication does not span restarts, and the old PID's sequence state eventually expires on the broker
4. No cross-producer deduplication
   - Each producer has unique PID
   - Two producers can send same data (different PIDs)
5. Single transactional.id required for transactions
   - Multiple live producers cannot share a transactional ID (the newer instance fences the older one)
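The configuration constraints behind limitation 2 can be checked before a producer is created. The helper below is a hypothetical sketch (`check_idempotence_config` is not part of any client library); it assumes kafka-python style setting names and a fully specified config dictionary, and encodes the usual requirements for idempotence: `acks='all'`, at least one retry, and at most 5 in-flight requests per connection.

```python
def check_idempotence_config(config):
    """Return a list of problems that would prevent idempotent delivery."""
    problems = []
    if config.get('acks') not in ('all', -1):
        problems.append("acks must be 'all'")
    if config.get('retries', 0) < 1:
        problems.append('retries must be greater than 0')
    if config.get('max_in_flight_requests_per_connection', 5) > 5:
        problems.append('max_in_flight_requests_per_connection must be at most 5')
    return problems

good = {
    'acks': 'all',
    'retries': 2147483647,
    'max_in_flight_requests_per_connection': 5,
}
bad = {'acks': 1, 'retries': 0, 'max_in_flight_requests_per_connection': 10}

print(check_idempotence_config(good))
print(check_idempotence_config(bad))
```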
ℹ️ Idempotence vs Transactions
Idempotent producer handles deduplication within a single producer instance. Transactions extend this to atomic multi-partition writes. For true exactly-once across multiple producers or consumer-producer loops, you need transactions.
Transactional API
How Transactions Work
Transaction Flow

Producer
  initTransactions()     <- Coordinator registers transactional.id
        |
        v
  beginTransaction()     <- Begin collecting messages
        |
        v
  send()                 <- Messages appended to partition logs (not committed)
  send()
  send()
        |
        v
  commitTransaction()    <- Atomic commit to all partitions
        |
        v
  Messages visible to read_committed consumers
Transactional Producer
from kafka import KafkaProducer
# Assumption: kafka-python 2.1+ exposes fencing as ProducerFencedError.
# Other clients use different names, so adjust the import to your client.
from kafka.errors import ProducerFencedError

class TransactionalProducer:
    def __init__(self, transactional_id):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            # Transactional configuration
            transactional_id=transactional_id,
            enable_idempotence=True,
            acks='all',
            retries=2147483647,
            # Performance
            batch_size=32768,
            linger_ms=10,
            compression_type='lz4'
        )
        # Register the transactional.id with the coordinator (call once)
        self.producer.init_transactions()

    def produce_atomically(self, topic, messages):
        """
        Produce messages atomically.
        Either ALL messages commit or NONE do.
        """
        try:
            self.producer.begin_transaction()
            for key, value in messages:
                self.producer.send(topic, key=key, value=value)
            self.producer.commit_transaction()
            return True
        except ProducerFencedError:
            # Another producer with same transactional.id started
            # This producer is now fenced (invalidated) and cannot abort
            self.producer.close()
            raise
        except Exception:
            # Any other error -> abort transaction
            self.producer.abort_transaction()
            raise

# Usage
txn_producer = TransactionalProducer('my-app-txn-1')
txn_producer.produce_atomically('events', [
    (b'user1', b'login'),
    (b'user2', b'logout'),
    (b'user3', b'purchase')
])
Transaction Coordinator Protocol
Transaction Coordinator (runs on a broker):
1. Transaction Registration:
   - Producer calls initTransactions()
   - Coordinator maps transactional.id → producer PID and bumps the producer epoch
   - Any older producer instance with the same transactional.id is fenced (invalidated)
2. Transaction Begin:
   - Producer calls beginTransaction()
   - This is a local state change; the coordinator records the transaction once the producer registers the first partition it writes to
   - State: ONGOING
3. Message Production:
   - Producer registers each new partition with the coordinator, then sends messages directly to the partition leaders
   - Leaders append the messages to their logs immediately
   - Messages are in the log but not yet visible to read_committed consumers
4. Transaction Commit:
   - Producer calls commitTransaction()
   - Coordinator records the commit decision in the transaction log
   - Coordinator writes COMMIT markers to every partition in the transaction
   - Messages become visible to read_committed consumers
5. Transaction Abort:
   - Producer calls abortTransaction()
   - Coordinator writes ABORT markers to every partition in the transaction
   - Messages stay in the log but are skipped by read_committed consumers (never visible)
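The coordinator's bookkeeping for a single transactional.id can be modeled as a small state machine. This is a toy sketch under simplifying assumptions: `ToyTransactionCoordinator` is hypothetical, it ignores epochs, timeouts and persistence, and it uses state names modeled on Kafka's (EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT and their abort counterparts).

```python
class ToyTransactionCoordinator:
    """Tracks the state of one transactional.id and the markers it writes."""

    def __init__(self):
        self.state = 'EMPTY'
        self.history = ['EMPTY']
        self.partitions = set()   # partitions touched by the open transaction
        self.markers = []         # (partition, 'COMMIT' | 'ABORT') control records

    def _move(self, new_state):
        self.state = new_state
        self.history.append(new_state)

    def add_partition(self, partition):
        # The first registered partition is what makes the transaction ONGOING
        if self.state in ('EMPTY', 'COMPLETE_COMMIT', 'COMPLETE_ABORT'):
            self._move('ONGOING')
        elif self.state != 'ONGOING':
            raise RuntimeError(f'cannot add partition in state {self.state}')
        self.partitions.add(partition)

    def end_transaction(self, commit):
        if self.state != 'ONGOING':
            raise RuntimeError(f'no open transaction in state {self.state}')
        kind = 'COMMIT' if commit else 'ABORT'
        # 1. Record the decision durably before touching any partition
        self._move('PREPARE_' + kind)
        # 2. Write one marker per partition that took part
        for partition in sorted(self.partitions):
            self.markers.append((partition, kind))
        self.partitions.clear()
        # 3. Mark the transaction as finished
        self._move('COMPLETE_' + kind)

coordinator = ToyTransactionCoordinator()
coordinator.add_partition('events-0')
coordinator.add_partition('events-1')
coordinator.end_transaction(commit=True)
print(coordinator.history)
print(coordinator.markers)
```

Recording the decision before writing markers is what lets a coordinator that fails midway finish the commit after recovery instead of leaving partitions in different states.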
⚠️ Transaction Timeout
Transactions have a default timeout of 60 seconds (transaction.timeout.ms). If a producer crashes during a transaction, the coordinator will abort it after the timeout. Ensure your transaction processing completes within this window.
Exactly-Once Consumer (read_committed)
from kafka import KafkaConsumer

# Consumer with read_committed isolation
consumer = KafkaConsumer(
    'events',
    group_id='exactly-once-consumer',
    bootstrap_servers='localhost:9092',
    # Critical for exactly-once
    isolation_level='read_committed',
    # Manual offset commit for control
    enable_auto_commit=False,
    # Read from beginning for replay
    auto_offset_reset='earliest'
)

def process_message(record):
    """Placeholder for application logic (hypothetical helper)."""
    print(record.offset, record.value)

def consume_exactly_once():
    """
    Consume only committed transactional messages.
    Messages are only visible after transaction commit.
    Aborted messages are never seen by consumer.
    Note: committing offsets after processing is still at-least-once for
    side effects outside Kafka; a crash between the two replays the batch.
    """
    while True:
        messages = consumer.poll(timeout_ms=1000)
        for topic_partition, records in messages.items():
            for record in records:
                # record.value is the actual message
                # Messages from aborted transactions are filtered out
                process_message(record)
        # Commit offsets only after the whole batch has been processed
        if messages:
            consumer.commit()
Transactional Consumer-Producer (Exactly-Once Pipeline)
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import OffsetAndMetadata

class ExactlyOncePipeline:
    """
    Consumer-Producer pipeline with exactly-once semantics.
    Reads from input topic, processes, writes to output topic.
    """
    GROUP_ID = 'exactly-once-pipeline'

    def __init__(self, input_topic, output_topic, transactional_id):
        self.output_topic = output_topic
        # Consumer reads from input
        self.consumer = KafkaConsumer(
            input_topic,
            group_id=self.GROUP_ID,
            bootstrap_servers='localhost:9092',
            isolation_level='read_committed',
            enable_auto_commit=False,
            auto_offset_reset='earliest'
        )
        # Producer writes to output with transactions
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            transactional_id=transactional_id,
            enable_idempotence=True,
            acks='all'
        )
        self.producer.init_transactions()

    def process_batch(self, max_messages=100):
        """
        Process a batch of messages atomically.
        Either:
        - All messages processed and offsets committed
        - Nothing processed, offsets not committed
        """
        # Read first, so an empty poll never opens a transaction
        messages = self.consumer.poll(timeout_ms=1000, max_records=max_messages)
        if not messages:
            return 0
        try:
            self.producer.begin_transaction()
            messages_processed = 0
            offsets_to_commit = {}
            for topic_partition, records in messages.items():
                for record in records:
                    # Transform message
                    transformed = self.transform(record.value)
                    # Send to output topic
                    self.producer.send(
                        self.output_topic,
                        key=record.key,
                        value=transformed
                    )
                    messages_processed += 1
                    # Track the next offset to read for this partition.
                    # Assumes the three-field OffsetAndMetadata
                    # (offset, metadata, leader_epoch) of kafka-python 2.1+.
                    offsets_to_commit[topic_partition] = OffsetAndMetadata(
                        record.offset + 1, '', -1
                    )
            # Commit consumer offsets as part of transaction.
            # kafka-python takes the consumer group id here; some other
            # clients take a group-metadata object instead.
            self.producer.send_offsets_to_transaction(
                offsets_to_commit,
                self.GROUP_ID
            )
            # Commit everything atomically
            self.producer.commit_transaction()
            return messages_processed
        except Exception:
            # A fencing error is fatal: close the producer instead of aborting
            self.producer.abort_transaction()
            # The consumer position has already moved past this batch. Seek
            # back to the last committed offsets (or restart the consumer)
            # before retrying, otherwise the batch is skipped.
            raise

    def transform(self, message):
        """Transform message (your business logic)"""
        return message.upper()  # Example transformation
ℹ️ Critical Pattern
send_offsets_to_transaction() is the key to end-to-end exactly-once. It atomically commits both the produced messages AND the consumer offsets. If either fails, both are rolled back.
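Why the atomic offset commit matters can be shown with a crash simulation. This is a sketch, not Kafka code: `run_pipeline` is a hypothetical function that processes one record per step and crashes once between writing the output and committing the offset. In the non-atomic variant the output survives the crash while the offset does not; in the atomic variant both are discarded together.

```python
def run_pipeline(inputs, crash_at, atomic):
    """Process inputs in order, crashing once at offset `crash_at`
    after the output is produced but before the offset is committed."""
    output = []
    committed_offset = 0
    crashed = False
    while committed_offset < len(inputs):
        offset = committed_offset        # a restart resumes from here
        result = inputs[offset].upper()
        crash_now = offset == crash_at and not crashed
        if atomic:
            if crash_now:
                crashed = True
                continue                 # transaction aborted: nothing is kept
            output.append(result)        # output and offset commit together
            committed_offset = offset + 1
        else:
            output.append(result)        # output is written first
            if crash_now:
                crashed = True
                continue                 # offset never committed: replay follows
            committed_offset = offset + 1
    return output

print(run_pipeline(['a', 'b', 'c'], crash_at=1, atomic=False))  # ['A', 'B', 'B', 'C']
print(run_pipeline(['a', 'b', 'c'], crash_at=1, atomic=True))   # ['A', 'B', 'C']
```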
Transaction Isolation Levels
from kafka import KafkaConsumer

# Two isolation levels for consumers
# 1. read_uncommitted (default)
# Sees ALL messages, including uncommitted and aborted
consumer = KafkaConsumer(
    'topic',
    isolation_level='read_uncommitted'
)
# 2. read_committed
# Only sees committed messages
# Aborted messages are skipped (not visible)
# Guarantees: committed messages appear in offset order, and reads stop at
# the last stable offset (LSO), the start of the earliest open transaction
consumer = KafkaConsumer(
    'topic',
    isolation_level='read_committed'
)
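The difference between the isolation levels can be illustrated on a toy log. This is a simplified sketch: `visible_records` and the tuple layout are assumptions for illustration, every data record is treated as transactional, and the consumer-side filtering of a real client is collapsed into one function. The last stable offset (LSO) is modeled as the position of the first record that belongs to a still-open transaction.

```python
def visible_records(log, isolation_level):
    """log entries are ('data', pid, value) or ('marker', pid, 'COMMIT'|'ABORT')."""
    if isolation_level == 'read_uncommitted':
        return [payload for kind, _, payload in log if kind == 'data']

    open_txn = {}   # pid -> positions of data records in its open transaction
    decided = {}    # position -> 'COMMIT' or 'ABORT'
    for pos, (kind, pid, payload) in enumerate(log):
        if kind == 'data':
            open_txn.setdefault(pid, []).append(pos)
        else:
            # A marker decides every record of that producer's open transaction
            for data_pos in open_txn.pop(pid, []):
                decided[data_pos] = payload

    # Nothing at or beyond the earliest undecided record may be returned
    undecided = [pos for positions in open_txn.values() for pos in positions]
    lso = min(undecided) if undecided else len(log)
    return [
        log[pos][2]
        for pos in range(lso)
        if log[pos][0] == 'data' and decided.get(pos) == 'COMMIT'
    ]

log = [
    ('data', 1, 'a1'),
    ('data', 2, 'b1'),
    ('marker', 1, 'COMMIT'),
    ('data', 2, 'b2'),
    ('marker', 2, 'ABORT'),
    ('data', 3, 'c1'),        # producer 3 has not finished its transaction
    ('data', 1, 'a2'),
    ('marker', 1, 'COMMIT'),
]
print(visible_records(log, 'read_uncommitted'))  # ['a1', 'b1', 'b2', 'c1', 'a2']
print(visible_records(log, 'read_committed'))    # ['a1']
```

Note that `a2` is committed but still hidden: it sits behind producer 3's open transaction, which is why a stuck transaction can stall read_committed consumers until it commits, aborts, or times out.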
Producer Fencing
import logging
import time

from kafka import KafkaProducer
# Assumption: kafka-python 2.1+ exposes fencing as ProducerFencedError
from kafka.errors import ProducerFencedError

log = logging.getLogger(__name__)

class FencedProducerHandler:
    """
    Handle producer fencing gracefully.
    Fencing occurs when:
    1. Producer A starts transaction with transactional.id="app-1"
    2. Producer A crashes or stalls
    3. Producer B starts with same transactional.id="app-1"
    4. Coordinator bumps the epoch, which fences Producer A (invalidates it)
    5. Producer A recovers and tries to commit
    6. Producer A gets a ProducerFencedError exception
    """
    def __init__(self, transactional_id):
        self.transactional_id = transactional_id
        self.producer = None
        self._initialize_producer()

    def _initialize_producer(self):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            transactional_id=self.transactional_id,
            enable_idempotence=True,
            acks='all',
            retries=2147483647
        )
        self.producer.init_transactions()

    def produce_with_fencing_handling(self, topic, messages):
        """
        Produce messages with fencing detection.
        On ProducerFencedError:
        1. Close old producer
        2. Create new producer (gets a new producer epoch)
        3. Retry operation
        Caution: re-initializing with the same transactional.id fences
        whichever instance replaced this one. Only retry if this process is
        meant to own the id; otherwise shut down.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                self.producer.begin_transaction()
                for key, value in messages:
                    self.producer.send(topic, key=key, value=value)
                self.producer.commit_transaction()
                return True
            except ProducerFencedError:
                log.warning("Producer fenced, creating new producer")
                self.producer.close()
                self._initialize_producer()
            except Exception:
                self.producer.abort_transaction()
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # exponential backoff before retrying
        return False
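The fencing mechanism itself comes down to an epoch comparison. The sketch below is a toy model, not the coordinator's real code: `ToyFencingCoordinator` and `FencedError` are hypothetical names, and the model assumes each `init_transactions` call for a transactional.id simply increments its epoch.

```python
class FencedError(Exception):
    """Raised when a request carries an outdated producer epoch."""

class ToyFencingCoordinator:
    def __init__(self):
        self.current_epoch = {}  # transactional_id -> latest epoch handed out

    def init_transactions(self, transactional_id):
        # Every (re)initialization bumps the epoch, invalidating older holders
        epoch = self.current_epoch.get(transactional_id, -1) + 1
        self.current_epoch[transactional_id] = epoch
        return epoch

    def commit(self, transactional_id, epoch):
        if epoch < self.current_epoch[transactional_id]:
            raise FencedError(
                f'{transactional_id}: epoch {epoch} is older than '
                f'{self.current_epoch[transactional_id]}'
            )
        return 'committed'

coordinator = ToyFencingCoordinator()
epoch_a = coordinator.init_transactions('app-1')   # Producer A
epoch_b = coordinator.init_transactions('app-1')   # Producer B replaces A
print(coordinator.commit('app-1', epoch_b))
try:
    coordinator.commit('app-1', epoch_a)           # A wakes up and is rejected
except FencedError as err:
    print('fenced:', err)
```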
Transaction Timeout Handling
import time
from kafka import KafkaProducer

class TransactionWithTimeout:
    """
    Handle long-running transactions with timeout management.
    """
    def __init__(self, transactional_id, timeout_ms=60000):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            transactional_id=transactional_id,
            transaction_timeout_ms=timeout_ms,
            enable_idempotence=True,
            acks='all'
        )
        self.producer.init_transactions()
        self.transaction_timeout = timeout_ms

    def produce_with_chunking(self, topic, large_dataset):
        """
        Chunk large datasets to fit within transaction timeout.
        If processing takes longer than transaction timeout:
        1. Commit partial batch
        2. Start new transaction
        3. Continue processing
        Note: the dataset is then atomic per transaction, not as a whole.
        """
        chunk_size = 1000
        total_committed = 0
        pending = 0  # records sent in the currently open transaction
        # A transaction must be open before the first send
        self.producer.begin_transaction()
        start_time = time.time()
        for i in range(0, len(large_dataset), chunk_size):
            # Check if we're approaching timeout
            elapsed = (time.time() - start_time) * 1000
            if pending and elapsed > self.transaction_timeout * 0.8:
                # Commit current batch
                self.producer.commit_transaction()
                total_committed += pending
                pending = 0
                # Start new transaction
                self.producer.begin_transaction()
                start_time = time.time()
            # Process chunk
            chunk = large_dataset[i:i + chunk_size]
            for record in chunk:
                self.producer.send(topic, value=record)
            pending += len(chunk)
        # Final commit
        self.producer.commit_transaction()
        return total_committed + pending
⚠️ Transaction Timeout Warning
Set transaction.timeout.ms high enough for your processing but low enough to detect dead producers. Default 60s works for most cases. For batch processing, increase to 300s or use chunking.
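When chunk processing times can be estimated in advance, commit points can be planned ahead of time instead of checking the clock as you go. The helper below is a hypothetical sketch of that variant: `plan_commit_points` and the 0.8 safety factor are assumptions, and the durations are estimates supplied by the caller.

```python
def plan_commit_points(chunk_durations_ms, timeout_ms, safety=0.8):
    """Return the chunk indexes after which to commit, so that no planned
    transaction exceeds safety * timeout_ms (unless a single chunk does)."""
    budget = timeout_ms * safety
    commit_after = []
    elapsed = 0.0
    for index, duration in enumerate(chunk_durations_ms):
        # Close the current transaction before a chunk that would overrun it
        if elapsed > 0 and elapsed + duration > budget:
            commit_after.append(index - 1)
            elapsed = 0.0
        elapsed += duration
    if chunk_durations_ms:
        commit_after.append(len(chunk_durations_ms) - 1)  # final commit
    return commit_after

# Four chunks of 20 s each against the default 60 s timeout (48 s budget)
print(plan_commit_points([20000, 20000, 20000, 20000], timeout_ms=60000))  # [1, 3]
```

A single chunk that exceeds the budget on its own still gets its own transaction here, which is a signal to shrink the chunk size or raise the timeout.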
Exactly-Once Formula
def calculate_exactly_once_overhead(
    message_rate_per_sec: int,
    avg_message_size: int,
    batch_size: int,
    coordinator_rtt_ms: float
) -> dict:
    """
    Rough estimate of the overhead of exactly-once semantics.
    Assumes 8 bytes of sequence metadata per message and one transaction
    (one coordinator round trip) per batch.
    """
    # Sequence number overhead (8 bytes per message)
    seq_overhead_per_msg = 8  # bytes
    seq_overhead_per_sec = seq_overhead_per_msg * message_rate_per_sec
    # Batch overhead: batches/sec = message rate / messages per batch
    messages_per_batch = batch_size / avg_message_size
    batches_per_sec = message_rate_per_sec / messages_per_batch
    # Transaction overhead (1 RTT per transaction), in seconds of coordinator
    # wait per second of wall-clock time. This is time, not bytes, so it is
    # reported separately instead of being added to the byte overhead.
    coordinator_wait_per_sec = batches_per_sec * coordinator_rtt_ms / 1000
    payload_bytes_per_sec = message_rate_per_sec * avg_message_size
    return {
        'sequence_overhead_mbps': seq_overhead_per_sec / (1024 * 1024),
        'sequence_overhead_percentage': seq_overhead_per_sec / payload_bytes_per_sec * 100,
        # Above 1.0, a producer committing serially cannot afford one
        # transaction per batch and should group several batches instead
        'coordinator_wait_sec_per_sec': coordinator_wait_per_sec
    }

# Example
overhead = calculate_exactly_once_overhead(
    message_rate_per_sec=100000,
    avg_message_size=500,
    batch_size=32768,
    coordinator_rtt_ms=10
)
print(f"Sequence overhead: {overhead['sequence_overhead_mbps']:.2f} MB/s")
print(f"Sequence overhead: {overhead['sequence_overhead_percentage']:.2f}% of payload")
print(f"Coordinator wait: {overhead['coordinator_wait_sec_per_sec']:.2f} s per second")
Common Exactly-Once Patterns
Pattern 1: Kafka → Kafka Exactly-Once
Producer: Transactional with send_offsets_to_transaction()
Consumer: read_committed isolation
Result: Messages processed exactly once
Pattern 2: Kafka → Database Exactly-Once
Producer: Transactional to Kafka
Consumer: Process and write to DB
Problem: Writing to the DB and committing offsets to Kafka are two separate writes (dual write problem)
Solution: Store the offset in the database and update it in the same DB transaction as the results; on restart, seek to the stored offset
Pattern 3: Database → Kafka Exactly-Once
Source: CDC (Change Data Capture) with transactions
Kafka: Transactional producer
Result: Each DB change appears exactly once in Kafka
Pattern 4: Exactly-Once Aggregation
Input: Kafka topic with events
Processing: Kafka Streams with EOS
Output: Kafka topic with aggregated results
Result: Exactly-once aggregation
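Pattern 2 can be sketched with the standard library's `sqlite3` standing in for the real database. The table layout and the helper names `make_db` and `apply_record` are assumptions for illustration. The point is that the result row and the next offset are written in one database transaction, so a replayed record is detected and skipped.

```python
import sqlite3

def make_db():
    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE results (k TEXT, v TEXT)')
    db.execute(
        'CREATE TABLE offsets ('
        'topic TEXT, part INTEGER, next_offset INTEGER, '
        'PRIMARY KEY (topic, part))'
    )
    return db

def apply_record(db, topic, partition, offset, key, value):
    """Write the result and the next offset atomically; skip replays."""
    row = db.execute(
        'SELECT next_offset FROM offsets WHERE topic = ? AND part = ?',
        (topic, partition),
    ).fetchone()
    if row is not None and offset < row[0]:
        return False  # already applied before a crash or rebalance
    with db:  # one transaction: commits on success, rolls back on error
        db.execute('INSERT INTO results VALUES (?, ?)', (key, value))
        db.execute(
            'INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)',
            (topic, partition, offset + 1),
        )
    return True

db = make_db()
print(apply_record(db, 'events', 0, 0, 'user1', 'login'))   # True
print(apply_record(db, 'events', 0, 0, 'user1', 'login'))   # False (replay)
print(db.execute('SELECT COUNT(*) FROM results').fetchone()[0])
```

On startup, a consumer following this pattern would read `next_offset` for each assigned partition and call `consumer.seek()` with it instead of relying on offsets committed to Kafka.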
ℹ️ Production Tip
For most use cases, at-least-once with idempotent consumer processing is sufficient. Use exactly-once transactions only when:
- Duplicate processing is expensive (e.g., financial transactions)
- You need atomic multi-topic writes
- You need consumer-producer pipeline atomicity
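The at-least-once alternative relies on the consumer being idempotent. A minimal sketch, assuming every event carries a unique `event_id` (an assumption, since Kafka does not add one for you) and using a hypothetical in-memory `IdempotentLedger`; a production version would keep the applied ids in durable storage alongside the state they protect.

```python
class IdempotentLedger:
    """Applies each event at most once, keyed by its event id."""

    def __init__(self):
        self.balances = {}
        self.applied_ids = set()

    def apply(self, event_id, account, amount):
        if event_id in self.applied_ids:
            return False  # redelivered event: ignore it
        self.balances[account] = self.balances.get(account, 0) + amount
        self.applied_ids.add(event_id)
        return True

ledger = IdempotentLedger()
ledger.apply('evt-1', 'acct-42', 100)
ledger.apply('evt-1', 'acct-42', 100)   # duplicate delivery after a retry
ledger.apply('evt-2', 'acct-42', -30)
print(ledger.balances)                  # {'acct-42': 70}
```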
Exactly-Once vs At-Least-Once Comparison
| Aspect | At-Least-Once | Exactly-Once |
|---|---|---|
| Duplicates | Possible | Eliminated |
| Throughput | Higher | Lower (by roughly 5-15%) |
| Latency | Lower | Higher |
| Complexity | Simple | Complex |
| Consumer Logic | Simple retry | Transaction-aware |
| Use Cases | Most applications | Financial, critical |
⚠️ Key Insight
Kafka's exactly-once is a system-level guarantee, not an end-to-end guarantee across external systems. If your consumer writes to a database and crashes after processing but before committing offsets, you'll process duplicates. True end-to-end exactly-once requires careful system design.