Exactly-Once Semantics
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Exactly-once semantics (EOS) ensures that the effect of each message is applied exactly once, even when producers, brokers, or consumers fail and retry. This is critical for financial transactions and other workloads where a lost or duplicated record corrupts data.
Delivery Semantics Comparison
At-Most-Once:
- Message may be lost
- No retries
- Fast but unreliable
At-Least-Once:
- Message may be duplicated
- Retries on failure
- Requires idempotent consumers to tolerate the duplicates
Exactly-Once:
- Each message processed once
- Combines idempotent producers + transactions
- Requires careful configuration
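The three guarantees differ mainly in when the offset is committed relative to processing. The following is a minimal, self-contained Python simulation, not Kafka code: `run`, `crash_at`, and the in-memory lists are hypothetical names chosen for illustration. It shows the effect of a single crash under each mode.

```python
def run(messages, mode, crash_at):
    """Simulate a consumer that crashes once while handling index `crash_at`.

    mode = "at_most_once":  commit the offset, then process
    mode = "at_least_once": process, then commit the offset
    mode = "exactly_once":  process and commit atomically (all or nothing)
    Returns the processed messages after restart and completion.
    """
    processed = []   # side effects that became visible
    committed = 0    # next offset to read after a restart
    crashed = False

    while committed < len(messages):
        offset = committed
        msg = messages[offset]
        crash_now = (offset == crash_at and not crashed)

        if mode == "at_most_once":
            committed = offset + 1      # commit first
            if crash_now:
                crashed = True
                continue                # crash before processing: message lost
            processed.append(msg)
        elif mode == "at_least_once":
            processed.append(msg)       # process first
            if crash_now:
                crashed = True
                continue                # crash before commit: message replayed
            committed = offset + 1
        elif mode == "exactly_once":
            if crash_now:
                crashed = True
                continue                # crash discards both output and commit
            processed.append(msg)       # output and commit succeed together
            committed = offset + 1
        else:
            raise ValueError(f"unknown mode: {mode}")
    return processed


msgs = ["a", "b", "c"]
print(run(msgs, "at_most_once", crash_at=1))   # ['a', 'c']
print(run(msgs, "at_least_once", crash_at=1))  # ['a', 'b', 'b', 'c']
print(run(msgs, "exactly_once", crash_at=1))   # ['a', 'b', 'c']
```

The exactly-once branch models the outcome only; in Kafka that atomicity comes from writing the output and the consumer offsets in one transaction.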
Idempotent Producers
Prevent duplicate messages from producer retries:
// Enable idempotent producer (on by default since Kafka 3.0)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Critical settings for exactly-once
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5); // Safe with idempotence
Producer<String, String> producer = new KafkaProducer<>(props);
// Each batch carries the producer ID (PID) and a per-partition sequence number
// The broker uses (PID, partition, sequence) to drop duplicates caused by retries
Idempotence Internals
Producer sends message:
1. Producer has unique PID (Producer ID)
2. Each message has sequence number (0, 1, 2, ...)
3. Broker tracks: (PID, Topic, Partition) → Last Sequence
Duplicate Detection:
- Sequence N arrives
- If Last Sequence == N-1: accept, update Last Sequence to N
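- If Last Sequence >= N: discard as a duplicate (the producer treats the retry as already acknowledged)
- If Last Sequence < N-1: reject with OutOfOrderSequenceException (a gap means an earlier message is missing)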
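The duplicate-detection rules above can be sketched as a small in-memory model. This is an illustration under simplifying assumptions, not broker code: `PartitionLog` and its return strings are hypothetical, and a real broker tracks batches rather than single messages.

```python
class PartitionLog:
    """Toy model of one partition's duplicate detection (not broker code)."""

    def __init__(self):
        self.last_seq = {}   # producer ID -> last accepted sequence number
        self.records = []    # accepted payloads, in order

    def append(self, pid, seq, payload):
        last = self.last_seq.get(pid, -1)   # -1: nothing accepted yet, so 0 is expected
        if seq == last + 1:
            self.records.append(payload)
            self.last_seq[pid] = seq
            return "accepted"
        if seq <= last:
            return "duplicate"              # retry of an already-written message
        return "out_of_order"               # gap: an earlier message is missing


log = PartitionLog()
print(log.append(pid=7, seq=0, payload="m0"))  # accepted
print(log.append(pid=7, seq=1, payload="m1"))  # accepted
print(log.append(pid=7, seq=1, payload="m1"))  # duplicate
print(log.append(pid=7, seq=3, payload="m3"))  # out_of_order
print(log.records)                             # ['m0', 'm1']
```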
Important: Idempotent producers only prevent duplicates caused by producer retries within a single producer session. They don't handle consumer retries or application-level failures. For full exactly-once, use transactions.
Transactional Producers
Enable atomic writes across multiple partitions:
// Transactional producer setup
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("transactional.id", "order-processor-tx-1"); // Unique per instance
props.put("enable.idempotence", true);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
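try {
    producer.beginTransaction();
    // Write to multiple partitions atomically
    producer.send(new ProducerRecord<>("orders", "order-123", orderData));
    producer.send(new ProducerRecord<>("inventory", "item-456", inventoryUpdate));
    producer.send(new ProducerRecord<>("payments", "pay-789", paymentRecord));
    // Commit consumer offsets as part of the transaction.
    // `offsets` and `consumer` come from the surrounding consume loop;
    // the String group-id overload is deprecated in favor of groupMetadata().
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer cannot continue, so close it instead of aborting
    producer.close();
    throw e;
} catch (KafkaException e) {
    // Recoverable error: abort so none of the writes become visible
    producer.abortTransaction();
    throw e;
}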
Consumer Configuration
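// Consumer must read only committed messages
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "order-processor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Offsets are committed through the producer transaction, not by the consumer
props.put("enable.auto.commit", false);
// Read only committed messages
props.put("isolation.level", "read_committed");
// Alternative (the default): also return messages from open or aborted transactions.
// Left commented out because a second put would override the setting above.
// props.put("isolation.level", "read_uncommitted");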
Isolation Levels
read_uncommitted (default):
- Sees all messages, including uncommitted
- Lower latency
- May return messages from transactions that are later aborted
read_committed:
- Sees only committed messages
- Higher latency (reads only up to the last stable offset, so it waits for open transactions to finish)
- Never returns messages from aborted transactions
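To make the difference concrete, here is a minimal sketch of what each isolation level would return from a toy log. The tuple layout and the `visible` function are assumptions made for illustration; real consumers work from transaction markers and the last stable offset rather than a per-record state field.

```python
def visible(log, isolation_level):
    """Return the payloads a consumer would see in a toy transactional log.

    Each entry is (payload, txn_id, state), with state one of
    "committed", "aborted", or "open"; txn_id is None for
    non-transactional writes, which always count as committed.
    """
    if isolation_level == "read_uncommitted":
        return [payload for payload, _, _ in log]
    if isolation_level != "read_committed":
        raise ValueError(f"unknown isolation level: {isolation_level}")

    out = []
    for payload, txn_id, state in log:
        if txn_id is not None and state == "open":
            break        # last stable offset: nothing past the first open transaction
        if state == "aborted":
            continue     # aborted data is filtered out
        out.append(payload)
    return out


log = [
    ("a", None, "committed"),
    ("b", "t1", "committed"),
    ("c", "t2", "aborted"),
    ("d", "t3", "open"),
    ("e", None, "committed"),
]
print(visible(log, "read_uncommitted"))  # ['a', 'b', 'c', 'd', 'e']
print(visible(log, "read_committed"))    # ['a', 'b']
```

Note that `e` stays hidden under read_committed even though it is not transactional: it sits behind an open transaction, which is where the extra latency comes from.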
End-to-End Exactly-Once Pattern
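// Complete exactly-once processing pattern
public class ExactlyOnceProcessor {
    public void process() {
        KafkaConsumer<String, String> consumer = createConsumer();
        KafkaProducer<String, String> producer = createTransactionalProducer();
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;
            try {
                producer.beginTransaction();
                // Process and produce
                for (ConsumerRecord<String, String> record : records) {
                    // Transform message
                    OutputMessage output = transform(record.value());
                    // Produce to output topic
                    producer.send(new ProducerRecord<>(
                        "output-topic",
                        record.key(),
                        output.toString()
                    ));
                }
                // Commit consumer offsets within the transaction.
                // getOffsets must return last processed offset + 1 for each partition.
                Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(records);
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal: this producer can no longer be used
                producer.close();
                consumer.close();
                throw e;
            } catch (RuntimeException e) {
                // Recoverable Kafka error or processing error: discard the batch's output
                producer.abortTransaction();
                // Rewind to the last committed offsets so the aborted batch is
                // polled again rather than silently skipped
                Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(records.partitions());
                for (TopicPartition tp : records.partitions()) {
                    OffsetAndMetadata om = committed.get(tp);
                    if (om != null) {
                        consumer.seek(tp, om.offset());
                    } else {
                        // No committed offset yet: restart from the beginning of the partition
                        consumer.seekToBeginning(Collections.singletonList(tp));
                    }
                }
            }
        }
    }
}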
Python Exactly-Once Pattern
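from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from kafka.structs import OffsetAndMetadata
import json

# Assumes a kafka-python release with transactional producer support.
# Signatures can differ between versions, so check your installed release.
class ExactlyOnceProcessor:
    def __init__(self):
        self.group_id = 'order-processor'
        self.producer = KafkaProducer(
            bootstrap_servers=['kafka1:9092'],
            transactional_id='order-processor-tx-1',
            key_serializer=lambda k: k.encode('utf-8') if k is not None else None,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.consumer = KafkaConsumer(
            'input-topic',
            bootstrap_servers=['kafka1:9092'],
            group_id=self.group_id,
            enable_auto_commit=False,
            isolation_level='read_committed',
            auto_offset_reset='earliest',
            # Decode bytes so keys and values match the producer's serializers
            key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
            value_deserializer=lambda v: v.decode('utf-8')
        )
        self.producer.init_transactions()

    def process(self):
        while True:
            records = self.consumer.poll(timeout_ms=1000)
            if not records:
                continue
            try:
                self.producer.begin_transaction()
                for tp, messages in records.items():
                    for message in messages:
                        output = self.transform(message.value)
                        self.producer.send(
                            'output-topic',
                            key=message.key,
                            value=output
                        )
                # Commit offsets within the transaction: next offset to read per partition.
                # Assumption: OffsetAndMetadata takes (offset, metadata, leader_epoch)
                # and send_offsets_to_transaction takes the group id string.
                offsets = {
                    tp: OffsetAndMetadata(messages[-1].offset + 1, '', -1)
                    for tp, messages in records.items()
                }
                self.producer.send_offsets_to_transaction(offsets, self.group_id)
                self.producer.commit_transaction()
            except KafkaError as e:
                self.producer.abort_transaction()
                print(f"Transaction failed: {e}")
                # Rewind so the aborted batch is polled again, not skipped
                for tp, messages in records.items():
                    self.consumer.seek(tp, messages[0].offset)

    def transform(self, message):
        # Your transformation logic
        return {'processed': True, 'data': message}

# Run processor
if __name__ == '__main__':
    processor = ExactlyOnceProcessor()
    processor.process()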
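The reason the pattern yields exactly-once output is that the produced records and the consumed offsets become visible together or not at all. The sketch below models that with an in-memory store so it runs without a broker; `ToyTransactionalStore`, `process_all`, and `fail_on` are hypothetical names, and the failure is injected deliberately.

```python
class ToyTransactionalStore:
    """In-memory stand-in for the output topic plus committed offsets."""

    def __init__(self):
        self.output = []        # committed output records
        self.offset = 0         # committed input offset (next record to read)
        self._pending = None    # staged (records, new_offset) of the open transaction

    def begin(self):
        self._pending = ([], self.offset)

    def send(self, record):
        self._pending[0].append(record)

    def send_offset(self, new_offset):
        self._pending = (self._pending[0], new_offset)

    def commit(self):
        records, new_offset = self._pending
        self.output.extend(records)   # output and offset become visible together
        self.offset = new_offset
        self._pending = None

    def abort(self):
        self._pending = None          # staged output and offset are both discarded


def process_all(inputs, store, batch_size, fail_on=None):
    """Consume-transform-produce loop; `fail_on` injects one failure for that value."""
    failed_once = False
    while store.offset < len(inputs):
        start = store.offset                  # always resume from the committed offset
        batch = inputs[start:start + batch_size]
        store.begin()
        try:
            for value in batch:
                if value == fail_on and not failed_once:
                    failed_once = True
                    raise RuntimeError(f"simulated failure on {value!r}")
                store.send(value.upper())
            store.send_offset(start + len(batch))
            store.commit()
        except RuntimeError:
            store.abort()                     # next iteration re-reads the same batch
    return store.output


store = ToyTransactionalStore()
print(process_all(["a", "b", "c", "d"], store, batch_size=2, fail_on="d"))
# ['A', 'B', 'C', 'D']
```

In the failing batch, `C` was already sent before the error on `d`, yet it appears only once in the result because the abort discarded it along with the offset.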
Common Pitfalls
1. Transaction Timeouts
// Default transaction timeout: 60 seconds
props.put("transaction.timeout.ms", "60000");
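// If processing takes longer than the timeout:
// - The transaction coordinator aborts the transaction
// - The producer's next send or commit fails
// - The input records are reprocessed, because their offsets were never committed
// - Keep per-transaction work well under the timeout, or raise it
//   (bounded by the broker's transaction.max.timeout.ms)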
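One way to stay under the timeout is to cap how many records go into a single transaction. The helper below is a rough sizing sketch: `max_records_per_transaction` and the 0.5 safety factor are illustrative assumptions, not Kafka settings or recommendations.

```python
def max_records_per_transaction(timeout_ms, per_record_ms, safety_factor=0.5):
    """Upper bound on records per transaction so processing fits the timeout.

    safety_factor reserves headroom for commit latency, pauses, and retries;
    0.5 is an arbitrary illustrative choice.
    """
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    budget_ms = timeout_ms * safety_factor
    return max(1, int(budget_ms // per_record_ms))


# With the default 60 s timeout and roughly 20 ms of work per record:
print(max_records_per_transaction(60_000, 20))   # 1500
```

The result could serve as a ceiling for the consumer's max.poll.records when each poll maps to one transaction.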
2. Consumer Rebalances During Transaction
Problem:
1. Consumer starts transaction
2. Rebalance occurs
3. Transaction aborted
4. Consumer resumes, reprocesses
Solution:
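- Use static group membership (group.instance.id) to avoid rebalances on brief restarts
- Pass consumer.groupMetadata() to sendOffsetsToTransaction so offset commits from a stale group generation are rejected
- Abort or finish the open transaction in ConsumerRebalanceListener.onPartitionsRevoked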
3. Idempotent Producer + Transaction Interaction
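// A transactional producer enables idempotence automatically
// Setting enable.idempotence=true as well is redundant; setting it to false
// together with transactional.id fails with a ConfigException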
props.put("transactional.id", "my-tx-id"); // Implies idempotence
// props.put("enable.idempotence", true); // Not needed
Follow-Up Questions
- What is the difference between idempotent producers and transactional producers?
- How does isolation.level=read_committed affect consumer behavior?
- Explain what happens when a transaction times out during processing.
- How would you handle consumer rebalances within a transactional consumer?
- What are the limitations of exactly-once semantics in Kafka?